On 04/27/2017 07:48 PM, Daniel P. Berrange wrote:
On Thu, Apr 20, 2017 at 12:01:35PM +0200, Michal Privoznik wrote:
> One big downside of using the pipe to transfer the data is that
> we can really transfer just bare data. No metadata can be carried
> through unless some formatted messages are introduced. That would
> be quite painful to achieve so let's use a message queue. It's
> fairly easy to exchange info between threads now that iohelper is
> no longer used.
>
> The reason why we cannot use the FD for plain files directly is
> that despite us setting noblock flag on the FD, any
> read()/write() blocks regardless (which is a show stopper since
> those parts of the code are run from the event loop) and poll()
> reports such FD as always readable/writable - even though the
> subsequent operation might block.
>
> The pipe is still not gone though. It is used to signal to the event
> loop that an event occurred (e.g. data are available for reading
> in the queue, or vice versa).
>
> Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
> ---
> src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
> 1 file changed, 350 insertions(+), 52 deletions(-)
>
> +static int
> +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
> +                        virFDStreamMsgPtr msg,
> +                        int fd,
> +                        const char *fdname)
> +{
> +    virFDStreamMsgPtr *tmp = &fdst->msg;
> +    char c = '1';
> +
> +    while (*tmp)
> +        tmp = &(*tmp)->next;
> +
> +    *tmp = msg;
> +    virCondSignal(&fdst->threadCond);
> +
> +    if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
> +        virReportSystemError(errno,
> +                             _("Unable to write to %s"),
> +                             fdname);
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +
> +static virFDStreamMsgPtr
> +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
> +                       int fd,
> +                       const char *fdname)
> +{
> +    virFDStreamMsgPtr tmp = fdst->msg;
> +    char c;
> +
> +    if (tmp) {
> +        fdst->msg = tmp->next;
> +        tmp->next = NULL;
> +    }
> +
> +    virCondSignal(&fdst->threadCond);
> +
> +    if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
> +        virReportSystemError(errno,
> +                             _("Unable to read from %s"),
> +                             fdname);
> +        return NULL;
> +    }
> +
> +    return tmp;
> +}

Both of these methods signal the condition.

> +static ssize_t
> +virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
> +                        const int fdin,
> +                        const int fdout,
> +                        const char *fdinname,
> +                        const char *fdoutname,
> +                        size_t buflen)
> +{
> +    virFDStreamMsgPtr msg = NULL;
> +    char *buf = NULL;
> +    ssize_t got;
> +
> +    if (VIR_ALLOC(msg) < 0)
> +        goto error;
> +
> +    if (VIR_ALLOC_N(buf, buflen) < 0)
> +        goto error;
> +
> +    if ((got = saferead(fdin, buf, buflen)) < 0) {
> +        virReportSystemError(errno,
> +                             _("Unable to read %s"),
> +                             fdinname);
> +        goto error;
> +    }
> +
> +    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
> +    msg->stream.data.buf = buf;
> +    msg->stream.data.len = got;
> +    buf = NULL;
> +
> +    virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
> +    msg = NULL;
> +
> +    return got;
> +
> + error:
> +    VIR_FREE(buf);
> +    virFDStreamMsgFree(msg);
> +    return -1;
> +}
> +
> +
> +static ssize_t
> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
> +                         const int fdin,
> +                         const int fdout,
> +                         const char *fdinname,
> +                         const char *fdoutname)
> +{
> +    ssize_t got;
> +    virFDStreamMsgPtr msg = fdst->msg;
> +    bool pop = false;
> +
> +    switch (msg->type) {
> +    case VIR_FDSTREAM_MSG_TYPE_DATA:
> +        got = safewrite(fdout,
> +                        msg->stream.data.buf + msg->stream.data.offset,
> +                        msg->stream.data.len - msg->stream.data.offset);
> +        if (got < 0) {
> +            virReportSystemError(errno,
> +                                 _("Unable to write %s"),
> +                                 fdoutname);
> +            return -1;
> +        }
> +
> +        msg->stream.data.offset += got;
> +
> +        pop = msg->stream.data.offset == msg->stream.data.len;
> +        break;
> +    }
> +
> +    if (pop) {
> +        virFDStreamMsgQueuePop(fdst, fdin, fdinname);
> +        virFDStreamMsgFree(msg);
> +    }
> +
> +    return got;
> +}

Both of these methods call into the Push/Pop functions, which
signal the condition.

> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>      int fdout = data->fdout;
>      char *fdoutname = data->fdoutname;
>      virFDStreamDataPtr fdst = st->privateData;
> -    char *buf = NULL;
> +    bool doRead = fdst->threadDoRead;
>      size_t buflen = 256 * 1024;
>      size_t total = 0;
>
>      virObjectRef(fdst);
> -
> -    if (VIR_ALLOC_N(buf, buflen) < 0)
> -        goto error;
> +    virObjectLock(fdst);
>
>      while (1) {
>          ssize_t got;
> @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
>          if (buflen == 0)
>              break; /* End of requested data from client */
>
> -        if ((got = saferead(fdin, buf, buflen)) < 0) {
> -            virReportSystemError(errno,
> -                                 _("Unable to read %s"),
> -                                 fdinname);
> +        while (doRead == (fdst->msg != NULL) &&
> +               !fdst->threadQuit) {
> +            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
> +                virReportSystemError(errno, "%s",
> +                                     _("failed to wait on condition"));
> +                goto error;
> +            }
> +        }
> +
> +        if (fdst->threadQuit) {
> +            /* If stream abort was requested, quit early. */
> +            if (fdst->threadAbort)
> +                goto cleanup;
> +
> +            /* Otherwise flush buffers and quit gracefully. */
> +            if (doRead == (fdst->msg != NULL))
> +                break;
> +        }
> +
> +        if (doRead)
> +            got = virFDStreamThreadDoRead(fdst,
> +                                          fdin, fdout,
> +                                          fdinname, fdoutname,
> +                                          buflen);
> +        else
> +            got = virFDStreamThreadDoWrite(fdst,
> +                                           fdin, fdout,
> +                                           fdinname, fdoutname);
> +
> +        if (got < 0)

This method waits on the condition.

So unless I'm mistaken, this thread is signaling & waiting on
the same condition, which feels wrong. Generally different
threads would signal vs wait.

I hear what you're saying, but I don't think this is a problem,
precisely because of the way we wait on the condition. Assume this
thread is doing reads. That is, it reads from fdin (an actual file on
disk) and feeds the message queue with the data. It reads some data,
pushes it onto the end of the message queue (so fdst->msg != NULL at
that point) and goes back to the while loop above. Effectively, it
eats all the spurious wakeups for as long as fdst->msg != NULL.

Now assume the thread is doing writes (it reads data from the queue
and writes it into a file on disk). Again, as long as there are some
messages to be written (that is, fdst->msg != NULL), control won't
even reach virCondWait. And if it does, it's because fdst->msg == NULL,
in which case there is no data to be written.
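
To make the two cases concrete, here is a minimal sketch of the same
wait predicate using plain pthreads rather than the libvirt wrappers.
Everything named demo_* is made up for illustration (an integer counter
stands in for the message list); it only mirrors the shape of the loop
in the patch and is not the actual libvirt code.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for the stream state, not the libvirt struct. */
struct demo_stream {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int queued;   /* queued > 0 plays the role of fdst->msg != NULL */
    bool quit;    /* plays the role of fdst->threadQuit */
};

/* Same shape as the loop condition in the patch:
 *   doRead == (fdst->msg != NULL) && !fdst->threadQuit
 * A reading worker waits while the queue is non-empty,
 * a writing worker waits while the queue is empty. */
static bool
demo_must_wait(bool do_read, int queued, bool quit)
{
    return do_read == (queued > 0) && !quit;
}

/* Block until the worker has something to do.
 * The caller must hold st->lock. Returns 0 on success, -1 on error. */
static int
demo_wait_for_work(struct demo_stream *st, bool do_read)
{
    /* Re-check the predicate after every wakeup, so spurious
     * wakeups and signals that do not change the state are absorbed. */
    while (demo_must_wait(do_read, st->queued, st->quit)) {
        if (pthread_cond_wait(&st->cond, &st->lock) != 0)
            return -1;
    }
    return 0;
}
```

The point of the sketch is that the worker only ever blocks when the
predicate says there is nothing for it to do. A signal the worker sends
from Push/Pop is issued while the worker itself is not waiting, and
pthread_cond_signal() with no waiters has no effect, so as long as
nothing else waits on the condition the self-signal is harmless.
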

But something just came to mind whilst writing these lines: I wonder
if we can ditch the condition entirely and rely on the pipe + poll().
I mean, this worker would use the pipe to signal to the event loop
that there is a message waiting for it in the queue. The question is
how thread safe this approach would be.
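
A rough sketch of what the pipe + poll() idea could look like, using
plain POSIX calls. All demo_* names are hypothetical and the queue is
reduced to a counter; this is only meant to show the notification
pattern, not how it would actually be wired into virfdstream.c. Note
that the sketch still takes a mutex around the queue itself: the pipe
only replaces the wakeup, not the protection of the shared data.

```c
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical queue: a counter stands in for the message list. */
struct demo_queue {
    pthread_mutex_t lock;
    int pending;
    int pipefd[2];   /* [0] is polled by the consumer, [1] written by the producer */
};

static int
demo_queue_init(struct demo_queue *q)
{
    q->pending = 0;
    if (pthread_mutex_init(&q->lock, NULL) != 0)
        return -1;
    return pipe(q->pipefd) < 0 ? -1 : 0;
}

/* Producer: queue one message, then write one byte so poll() wakes up. */
static int
demo_queue_push(struct demo_queue *q)
{
    char c = '1';
    ssize_t r;

    pthread_mutex_lock(&q->lock);
    q->pending++;
    pthread_mutex_unlock(&q->lock);

    do {
        r = write(q->pipefd[1], &c, sizeof(c));
    } while (r < 0 && errno == EINTR);

    return r == sizeof(c) ? 0 : -1;
}

/* Consumer: wait up to timeout_ms for a notification, then take one
 * message. Returns 1 if a message was taken, 0 on timeout, -1 on error. */
static int
demo_queue_pop(struct demo_queue *q, int timeout_ms)
{
    struct pollfd pfd = { .fd = q->pipefd[0], .events = POLLIN };
    char c;
    ssize_t r;
    int n;

    do {
        n = poll(&pfd, 1, timeout_ms);
    } while (n < 0 && errno == EINTR);
    if (n <= 0)
        return n;

    /* Consume exactly one notification byte per message. */
    do {
        r = read(q->pipefd[0], &c, sizeof(c));
    } while (r < 0 && errno == EINTR);
    if (r != sizeof(c))
        return -1;

    pthread_mutex_lock(&q->lock);
    q->pending--;
    pthread_mutex_unlock(&q->lock);
    return 1;
}
```

In this sketch each queued message corresponds to exactly one byte in
the pipe, so the read end stays readable for as long as something is
queued. Whether that invariant can be kept in the real code, where
both directions share the stream object, is the open question.
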
Michal