By running the doTunnelSendAll code in a separate thread, the
main thread can do qemuMigrationWaitForCompletion as with
normal migration. This in turn ensures that job signals work
correctly and that progress monitoring can be done
* src/qemu/qemu_migration.c: Runn tunnelled migration in
separate thread
---
src/qemu/qemu_migration.c | 95 ++++++++++++++++++++++++++++++++++++++-------
1 files changed, 81 insertions(+), 14 deletions(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index d12ffda..bc8a305 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -1287,44 +1287,101 @@ cleanup:
#define TUNNEL_SEND_BUF_SIZE 65536
-static int doTunnelSendAll(virStreamPtr st,
- int sock)
+typedef struct _qemuMigrationIOThread qemuMigrationIOThread;
+typedef qemuMigrationIOThread * qemuMigrationIOThreadPtr;
+struct _qemuMigrationIOThread {
+ virThread thread;
+ virStreamPtr st;
+ int sock;
+ virError err;
+};
+
+static void qemuMigrationIOFunc(void *arg)
{
+ qemuMigrationIOThreadPtr data = arg;
char *buffer;
int nbytes = TUNNEL_SEND_BUF_SIZE;
if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) {
virReportOOMError();
- virStreamAbort(st);
- return -1;
+ virStreamAbort(data->st);
+ goto error;
}
for (;;) {
- nbytes = saferead(sock, buffer, TUNNEL_SEND_BUF_SIZE);
+ nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
if (nbytes < 0) {
virReportSystemError(errno, "%s",
_("tunnelled migration failed to read from
qemu"));
- virStreamAbort(st);
+ virStreamAbort(data->st);
VIR_FREE(buffer);
- return -1;
+ goto error;
}
else if (nbytes == 0)
/* EOF; get out of here */
break;
- if (virStreamSend(st, buffer, nbytes) < 0) {
+ if (virStreamSend(data->st, buffer, nbytes) < 0) {
VIR_FREE(buffer);
- return -1;
+ goto error;
}
}
VIR_FREE(buffer);
- if (virStreamFinish(st) < 0)
- /* virStreamFinish set the error for us */
- return -1;
+ if (virStreamFinish(data->st) < 0)
+ goto error;
- return 0;
+ return;
+
+error:
+ virCopyLastError(&data->err);
+ virResetLastError();
+}
+
+
+static qemuMigrationIOThreadPtr
+qemuMigrationStartTunnel(virStreamPtr st,
+ int sock)
+{
+ qemuMigrationIOThreadPtr io;
+
+ if (VIR_ALLOC(io) < 0) {
+ virReportOOMError();
+ return NULL;
+ }
+
+ io->st = st;
+ io->sock = sock;
+
+ if (virThreadCreate(&io->thread, true,
+ qemuMigrationIOFunc,
+ io) < 0) {
+ VIR_FREE(io);
+ return NULL;
+ }
+
+ return io;
+}
+
+static int
+qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io)
+{
+ int rv = -1;
+ virThreadJoin(&io->thread);
+
+ /* Forward error from the IO thread, to this thread */
+ if (io->err.code != VIR_ERR_OK) {
+ virSetError(&io->err);
+ virResetError(&io->err);
+ goto cleanup;
+ }
+
+ rv = 0;
+
+cleanup:
+ VIR_FREE(io);
+ return rv;
}
@@ -1349,6 +1406,7 @@ static int doTunnelMigrate(struct qemud_driver *driver,
unsigned int background_flags = QEMU_MONITOR_MIGRATE_BACKGROUND;
int ret = -1;
qemuMigrationCookiePtr mig = NULL;
+ qemuMigrationIOThreadPtr iothread = NULL;
if (!qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_UNIX) &&
!qemuCapsGet(priv->qemuCaps, QEMU_CAPS_MIGRATE_QEMU_EXEC)) {
@@ -1484,7 +1542,16 @@ static int doTunnelMigrate(struct qemud_driver *driver,
goto cancel;
}
- ret = doTunnelSendAll(st, client_sock);
+ if (!(iothread = qemuMigrationStartTunnel(st, client_sock)))
+ goto cancel;
+
+ ret = qemuMigrationWaitForCompletion(driver, vm);
+
+ /* Close now to ensure the IO thread quits & is joinable in next method */
+ VIR_FORCE_CLOSE(client_sock);
+
+ if (qemuMigrationStopTunnel(iothread) < 0)
+ ret = -1;
if (ret == 0 &&
qemuMigrationBakeCookie(mig, driver, vm, cookieout, cookieoutlen, 0) < 0)
--
1.7.4.4