
On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Basically, what is needed here is to introduce new message type for the messages passed between the event loop callbacks and the worker thread that does all the I/O. The idea is that instead of a queue of read buffers we will have a queue where "hole of size X" messages appear. That way the even loop callbacks can just
s/even/event/
check the head of the queue and see if the worker thread is in data or a hole section and how long the section is.
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/storage/storage_util.c |   4 +-
 src/util/virfdstream.c     | 239 ++++++++++++++++++++++++++++++++++++++++-----
 src/util/virfdstream.h     |   1 +
 3 files changed, 220 insertions(+), 24 deletions(-)
[...]
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 4b42939e7..ba209025a 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c
[...]
static ssize_t virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const int fdout, const char *fdinname, const char *fdoutname, + size_t *dataLen, size_t buflen) { virFDStreamMsgPtr msg = NULL; + int inData = 0; + long long sectionLen = 0; char *buf = NULL; ssize_t got;
+ if (sparse && *dataLen == 0) { + if (virFileInData(fdin, &inData, &sectionLen) < 0) + goto error; + + if (inData) + *dataLen = sectionLen; + } + if (VIR_ALLOC(msg) < 0) goto error;
- if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; - - if ((got = saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); - goto error; + if (sparse && *dataLen == 0) { + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = sectionLen; + got = sectionLen; + + /* HACK. The message queue is one directional. So caller
Is this really a "HACK", or is it "By design"?
+ * cannot make us skip the hole. Do that for them instead. */ + if (sectionLen && + lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdinname); + goto error; + }
[...]
+static int +virFDStreamSendHole(virStreamPtr st, + long long length, + unsigned int flags) +{ + virFDStreamDataPtr fdst = st->privateData; + virFDStreamMsgPtr msg = NULL; + off_t off; + int ret = -1; + + virCheckFlags(0, -1); + + virObjectLock(fdst); + if (fdst->length) { + if (length > fdst->length - fdst->offset) + length = fdst->length - fdst->offset; + fdst->offset += length; + } + + if (fdst->thread) { + /* Things are a bit complicated here. But bear with me. If FDStream is
s/But bear with me.//
+ * in a read mode, then if the message at the queue head is HOLE, just + * pop it. The thread has lseek()-ed anyway. If however, the FDStream
However, if the FDStream

Reviewed-by: John Ferlan <jferlan@redhat.com>

John
+ * is in write mode, then tell the thread to do the lseek() for us. + * Under no circumstances we can do the lseek() ourselves here. We + * might mess up file position for the thread. */ + if (fdst->threadDoRead) { + msg = fdst->msg; + if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Invalid stream hole")); + goto cleanup; + } + + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe"); + } else { + if (VIR_ALLOC(msg) < 0) + goto cleanup; + + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = length; + virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe"); + msg = NULL; + }
[...]