On 04/20/2017 06:01 AM, Michal Privoznik wrote:
One big downside of using the pipe to transfer the data is that
we can really transfer just bare data. No metadata can be carried
through unless some formatted messages are introduced. That would
be quite painful to achieve so let's use a message queue. It's
fairly easy to exchange info between threads now that iohelper is
no longer used.
The reason why we cannot use the FD for plain files directly is
that despite us setting noblock flag on the FD, any
read()/write() blocks regardless (which is a show stopper since
those parts of the code are run from the event loop) and poll()
reports such FD as always readable/writable - even though the
subsequent operation might block.
The pipe is still not gone though. It is used to signal to the event
loop that an event occurred (e.g. data are available for reading
in the queue, or vice versa).
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 350 insertions(+), 52 deletions(-)
Very strange - compilation breaks on this patch:
util/virfdstream.c: In function 'virFDStreamThread':
util/virfdstream.c:551:15: error: 'got' may be used uninitialized in
this function [-Werror=maybe-uninitialized]
total += got;
^~
The reason this happens is that virFDStreamThreadDoWrite doesn't
initialize got, so its return value is indeterminate if msg->type is not
VIR_FDSTREAM_MSG_TYPE_DATA.
Wish the complaint was in the right function...
Before I forget... starting here - perhaps a bit "nervous" about the
claim from your ping that "Patches 01-07 are fairly trivial and are more
of a bug fixes than feature implementation"... I'd almost say 1-4 are
trivial, 5 is a little less so trivial since you're "removing" the
iohelper command and replacing it "inline", but this has moved into
altering more of the algorithm. So close to a release...
I see Daniel has responded to this one too ... still I'll point out a
few more things...
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 7a8d65d..0350494 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -49,6 +49,27 @@
VIR_LOG_INIT("fdstream");
+typedef enum {
+ VIR_FDSTREAM_MSG_TYPE_DATA,
+} virFDStreamMsgType;
+
+typedef struct _virFDStreamMsg virFDStreamMsg;
+typedef virFDStreamMsg *virFDStreamMsgPtr;
+struct _virFDStreamMsg {
+ virFDStreamMsgPtr next;
+
+ virFDStreamMsgType type;
+
+ union {
+ struct {
+ char *buf;
+ size_t len;
+ size_t offset;
+ } data;
+ } stream;
+};
+
+
/* Tunnelled migration stream support */
typedef struct virFDStreamData virFDStreamData;
typedef virFDStreamData *virFDStreamDataPtr;
@@ -80,18 +101,25 @@ struct virFDStreamData {
/* Thread data */
virThreadPtr thread;
+ virCond threadCond;
int threadErr;
bool threadQuit;
+ bool threadAbort;
+ bool threadDoRead;
+ virFDStreamMsgPtr msg;
};
static virClassPtr virFDStreamDataClass;
+static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
+
static void
virFDStreamDataDispose(void *obj)
{
virFDStreamDataPtr fdst = obj;
VIR_DEBUG("obj=%p", fdst);
+ virFDStreamMsgQueueFree(&fdst->msg);
}
static int virFDStreamDataOnceInit(void)
@@ -108,6 +136,89 @@ static int virFDStreamDataOnceInit(void)
VIR_ONCE_GLOBAL_INIT(virFDStreamData)
+static int
+virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
+ virFDStreamMsgPtr msg,
+ int fd,
+ const char *fdname)
+{
+ virFDStreamMsgPtr *tmp = &fdst->msg;
+ char c = '1';
+
+ while (*tmp)
+ tmp = &(*tmp)->next;
+
+ *tmp = msg;
+ virCondSignal(&fdst->threadCond);
+
+ if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
+ virReportSystemError(errno,
+ _("Unable to write to %s"),
+ fdname);
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static virFDStreamMsgPtr
+virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
+ int fd,
+ const char *fdname)
+{
+ virFDStreamMsgPtr tmp = fdst->msg;
+ char c;
+
+ if (tmp) {
+ fdst->msg = tmp->next;
+ tmp->next = NULL;
+ }
+
+ virCondSignal(&fdst->threadCond);
+
+ if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
+ virReportSystemError(errno,
+ _("Unable to read from %s"),
+ fdname);
+ return NULL;
+ }
+
+ return tmp;
+}
+
+
+static void
+virFDStreamMsgFree(virFDStreamMsgPtr msg)
+{
+ if (!msg)
+ return;
+
+ switch (msg->type) {
+ case VIR_FDSTREAM_MSG_TYPE_DATA:
+ VIR_FREE(msg->stream.data.buf);
+ break;
+ }
+
+ VIR_FREE(msg);
+}
+
+
+static void
+virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
+{
+ virFDStreamMsgPtr tmp = *queue;
+
+ while (tmp) {
+ virFDStreamMsgPtr next = tmp->next;
+ virFDStreamMsgFree(tmp);
+ tmp = next;
+ }
+
+ *queue = NULL;
+}
+
+
static int virFDStreamRemoveCallback(virStreamPtr stream)
{
virFDStreamDataPtr fdst = stream->privateData;
@@ -273,6 +384,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
struct _virFDStreamThreadData {
virStreamPtr st;
size_t length;
+ bool doRead;
int fdin;
char *fdinname;
int fdout;
@@ -293,6 +405,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
}
+static ssize_t
+virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+ const int fdin,
+ const int fdout,
+ const char *fdinname,
+ const char *fdoutname,
+ size_t buflen)
+{
+ virFDStreamMsgPtr msg = NULL;
+ char *buf = NULL;
+ ssize_t got;
got = -1;
Not really required yet, but if additional code gets added...
+
+ if (VIR_ALLOC(msg) < 0)
+ goto error;
+
+ if (VIR_ALLOC_N(buf, buflen) < 0)
+ goto error;
+
+ if ((got = saferead(fdin, buf, buflen)) < 0) {
+ virReportSystemError(errno,
+ _("Unable to read %s"),
+ fdinname);
+ goto error;
+ }
+
+ msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+ msg->stream.data.buf = buf;
Could also go with VIR_STEAL_PTR(msg->stream.data.buf, buf),
avoiding the buf = NULL below.
+ msg->stream.data.len = got;
+ buf = NULL;
+
+ virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
+ msg = NULL;
*QueuePush is not a void. What happens if safewrite fails?
+
+ return got;
+
+ error:
+ VIR_FREE(buf);
+ virFDStreamMsgFree(msg);
+ return -1;
+}
+
+
+static ssize_t
+virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+ const int fdin,
+ const int fdout,
+ const char *fdinname,
+ const char *fdoutname)
+{
+ ssize_t got;
got = -1
is required here since got is indeterminate if msg->type != TYPE_DATA
+ virFDStreamMsgPtr msg = fdst->msg;
+ bool pop = false;
+
+ switch (msg->type) {
+ case VIR_FDSTREAM_MSG_TYPE_DATA:
+ got = safewrite(fdout,
+ msg->stream.data.buf + msg->stream.data.offset,
+ msg->stream.data.len - msg->stream.data.offset);
+ if (got < 0) {
+ virReportSystemError(errno,
+ _("Unable to write %s"),
+ fdoutname);
+ return -1;
+ }
+
+ msg->stream.data.offset += got;
+
+ pop = msg->stream.data.offset == msg->stream.data.len;
+ break;
+ }
+
+ if (pop) {
+ virFDStreamMsgQueuePop(fdst, fdin, fdinname);
*QueuePop is not a void... What if saferead fails?
+ virFDStreamMsgFree(msg);
+ }
+
+ return got;
+}
+
+
static void
virFDStreamThread(void *opaque)
{
@@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
int fdout = data->fdout;
char *fdoutname = data->fdoutname;
virFDStreamDataPtr fdst = st->privateData;
- char *buf = NULL;
+ bool doRead = fdst->threadDoRead;
size_t buflen = 256 * 1024;
size_t total = 0;
virObjectRef(fdst);
-
- if (VIR_ALLOC_N(buf, buflen) < 0)
- goto error;
+ virObjectLock(fdst);
while (1) {
ssize_t got;
@@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
if (buflen == 0)
break; /* End of requested data from client */
- if ((got = saferead(fdin, buf, buflen)) < 0) {
- virReportSystemError(errno,
- _("Unable to read %s"),
- fdinname);
+ while (doRead == (fdst->msg != NULL) &&
+ !fdst->threadQuit) {
+ if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
+ virReportSystemError(errno, "%s",
+ _("failed to wait on condition"));
+ goto error;
+ }
+ }
+
+ if (fdst->threadQuit) {
+ /* If stream abort was requested, quit early. */
+ if (fdst->threadAbort)
+ goto cleanup;
+
+ /* Otherwise flush buffers and quit gracefully. */
+ if (doRead == (fdst->msg != NULL))
+ break;
+ }
+
+ if (doRead)
+ got = virFDStreamThreadDoRead(fdst,
+ fdin, fdout,
+ fdinname, fdoutname,
+ buflen);
+ else
+ got = virFDStreamThreadDoWrite(fdst,
+ fdin, fdout,
+ fdinname, fdoutname);
+
+ if (got < 0)
goto error;
- }
if (got == 0)
break;
total += got;
-
- if (safewrite(fdout, buf, got) < 0) {
- virReportSystemError(errno,
- _("Unable to write %s"),
- fdoutname);
- goto error;
- }
}
cleanup:
+ fdst->threadQuit = true;
+ virObjectUnlock(fdst);
if (!virObjectUnref(fdst))
st->privateData = NULL;
VIR_FORCE_CLOSE(fdin);
VIR_FORCE_CLOSE(fdout);
virFDStreamThreadDataFree(data);
- VIR_FREE(buf);
return;
error:
- virObjectLock(fdst);
fdst->threadErr = errno;
- virObjectUnlock(fdst);
goto cleanup;
}
@@ -367,6 +574,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
if (!fdst->thread)
return 0;
+ fdst->threadAbort = streamAbort;
+ fdst->threadQuit = true;
+ virCondSignal(&fdst->threadCond);
+
/* Give the thread a chance to lock the FD stream object. */
virObjectUnlock(fdst);
virThreadJoin(fdst->thread);
@@ -380,6 +591,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
ret = 0;
cleanup:
VIR_FREE(fdst->thread);
+ virCondDestroy(&fdst->threadCond);
return ret;
}
@@ -426,11 +638,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
fdst->abortCallbackDispatching = false;
}
- /* mutex locked */
- ret = VIR_CLOSE(fdst->fd);
if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
ret = -1;
+ /* mutex locked */
+ if ((ret = VIR_CLOSE(fdst->fd)) < 0)
+ virReportSystemError(errno, "%s",
+ _("Unable to close"));
+
st->privateData = NULL;
/* call the internal stream closing callback */
@@ -467,7 +682,8 @@ virFDStreamAbort(virStreamPtr st)
static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
{
virFDStreamDataPtr fdst = st->privateData;
- int ret;
+ virFDStreamMsgPtr msg = NULL;
+ int ret = -1;
if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s",
@@ -495,25 +711,51 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes,
size_t nbytes)
nbytes = fdst->length - fdst->offset;
}
- retry:
- ret = write(fdst->fd, bytes, nbytes);
- if (ret < 0) {
- VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- VIR_WARNINGS_RESET
- ret = -2;
- } else if (errno == EINTR) {
- goto retry;
- } else {
- ret = -1;
- virReportSystemError(errno, "%s",
+ if (fdst->thread) {
+ char *buf;
+
+ if (fdst->threadQuit) {
+ virReportSystemError(EBADF, "%s",
_("cannot write to stream"));
+ return -1;
Either virObjectUnlock(fdst) before returning, or goto cleanup (otherwise fdst remains locked).
+ }
+
+ if (VIR_ALLOC(msg) < 0 ||
+ VIR_ALLOC_N(buf, nbytes) < 0)
+ goto cleanup;
+
+ memcpy(buf, bytes, nbytes);
+ msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+ msg->stream.data.buf = buf;
+ msg->stream.data.len = nbytes;
+
+ virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
*QueuePush is not a void... What happens if safewrite fails?
+ msg = NULL;
+ ret = nbytes;
+ } else {
+ retry:
+ ret = write(fdst->fd, bytes, nbytes);
+ if (ret < 0) {
+ VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ VIR_WARNINGS_RESET
+ ret = -2;
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ ret = -1;
+ virReportSystemError(errno, "%s",
+ _("cannot write to stream"));
+ }
Should there be a goto cleanup here so that we avoid any chance that
fdst->length > 0 and thus fdst->offset gets decremented by 1 or 2? Or is
that just me thinking it could happen without actually checking whether
fdst->length could be non-zero here?
}
- } else if (fdst->length) {
- fdst->offset += ret;
}
+ if (fdst->length)
+ fdst->offset += ret;
+
+ cleanup:
virObjectUnlock(fdst);
+ virFDStreamMsgFree(msg);
return ret;
}
@@ -521,7 +763,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes,
size_t nbytes)
static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
{
virFDStreamDataPtr fdst = st->privateData;
- int ret;
+ int ret = -1;
if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s",
@@ -547,24 +789,70 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t
nbytes)
nbytes = fdst->length - fdst->offset;
}
- retry:
- ret = read(fdst->fd, bytes, nbytes);
- if (ret < 0) {
- VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- VIR_WARNINGS_RESET
- ret = -2;
- } else if (errno == EINTR) {
- goto retry;
- } else {
- ret = -1;
- virReportSystemError(errno, "%s",
- _("cannot read from stream"));
+ if (fdst->thread) {
+ virFDStreamMsgPtr msg = NULL;
+
+ while (!(msg = fdst->msg)) {
+ if (fdst->threadQuit) {
+ if (nbytes) {
+ virReportSystemError(EBADF, "%s",
+ _("stream is not open"));
+ } else {
+ ret = 0;
+ }
+ goto cleanup;
+ } else {
+ virObjectUnlock(fdst);
+ virCondSignal(&fdst->threadCond);
+ virObjectLock(fdst);
+ }
+ }
+
+ if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
+ /* Nope, nope, I'm outta here */
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("unexpected message type"));
+ goto cleanup;
+ }
+
+ if (nbytes > msg->stream.data.len - msg->stream.data.offset)
+ nbytes = msg->stream.data.len - msg->stream.data.offset;
+
+ memcpy(bytes,
+ msg->stream.data.buf + msg->stream.data.offset,
+ nbytes);
+
+ msg->stream.data.offset += nbytes;
+ if (msg->stream.data.offset == msg->stream.data.len) {
+ virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
*QueuePop is not a void... What if saferead fails?
+ virFDStreamMsgFree(msg);
+ }
+
+ ret = nbytes;
+
+ } else {
+ retry:
+ ret = read(fdst->fd, bytes, nbytes);
+ if (ret < 0) {
+ VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ VIR_WARNINGS_RESET
+ ret = -2;
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ ret = -1;
+ virReportSystemError(errno, "%s",
+ _("cannot read from stream"));
+ }
+ goto cleanup;
I think I know the answer to my question from virFDStreamWrite now..
John
}
- } else if (fdst->length) {
- fdst->offset += ret;
}
+ if (fdst->length)
+ fdst->offset += ret;
+
+ cleanup:
virObjectUnlock(fdst);
return ret;
}
@@ -609,11 +897,19 @@ static int virFDStreamOpenInternal(virStreamPtr st,
st->privateData = fdst;
if (threadData) {
+ fdst->threadDoRead = threadData->doRead;
+
/* Create the thread after fdst and st were initialized.
* The thread worker expects them to be that way. */
if (VIR_ALLOC(fdst->thread) < 0)
goto error;
+ if (virCondInit(&fdst->threadCond) < 0) {
+ virReportSystemError(errno, "%s",
+ _("cannot initialize condition variable"));
+ goto error;
+ }
+
if (virThreadCreate(fdst->thread,
true,
virFDStreamThread,
@@ -782,6 +1078,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
goto error;
tmpfd = pipefds[0];
+ threadData->doRead = true;
} else {
threadData->fdin = pipefds[0];
threadData->fdout = fd;
@@ -789,6 +1086,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
VIR_STRDUP(threadData->fdoutname, path) < 0)
goto error;
tmpfd = pipefds[1];
+ threadData->doRead = false;
}
}