[libvirt] [PATCH 0/7] Introduce virCommandDoAsyncIO

Currently, if we want to use IO with asynchronous command we have to copy code from virFileWrapperFd to misuse our event loop for reading and writing to the command. However, we can extend our virCommand implementation to automatically set things up. Michal Privoznik (7): virCommand: Introduce virCommandDoAsyncIO Introduce event loop to commandtest tests: Create test for virCommandDoAsyncIO virFileWrapperFd: Switch to new virCommandDoAsyncIO qemu: Catch stderr of image decompression binary qemu: Catch stderr of image compression binary qemu: Destroy domain on decompression binary error src/libvirt_private.syms | 1 + src/qemu/qemu_driver.c | 9 +- src/qemu/qemu_migration.c | 9 +- src/util/vircommand.c | 214 ++++++++++++++++++++++++++++++++++++++++++-- src/util/vircommand.h | 1 + src/util/virfile.c | 82 +---------------- tests/commanddata/test3.log | 2 +- tests/commandtest.c | 136 ++++++++++++++++++++++++++++ 8 files changed, 365 insertions(+), 89 deletions(-) -- 1.8.0.2

Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this: virCommandPtr cmd = virCommandNew*(...); char *buf = NULL; ... virCommandSetOutputBuffer(cmd, &buf); virCommandDoAsyncIO(cmd); if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup; ... if (virCommandWait(cmd, NULL) < 0) goto cleanup; /* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf)); ... cleanup: VIR_FREE(buf); virCommandFree(cmd); Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 214 +++++++++++++++++++++++++++++++++++++++++++++-- src/util/vircommand.h | 1 + 3 files changed, 208 insertions(+), 8 deletions(-) diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index fc23adc..f89d1aa 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -142,6 +142,7 @@ virCommandAddEnvString; virCommandAllowCap; virCommandClearCaps; virCommandDaemonize; +virCommandDoAsyncIO; virCommandExec; virCommandFree; virCommandHandshakeNotify; diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a..5d67bd2 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -47,11 +47,12 @@ /* Flags for virExecWithHook */ enum { - VIR_EXEC_NONE = 0, - VIR_EXEC_NONBLOCK = (1 << 0), - VIR_EXEC_DAEMON = (1 << 1), + VIR_EXEC_NONE = 0, + VIR_EXEC_NONBLOCK = (1 << 0), + VIR_EXEC_DAEMON = (1 << 1), VIR_EXEC_CLEAR_CAPS = (1 << 2), - VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_ASYNC_IO = (1 << 4), }; struct _virCommand { @@ -84,6 +85,11 @@ struct _virCommand { int *outfdptr; int *errfdptr; + size_t inbufOffset; + int inWatch; + int outWatch; + int errWatch; + bool handshake; int handshakeWait[2]; int handshakeNotify[2]; @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[1] = -1; cmd->infd = cmd->outfd = cmd->errfd = -1; + cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; cmd->pid = -1; virCommandAddArgSet(cmd, args); @@ -2122,6 +2129,152 @@ virCommandHook(void *data) } +static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char **bufptr = NULL; + char buf[1024]; + ssize_t nread; + size_t len = 0; + int *watchPtr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf); + + while ((nread = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset))) { + if (nread < 0) { + if (errno != EAGAIN) + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + break; + } + + cmd->inbufOffset += nread; + if (cmd->inbufOffset == len) + VIR_FORCE_CLOSE(cmd->infd); + } + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = cmd->outbuf; + } + + if (watch == cmd->errWatch) { + watchPtr = &cmd->errWatch; + bufptr = cmd->errbuf; + } + + if (bufptr && events & VIR_EVENT_HANDLE_READABLE) { + if (*bufptr) + len = strlen(*bufptr); + + while ((nread = read(fd, buf, sizeof(buf)))) { + if (nread < 0) { + if (errno != EAGAIN) + virReportSystemError(errno, + _("unable to read command's " + "output from FD %d"), + fd); + break; + } + + if (VIR_REALLOC_N(*bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(*bufptr + len, buf, nread); + (*bufptr)[len + nread] = '\0'; + } + } + } + + if (events & VIR_EVENT_HANDLE_HANGUP) { + *watchPtr = -1; + virEventRemoveHandle(watch); + } +} + + +static int +virCommandRegisterEventLoop(virCommandPtr cmd) +{ + int ret = -1; + + if (cmd->inbuf) { + if (virSetNonBlock(cmd->infd) < 0) { + virReportSystemError(errno, + _("Failed to set non-blocking flag on FD %d"), + cmd->infd); + goto cleanup; + } + + if ((cmd->inWatch = virEventAddHandle(cmd->infd, + VIR_EVENT_HANDLE_WRITABLE, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register infd %d in the event loop"), + cmd->infd); + goto cleanup; + } + } + + if (cmd->outbuf && cmd->outfdptr == &cmd->outfd) { + if (virSetNonBlock(cmd->outfd) < 0) { + virReportSystemError(errno, + _("Failed to set non-blocking flag on FD %d"), + cmd->outfd); + goto cleanup; + } + + if ((cmd->outWatch = virEventAddHandle(cmd->outfd, + VIR_EVENT_HANDLE_READABLE, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register outfd %d in the event loop"), + cmd->outfd); + goto cleanup; + } + } + + if (cmd->errbuf && cmd->errfdptr == &cmd->errfd) { + if (virSetNonBlock(cmd->errfd) < 0) { + virReportSystemError(errno, + _("Failed to set non-blocking flag on FD %d"), + cmd->errfd); + goto cleanup; + } + + if ((cmd->errWatch = virEventAddHandle(cmd->errfd, + VIR_EVENT_HANDLE_READABLE, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register errfd %d in the event loop"), + cmd->errfd); + goto cleanup; + } + } + + ret = 0; + +cleanup: + return ret; +} + + /** * virCommandRunAsync: * @cmd: command to start @@ -2149,6 +2302,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) char *str; int i; bool synchronous = false; + int infd[2] = {-1, -1}; if (!cmd || cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2163,10 +2317,23 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) synchronous = cmd->flags & VIR_EXEC_RUN_SYNC; cmd->flags &= ~VIR_EXEC_RUN_SYNC; - /* Buffer management can only be requested via virCommandRun. */ - if ((cmd->inbuf && cmd->infd == -1) || - (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || - (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { + /* Buffer management can only be requested via virCommandRun, unless help + * from the event loop has been requested via virCommandDoAsyncIO. */ + if (cmd->flags & VIR_EXEC_ASYNC_IO) { + /* If we have an input buffer, we need + * a pipe to feed the data to the child */ + if (cmd->inbuf && cmd->infd == -1) { + if (pipe2(infd, O_CLOEXEC) < 0) { + virReportSystemError(errno, "%s", + _("unable to open pipe")); + cmd->has_error = -1; + return -1; + } + cmd->infd = infd[0]; + } + } else if ((cmd->inbuf && cmd->infd == -1) || + (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || + (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot mix string I/O with asynchronous command")); return -1; @@ -2228,6 +2395,16 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) else cmd->reap = true; + if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) { + cmd->flags &= ~VIR_EXEC_ASYNC_IO; + if (cmd->inbuf && cmd->infd != -1) { + /* close the read end of infd and replace it with the write end */ + VIR_FORCE_CLOSE(cmd->infd); + cmd->infd = infd[1]; + } + ret = virCommandRegisterEventLoop(cmd); + } + return ret; } @@ -2265,6 +2442,11 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) return -1; } + while (cmd->inWatch != -1 && + cmd->outWatch != -1 && + cmd->errWatch != -1) + usleep(100); + /* If virProcessWait reaps pid but then returns failure because * exitstatus was NULL, then a second virCommandWait would risk * calling waitpid on an unrelated process. Besides, that error @@ -2516,8 +2698,24 @@ virCommandFree(virCommandPtr cmd) if (cmd->reap) virCommandAbort(cmd); + if (cmd->inWatch != -1) + virEventRemoveHandle(cmd->inWatch); + if (cmd->outWatch != -1) + virEventRemoveHandle(cmd->outWatch); + if (cmd->errWatch != -1) + virEventRemoveHandle(cmd->errWatch); + VIR_FREE(cmd->transfer); VIR_FREE(cmd->preserve); VIR_FREE(cmd); } + +void +virCommandDoAsyncIO(virCommandPtr cmd) +{ + if (!cmd || cmd->has_error) + return; + + cmd->flags |= VIR_EXEC_ASYNC_IO; +} diff --git a/src/util/vircommand.h b/src/util/vircommand.h index 9b7117d..c1a2e24 100644 --- a/src/util/vircommand.h +++ b/src/util/vircommand.h @@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd); void virCommandFree(virCommandPtr cmd); +void virCommandDoAsyncIO(virCommandPtr cmd); #endif /* __VIR_COMMAND_H__ */ -- 1.8.0.2

On 01/23/13 10:41, Michal Privoznik wrote:
Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this:
virCommandPtr cmd = virCommandNew*(...); char *buf = NULL;
...
virCommandSetOutputBuffer(cmd, &buf); virCommandDoAsyncIO(cmd);
if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup;
...
if (virCommandWait(cmd, NULL) < 0) goto cleanup;
/* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf));
...
cleanup: VIR_FREE(buf); virCommandFree(cmd);
Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 214 +++++++++++++++++++++++++++++++++++++++++++++-- src/util/vircommand.h | 1 + 3 files changed, 208 insertions(+), 8 deletions(-)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index fc23adc..f89d1aa 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -142,6 +142,7 @@ virCommandAddEnvString; virCommandAllowCap; virCommandClearCaps; virCommandDaemonize; +virCommandDoAsyncIO; virCommandExec; virCommandFree; virCommandHandshakeNotify; diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a..5d67bd2 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -47,11 +47,12 @@
/* Flags for virExecWithHook */ enum { - VIR_EXEC_NONE = 0, - VIR_EXEC_NONBLOCK = (1 << 0), - VIR_EXEC_DAEMON = (1 << 1), + VIR_EXEC_NONE = 0, + VIR_EXEC_NONBLOCK = (1 << 0), + VIR_EXEC_DAEMON = (1 << 1), VIR_EXEC_CLEAR_CAPS = (1 << 2), - VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_ASYNC_IO = (1 << 4), };
struct _virCommand { @@ -84,6 +85,11 @@ struct _virCommand { int *outfdptr; int *errfdptr;
+ size_t inbufOffset; + int inWatch; + int outWatch; + int errWatch; + bool handshake; int handshakeWait[2]; int handshakeNotify[2]; @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[1] = -1;
cmd->infd = cmd->outfd = cmd->errfd = -1; + cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; cmd->pid = -1;
virCommandAddArgSet(cmd, args); @@ -2122,6 +2129,152 @@ virCommandHook(void *data) }
+static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char **bufptr = NULL; + char buf[1024]; + ssize_t nread; + size_t len = 0; + int *watchPtr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf);
I suppose this isn't intended to work on non-string data. It is worth documenting this so that it doesn't surprise someone.
+ + while ((nread = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset))) {
Hm, what's the difference between len and inbuffOffset here?
+ if (nread < 0) { + if (errno != EAGAIN) + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + break; + } + + cmd->inbufOffset += nread; + if (cmd->inbufOffset == len) + VIR_FORCE_CLOSE(cmd->infd); + } + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = cmd->outbuf; + } + + if (watch == cmd->errWatch) { + watchPtr = &cmd->errWatch; + bufptr = cmd->errbuf; + } + + if (bufptr && events & VIR_EVENT_HANDLE_READABLE) { + if (*bufptr) + len = strlen(*bufptr); + + while ((nread = read(fd, buf, sizeof(buf)))) { + if (nread < 0) { + if (errno != EAGAIN) + virReportSystemError(errno, + _("unable to read command's " + "output from FD %d"), + fd); + break; + } + + if (VIR_REALLOC_N(*bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(*bufptr + len, buf, nread); + (*bufptr)[len + nread] = '\0'; + } + } + } + + if (events & VIR_EVENT_HANDLE_HANGUP) { + *watchPtr = -1; + virEventRemoveHandle(watch); + } +} + + +static int +virCommandRegisterEventLoop(virCommandPtr cmd) +{ + int ret = -1; + + if (cmd->inbuf) { + if (virSetNonBlock(cmd->infd) < 0) { + virReportSystemError(errno, + _("Failed to set non-blocking flag on FD %d"), + cmd->infd); + goto cleanup; + } + + if ((cmd->inWatch = virEventAddHandle(cmd->infd, + VIR_EVENT_HANDLE_WRITABLE, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register infd %d in the event loop"), + cmd->infd); + goto cleanup; + } + } + + if (cmd->outbuf && cmd->outfdptr == &cmd->outfd) { + if (virSetNonBlock(cmd->outfd) < 0) { + virReportSystemError(errno, + _("Failed to set non-blocking flag on FD %d"), + cmd->outfd); + goto cleanup; + } + + if ((cmd->outWatch = virEventAddHandle(cmd->outfd, + VIR_EVENT_HANDLE_READABLE, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register outfd %d in the event loop"), + cmd->outfd); + goto cleanup; + } + } + + if (cmd->errbuf && cmd->errfdptr == &cmd->errfd) { + if (virSetNonBlock(cmd->errfd) < 0) { + virReportSystemError(errno, + _("Failed to set non-blocking flag on FD %d"), + cmd->errfd); + goto cleanup; + } + + if ((cmd->errWatch = virEventAddHandle(cmd->errfd, + VIR_EVENT_HANDLE_READABLE, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register errfd %d in the event loop"), + cmd->errfd); + goto cleanup; + } + } + + ret = 0; + +cleanup: + return ret; +} + + /** * virCommandRunAsync: * @cmd: command to start @@ -2149,6 +2302,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) char *str; int i; bool synchronous = false; + int infd[2] = {-1, -1};
if (!cmd || cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2163,10 +2317,23 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) synchronous = cmd->flags & VIR_EXEC_RUN_SYNC; cmd->flags &= ~VIR_EXEC_RUN_SYNC;
- /* Buffer management can only be requested via virCommandRun. */ - if ((cmd->inbuf && cmd->infd == -1) || - (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || - (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { + /* Buffer management can only be requested via virCommandRun, unless help + * from the event loop has been requested via virCommandDoAsyncIO. */ + if (cmd->flags & VIR_EXEC_ASYNC_IO) { + /* If we have an input buffer, we need + * a pipe to feed the data to the child */ + if (cmd->inbuf && cmd->infd == -1) { + if (pipe2(infd, O_CLOEXEC) < 0) { + virReportSystemError(errno, "%s", + _("unable to open pipe")); + cmd->has_error = -1; + return -1; + } + cmd->infd = infd[0]; + } + } else if ((cmd->inbuf && cmd->infd == -1) || + (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || + (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot mix string I/O with asynchronous command")); return -1; @@ -2228,6 +2395,16 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) else cmd->reap = true;
+ if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) { + cmd->flags &= ~VIR_EXEC_ASYNC_IO; + if (cmd->inbuf && cmd->infd != -1) { + /* close the read end of infd and replace it with the write end */ + VIR_FORCE_CLOSE(cmd->infd); + cmd->infd = infd[1]; + } + ret = virCommandRegisterEventLoop(cmd); + } + return ret; }
@@ -2265,6 +2442,11 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) return -1; }
+ while (cmd->inWatch != -1 && + cmd->outWatch != -1 && + cmd->errWatch != -1) + usleep(100);
Hm, is there a possibility to avoid this active wait loop?
+ /* If virProcessWait reaps pid but then returns failure because * exitstatus was NULL, then a second virCommandWait would risk * calling waitpid on an unrelated process. Besides, that error @@ -2516,8 +2698,24 @@ virCommandFree(virCommandPtr cmd) if (cmd->reap) virCommandAbort(cmd);
+ if (cmd->inWatch != -1) + virEventRemoveHandle(cmd->inWatch); + if (cmd->outWatch != -1) + virEventRemoveHandle(cmd->outWatch); + if (cmd->errWatch != -1) + virEventRemoveHandle(cmd->errWatch); + VIR_FREE(cmd->transfer); VIR_FREE(cmd->preserve);
VIR_FREE(cmd); } + +void +virCommandDoAsyncIO(virCommandPtr cmd) +{ + if (!cmd || cmd->has_error) + return; + + cmd->flags |= VIR_EXEC_ASYNC_IO; +} diff --git a/src/util/vircommand.h b/src/util/vircommand.h index 9b7117d..c1a2e24 100644 --- a/src/util/vircommand.h +++ b/src/util/vircommand.h @@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd);
void virCommandFree(virCommandPtr cmd);
+void virCommandDoAsyncIO(virCommandPtr cmd); #endif /* __VIR_COMMAND_H__ */
Peter

On 25.01.2013 19:26, Peter Krempa wrote:
On 01/23/13 10:41, Michal Privoznik wrote:
Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this:
virCommandPtr cmd = virCommandNew*(...); char *buf = NULL;
...
virCommandSetOutputBuffer(cmd, &buf); virCommandDoAsyncIO(cmd);
if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup;
...
if (virCommandWait(cmd, NULL) < 0) goto cleanup;
/* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf));
...
cleanup: VIR_FREE(buf); virCommandFree(cmd);
Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 214 +++++++++++++++++++++++++++++++++++++++++++++-- src/util/vircommand.h | 1 + 3 files changed, 208 insertions(+), 8 deletions(-)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index fc23adc..f89d1aa 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -142,6 +142,7 @@ virCommandAddEnvString; virCommandAllowCap; virCommandClearCaps; virCommandDaemonize; +virCommandDoAsyncIO; virCommandExec; virCommandFree; virCommandHandshakeNotify; diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a..5d67bd2 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -47,11 +47,12 @@
/* Flags for virExecWithHook */ enum { - VIR_EXEC_NONE = 0, - VIR_EXEC_NONBLOCK = (1 << 0), - VIR_EXEC_DAEMON = (1 << 1), + VIR_EXEC_NONE = 0, + VIR_EXEC_NONBLOCK = (1 << 0), + VIR_EXEC_DAEMON = (1 << 1), VIR_EXEC_CLEAR_CAPS = (1 << 2), - VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_ASYNC_IO = (1 << 4), };
struct _virCommand { @@ -84,6 +85,11 @@ struct _virCommand { int *outfdptr; int *errfdptr;
+ size_t inbufOffset; + int inWatch; + int outWatch; + int errWatch; + bool handshake; int handshakeWait[2]; int handshakeNotify[2]; @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[1] = -1;
cmd->infd = cmd->outfd = cmd->errfd = -1; + cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; cmd->pid = -1;
virCommandAddArgSet(cmd, args); @@ -2122,6 +2129,152 @@ virCommandHook(void *data) }
+static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char **bufptr = NULL; + char buf[1024]; + ssize_t nread; + size_t len = 0; + int *watchPtr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf);
I suppose this isn't intended to work on non-string data. It is worth documenting this so that it doesn't surprise someone.
Yep, I agree. Especially after my (safe-)read issue :) On the other hand, same strlen() is used in virCommandProcessIO(), that is virCommandRun(). So we and virCommandRunAsync() are able to cope with the very same set of string as synchronous variant. But then again, mentioning this in function description is better way of threating developers than letting them to have to find out themselves.
+ + while ((nread = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset))) {
Hm, what's the difference between len and inbuffOffset here?
'len' holds the length of cmd->inbuf, where 'cmd->inbufOffset' says how much of 'cmd->inbuf' has been written ...
+ if (nread < 0) { + if (errno != EAGAIN) + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + break; + } + + cmd->inbufOffset += nread;
... which is why it is updated here.
+ if (cmd->inbufOffset == len) + VIR_FORCE_CLOSE(cmd->infd); + } + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = cmd->outbuf; + } + + if (watch == cmd->errWatch) { + watchPtr = &cmd->errWatch; + bufptr = cmd->errbuf; + } + + if (bufptr && events & VIR_EVENT_HANDLE_READABLE) { + if (*bufptr) + len = strlen(*bufptr); + + while ((nread = read(fd, buf, sizeof(buf)))) { + if (nread < 0) { + if (errno != EAGAIN) + virReportSystemError(errno, + _("unable to read command's " + "output from FD %d"), + fd); + break; + } + + if (VIR_REALLOC_N(*bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(*bufptr + len, buf, nread); + (*bufptr)[len + nread] = '\0'; + } + } + } + + if (events & VIR_EVENT_HANDLE_HANGUP) { + *watchPtr = -1; + virEventRemoveHandle(watch); + } +} + +
@@ -2265,6 +2442,11 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) return -1; }
+ while (cmd->inWatch != -1 && + cmd->outWatch != -1 && + cmd->errWatch != -1) + usleep(100);
Hm, is there a possibility to avoid this active wait loop?
I think I can add a condition variable to wait on. Let me post a v2 and we will see.
+ /* If virProcessWait reaps pid but then returns failure because * exitstatus was NULL, then a second virCommandWait would risk * calling waitpid on an unrelated process. Besides, that error @@ -2516,8 +2698,24 @@ virCommandFree(virCommandPtr cmd) if (cmd->reap) virCommandAbort(cmd);
+ if (cmd->inWatch != -1) + virEventRemoveHandle(cmd->inWatch); + if (cmd->outWatch != -1) + virEventRemoveHandle(cmd->outWatch); + if (cmd->errWatch != -1) + virEventRemoveHandle(cmd->errWatch); + VIR_FREE(cmd->transfer); VIR_FREE(cmd->preserve);
VIR_FREE(cmd); } + +void +virCommandDoAsyncIO(virCommandPtr cmd) +{ + if (!cmd || cmd->has_error) + return; + + cmd->flags |= VIR_EXEC_ASYNC_IO; +} diff --git a/src/util/vircommand.h b/src/util/vircommand.h index 9b7117d..c1a2e24 100644 --- a/src/util/vircommand.h +++ b/src/util/vircommand.h @@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd);
void virCommandFree(virCommandPtr cmd);
+void virCommandDoAsyncIO(virCommandPtr cmd); #endif /* __VIR_COMMAND_H__ */
Peter

This is just preparing environment for the next patch, which is going to need an event loop. --- tests/commanddata/test3.log | 2 +- tests/commandtest.c | 87 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/tests/commanddata/test3.log b/tests/commanddata/test3.log index 6bd7974..bd06371 100644 --- a/tests/commanddata/test3.log +++ b/tests/commanddata/test3.log @@ -8,7 +8,7 @@ ENV:USER=test FD:0 FD:1 FD:2 -FD:3 FD:5 +FD:8 DAEMON:no CWD:/tmp diff --git a/tests/commandtest.c b/tests/commandtest.c index b5c5882..00d2eac 100644 --- a/tests/commandtest.c +++ b/tests/commandtest.c @@ -37,9 +37,19 @@ #include "virfile.h" #include "virpidfile.h" #include "virerror.h" +#include "virthread.h" #define VIR_FROM_THIS VIR_FROM_NONE +typedef struct _virCommandTestData virCommandTestData; +typedef virCommandTestData *virCommandTestDataPtr; +struct _virCommandTestData { + virMutex lock; + virThread thread; + bool quit; + bool running; +}; + #ifdef WIN32 int @@ -841,11 +851,46 @@ static const char *const newenv[] = { NULL }; +static void virCommandThreadWorker(void *opaque) +{ + virCommandTestDataPtr test = opaque; + + virMutexLock(&test->lock); + + while (!test->quit) { + virMutexUnlock(&test->lock); + + if (virEventRunDefaultImpl() < 0) { + test->quit = true; + break; + } + + virMutexLock(&test->lock); + } + + test->running = false; + + virMutexUnlock(&test->lock); + return; +} + +static void +virCommandTestFreeTimer(int timer ATTRIBUTE_UNUSED, + void *opaque ATTRIBUTE_UNUSED) +{ + /* nothing to be done here */ +} + static int mymain(void) { int ret = 0; int fd; + virCommandTestDataPtr test = NULL; + int timer = -1; + + if (virThreadInitialize() < 0) + return EXIT_FAILURE; if (chdir("/tmp") < 0) return EXIT_FAILURE; @@ -886,6 +931,30 @@ mymain(void) fd = 5; VIR_FORCE_CLOSE(fd); + virEventRegisterDefaultImpl(); + if (VIR_ALLOC(test) < 0) { + virReportOOMError(); + goto cleanup; + } + + if (virMutexInit(&test->lock) < 0) { + printf("Unable to init mutex: %d\n", errno); + goto cleanup; + } + + virMutexLock(&test->lock); + + if (virThreadCreate(&test->thread, + true, + virCommandThreadWorker, + test) < 0) { + virMutexUnlock(&test->lock); + goto cleanup; + } + + test->running = true; + virMutexUnlock(&test->lock); + environ = (char **)newenv; # define DO_TEST(NAME) \ @@ -915,6 +984,24 @@ mymain(void) DO_TEST(test19); DO_TEST(test20); + virMutexLock(&test->lock); + if (test->running) { + test->quit = true; + /* HACK: Add a dummy timeout to break event loop */ + timer = virEventAddTimeout(0, virCommandTestFreeTimer, NULL, NULL); + } + virMutexUnlock(&test->lock); + +cleanup: + if (test->running) + virThreadJoin(&test->thread); + + if (timer != -1) + virEventRemoveTimeout(timer); + + virMutexDestroy(&test->lock); + VIR_FREE(test); + return ret==0 ? EXIT_SUCCESS : EXIT_FAILURE; } -- 1.8.0.2

On 01/23/13 10:41, Michal Privoznik wrote:
This is just preparing environment for the next patch, which is going to need an event loop. --- tests/commanddata/test3.log | 2 +- tests/commandtest.c | 87 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-)
ACK. Peter

This is just a basic test, so we don't break virCommand in the future. A "Hello world\n" string is written to commanhelper, which copies input to stdout and stderr where we read it from. Then the read strings are compared with expected values. --- tests/commandtest.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/commandtest.c b/tests/commandtest.c index 00d2eac..f4e335f 100644 --- a/tests/commandtest.c +++ b/tests/commandtest.c @@ -851,6 +851,54 @@ static const char *const newenv[] = { NULL }; +static int test21(const void *unused ATTRIBUTE_UNUSED) +{ + virCommandPtr cmd = virCommandNew(abs_builddir "/commandhelper"); + int ret = -1; + const char *wrbuf = "Hello world\n"; + char *outbuf = NULL, *errbuf = NULL; + const char *outbufExpected="BEGIN STDOUT\n" + "Hello world\n" + "END STDOUT\n"; + const char *errbufExpected="BEGIN STDERR\n" + "Hello world\n" + "END STDERR\n"; + + virCommandSetInputBuffer(cmd, wrbuf); + virCommandSetOutputBuffer(cmd, &outbuf); + virCommandSetErrorBuffer(cmd, &errbuf); + virCommandDoAsyncIO(cmd); + + if (virCommandRunAsync(cmd, NULL) < 0) { + virErrorPtr err = virGetLastError(); + printf("Cannot run child %s\n", err->message); + goto cleanup; + } + + if (virCommandWait(cmd, NULL) < 0) + goto cleanup; + + if (virTestGetVerbose()) + printf("STDOUT:%s\nSTDERR:%s\n", NULLSTR(outbuf), NULLSTR(errbuf)); + + if (STRNEQ(outbuf, outbufExpected)) { + virtTestDifference(stderr, outbufExpected, outbuf); + goto cleanup; + } + + if (STRNEQ(errbuf, errbufExpected)) { + virtTestDifference(stderr, errbufExpected, errbuf); + goto cleanup; + } + + ret = 0; +cleanup: + VIR_FREE(outbuf); + VIR_FREE(errbuf); + virCommandFree(cmd); + return ret; +} + static void virCommandThreadWorker(void *opaque) { virCommandTestDataPtr test = opaque; @@ -983,6 +1031,7 @@ mymain(void) DO_TEST(test18); DO_TEST(test19); DO_TEST(test20); + DO_TEST(test21); virMutexLock(&test->lock); if (test->running) { -- 1.8.0.2

On 01/23/13 10:41, Michal Privoznik wrote:
This is just a basic test, so we don't break virCommand in the future. A "Hello world\n" string is written to commanhelper, which copies input to stdout and stderr where we read it from. Then the read strings are compared with expected values. --- tests/commandtest.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+)
ACK. Peter

Commit 34e8f63a32f83 introduced support for catching errors from libvirt iohelper. However, at those times there wasn't such fancy API as virCommandDoAsyncIO(), so everything has to be implemented on our own. But since we do have the API now, we can use it and drop our implementation then. --- src/util/virfile.c | 82 +++--------------------------------------------------- 1 file changed, 4 insertions(+), 78 deletions(-) diff --git a/src/util/virfile.c b/src/util/virfile.c index 5cca54d..b4765fb 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -135,58 +135,11 @@ virFileDirectFdFlag(void) * read-write is not supported, just a single direction. */ struct _virFileWrapperFd { virCommandPtr cmd; /* Child iohelper process to do the I/O. */ - int err_fd; /* FD to read stderr of @cmd */ char *err_msg; /* stderr of @cmd */ - size_t err_msg_len; /* strlen of err_msg so we don't - have to compute it every time */ - int err_watch; /* ID of watch in the event loop */ }; #ifndef WIN32 /** - * virFileWrapperFdReadStdErr: - * @watch: watch ID - * @fd: the read end of pipe to iohelper's stderr - * @events: an OR-ed set of events which occurred on @fd - * @opaque: virFileWrapperFdPtr - * - * This is a callback to our eventloop which will read iohelper's - * stderr, reallocate @opaque->err_msg and copy data. - */ -static void -virFileWrapperFdReadStdErr(int watch ATTRIBUTE_UNUSED, - int fd, int events, void *opaque) -{ - virFileWrapperFdPtr wfd = (virFileWrapperFdPtr) opaque; - char ebuf[1024]; - ssize_t nread; - - if (events & VIR_EVENT_HANDLE_READABLE) { - while ((nread = saferead(fd, ebuf, sizeof(ebuf)))) { - if (nread < 0) { - if (errno != EAGAIN) - virReportSystemError(errno, "%s", - _("unable to read iohelper's stderr")); - break; - } - - if (VIR_REALLOC_N(wfd->err_msg, wfd->err_msg_len + nread + 1) < 0) { - virReportOOMError(); - return; - } - memcpy(wfd->err_msg + wfd->err_msg_len, ebuf, nread); - wfd->err_msg_len += nread; - wfd->err_msg[wfd->err_msg_len] = '\0'; - } - } - - if (events & VIR_EVENT_HANDLE_HANGUP) { - virEventRemoveHandle(watch); - wfd->err_watch = -1; - } -} - -/** * virFileWrapperFdNew: * @fd: pointer to fd to wrap * @name: name of fd, for diagnostics @@ -245,8 +198,6 @@ virFileWrapperFdNew(int *fd, const char *name, unsigned int flags) return NULL; } - ret->err_watch = -1; - mode = fcntl(*fd, F_GETFL); if (mode < 0) { @@ -279,38 +230,16 @@ virFileWrapperFdNew(int *fd, const char *name, unsigned int flags) virCommandAddArg(ret->cmd, "0"); } - /* In order to catch iohelper stderr, we must: - * - pass a FD to virCommand (-1 to auto-allocate one) - * - change iohelper's env so virLog functions print to stderr + /* In order to catch iohelper stderr, we must change + * iohelper's env so virLog functions print to stderr */ - ret->err_fd = -1; - virCommandSetErrorFD(ret->cmd, &ret->err_fd); virCommandAddEnvPair(ret->cmd, "LIBVIRT_LOG_OUTPUTS", "1:stderr"); + virCommandSetErrorBuffer(ret->cmd, &ret->err_msg); + virCommandDoAsyncIO(ret->cmd); if (virCommandRunAsync(ret->cmd, NULL) < 0) goto error; - /* deliberately don't use virCommandNonblockingFDs here as it is all or - * nothing. And we want iohelper's stdin and stdout to block (default). - * However, stderr is read within event loop and therefore it must be - * nonblocking.*/ - if (virSetNonBlock(ret->err_fd) < 0) { - virReportSystemError(errno, "%s", - _("Failed to set non-blocking " - "file descriptor flag")); - goto error; - } - - if ((ret->err_watch = virEventAddHandle(ret->err_fd, - VIR_EVENT_HANDLE_READABLE, - virFileWrapperFdReadStdErr, - ret, NULL)) < 0) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register iohelper's " - "stderr FD in the eventloop")); - goto error; - } - if (VIR_CLOSE(pipefd[!output]) < 0) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("unable to close pipe")); goto error; @@ -389,9 +318,6 @@ virFileWrapperFdFree(virFileWrapperFdPtr wfd) if (!wfd) return; - VIR_FORCE_CLOSE(wfd->err_fd); - if (wfd->err_watch != -1) - virEventRemoveHandle(wfd->err_watch); VIR_FREE(wfd->err_msg); virCommandFree(wfd->cmd); -- 1.8.0.2

On 01/23/13 10:41, Michal Privoznik wrote:
Commit 34e8f63a32f83 introduced support for catching errors from libvirt iohelper. However, at those times there wasn't such fancy API as virCommandDoAsyncIO(), so everything has to be implemented on our own. But since we do have the API now, we can use it and drop our implementation then. --- src/util/virfile.c | 82 +++--------------------------------------------------- 1 file changed, 4 insertions(+), 78 deletions(-)
Seems reasonable to me after the addition in 1/7. ACK. Peter

If a decompression binary prints something to stderr, currently it is discarded. However, it can contain useful data from debugging POV, so we should catch it. --- src/qemu/qemu_driver.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index 72907d2..adfa067 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -4883,6 +4883,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, virDomainEventPtr event; int intermediatefd = -1; virCommandPtr cmd = NULL; + char *errbuf = NULL; if (header->version == 2) { const char *prog = qemuSaveCompressionTypeToString(header->compressed); @@ -4900,6 +4901,8 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, virCommandSetInputFD(cmd, intermediatefd); virCommandSetOutputFD(cmd, fd); + virCommandSetErrorBuffer(cmd, &errbuf); + virCommandDoAsyncIO(cmd); if (virCommandRunAsync(cmd, NULL) < 0) { virReportError(VIR_ERR_INTERNAL_ERROR, @@ -4928,6 +4931,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, if (virCommandWait(cmd, NULL) < 0) ret = -1; + VIR_DEBUG("Decompression binary stderr: %s", NULLSTR(errbuf)); } VIR_FORCE_CLOSE(intermediatefd); @@ -4977,6 +4981,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, out: virCommandFree(cmd); + VIR_FREE(errbuf); if (virSecurityManagerRestoreSavedStateLabel(driver->securityManager, vm->def, path) < 0) VIR_WARN("failed to restore save state label on %s", path); -- 1.8.0.2

On 01/23/13 10:41, Michal Privoznik wrote:
If a decompression binary prints something to stderr, currently it is discarded. However, it can contain useful data from debugging POV, so we should catch it. --- src/qemu/qemu_driver.c | 5 +++++ 1 file changed, 5 insertions(+)
Nice addition and the new api is pretty simple to use. ACK. Peter

If a compression binary prints something to stderr, currently it is discarded. However, it can contain useful data from debugging POV, so we should catch it. --- src/qemu/qemu_migration.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index d03e361..2c13d4c 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3598,6 +3598,7 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm, virCommandPtr cmd = NULL; int pipeFD[2] = { -1, -1 }; unsigned long saveMigBandwidth = priv->migMaxBandwidth; + char *errbuf = NULL; /* Increase migration bandwidth to unlimited since target is a file. * Failure to change migration speed is not fatal. */ @@ -3680,6 +3681,8 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm, cmd = virCommandNewArgs(args); virCommandSetInputFD(cmd, pipeFD[0]); virCommandSetOutputFD(cmd, &fd); + virCommandSetErrorBuffer(cmd, &errbuf); + virCommandDoAsyncIO(cmd); if (virSetCloseExec(pipeFD[1]) < 0) { virReportSystemError(errno, "%s", _("Unable to set cloexec flag")); @@ -3727,7 +3730,11 @@ cleanup: VIR_FORCE_CLOSE(pipeFD[0]); VIR_FORCE_CLOSE(pipeFD[1]); - virCommandFree(cmd); + if (cmd) { + VIR_DEBUG("Compression binary stderr: %s", NULLSTR(errbuf)); + VIR_FREE(errbuf); + virCommandFree(cmd); + } if (restoreLabel && (!bypassSecurityDriver) && virSecurityManagerRestoreSavedStateLabel(driver->securityManager, vm->def, path) < 0) -- 1.8.0.2

On 01/23/13 10:41, Michal Privoznik wrote:
If a compression binary prints something to stderr, currently it is discarded. However, it can contain useful data from debugging POV, so we should catch it. --- src/qemu/qemu_migration.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-)
ACK. Peter

https://bugzilla.redhat.com/show_bug.cgi?id=894723 Currently, if qemuProcessStart() succeeds, but it's decompression binary that returns nonzero status, we don't kill the qemu process, but remove it from internal domain list, leaving the qemu process hanging around totally uncontrolled. --- src/qemu/qemu_driver.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index adfa067..227fc4a 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -4929,8 +4929,10 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, VIR_FORCE_CLOSE(*fd); } - if (virCommandWait(cmd, NULL) < 0) + if (virCommandWait(cmd, NULL) < 0) { + qemuProcessStop(driver, vm, VIR_DOMAIN_SHUTOFF_FAILED, 0); ret = -1; + } VIR_DEBUG("Decompression binary stderr: %s", NULLSTR(errbuf)); } VIR_FORCE_CLOSE(intermediatefd); -- 1.8.0.2

On 01/23/13 10:41, Michal Privoznik wrote:
https://bugzilla.redhat.com/show_bug.cgi?id=894723
Currently, if qemuProcessStart() succeeds, but it's decompression binary that returns nonzero status, we don't kill the qemu process, but remove it from internal domain list, leaving the qemu process hanging around totally uncontrolled. ---
ACK. This patch is semanticaly separate from rest of the series. Peter
participants (2)
-
Michal Privoznik
-
Peter Krempa