[libvirt] [PATCH v3 0/6] Introduce virCommandDoAsyncIO

Currently, if we want to use IO with an asynchronous command, we have to copy code from virFileWrapperFd to misuse our event loop for reading from and writing to the command. However, we can extend our virCommand implementation to set things up automatically.

All patches but the first have been ACKed already.

diff to v2:
- even more of Peter's suggestions worked in

diff to v1:
- drop usleep(100) while waiting for the event loop to process our string IOs and do it ourselves instead.

Michal Privoznik (6):
  virCommand: Introduce virCommandDoAsyncIO
  Introduce event loop to commandtest
  tests: Create test for virCommandDoAsyncIO
  virFileWrapperFd: Switch to new virCommandDoAsyncIO
  qemu: Catch stderr of image decompression binary
  qemu: Catch stderr of image compression binary

 src/libvirt_private.syms    |   1 +
 src/qemu/qemu_driver.c      |   5 +
 src/qemu/qemu_migration.c   |   9 +-
 src/util/vircommand.c       | 308 +++++++++++++++++++++++++++++++++++++++++---
 src/util/vircommand.h       |   1 +
 src/util/virfile.c          |  82 +-----------
 tests/commanddata/test3.log |   2 +-
 tests/commandtest.c         | 136 +++++++++++++++++++
 8 files changed, 447 insertions(+), 97 deletions(-)

--
1.8.0.2

Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this: virCommandPtr cmd = virCommandNew*(...); char *buf = NULL; ... virCommandSetOutputBuffer(cmd, &buf); virCommandDoAsyncIO(cmd); if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup; ... if (virCommandWait(cmd, NULL) < 0) goto cleanup; /* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf)); ... cleanup: VIR_FREE(buf); virCommandFree(cmd); Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 308 ++++++++++++++++++++++++++++++++++++++++++++--- src/util/vircommand.h | 1 + 3 files changed, 293 insertions(+), 17 deletions(-) diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index c589236..99e20c0 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -143,6 +143,7 @@ virCommandAddEnvString; virCommandAllowCap; virCommandClearCaps; virCommandDaemonize; +virCommandDoAsyncIO; virCommandExec; virCommandFree; virCommandHandshakeNotify; diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a..270e8a2 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -47,11 +47,12 @@ /* Flags for virExecWithHook */ enum { - VIR_EXEC_NONE = 0, - VIR_EXEC_NONBLOCK = (1 << 0), - VIR_EXEC_DAEMON = (1 << 1), + VIR_EXEC_NONE = 0, + VIR_EXEC_NONBLOCK = (1 << 0), + VIR_EXEC_DAEMON = (1 << 1), VIR_EXEC_CLEAR_CAPS = (1 << 2), - VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_ASYNC_IO = (1 << 4), }; struct _virCommand { @@ -84,6 +85,11 @@ struct _virCommand { int *outfdptr; int *errfdptr; + size_t inbufOffset; + int inWatch; + int outWatch; + int errWatch; + bool 
handshake; int handshakeWait[2]; int handshakeNotify[2]; @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[1] = -1; cmd->infd = cmd->outfd = cmd->errfd = -1; + cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; cmd->pid = -1; virCommandAddArgSet(cmd, args); @@ -1394,8 +1401,8 @@ virCommandSetWorkingDirectory(virCommandPtr cmd, const char *pwd) * @cmd: the command to modify * @inbuf: string to feed to stdin * - * Feed the child's stdin from a string buffer. This requires the use - * of virCommandRun(). + * Feed the child's stdin from a string buffer. + * The buffer is forgotten after each @cmd run. */ void virCommandSetInputBuffer(virCommandPtr cmd, const char *inbuf) @@ -1423,8 +1430,8 @@ virCommandSetInputBuffer(virCommandPtr cmd, const char *inbuf) * Capture the child's stdout to a string buffer. *outbuf is * guaranteed to be allocated after successful virCommandRun or * virCommandWait, and is best-effort allocated after failed - * virCommandRun; caller is responsible for freeing *outbuf. - * This requires the use of virCommandRun. + * virCommandRun or virCommandRunAsync; caller is responsible for + * freeing *outbuf. The buffer is forgotten after each @cmd run. */ void virCommandSetOutputBuffer(virCommandPtr cmd, char **outbuf) @@ -1452,11 +1459,11 @@ virCommandSetOutputBuffer(virCommandPtr cmd, char **outbuf) * Capture the child's stderr to a string buffer. *errbuf is * guaranteed to be allocated after successful virCommandRun or * virCommandWait, and is best-effort allocated after failed - * virCommandRun; caller is responsible for freeing *errbuf. - * This requires the use of virCommandRun. It is possible to - * pass the same pointer as for virCommandSetOutputBuffer(), in - * which case the child process will interleave all output into - * a single string. + * virCommandRun or virCommandRunAsync; caller is responsible for + * freeing *errbuf. 
It is possible to pass the same pointer as + * for virCommandSetOutputBuffer(), in which case the child + * process will interleave all output into a single string. The + * buffer is forgotten after each @cmd run. */ void virCommandSetErrorBuffer(virCommandPtr cmd, char **errbuf) @@ -2122,6 +2129,183 @@ virCommandHook(void *data) } +static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char ***bufptr = NULL; + char buf[1024]; + ssize_t nread, nwritten; + size_t len = 0; + int *watchPtr = NULL; + bool eof = false; + int tmpfd, *fdptr = NULL, **fdptrptr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + errno = 0; + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + fdptr = &cmd->infd; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf); + + while (true) { + nwritten = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset); + if (nwritten < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + eof = true; + } + break; + } + + if (nwritten == 0) { + eof = true; + break; + } + + cmd->inbufOffset += nwritten; + if (cmd->inbufOffset == len) { + tmpfd = cmd->infd; + if (VIR_CLOSE(cmd->infd) < 0) + VIR_DEBUG("ignoring failed close on fd %d", tmpfd); + eof = true; + break; + } + } + + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = &cmd->outbuf; + fdptr = &cmd->outfd; + fdptrptr = &cmd->outfdptr; + } else { + watchPtr = &cmd->errWatch; + bufptr = &cmd->errbuf; + fdptr = &cmd->errfd; + fdptrptr = &cmd->errfdptr; + } + + if (events & VIR_EVENT_HANDLE_READABLE) { + if (**bufptr) + len = strlen(**bufptr); + + while (true) { + nread = read(fd, buf, sizeof(buf)); + if (nread < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("unable to read command's " + "output from FD %d"), + fd); 
+ eof = true; + } + break; + } + + if (nread == 0) { + eof = true; + break; + } + + if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(**bufptr + len, buf, nread); + (**bufptr)[len + nread] = '\0'; + } + + } + } + + if (eof || (events & VIR_EVENT_HANDLE_HANGUP) || + (events & VIR_EVENT_HANDLE_ERROR)) { + virEventRemoveHandle(watch); + + *watchPtr = -1; + VIR_FORCE_CLOSE(*fdptr); + if (bufptr) + *bufptr = NULL; + if (fdptrptr) + *fdptrptr = NULL; + } +} + + +static int +virCommandRegisterEventLoop(virCommandPtr cmd) +{ + int ret = -1; + + if (cmd->inbuf && + (cmd->inWatch = virEventAddHandle(cmd->infd, + VIR_EVENT_HANDLE_WRITABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register infd %d in the event loop"), + cmd->infd); + goto cleanup; + } + + if (cmd->outbuf && cmd->outfdptr == &cmd->outfd && + (cmd->outWatch = virEventAddHandle(cmd->outfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register outfd %d in the event loop"), + cmd->outfd); + + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + goto cleanup; + } + + if (cmd->errbuf && cmd->errfdptr == &cmd->errfd && + (cmd->errWatch = virEventAddHandle(cmd->errfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register errfd %d in the event loop"), + cmd->errfd); + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + if (cmd->outWatch != -1) { + virEventRemoveHandle(cmd->outWatch); + cmd->outWatch = -1; + } + goto cleanup; + } + + ret = 0; + +cleanup: + return ret; +} + + /** * 
virCommandRunAsync: * @cmd: command to start @@ -2149,6 +2333,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) char *str; int i; bool synchronous = false; + int infd[2] = {-1, -1}; if (!cmd || cmd->has_error == ENOMEM) { virReportOOMError(); @@ -2163,10 +2348,23 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) synchronous = cmd->flags & VIR_EXEC_RUN_SYNC; cmd->flags &= ~VIR_EXEC_RUN_SYNC; - /* Buffer management can only be requested via virCommandRun. */ - if ((cmd->inbuf && cmd->infd == -1) || - (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || - (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { + /* Buffer management can only be requested via virCommandRun, unless help + * from the event loop has been requested via virCommandDoAsyncIO. */ + if (cmd->flags & VIR_EXEC_ASYNC_IO) { + /* If we have an input buffer, we need + * a pipe to feed the data to the child */ + if (cmd->inbuf && cmd->infd == -1) { + if (pipe2(infd, O_CLOEXEC) < 0) { + virReportSystemError(errno, "%s", + _("unable to open pipe")); + cmd->has_error = -1; + return -1; + } + cmd->infd = infd[0]; + } + } else if ((cmd->inbuf && cmd->infd == -1) || + (cmd->outbuf && cmd->outfdptr != &cmd->outfd) || + (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot mix string I/O with asynchronous command")); return -1; @@ -2228,6 +2426,16 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) else cmd->reap = true; + if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) { + cmd->flags &= ~VIR_EXEC_ASYNC_IO; + if (cmd->inbuf && cmd->infd != -1) { + /* close the read end of infd and replace it with the write end */ + VIR_FORCE_CLOSE(cmd->infd); + cmd->infd = infd[1]; + } + ret = virCommandRegisterEventLoop(cmd); + } + return ret; } @@ -2248,6 +2456,7 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) { int ret; int status = 0; + const int events = VIR_EVENT_HANDLE_READABLE | VIR_EVENT_HANDLE_HANGUP; if (!cmd ||cmd->has_error == ENOMEM) { virReportOOMError(); @@ 
-2272,6 +2481,24 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) * guarantee that virProcessWait only fails due to failure to wait, * and repeat the exitstatus check code ourselves. */ ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status); + + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + + if (cmd->outWatch != -1) { + virEventRemoveHandle(cmd->outWatch); + virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd); + cmd->outWatch = -1; + } + + if (cmd->errWatch != -1) { + virEventRemoveHandle(cmd->errWatch); + virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd); + cmd->errWatch = -1; + } + if (ret == 0) { cmd->pid = -1; cmd->reap = false; @@ -2521,3 +2748,50 @@ virCommandFree(virCommandPtr cmd) VIR_FREE(cmd); } + +/** + * virCommandDoAsyncIO: + * @cmd: command to do async IO on + * + * This requests asynchronous string IO on @cmd. It is useful in + * combination with virCommandRunAsync(): + * + * virCommandPtr cmd = virCommandNew*(...); + * char *buf = NULL; + * + * ... + * + * virCommandSetOutputBuffer(cmd, &buf); + * virCommandDoAsyncIO(cmd); + * + * if (virCommandRunAsync(cmd, NULL) < 0) + * goto cleanup; + * + * ... + * + * if (virCommandWait(cmd, NULL) < 0) + * goto cleanup; + * + * // @buf now contains @cmd's stdout + * VIR_DEBUG("STDOUT: %s", NULLSTR(buf)); + * + * ... + * + * cleanup: + * VIR_FREE(buf); + * virCommandFree(cmd); + * + * The libvirt's event loop is used for handling stdios of @cmd. + * Since current implementation uses strlen to determine length + * of data to be written to @cmd's stdin, don't pass any binary + * data. If you want to re-run command, you need to call this and + * buffer setting functions (virCommandSet.*Buffer) prior each run. 
+ */ +void +virCommandDoAsyncIO(virCommandPtr cmd) +{ + if (!cmd || cmd->has_error) + return; + + cmd->flags |= VIR_EXEC_ASYNC_IO | VIR_EXEC_NONBLOCK; +} diff --git a/src/util/vircommand.h b/src/util/vircommand.h index 9b7117d..c1a2e24 100644 --- a/src/util/vircommand.h +++ b/src/util/vircommand.h @@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd); void virCommandFree(virCommandPtr cmd); +void virCommandDoAsyncIO(virCommandPtr cmd); #endif /* __VIR_COMMAND_H__ */ -- 1.8.0.2
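For readers following the read path in virCommandHandleReadWrite(), here is a minimal stand-alone sketch of the same idea: drain whatever is currently readable on a non-blocking descriptor and append it to a NUL-terminated string. It is plain POSIX C, not libvirt code; drain_fd() and its return convention are made up for illustration. Unlike the handler above, it retries on EINTR and keeps the running length in a caller-supplied variable instead of calling strlen().

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>      /* O_NONBLOCK, for callers that prepare the fd */
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper, not part of libvirt: append everything that is
 * currently readable on the non-blocking descriptor @fd to the
 * NUL-terminated string *@buf, whose current length is *@len.
 * Returns 1 on EOF, 0 if more data may arrive later, -1 on error. */
static int
drain_fd(int fd, char **buf, size_t *len)
{
    char chunk[1024];

    for (;;) {
        ssize_t nread = read(fd, chunk, sizeof(chunk));

        if (nread < 0) {
            if (errno == EINTR)
                continue;               /* interrupted, just retry */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return 0;               /* nothing more for now */
            return -1;                  /* real error */
        }
        if (nread == 0)
            return 1;                   /* writer closed its end */

        /* Grow the string by the chunk plus the trailing NUL. */
        char *tmp = realloc(*buf, *len + (size_t)nread + 1);
        if (!tmp)
            return -1;
        memcpy(tmp + *len, chunk, (size_t)nread);
        *len += (size_t)nread;          /* advance so the next chunk appends */
        tmp[*len] = '\0';
        *buf = tmp;
    }
}
```

A caller would set O_NONBLOCK on the read end of a pipe, call drain_fd() each time poll() (or an event loop) reports the descriptor readable, and stop watching it once the function returns 1.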

On 02/04/13 15:56, Michal Privoznik wrote:
Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this:
virCommandPtr cmd = virCommandNew*(...); char *buf = NULL;
...
virCommandSetOutputBuffer(cmd, &buf); virCommandDoAsyncIO(cmd);
if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup;
...
if (virCommandWait(cmd, NULL) < 0) goto cleanup;
/* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf));
...
cleanup: VIR_FREE(buf); virCommandFree(cmd);
Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 308 ++++++++++++++++++++++++++++++++++++++++++++--- src/util/vircommand.h | 1 + 3 files changed, 293 insertions(+), 17 deletions(-)
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index c589236..99e20c0 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -143,6 +143,7 @@ virCommandAddEnvString; virCommandAllowCap; virCommandClearCaps; virCommandDaemonize; +virCommandDoAsyncIO; virCommandExec; virCommandFree; virCommandHandshakeNotify; diff --git a/src/util/vircommand.c b/src/util/vircommand.c index 8566d1a..270e8a2 100644 --- a/src/util/vircommand.c +++ b/src/util/vircommand.c @@ -47,11 +47,12 @@
/* Flags for virExecWithHook */ enum { - VIR_EXEC_NONE = 0, - VIR_EXEC_NONBLOCK = (1 << 0), - VIR_EXEC_DAEMON = (1 << 1), + VIR_EXEC_NONE = 0, + VIR_EXEC_NONBLOCK = (1 << 0), + VIR_EXEC_DAEMON = (1 << 1), VIR_EXEC_CLEAR_CAPS = (1 << 2), - VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_RUN_SYNC = (1 << 3), + VIR_EXEC_ASYNC_IO = (1 << 4), };
struct _virCommand { @@ -84,6 +85,11 @@ struct _virCommand { int *outfdptr; int *errfdptr;
+ size_t inbufOffset; + int inWatch; + int outWatch; + int errWatch; + bool handshake; int handshakeWait[2]; int handshakeNotify[2]; @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args) cmd->handshakeNotify[1] = -1;
cmd->infd = cmd->outfd = cmd->errfd = -1; + cmd->inWatch = cmd->outWatch = cmd->errWatch = -1; cmd->pid = -1;
virCommandAddArgSet(cmd, args); @@ -1394,8 +1401,8 @@ virCommandSetWorkingDirectory(virCommandPtr cmd, const char *pwd) * @cmd: the command to modify * @inbuf: string to feed to stdin * - * Feed the child's stdin from a string buffer. This requires the use - * of virCommandRun().
Instead of dropping this sentence, you should add the second condition under which this works: when async IO is requested.
+ * Feed the child's stdin from a string buffer. + * The buffer is forgotten after each @cmd run. */ void virCommandSetInputBuffer(virCommandPtr cmd, const char *inbuf) @@ -1423,8 +1430,8 @@ virCommandSetInputBuffer(virCommandPtr cmd, const char *inbuf) * Capture the child's stdout to a string buffer. *outbuf is * guaranteed to be allocated after successful virCommandRun or * virCommandWait, and is best-effort allocated after failed - * virCommandRun; caller is responsible for freeing *outbuf. - * This requires the use of virCommandRun. + * virCommandRun or virCommandRunAsync; caller is responsible for + * freeing *outbuf. The buffer is forgotten after each @cmd run. */ void virCommandSetOutputBuffer(virCommandPtr cmd, char **outbuf) @@ -1452,11 +1459,11 @@ virCommandSetOutputBuffer(virCommandPtr cmd, char **outbuf) * Capture the child's stderr to a string buffer. *errbuf is * guaranteed to be allocated after successful virCommandRun or * virCommandWait, and is best-effort allocated after failed - * virCommandRun; caller is responsible for freeing *errbuf. - * This requires the use of virCommandRun. It is possible to - * pass the same pointer as for virCommandSetOutputBuffer(), in - * which case the child process will interleave all output into - * a single string. + * virCommandRun or virCommandRunAsync; caller is responsible for + * freeing *errbuf. It is possible to pass the same pointer as + * for virCommandSetOutputBuffer(), in which case the child + * process will interleave all output into a single string. The + * buffer is forgotten after each @cmd run. */ void virCommandSetErrorBuffer(virCommandPtr cmd, char **errbuf) @@ -2122,6 +2129,183 @@ virCommandHook(void *data) }
+static void +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) +{ + virCommandPtr cmd = (virCommandPtr) opaque; + char ***bufptr = NULL; + char buf[1024]; + ssize_t nread, nwritten; + size_t len = 0; + int *watchPtr = NULL; + bool eof = false; + int tmpfd, *fdptr = NULL, **fdptrptr = NULL; + + VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events); + errno = 0; + + if (watch == cmd->inWatch) { + watchPtr = &cmd->inWatch; + fdptr = &cmd->infd; + + if (events & VIR_EVENT_HANDLE_WRITABLE) { + len = strlen(cmd->inbuf); + + while (true) { + nwritten = write(fd, cmd->inbuf + cmd->inbufOffset, + len - cmd->inbufOffset); + if (nwritten < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("Unable to write command's " + "input to FD %d"), + fd); + eof = true; + } + break; + } + + if (nwritten == 0) { + eof = true; + break; + } + + cmd->inbufOffset += nwritten; + if (cmd->inbufOffset == len) { + tmpfd = cmd->infd; + if (VIR_CLOSE(cmd->infd) < 0) + VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
Hm, rather use VIR_FORCE_CLOSE if you don't care whether it failed. It should report the debug info for you. [1]
+ eof = true; + break; + } + } + + } + } else { + if (watch == cmd->outWatch) { + watchPtr = &cmd->outWatch; + bufptr = &cmd->outbuf; + fdptr = &cmd->outfd; + fdptrptr = &cmd->outfdptr; + } else { + watchPtr = &cmd->errWatch; + bufptr = &cmd->errbuf; + fdptr = &cmd->errfd; + fdptrptr = &cmd->errfdptr; + } + + if (events & VIR_EVENT_HANDLE_READABLE) { + if (**bufptr) + len = strlen(**bufptr); + + while (true) { + nread = read(fd, buf, sizeof(buf)); + if (nread < 0) { + if (errno != EAGAIN && errno != EINTR) { + virReportSystemError(errno, + _("unable to read command's " + "output from FD %d"), + fd); + eof = true; + } + break; + } + + if (nread == 0) { + eof = true; + break; + } + + if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) { + virReportOOMError(); + break; + } + + memcpy(**bufptr + len, buf, nread); + (**bufptr)[len + nread] = '\0'; + } + + } + } + + if (eof || (events & VIR_EVENT_HANDLE_HANGUP) || + (events & VIR_EVENT_HANDLE_ERROR)) { + virEventRemoveHandle(watch); + + *watchPtr = -1; + VIR_FORCE_CLOSE(*fdptr);
[1] ah, you already did it here where I complained the last time.
+ if (bufptr) + *bufptr = NULL; + if (fdptrptr) + *fdptrptr = NULL; + } +} + + +static int +virCommandRegisterEventLoop(virCommandPtr cmd) +{ + int ret = -1; + + if (cmd->inbuf && + (cmd->inWatch = virEventAddHandle(cmd->infd, + VIR_EVENT_HANDLE_WRITABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register infd %d in the event loop"), + cmd->infd); + goto cleanup; + } + + if (cmd->outbuf && cmd->outfdptr == &cmd->outfd && + (cmd->outWatch = virEventAddHandle(cmd->outfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register outfd %d in the event loop"), + cmd->outfd); + + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + goto cleanup; + } + + if (cmd->errbuf && cmd->errfdptr == &cmd->errfd && + (cmd->errWatch = virEventAddHandle(cmd->errfd, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_HANGUP | + VIR_EVENT_HANDLE_ERROR, + virCommandHandleReadWrite, + cmd, NULL)) < 0) { + virReportError(VIR_ERR_INTERNAL_ERROR, + _("Unable to register errfd %d in the event loop"), + cmd->errfd); + if (cmd->inWatch != -1) { + virEventRemoveHandle(cmd->inWatch); + cmd->inWatch = -1; + } + if (cmd->outWatch != -1) { + virEventRemoveHandle(cmd->outWatch); + cmd->outWatch = -1; + }
Hm, okay.
+ goto cleanup; + } + + ret = 0; + +cleanup: + return ret; +} + + /** * virCommandRunAsync: * @cmd: command to start @@ -2149,6 +2333,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) char *str; int i; bool synchronous = false; + int infd[2] = {-1, -1};
if (!cmd || cmd->has_error == ENOMEM) { virReportOOMError();
ACK with the two nits fixed. ACKs on the unchanged patches from the last time still stand. Peter
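To illustrate the first nit (VIR_CLOSE plus a hand-written debug message versus VIR_FORCE_CLOSE), here is a rough stand-alone sketch of a "close and forget" helper. This is an assumption about the intent of such a macro, written in plain POSIX C; force_close() is a hypothetical name and this is not libvirt's implementation.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical stand-in for the "close and forget" idiom: close *@fdptr if
 * it is open, log (rather than propagate) a failure, reset it to -1 and
 * leave errno as the caller had it. */
static void
force_close(int *fdptr)
{
    int saved_errno = errno;

    if (*fdptr >= 0) {
        if (close(*fdptr) < 0)
            fprintf(stderr, "ignoring failed close on fd %d (errno %d)\n",
                    *fdptr, errno);
        *fdptr = -1;            /* never leave a stale descriptor behind */
    }

    errno = saved_errno;        /* the caller's error state is untouched */
}
```

With a helper like this the call site shrinks to a single line, and calling it twice on the same variable is harmless because the descriptor is reset to -1 after the first call.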

On 04.02.2013 16:47, Peter Krempa wrote:
On 02/04/13 15:56, Michal Privoznik wrote:
Currently, if we want to feed stdin, or catch stdout or stderr of a virCommand we have to use virCommandRun(). When using virCommandRunAsync() we have to register FD handles by hand. This may lead to code duplication. Hence, introduce an internal API, which does this automatically within virCommandRunAsync(). The intended usage looks like this:
virCommandPtr cmd = virCommandNew*(...); char *buf = NULL;
...
virCommandSetOutputBuffer(cmd, &buf); virCommandDoAsyncIO(cmd);
if (virCommandRunAsync(cmd, NULL) < 0) goto cleanup;
...
if (virCommandWait(cmd, NULL) < 0) goto cleanup;
/* @buf now contains @cmd's stdout */ VIR_DEBUG("STDOUT: %s", NULLSTR(buf));
...
cleanup: VIR_FREE(buf); virCommandFree(cmd);
Note, that both stdout and stderr buffers may change until virCommandWait() returns. --- src/libvirt_private.syms | 1 + src/util/vircommand.c | 308 ++++++++++++++++++++++++++++++++++++++++++++--- src/util/vircommand.h | 1 + 3 files changed, 293 insertions(+), 17 deletions(-)
ACK with the two nits fixed. ACKs on the unchanged patches from the last time still stand.
Peter
Thanks, fixed and pushed. Michal

This just prepares the environment for the next patch, which is going to need an event loop.
---
 tests/commanddata/test3.log |  2 +-
 tests/commandtest.c         | 87 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 1 deletion(-)

diff --git a/tests/commanddata/test3.log b/tests/commanddata/test3.log
index 6bd7974..bd06371 100644
--- a/tests/commanddata/test3.log
+++ b/tests/commanddata/test3.log
@@ -8,7 +8,7 @@ ENV:USER=test
 FD:0
 FD:1
 FD:2
-FD:3
 FD:5
+FD:8
 DAEMON:no
 CWD:/tmp
diff --git a/tests/commandtest.c b/tests/commandtest.c
index b5c5882..00d2eac 100644
--- a/tests/commandtest.c
+++ b/tests/commandtest.c
@@ -37,9 +37,19 @@
 #include "virfile.h"
 #include "virpidfile.h"
 #include "virerror.h"
+#include "virthread.h"

 #define VIR_FROM_THIS VIR_FROM_NONE

+typedef struct _virCommandTestData virCommandTestData;
+typedef virCommandTestData *virCommandTestDataPtr;
+struct _virCommandTestData {
+    virMutex lock;
+    virThread thread;
+    bool quit;
+    bool running;
+};
+
 #ifdef WIN32

 int
@@ -841,11 +851,46 @@ static const char *const newenv[] = {
     NULL
 };

+static void virCommandThreadWorker(void *opaque)
+{
+    virCommandTestDataPtr test = opaque;
+
+    virMutexLock(&test->lock);
+
+    while (!test->quit) {
+        virMutexUnlock(&test->lock);
+
+        if (virEventRunDefaultImpl() < 0) {
+            test->quit = true;
+            break;
+        }
+
+        virMutexLock(&test->lock);
+    }
+
+    test->running = false;
+
+    virMutexUnlock(&test->lock);
+    return;
+}
+
+static void
+virCommandTestFreeTimer(int timer ATTRIBUTE_UNUSED,
+                        void *opaque ATTRIBUTE_UNUSED)
+{
+    /* nothing to be done here */
+}
+
 static int
 mymain(void)
 {
     int ret = 0;
     int fd;
+    virCommandTestDataPtr test = NULL;
+    int timer = -1;
+
+    if (virThreadInitialize() < 0)
+        return EXIT_FAILURE;

     if (chdir("/tmp") < 0)
         return EXIT_FAILURE;
@@ -886,6 +931,30 @@ mymain(void)
     fd = 5;
     VIR_FORCE_CLOSE(fd);

+    virEventRegisterDefaultImpl();
+    if (VIR_ALLOC(test) < 0) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    if (virMutexInit(&test->lock) < 0) {
+        printf("Unable to init mutex: %d\n", errno);
+        goto cleanup;
+    }
+
+    virMutexLock(&test->lock);
+
+    if (virThreadCreate(&test->thread,
+                        true,
+                        virCommandThreadWorker,
+                        test) < 0) {
+        virMutexUnlock(&test->lock);
+        goto cleanup;
+    }
+
+    test->running = true;
+    virMutexUnlock(&test->lock);
+
     environ = (char **)newenv;

 # define DO_TEST(NAME) \
@@ -915,6 +984,24 @@ mymain(void)
     DO_TEST(test19);
     DO_TEST(test20);

+    virMutexLock(&test->lock);
+    if (test->running) {
+        test->quit = true;
+        /* HACK: Add a dummy timeout to break event loop */
+        timer = virEventAddTimeout(0, virCommandTestFreeTimer, NULL, NULL);
+    }
+    virMutexUnlock(&test->lock);
+
+cleanup:
+    if (test->running)
+        virThreadJoin(&test->thread);
+
+    if (timer != -1)
+        virEventRemoveTimeout(timer);
+
+    virMutexDestroy(&test->lock);
+    VIR_FREE(test);
+
     return ret==0 ? EXIT_SUCCESS : EXIT_FAILURE;
 }

--
1.8.0.2

This is just a basic test, so we don't break virCommand in the future. A "Hello world\n" string is written to commandhelper, which copies its input to stdout and stderr, where we read it from. The strings read are then compared with the expected values.
---
 tests/commandtest.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/tests/commandtest.c b/tests/commandtest.c
index 00d2eac..f4e335f 100644
--- a/tests/commandtest.c
+++ b/tests/commandtest.c
@@ -851,6 +851,54 @@ static const char *const newenv[] = {
     NULL
 };

+static int test21(const void *unused ATTRIBUTE_UNUSED)
+{
+    virCommandPtr cmd = virCommandNew(abs_builddir "/commandhelper");
+    int ret = -1;
+    const char *wrbuf = "Hello world\n";
+    char *outbuf = NULL, *errbuf = NULL;
+    const char *outbufExpected="BEGIN STDOUT\n"
+        "Hello world\n"
+        "END STDOUT\n";
+    const char *errbufExpected="BEGIN STDERR\n"
+        "Hello world\n"
+        "END STDERR\n";
+
+    virCommandSetInputBuffer(cmd, wrbuf);
+    virCommandSetOutputBuffer(cmd, &outbuf);
+    virCommandSetErrorBuffer(cmd, &errbuf);
+    virCommandDoAsyncIO(cmd);
+
+    if (virCommandRunAsync(cmd, NULL) < 0) {
+        virErrorPtr err = virGetLastError();
+        printf("Cannot run child %s\n", err->message);
+        goto cleanup;
+    }
+
+    if (virCommandWait(cmd, NULL) < 0)
+        goto cleanup;
+
+    if (virTestGetVerbose())
+        printf("STDOUT:%s\nSTDERR:%s\n", NULLSTR(outbuf), NULLSTR(errbuf));
+
+    if (STRNEQ(outbuf, outbufExpected)) {
+        virtTestDifference(stderr, outbufExpected, outbuf);
+        goto cleanup;
+    }
+
+    if (STRNEQ(errbuf, errbufExpected)) {
+        virtTestDifference(stderr, errbufExpected, errbuf);
+        goto cleanup;
+    }
+
+    ret = 0;
+cleanup:
+    VIR_FREE(outbuf);
+    VIR_FREE(errbuf);
+    virCommandFree(cmd);
+    return ret;
+}
+
 static void virCommandThreadWorker(void *opaque)
 {
     virCommandTestDataPtr test = opaque;
@@ -983,6 +1031,7 @@ mymain(void)
     DO_TEST(test18);
     DO_TEST(test19);
     DO_TEST(test20);
+    DO_TEST(test21);

     virMutexLock(&test->lock);
     if (test->running) {
--
1.8.0.2
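The round trip this test exercises can be sketched without libvirt at all. The following is a simplified illustration using only fork() and pipes, with the child standing in for the stdout half of commandhelper's behaviour. child_echo() and run_echo_child() are hypothetical names, stderr is left out, and the parent writes synchronously rather than through an event loop, so it assumes the input fits into the pipe buffer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Child side: copy stdin to stdout between two marker lines. */
static void
child_echo(void)
{
    static const char begin[] = "BEGIN STDOUT\n";
    static const char end[] = "END STDOUT\n";
    char chunk[256];
    ssize_t n;

    if (write(STDOUT_FILENO, begin, sizeof(begin) - 1) < 0)
        _exit(1);
    while ((n = read(STDIN_FILENO, chunk, sizeof(chunk))) > 0) {
        if (write(STDOUT_FILENO, chunk, (size_t)n) != n)
            _exit(1);
    }
    if (n < 0 || write(STDOUT_FILENO, end, sizeof(end) - 1) < 0)
        _exit(1);
    _exit(0);
}

/* Feed @input to the child and return its stdout as a malloc'ed string,
 * or NULL on failure. Assumes @input is small enough to be written in
 * one go before the parent starts reading. */
static char *
run_echo_child(const char *input)
{
    int in[2], out[2], status, ok = 1;
    char chunk[256], *buf = NULL;
    size_t len = 0;
    ssize_t n;
    pid_t pid;

    if (pipe(in) < 0)
        return NULL;
    if (pipe(out) < 0) {
        close(in[0]);
        close(in[1]);
        return NULL;
    }

    pid = fork();
    if (pid < 0) {
        close(in[0]); close(in[1]);
        close(out[0]); close(out[1]);
        return NULL;
    }
    if (pid == 0) {
        /* Child: wire the pipes to stdin/stdout, drop the originals. */
        dup2(in[0], STDIN_FILENO);
        dup2(out[1], STDOUT_FILENO);
        close(in[0]); close(in[1]);
        close(out[0]); close(out[1]);
        child_echo();
    }

    /* Parent: keep only the write end of "in" and the read end of "out". */
    close(in[0]);
    close(out[1]);
    if (write(in[1], input, strlen(input)) != (ssize_t)strlen(input))
        ok = 0;
    close(in[1]);               /* signals EOF to the child */

    while ((n = read(out[0], chunk, sizeof(chunk))) > 0) {
        char *tmp = realloc(buf, len + (size_t)n + 1);
        if (!tmp) {
            ok = 0;
            break;
        }
        buf = tmp;
        memcpy(buf + len, chunk, (size_t)n);
        len += (size_t)n;
        buf[len] = '\0';
    }
    close(out[0]);

    if (waitpid(pid, &status, 0) < 0 ||
        !WIFEXITED(status) || WEXITSTATUS(status) != 0)
        ok = 0;
    if (!ok) {
        free(buf);
        return NULL;
    }
    return buf;
}
```

The point of virCommandDoAsyncIO() in the series is that the caller does not have to write the parent half of this by hand: the buffers are filled from the event loop while the command runs.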

Commit 34e8f63a32f83 introduced support for catching errors from libvirt iohelper. However, at those times there wasn't such fancy API as virCommandDoAsyncIO(), so everything has to be implemented on our own. But since we do have the API now, we can use it and drop our implementation then. --- src/util/virfile.c | 82 +++--------------------------------------------------- 1 file changed, 4 insertions(+), 78 deletions(-) diff --git a/src/util/virfile.c b/src/util/virfile.c index 5cca54d..b4765fb 100644 --- a/src/util/virfile.c +++ b/src/util/virfile.c @@ -135,58 +135,11 @@ virFileDirectFdFlag(void) * read-write is not supported, just a single direction. */ struct _virFileWrapperFd { virCommandPtr cmd; /* Child iohelper process to do the I/O. */ - int err_fd; /* FD to read stderr of @cmd */ char *err_msg; /* stderr of @cmd */ - size_t err_msg_len; /* strlen of err_msg so we don't - have to compute it every time */ - int err_watch; /* ID of watch in the event loop */ }; #ifndef WIN32 /** - * virFileWrapperFdReadStdErr: - * @watch: watch ID - * @fd: the read end of pipe to iohelper's stderr - * @events: an OR-ed set of events which occurred on @fd - * @opaque: virFileWrapperFdPtr - * - * This is a callback to our eventloop which will read iohelper's - * stderr, reallocate @opaque->err_msg and copy data. 
- */ -static void -virFileWrapperFdReadStdErr(int watch ATTRIBUTE_UNUSED, - int fd, int events, void *opaque) -{ - virFileWrapperFdPtr wfd = (virFileWrapperFdPtr) opaque; - char ebuf[1024]; - ssize_t nread; - - if (events & VIR_EVENT_HANDLE_READABLE) { - while ((nread = saferead(fd, ebuf, sizeof(ebuf)))) { - if (nread < 0) { - if (errno != EAGAIN) - virReportSystemError(errno, "%s", - _("unable to read iohelper's stderr")); - break; - } - - if (VIR_REALLOC_N(wfd->err_msg, wfd->err_msg_len + nread + 1) < 0) { - virReportOOMError(); - return; - } - memcpy(wfd->err_msg + wfd->err_msg_len, ebuf, nread); - wfd->err_msg_len += nread; - wfd->err_msg[wfd->err_msg_len] = '\0'; - } - } - - if (events & VIR_EVENT_HANDLE_HANGUP) { - virEventRemoveHandle(watch); - wfd->err_watch = -1; - } -} - -/** * virFileWrapperFdNew: * @fd: pointer to fd to wrap * @name: name of fd, for diagnostics @@ -245,8 +198,6 @@ virFileWrapperFdNew(int *fd, const char *name, unsigned int flags) return NULL; } - ret->err_watch = -1; - mode = fcntl(*fd, F_GETFL); if (mode < 0) { @@ -279,38 +230,16 @@ virFileWrapperFdNew(int *fd, const char *name, unsigned int flags) virCommandAddArg(ret->cmd, "0"); } - /* In order to catch iohelper stderr, we must: - * - pass a FD to virCommand (-1 to auto-allocate one) - * - change iohelper's env so virLog functions print to stderr + /* In order to catch iohelper stderr, we must change + * iohelper's env so virLog functions print to stderr */ - ret->err_fd = -1; - virCommandSetErrorFD(ret->cmd, &ret->err_fd); virCommandAddEnvPair(ret->cmd, "LIBVIRT_LOG_OUTPUTS", "1:stderr"); + virCommandSetErrorBuffer(ret->cmd, &ret->err_msg); + virCommandDoAsyncIO(ret->cmd); if (virCommandRunAsync(ret->cmd, NULL) < 0) goto error; - /* deliberately don't use virCommandNonblockingFDs here as it is all or - * nothing. And we want iohelper's stdin and stdout to block (default). 
- * However, stderr is read within event loop and therefore it must be - * nonblocking.*/ - if (virSetNonBlock(ret->err_fd) < 0) { - virReportSystemError(errno, "%s", - _("Failed to set non-blocking " - "file descriptor flag")); - goto error; - } - - if ((ret->err_watch = virEventAddHandle(ret->err_fd, - VIR_EVENT_HANDLE_READABLE, - virFileWrapperFdReadStdErr, - ret, NULL)) < 0) { - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", - _("unable to register iohelper's " - "stderr FD in the eventloop")); - goto error; - } - if (VIR_CLOSE(pipefd[!output]) < 0) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("unable to close pipe")); goto error; @@ -389,9 +318,6 @@ virFileWrapperFdFree(virFileWrapperFdPtr wfd) if (!wfd) return; - VIR_FORCE_CLOSE(wfd->err_fd); - if (wfd->err_watch != -1) - virEventRemoveHandle(wfd->err_watch); VIR_FREE(wfd->err_msg); virCommandFree(wfd->cmd); -- 1.8.0.2

If a decompression binary prints something to stderr, currently
it is discarded. However, it can contain useful data from
debugging POV, so we should catch it.
---
 src/qemu/qemu_driver.c | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 812bf95..b53d412 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -4879,6 +4879,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn,
     virDomainEventPtr event;
     int intermediatefd = -1;
     virCommandPtr cmd = NULL;
+    char *errbuf = NULL;

     if (header->version == 2) {
         const char *prog = qemuSaveCompressionTypeToString(header->compressed);
@@ -4896,6 +4897,8 @@ qemuDomainSaveImageStartVM(virConnectPtr conn,

             virCommandSetInputFD(cmd, intermediatefd);
             virCommandSetOutputFD(cmd, fd);
+            virCommandSetErrorBuffer(cmd, &errbuf);
+            virCommandDoAsyncIO(cmd);

             if (virCommandRunAsync(cmd, NULL) < 0) {
                 virReportError(VIR_ERR_INTERNAL_ERROR,
@@ -4926,6 +4929,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn,
             qemuProcessStop(driver, vm, VIR_DOMAIN_SHUTOFF_FAILED, 0);
             ret = -1;
         }
+        VIR_DEBUG("Decompression binary stderr: %s", NULLSTR(errbuf));
     }
     VIR_FORCE_CLOSE(intermediatefd);

@@ -4975,6 +4979,7 @@ qemuDomainSaveImageStartVM(virConnectPtr conn,

 out:
     virCommandFree(cmd);
+    VIR_FREE(errbuf);
     if (virSecurityManagerRestoreSavedStateLabel(driver->securityManager,
                                                  vm->def, path) < 0)
         VIR_WARN("failed to restore save state label on %s", path);
-- 
1.8.0.2

On 02/04/13 15:56, Michal Privoznik wrote:
If a decompression binary prints something to stderr, currently it is discarded. However, it can contain useful data from debugging POV, so we should catch it. --- src/qemu/qemu_driver.c | 5 +++++ 1 file changed, 5 insertions(+)
ACK. Peter

If a compression binary prints something to stderr, currently
it is discarded. However, it can contain useful data from
debugging POV, so we should catch it.
---
 src/qemu/qemu_migration.c | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index d03e361..2c13d4c 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -3598,6 +3598,7 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm,
     virCommandPtr cmd = NULL;
     int pipeFD[2] = { -1, -1 };
     unsigned long saveMigBandwidth = priv->migMaxBandwidth;
+    char *errbuf = NULL;

     /* Increase migration bandwidth to unlimited since target is a file.
      * Failure to change migration speed is not fatal. */
@@ -3680,6 +3681,8 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm,
         cmd = virCommandNewArgs(args);
         virCommandSetInputFD(cmd, pipeFD[0]);
         virCommandSetOutputFD(cmd, &fd);
+        virCommandSetErrorBuffer(cmd, &errbuf);
+        virCommandDoAsyncIO(cmd);
         if (virSetCloseExec(pipeFD[1]) < 0) {
             virReportSystemError(errno, "%s",
                                  _("Unable to set cloexec flag"));
@@ -3727,7 +3730,11 @@ cleanup:
     VIR_FORCE_CLOSE(pipeFD[0]);
     VIR_FORCE_CLOSE(pipeFD[1]);

-    virCommandFree(cmd);
+    if (cmd) {
+        VIR_DEBUG("Compression binary stderr: %s", NULLSTR(errbuf));
+        VIR_FREE(errbuf);
+        virCommandFree(cmd);
+    }
     if (restoreLabel && (!bypassSecurityDriver) &&
         virSecurityManagerRestoreSavedStateLabel(driver->securityManager,
                                                  vm->def, path) < 0)
-- 
1.8.0.2
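Both qemu patches rely on virCommandSetErrorBuffer() plus virCommandDoAsyncIO() to collect a helper's stderr into a string. As a rough picture of the plumbing this hides from the caller, here is a standalone POSIX sketch, not libvirt's implementation: it drains a child's stderr through a non-blocking pipe using poll(), and it does so synchronously in the calling thread, whereas libvirt registers the FD with its event loop. run_capture_stderr() is a hypothetical name.

```c
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: run argv[0] with its stderr redirected to a pipe,
 * store everything it wrote there in *errbuf (heap string, or NULL if it
 * wrote nothing) and return its exit status, or -1 on failure. */
static int
run_capture_stderr(char *const argv[], char **errbuf)
{
    int pipefd[2];
    int status;
    char *buf = NULL;
    size_t len = 0;
    pid_t pid;

    *errbuf = NULL;
    if (pipe(pipefd) < 0)
        return -1;

    pid = fork();
    if (pid < 0) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    if (pid == 0) {
        /* Child: make the pipe's write end our stderr, then exec. */
        close(pipefd[0]);
        if (pipefd[1] != STDERR_FILENO) {
            dup2(pipefd[1], STDERR_FILENO);
            close(pipefd[1]);
        }
        execvp(argv[0], argv);
        _exit(127);
    }

    /* Parent: the read end is non-blocking, as an event loop needs. */
    close(pipefd[1]);
    fcntl(pipefd[0], F_SETFL, fcntl(pipefd[0], F_GETFL) | O_NONBLOCK);

    for (;;) {
        struct pollfd pfd = { .fd = pipefd[0], .events = POLLIN };
        char chunk[1024];
        char *tmp;
        ssize_t n;

        if (poll(&pfd, 1, -1) < 0) {
            if (errno == EINTR)
                continue;
            break;
        }

        n = read(pipefd[0], chunk, sizeof(chunk));
        if (n < 0) {
            if (errno == EAGAIN || errno == EINTR)
                continue;
            break;
        }
        if (n == 0)
            break;              /* EOF: child closed its stderr */

        tmp = realloc(buf, len + n + 1);
        if (!tmp)
            break;
        buf = tmp;
        memcpy(buf + len, chunk, n);
        len += n;
        buf[len] = '\0';
    }
    close(pipefd[0]);

    while (waitpid(pid, &status, 0) < 0) {
        if (errno != EINTR) {
            free(buf);
            return -1;
        }
    }

    *errbuf = buf;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

As in the patches, the caller owns the buffer and has to free it, and the contents are only final once the child has been reaped.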

On Mon, Feb 04, 2013 at 03:56:37PM +0100, Michal Privoznik wrote:
Currently, if we want to use IO with asynchronous command we have to copy code from virFileWrapperFd to misuse our event loop for reading and writing to the command. However, we can extend our virCommand implementation to automatically set things up.
There is something racy in this series of changes. About 50% of the time when running a domain save, I see warnings about closing invalid file descriptors & invalid watches from libvirtd

2013-02-06 11:41:43.766+0000: 19078: warning : virEventPollRemoveHandle:183 : Ignoring invalid remove watch -1
2013-02-06 11:41:43.766+0000: 19078: warning : virFileClose:65 : Tried to close invalid fd 25

The first log message has the following trace

#3 0x00007ffff7499b91 in virEventPollRemoveHandle (watch=-1) at util/vireventpoll.c:183
#4 0x00007ffff748b3c0 in virCommandHandleReadWrite (watch=-1, fd=-1, events=events@entry=9, opaque=opaque@entry=0x7fffcc01e380) at util/vircommand.c:2236
#5 0x00007ffff748ef8b in virCommandWait (cmd=0x7fffcc01e380, exitstatus=exitstatus@entry=0x0) at util/vircommand.c:2501
#6 0x00007ffff749b367 in virFileWrapperFdClose (wfd=wfd@entry=0x7fffcc01e360) at util/virfile.c:288
#7 0x00007fffdf180eb7 in qemuDomainSaveMemory (driver=driver@entry=0x7fffd806dfb0, vm=vm@entry=0x7fffbc001510, path=path@entry=0x7fffcc01e4b0 "/home/berrange/src/virt/libvirt-tck/tck.img", domXML=<optimized out>, compressed=compressed@entry=0, was_running=was_running@entry=true, flags=flags@entry=0, asyncJob=asyncJob@entry=QEMU_ASYNC_JOB_SAVE) at qemu/qemu_driver.c:2792
#8 0x00007fffdf1842b3 in qemuDomainSaveInternal (driver=driver@entry=0x7fffd806dfb0, vm=0x7fffbc001510, path=path@entry=0x7fffcc01e4b0 "/home/berrange/src/virt/libvirt-tck/tck.img", compressed=0, xmlin=xmlin@entry=0x0, flags=flags@entry=0, dom=0x7fffcc00a1a0) at qemu/qemu_driver.c:2901
#9 0x00007fffdf1851f2 in qemuDomainSaveFlags (dom=0x7fffcc00a1a0, path=0x7fffcc01e4b0 "/home/berrange/src/virt/libvirt-tck/tck.img", dxml=0x0, flags=0) at qemu/qemu_driver.c:3013
#10 0x00007ffff75426e9 in virDomainSave (domain=domain@entry=0x7fffcc00a1a0, to=0x7fffcc01e2a0 "/home/berrange/src/virt/libvirt-tck/tck.img") at libvirt.c:2617
#11 0x0000000000428bc2 in remoteDispatchDomainSave (server=<optimized out>, msg=<optimized out>, args=0x7fffcc01e2e0, rerr=0x7fffe87a4c50, client=<optimized out>) at remote_dispatch.h:4758
#12 remoteDispatchDomainSaveHelper (server=<optimized out>, client=<optimized out>, msg=<optimized out>, rerr=0x7fffe87a4c50, args=0x7fffcc01e2e0, ret=<optimized out>) at remote_dispatch.h:4736
#13 0x00007ffff75af942 in virNetServerProgramDispatchCall (msg=0x693e10, client=0x694f60, server=0x66b490, prog=0x690bd0) at rpc/virnetserverprogram.c:432
#14 virNetServerProgramDispatch (prog=0x690bd0, server=server@entry=0x66b490, client=0x694f60, msg=0x693e10) at rpc/virnetserverprogram.c:305
#15 0x00007ffff75a9b78 in virNetServerProcessMsg (msg=<optimized out>, prog=<optimized out>, client=<optimized out>, srv=0x66b490) at rpc/virnetserver.c:162
#16 virNetServerHandleJob (jobOpaque=<optimized out>, opaque=0x66b490) at rpc/virnetserver.c:183
#17 0x00007ffff74c304e in virThreadPoolWorker (opaque=opaque@entry=0x65ac90) at util/virthreadpool.c:144
#18 0x00007ffff74c2706 in virThreadHelper (data=<optimized out>) at util/virthreadpthread.c:161
#19 0x00007ffff3e9dd15 in start_thread (arg=0x7fffe87a5700) at pthread_create.c:308
#20 0x00007ffff37c946d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:114

The second log message has this trace

#3 0x00007ffff749b178 in virFileClose (fdptr=fdptr@entry=0x7fffcc01e3f8, flags=flags@entry=VIR_FILE_CLOSE_PRESERVE_ERRNO) at util/virfile.c:65
#4 0x00007ffff748b3da in virCommandHandleReadWrite (watch=-1, fd=-1, events=events@entry=9, opaque=opaque@entry=0x7fffcc01e380) at util/vircommand.c:2239
#5 0x00007ffff748ef8b in virCommandWait (cmd=0x7fffcc01e380, exitstatus=exitstatus@entry=0x0) at util/vircommand.c:2501
#6 0x00007ffff749b367 in virFileWrapperFdClose (wfd=wfd@entry=0x7fffcc01e360) at util/virfile.c:288
#7 0x00007fffdf180eb7 in qemuDomainSaveMemory (driver=driver@entry=0x7fffd806dfb0, vm=vm@entry=0x7fffbc001510, path=path@entry=0x7fffcc01e4b0 "/home/berrange/src/virt/libvirt-tck/tck.img", domXML=<optimized out>, compressed=compressed@entry=0, was_running=was_running@entry=true, flags=flags@entry=0, asyncJob=asyncJob@entry=QEMU_ASYNC_JOB_SAVE) at qemu/qemu_driver.c:2792
#8 0x00007fffdf1842b3 in qemuDomainSaveInternal (driver=driver@entry=0x7fffd806dfb0, vm=0x7fffbc001510, path=path@entry=0x7fffcc01e4b0 "/home/berrange/src/virt/libvirt-tck/tck.img", compressed=0, xmlin=xmlin@entry=0x0, flags=flags@entry=0, dom=0x7fffcc00a1a0) at qemu/qemu_driver.c:2901
#9 0x00007fffdf1851f2 in qemuDomainSaveFlags (dom=0x7fffcc00a1a0, path=0x7fffcc01e4b0 "/home/berrange/src/virt/libvirt-tck/tck.img", dxml=0x0, flags=0) at qemu/qemu_driver.c:3013
#10 0x00007ffff75426e9 in virDomainSave (domain=domain@entry=0x7fffcc00a1a0, to=0x7fffcc01e2a0 "/home/berrange/src/virt/libvirt-tck/tck.img") at libvirt.c:2617
#11 0x0000000000428bc2 in remoteDispatchDomainSave (server=<optimized out>, msg=<optimized out>, args=0x7fffcc01e2e0, rerr=0x7fffe87a4c50, client=<optimized out>) at remote_dispatch.h:4758
#12 remoteDispatchDomainSaveHelper (server=<optimized out>, client=<optimized out>, msg=<optimized out>, rerr=0x7fffe87a4c50, args=0x7fffcc01e2e0, ret=<optimized out>) at remote_dispatch.h:4736
#13 0x00007ffff75af942 in virNetServerProgramDispatchCall (msg=0x693e10, client=0x694f60, server=0x66b490, prog=0x690bd0) at rpc/virnetserverprogram.c:432
#14 virNetServerProgramDispatch (prog=0x690bd0, server=server@entry=0x66b490, client=0x694f60, msg=0x693e10) at rpc/virnetserverprogram.c:305
#15 0x00007ffff75a9b78 in virNetServerProcessMsg (msg=<optimized out>, prog=<optimized out>, client=<optimized out>, srv=0x66b490) at rpc/virnetserver.c:162
#16 virNetServerHandleJob (jobOpaque=<optimized out>, opaque=0x66b490) at rpc/virnetserver.c:183
#17 0x00007ffff74c304e in virThreadPoolWorker (opaque=opaque@entry=0x65ac90) at util/virthreadpool.c:144
#18 0x00007ffff74c2706 in virThreadHelper (data=<optimized out>) at util/virthreadpthread.c:161
#19 0x00007ffff3e9dd15 in start_thread (arg=0x7fffe87a5700) at pthread_create.c:308
#20 0x00007ffff37c946d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:114

Daniel
-- 
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

On 06.02.2013 12:51, Daniel P. Berrange wrote:
On Mon, Feb 04, 2013 at 03:56:37PM +0100, Michal Privoznik wrote:
Currently, if we want to use IO with asynchronous command we have to copy code from virFileWrapperFd to misuse our event loop for reading and writing to the command. However, we can extend our virCommand implementation to automatically set things up.
There is something racy in this series of changes. About 50% of the time when running a domain save, I see warnings about closing invalid file descriptors & invalid watches from libvirtd
2013-02-06 11:41:43.766+0000: 19078: warning : virEventPollRemoveHandle:183 : Ignoring invalid remove watch -1
2013-02-06 11:41:43.766+0000: 19078: warning : virFileClose:65 : Tried to close invalid fd 25
Yeah, I've noticed some errors as well, though at a lower rate. I believe this is the fix:

https://www.redhat.com/archives/libvir-list/2013-February/msg00352.html

Michal
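Both warnings in the report come from teardown running on a watch or FD that had already been released. The sketch below shows one general way to make such teardown safe to repeat: check against the -1 sentinel before releasing and reset to -1 afterwards. This is an assumption about the class of bug, not the fix from the linked message; struct io_channel, remove_watch() and io_channel_teardown() are hypothetical names, and remove_watch() merely stands in for an event loop call such as virEventRemoveHandle(). The guard is not thread safe on its own; if the event loop thread and a worker thread can both reach it, a lock is still required.

```c
#include <unistd.h>

/* Hypothetical state for one async IO channel; -1 means "not set". */
struct io_channel {
    int watch;  /* event loop watch id */
    int fd;     /* file descriptor behind the watch */
};

/* Counts removals so the effect of the guard can be observed. */
static int removed_watches;

/* Stand-in for deregistering a watch from an event loop. */
static void
remove_watch(int watch)
{
    (void)watch;
    removed_watches++;
}

/* Release the watch and the FD at most once, however often it is called. */
static void
io_channel_teardown(struct io_channel *ch)
{
    if (ch->watch != -1) {
        remove_watch(ch->watch);
        ch->watch = -1;
    }
    if (ch->fd != -1) {
        close(ch->fd);
        ch->fd = -1;
    }
}
```

With this pattern a second call finds both fields at -1 and does nothing, so neither an invalid watch nor a stale FD is handed to the lower layers.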
participants (3): Daniel P. Berrange, Michal Privoznik, Peter Krempa