
On 04/27/2017 07:48 PM, Daniel P. Berrange wrote:
On Thu, Apr 20, 2017 at 12:01:35PM +0200, Michal Privoznik wrote:
One big downside of using the pipe to transfer the data is that we can transfer only bare data. No metadata can be carried through unless some formatted messages are introduced. That would be quite painful to achieve, so let's use a message queue instead. It's fairly easy to exchange info between threads now that iohelper is no longer used.
The reason why we cannot use the FD for plain files directly is that, despite us setting the non-blocking flag on the FD, any read()/write() blocks regardless (which is a show stopper, since those parts of the code are run from the event loop), and poll() reports such an FD as always readable/writable, even though the subsequent operation might block.
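The poll() behaviour described above can be observed with a short standalone check. This is only a sketch, not part of the patch: the function name empty_file_polls_readable() and the /tmp template are made up for illustration, and it demonstrates only the "always readable" half of the claim, using an empty temporary file on which there is nothing to read.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <poll.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical helper, not from libvirt.
 * Returns 1 if poll() reports an empty regular file as readable,
 * 0 if it does not, and -1 if the setup fails. */
int empty_file_polls_readable(void)
{
    char path[] = "/tmp/fdstream-poll-XXXXXX";
    struct pollfd pfd;
    int fd, flags, n;

    if ((fd = mkstemp(path)) < 0)
        return -1;
    unlink(path); /* the open FD keeps the file alive */

    /* Set the non-blocking flag, as the commit message describes. */
    flags = fcntl(fd, F_GETFL);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        close(fd);
        return -1;
    }

    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    /* Zero timeout: ask only what is ready right now. */
    n = poll(&pfd, 1, 0);
    close(fd);

    if (n < 0)
        return -1;
    return (n == 1 && (pfd.revents & POLLIN)) ? 1 : 0;
}
```

On a POSIX system the function is expected to return 1 even though the file holds no data, which is why poll() alone cannot tell the event loop when a plain file is really worth reading.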
The pipe is still not gone though. It is used to signal to the event loop that an event occurred (e.g. data are available for reading in the queue, or vice versa).
Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
---
 src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 350 insertions(+), 52 deletions(-)
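For readers unfamiliar with the pattern, the commit message's use of the pipe as a pure wake-up channel (one byte per queued message, no payload) can be sketched in isolation as follows. The names fd_readable() and wakeup_pipe_demo() are hypothetical and the message queue itself is left out; only the byte-per-event bookkeeping is shown.

```c
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: 1 if fd has data to read right now,
 * 0 if not, -1 on error. */
static int fd_readable(int fd)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
    int n = poll(&pfd, 1, 0);

    if (n < 0)
        return -1;
    return (n == 1 && (pfd.revents & POLLIN)) ? 1 : 0;
}

/* Returns 0 if the pipe behaves as a wake-up channel, -1 otherwise. */
int wakeup_pipe_demo(void)
{
    int fds[2];
    char c = '1';
    int ret = -1;

    if (pipe(fds) < 0)
        return -1;

    /* Nothing queued yet: an event loop polling fds[0] would sleep. */
    if (fd_readable(fds[0]) != 0)
        goto cleanup;

    /* "Push": after queuing a message, write one byte to wake the loop. */
    if (write(fds[1], &c, sizeof(c)) != sizeof(c))
        goto cleanup;
    if (fd_readable(fds[0]) != 1)
        goto cleanup;

    /* "Pop": after dequeuing the message, consume the matching byte. */
    if (read(fds[0], &c, sizeof(c)) != sizeof(c))
        goto cleanup;
    if (fd_readable(fds[0]) != 0)
        goto cleanup;

    ret = 0;

 cleanup:
    close(fds[0]);
    close(fds[1]);
    return ret;
}
```

Unlike a plain file, the read end of a pipe polls as readable only while bytes are pending, so the number of unread bytes tracks the number of queued messages.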
+static int
+virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
+                        virFDStreamMsgPtr msg,
+                        int fd,
+                        const char *fdname)
+{
+    virFDStreamMsgPtr *tmp = &fdst->msg;
+    char c = '1';
+
+    while (*tmp)
+        tmp = &(*tmp)->next;
+
+    *tmp = msg;
+    virCondSignal(&fdst->threadCond);
+
+    if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
+        virReportSystemError(errno,
+                             _("Unable to write to %s"),
+                             fdname);
+        return -1;
+    }
+
+    return 0;
+}
+
+
+static virFDStreamMsgPtr
+virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
+                       int fd,
+                       const char *fdname)
+{
+    virFDStreamMsgPtr tmp = fdst->msg;
+    char c;
+
+    if (tmp) {
+        fdst->msg = tmp->next;
+        tmp->next = NULL;
+    }
+
+    virCondSignal(&fdst->threadCond);
+
+    if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
+        virReportSystemError(errno,
+                             _("Unable to read from %s"),
+                             fdname);
+        return NULL;
+    }
+
+    return tmp;
+}
Both these methods signal the condition
+static ssize_t
+virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+                        const int fdin,
+                        const int fdout,
+                        const char *fdinname,
+                        const char *fdoutname,
+                        size_t buflen)
+{
+    virFDStreamMsgPtr msg = NULL;
+    char *buf = NULL;
+    ssize_t got;
+
+    if (VIR_ALLOC(msg) < 0)
+        goto error;
+
+    if (VIR_ALLOC_N(buf, buflen) < 0)
+        goto error;
+
+    if ((got = saferead(fdin, buf, buflen)) < 0) {
+        virReportSystemError(errno,
+                             _("Unable to read %s"),
+                             fdinname);
+        goto error;
+    }
+
+    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+    msg->stream.data.buf = buf;
+    msg->stream.data.len = got;
+    buf = NULL;
+
+    virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
+    msg = NULL;
+
+    return got;
+
+ error:
+    VIR_FREE(buf);
+    virFDStreamMsgFree(msg);
+    return -1;
+}
+
+
+static ssize_t
+virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+                         const int fdin,
+                         const int fdout,
+                         const char *fdinname,
+                         const char *fdoutname)
+{
+    ssize_t got;
+    virFDStreamMsgPtr msg = fdst->msg;
+    bool pop = false;
+
+    switch (msg->type) {
+    case VIR_FDSTREAM_MSG_TYPE_DATA:
+        got = safewrite(fdout,
+                        msg->stream.data.buf + msg->stream.data.offset,
+                        msg->stream.data.len - msg->stream.data.offset);
+        if (got < 0) {
+            virReportSystemError(errno,
+                                 _("Unable to write %s"),
+                                 fdoutname);
+            return -1;
+        }
+
+        msg->stream.data.offset += got;
+
+        pop = msg->stream.data.offset == msg->stream.data.len;
+        break;
+    }
+
+    if (pop) {
+        virFDStreamMsgQueuePop(fdst, fdin, fdinname);
+        virFDStreamMsgFree(msg);
+    }
+
+    return got;
+}
Both these methods call into the Pop/Push functions which signal the condition.
@@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
     int fdout = data->fdout;
     char *fdoutname = data->fdoutname;
     virFDStreamDataPtr fdst = st->privateData;
-    char *buf = NULL;
+    bool doRead = fdst->threadDoRead;
     size_t buflen = 256 * 1024;
     size_t total = 0;

     virObjectRef(fdst);
-
-    if (VIR_ALLOC_N(buf, buflen) < 0)
-        goto error;
+    virObjectLock(fdst);

     while (1) {
         ssize_t got;
@@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
             if (buflen == 0)
                 break; /* End of requested data from client */

-        if ((got = saferead(fdin, buf, buflen)) < 0) {
-            virReportSystemError(errno,
-                                 _("Unable to read %s"),
-                                 fdinname);
+        while (doRead == (fdst->msg != NULL) &&
+               !fdst->threadQuit) {
+            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
+                virReportSystemError(errno, "%s",
+                                     _("failed to wait on condition"));
+                goto error;
+            }
+        }
+
+        if (fdst->threadQuit) {
+            /* If stream abort was requested, quit early. */
+            if (fdst->threadAbort)
+                goto cleanup;
+
+            /* Otherwise flush buffers and quit gracefully. */
+            if (doRead == (fdst->msg != NULL))
+                break;
+        }
+
+        if (doRead)
+            got = virFDStreamThreadDoRead(fdst,
+                                          fdin, fdout,
+                                          fdinname, fdoutname,
+                                          buflen);
+        else
+            got = virFDStreamThreadDoWrite(fdst,
+                                           fdin, fdout,
+                                           fdinname, fdoutname);
+
+        if (got < 0)
This method waits on the condition.
So unless I'm mistaken, this thread is signaling & waiting on the same condition, which feels wrong. Generally different threads would signal vs wait.
I hear what you're saying, but I don't think this is a problem, precisely because of the way we wait on the condition.

Assume this thread is doing reads. That is, it reads from fdin (an actual file on disk) and feeds the message queue with the data. It reads some data, pushes it onto the end of the message queue (fdst->msg != NULL at that point) and goes to the while loop above. Effectively, it eats all the spurious wakeups for as long as fdst->msg != NULL.

Now assume the thread is doing writes (it reads data from the queue and writes it into a file on disk). Again, as long as there are some messages to be written (that is, fdst->msg != NULL), control won't even reach virCondWait. And if it does, it's because fdst->msg == NULL, in which case there is no data to be written.

But something has just come to my mind whilst writing these lines: I wonder if we can ditch the condition entirely and rely on the pipe + poll(). I mean, this worker would use the pipe to signal to the event loop that there is a message waiting for it in the queue. The question is how thread safe this approach would be.

Michal
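The argument above (a signal sent while nobody is waiting is simply lost, and a predicate loop around the wait absorbs any wakeup that does not change the queue state) can be sketched with plain pthreads. This is an illustration under stated assumptions, not the libvirt code: struct slot, producer() and run_single_slot_demo() are hypothetical names, the queue is reduced to a single slot, and the consumer waits on the same condition, whereas in the patch the event loop is woken through the pipe instead.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical single-slot queue; "full" plays the role of
 * fdst->msg != NULL in the patch. */
struct slot {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool full;
    int value;
    int count; /* number of values the producer pushes */
};

/* Producer: mirrors the doRead case, filling the slot. */
static void *producer(void *opaque)
{
    struct slot *s = opaque;
    int i;

    pthread_mutex_lock(&s->lock);
    for (i = 0; i < s->count; i++) {
        /* Predicate loop: keep waiting while the slot is still full,
         * no matter why the thread was woken. */
        while (s->full)
            pthread_cond_wait(&s->cond, &s->lock);

        s->value = i;
        s->full = true;

        /* Signalling from the thread that also waits is harmless:
         * with no waiter present the signal is simply dropped. */
        pthread_cond_signal(&s->cond);
    }
    pthread_mutex_unlock(&s->lock);
    return NULL;
}

/* Consumer: drains count values and returns their sum,
 * or -1 if setup fails. */
int run_single_slot_demo(int count)
{
    struct slot s = { .full = false, .value = 0, .count = count };
    pthread_t tid;
    int sum = 0;
    int i;

    if (pthread_mutex_init(&s.lock, NULL) != 0)
        return -1;
    if (pthread_cond_init(&s.cond, NULL) != 0) {
        pthread_mutex_destroy(&s.lock);
        return -1;
    }
    if (pthread_create(&tid, NULL, producer, &s) != 0) {
        pthread_cond_destroy(&s.cond);
        pthread_mutex_destroy(&s.lock);
        return -1;
    }

    pthread_mutex_lock(&s.lock);
    for (i = 0; i < count; i++) {
        while (!s.full)
            pthread_cond_wait(&s.cond, &s.lock);

        sum += s.value;
        s.full = false;
        pthread_cond_signal(&s.cond);
    }
    pthread_mutex_unlock(&s.lock);

    pthread_join(tid, NULL);
    pthread_cond_destroy(&s.cond);
    pthread_mutex_destroy(&s.lock);
    return sum;
}
```

With two threads and complementary predicates, at most one side can be blocked at any time, so sharing one condition between them works here; whether that still holds with more participants is a separate question that this sketch does not address.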