On 11/18/2015 01:13 PM, Pavel Boldin wrote:
Tunnelled drive mirroring requires an active thread to accept
incoming connections from QEMU and pump them to the remote host
through the tunnel.
For this, we need to split the thread's QEMU socket initialization from
the start of the thread and introduce qemuMigrationSetQEMUSocket
to specify it later.
There is a whole lot more going on here that isn't explained, e.g. the
's', 'f', and 'u' actions. The polling loop would now seem to "wait" for
a data socket to be created/added.
Also, even though it adds patches, perhaps this would have been easier to
understand by renaming the fields first, then flip-flopping the order,
then splitting the setting of qemuSock until the 'u' is seen.
Signed-off-by: Pavel Boldin <pboldin(a)mirantis.com>
---
src/qemu/qemu_migration.c | 93 ++++++++++++++++++++++++++++++-----------------
1 file changed, 59 insertions(+), 34 deletions(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index d95cd66..61e78c5 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -3991,14 +3991,15 @@ typedef struct _qemuMigrationIOThread qemuMigrationIOThread;
typedef qemuMigrationIOThread *qemuMigrationIOThreadPtr;
struct _qemuMigrationIOThread {
virThread thread;
- virStreamPtr st;
- int sock;
+ virStreamPtr qemuStream;
+ int qemuSock;
virError err;
int wakeupRecvFD;
int wakeupSendFD;
};
-static void qemuMigrationIOFunc(void *arg)
+static void
+qemuMigrationIOFunc(void *arg)
{
qemuMigrationIOThreadPtr data = arg;
char *buffer = NULL;
@@ -4006,21 +4007,18 @@ static void qemuMigrationIOFunc(void *arg)
int timeout = -1;
virErrorPtr err = NULL;
- VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d",
- data->st, data->sock);
+ VIR_DEBUG("Running migration tunnel; qemuStream=%p", data->qemuStream);
Since sock is "sent" to fds[0], it is thus no longer used. So why is it
passed?
if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0)
goto abrt;
- fds[0].fd = data->sock;
- fds[1].fd = data->wakeupRecvFD;
+ fds[0].fd = data->wakeupRecvFD;
+ fds[1].fd = -1;
+ fds[0].events = fds[1].events = POLLIN;
for (;;) {
int ret;
- fds[0].events = fds[1].events = POLLIN;
- fds[0].revents = fds[1].revents = 0;
-
I don't think we want to lose the revents = 0.
According to how I read the man page, because fds[1].fd is -1, its
revents would be set to 0 anyway on return. But once fds[1].fd is set -
since we're polling two fds here - how do you guarantee in the following
code that you wouldn't be "rereading and resending" on fds[1] if fds[0]
is what tripped the poll?
ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
if (ret < 0) {
@@ -4040,30 +4038,36 @@ static void qemuMigrationIOFunc(void *arg)
break;
}
- if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
- char stop = 0;
+ if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
+ char action = 0;
- if (saferead(data->wakeupRecvFD, &stop, 1) != 1) {
+ if (saferead(data->wakeupRecvFD, &action, 1) != 1) {
virReportSystemError(errno, "%s",
_("failed to read from wakeup fd"));
goto abrt;
}
- VIR_DEBUG("Migration tunnel was asked to %s",
- stop ? "abort" : "finish");
- if (stop) {
- goto abrt;
- } else {
- timeout = 0;
+ VIR_DEBUG("Migration tunnel was asked to %c", action);
+ switch (action) {
+ case 's':
+ goto abrt;
+ break;
+ case 'f':
+ timeout = 0;
+ break;
+ case 'u':
+ fds[1].fd = data->qemuSock;
+ VIR_DEBUG("qemuSock set %d", data->qemuSock);
+ break;
}
}
- if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
+ if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
int nbytes;
- nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
+ nbytes = saferead(data->qemuSock, buffer, TUNNEL_SEND_BUF_SIZE);
if (nbytes > 0) {
- if (virStreamSend(data->st, buffer, nbytes) < 0)
+ if (virStreamSend(data->qemuStream, buffer, nbytes) < 0)
goto error;
} else if (nbytes < 0) {
virReportSystemError(errno, "%s",
@@ -4076,10 +4080,9 @@ static void qemuMigrationIOFunc(void *arg)
}
}
- if (virStreamFinish(data->st) < 0)
- goto error;
+ virStreamFinish(data->qemuStream);
It would seem to me we shouldn't be losing this goto error on failure.
John
- VIR_FORCE_CLOSE(data->sock);
+ VIR_FORCE_CLOSE(data->qemuSock);
VIR_FREE(buffer);
return;
@@ -4090,7 +4093,7 @@ static void qemuMigrationIOFunc(void *arg)
virFreeError(err);
err = NULL;
}
- virStreamAbort(data->st);
+ virStreamAbort(data->qemuStream);
if (err) {
virSetError(err);
virFreeError(err);
@@ -4099,7 +4102,7 @@ static void qemuMigrationIOFunc(void *arg)
error:
/* Let the source qemu know that the transfer cant continue anymore.
* Don't copy the error for EPIPE as destination has the actual error. */
- VIR_FORCE_CLOSE(data->sock);
+ VIR_FORCE_CLOSE(data->qemuSock);
if (!virLastErrorIsSystemErrno(EPIPE))
virCopyLastError(&data->err);
virResetLastError();
@@ -4108,8 +4111,7 @@ static void qemuMigrationIOFunc(void *arg)
static qemuMigrationIOThreadPtr
-qemuMigrationStartTunnel(virStreamPtr st,
- int sock)
+qemuMigrationStartTunnel(virStreamPtr qemuStream)
{
qemuMigrationIOThreadPtr io = NULL;
int wakeupFD[2] = { -1, -1 };
@@ -4123,8 +4125,8 @@ qemuMigrationStartTunnel(virStreamPtr st,
if (VIR_ALLOC(io) < 0)
goto error;
- io->st = st;
- io->sock = sock;
+ io->qemuStream = qemuStream;
+ io->qemuSock = -1;
io->wakeupRecvFD = wakeupFD[0];
io->wakeupSendFD = wakeupFD[1];
@@ -4149,10 +4151,10 @@ static int
qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
{
int rv = -1;
- char stop = error ? 1 : 0;
+ char action = error ? 's' : 'f';
/* make sure the thread finishes its job and is joinable */
- if (safewrite(io->wakeupSendFD, &stop, 1) != 1) {
+ if (safewrite(io->wakeupSendFD, &action, 1) != 1) {
virReportSystemError(errno, "%s",
_("failed to wakeup migration tunnel"));
goto cleanup;
@@ -4180,6 +4182,26 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
}
static int
+qemuMigrationSetQEMUSocket(qemuMigrationIOThreadPtr io, int sock)
+{
+ int rv = -1;
+ char action = 'u';
+
+ io->qemuSock = sock;
+
+ if (safewrite(io->wakeupSendFD, &action, 1) != 1) {
+ virReportSystemError(errno, "%s",
+ _("failed to update migration tunnel"));
+ goto error;
+ }
+
+ rv = 0;
+
+ error:
+ return rv;
+}
+
+static int
qemuMigrationConnect(virQEMUDriverPtr driver,
virDomainObjPtr vm,
qemuMigrationSpecPtr spec)
@@ -4422,7 +4444,10 @@ qemuMigrationRun(virQEMUDriverPtr driver,
}
if (spec->fwdType != MIGRATION_FWD_DIRECT) {
- if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream, fd)))
+ if (!(iothread = qemuMigrationStartTunnel(spec->fwd.stream)))
+ goto cancel;
+
+ if (qemuMigrationSetQEMUSocket(iothread, fd) < 0)
goto cancel;
/* If we've created a tunnel, then the 'fd' will be closed in the
* qemuMigrationIOFunc as data->sock.