[libvirt-python][PATCH 0/3] Improve streams a bit

Inspired by patches I've sent for libvirt:

https://www.redhat.com/archives/libvir-list/2020-July/msg00145.html

Michal Privoznik (3):
  MANIFEST: Distribute sparsestream.py example
  virStream: Use larger buffer for sendAll/recvAll methods
  examples: Update sparsestream.py so that it handles block devices

 MANIFEST.in                   |  1 +
 examples/sparsestream.py      | 73 ++++++++++++++++++++++++++---------
 libvirt-override-virStream.py |  8 ++--
 3 files changed, 59 insertions(+), 23 deletions(-)

-- 
2.26.2

Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- MANIFEST.in | 1 + 1 file changed, 1 insertion(+) diff --git a/MANIFEST.in b/MANIFEST.in index da7cbae..fd76f3c 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -13,6 +13,7 @@ include examples/event-test.py include examples/guest-vcpus/guest-vcpu-daemon.py include examples/guest-vcpus/guest-vcpu.py include examples/nodestats.py +include examples/sparsestream.py include examples/topology.py include generator.py include libvirt-lxc-override-api.xml -- 2.26.2

On Fri, Jul 03, 2020 at 01:30:58PM +0200, Michal Privoznik wrote:
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- MANIFEST.in | 1 + 1 file changed, 1 insertion(+)
Reviewed-by: Pavel Hrdina <phrdina@redhat.com>

There are four methods which receive/send an entire stream (sendAll(), recvAll(), sparseSendAll() and sparseRecvAll()). All of these have an intermediary buffer which is either filled by the incoming stream and passed to a user-provided callback to handle the data, or the other way round - the user fills it with data they want to send and the buffer is handed over to virStream. But the buffer is incredibly small, which leads to smaller packets being sent and thus increased overhead. What we can do is use the same buffer size as their C counterparts do (e.g. virStreamSendAll()) - they all use a buffer VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX bytes long (which is the maximum size of a stream packet we send) - this is almost exactly 256KiB (it's 256KiB - 24B for the header). Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- libvirt-override-virStream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libvirt-override-virStream.py b/libvirt-override-virStream.py index 901c2e6..8880218 100644 --- a/libvirt-override-virStream.py +++ b/libvirt-override-virStream.py @@ -39,7 +39,7 @@ return os.write(fd, buf) """ while True: - got = self.recv(1024*64) + got = self.recv(262120) if got == -2: raise libvirtError("cannot use recvAll with " "nonblocking stream") @@ -74,7 +74,7 @@ """ while True: try: - got = handler(self, 1024*64, opaque) + got = handler(self, 262120, opaque) except: e = sys.exc_info()[1] try: @@ -189,7 +189,7 @@ # actually allocate the hole """ while True: - want = 64 * 1024 + want = 262120 got = self.recvFlags(want, VIR_STREAM_RECV_STOP_AT_HOLE) if got == -2: raise libvirtError("cannot use sparseRecvAll with " @@ -251,7 +251,7 @@ self.abort() continue - want = 64 * 1024 + want = 262120 if (want > sectionLen): want = sectionLen -- 2.26.2

On Fri, Jul 03, 2020 at 01:30:59PM +0200, Michal Privoznik wrote:
There are four methods which receive/send an entire stream (sendAll(), recvAll(), sparseSendAll() and sparseRecvAll()). All of these have an intermediary buffer which is either filled by the incoming stream and passed to a user-provided callback to handle the data, or the other way round - the user fills it with data they want to send and the buffer is handed over to virStream.
But the buffer is incredibly small, which leads to smaller packets being sent and thus increased overhead. What we can do is use the same buffer size as their C counterparts do (e.g. virStreamSendAll()) - they all use a buffer VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX bytes long (which is the maximum size of a stream packet we send) - this is almost exactly 256KiB (it's 256KiB - 24B for the header).
Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- libvirt-override-virStream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-)
Sounds reasonable, but could you please define a constant instead of using magic numbers?

Reviewed-by: Pavel Hrdina <phrdina@redhat.com>

This patch is similar to what I sent for libvirt. It extends callbacks so that they work with block devices. Signed-off-by: Michal Privoznik <mprivozn@redhat.com> --- examples/sparsestream.py | 73 +++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/examples/sparsestream.py b/examples/sparsestream.py index d7c09b7..4e96d43 100755 --- a/examples/sparsestream.py +++ b/examples/sparsestream.py @@ -4,27 +4,27 @@ # Authors: # Michal Privoznik <mprivozn@redhat.com> -import libvirt, sys, os +import libvirt, stat, sys, os -def bytesWriteHandler(stream, buf, opaque): - fd = opaque - return os.write(fd, buf) +def bytesWriteHandler(stream, buf, cbData): + return os.write(cbData["fd"], buf) -def bytesReadHandler(stream, nbytes, opaque): - fd = opaque - return os.read(fd, nbytes) +def bytesReadHandler(stream, nbytes, cbData): + return os.read(cbData["fd"], nbytes) -def recvSkipHandler(stream, length, opaque): - fd = opaque - cur = os.lseek(fd, length, os.SEEK_CUR) - return os.ftruncate(fd, cur) +def recvSkipHandler(stream, length, cbData): + fd = cbData["fd"] + if cbData["isBlock"]: + os.write(fd, b'\x00' * length) + else : + cur = os.lseek(fd, length, os.SEEK_CUR) + os.ftruncate(fd, cur) + return 0 -def sendSkipHandler(stream, length, opaque): - fd = opaque - return os.lseek(fd, length, os.SEEK_CUR) +def sendSkipHandler(stream, length, cbData): + return os.lseek(cbData["fd"], length, os.SEEK_CUR) -def holeHandler(stream, opaque): - fd = opaque +def inDataRegular(fd): cur = os.lseek(fd, 0, os.SEEK_CUR) try: @@ -73,13 +73,45 @@ def holeHandler(stream, opaque): os.lseek(fd, cur, os.SEEK_SET) return [inData, sectionLen] +def inDataDetectZeores(fd): + cur = os.lseek(fd, 0, os.SEEK_CUR) + chunksize = 32 * 1024 * 1024 # 2MiB + + buf = os.read(fd, chunksize) + + zeroes = bytes(len(buf)) + inData = buf != zeroes + sectionLen = len(buf) + print("cur: %d inData: %s sectionLen: %d" % (cur, inData, sectionLen)) + + os.lseek(fd, cur, 
os.SEEK_SET) + return [inData, sectionLen] + + +def holeHandler(stream, cbData): + if cbData["isBlock"]: + return inDataDetectZeores(cbData["fd"]) + else: + return inDataRegular(cbData["fd"]) + def download(vol, st, filename): offset = 0 length = 0 + isBlock = False - fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=0o0660) + try: + sb = os.stat(filename) + isBlock = stat.S_ISBLK(sb.st_mode) + except FileNotFoundError: + pass + + if isBlock: + fd = os.open(filename, os.O_WRONLY, mode=0o0660) + else: + fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=0o0660) + cbData = {"fd": fd, "isBlock": isBlock} vol.download(st, offset, length, libvirt.VIR_STORAGE_VOL_DOWNLOAD_SPARSE_STREAM) - st.sparseRecvAll(bytesWriteHandler, recvSkipHandler, fd) + st.sparseRecvAll(bytesWriteHandler, recvSkipHandler, cbData) os.close(fd) @@ -87,9 +119,12 @@ def upload(vol, st, filename): offset = 0 length = 0 + sb = os.stat(filename) + isBlock = stat.S_ISBLK(sb.st_mode) fd = os.open(filename, os.O_RDONLY) + cbData = {"fd": fd, "isBlock": isBlock} vol.upload(st, offset, length, libvirt.VIR_STORAGE_VOL_UPLOAD_SPARSE_STREAM) - st.sparseSendAll(bytesReadHandler, holeHandler, sendSkipHandler, fd) + st.sparseSendAll(bytesReadHandler, holeHandler, sendSkipHandler, cbData) os.close(fd) -- 2.26.2
participants (2)
-
Michal Privoznik
-
Pavel Hrdina