Add and use qemuMigrationPipeEvent piped streams' event handler. It
sets the appropriate event flags for each of the stream and pumps the
pipe using qemuMigrationPipeIO whenever there is a data at any end.
Signed-off-by: Pavel Boldin <pboldin(a)mirantis.com>
---
src/qemu/qemu_migration.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 84 insertions(+), 1 deletion(-)
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index 0f35c13..43f71e9 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -4010,8 +4010,28 @@ struct _qemuMigrationPipe {
qemuMigrationIOThreadPtr data;
virStreamPtr local;
virStreamPtr remote;
+
+ int local_flags : 4;
+ int remote_flags : 4;
+ char buffer[TUNNEL_SEND_BUF_SIZE];
};
+static int
+qemuMigrationPipeIO(virStreamPtr from, virStreamPtr to, char *buffer)
+{
+ int done, got, offset = 0;
+ got = virStreamRecv(from, buffer, TUNNEL_SEND_BUF_SIZE);
+
+ while (offset < got) {
+ done = virStreamSend(to, buffer + offset, got - offset);
+ if (done < 0)
+ break;
+ offset += done;
+ }
+
+ return got;
+}
+
static void
qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort)
{
@@ -4030,6 +4050,55 @@ qemuMigrationPipeClose(qemuMigrationPipePtr pipe, bool abort)
virObjectUnref(pipe->remote);
}
+static void
+qemuMigrationPipeEvent(virStreamPtr stream, int events, void *opaque)
+{
+ qemuMigrationPipePtr pipe = opaque;
+
+ if (stream == pipe->remote)
+ pipe->remote_flags |= events;
+ if (stream == pipe->local)
+ pipe->local_flags |= events;
+
+ VIR_DEBUG("remote = %p, remote_flags = %x, local = %p, local_flags = %x",
+ pipe->remote, pipe->remote_flags,
+ pipe->local, pipe->local_flags);
+
+ if (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP)) {
+ char dummy;
+ virStreamRecv(stream, &dummy, 1);
+ abrt:
+ virCopyLastError(&pipe->data->err);
+ qemuMigrationPipeClose(pipe, true);
+ if (safewrite(pipe->data->wakeupSendFD, "c", 1) != 1) {
+ virReportSystemError(errno, "%s",
+ _("failed to stop migration tunnel"));
+ }
+ return;
+ }
+
+ if ((pipe->remote_flags & VIR_STREAM_EVENT_READABLE) &&
+ (pipe->local_flags & VIR_STREAM_EVENT_WRITABLE)) {
+
+ if (qemuMigrationPipeIO(pipe->remote, pipe->local, pipe->buffer) == -1)
+ goto abrt;
+
+ pipe->remote_flags &= ~VIR_STREAM_EVENT_READABLE;
+ pipe->local_flags &= ~VIR_STREAM_EVENT_WRITABLE;
+ }
+
+ if ((pipe->local_flags & VIR_STREAM_EVENT_READABLE) &&
+ (pipe->remote_flags & VIR_STREAM_EVENT_WRITABLE)) {
+
+ if (qemuMigrationPipeIO(pipe->local, pipe->remote, pipe->buffer) == -1)
+ goto abrt;
+
+ pipe->local_flags &= ~VIR_STREAM_EVENT_READABLE;
+ pipe->remote_flags &= ~VIR_STREAM_EVENT_WRITABLE;
+ }
+}
+
+
static qemuMigrationPipePtr
qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote)
{
@@ -4041,6 +4110,20 @@ qemuMigrationPipeCreate(virStreamPtr local, virStreamPtr remote)
pipe->local = local;
pipe->remote = remote;
+ if (virStreamEventAddCallback(local,
+ VIR_STREAM_EVENT_READABLE |
+ VIR_STREAM_EVENT_WRITABLE,
+ qemuMigrationPipeEvent,
+ pipe, NULL) < 0)
+ goto error;
+
+ if (virStreamEventAddCallback(remote,
+ VIR_STREAM_EVENT_READABLE |
+ VIR_STREAM_EVENT_WRITABLE,
+ qemuMigrationPipeEvent,
+ pipe, NULL) < 0)
+ goto error;
+
return pipe;
error:
@@ -4230,7 +4313,7 @@ qemuMigrationIOFunc(void *arg)
/* Let the source qemu know that the transfer cant continue anymore.
* Don't copy the error for EPIPE as destination has the actual error. */
VIR_FORCE_CLOSE(data->qemuSock);
- if (!virLastErrorIsSystemErrno(EPIPE))
+ if (data->err.code == VIR_ERR_OK && !virLastErrorIsSystemErrno(EPIPE))
virCopyLastError(&data->err);
virResetLastError();
VIR_FREE(buffer);
--
1.9.1