[libvirt] [PATCH 00/21] Support NBD for tunnelled migration

The provided patchset implements NBD disk migration over a tunnelled
connection provided by libvirt. The migration source instructs QEMU to
NBD-mirror drives into a provided UNIX socket. These connections and all
the data are then tunnelled to the destination using a newly introduced
RPC call. The migration destination implements a driver method that
connects the tunnelled stream to QEMU's NBD destination.

The detailed scheme is the following:

PREPARE

1. The migration destination starts QEMU's NBD server listening on a
   UNIX socket and tells it to accept the listed disks via the
   `nbd-server-add` monitor command. This is code added to
   qemuMigrationStartNBDServer that calls the introduced
   qemuMonitorNBDServerStartUnix monitor function.

PERFORM

2. The migration source creates a UNIX socket that is later used as the
   NBD destination in the `drive-mirror` monitor command. This is
   implemented as a call to virNetSocketNewListenUNIX from
   doTunnelMigrate.

3. The source starts an IOThread that polls on the UNIX socket,
   accepting every incoming QEMU connection. This is done by adding a
   new pollfd to the poll(2) call in qemuMigrationIOFunc, which calls
   the introduced qemuNBDTunnelAcceptAndPipe function.

4. qemuNBDTunnelAcceptAndPipe accepts the connection and creates two
   virStreams. The first, `local`, is associated with the just-accepted
   connection: it is converted to a virFDStreamDrv stream by calling
   virFDStreamOpen on the fd returned by accept(2). The second,
   `remote`, is tunnelled to the remote destination stream in a way
   similar to the PrepareTunnel3* functions: the
   virDomainMigrateOpenTunnel function is called on the destination
   connection object. virDomainMigrateOpenTunnel calls the remote
   driver's handler remoteDomainMigrateOpenTunnel, which makes the
   DOMAIN_MIGRATE_OPEN_TUNNEL call to the destination host.
   The code in remoteDomainMigrateOpenTunnel ties the passed virStream
   object to a virStream on the destination host via the
   remoteStreamDrv driver. The remote driver handles the stream's IO by
   tunnelling data through the RPC connection. Finally,
   qemuNBDTunnelAcceptAndPipe assigns both streams the same event
   callback, qemuMigrationPipeEvent, whose job is to track the statuses
   of the streams, doing IO whenever necessary.

5. The source starts the drive mirroring using the
   qemuMigrationDriveMirror function, which instructs QEMU to mirror
   drives to the UNIX socket the thread listens on. Since the mirroring
   must reach the 'synchronized' state (where writes go to both
   destinations simultaneously) before the VM migration continues, the
   thread serving the connections must be started earlier.

6. When a connection to the UNIX socket on the migration source is
   made, the DOMAIN_MIGRATE_OPEN_TUNNEL proc is called on the migration
   destination. Its handler calls virDomainMigrateOpenTunnel, which
   calls qemuMigrationOpenNBDTunnel by means of
   qemuDomainMigrateOpenTunnel. qemuMigrationOpenNBDTunnel connects the
   stream linked to the source's stream to the NBD UNIX socket on the
   migration destination side.

7. The rest of the disk migration occurs semimagically: the virStream*
   APIs tunnel data in both directions. This is done by the
   qemuMigrationPipeEvent event callback set for both streams.

The order of the patches is roughly the following:

 * First, the RPC machinery and the remote driver's
   virDrvDomainMigrateOpenTunnel implementation are added.
 * Then, the source side of the protocol is implemented: code listening
   on a UNIX socket is added, DriveMirror is enhanced to instruct QEMU
   to `drive-mirror` there, and the IOThread driving the tunnelling is
   started sooner.
 * After that, the destination side of the protocol is implemented:
   qemuMonitorNBDServerStartUnix is added and qemuMigrationStartNBDServer
   is enhanced to call it. qemuDomainMigrateOpenTunnel is implemented
   along with qemuMigrationOpenNBDTunnel, which does the real job.
 * Finally, the code blocking NBD migration for tunnelled migration is
   removed.

Pavel Boldin (21):
  rpc: add DOMAIN_MIGRATE_OPEN_TUNNEL proc
  driver: add virDrvDomainMigrateOpenTunnel
  remote_driver: introduce virRemoteClientNew
  remote_driver: add remoteDomainMigrateOpenTunnel
  domain: add virDomainMigrateOpenTunnel
  domain: add virDomainMigrateTunnelFlags
  remote: impl remoteDispatchDomainMigrateOpenTunnel
  qemu: migration: src: add nbd tunnel socket data
  qemu: migration: src: nbdtunnel unix socket
  qemu: migration: src: qemu `drive-mirror` to UNIX
  qemu: migration: src: qemuSock for running thread
  qemu: migration: src: add NBD unixSock to iothread
  qemu: migration: src: qemuNBDTunnelAcceptAndPipe
  qemu: migration: src: stream piping
  qemu: monitor: add qemuMonitorNBDServerStartUnix
  qemu: migration: dest: nbd-server to UNIX sock
  qemu: migration: dest: qemuMigrationOpenTunnel
  qemu: driver: add qemuDomainMigrateOpenTunnel
  qemu: migration: dest: qemuMigrationOpenNBDTunnel
  qemu: migration: allow NBD tunneling migration
  apparmor: fix tunnelmigrate permissions

 daemon/remote.c                  |  50 ++++
 docs/apibuild.py                 |   1 +
 docs/hvsupport.pl                |   1 +
 include/libvirt/libvirt-domain.h |   3 +
 src/driver-hypervisor.h          |   8 +
 src/libvirt-domain.c             |  43 ++++
 src/libvirt_internal.h           |   6 +
 src/libvirt_private.syms         |   1 +
 src/qemu/qemu_driver.c           |  24 ++
 src/qemu/qemu_migration.c        | 495 +++++++++++++++++++++++++++++++++------
 src/qemu/qemu_migration.h        |   6 +
 src/qemu/qemu_monitor.c          |  12 +
 src/qemu/qemu_monitor.h          |   2 +
 src/qemu/qemu_monitor_json.c     |  35 +++
 src/qemu/qemu_monitor_json.h     |   2 +
 src/remote/remote_driver.c       |  91 +++++--
 src/remote/remote_protocol.x     |  19 +-
 src/remote_protocol-structs      |   8 +
 src/security/virt-aa-helper.c    |   4 +-
 19 files changed, 719 insertions(+), 92 deletions(-)

-- 
1.9.1

Add REMOTE_PROC_DOMAIN_MIGRATE_OPEN_TUNNEL remote call, args and rets.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 daemon/remote.c              | 12 ++++++++++++
 src/remote/remote_protocol.x | 19 ++++++++++++++++++-
 src/remote_protocol-structs  |  8 ++++++++
 3 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/daemon/remote.c b/daemon/remote.c
index 3a3eb09..237124d 100644
--- a/daemon/remote.c
+++ b/daemon/remote.c
@@ -6661,6 +6661,18 @@ remoteDispatchDomainInterfaceAddresses(virNetServerPtr server ATTRIBUTE_UNUSED,
 }
 
 
+static int
+remoteDispatchDomainMigrateOpenTunnel(virNetServerPtr server ATTRIBUTE_UNUSED,
+                                      virNetServerClientPtr client ATTRIBUTE_UNUSED,
+                                      virNetMessagePtr msg ATTRIBUTE_UNUSED,
+                                      virNetMessageErrorPtr rerr ATTRIBUTE_UNUSED,
+                                      remote_domain_migrate_open_tunnel_args *args ATTRIBUTE_UNUSED,
+                                      remote_domain_migrate_open_tunnel_ret *ret ATTRIBUTE_UNUSED)
+{
+    return -1;
+}
+
+
 /*----- Helpers. -----*/
 
 /* get_nonnull_domain and get_nonnull_network turn an on-wire
diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x
index 80f4a8b..3d8702f 100644
--- a/src/remote/remote_protocol.x
+++ b/src/remote/remote_protocol.x
@@ -3240,6 +3240,15 @@ struct remote_domain_rename_ret {
     int retcode;
 };
 
+struct remote_domain_migrate_open_tunnel_args {
+    remote_uuid uuid;
+    unsigned int flags;
+};
+
+struct remote_domain_migrate_open_tunnel_ret {
+    int retcode;
+};
+
 /*----- Protocol. -----*/
 
 /* Define the program number, protocol version and procedure numbers here. */
@@ -5712,5 +5721,13 @@ enum remote_procedure {
      * @acl: domain:write
      * @acl: domain:save
      */
-    REMOTE_PROC_DOMAIN_RENAME = 358
+    REMOTE_PROC_DOMAIN_RENAME = 358,
+
+    /**
+     * @generate: none
+     * @acl: domain:migrate
+     * @acl: domain:start
+     * @acl: domain:write
+     */
+    REMOTE_PROC_DOMAIN_MIGRATE_OPEN_TUNNEL = 359
 };
diff --git a/src/remote_protocol-structs b/src/remote_protocol-structs
index ff99c00..b065576 100644
--- a/src/remote_protocol-structs
+++ b/src/remote_protocol-structs
@@ -2692,6 +2692,13 @@ struct remote_domain_rename_args {
 struct remote_domain_rename_ret {
     int retcode;
 };
+struct remote_domain_migrate_open_tunnel_args {
+    remote_uuid uuid;
+    u_int flags;
+};
+struct remote_domain_migrate_open_tunnel_ret {
+    int retcode;
+};
 enum remote_procedure {
         REMOTE_PROC_CONNECT_OPEN = 1,
         REMOTE_PROC_CONNECT_CLOSE = 2,
@@ -3051,4 +3058,5 @@ enum remote_procedure {
         REMOTE_PROC_DOMAIN_DEL_IOTHREAD = 356,
         REMOTE_PROC_DOMAIN_SET_USER_PASSWORD = 357,
         REMOTE_PROC_DOMAIN_RENAME = 358,
+        REMOTE_PROC_DOMAIN_MIGRATE_OPEN_TUNNEL = 359,
 };
-- 
1.9.1

Add virDrvDomainMigrateOpenTunnel call that is to be implemented by the
drivers in order to provide a way to open a tunnel during migration.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/driver-hypervisor.h | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/src/driver-hypervisor.h b/src/driver-hypervisor.h
index ae2ec4d..30a7446 100644
--- a/src/driver-hypervisor.h
+++ b/src/driver-hypervisor.h
@@ -1212,6 +1212,13 @@ typedef int
                                  const char *password,
                                  unsigned int flags);
 
+typedef int
+(*virDrvDomainMigrateOpenTunnel)(virConnectPtr dconn,
+                                 virStreamPtr st,
+                                 unsigned char uuid[VIR_UUID_BUFLEN],
+                                 unsigned int flags);
+
+
 typedef struct _virHypervisorDriver virHypervisorDriver;
 typedef virHypervisorDriver *virHypervisorDriverPtr;
 
@@ -1443,6 +1450,7 @@ struct _virHypervisorDriver {
     virDrvDomainGetFSInfo domainGetFSInfo;
     virDrvDomainInterfaceAddresses domainInterfaceAddresses;
     virDrvDomainSetUserPassword domainSetUserPassword;
+    virDrvDomainMigrateOpenTunnel domainMigrateOpenTunnel;
 };
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Add virDrvDomainMigrateOpenTunnel call that is to be implemented by the drivers in order to provide a way to open tunnel during migration.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/driver-hypervisor.h | 8 ++++++++ 1 file changed, 8 insertions(+)
diff --git a/src/driver-hypervisor.h b/src/driver-hypervisor.h
index ae2ec4d..30a7446 100644
--- a/src/driver-hypervisor.h
+++ b/src/driver-hypervisor.h
@@ -1212,6 +1212,13 @@ typedef int
                                  const char *password,
                                  unsigned int flags);
 
+typedef int
+(*virDrvDomainMigrateOpenTunnel)(virConnectPtr dconn,
+                                 virStreamPtr st,
+                                 unsigned char uuid[VIR_UUID_BUFLEN],
Should be "const unsigned char *uuid," — it seems everything uses VIR_UUID_BUFLEN anyway as one digs deeper.
+                                 unsigned int flags);
+
+
 typedef struct _virHypervisorDriver virHypervisorDriver;
 typedef virHypervisorDriver *virHypervisorDriverPtr;
 
@@ -1443,6 +1450,7 @@ struct _virHypervisorDriver {
     virDrvDomainGetFSInfo domainGetFSInfo;
     virDrvDomainInterfaceAddresses domainInterfaceAddresses;
     virDrvDomainSetUserPassword domainSetUserPassword;
+    virDrvDomainMigrateOpenTunnel domainMigrateOpenTunnel;
 };

Move common code to a function.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/remote/remote_driver.c | 48 +++++++++++++++++++++++++++-------------------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index a1dd640..b72cb86 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -6119,6 +6119,28 @@ remoteDomainMigratePrepare3(virConnectPtr dconn,
     goto done;
 }
 
+static virNetClientStreamPtr
+virRemoteClientOpen(virStreamPtr st,
+                    struct private_data *priv,
+                    enum remote_procedure proc)
+{
+    virNetClientStreamPtr netst;
+
+    if (!(netst = virNetClientStreamNew(priv->remoteProgram,
+                                        proc,
+                                        priv->counter)))
+        return NULL;
+
+    if (virNetClientAddStream(priv->client, netst) < 0) {
+        virObjectUnref(netst);
+        return NULL;
+    }
+
+    st->driver = &remoteStreamDrv;
+    st->privateData = netst;
+
+    return netst;
+}
 
 static int
 remoteDomainMigratePrepareTunnel3(virConnectPtr dconn,
@@ -6143,18 +6165,11 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn,
     memset(&args, 0, sizeof(args));
     memset(&ret, 0, sizeof(ret));
 
-    if (!(netst = virNetClientStreamNew(priv->remoteProgram,
-                                        REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3,
-                                        priv->counter)))
-        goto done;
+    netst = virRemoteClientOpen(st, priv,
+                                REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3);
 
-    if (virNetClientAddStream(priv->client, netst) < 0) {
-        virObjectUnref(netst);
+    if (netst == NULL)
         goto done;
-    }
-
-    st->driver = &remoteStreamDrv;
-    st->privateData = netst;
 
     args.cookie_in.cookie_in_val = (char *)cookiein;
     args.cookie_in.cookie_in_len = cookieinlen;
@@ -7193,18 +7208,11 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn,
         goto cleanup;
     }
 
-    if (!(netst = virNetClientStreamNew(priv->remoteProgram,
-                                        REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS,
-                                        priv->counter)))
-        goto cleanup;
+    netst = virRemoteClientOpen(st, priv,
+                                REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS);
 
-    if (virNetClientAddStream(priv->client, netst) < 0) {
-        virObjectUnref(netst);
+    if (netst == NULL)
         goto cleanup;
-    }
-
-    st->driver = &remoteStreamDrv;
-    st->privateData = netst;
 
     if (call(dconn, priv, 0, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS,
              (xdrproc_t) xdr_remote_domain_migrate_prepare_tunnel3_params_args,
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Move common code to a function.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/remote/remote_driver.c | 48 +++++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 20 deletions(-)
ACK - this could perhaps be applied now, since it's useful on its own anyway.

John

Add remoteDomainMigrateOpenTunnel that ties the passed stream to the
network stream and then makes the appropriate remote call.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/remote/remote_driver.c | 43 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index b72cb86..f6571a9 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -8087,6 +8087,48 @@ remoteDomainRename(virDomainPtr dom, const char *new_name, unsigned int flags)
 }
 
 
+static int
+remoteDomainMigrateOpenTunnel(virConnectPtr dconn,
+                              virStreamPtr st,
+                              unsigned char uuid[VIR_UUID_BUFLEN],
+                              unsigned int flags)
+{
+    struct private_data *priv = dconn->privateData;
+    int rv = -1;
+    remote_domain_migrate_open_tunnel_args args;
+    remote_domain_migrate_open_tunnel_ret ret;
+    virNetClientStreamPtr netst;
+
+    remoteDriverLock(priv);
+
+    memset(&args, 0, sizeof(args));
+    memset(&ret, 0, sizeof(ret));
+
+    netst = virRemoteClientOpen(st, priv,
+                                REMOTE_PROC_DOMAIN_MIGRATE_OPEN_TUNNEL);
+
+    if (netst == NULL)
+        goto done;
+
+    memcpy(args.uuid, uuid, VIR_UUID_BUFLEN);
+    args.flags = flags;
+
+    if (call(dconn, priv, 0, REMOTE_PROC_DOMAIN_MIGRATE_OPEN_TUNNEL,
+             (xdrproc_t) xdr_remote_domain_migrate_open_tunnel_args, (char *) &args,
+             (xdrproc_t) xdr_remote_domain_migrate_open_tunnel_ret, (char *) &ret) == -1) {
+        virNetClientRemoveStream(priv->client, netst);
+        virObjectUnref(netst);
+        goto done;
+    }
+
+    rv = ret.retcode;
+
+ done:
+    remoteDriverUnlock(priv);
+    return rv;
+}
+
+
 /* get_nonnull_domain and get_nonnull_network turn an on-wire
  * (name, uuid) pair into virDomainPtr or virNetworkPtr object.
  * These can return NULL if underlying memory allocations fail,
@@ -8437,6 +8479,7 @@ static virHypervisorDriver hypervisor_driver = {
     .domainInterfaceAddresses = remoteDomainInterfaceAddresses, /* 1.2.14 */
     .domainSetUserPassword = remoteDomainSetUserPassword, /* 1.2.16 */
     .domainRename = remoteDomainRename, /* 1.2.19 */
+    .domainMigrateOpenTunnel = remoteDomainMigrateOpenTunnel, /* 1.2.XX */
 };
 
 static virNetworkDriver network_driver = {
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Add remoteDomainMigrateOpenTunnel that ties passed stream to the network stream and then makes the appropriate remote call.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/remote/remote_driver.c | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+)
diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index b72cb86..f6571a9 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -8087,6 +8087,48 @@ remoteDomainRename(virDomainPtr dom, const char *new_name, unsigned int flags) }
+static int
+remoteDomainMigrateOpenTunnel(virConnectPtr dconn,
+                              virStreamPtr st,
+                              unsigned char uuid[VIR_UUID_BUFLEN],
Should be "const unsigned char *uuid,"
+                              unsigned int flags)
+{
+    struct private_data *priv = dconn->privateData;
+    int rv = -1;
+    remote_domain_migrate_open_tunnel_args args;
+    remote_domain_migrate_open_tunnel_ret ret;
+    virNetClientStreamPtr netst;
+
+    remoteDriverLock(priv);
+
+    memset(&args, 0, sizeof(args));
+    memset(&ret, 0, sizeof(ret));
+
+    netst = virRemoteClientOpen(st, priv,
+                                REMOTE_PROC_DOMAIN_MIGRATE_OPEN_TUNNEL);
+
+    if (netst == NULL)
+        goto done;
+
+    memcpy(args.uuid, uuid, VIR_UUID_BUFLEN);
+    args.flags = flags;
+
+    if (call(dconn, priv, 0, REMOTE_PROC_DOMAIN_MIGRATE_OPEN_TUNNEL,
+             (xdrproc_t) xdr_remote_domain_migrate_open_tunnel_args, (char *) &args,
+             (xdrproc_t) xdr_remote_domain_migrate_open_tunnel_ret, (char *) &ret) == -1) {
+        virNetClientRemoveStream(priv->client, netst);
+        virObjectUnref(netst);
+        goto done;
+    }
+
+    rv = ret.retcode;
+
+ done:
+    remoteDriverUnlock(priv);
+    return rv;
+}
+
+
 /* get_nonnull_domain and get_nonnull_network turn an on-wire
  * (name, uuid) pair into virDomainPtr or virNetworkPtr object.
  * These can return NULL if underlying memory allocations fail,
@@ -8437,6 +8479,7 @@ static virHypervisorDriver hypervisor_driver = {
     .domainInterfaceAddresses = remoteDomainInterfaceAddresses, /* 1.2.14 */
     .domainSetUserPassword = remoteDomainSetUserPassword, /* 1.2.16 */
     .domainRename = remoteDomainRename, /* 1.2.19 */
+    .domainMigrateOpenTunnel = remoteDomainMigrateOpenTunnel, /* 1.2.XX */
Will be at least 1.3.1.

John
};
static virNetworkDriver network_driver = {

Add auxiliary private function that calls the appropriate driver's
domainMigrateOpenTunnel function.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 docs/apibuild.py         |  1 +
 docs/hvsupport.pl        |  1 +
 src/libvirt-domain.c     | 43 +++++++++++++++++++++++++++++++++++++++++++
 src/libvirt_internal.h   |  6 ++++++
 src/libvirt_private.syms |  1 +
 5 files changed, 52 insertions(+)

diff --git a/docs/apibuild.py b/docs/apibuild.py
index f934fb2..6e60093 100755
--- a/docs/apibuild.py
+++ b/docs/apibuild.py
@@ -102,6 +102,7 @@ ignored_functions = {
   "virDomainMigratePrepare3Params": "private function for migration",
   "virDomainMigrateConfirm3Params": "private function for migration",
   "virDomainMigratePrepareTunnel3Params": "private function for tunnelled migration",
+  "virDomainMigrateOpenTunnel": "private function for tunnelled migration",
   "virErrorCopyNew": "private",
 }
 
diff --git a/docs/hvsupport.pl b/docs/hvsupport.pl
index 44a30ce..3b6ee65 100755
--- a/docs/hvsupport.pl
+++ b/docs/hvsupport.pl
@@ -200,6 +200,7 @@ $apis{virDomainMigratePerform3Params}->{vers} = "1.1.0";
 $apis{virDomainMigrateFinish3Params}->{vers} = "1.1.0";
 $apis{virDomainMigrateConfirm3Params}->{vers} = "1.1.0";
+$apis{virDomainMigrateOpenTunnel}->{vers} = "1.2.XX";
 
 # Now we want to get the mapping between public APIs
diff --git a/src/libvirt-domain.c b/src/libvirt-domain.c
index de7eb04..3037c01 100644
--- a/src/libvirt-domain.c
+++ b/src/libvirt-domain.c
@@ -11595,3 +11595,46 @@ virDomainInterfaceFree(virDomainInterfacePtr iface)
     VIR_FREE(iface);
 }
+
+
+/*
+ * Not for public use. This function is part of the internal
+ * implementation of migration in the remote case.
+ */
+int
+virDomainMigrateOpenTunnel(virConnectPtr conn,
+                           virStreamPtr st,
+                           unsigned char uuid[VIR_UUID_BUFLEN],
+                           unsigned int flags)
+{
+    char uuidstr[VIR_UUID_STRING_BUFLEN];
+
+    virUUIDFormat(uuid, uuidstr);
+    VIR_DEBUG("conn=%p, stream=%p, uuid=%s, flags=%x",
+              conn, st, uuidstr, flags);
+
+    virResetLastError();
+
+    virCheckConnectReturn(conn, -1);
+    virCheckReadOnlyGoto(conn->flags, error);
+
+    if (conn != st->conn) {
+        virReportInvalidArg(conn, "%s",
+                            _("conn must match stream connection"));
+        goto error;
+    }
+
+    if (conn->driver->domainMigrateOpenTunnel) {
+        int rv;
+        rv = conn->driver->domainMigrateOpenTunnel(conn, st, uuid, flags);
+        if (rv < 0)
+            goto error;
+        return rv;
+    }
+
+    virReportUnsupportedError();
+
+ error:
+    virDispatchError(conn);
+    return -1;
+}
diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h
index 1313b58..bbfba0b 100644
--- a/src/libvirt_internal.h
+++ b/src/libvirt_internal.h
@@ -289,4 +289,10 @@ virTypedParameterValidateSet(virConnectPtr conn,
                              virTypedParameterPtr params,
                              int nparams);
 
+int
+virDomainMigrateOpenTunnel(virConnectPtr conn,
+                           virStreamPtr st,
+                           unsigned char uuid[VIR_UUID_BUFLEN],
+                           unsigned int flags);
+
 #endif
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index a835f18..cf5725c 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -952,6 +952,7 @@ virDomainMigrateFinish;
 virDomainMigrateFinish2;
 virDomainMigrateFinish3;
 virDomainMigrateFinish3Params;
+virDomainMigrateOpenTunnel;
 virDomainMigratePerform;
 virDomainMigratePerform3;
 virDomainMigratePerform3Params;
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Add auxiliary private function that calls the appropriate driver's domainMigrateOpenTunnel function.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- docs/apibuild.py | 1 + docs/hvsupport.pl | 1 + src/libvirt-domain.c | 43 +++++++++++++++++++++++++++++++++++++++++++ src/libvirt_internal.h | 6 ++++++ src/libvirt_private.syms | 1 + 5 files changed, 52 insertions(+)
diff --git a/docs/apibuild.py b/docs/apibuild.py index f934fb2..6e60093 100755 --- a/docs/apibuild.py +++ b/docs/apibuild.py @@ -102,6 +102,7 @@ ignored_functions = { "virDomainMigratePrepare3Params": "private function for migration", "virDomainMigrateConfirm3Params": "private function for migration", "virDomainMigratePrepareTunnel3Params": "private function for tunnelled migration", + "virDomainMigrateOpenTunnel": "private function for tunnelled migration", "virErrorCopyNew": "private", }
diff --git a/docs/hvsupport.pl b/docs/hvsupport.pl index 44a30ce..3b6ee65 100755 --- a/docs/hvsupport.pl +++ b/docs/hvsupport.pl @@ -200,6 +200,7 @@ $apis{virDomainMigratePerform3Params}->{vers} = "1.1.0"; $apis{virDomainMigrateFinish3Params}->{vers} = "1.1.0"; $apis{virDomainMigrateConfirm3Params}->{vers} = "1.1.0";
+$apis{virDomainMigrateOpenTunnel}->{vers} = "1.2.XX";
At least 1.3.1. Although this may actually need to happen in the libvirt-python code rather than here; I cannot remember whether these entries are still necessary here.
 # Now we want to get the mapping between public APIs
diff --git a/src/libvirt-domain.c b/src/libvirt-domain.c
index de7eb04..3037c01 100644
--- a/src/libvirt-domain.c
+++ b/src/libvirt-domain.c
@@ -11595,3 +11595,46 @@ virDomainInterfaceFree(virDomainInterfacePtr iface)
     VIR_FREE(iface);
 }
+
+
+/*
+ * Not for public use. This function is part of the internal
+ * implementation of migration in the remote case.
+ */
+int
+virDomainMigrateOpenTunnel(virConnectPtr conn,
+                           virStreamPtr st,
+                           unsigned char uuid[VIR_UUID_BUFLEN],
Use const unsigned char *uuid
+                           unsigned int flags)
+{
+    char uuidstr[VIR_UUID_STRING_BUFLEN];
+
+    virUUIDFormat(uuid, uuidstr);
+    VIR_DEBUG("conn=%p, stream=%p, uuid=%s, flags=%x",
+              conn, st, uuidstr, flags);
+
+    virResetLastError();
+
+    virCheckConnectReturn(conn, -1);
+    virCheckReadOnlyGoto(conn->flags, error);
+
+    if (conn != st->conn) {
+        virReportInvalidArg(conn, "%s",
+                            _("conn must match stream connection"));
+        goto error;
+    }
+
+    if (conn->driver->domainMigrateOpenTunnel) {
+        int rv;
+        rv = conn->driver->domainMigrateOpenTunnel(conn, st, uuid, flags);
+        if (rv < 0)
+            goto error;
+        return rv;
+    }
+
+    virReportUnsupportedError();
+
+ error:
+    virDispatchError(conn);
+    return -1;
+}
diff --git a/src/libvirt_internal.h b/src/libvirt_internal.h
index 1313b58..bbfba0b 100644
--- a/src/libvirt_internal.h
+++ b/src/libvirt_internal.h
@@ -289,4 +289,10 @@ virTypedParameterValidateSet(virConnectPtr conn,
                              virTypedParameterPtr params,
                              int nparams);
 
+int
+virDomainMigrateOpenTunnel(virConnectPtr conn,
+                           virStreamPtr st,
+                           unsigned char uuid[VIR_UUID_BUFLEN],
again...
+                           unsigned int flags);
+
 #endif
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index a835f18..cf5725c 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -952,6 +952,7 @@ virDomainMigrateFinish;
 virDomainMigrateFinish2;
 virDomainMigrateFinish3;
 virDomainMigrateFinish3Params;
+virDomainMigrateOpenTunnel;
 virDomainMigratePerform;
 virDomainMigratePerform3;
 virDomainMigratePerform3Params;

Add virDomainMigrateTunnelFlags enum.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 include/libvirt/libvirt-domain.h | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/include/libvirt/libvirt-domain.h b/include/libvirt/libvirt-domain.h
index a1ea6a5..444deee 100644
--- a/include/libvirt/libvirt-domain.h
+++ b/include/libvirt/libvirt-domain.h
@@ -661,6 +661,9 @@ typedef enum {
     VIR_MIGRATE_RDMA_PIN_ALL = (1 << 14), /* RDMA memory pinning */
 } virDomainMigrateFlags;
 
+typedef enum {
+    VIR_MIGRATE_TUNNEL_NBD = (1 << 0), /* open tunnel for NBD */
+} virDomainMigrateTunnelFlags;
 
 /**
  * VIR_MIGRATE_PARAM_URI:
-- 
1.9.1

Implement remoteDispatchDomainMigrateOpenTunnel.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 daemon/remote.c | 50 ++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/daemon/remote.c b/daemon/remote.c
index 237124d..3c6803e 100644
--- a/daemon/remote.c
+++ b/daemon/remote.c
@@ -6663,13 +6663,51 @@ remoteDispatchDomainInterfaceAddresses(virNetServerPtr server ATTRIBUTE_UNUSED,
 
 static int
 remoteDispatchDomainMigrateOpenTunnel(virNetServerPtr server ATTRIBUTE_UNUSED,
-                                      virNetServerClientPtr client ATTRIBUTE_UNUSED,
-                                      virNetMessagePtr msg ATTRIBUTE_UNUSED,
-                                      virNetMessageErrorPtr rerr ATTRIBUTE_UNUSED,
-                                      remote_domain_migrate_open_tunnel_args *args ATTRIBUTE_UNUSED,
-                                      remote_domain_migrate_open_tunnel_ret *ret ATTRIBUTE_UNUSED)
+                                      virNetServerClientPtr client,
+                                      virNetMessagePtr msg,
+                                      virNetMessageErrorPtr rerr,
+                                      remote_domain_migrate_open_tunnel_args *args,
+                                      remote_domain_migrate_open_tunnel_ret *ret)
 {
-    return -1;
+    int rv = -1;
+    struct daemonClientPrivate *priv =
+        virNetServerClientGetPrivateData(client);
+    virStreamPtr st = NULL;
+    daemonClientStreamPtr stream = NULL;
+
+    if (!priv->conn) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("connection not open"));
+        goto cleanup;
+    }
+
+    if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)) ||
+        !(stream = daemonCreateClientStream(client, st, remoteProgram,
+                                            &msg->header)))
+        goto cleanup;
+
+    ret->retcode = virDomainMigrateOpenTunnel(priv->conn, st,
+                                              (unsigned char *) args->uuid,
+                                              args->flags);
+
+    if (ret->retcode < 0)
+        goto cleanup;
+
+    if (daemonAddClientStream(client, stream, true) < 0)
+        goto cleanup;
+
+    rv = 0;
+
+ cleanup:
+    if (rv < 0) {
+        virNetMessageSaveError(rerr);
+        if (stream) {
+            virStreamAbort(st);
+            daemonFreeClientStream(client, stream);
+        } else {
+            virObjectUnref(st);
+        }
+    }
+    return rv;
 }
-- 
1.9.1

Add local NBD tunnel socket info to the qemuMigrationSpec structure.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/qemu/qemu_migration.c | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index 3eee3a5..fb2a216 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -3966,6 +3966,11 @@ struct _qemuMigrationSpec {
     union {
         virStreamPtr stream;
     } fwd;
+
+    struct {
+        char *file;
+        int sock;
+    } nbd_tunnel_unix_socket;
 };
 
 #define TUNNEL_SEND_BUF_SIZE 65536
-- 
1.9.1

Create a UNIX socket that will be a target for the outgoing NBD
connection from the QEMU side.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/qemu/qemu_migration.c | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index fb2a216..d587c56 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -4597,7 +4597,7 @@ static int doTunnelMigrate(virQEMUDriverPtr driver,
                            size_t nmigrate_disks,
                            const char **migrate_disks)
 {
-    virNetSocketPtr sock = NULL;
+    virNetSocketPtr sock = NULL, nbdSock = NULL;
     int ret = -1;
     qemuMigrationSpec spec;
     virQEMUDriverConfigPtr cfg = virQEMUDriverGetConfig(driver);
@@ -4613,6 +4613,23 @@ static int doTunnelMigrate(virQEMUDriverPtr driver,
     spec.fwdType = MIGRATION_FWD_STREAM;
     spec.fwd.stream = st;
 
+    if (nmigrate_disks) {
+        spec.nbd_tunnel_unix_socket.sock = -1;
+        spec.nbd_tunnel_unix_socket.file = NULL;
+
+        if (virAsprintf(&spec.nbd_tunnel_unix_socket.file,
+                        "%s/domain-%s/qemu.nbdtunnelmigrate.src",
+                        cfg->libDir, vm->def->name) < 0)
+            goto cleanup;
+
+        if (virNetSocketNewListenUNIX(spec.nbd_tunnel_unix_socket.file, 0700,
+                                      cfg->user, cfg->group,
+                                      &nbdSock) < 0 ||
+            virNetSocketListen(nbdSock, 1) < 0)
+            goto cleanup;
+
+        spec.nbd_tunnel_unix_socket.sock = virNetSocketGetFD(nbdSock);
+    }
 
     spec.destType = MIGRATION_DEST_FD;
     spec.dest.fd.qemu = -1;
@@ -4643,6 +4660,11 @@ static int doTunnelMigrate(virQEMUDriverPtr driver,
         VIR_FREE(spec.dest.unix_socket.file);
     }
 
+    if (nmigrate_disks) {
+        virObjectUnref(nbdSock);
+        VIR_FREE(spec.nbd_tunnel_unix_socket.file);
+    }
+
     virObjectUnref(cfg);
     return ret;
 }
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Create a UNIX socket that will be a target for outgoing NBD connection from the QEMU side.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index fb2a216..d587c56 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -4597,7 +4597,7 @@ static int doTunnelMigrate(virQEMUDriverPtr driver,
                            size_t nmigrate_disks,
                            const char **migrate_disks)
 {
-    virNetSocketPtr sock = NULL;
+    virNetSocketPtr sock = NULL, nbdSock = NULL;
     int ret = -1;
     qemuMigrationSpec spec;
     virQEMUDriverConfigPtr cfg = virQEMUDriverGetConfig(driver);
@@ -4613,6 +4613,23 @@ static int doTunnelMigrate(virQEMUDriverPtr driver,
     spec.fwdType = MIGRATION_FWD_STREAM;
     spec.fwd.stream = st;
 
+    if (nmigrate_disks) {
+        spec.nbd_tunnel_unix_socket.sock = -1;
+        spec.nbd_tunnel_unix_socket.file = NULL;
+
+        if (virAsprintf(&spec.nbd_tunnel_unix_socket.file,
+                        "%s/domain-%s/qemu.nbdtunnelmigrate.src",
+                        cfg->libDir, vm->def->name) < 0)
+            goto cleanup;
+
+        if (virNetSocketNewListenUNIX(spec.nbd_tunnel_unix_socket.file, 0700,
+                                      cfg->user, cfg->group,
+                                      &nbdSock) < 0 ||
+            virNetSocketListen(nbdSock, 1) < 0)
+            goto cleanup;
+
+        spec.nbd_tunnel_unix_socket.sock = virNetSocketGetFD(nbdSock);
+    }
We can now get to cleanup without "spec.destType" being set. The first thing the cleanup code does is check it, so it's uninitialized according to Coverity. Probably need to move the following 2 lines up.
     spec.destType = MIGRATION_DEST_FD;
     spec.dest.fd.qemu = -1;
@@ -4643,6 +4660,11 @@ static int doTunnelMigrate(virQEMUDriverPtr driver,
         VIR_FREE(spec.dest.unix_socket.file);
     }
+    if (nmigrate_disks) {
+        virObjectUnref(nbdSock);
?? VIR_FORCE_CLOSE(spec.nbd_tunnel_unix_socket.sock);

John
+        VIR_FREE(spec.nbd_tunnel_unix_socket.file);
+    }
+
     virObjectUnref(cfg);
     return ret;
 }

Make qemuMigrationDriveMirror able to instruct QEMU to connect to a
local UNIX socket used for tunnelling.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/qemu/qemu_migration.c | 45 ++++++++++++++++++++++++++++++---------------
 1 file changed, 30 insertions(+), 15 deletions(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index d587c56..d95cd66 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -2048,7 +2048,8 @@ static int
 qemuMigrationDriveMirror(virQEMUDriverPtr driver,
                          virDomainObjPtr vm,
                          qemuMigrationCookiePtr mig,
-                         const char *host,
+                         bool dest_host,
+                         const char *dest,
                          unsigned long speed,
                          unsigned int *migrate_flags,
                          size_t nmigrate_disks,
@@ -2057,7 +2058,7 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver,
 {
     qemuDomainObjPrivatePtr priv = vm->privateData;
     int ret = -1;
-    int port;
+    int port = -1;
     size_t i;
     char *diskAlias = NULL;
     char *nbd_dest = NULL;
@@ -2068,16 +2069,20 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver,
     VIR_DEBUG("Starting drive mirrors for domain %s", vm->def->name);
 
-    /* steal NBD port and thus prevent its propagation back to destination */
-    port = mig->nbd->port;
-    mig->nbd->port = 0;
+    virCheckNonNullArgGoto(dest, cleanup);
+
+    if (dest_host) {
+        /* steal NBD port and thus prevent its propagation back to destination */
+        port = mig->nbd->port;
+        mig->nbd->port = 0;
 
-    /* escape literal IPv6 address */
-    if (strchr(host, ':')) {
-        if (virAsprintf(&hoststr, "[%s]", host) < 0)
+        /* escape literal IPv6 address */
+        if (strchr(dest, ':')) {
+            if (virAsprintf(&hoststr, "[%s]", dest) < 0)
+                goto cleanup;
+        } else if (VIR_STRDUP(hoststr, dest) < 0) {
             goto cleanup;
-    } else if (VIR_STRDUP(hoststr, host) < 0) {
-        goto cleanup;
+        }
     }
 
     if (*migrate_flags & QEMU_MONITOR_MIGRATE_NON_SHARED_INC)
@@ -2092,11 +2097,18 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver,
         if (!qemuMigrateDisk(disk, nmigrate_disks, migrate_disks))
             continue;
 
-        if ((virAsprintf(&diskAlias, "%s%s",
-                         QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) ||
-            (virAsprintf(&nbd_dest, "nbd:%s:%d:exportname=%s",
-                         hoststr, port, diskAlias) < 0))
+        if (virAsprintf(&diskAlias, "%s%s",
+                        QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0)
             goto cleanup;
+        if (dest_host) {
+            if (virAsprintf(&nbd_dest, "nbd:%s:%d:exportname=%s",
+                            hoststr, port, diskAlias) < 0)
+                goto cleanup;
+        } else {
+            if (virAsprintf(&nbd_dest, "nbd:unix:%s:exportname=%s",
+                            dest, diskAlias) < 0)
+                goto cleanup;
+        }
 
         if (qemuDomainObjEnterMonitorAsync(driver, vm,
                                            QEMU_ASYNC_JOB_MIGRATION_OUT) < 0)
@@ -4281,10 +4293,13 @@ qemuMigrationRun(virQEMUDriverPtr driver,
 
     if (migrate_flags & (QEMU_MONITOR_MIGRATE_NON_SHARED_DISK |
                          QEMU_MONITOR_MIGRATE_NON_SHARED_INC)) {
+        bool dest_host = spec->destType == MIGRATION_DEST_HOST;
+        const char *dest = dest_host ? spec->dest.host.name :
+                           spec->nbd_tunnel_unix_socket.file;
         if (mig->nbd) {
             /* This will update migrate_flags on success */
             if (qemuMigrationDriveMirror(driver, vm, mig,
-                                         spec->dest.host.name,
+                                         dest_host, dest,
                                          migrate_speed,
                                          &migrate_flags,
                                          nmigrate_disks,
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Make qemuMigrationDriveMirror able to instruct QEMU to connect to a local UNIX socket used for tunnelling.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index d587c56..d95cd66 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -2048,7 +2048,8 @@ static int qemuMigrationDriveMirror(virQEMUDriverPtr driver, virDomainObjPtr vm, qemuMigrationCookiePtr mig, - const char *host, + bool dest_host, + const char *dest,
You'll need to update the comments for the function, including the parameters. In particular, if 'dest_host' is not set, then the expectation is that the incoming 'dest' contains the unix socket file/tunnel.

FWIW: When I first read it, I thought the change would be to have the new flag be the new option, not the old way. Perhaps it'd be clearer to have the calling code check for MIGRATION_DEST_FD in order to determine that we "could" have this nbd socket.

However, something that's not quite clear (yet) - as I read it, spec.nbd_tunnel_unix_socket.file is only generated when doTunnelMigrate determines 'nmigrate_disks' is true. So if it's not true, but some existing 'destType == MIGRATION_DEST_FD' case comes through, then because this code has a check for a NULL 'dest' value, it would seem a failure would occur where previously none was expected when there were no disks to migrate.

Again, I have limited exposure/knowledge of the overall environment/setup for migration, but something just doesn't seem right. It would seem the code now assumes that nmigrate_disks is true whenever MIGRATION_DEST_FD is set.
unsigned long speed, unsigned int *migrate_flags, size_t nmigrate_disks, @@ -2057,7 +2058,7 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, { qemuDomainObjPrivatePtr priv = vm->privateData; int ret = -1; - int port; + int port = -1; size_t i; char *diskAlias = NULL; char *nbd_dest = NULL; @@ -2068,16 +2069,20 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver,
VIR_DEBUG("Starting drive mirrors for domain %s", vm->def->name);
- /* steal NBD port and thus prevent its propagation back to destination */ - port = mig->nbd->port; - mig->nbd->port = 0; + virCheckNonNullArgGoto(dest, cleanup);
Hmm.. wouldn't ever expect this to trigger! If 'host' had been NULL, the strchr() below would have failed miserably.
+ + if (dest_host) { + /* steal NBD port and thus prevent its propagation back to destination */ + port = mig->nbd->port; + mig->nbd->port = 0;
- /* escape literal IPv6 address */ - if (strchr(host, ':')) { - if (virAsprintf(&hoststr, "[%s]", host) < 0) + /* escape literal IPv6 address */ + if (strchr(dest, ':')) { + if (virAsprintf(&hoststr, "[%s]", dest) < 0) + goto cleanup; + } else if (VIR_STRDUP(hoststr, dest) < 0) { goto cleanup; - } else if (VIR_STRDUP(hoststr, host) < 0) { - goto cleanup; + } }
if (*migrate_flags & QEMU_MONITOR_MIGRATE_NON_SHARED_INC) @@ -2092,11 +2097,18 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver, if (!qemuMigrateDisk(disk, nmigrate_disks, migrate_disks)) continue;
- if ((virAsprintf(&diskAlias, "%s%s", - QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) || - (virAsprintf(&nbd_dest, "nbd:%s:%d:exportname=%s", - hoststr, port, diskAlias) < 0)) + if (virAsprintf(&diskAlias, "%s%s", + QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) goto cleanup; + if (dest_host) { + if (virAsprintf(&nbd_dest, "nbd:%s:%d:exportname=%s", + hoststr, port, diskAlias) < 0) + goto cleanup; + } else { + if (virAsprintf(&nbd_dest, "nbd:unix:%s:exportname=%s", + dest, diskAlias) < 0) + goto cleanup; + }
if (qemuDomainObjEnterMonitorAsync(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) @@ -4281,10 +4293,13 @@ qemuMigrationRun(virQEMUDriverPtr driver,
if (migrate_flags & (QEMU_MONITOR_MIGRATE_NON_SHARED_DISK | QEMU_MONITOR_MIGRATE_NON_SHARED_INC)) { + bool dest_host = spec->destType == MIGRATION_DEST_HOST; + const char *dest = dest_host ? spec->dest.host.name : + spec->nbd_tunnel_unix_socket.file;
Seems to me that perhaps the bool should be "dest_fd_migrate_disks" and would be set if (spec->nbd_tunnel_unix_socket.file). It would only be checked/set if "spec->destType == MIGRATION_DEST_FD"; otherwise, we use the existing code and no boolean. IOW: The assumption I see here is that "any" destType != MIGRATION_DEST_HOST means it's MIGRATION_DEST_FD, which perhaps isn't true since I also see a MIGRATION_DEST_CONNECT_HOST and it doesn't preclude something in the future being added. John
if (mig->nbd) { /* This will update migrate_flags on success */ if (qemuMigrationDriveMirror(driver, vm, mig, - spec->dest.host.name, + dest_host, dest, migrate_speed, &migrate_flags, nmigrate_disks,

Tunnelled drive mirroring requires an active thread to accept incoming
connections from the QEMU and pump them to the remote host through the
tunnel.

For this, we need to split the thread's QEMU socket initialization from
the start of the thread and introduce qemuMigrationSetQEMUSocket to
specify it later.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/qemu/qemu_migration.c | 93 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 59 insertions(+), 34 deletions(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index d95cd66..61e78c5 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -3991,14 +3991,15 @@ typedef struct _qemuMigrationIOThread qemuMigrationIOThread;
 typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr;
 struct _qemuMigrationIOThread {
     virThread thread;
-    virStreamPtr st;
-    int sock;
+    virStreamPtr qemuStream;
+    int qemuSock;
     virError err;
     int wakeupRecvFD;
     int wakeupSendFD;
 };
 
-static void qemuMigrationIOFunc(void *arg)
+static void
+qemuMigrationIOFunc(void *arg)
 {
     qemuMigrationIOThreadPtr data = arg;
     char *buffer = NULL;
@@ -4006,21 +4007,18 @@ static void qemuMigrationIOFunc(void *arg)
     int timeout = -1;
     virErrorPtr err = NULL;
 
-    VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d",
-              data->st, data->sock);
+    VIR_DEBUG("Running migration tunnel; qemuStream=%p", data->qemuStream);
 
     if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0)
         goto abrt;
 
-    fds[0].fd = data->sock;
-    fds[1].fd = data->wakeupRecvFD;
+    fds[0].fd = data->wakeupRecvFD;
+    fds[1].fd = -1;
+    fds[0].events = fds[1].events = POLLIN;
 
     for (;;) {
         int ret;
 
-        fds[0].events = fds[1].events = POLLIN;
-        fds[0].revents = fds[1].revents = 0;
-
         ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
 
         if (ret < 0) {
@@ -4040,30 +4038,36 @@ static void qemuMigrationIOFunc(void *arg)
             break;
         }
 
-        if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
-            char stop = 0;
+        if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
+            char action = 0;
 
-            if (saferead(data->wakeupRecvFD, &stop, 1) != 1) {
+            if (saferead(data->wakeupRecvFD, &action, 1) != 1) {
                 virReportSystemError(errno, "%s",
                                      _("failed to read from wakeup fd"));
                 goto abrt;
             }
 
-            VIR_DEBUG("Migration tunnel was asked to %s",
-                      stop ? "abort" : "finish");
-            if (stop) {
-                goto abrt;
-            } else {
-                timeout = 0;
+            VIR_DEBUG("Migration tunnel was asked to %c", action);
+            switch (action) {
+            case 's':
+                goto abrt;
+                break;
+            case 'f':
+                timeout = 0;
+                break;
+            case 'u':
+                fds[1].fd = data->qemuSock;
+                VIR_DEBUG("qemuSock set %d", data->qemuSock);
+                break;
             }
         }
 
-        if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
+        if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
             int nbytes;
 
-            nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
+            nbytes = saferead(data->qemuSock, buffer, TUNNEL_SEND_BUF_SIZE);
             if (nbytes > 0) {
-                if (virStreamSend(data->st, buffer, nbytes) < 0)
+                if (virStreamSend(data->qemuStream, buffer, nbytes) < 0)
                     goto error;
             } else if (nbytes < 0) {
                 virReportSystemError(errno, "%s",
@@ -4076,10 +4080,9 @@ static void qemuMigrationIOFunc(void *arg)
         }
     }
 
-    if (virStreamFinish(data->st) < 0)
-        goto error;
+    virStreamFinish(data->qemuStream);
 
-    VIR_FORCE_CLOSE(data->sock);
+    VIR_FORCE_CLOSE(data->qemuSock);
     VIR_FREE(buffer);
 
     return;
@@ -4090,7 +4093,7 @@ static void qemuMigrationIOFunc(void *arg)
         virFreeError(err);
         err = NULL;
     }
-    virStreamAbort(data->st);
+    virStreamAbort(data->qemuStream);
    if (err) {
        virSetError(err);
        virFreeError(err);
@@ -4099,7 +4102,7 @@ static void qemuMigrationIOFunc(void *arg)
  error:
     /* Let the source qemu know that the transfer cant continue anymore.
      * Don't copy the error for EPIPE as destination has the actual error.
      */
-    VIR_FORCE_CLOSE(data->sock);
+    VIR_FORCE_CLOSE(data->qemuSock);
     if (!virLastErrorIsSystemErrno(EPIPE))
         virCopyLastError(&data->err);
     virResetLastError();
@@ -4108,8 +4111,7 @@ static void qemuMigrationIOFunc(void *arg)
 
 
 static qemuMigrationIOThreadPtr
-qemuMigrationStartTunnel(virStreamPtr st,
-                         int sock)
+qemuMigrationStartTunnel(virStreamPtr qemuStream)
 {
     qemuMigrationIOThreadPtr io = NULL;
     int wakeupFD[2] = { -1, -1 };
@@ -4123,8 +4125,8 @@ qemuMigrationStartTunnel(virStreamPtr st,
     if (VIR_ALLOC(io) < 0)
         goto error;
 
-    io->st = st;
-    io->sock = sock;
+    io->qemuStream = qemuStream;
+    io->qemuSock = -1;
     io->wakeupRecvFD = wakeupFD[0];
     io->wakeupSendFD = wakeupFD[1];
 
@@ -4149,10 +4151,10 @@ static int
 qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
 {
     int rv = -1;
-    char stop = error ? 1 : 0;
+    char action = error ? 's' : 'f';
 
     /* make sure the thread finishes its job and is joinable */
-    if (safewrite(io->wakeupSendFD, &stop, 1) != 1) {
+    if (safewrite(io->wakeupSendFD, &action, 1) != 1) {
         virReportSystemError(errno, "%s",
                              _("failed to wakeup migration tunnel"));
         goto cleanup;
@@ -4180,6 +4182,26 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
 }
 
 static int
+qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock)
+{
+    int rv = -1;
+    char action = 'u';
+
+    io->qemuSock = sock;
+
+    if (safewrite(io->wakeupSendFD, &action, 1) != 1) {
+        virReportSystemError(errno, "%s",
+                             _("failed to update migration tunnel"));
+        goto error;
+    }
+
+    rv = 0;
+
+ error:
+    return rv;
+}
+
+static int
 qemuMigrationConnect(virQEMUDriverPtr driver,
                      virDomainObjPtr vm,
                      qemuMigrationSpecPtr spec)
@@ -4422,7 +4444,10 @@ qemuMigrationRun(virQEMUDriverPtr driver,
     }
 
     if (spec->fwdType != MIGRATION_FWD_DIRECT) {
-        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, fd)))
+        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream)))
+            goto cancel;
+
+        if (qemuMigrationSetQEMUSocket(iothread, fd) < 0)
             goto cancel;
         /* If we've created a tunnel, then the 'fd' will be closed in the
          * qemuMigrationIOFunc as data->sock.
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Tunnelled drive mirroring requires an active thread to accept incoming connections from the QEMU and pumping them to the remote host through the tunnel.
For this, we need to split thread's QEMU socket initialization from the start of the thread and introduce qemuMigrationSetQEMUSocket to specify it later.
There is a whole lot more going on here that isn't explained... e.g. 's', 'f', 'u'... The polling loop would now seem to "wait" for a data socket to be created/added. Also, even though it would add patches, perhaps this would have been easier to understand by renaming the fields first, then flip-flopping the order, then splitting off the setting of qemuSock until the 'u' is seen.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 93 ++++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 34 deletions(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index d95cd66..61e78c5 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3991,14 +3991,15 @@ typedef struct _qemuMigrationIOThread qemuMigrationIOThread; typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr; struct _qemuMigrationIOThread { virThread thread; - virStreamPtr st; - int sock; + virStreamPtr qemuStream; + int qemuSock; virError err; int wakeupRecvFD; int wakeupSendFD; };
-static void qemuMigrationIOFunc(void *arg) +static void +qemuMigrationIOFunc(void *arg) { qemuMigrationIOThreadPtr data = arg; char *buffer = NULL; @@ -4006,21 +4007,18 @@ static void qemuMigrationIOFunc(void *arg) int timeout = -1; virErrorPtr err = NULL;
- VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d", - data->st, data->sock); + VIR_DEBUG("Running migration tunnel; qemuStream=%p", data->qemuStream);
Since 'sock' is now "sent" to the thread later rather than set into fds[0] here, it's no longer used at thread start. So why is it still passed?
if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) goto abrt;
- fds[0].fd = data->sock; - fds[1].fd = data->wakeupRecvFD; + fds[0].fd = data->wakeupRecvFD; + fds[1].fd = -1; + fds[0].events = fds[1].events = POLLIN;
for (;;) { int ret;
- fds[0].events = fds[1].events = POLLIN; - fds[0].revents = fds[1].revents = 0; -
Don't think we want to lose the revents = 0. According to how I read the man page, because fds[1].fd is -1, its revents would be set to 0 anyway on return. But once fds[1] is set - since we're polling two fds here - how do you guarantee in the following code that you wouldn't be "rereading and resending" on fds[1] if fds[0] is what tripped the poll?
ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
if (ret < 0) { @@ -4040,30 +4038,36 @@ static void qemuMigrationIOFunc(void *arg) break; }
- if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) { - char stop = 0; + if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) { + char action = 0;
- if (saferead(data->wakeupRecvFD, &stop, 1) != 1) { + if (saferead(data->wakeupRecvFD, &action, 1) != 1) { virReportSystemError(errno, "%s", _("failed to read from wakeup fd")); goto abrt; }
- VIR_DEBUG("Migration tunnel was asked to %s", - stop ? "abort" : "finish"); - if (stop) { - goto abrt; - } else { - timeout = 0; + VIR_DEBUG("Migration tunnel was asked to %c", action); + switch (action) { + case 's': + goto abrt; + break; + case 'f': + timeout = 0; + break; + case 'u': + fds[1].fd = data->qemuSock; + VIR_DEBUG("qemuSock set %d", data->qemuSock); + break; } }
- if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) { + if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) { int nbytes;
- nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE); + nbytes = saferead(data->qemuSock, buffer, TUNNEL_SEND_BUF_SIZE); if (nbytes > 0) { - if (virStreamSend(data->st, buffer, nbytes) < 0) + if (virStreamSend(data->qemuStream, buffer, nbytes) < 0) goto error; } else if (nbytes < 0) { virReportSystemError(errno, "%s", @@ -4076,10 +4080,9 @@ static void qemuMigrationIOFunc(void *arg) } }
- if (virStreamFinish(data->st) < 0) - goto error; + virStreamFinish(data->qemuStream);
It would seem to me we shouldn't be losing this goto error on failure. John
- VIR_FORCE_CLOSE(data->sock); + VIR_FORCE_CLOSE(data->qemuSock); VIR_FREE(buffer);
return; @@ -4090,7 +4093,7 @@ static void qemuMigrationIOFunc(void *arg) virFreeError(err); err = NULL; } - virStreamAbort(data->st); + virStreamAbort(data->qemuStream); if (err) { virSetError(err); virFreeError(err); @@ -4099,7 +4102,7 @@ static void qemuMigrationIOFunc(void *arg) error: /* Let the source qemu know that the transfer cant continue anymore. * Don't copy the error for EPIPE as destination has the actual error. */ - VIR_FORCE_CLOSE(data->sock); + VIR_FORCE_CLOSE(data->qemuSock); if (!virLastErrorIsSystemErrno(EPIPE)) virCopyLastError(&data->err); virResetLastError(); @@ -4108,8 +4111,7 @@ static void qemuMigrationIOFunc(void *arg)
static qemuMigrationIOThreadPtr -qemuMigrationStartTunnel(virStreamPtr st, - int sock) +qemuMigrationStartTunnel(virStreamPtr qemuStream) { qemuMigrationIOThreadPtr io = NULL; int wakeupFD[2] = { -1, -1 }; @@ -4123,8 +4125,8 @@ qemuMigrationStartTunnel(virStreamPtr st, if (VIR_ALLOC(io) < 0) goto error;
- io->st = st; - io->sock = sock; + io->qemuStream = qemuStream; + io->qemuSock = -1; io->wakeupRecvFD = wakeupFD[0]; io->wakeupSendFD = wakeupFD[1];
@@ -4149,10 +4151,10 @@ static int qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error) { int rv = -1; - char stop = error ? 1 : 0; + char action = error ? 's' : 'f';
/* make sure the thread finishes its job and is joinable */ - if (safewrite(io->wakeupSendFD, &stop, 1) != 1) { + if (safewrite(io->wakeupSendFD, &action, 1) != 1) { virReportSystemError(errno, "%s", _("failed to wakeup migration tunnel")); goto cleanup; @@ -4180,6 +4182,26 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error) }
static int +qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock) +{ + int rv = -1; + char action = 'u'; + + io->qemuSock = sock; + + if (safewrite(io->wakeupSendFD, &action, 1) != 1) { + virReportSystemError(errno, "%s", + _("failed to update migration tunnel")); + goto error; + } + + rv = 0; + + error: + return rv; +} + +static int qemuMigrationConnect(virQEMUDriverPtr driver, virDomainObjPtr vm, qemuMigrationSpecPtr spec) @@ -4422,7 +4444,10 @@ qemuMigrationRun(virQEMUDriverPtr driver, }
if (spec->fwdType != MIGRATION_FWD_DIRECT) { - if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, fd))) + if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream))) + goto cancel; + + if (qemuMigrationSetQEMUSocket(iothread, fd) < 0) goto cancel; /* If we've created a tunnel, then the 'fd' will be closed in the * qemuMigrationIOFunc as data->sock.

Pass UNIX socket used as a local NBD server destination to the
migration iothread.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/qemu/qemu_migration.c | 46 ++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 38 insertions(+), 8 deletions(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index 61e78c5..0682fd8 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -3993,6 +3993,7 @@ struct _qemuMigrationIOThread {
     virThread thread;
     virStreamPtr qemuStream;
     int qemuSock;
+    int unixSock;
     virError err;
     int wakeupRecvFD;
     int wakeupSendFD;
@@ -4003,7 +4004,7 @@ qemuMigrationIOFunc(void *arg)
 {
     qemuMigrationIOThreadPtr data = arg;
     char *buffer = NULL;
-    struct pollfd fds[2];
+    struct pollfd fds[3];
     int timeout = -1;
     virErrorPtr err = NULL;
 
@@ -4013,8 +4014,8 @@ qemuMigrationIOFunc(void *arg)
         goto abrt;
 
     fds[0].fd = data->wakeupRecvFD;
-    fds[1].fd = -1;
-    fds[0].events = fds[1].events = POLLIN;
+    fds[1].fd = fds[2].fd = -1;
+    fds[0].events = fds[1].events = fds[2].events = POLLIN;
 
     for (;;) {
         int ret;
@@ -4057,7 +4058,9 @@ qemuMigrationIOFunc(void *arg)
             break;
         case 'u':
             fds[1].fd = data->qemuSock;
-            VIR_DEBUG("qemuSock set %d", data->qemuSock);
+            fds[2].fd = data->unixSock;
+            VIR_DEBUG("qemuSock set %d, unixSock set %d",
+                      data->qemuSock, data->unixSock);
             break;
         }
     }
@@ -4126,7 +4129,7 @@ qemuMigrationStartTunnel(virStreamPtr qemuStream)
         goto error;
 
     io->qemuStream = qemuStream;
-    io->qemuSock = -1;
+    io->qemuSock = io->unixSock = -1;
     io->wakeupRecvFD = wakeupFD[0];
     io->wakeupSendFD = wakeupFD[1];
 
@@ -4202,6 +4205,26 @@ qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock)
 }
 
 static int
+qemuMigrationSetUnixSocket(qemuMigrationIOThreadPtr io, int sock)
+{
+    int rv = -1;
+    char action = 'u';
+
+    io->unixSock = sock;
+
+    if (safewrite(io->wakeupSendFD, &action, 1) != 1) {
+        virReportSystemError(errno, "%s",
+                             _("failed to update migration tunnel"));
+        goto error;
+    }
+
+    rv = 0;
+
+ error:
+    return rv;
+}
+
+static int
 qemuMigrationConnect(virQEMUDriverPtr driver,
                      virDomainObjPtr vm,
                      qemuMigrationSpecPtr spec)
@@ -4313,6 +4336,16 @@ qemuMigrationRun(virQEMUDriverPtr driver,
     if (qemuDomainMigrateGraphicsRelocate(driver, vm, mig, graphicsuri) < 0)
         VIR_WARN("unable to provide data for graphics client relocation");
 
+    if (spec->fwdType != MIGRATION_FWD_DIRECT) {
+        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream)))
+            goto cancel;
+
+        if (nmigrate_disks &&
+            qemuMigrationSetUnixSocket(iothread,
+                                       spec->nbd_tunnel_unix_socket.sock) < 0)
+            goto cancel;
+    }
+
     if (migrate_flags & (QEMU_MONITOR_MIGRATE_NON_SHARED_DISK |
                          QEMU_MONITOR_MIGRATE_NON_SHARED_INC)) {
         bool dest_host = spec->destType == MIGRATION_DEST_HOST;
@@ -4444,9 +4477,6 @@ qemuMigrationRun(virQEMUDriverPtr driver,
     }
 
     if (spec->fwdType != MIGRATION_FWD_DIRECT) {
-        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream)))
-            goto cancel;
-
         if (qemuMigrationSetQEMUSocket(iothread, fd) < 0)
             goto cancel;
         /* If we've created a tunnel, then the 'fd' will be closed in the
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Pass UNIX socket used as a local NBD server destination to the migration iothread.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 46 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 8 deletions(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 61e78c5..0682fd8 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3993,6 +3993,7 @@ struct _qemuMigrationIOThread { virThread thread; virStreamPtr qemuStream; int qemuSock; + int unixSock; virError err; int wakeupRecvFD; int wakeupSendFD; @@ -4003,7 +4004,7 @@ qemuMigrationIOFunc(void *arg) { qemuMigrationIOThreadPtr data = arg; char *buffer = NULL; - struct pollfd fds[2]; + struct pollfd fds[3]; int timeout = -1; virErrorPtr err = NULL;
@@ -4013,8 +4014,8 @@ qemuMigrationIOFunc(void *arg) goto abrt;
fds[0].fd = data->wakeupRecvFD; - fds[1].fd = -1; - fds[0].events = fds[1].events = POLLIN; + fds[1].fd = fds[2].fd = -1; + fds[0].events = fds[1].events = fds[2].events = POLLIN;
for (;;) {
You'll need the fds[2].revents = 0 here too. Or are we not polling on this one... It seems we're not, since there's no fds[2].revents code below (yet?).
int ret; @@ -4057,7 +4058,9 @@ qemuMigrationIOFunc(void *arg) break; case 'u': fds[1].fd = data->qemuSock; - VIR_DEBUG("qemuSock set %d", data->qemuSock); + fds[2].fd = data->unixSock; + VIR_DEBUG("qemuSock set %d, unixSock set %d", + data->qemuSock, data->unixSock);
Two for one specials... I do have some "concern" over the synchronization between the setting/closing of these sockets... Perhaps it's more that the "real" ones are managed via spec.* while these live in io*, and the concern is making sure the io* references are removed before the spec* sockets are closed. Don't think it's an issue, but the more options added the more things to consider.
break; } } @@ -4126,7 +4129,7 @@ qemuMigrationStartTunnel(virStreamPtr qemuStream) goto error;
io->qemuStream = qemuStream; - io->qemuSock = -1; + io->qemuSock = io->unixSock = -1; io->wakeupRecvFD = wakeupFD[0]; io->wakeupSendFD = wakeupFD[1];
@@ -4202,6 +4205,26 @@ qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock) }
static int +qemuMigrationSetUnixSocket(qemuMigrationIOThreadPtr io, int sock) +{ + int rv = -1; + char action = 'u'; + + io->unixSock = sock; + + if (safewrite(io->wakeupSendFD, &action, 1) != 1) { + virReportSystemError(errno, "%s", + _("failed to update migration tunnel")); + goto error; + } + + rv = 0; + + error: + return rv; +} + +static int qemuMigrationConnect(virQEMUDriverPtr driver, virDomainObjPtr vm, qemuMigrationSpecPtr spec) @@ -4313,6 +4336,16 @@ qemuMigrationRun(virQEMUDriverPtr driver, if (qemuDomainMigrateGraphicsRelocate(driver, vm, mig, graphicsuri) < 0) VIR_WARN("unable to provide data for graphics client relocation");
+ if (spec->fwdType != MIGRATION_FWD_DIRECT) { + if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream))) + goto cancel; + + if (nmigrate_disks && + qemuMigrationSetUnixSocket(iothread, + spec->nbd_tunnel_unix_socket.sock) < 0) + goto cancel; + } +
So again here we seem to be doing two things at one time... In particular moving the creation/setup of the tunnel to much sooner in the processing... before even entering the monitor... I would think that going to 'cancel' is not the right failure step either.

Why is there 'nmigrate_disks' and 'mig->nbd'? If the destination doesn't support NBD, then one would think the nbd_tunnel_unix_socket wouldn't have been created, but I don't recall that check being made in earlier patches. Perhaps the above lines need to be separated and the setting of the socket for the nbd_tunnel* only done if mig->nbd is true below. That way you're not setting that without also setting the migrate_flags for the *DISK|*INC bits...

John
if (migrate_flags & (QEMU_MONITOR_MIGRATE_NON_SHARED_DISK | QEMU_MONITOR_MIGRATE_NON_SHARED_INC)) { bool dest_host = spec->destType == MIGRATION_DEST_HOST; @@ -4444,9 +4477,6 @@ qemuMigrationRun(virQEMUDriverPtr driver, }
if (spec->fwdType != MIGRATION_FWD_DIRECT) { - if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream))) - goto cancel; - if (qemuMigrationSetQEMUSocket(iothread, fd) < 0) goto cancel; /* If we've created a tunnel, then the 'fd' will be closed in the

Add qemuNBDTunnelAcceptAndPipe function that is called to handle POLLIN
on the UNIX socket connection from the QEMU's NBD server.

The function creates a pipe of a remote stream connected to the QEMU
NBD Unix socket on destination and a local stream connected to the
incoming connection from the source QEMU's NBD.

Signed-off-by: Pavel Boldin <pboldin@mirantis.com>
---
 src/qemu/qemu_migration.c | 134 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 132 insertions(+), 2 deletions(-)

diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index 0682fd8..0f35c13 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -3987,6 +3987,9 @@ struct _qemuMigrationSpec {
 
 #define TUNNEL_SEND_BUF_SIZE 65536
 
+typedef struct _qemuMigrationPipe qemuMigrationPipe;
+typedef qemuMigrationPipe *qemuMigrationPipePtr;
+
 typedef struct _qemuMigrationIOThread qemuMigrationIOThread;
 typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr;
 struct _qemuMigrationIOThread {
@@ -3997,9 +4000,124 @@ struct _qemuMigrationIOThread {
     virError err;
     int wakeupRecvFD;
     int wakeupSendFD;
+    qemuMigrationPipePtr pipes;
+    virConnectPtr dconn;
+    unsigned char uuid[VIR_UUID_BUFLEN];
+};
+
+struct _qemuMigrationPipe {
+    qemuMigrationPipePtr next;
+    qemuMigrationIOThreadPtr data;
+    virStreamPtr local;
+    virStreamPtr remote;
 };
 
 static void
+qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort)
+{
+    virStreamEventUpdateCallback(pipe->local, 0);
+    virStreamEventUpdateCallback(pipe->remote, 0);
+
+    if (abort) {
+        virStreamAbort(pipe->local);
+        virStreamAbort(pipe->remote);
+    } else {
+        virStreamFinish(pipe->local);
+        virStreamFinish(pipe->remote);
+    }
+
+    virObjectUnref(pipe->local);
+    virObjectUnref(pipe->remote);
+}
+
+static qemuMigrationPipePtr
+qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote)
+{
+    qemuMigrationPipePtr pipe = NULL;
+
+    if (VIR_ALLOC(pipe) < 0)
+        goto error;
+
+    pipe->local = local;
+    pipe->remote = remote;
+
+    return pipe;
+
+ error:
+    virStreamEventRemoveCallback(local);
+    virStreamEventRemoveCallback(remote);
+    VIR_FREE(pipe);
+    return NULL;
+}
+
+
+static int
+qemuNBDTunnelAcceptAndPipe(qemuMigrationIOThreadPtr data)
+{
+    int fd, ret;
+    virStreamPtr local = NULL, remote = NULL;
+    qemuMigrationPipePtr pipe = NULL;
+
+    while ((fd = accept(data->unixSock, NULL, NULL)) < 0) {
+        if (errno == EAGAIN || errno == EINTR)
+            continue;
+        virReportSystemError(
+            errno, "%s", _("failed to accept connection from qemu"));
+        goto abrt;
+    }
+
+    if (!(local = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK)))
+        goto abrt;
+
+    if (!(remote = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK)))
+        goto abrt;
+
+    ret = virDomainMigrateOpenTunnel(data->dconn,
+                                     remote,
+                                     data->uuid,
+                                     VIR_MIGRATE_TUNNEL_NBD);
+
+    if (ret < 0)
+        goto abrt;
+
+    if (virFDStreamOpen(local, fd) < 0)
+        goto abrt;
+
+    if (!(pipe = qemuMigrationPipeCreate(local, remote)))
+        goto abrt;
+
+    pipe->data = data;
+    pipe->next = data->pipes;
+    data->pipes = pipe;
+
+    return 0;
+
+ abrt:
+    VIR_FORCE_CLOSE(fd);
+    virStreamAbort(local);
+    virStreamAbort(remote);
+
+    virObjectUnref(local);
+    virObjectUnref(remote);
+    return -1;
+}
+
+static void
+qemuMigrationPipesStop(qemuMigrationPipePtr pipe, bool abort)
+{
+    qemuMigrationPipePtr tmp;
+
+    while (pipe) {
+        tmp = pipe->next;
+
+        qemuMigrationPipeClose(pipe, abort);
+        VIR_FREE(pipe);
+
+        pipe = tmp;
+    }
+}
+
+static void
 qemuMigrationIOFunc(void *arg)
 {
     qemuMigrationIOThreadPtr data = arg;
@@ -4081,9 +4199,14 @@ qemuMigrationIOFunc(void *arg)
             break;
         }
     }
+
+        if (fds[2].revents & (POLLIN | POLLERR | POLLHUP) &&
+            qemuNBDTunnelAcceptAndPipe(data) < 0)
+            goto abrt;
     }
 
     virStreamFinish(data->qemuStream);
+    qemuMigrationPipesStop(data->pipes, false);
 
     VIR_FORCE_CLOSE(data->qemuSock);
     VIR_FREE(buffer);
@@ -4097,6 +4220,7 @@ qemuMigrationIOFunc(void *arg)
         err = NULL;
     }
     virStreamAbort(data->qemuStream);
+    qemuMigrationPipesStop(data->pipes, true);
     if (err) {
         virSetError(err);
         virFreeError(err);
@@ -4114,7 +4238,9 @@ qemuMigrationIOFunc(void *arg)
 
 static qemuMigrationIOThreadPtr
-qemuMigrationStartTunnel(virStreamPtr qemuStream)
+qemuMigrationStartTunnel(virStreamPtr qemuStream,
+                         virConnectPtr dconn,
+                         unsigned char uuid[VIR_UUID_BUFLEN])
 {
     qemuMigrationIOThreadPtr io = NULL;
     int wakeupFD[2] = { -1, -1 };
@@ -4132,6 +4258,8 @@ qemuMigrationStartTunnel(virStreamPtr qemuStream)
     io->qemuSock = io->unixSock = -1;
     io->wakeupRecvFD = wakeupFD[0];
     io->wakeupSendFD = wakeupFD[1];
+    io->dconn = dconn;
+    memcpy(io->uuid, uuid, VIR_UUID_BUFLEN);
 
     if (virThreadCreate(&io->thread, true,
                         qemuMigrationIOFunc,
@@ -4337,7 +4465,9 @@ qemuMigrationRun(virQEMUDriverPtr driver,
         VIR_WARN("unable to provide data for graphics client relocation");
 
     if (spec->fwdType != MIGRATION_FWD_DIRECT) {
-        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream)))
+        if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream,
+                                                  dconn,
+                                                  mig->uuid)))
            goto cancel;
 
        if (nmigrate_disks &&
-- 
1.9.1

On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Add qemuNBDTunnelAcceptAndPipe function that is called to handle POLLIN on the UNIX socket connection from the QEMU's NBD server.
The function creates a pipe of a remote stream connected to the QEMU NBD Unix socket on destination and a local stream connected to the incoming connection from the source QEMU's NBD.
Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 134 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 2 deletions(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 0682fd8..0f35c13 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3987,6 +3987,9 @@ struct _qemuMigrationSpec {
#define TUNNEL_SEND_BUF_SIZE 65536
+typedef struct _qemuMigrationPipe qemuMigrationPipe; +typedef qemuMigrationPipe *qemuMigrationPipePtr; + typedef struct _qemuMigrationIOThread qemuMigrationIOThread; typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr; struct _qemuMigrationIOThread { @@ -3997,9 +4000,124 @@ struct _qemuMigrationIOThread { virError err; int wakeupRecvFD; int wakeupSendFD; + qemuMigrationPipePtr pipes; + virConnectPtr dconn; + unsigned char uuid[VIR_UUID_BUFLEN]; +}; + +struct _qemuMigrationPipe { + qemuMigrationPipePtr next; + qemuMigrationIOThreadPtr data; + virStreamPtr local; + virStreamPtr remote; };
static void +qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort) +{ + virStreamEventUpdateCallback(pipe->local, 0); + virStreamEventUpdateCallback(pipe->remote, 0); + + if (abort) { + virStreamAbort(pipe->local); + virStreamAbort(pipe->remote); + } else { + virStreamFinish(pipe->local); + virStreamFinish(pipe->remote); + } + + virObjectUnref(pipe->local); + virObjectUnref(pipe->remote); +} +
Norm seems to be 2 blank lines between functions. There's only one here.
+static qemuMigrationPipePtr +qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote) +{ + qemuMigrationPipePtr pipe = NULL; + + if (VIR_ALLOC(pipe) < 0) + goto error; + + pipe->local = local; + pipe->remote = remote; + + return pipe; + + error: + virStreamEventRemoveCallback(local); + virStreamEventRemoveCallback(remote); + VIR_FREE(pipe); + return NULL; +} +
but two here...
+ +static int +qemuNBDTunnelAcceptAndPipe(qemuMigrationIOThreadPtr data) +{ + int fd, ret; + virStreamPtr local = NULL, remote = NULL; + qemuMigrationPipePtr pipe = NULL; + + while ((fd = accept(data->unixSock, NULL, NULL)) < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + virReportSystemError( + errno, "%s", _("failed to accept connection from qemu")); + goto abrt; + } + + if (!(local = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK))) + goto abrt; + + if (!(remote = virStreamNew(data->dconn, VIR_STREAM_NONBLOCK))) + goto abrt; + + ret = virDomainMigrateOpenTunnel(data->dconn, + remote, + data->uuid, + VIR_MIGRATE_TUNNEL_NBD); + + if (ret < 0) + goto abrt; + + if (virFDStreamOpen(local, fd) < 0) + goto abrt; + + if (!(pipe = qemuMigrationPipeCreate(local, remote))) + goto abrt; + + pipe->data = data; + pipe->next = data->pipes; + data->pipes = pipe;
Didn't think too long about this insertion code, but how many times can this happen? It's the removal code, which removes them all at one time, that has me wondering.
+ + return 0; + + abrt: + VIR_FORCE_CLOSE(fd); + virStreamAbort(local); + virStreamAbort(remote); + + virObjectUnref(local); + virObjectUnref(remote); + return -1; +} +
back to just one blank line.
+static void +qemuMigrationPipesStop(qemuMigrationPipePtr pipe, bool abort) +{ + qemuMigrationPipePtr tmp; + + while (pipe) { + tmp = pipe->next; + + qemuMigrationPipeClose(pipe, abort); + VIR_FREE(pipe); + + pipe = tmp; + } +} +
And again only 1 line...
+static void qemuMigrationIOFunc(void *arg) { qemuMigrationIOThreadPtr data = arg; @@ -4081,9 +4199,14 @@ qemuMigrationIOFunc(void *arg) break; } } + + if (fds[2].revents & (POLLIN | POLLERR | POLLHUP) && + qemuNBDTunnelAcceptAndPipe(data) < 0) + goto abrt;
This would only seem to be necessary if we have a dconn && uuid set...
}
virStreamFinish(data->qemuStream); + qemuMigrationPipesStop(data->pipes, false);
VIR_FORCE_CLOSE(data->qemuSock); VIR_FREE(buffer); @@ -4097,6 +4220,7 @@ qemuMigrationIOFunc(void *arg) err = NULL; } virStreamAbort(data->qemuStream); + qemuMigrationPipesStop(data->pipes, true); if (err) { virSetError(err); virFreeError(err);
Looks like we can get to error: from within the polling loop, but we don't use qemuMigrationPipesStop there.
@@ -4114,7 +4238,9 @@ qemuMigrationIOFunc(void *arg)
static qemuMigrationIOThreadPtr -qemuMigrationStartTunnel(virStreamPtr qemuStream) +qemuMigrationStartTunnel(virStreamPtr qemuStream, + virConnectPtr dconn, + unsigned char uuid[VIR_UUID_BUFLEN])
Again, the "const unsigned char *uuid" comment applies. So what happens when we don't have NBD disks to migrate? Do we really need to have dconn and uuid? That seems to be the assumption...
{ qemuMigrationIOThreadPtr io = NULL; int wakeupFD[2] = { -1, -1 }; @@ -4132,6 +4258,8 @@ qemuMigrationStartTunnel(virStreamPtr qemuStream) io->qemuSock = io->unixSock = -1; io->wakeupRecvFD = wakeupFD[0]; io->wakeupSendFD = wakeupFD[1]; + io->dconn = dconn; + memcpy(io->uuid, uuid, VIR_UUID_BUFLEN);
if (virThreadCreate(&io->thread, true, qemuMigrationIOFunc, @@ -4337,7 +4465,9 @@ qemuMigrationRun(virQEMUDriverPtr driver, VIR_WARN("unable to provide data for graphics client relocation");
if (spec->fwdType != MIGRATION_FWD_DIRECT) { - if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream))) + if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, + dconn, + mig->uuid)))
So this will only matter if we have mig->nbd and migrate_flags & (*DISK|*INC), true? An assumption that's not always true, I would think. And thus the 'need' to start things up with that enabled is unnecessary. Seems you'd want to keep qemuMigrationStartTunnel as is, then if/when we know we're going to have this functionality, have a "set" function for dconn && uuid... prior, of course, to setting the nbd_tunnel* in iothread. John FWIW: I'm going to stop here. I've got a backlog of other things to look at right now and it seems the next one makes things even a bit more complicated... Plus, as I said in my .0 response, starting at patch 16 I wasn't able to apply the changes.
goto cancel;
if (nmigrate_disks &&

Add and use qemuMigrationPipeEvent, the piped streams' event handler. It sets the appropriate event flags for each of the streams and pumps the pipe using qemuMigrationPipeIO whenever there is data at either end. Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 0f35c13..43f71e9 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -4010,8 +4010,28 @@ struct _qemuMigrationPipe { qemuMigrationIOThreadPtr data; virStreamPtr local; virStreamPtr remote; + + int local_flags : 4; + int remote_flags : 4; + char buffer[TUNNEL_SEND_BUF_SIZE]; }; +static int +qemuMigrationPipeIO(virStreamPtr from, virStreamPtr to, char *buffer) +{ + int done, got, offset = 0; + got = virStreamRecv(from, buffer, TUNNEL_SEND_BUF_SIZE); + + while (offset < got) { + done = virStreamSend(to, buffer + offset, got - offset); + if (done < 0) + break; + offset += done; + } + + return got; +} + static void qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort) { @@ -4030,6 +4050,55 @@ qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort) virObjectUnref(pipe->remote); } +static void +qemuMigrationPipeEvent(virStreamPtr stream, int events, void *opaque) +{ + qemuMigrationPipePtr pipe = opaque; + + if (stream == pipe->remote) + pipe->remote_flags |= events; + if (stream == pipe->local) + pipe->local_flags |= events; + + VIR_DEBUG("remote = %p, remote_flags = %x, local = %p, local_flags = %x", + pipe->remote, pipe->remote_flags, + pipe->local, pipe->local_flags); + + if (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP)) { + char dummy; + virStreamRecv(stream, &dummy, 1); + abrt: + virCopyLastError(&pipe->data->err); + qemuMigrationPipeClose(pipe, true); + if (safewrite(pipe->data->wakeupSendFD, "c", 1) != 1) { + virReportSystemError(errno, "%s", + _("failed to stop migration tunnel")); + } + return; + } + + if ((pipe->remote_flags & VIR_STREAM_EVENT_READABLE) && + (pipe->local_flags & VIR_STREAM_EVENT_WRITABLE)) { + + if (qemuMigrationPipeIO(pipe->remote, pipe->local, pipe->buffer) == -1) + goto abrt; + + pipe->remote_flags &= ~VIR_STREAM_EVENT_READABLE; + pipe->local_flags &= ~VIR_STREAM_EVENT_WRITABLE; + } + + if ((pipe->local_flags & VIR_STREAM_EVENT_READABLE) && + (pipe->remote_flags & VIR_STREAM_EVENT_WRITABLE)) { + + if (qemuMigrationPipeIO(pipe->local, pipe->remote, pipe->buffer) == -1) + goto abrt; + + pipe->local_flags &= ~VIR_STREAM_EVENT_READABLE; + pipe->remote_flags &= ~VIR_STREAM_EVENT_WRITABLE; + } +} + + static qemuMigrationPipePtr qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote) { @@ -4041,6 +4110,20 @@ qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote) pipe->local = local; pipe->remote = remote; + if (virStreamEventAddCallback(local, + VIR_STREAM_EVENT_READABLE | + VIR_STREAM_EVENT_WRITABLE, + qemuMigrationPipeEvent, + pipe, NULL) < 0) + goto error; + + if (virStreamEventAddCallback(remote, + VIR_STREAM_EVENT_READABLE | + VIR_STREAM_EVENT_WRITABLE, + qemuMigrationPipeEvent, + pipe, NULL) < 0) + goto error; + return pipe; error: @@ -4230,7 +4313,7 @@ qemuMigrationIOFunc(void *arg) /* Let the source qemu know that the transfer cant continue anymore. * Don't copy the error for EPIPE as destination has the actual error. */ VIR_FORCE_CLOSE(data->qemuSock); - if (!virLastErrorIsSystemErrno(EPIPE)) + if (data->err.code == VIR_ERR_OK && !virLastErrorIsSystemErrno(EPIPE)) virCopyLastError(&data->err); virResetLastError(); VIR_FREE(buffer); -- 1.9.1

Add qemuMonitorNBDServerStartUnix, used to instruct QEMU to start an NBD server listening on a UNIX socket that serves as the drive-mirror destination. Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_monitor.c | 12 ++++++++++++ src/qemu/qemu_monitor.h | 2 ++ src/qemu/qemu_monitor_json.c | 35 +++++++++++++++++++++++++++++++++++ src/qemu/qemu_monitor_json.h | 2 ++ 4 files changed, 51 insertions(+) diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c index 49d4aa2..9c8e0fe 100644 --- a/src/qemu/qemu_monitor.c +++ b/src/qemu/qemu_monitor.c @@ -3594,6 +3594,18 @@ qemuMonitorNBDServerStart(qemuMonitorPtr mon, int +qemuMonitorNBDServerStartUnix(qemuMonitorPtr mon, + const char *file) +{ + VIR_DEBUG("file=%s", file); + + QEMU_CHECK_MONITOR_JSON(mon); + + return qemuMonitorJSONNBDServerStartUnix(mon, file); +} + + +int qemuMonitorNBDServerAdd(qemuMonitorPtr mon, const char *deviceID, bool writable) diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h index 2ce3958..e94ac93 100644 --- a/src/qemu/qemu_monitor.h +++ b/src/qemu/qemu_monitor.h @@ -873,6 +873,8 @@ char *qemuMonitorGetTargetArch(qemuMonitorPtr mon); int qemuMonitorNBDServerStart(qemuMonitorPtr mon, const char *host, unsigned int port); +int qemuMonitorNBDServerStartUnix(qemuMonitorPtr mon, + const char *path); int qemuMonitorNBDServerAdd(qemuMonitorPtr mon, const char *deviceID, bool writable); diff --git a/src/qemu/qemu_monitor_json.c b/src/qemu/qemu_monitor_json.c index b39b29b..ef162a4 100644 --- a/src/qemu/qemu_monitor_json.c +++ b/src/qemu/qemu_monitor_json.c @@ -5749,6 +5749,41 @@ qemuMonitorJSONNBDServerStart(qemuMonitorPtr mon, } int +qemuMonitorJSONNBDServerStartUnix(qemuMonitorPtr mon, + const char *file) +{ + int ret = -1; + virJSONValuePtr cmd = NULL; + virJSONValuePtr reply = NULL; + virJSONValuePtr addr = NULL; + + if (!(addr = qemuMonitorJSONBuildUnixSocketAddress(file))) + return ret; + + if (!(cmd = qemuMonitorJSONMakeCommand("nbd-server-start", + "a:addr", addr, + NULL))) + goto cleanup; + + /* From now on, @addr is part of @cmd */ + addr = NULL; + + if (qemuMonitorJSONCommand(mon, cmd, &reply) < 0) + goto cleanup; + + if (qemuMonitorJSONCheckError(cmd, reply) < 0) + goto cleanup; + + ret = 0; + + cleanup: + virJSONValueFree(reply); + virJSONValueFree(cmd); + virJSONValueFree(addr); + return ret; +} + +int qemuMonitorJSONNBDServerAdd(qemuMonitorPtr mon, const char *deviceID, bool writable) diff --git a/src/qemu/qemu_monitor_json.h b/src/qemu/qemu_monitor_json.h index 120bd93..28e17c0 100644 --- a/src/qemu/qemu_monitor_json.h +++ b/src/qemu/qemu_monitor_json.h @@ -441,6 +441,8 @@ char *qemuMonitorJSONGetTargetArch(qemuMonitorPtr mon); int qemuMonitorJSONNBDServerStart(qemuMonitorPtr mon, const char *host, unsigned int port); +int qemuMonitorJSONNBDServerStartUnix(qemuMonitorPtr mon, + const char *path); int qemuMonitorJSONNBDServerAdd(qemuMonitorPtr mon, const char *deviceID, bool writable); -- 1.9.1

Modify qemuMigrationStartNBDServer so it can instruct QEMU to start an NBD server bound to a local UNIX socket. Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 43f71e9..303cd47 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -1694,6 +1694,7 @@ qemuMigrationPrecreateStorage(virConnectPtr conn, static int qemuMigrationStartNBDServer(virQEMUDriverPtr driver, virDomainObjPtr vm, + bool tunnel, const char *listenAddr, size_t nmigrate_disks, const char **migrate_disks) @@ -1701,8 +1702,9 @@ qemuMigrationStartNBDServer(virQEMUDriverPtr driver, int ret = -1; qemuDomainObjPrivatePtr priv = vm->privateData; unsigned short port = 0; - char *diskAlias = NULL; + char *diskAlias = NULL, *tunnelName = NULL; size_t i; + virQEMUDriverConfigPtr cfg = virQEMUDriverGetConfig(driver); for (i = 0; i < vm->def->ndisks; i++) { virDomainDiskDefPtr disk = vm->def->disks[i]; @@ -1720,12 +1722,20 @@ qemuMigrationStartNBDServer(virQEMUDriverPtr driver, QEMU_ASYNC_JOB_MIGRATION_IN) < 0) goto cleanup; - if (!port && + if (!tunnel && !port && ((virPortAllocatorAcquire(driver->migrationPorts, &port) < 0) || (qemuMonitorNBDServerStart(priv->mon, listenAddr, port) < 0))) { goto exit_monitor; } + if (tunnel && !tunnelName && + ((virAsprintf(&tunnelName, + "%s/domain-%s/qemu.nbdtunnelmigrate.src", + cfg->libDir, vm->def->name) < 0) || + (qemuMonitorNBDServerStartUnix(priv->mon, tunnelName) < 0))) { + goto exit_monitor; + } + if (qemuMonitorNBDServerAdd(priv->mon, diskAlias, true) < 0) goto exit_monitor; if (qemuDomainObjExitMonitor(driver, vm) < 0) @@ -1736,7 +1746,9 @@ qemuMigrationStartNBDServer(virQEMUDriverPtr driver, ret = 0; cleanup: + virObjectUnref(cfg); VIR_FREE(diskAlias); + VIR_FREE(tunnelName); if (ret < 0) virPortAllocatorRelease(driver->migrationPorts, port); return ret; @@ -3488,7 +3500,7 @@ qemuMigrationPrepareAny(virQEMUDriverPtr driver, if (mig->nbd && flags & (VIR_MIGRATE_NON_SHARED_DISK | VIR_MIGRATE_NON_SHARED_INC) && virQEMUCapsGet(priv->qemuCaps, QEMU_CAPS_NBD_SERVER)) { - if (qemuMigrationStartNBDServer(driver, vm, listenAddress, + if (qemuMigrationStartNBDServer(driver, vm, tunnel, listenAddress, nmigrate_disks, migrate_disks) < 0) { /* error already reported */ goto endjob; -- 1.9.1

Introduce an auxiliary handler domainMigrateOpenTunnel for QEMU. Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 19 +++++++++++++++++++ src/qemu/qemu_migration.h | 6 ++++++ 2 files changed, 25 insertions(+) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 303cd47..4708387 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3605,6 +3605,25 @@ qemuMigrationPrepareTunnel(virQEMUDriverPtr driver, } +int +qemuMigrationOpenTunnel(virQEMUDriverPtr driver, + virConnectPtr dconn, + virStreamPtr st, + virDomainDefPtr def, + unsigned long flags) +{ + VIR_DEBUG("driver=%p, dconn=%p, st=%p, def=%p, flags=%lx", + driver, dconn, st, def, flags); + + if (st == NULL) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("opening a tunnel requested but NULL stream passed")); + return -1; + } + + return 0; +} + static virURIPtr qemuMigrationParseURI(const char *uri, bool *wellFormed) { diff --git a/src/qemu/qemu_migration.h b/src/qemu/qemu_migration.h index 8175f4b..f91791e 100644 --- a/src/qemu/qemu_migration.h +++ b/src/qemu/qemu_migration.h @@ -121,6 +121,12 @@ int qemuMigrationPrepareTunnel(virQEMUDriverPtr driver, const char *origname, unsigned long flags); +int qemuMigrationOpenTunnel(virQEMUDriverPtr driver, + virConnectPtr dconn, + virStreamPtr st, + virDomainDefPtr def, + unsigned long flags); + int qemuMigrationPrepareDirect(virQEMUDriverPtr driver, virConnectPtr dconn, const char *cookiein, -- 1.9.1

Add domainMigrateOpenTunnel handler for QEMU driver. Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_driver.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c index 92a9961..ad9a6a0 100644 --- a/src/qemu/qemu_driver.c +++ b/src/qemu/qemu_driver.c @@ -20064,6 +20064,29 @@ static int qemuDomainRename(virDomainPtr dom, goto endjob; } +static int qemuDomainMigrateOpenTunnel(virConnectPtr dconn, + virStreamPtr st, + unsigned char uuid[VIR_UUID_BUFLEN], + unsigned int flags) +{ + virQEMUDriverPtr driver = dconn->privateData; + int ret = -1; + virDomainObjPtr vm; + + virCheckFlags(VIR_MIGRATE_TUNNEL_NBD, -1); + + vm = virDomainObjListFindByUUIDRef(driver->domains, uuid); + + if (virDomainMigrateOpenTunnelEnsureACL(dconn, vm->def) < 0) + goto cleanup; + + ret = qemuMigrationOpenTunnel(driver, dconn, st, vm->def, flags); + + cleanup: + virDomainObjEndAPI(&vm); + return ret; +} + static virHypervisorDriver qemuHypervisorDriver = { .name = QEMU_DRIVER_NAME, .connectOpen = qemuConnectOpen, /* 0.2.0 */ @@ -20272,6 +20295,7 @@ static virHypervisorDriver qemuHypervisorDriver = { .domainInterfaceAddresses = qemuDomainInterfaceAddresses, /* 1.2.14 */ .domainSetUserPassword = qemuDomainSetUserPassword, /* 1.2.16 */ .domainRename = qemuDomainRename, /* 1.2.19 */ + .domainMigrateOpenTunnel = qemuDomainMigrateOpenTunnel, /* 1.2.XX */ }; -- 1.9.1

Add qemuMigrationOpenNBDTunnel that connects a remote stream to the local NBD UNIX socket. Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 4708387..27c1acb 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3605,6 +3605,32 @@ qemuMigrationPrepareTunnel(virQEMUDriverPtr driver, } +static int +qemuMigrationOpenNBDTunnel(virQEMUDriverPtr driver, + virStreamPtr st, + const char *name) +{ + char *tunnelName = NULL; + int ret = -1; + virQEMUDriverConfigPtr cfg = virQEMUDriverGetConfig(driver); + + if (virAsprintf(&tunnelName, + "%s/domain-%s/qemu.nbdtunnelmigrate.src", + cfg->libDir, name) < 0) + goto cleanup; + + if (virFDStreamConnectUNIX(st, tunnelName, false) < 0) + goto cleanup; + + ret = 0; + + cleanup: + VIR_FREE(tunnelName); + virObjectUnref(cfg); + return ret; +} + + int qemuMigrationOpenTunnel(virQEMUDriverPtr driver, virConnectPtr dconn, @@ -3621,6 +3647,9 @@ qemuMigrationOpenTunnel(virQEMUDriverPtr driver, return -1; } + if (flags & VIR_MIGRATE_TUNNEL_NBD) + return qemuMigrationOpenNBDTunnel(driver, st, def->name); + return 0; } -- 1.9.1
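The virFDStreamConnectUNIX call used by qemuMigrationOpenNBDTunnel above ultimately boils down to a connect(2) on the NBD server's UNIX socket. A minimal POSIX sketch of that step (the helper name is hypothetical; libvirt's implementation additionally wraps the fd in a virStream and can optionally wait for the socket to appear):

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Hypothetical helper: connect to the UNIX-domain socket at `path`.
 * Returns the connected fd, or -1 on error. */
static int connect_unix(const char *path)
{
    struct sockaddr_un addr;
    int fd;

    if (strlen(path) >= sizeof(addr.sun_path))
        return -1; /* path too long for sun_path */

    if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```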

Now that all the pieces are in place, finally allow NBD in tunnelled migration. Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/qemu/qemu_migration.c | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c index 27c1acb..9520e34 100644 --- a/src/qemu/qemu_migration.c +++ b/src/qemu/qemu_migration.c @@ -3041,13 +3041,6 @@ qemuMigrationBeginPhase(virQEMUDriverPtr driver, goto cleanup; } } - - if (flags & VIR_MIGRATE_TUNNELLED) { - virReportError(VIR_ERR_OPERATION_UNSUPPORTED, "%s", - _("Selecting disks to migrate is not " - "implemented for tunnelled migration")); - goto cleanup; - } } else { virReportError(VIR_ERR_OPERATION_UNSUPPORTED, "%s", _("qemu does not support drive-mirror command")); @@ -3056,13 +3049,8 @@ qemuMigrationBeginPhase(virQEMUDriverPtr driver, } if (has_drive_mirror) { - /* TODO support NBD for TUNNELLED migration */ - if (flags & VIR_MIGRATE_TUNNELLED) { - VIR_WARN("NBD in tunnelled migration is currently not supported"); - } else { - cookieFlags |= QEMU_MIGRATION_COOKIE_NBD; - priv->nbdPort = 0; - } + cookieFlags |= QEMU_MIGRATION_COOKIE_NBD; + priv->nbdPort = 0; } } -- 1.9.1

Signed-off-by: Pavel Boldin <pboldin@mirantis.com> --- src/security/virt-aa-helper.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/security/virt-aa-helper.c b/src/security/virt-aa-helper.c index 5de56e5..87af98f 100644 --- a/src/security/virt-aa-helper.c +++ b/src/security/virt-aa-helper.c @@ -1367,9 +1367,9 @@ main(int argc, char **argv) LOCALSTATEDIR, ctl->def->name); virBufferAsprintf(&buf, " \"/run/libvirt/**/%s.pid\" rwk,\n", ctl->def->name); - virBufferAsprintf(&buf, " \"%s/run/libvirt/**/*.tunnelmigrate.dest.%s\" rw,\n", + virBufferAsprintf(&buf, " \"%s/lib/libvirt/qemu/domain-%s/*tunnelmigrate.src\" rw,\n", LOCALSTATEDIR, ctl->def->name); - virBufferAsprintf(&buf, " \"/run/libvirt/**/*.tunnelmigrate.dest.%s\" rw,\n", + virBufferAsprintf(&buf, " \"/lib/libvirt/qemu/domain-%s/*tunnelmigrate.src\" rw,\n", ctl->def->name); } if (ctl->files) -- 1.9.1

Ping. May I have your attention guys? Pavel On Wed, Nov 18, 2015 at 8:12 PM, Pavel Boldin <pboldin@mirantis.com> wrote:
The provided patchset implements NBD disk migration over a tunnelled connection provided by libvirt.
The migration source instructs QEMU to NBD-mirror drives into the provided UNIX socket. These connections and all the data are then tunnelled to the destination using a newly introduced RPC call. The migration destination implements a driver method that connects the tunnelled stream to QEMU's NBD destination.
The detailed scheme is the following:
PREPARE 1. Migration destination starts QEMU's NBD server listening on a UNIX socket using the `nbd-server-start` monitor command and exports the listed disks with `nbd-server-add`, via code added to qemuMigrationStartNBDServer that calls the introduced qemuMonitorNBDServerStartUnix monitor function.
PERFORM 2. Migration source creates a UNIX socket that is later used as the NBD destination in the `drive-mirror` monitor command.
This is implemented as a call to virNetSocketNewListenUnix from doTunnelMigrate.
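At the syscall level, what virNetSocketNewListenUnix does is the usual socket/bind/listen sequence. A minimal POSIX sketch (the helper name is hypothetical; libvirt's version additionally handles socket ownership, permissions, and structured error reporting):

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Hypothetical helper: create a listening UNIX-domain socket at
 * `path`. Returns the listening fd, or -1 on error. */
static int listen_unix(const char *path)
{
    struct sockaddr_un addr;
    int fd;

    if (strlen(path) >= sizeof(addr.sun_path))
        return -1; /* path too long for sun_path */

    if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);

    unlink(path); /* remove a stale socket left by a previous run */

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        listen(fd, SOMAXCONN) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```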
3. Source starts IOThread that polls on the UNIX socket, accepting every incoming QEMU connection.
This is done by adding a new pollfd in the poll(2) call in qemuMigrationIOFunc that calls introduced qemuNBDTunnelAcceptAndPipe function.
4. The qemuNBDTunnelAcceptAndPipe function accepts the connection and creates two virStreams. One is `local`, which is associated with the just-accepted connection using virFDStreamOpen. The second is `remote`, which is tunnelled to the remote destination stream.
The `local` stream is converted to a virFDStreamDrv stream using the virFDStreamOpen call on the fd returned by accept(2).
The `remote` stream is associated with a stream on the destination in a way similar to that used by the PrepareTunnel3* functions. That is, the virDomainMigrateOpenTunnel function is called on the destination connection object. virDomainMigrateOpenTunnel calls the remote driver's handler remoteDomainMigrateOpenTunnel, which makes the DOMAIN_MIGRATE_OPEN_TUNNEL call to the destination host. The code in remoteDomainMigrateOpenTunnel ties the passed virStream object to a virStream on the destination host via the remoteStreamDrv driver. The remote driver handles the stream's I/O by tunnelling data through the RPC connection.
Finally, qemuNBDTunnelAcceptAndPipe assigns both streams the same event callback, qemuMigrationPipeEvent. Its job is to track the status of the streams, doing I/O whenever necessary.
5. Source starts the drive mirroring using the qemuMigrationDriveMirror function. The function instructs QEMU to mirror drives to the UNIX socket that the IOThread listens on.
Since the mirroring must reach the 'synchronized' state — where writes go to both destinations simultaneously — before the VM migration can continue, the thread serving the connections must be started earlier.
6. When a connection to the UNIX socket on the migration source is made, the DOMAIN_MIGRATE_OPEN_TUNNEL proc is called on the migration destination.
The handler of this proc calls virDomainMigrateOpenTunnel, which in turn calls qemuMigrationOpenNBDTunnel by means of qemuDomainMigrateOpenTunnel.
qemuMigrationOpenNBDTunnel connects the stream that is linked to the source's stream to the NBD UNIX socket on the migration destination side.
7. The rest of the disk migration occurs semimagically: the virStream* APIs tunnel data in both directions. This is done by the qemuMigrationPipeEvent event callback set for both streams.
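The half-duplex gating that qemuMigrationPipeEvent performs can be modelled as simple flag bookkeeping: data moves in one direction only once one side has reported READABLE and the other WRITABLE, and both flags are cleared after the transfer. A stripped-down model of that logic for the remote-to-local direction (flag values, types, and names are illustrative, not the patch's virStream event constants):

```c
#include <stdbool.h>

enum { EV_READABLE = 1, EV_WRITABLE = 2 };

struct pipe_state {
    int local_flags;
    int remote_flags;
};

/* Record an event for one side and report whether a remote->local
 * transfer is now possible. On transfer, clear the consumed flags,
 * as qemuMigrationPipeEvent does after calling qemuMigrationPipeIO;
 * the local->remote direction is handled symmetrically. */
static bool remote_to_local_ready(struct pipe_state *ps,
                                  bool on_remote, int events)
{
    if (on_remote)
        ps->remote_flags |= events;
    else
        ps->local_flags |= events;

    if ((ps->remote_flags & EV_READABLE) &&
        (ps->local_flags & EV_WRITABLE)) {
        ps->remote_flags &= ~EV_READABLE;
        ps->local_flags &= ~EV_WRITABLE;
        return true;
    }
    return false;
}
```

Accumulating the flags across callbacks is what lets the pipe make progress even though readiness of the two streams is reported by separate events.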
The order of the patches is roughly the following:
* First, the RPC machinery and remote driver's virDrvDomainMigrateOpenTunnel implementation are added.
* Then, the source side of the protocol is implemented: code listening on a UNIX socket is added, DriveMirror is enhanced to instruct QEMU to `drive-mirror` there, and the IOThread driving the tunnelling is started sooner.
* After that, the destination side of the protocol is implemented: qemuMonitorNBDServerStartUnix is added and qemuMigrationStartNBDServer is enhanced to call it. qemuDomainMigrateOpenTunnel is implemented along with qemuMigrationOpenNBDTunnel, which does the real job.
* Finally, the code blocking NBD migration for tunnelled migration is removed.
Pavel Boldin (21): rpc: add DOMAIN_MIGRATE_OPEN_TUNNEL proc driver: add virDrvDomainMigrateOpenTunnel remote_driver: introduce virRemoteClientNew remote_driver: add remoteDomainMigrateOpenTunnel domain: add virDomainMigrateOpenTunnel domain: add virDomainMigrateTunnelFlags remote: impl remoteDispatchDomainMigrateOpenTunnel qemu: migration: src: add nbd tunnel socket data qemu: migration: src: nbdtunnel unix socket qemu: migration: src: qemu `drive-mirror` to UNIX qemu: migration: src: qemuSock for running thread qemu: migration: src: add NBD unixSock to iothread qemu: migration: src: qemuNBDTunnelAcceptAndPipe qemu: migration: src: stream piping qemu: monitor: add qemuMonitorNBDServerStartUnix qemu: migration: dest: nbd-server to UNIX sock qemu: migration: dest: qemuMigrationOpenTunnel qemu: driver: add qemuDomainMigrateOpenTunnel qemu: migration: dest: qemuMigrationOpenNBDTunnel qemu: migration: allow NBD tunneling migration apparmor: fix tunnelmigrate permissions
daemon/remote.c | 50 ++++ docs/apibuild.py | 1 + docs/hvsupport.pl | 1 + include/libvirt/libvirt-domain.h | 3 + src/driver-hypervisor.h | 8 + src/libvirt-domain.c | 43 ++++ src/libvirt_internal.h | 6 + src/libvirt_private.syms | 1 + src/qemu/qemu_driver.c | 24 ++ src/qemu/qemu_migration.c | 495 +++++++++++++++++++++++++++++++++------ src/qemu/qemu_migration.h | 6 + src/qemu/qemu_monitor.c | 12 + src/qemu/qemu_monitor.h | 2 + src/qemu/qemu_monitor_json.c | 35 +++ src/qemu/qemu_monitor_json.h | 2 + src/remote/remote_driver.c | 91 +++++-- src/remote/remote_protocol.x | 19 +- src/remote_protocol-structs | 8 + src/security/virt-aa-helper.c | 4 +- 19 files changed, 719 insertions(+), 92 deletions(-)
-- 1.9.1

On 11/18/2015 01:12 PM, Pavel Boldin wrote:
The provided patchset implements NBD disk migration over a tunnelled connection provided by libvirt.
[...]
daemon/remote.c | 50 ++++ docs/apibuild.py | 1 + docs/hvsupport.pl | 1 + include/libvirt/libvirt-domain.h | 3 + src/driver-hypervisor.h | 8 + src/libvirt-domain.c | 43 ++++ src/libvirt_internal.h | 6 + src/libvirt_private.syms | 1 + src/qemu/qemu_driver.c | 24 ++ src/qemu/qemu_migration.c | 495 +++++++++++++++++++++++++++++++++------ src/qemu/qemu_migration.h | 6 + src/qemu/qemu_monitor.c | 12 + src/qemu/qemu_monitor.h | 2 + src/qemu/qemu_monitor_json.c | 35 +++ src/qemu/qemu_monitor_json.h | 2 + src/remote/remote_driver.c | 91 +++++-- src/remote/remote_protocol.x | 19 +- src/remote_protocol-structs | 8 + src/security/virt-aa-helper.c | 4 +- 19 files changed, 719 insertions(+), 92 deletions(-)
Although not my area of expertise - figured I could give this series at least a glance as I'm working my way through the list I had of unread patches. Also, I was only able to git am -3 the first 15 patches... after that some other change gets in the way. You may need to fix up a few things and repost. Maybe go with shorter series to at least make progress... Some high level thoughts... First off - obviously it's a larger series. You see 21 patches and know you have to set aside the time for proper review... Of course many are short, which is exactly what is "requested"; still, I assume the quantity causes the "put this on my todo list" reaction - hence the delay in anyone looking. Not an excuse for having something upstream for a bit without review, but I hope it's a logical explanation... Second - I note liberal usage of "unsigned char uuid[VIR_UUID_BUFLEN]" in function headers. I believe those should be replaced by "const unsigned char *uuid" instead. IIRC this can lead to buffer overflow type issues (think stack space for args). Third - could this tunnel be possibly used more generically? That is, this use is for NBD, but the barebones indicate it's just an extra communication stream/channel. Would it be beneficial to pass more than a remote_uuid? Perhaps a remote resource name and/or URI? A number of migrate APIs seem to use a cookie - is that something that would be useful? That's a finer technical detail that I hope can be worked out. Hopefully someone with that finer and detailed technical knowledge of the migration protocol will also jump in ;-) John

On Wed, Nov 18, 2015 at 20:12:58 +0200, Pavel Boldin wrote:
The provided patchset implements NBD disk migration over a tunnelled connection provided by libvirt.
The migration source instructs QEMU to NBD mirror drives into the provided UNIX socket. These connections and all the data are then tunnelled to the destination using newly introduced RPC call. The migration destination implements a driver method that connects the tunnelled stream to the QEMU's NBD destination.
To be honest, I'm still in doubt this is all worth the effort. This code will likely be unused once both QEMU and libvirt gain proper TLS support for migration. The current tunneled migration is known to be slow due to the big overhead, and even with the presence of a better alternative we'd still have to maintain this code. Not to mention that it increases the (already great) complexity of our migration code and adds another dimension to the testing matrix. Jirka

On 08.01.2016 02:30, Jiri Denemark wrote:
On Wed, Nov 18, 2015 at 20:12:58 +0200, Pavel Boldin wrote:
The provided patchset implements NBD disk migration over a tunnelled connection provided by libvirt.
The migration source instructs QEMU to NBD mirror drives into the provided UNIX socket. These connections and all the data are then tunnelled to the destination using newly introduced RPC call. The migration destination implements a driver method that connects the tunnelled stream to the QEMU's NBD destination.
To be honest, I'm still in doubts this is all worth the effort. This code will likely be unused once both QEMU and libvirt gain proper TLS support for migration. The current tunneled migration is known to be slow due to the big overhead and even with the presence of a better alternative we'd still have to maintain this code. Not to mention that it increases the (already great) complexity of our migration code and adds another dimension to a testing matrix.
Jirka
Hi. Here at Virtuozzo we need to support encrypted migration in the near future and want to be able to pass all migration traffic through a single socket too. So we upvote this patch, if it matters. If the technical implications are so high, could we work out another approach to organize single-socket encrypted migration? There is an RFC already - https://www.redhat.com/archives/libvir-list/2015-November/msg00294.html. However, because it is probably too short, and Jan's comments give some hints too, I'll rephrase it here. It is perfectly possible to organize an external migration tunnel with the current options. We can set the migration URI to "127.0.0.1:port" and use VIR_MIGRATE_PARAM_LISTEN_ADDRESS equal to "127.0.0.1". This set of options would cause the source qemu to migrate to "127.0.0.1:port" and the destination qemu to handle migration on "127.0.0.1:port". Thus we can tunnel by forwarding "port" from source to destination. However, if the VM has non-shared disks we can't do it. The problem is that the port for disk migration will be chosen by the destination side automatically and we have no way to set up appropriate port forwarding beforehand. So the proposition is to add one more option, say VIR_MIGRATE_PARAM_NBD_PORT, so that the NBD port could be specified in the migration command. This should not add much complexity.