[libvirt] [PATCH v2 0/7] Introduce virCommandDoAsyncIO

Currently, if we want to use I/O with an asynchronous command, we have to copy code from virFileWrapperFd and misuse our event loop for reading from and writing to the command. However, we can extend our virCommand implementation to set things up automatically.

All patches but the first have been ACKed already.

diff to v1:
- drop usleep(100) while waiting for the event loop to process our string IOs and do it ourselves instead.

Michal Privoznik (7):
  virCommand: Introduce virCommandDoAsyncIO
  Introduce event loop to commandtest
  tests: Create test for virCommandDoAsyncIO
  virFileWrapperFd: Switch to new virCommandDoAsyncIO
  qemu: Catch stderr of image decompression binary
  qemu: Catch stderr of image compression binary
  qemu: Destroy domain on decompression binary error

 src/libvirt_private.syms    |   1 +
 src/qemu/qemu_driver.c      |   9 +-
 src/qemu/qemu_migration.c   |   9 +-
 src/util/vircommand.c       | 279 ++++++++++++++++++++++++++++++++++++++++++--
 src/util/vircommand.h       |   1 +
 src/util/virfile.c          |  82 +------------
 tests/commanddata/test3.log |   2 +-
 tests/commandtest.c         | 136 +++++++++++++++++++++
 8 files changed, 430 insertions(+), 89 deletions(-)

--
1.8.0.2

Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this: virCommandPtr cmd = virCommandNew*(...); char *buf = NULL; ... virCommandSetOutputBuffer(cmd, &buf); virCommandDoAsyncIO(cmd); if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup; ... if (virCommandWait(cmd, NULL) < 0) goto cleanup; /* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf)); ... cleanup: VIR_FREE(buf); virCommandFree(cmd); Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 279 +++++++++++++++++++++++++++++++++++++++++++++-- src/util/vircommand.h | 1 + 3 files changed, 273 insertions(+), 8 deletions(-) diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index c589236..99e20c0 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -143,6 +143,7 @@ virCommandAddEnvString; virCommandAllowCap; virCommandClearCaps; virCommandDaemonize; +virCommandDoAsyncIO; virCommandExec; virCommandFree; virCommandHandshakeNotify; diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a..117fc07 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -47,11 +47,12 @@ /* Flags for virExecWithHook */ enum { - VIR_EXEC_NONE = 0, - VIR_EXEC_NONBLOCK = (1 << 0), - VIR_EXEC_DAEMON = (1 << 1), + VIR_EXEC_NONE = 0, + VIR_EXEC_NONBLOCK = (1 << 0), + VIR_EXEC_DAEMON = (1 << 1), VIR_EXEC_CLEAR_CAPS = (1 << 2), - VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_ASYNC_IO = (1 << 4), }; struct _virCommand { @@ -84,6 +85,11 @@ struct _virCommand { int *outfdptr; int *errfdptr; + size_t inbufOffset; + int inWatch; + int outWatch; + int errWatch; + bool 
handshake; int handshakeWait[2]; int handshakeNotify[2]; @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[1] = -1; cmd->infd = cmd->outfd = cmd->errfd = -1; + cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; cmd->pid = -1; virCommandAddArgSet(cmd, args); @@ -2122,6 +2129,173 @@ virCommandHook(void *data) } +static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char ***bufptr = NULL; + char buf[1024]; + ssize_t nread; + size_t len = 0; + int *watchPtr = NULL; + bool eof = false; + int tmpfd, *fdptr = NULL, **fdptrptr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + errno = 0; + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + fdptr = &cmd->infd; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf); + + while (true) { + nread = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset); + if (nread < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + eof = true; + } + break; + } + + if (nread == 0) { + eof = true; + break; + } + + cmd->inbufOffset += nread; + if (cmd->inbufOffset == len) { + tmpfd = cmd->infd; + if (VIR_CLOSE(cmd->infd) < 0) + VIR_DEBUG("ignoring failed close on fd %d", tmpfd); + eof = true; + break; + } + } + + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = &cmd->outbuf; + fdptr = &cmd->outfd; + fdptrptr = &cmd->outfdptr; + } else { + watchPtr = &cmd->errWatch; + bufptr = &cmd->errbuf; + fdptr = &cmd->outfd; + fdptrptr = &cmd->outfdptr; + } + + if (bufptr && *bufptr && events & VIR_EVENT_HANDLE_READABLE) { + if (**bufptr) + len = strlen(**bufptr); + + while (true) { + nread = read(fd, buf, sizeof(buf)); + if (nread < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("unable to read command's " + "output from FD 
%d"), + fd); + eof = true; + } + break; + } + + if (nread == 0) { + eof = true; + break; + } + + if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(**bufptr + len, buf, nread); + (**bufptr)[len + nread] = '\0'; + } + + } + } + + if (eof || (events & VIR_EVENT_HANDLE_HANGUP) || + (events & VIR_EVENT_HANDLE_ERROR)) { + *watchPtr = -1; + /* Reset any capturing, in case caller runs + * this identical command again */ + tmpfd = *fdptr; + if (VIR_CLOSE(*fdptr) < 0) + VIR_DEBUG("ignoring failed close on fd %d", tmpfd); + if (bufptr) + *bufptr = NULL; + if (fdptrptr) + *fdptrptr = NULL; + virEventRemoveHandle(watch); + } +} + + +static int +virCommandRegisterEventLoop(virCommandPtr cmd) +{ + int ret = -1; + + if (cmd->inbuf && + (cmd->inWatch = virEventAddHandle(cmd->infd, + VIR_EVENT_HANDLE_WRITABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register infd %d in the event loop"), + cmd->infd); + goto cleanup; + } + + if (cmd->outbuf && cmd->outfdptr == &cmd->outfd && + (cmd->outWatch = virEventAddHandle(cmd->outfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register outfd %d in the event loop"), + cmd->outfd); + goto cleanup; + } + + if (cmd->errbuf && cmd->errfdptr == &cmd->errfd && + (cmd->errWatch = virEventAddHandle(cmd->errfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register errfd %d in the event loop"), + cmd->errfd); + goto cleanup; + } + + ret = 0; + +cleanup: + return ret; +} + + /** * virCommandRunAsync: * @cmd: command to start @@ -2149,6 +2323,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) char 
*str; int i; bool synchronous = false; + int infd[2] = {-1, -1}; if (!cmd || cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2163,10 +2338,23 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) synchronous = cmd->flags & VIR_EXEC_RUN_SYNC; cmd->flags &= ~VIR_EXEC_RUN_SYNC; - /* Buffer management can only be requested via virCommandRun. */ - if ((cmd->inbuf && cmd->infd == -1) || - (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || - (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { + /* Buffer management can only be requested via virCommandRun, unless help + * from the event loop has been requested via virCommandDoAsyncIO. */ + if (cmd->flags & VIR_EXEC_ASYNC_IO) { + /* If we have an input buffer, we need + * a pipe to feed the data to the child */ + if (cmd->inbuf && cmd->infd == -1) { + if (pipe2(infd, O_CLOEXEC) < 0) { + virReportSystemError(errno, "%s", + _("unable to open pipe")); + cmd->has_error = -1; + return -1; + } + cmd->infd = infd[0]; + } + } else if ((cmd->inbuf && cmd->infd == -1) || + (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || + (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot mix string I/O with asynchronous command")); return -1; @@ -2228,6 +2416,16 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) else cmd->reap = true; + if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) { + cmd->flags &= ~VIR_EXEC_ASYNC_IO; + if (cmd->inbuf && cmd->infd != -1) { + /* close the read end of infd and replace it with the write end */ + VIR_FORCE_CLOSE(cmd->infd); + cmd->infd = infd[1]; + } + ret = virCommandRegisterEventLoop(cmd); + } + return ret; } @@ -2248,6 +2446,7 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) { int ret; int status = 0; + const int events = VIR_EVENT_HANDLE_READABLE | VIR_EVENT_HANDLE_HANGUP; if (!cmd ||cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2272,6 +2471,24 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) * guarantee that virProcessWait only fails due to 
failure to wait, * and repeat the exitstatus check code ourselves. */ ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status); + + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + + if (cmd->outWatch != -1) { + virEventRemoveHandle(cmd->outWatch); + virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd); + cmd->outWatch = -1; + } + + if (cmd->errWatch != -1) { + virEventRemoveHandle(cmd->errWatch); + virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd); + cmd->errWatch = -1; + } + if (ret == 0) { cmd->pid = -1; cmd->reap = false; @@ -2521,3 +2738,49 @@ virCommandFree(virCommandPtr cmd) VIR_FREE(cmd); } + +/** + * virCommandDoAsyncIO: + * @cmd: command to do async IO on + * + * This requests asynchronous string IO on @cmd. It is useful in + * combination with virCommandRunAsync(): + * + * virCommandPtr cmd = virCommandNew*(...); + * char *buf = NULL; + * + * ... + * + * virCommandSetOutputBuffer(cmd, &buf); + * virCommandDoAsyncIO(cmd); + * + * if (virCommandRunAsync(cmd, NULL) < 0) + * goto cleanup; + * + * ... + * + * if (virCommandWait(cmd, NULL) < 0) + * goto cleanup; + * + * // @buf now contains @cmd's stdout + * VIR_DEBUG("STDOUT: %s", NULLSTR(buf)); + * + * ... + * + * cleanup: + * VIR_FREE(buf); + * virCommandFree(cmd); + * + * The libvirt's event loop is used for handling stdios of @cmd. + * Since current implementation uses strlen to determine length + * of data to be written to @cmd's stdin, don't pass any binary + * data. 
+ */ +void +virCommandDoAsyncIO(virCommandPtr cmd) +{ + if (!cmd || cmd->has_error) + return; + + cmd->flags |= VIR_EXEC_ASYNC_IO | VIR_EXEC_NONBLOCK; +} diff --git a/src/util/vircommand.h b/src/util/vircommand.h index 9b7117d..c1a2e24 100644 --- a/src/util/vircommand.h +++ b/src/util/vircommand.h @@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd); void virCommandFree(virCommandPtr cmd); +void virCommandDoAsyncIO(virCommandPtr cmd); #endif /* __VIR_COMMAND_H__ */ -- 1.8.0.2

On 01/28/13 17:39, Michal Privoznik wrote:
Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this:
virCommandPtr cmd = virCommandNew*(...); char *buf = NULL;
...
virCommandSetOutputBuffer(cmd, &buf);
The docs for this func state:

 * Capture the child's stdout to a string buffer.  *outbuf is
 * guaranteed to be allocated after successful virCommandRun or
 * virCommandWait, and is best-effort allocated after failed
 * virCommandRun; caller is responsible for freeing *outbuf.
 * This requires the use of virCommandRun.

The last sentence isn't true after this patch. The same applies to the function that sets the error buffer.
virCommandDoAsyncIO(cmd);
if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup;
...
if (virCommandWait(cmd, NULL) < 0) goto cleanup;
/* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf));
...
cleanup: VIR_FREE(buf); virCommandFree(cmd);
Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 279 +++++++++++++++++++++++++++++++++++++++++++++-- src/util/vircommand.h | 1 + 3 files changed, 273 insertions(+), 8 deletions(-)
[...]
diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a..117fc07 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c
[...]
@@ -2122,6 +2129,173 @@ virCommandHook(void *data) }
+static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char ***bufptr = NULL; + char buf[1024]; + ssize_t nread; + size_t len = 0; + int *watchPtr = NULL; + bool eof = false; + int tmpfd, *fdptr = NULL, **fdptrptr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + errno = 0; + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + fdptr = &cmd->infd; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf); + + while (true) { + nread = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset);
nread is a little awkward when used with write()
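To illustrate the naming nit, here is a minimal sketch of the write path with a dedicated nwritten variable. feed_input() and its return convention are made up for this example and are not part of the patch; the sketch assumes the FD is non-blocking and, unlike the patch, retries immediately on EINTR.

```c
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper, not part of the patch: push as much of @inbuf as
 * the non-blocking @fd accepts, starting at *@offset.
 * Returns 1 once everything was written, 0 if the caller should wait for
 * the next writable event, -1 on a real error (errno is set). */
int
feed_input(int fd, const char *inbuf, size_t *offset)
{
    size_t len = strlen(inbuf);

    while (*offset < len) {
        ssize_t nwritten = write(fd, inbuf + *offset, len - *offset);

        if (nwritten < 0) {
            if (errno == EINTR)
                continue;       /* interrupted, retry right away */
            if (errno == EAGAIN)
                return 0;       /* pipe is full, try again later */
            return -1;
        }
        if (nwritten == 0)
            return 0;           /* nothing accepted, do not spin */

        *offset += (size_t) nwritten;
    }
    return 1;
}
```

Keeping the offset in the caller's hands means a partial write simply resumes on the next writable event.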
+ if (nread < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + eof = true; + } + break; + } + + if (nread == 0) { + eof = true; + break; + } + + cmd->inbufOffset += nread; + if (cmd->inbufOffset == len) { + tmpfd = cmd->infd; + if (VIR_CLOSE(cmd->infd) < 0) + VIR_DEBUG("ignoring failed close on fd %d", tmpfd); + eof = true; + break; + } + } + + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = &cmd->outbuf; + fdptr = &cmd->outfd; + fdptrptr = &cmd->outfdptr; + } else { + watchPtr = &cmd->errWatch; + bufptr = &cmd->errbuf; + fdptr = &cmd->outfd; + fdptrptr = &cmd->outfdptr; + } + + if (bufptr && *bufptr && events & VIR_EVENT_HANDLE_READABLE) {
bufptr can't be NULL at this point. Even *bufptr shouldn't be NULL, as the watch handle should be unregistered on error, but I'm not 100% sure about this.
+ if (**bufptr) + len = strlen(**bufptr); + + while (true) { + nread = read(fd, buf, sizeof(buf)); + if (nread < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("unable to read command's " + "output from FD %d"), + fd); + eof = true; + } + break; + } + + if (nread == 0) { + eof = true; + break; + } + + if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(**bufptr + len, buf, nread); + (**bufptr)[len + nread] = '\0';
It's a bit awkward here as you can't guarantee you didn't read binary data, so memcpy is better here compared to strcpy.
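A minimal sketch of that append step, assuming the caller tracks the payload length itself instead of calling strlen(). append_chunk() is a hypothetical helper invented for this example, not code from the patch.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper, not part of the patch: append @n bytes from @chunk
 * to the heap buffer *@buf, whose current payload length is *@len.
 * The result is always NUL-terminated, and embedded NUL bytes in the
 * payload survive because the length is tracked explicitly.
 * Returns 0 on success, -1 on allocation failure (*@buf is left intact). */
int
append_chunk(char **buf, size_t *len, const char *chunk, size_t n)
{
    char *tmp = realloc(*buf, *len + n + 1);

    if (!tmp)
        return -1;

    memcpy(tmp + *len, chunk, n);
    *len += n;              /* advance so the next chunk lands after this one */
    tmp[*len] = '\0';
    *buf = tmp;
    return 0;
}
```

With an explicit length the terminator is only a convenience for callers that treat the data as text.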
+ } + + } + } + + if (eof || (events & VIR_EVENT_HANDLE_HANGUP) || + (events & VIR_EVENT_HANDLE_ERROR)) {
[1] ...
+ *watchPtr = -1; + /* Reset any capturing, in case caller runs + * this identical command again */
I don't think you are able to re-run the same command twice as [2] ...
+ tmpfd = *fdptr; + if (VIR_CLOSE(*fdptr) < 0) + VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
This is already logged by VIR_CLOSE.
+ if (bufptr) + *bufptr = NULL;
[2] ... here you clear the buffer address from the cmd structure. In case someone re-runs it the pointers to the user's buffer won't be available and the error reporting won't work.
+ if (fdptrptr) + *fdptrptr = NULL; + virEventRemoveHandle(watch);
This line should be moved before [1] to match other parts of this patch.
+ } +} + + +static int +virCommandRegisterEventLoop(virCommandPtr cmd) +{ + int ret = -1; + + if (cmd->inbuf && + (cmd->inWatch = virEventAddHandle(cmd->infd, + VIR_EVENT_HANDLE_WRITABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register infd %d in the event loop"), + cmd->infd); + goto cleanup; + } + + if (cmd->outbuf && cmd->outfdptr == &cmd->outfd && + (cmd->outWatch = virEventAddHandle(cmd->outfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register outfd %d in the event loop"), + cmd->outfd); + goto cleanup; + } + + if (cmd->errbuf && cmd->errfdptr == &cmd->errfd && + (cmd->errWatch = virEventAddHandle(cmd->errfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register errfd %d in the event loop"), + cmd->errfd); + goto cleanup; + } + + ret = 0; + +cleanup: + return ret; +}
I think you should unregister the handles that were already registered if a later registration fails. Otherwise they might be left registered. One option would be to do the unregistration in the function that frees cmd.
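A rough sketch of the rollback idea, using toy stand-ins rather than the real virEventAddHandle()/virEventRemoveHandle() calls. Every toy_* name below is invented for illustration; only the "returns a watch ID or -1" contract is borrowed from the patch.

```c
#include <stdbool.h>

#define TOY_MAX_WATCHES 8

/* Toy registry standing in for an event loop (hypothetical). */
static bool toy_watch_used[TOY_MAX_WATCHES];

int
toy_add_handle(int fd)
{
    if (fd < 0)
        return -1;              /* simulate a registration failure */
    for (int i = 0; i < TOY_MAX_WATCHES; i++) {
        if (!toy_watch_used[i]) {
            toy_watch_used[i] = true;
            return i;
        }
    }
    return -1;
}

void
toy_remove_handle(int watch)
{
    if (watch >= 0 && watch < TOY_MAX_WATCHES)
        toy_watch_used[watch] = false;
}

int
toy_active_watches(void)
{
    int n = 0;

    for (int i = 0; i < TOY_MAX_WATCHES; i++)
        n += toy_watch_used[i] ? 1 : 0;
    return n;
}

struct toy_cmd {
    int infd, outfd, errfd;
    int inWatch, outWatch, errWatch;
};

/* Register all three FDs; on any failure undo what was already done. */
int
toy_register_all(struct toy_cmd *cmd)
{
    cmd->inWatch = cmd->outWatch = cmd->errWatch = -1;

    if ((cmd->inWatch = toy_add_handle(cmd->infd)) < 0 ||
        (cmd->outWatch = toy_add_handle(cmd->outfd)) < 0 ||
        (cmd->errWatch = toy_add_handle(cmd->errfd)) < 0)
        goto rollback;

    return 0;

rollback:
    /* Removing -1 is a no-op, so no per-watch checks are needed. */
    toy_remove_handle(cmd->inWatch);
    toy_remove_handle(cmd->outWatch);
    toy_remove_handle(cmd->errWatch);
    cmd->inWatch = cmd->outWatch = cmd->errWatch = -1;
    return -1;
}
```

Doing the same cleanup in the free function instead would also cover callers that never reach virCommandWait().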
+ + /** * virCommandRunAsync: * @cmd: command to start
[...]
@@ -2272,6 +2471,24 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) * guarantee that virProcessWait only fails due to failure to wait, * and repeat the exitstatus check code ourselves. */ ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status); + + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + + if (cmd->outWatch != -1) { + virEventRemoveHandle(cmd->outWatch); + virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd); + cmd->outWatch = -1; + } + + if (cmd->errWatch != -1) { + virEventRemoveHandle(cmd->errWatch); + virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd); + cmd->errWatch = -1; + }
Okay, this approach is better than the previous version.
+ if (ret == 0) { cmd->pid = -1; cmd->reap = false; @@ -2521,3 +2738,49 @@ virCommandFree(virCommandPtr cmd)
VIR_FREE(cmd); }
Peter

This is just preparing environment for the next patch, which is going to need an event loop. --- tests/commanddata/test3.log | 2 +- tests/commandtest.c | 87 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/tests/commanddata/test3.log b/tests/commanddata/test3.log index 6bd7974..bd06371 100644 --- a/tests/commanddata/test3.log +++ b/tests/commanddata/test3.log @@ -8,7 +8,7 @@ ENV:USER=test FD:0 FD:1 FD:2 -FD:3 FD:5 +FD:8 DAEMON:no CWD:/tmp diff --git a/tests/commandtest.c b/tests/commandtest.c index b5c5882..00d2eac 100644 --- a/tests/commandtest.c +++ b/tests/commandtest.c @@ -37,9 +37,19 @@ #include "virfile.h" #include "virpidfile.h" #include "virerror.h" +#include "virthread.h" #define VIR_FROM_THIS VIR_FROM_NONE +typedef struct _virCommandTestData virCommandTestData; +typedef virCommandTestData *virCommandTestDataPtr; +struct _virCommandTestData { + virMutex lock; + virThread thread; + bool quit; + bool running; +}; + #ifdef WIN32 int @@ -841,11 +851,46 @@ static const char *const newenv[] = { NULL }; +static void virCommandThreadWorker(void *opaque) +{ + virCommandTestDataPtr test = opaque; + + virMutexLock(&test->lock); + + while (!test->quit) { + virMutexUnlock(&test->lock); + + if (virEventRunDefaultImpl() < 0) { + test->quit = true; + break; + } + + virMutexLock(&test->lock); + } + + test->running = false; + + virMutexUnlock(&test->lock); + return; +} + +static void +virCommandTestFreeTimer(int timer ATTRIBUTE_UNUSED, + void *opaque ATTRIBUTE_UNUSED) +{ + /* nothing to be done here */ +} + static int mymain(void) { int ret = 0; int fd; + virCommandTestDataPtr test = NULL; + int timer = -1; + + if (virThreadInitialize() < 0) + return EXIT_FAILURE; if (chdir("/tmp") < 0) return EXIT_FAILURE; @@ -886,6 +931,30 @@ mymain(void) fd = 5; VIR_FORCE_CLOSE(fd); + virEventRegisterDefaultImpl(); + if (VIR_ALLOC(test) < 0) { + virReportOOMError(); + goto cleanup; + } + + if (virMutexInit(&test->lock) < 0) { + 
printf("Unable to init mutex: %d\n", errno); + goto cleanup; + } + + virMutexLock(&test->lock); + + if (virThreadCreate(&test->thread, + true, + virCommandThreadWorker, + test) < 0) { + virMutexUnlock(&test->lock); + goto cleanup; + } + + test->running = true; + virMutexUnlock(&test->lock); + environ = (char **)newenv; # define DO_TEST(NAME) \ @@ -915,6 +984,24 @@ mymain(void) DO_TEST(test19); DO_TEST(test20); + virMutexLock(&test->lock); + if (test->running) { + test->quit = true; + /* HACK: Add a dummy timeout to break event loop */ + timer = virEventAddTimeout(0, virCommandTestFreeTimer, NULL, NULL); + } + virMutexUnlock(&test->lock); + +cleanup: + if (test->running) + virThreadJoin(&test->thread); + + if (timer != -1) + virEventRemoveTimeout(timer); + + virMutexDestroy(&test->lock); + VIR_FREE(test); + return ret==0 ? EXIT_SUCCESS : EXIT_FAILURE; } -- 1.8.0.2

This is just a basic test, so we don't break virCommand in the future. A "Hello world\n" string is written to commandhelper, which copies its input to stdout and stderr, where we read it from. The strings read are then compared with the expected values.
---
 tests/commandtest.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/tests/commandtest.c b/tests/commandtest.c
index 00d2eac..f4e335f 100644
--- a/tests/commandtest.c
+++ b/tests/commandtest.c
@@ -851,6 +851,54 @@ static const char *const newenv[] = {
     NULL
 };
 
+static int test21(const void *unused ATTRIBUTE_UNUSED)
+{
+    virCommandPtr cmd = virCommandNew(abs_builddir "/commandhelper");
+    int ret = -1;
+    const char *wrbuf = "Hello world\n";
+    char *outbuf = NULL, *errbuf = NULL;
+    const char *outbufExpected="BEGIN STDOUT\n"
+                               "Hello world\n"
+                               "END STDOUT\n";
+    const char *errbufExpected="BEGIN STDERR\n"
+                               "Hello world\n"
+                               "END STDERR\n";
+
+    virCommandSetInputBuffer(cmd, wrbuf);
+    virCommandSetOutputBuffer(cmd, &outbuf);
+    virCommandSetErrorBuffer(cmd, &errbuf);
+    virCommandDoAsyncIO(cmd);
+
+    if (virCommandRunAsync(cmd, NULL) < 0) {
+        virErrorPtr err = virGetLastError();
+        printf("Cannot run child %s\n", err->message);
+        goto cleanup;
+    }
+
+    if (virCommandWait(cmd, NULL) < 0)
+        goto cleanup;
+
+    if (virTestGetVerbose())
+        printf("STDOUT:%s\nSTDERR:%s\n", NULLSTR(outbuf), NULLSTR(errbuf));
+
+    if (STRNEQ(outbuf, outbufExpected)) {
+        virtTestDifference(stderr, outbufExpected, outbuf);
+        goto cleanup;
+    }
+
+    if (STRNEQ(errbuf, errbufExpected)) {
+        virtTestDifference(stderr, errbufExpected, errbuf);
+        goto cleanup;
+    }
+
+    ret = 0;
+cleanup:
+    VIR_FREE(outbuf);
+    VIR_FREE(errbuf);
+    virCommandFree(cmd);
+    return ret;
+}
+
 static void virCommandThreadWorker(void *opaque)
 {
     virCommandTestDataPtr test = opaque;
@@ -983,6 +1031,7 @@ mymain(void)
     DO_TEST(test18);
     DO_TEST(test19);
     DO_TEST(test20);
+    DO_TEST(test21);
 
     virMutexLock(&test->lock);
     if (test->running) {
--
1.8.0.2

Commit 34e8f63a32f83 introduced support for catching errors from the libvirt iohelper. However, at that time there was no API like virCommandDoAsyncIO(), so everything had to be implemented by hand. Now that we do have the API, we can use it and drop our own implementation.
---
 src/util/virfile.c | 82 +++---------------------------------------------------
 1 file changed, 4 insertions(+), 78 deletions(-)

diff --git a/src/util/virfile.c b/src/util/virfile.c index 5cca54d..b4765fb 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -135,58 +135,11 @@ virFileDirectFdFlag(void) * read-write is not supported, just a single direction. */ struct _virFileWrapperFd { virCommandPtr cmd; /* Child iohelper process to do the I/O. */ - int err_fd; /* FD to read stderr of @cmd */ char *err_msg; /* stderr of @cmd */ - size_t err_msg_len; /* strlen of err_msg so we don't - have to compute it every time */ - int err_watch; /* ID of watch in the event loop */ }; #ifndef WIN32 /** - * virFileWrapperFdReadStdErr: - * @watch: watch ID - * @fd: the read end of pipe to iohelper's stderr - * @events: an OR-ed set of events which occurred on @fd - * @opaque: virFileWrapperFdPtr - * - * This is a callback to our eventloop which will read iohelper's - * stderr, reallocate @opaque->err_msg and copy data. 
- */ -static void -virFileWrapperFdReadStdErr(int watch ATTRIBUTE_UNUSED, - int fd, int events, void *opaque) -{ - virFileWrapperFdPtr wfd = (virFileWrapperFdPtr) opaque; - char ebuf[1024]; - ssize_t nread; - - if (events & VIR_EVENT_HANDLE_READABLE) { - while ((nread = saferead(fd, ebuf, sizeof(ebuf)))) { - if (nread < 0) { - if (errno != EAGAIN) - virReportSystemError(errno, "%s", - _("unable to read iohelper's stderr")); - break; - } - - if (VIR_REALLOC_N(wfd->err_msg, wfd->err_msg_len + nread + 1) < 0) { - virReportOOMError(); - return; - } - memcpy(wfd->err_msg + wfd->err_msg_len, ebuf, nread); - wfd->err_msg_len += nread; - wfd->err_msg[wfd->err_msg_len] = '\0'; - } - } - - if (events & VIR_EVENT_HANDLE_HANGUP) { - virEventRemoveHandle(watch); - wfd->err_watch = -1; - } -} - -/** * virFileWrapperFdNew: * @fd: pointer to fd to wrap * @name: name of fd, for diagnostics @@ -245,8 +198,6 @@ virFileWrapperFdNew(int *fd, const char *name, unsigned int flags) return NULL; } - ret->err_watch = -1; - mode = fcntl(*fd, F_GETFL); if (mode < 0) { @@ -279,38 +230,16 @@ virFileWrapperFdNew(int *fd, const char *name, unsigned int flags) virCommandAddArg(ret->cmd, "0"); } - /* In order to catch iohelper stderr, we must: - * - pass a FD to virCommand (-1 to auto-allocate one) - * - change iohelper's env so virLog functions print to stderr + /* In order to catch iohelper stderr, we must change + * iohelper's env so virLog functions print to stderr */ - ret->err_fd = -1; - virCommandSetErrorFD(ret->cmd, &ret->err_fd); virCommandAddEnvPair(ret->cmd, "LIBVIRT_LOG_OUTPUTS", "1:stderr"); + virCommandSetErrorBuffer(ret->cmd, &ret->err_msg); + virCommandDoAsyncIO(ret->cmd); if (virCommandRunAsync(ret->cmd, NULL) < 0) goto error; - /* deliberately don't use virCommandNonblockingFDs here as it is all or - * nothing. And we want iohelper's stdin and stdout to block (default). 
- * However, stderr is read within event loop and therefore it must be - * nonblocking.*/ - if (virSetNonBlock(ret->err_fd) < 0) { - virReportSystemError(errno, "%s", - _("Failed to set non-blocking " - "file descriptor flag")); - goto error; - } - - if ((ret->err_watch = virEventAddHandle(ret->err_fd, - VIR_EVENT_HANDLE_READABLE, - virFileWrapperFdReadStdErr, - ret, NULL)) < 0) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register iohelper's " - "stderr FD in the eventloop")); - goto error; - } - if (VIR_CLOSE(pipefd[!output]) < 0) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("unable to close pipe")); goto error; @@ -389,9 +318,6 @@ virFileWrapperFdFree(virFileWrapperFdPtr wfd) if (!wfd) return; - VIR_FORCE_CLOSE(wfd->err_fd); - if (wfd->err_watch != -1) - virEventRemoveHandle(wfd->err_watch); VIR_FREE(wfd->err_msg); virCommandFree(wfd->cmd); -- 1.8.0.2

If a decompression binary prints something to stderr, currently it is discarded. However, it can contain useful data from debugging POV, so we should catch it. --- src/qemu/qemu_driver.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index 13bf854..69ad948 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -4879,6 +4879,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, virDomainEventPtr event; int intermediatefd = -1; virCommandPtr cmd = NULL; + char *errbuf = NULL; if (header->version == 2) { const char *prog = qemuSaveCompressionTypeToString(header->compressed); @@ -4896,6 +4897,8 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, virCommandSetInputFD(cmd, intermediatefd); virCommandSetOutputFD(cmd, fd); + virCommandSetErrorBuffer(cmd, &errbuf); + virCommandDoAsyncIO(cmd); if (virCommandRunAsync(cmd, NULL) < 0) { virReportError(VIR_ERR_INTERNAL_ERROR, @@ -4924,6 +4927,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, if (virCommandWait(cmd, NULL) < 0) ret = -1; + VIR_DEBUG("Decompression binary stderr: %s", NULLSTR(errbuf)); } VIR_FORCE_CLOSE(intermediatefd); @@ -4973,6 +4977,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn, out: virCommandFree(cmd); + VIR_FREE(errbuf); if (virSecurityManagerRestoreSavedStateLabel(driver->securityManager, vm->def, path) < 0) VIR_WARN("failed to restore save state label on %s", path); -- 1.8.0.2

If a compression binary prints something to stderr, currently it is discarded. However, it can contain useful data from debugging POV, so we should catch it. --- src/qemu/qemu_migration.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index d03e361..2c13d4c 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3598,6 +3598,7 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm, virCommandPtr cmd = NULL; int pipeFD[2] = { -1, -1 }; unsigned long saveMigBandwidth = priv->migMaxBandwidth; + char *errbuf = NULL; /* Increase migration bandwidth to unlimited since target is a file. * Failure to change migration speed is not fatal. */ @@ -3680,6 +3681,8 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm, cmd = virCommandNewArgs(args); virCommandSetInputFD(cmd, pipeFD[0]); virCommandSetOutputFD(cmd, &fd); + virCommandSetErrorBuffer(cmd, &errbuf); + virCommandDoAsyncIO(cmd); if (virSetCloseExec(pipeFD[1]) < 0) { virReportSystemError(errno, "%s", _("Unable to set cloexec flag")); @@ -3727,7 +3730,11 @@ cleanup: VIR_FORCE_CLOSE(pipeFD[0]); VIR_FORCE_CLOSE(pipeFD[1]); - virCommandFree(cmd); + if (cmd) { + VIR_DEBUG("Compression binary stderr: %s", NULLSTR(errbuf)); + VIR_FREE(errbuf); + virCommandFree(cmd); + } if (restoreLabel && (!bypassSecurityDriver) && virSecurityManagerRestoreSavedStateLabel(driver->securityManager, vm->def, path) < 0) -- 1.8.0.2

https://bugzilla.redhat.com/show_bug.cgi?id=894723

Currently, if qemuProcessStart() succeeds but the decompression binary returns a nonzero status, we don't kill the qemu process but still remove it from the internal domain list, leaving the qemu process hanging around totally uncontrolled.
---
 src/qemu/qemu_driver.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 69ad948..b53d412 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -4925,8 +4925,10 @@ qemuDomainSaveImageStartVM(virConnectPtr conn,
             VIR_FORCE_CLOSE(*fd);
         }
 
-        if (virCommandWait(cmd, NULL) < 0)
+        if (virCommandWait(cmd, NULL) < 0) {
+            qemuProcessStop(driver, vm, VIR_DOMAIN_SHUTOFF_FAILED, 0);
             ret = -1;
+        }
         VIR_DEBUG("Decompression binary stderr: %s", NULLSTR(errbuf));
     }
     VIR_FORCE_CLOSE(intermediatefd);
--
1.8.0.2

On 01/28/13 17:39, Michal Privoznik wrote:
https://bugzilla.redhat.com/show_bug.cgi?id=894723
Currently, if qemuProcessStart() succeeds but the decompression binary returns a nonzero status, we don't kill the qemu process but still remove it from the internal domain list, leaving the qemu process hanging around totally uncontrolled.
---
 src/qemu/qemu_driver.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
My ACK on this one stands. It's a bug fix and this should go in before the release.

The rest of the series is quite complex and it will be better if you wait with it until the release is out. I will review your changes later today.

Peter

On 29.01.2013 09:59, Peter Krempa wrote:
On 01/28/13 17:39, Michal Privoznik wrote:
https://bugzilla.redhat.com/show_bug.cgi?id=894723
Currently, if qemuProcessStart() succeeds but the decompression binary returns a nonzero status, we don't kill the qemu process but still remove it from the internal domain list, leaving the qemu process hanging around totally uncontrolled.
---
 src/qemu/qemu_driver.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
My ACK on this one stands. It's a bug fix and this should go in before the release.
The rest of the series is quite complex and it will be better if you wait with it until the release is out. I will review your changes later today.
Peter
Okay, I agree, and I've pushed it (although it needed some rebasing because of context changes).

Michal
participants (2)
- Michal Privoznik
- Peter Krempa