From: "Daniel P. Berrange" <berrange(a)redhat.com>
The GIO GInputStream/GOutputStream async model for I/O does not
work well with non-blocking bi-directional streams. To handle
such streams more effectively, add an API that allows
main loop watches to be registered against streams.
Since the libvirt level virStreamEventAddCallback API only allows
a single callback to be registered to a stream at any time, the
GVirStream object needs to multiplex multiple watches onto
a single libvirt level callback.
Watches can be removed in the normal way with g_source_remove().
* libvirt-gobject/libvirt-gobject-stream.c,
libvirt-gobject/libvirt-gobject-stream.h,
libvirt-gobject/libvirt-gobject.sym: Add gvir_stream_add_watch
---
libvirt-gobject/libvirt-gobject-stream.c | 180 ++++++++++++++++++++++++++++++
libvirt-gobject/libvirt-gobject-stream.h | 17 +++
libvirt-gobject/libvirt-gobject.sym | 1 +
3 files changed, 198 insertions(+), 0 deletions(-)
diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c
index 0d1c2d1..03b2c84 100644
--- a/libvirt-gobject/libvirt-gobject-stream.c
+++ b/libvirt-gobject/libvirt-gobject-stream.c
@@ -46,8 +46,20 @@ struct _GVirStreamPrivate
virStreamPtr handle;
GInputStream *input_stream;
GOutputStream *output_stream;
+
+ gboolean eventRegistered;
+ int eventLast;
+ GList *sources;
};
+typedef struct { /* Custom GSource representing one watch registered on a GVirStream via gvir_stream_add_watch() */
+ GSource source; /* embedded base object; the code casts between GSource* and GVirStreamSource* */
+ GVirStreamIOCondition cond; /* conditions the watch was registered for */
+ GVirStreamIOCondition newCond; /* conditions reported by libvirt and not yet dispatched; 0 when nothing is pending */
+ GVirStream *stream; /* stream being watched; assigned without taking a reference */
+} GVirStreamSource;
+
+
G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
@@ -186,6 +198,7 @@ static void gvir_stream_finalize(GObject *object)
{
GVirStream *self = GVIR_STREAM(object);
GVirStreamPrivate *priv = self->priv;
+ GList *tmp;
DEBUG("Finalize GVirStream=%p", self);
@@ -199,6 +212,14 @@ static void gvir_stream_finalize(GObject *object)
virStreamFree(priv->handle);
}
+ tmp = priv->sources;
+ while (tmp) {
+ GVirStreamSource *source = tmp->data;
+ g_source_remove(g_source_get_id((GSource*)source));
+ tmp = tmp->next;
+ }
+ g_list_free(priv->sources);
+
G_OBJECT_CLASS(gvir_stream_parent_class)->finalize(object);
}
@@ -448,3 +469,162 @@ gvir_stream_send_all(GVirStream *self, GVirStreamSourceFunc func,
gpointer user_
return r;
}
+
+
+/*
+ * libvirt stream event callback: fan the reported @events out to every
+ * watch source registered on the stream passed in @opaque.
+ *
+ * Each source's pending condition is rebuilt from scratch.  Readable and
+ * writable are reported only to sources that asked for them; hangup and
+ * error are reported to any source watching for readable or writable.
+ */
+static void gvir_stream_handle_events(virStreamPtr st G_GNUC_UNUSED,
+                                      int events,
+                                      void *opaque)
+{
+    GVirStream *stream = GVIR_STREAM(opaque);
+    GList *iter;
+
+    for (iter = stream->priv->sources; iter != NULL; iter = iter->next) {
+        GVirStreamSource *watch = iter->data;
+        GVirStreamIOCondition pending = 0;
+
+        if ((watch->cond & GVIR_STREAM_IO_CONDITION_READABLE) &&
+            (events & VIR_STREAM_EVENT_READABLE))
+            pending |= GVIR_STREAM_IO_CONDITION_READABLE;
+
+        if ((watch->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) &&
+            (events & VIR_STREAM_EVENT_WRITABLE))
+            pending |= GVIR_STREAM_IO_CONDITION_WRITABLE;
+
+        /* Failure conditions go to anyone doing I/O in either direction */
+        if (watch->cond & (GVIR_STREAM_IO_CONDITION_READABLE |
+                           GVIR_STREAM_IO_CONDITION_WRITABLE)) {
+            if (events & VIR_STREAM_EVENT_HANGUP)
+                pending |= GVIR_STREAM_IO_CONDITION_HANGUP;
+            if (events & VIR_STREAM_EVENT_ERROR)
+                pending |= GVIR_STREAM_IO_CONDITION_ERROR;
+        }
+
+        watch->newCond = pending;
+    }
+}
+
+
+/*
+ * Recompute the libvirt event mask from the conditions requested by all
+ * watch sources on @stream and synchronise the single libvirt level
+ * callback with it: register the callback on first use, update it while
+ * any read/write interest remains, and remove it once nothing is wanted.
+ */
+static void gvir_stream_update_events(GVirStream *stream)
+{
+    GVirStreamPrivate *priv = stream->priv;
+    int mask = 0;
+    GList *tmp;
+
+    for (tmp = priv->sources; tmp != NULL; tmp = tmp->next) {
+        GVirStreamSource *source = tmp->data;
+        if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE)
+            mask |= VIR_STREAM_EVENT_READABLE;
+        if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE)
+            mask |= VIR_STREAM_EVENT_WRITABLE;
+    }
+
+    if (mask == 0) {
+        if (priv->eventRegistered) {
+            virStreamEventRemoveCallback(priv->handle);
+            priv->eventRegistered = FALSE;
+        }
+        return;
+    }
+
+    if (priv->eventRegistered) {
+        virStreamEventUpdateCallback(priv->handle, mask);
+        return;
+    }
+
+    /*
+     * The callback gets its own reference on the stream, with
+     * g_object_unref passed as the free function for that opaque pointer.
+     */
+    if (virStreamEventAddCallback(priv->handle, mask,
+                                  gvir_stream_handle_events,
+                                  g_object_ref(stream),
+                                  g_object_unref) < 0) {
+        /*
+         * Registration failed, so no callback exists: give back the
+         * reference taken above and leave eventRegistered FALSE so later
+         * calls retry instead of updating/removing a missing callback.
+         * NOTE(review): assumes libvirt does not invoke the free
+         * function when registration fails - confirm against libvirt.
+         */
+        g_object_unref(stream);
+        return;
+    }
+
+    priv->eventRegistered = TRUE;
+}
+
+/*
+ * GSource prepare hook: the source is ready immediately when conditions
+ * are pending (timeout 0); otherwise it requests no timeout (-1).
+ */
+static gboolean gvir_stream_source_prepare(GSource *source,
+                                           gint *timeout)
+{
+    GVirStreamSource *watch = (GVirStreamSource *)source;
+    gboolean ready = watch->newCond ? TRUE : FALSE;
+
+    *timeout = ready ? 0 : -1;
+    return ready;
+}
+
+/* GSource check hook: TRUE when conditions are pending for this watch. */
+static gboolean gvir_stream_source_check(GSource *source)
+{
+    GVirStreamSource *watch = (GVirStreamSource *)source;
+
+    return watch->newCond ? TRUE : FALSE;
+}
+
+/*
+ * GSource dispatch hook: hand the pending conditions to the user's
+ * GVirStreamIOFunc and then clear them.  The handler's return value is
+ * returned to GLib unchanged.
+ *
+ * A source dispatched without a callback cannot do anything useful, so
+ * it is reported and FALSE is returned instead of calling through NULL.
+ */
+static gboolean gvir_stream_source_dispatch(GSource *source,
+                                            GSourceFunc callback,
+                                            gpointer user_data)
+{
+    GVirStreamSource *gsource = (GVirStreamSource*)source;
+    GVirStreamIOFunc func = (GVirStreamIOFunc)callback;
+    gboolean ret;
+
+    if (func == NULL) {
+        g_warning("GVirStream watch dispatched without a callback");
+        gsource->newCond = 0;
+        return FALSE;
+    }
+
+    ret = func(gsource->stream, gsource->newCond, user_data);
+    gsource->newCond = 0;
+    return ret;
+}
+
+/*
+ * GSource finalize hook: drop the source from its stream's watch list
+ * and recompute the libvirt event mask now that its interest is gone.
+ */
+static void gvir_stream_source_finalize(GSource *source)
+{
+    GVirStreamSource *gsource = (GVirStreamSource*)source;
+    GVirStreamPrivate *priv = gsource->stream->priv;
+
+    /*
+     * g_list_remove unlinks and frees the first node holding this source
+     * while keeping both the next and prev links of its neighbours
+     * consistent; it leaves the list untouched if the source is absent.
+     */
+    priv->sources = g_list_remove(priv->sources, source);
+
+    gvir_stream_update_events(gsource->stream);
+}
+
+/*
+ * GSource vtable for stream watches.  Only used within this file (it is
+ * passed to g_source_new() by gvir_stream_add_watch), so it gets internal
+ * linkage rather than being a global symbol.
+ */
+static GSourceFuncs gvir_stream_source_funcs = {
+    .prepare = gvir_stream_source_prepare,
+    .check = gvir_stream_source_check,
+    .dispatch = gvir_stream_source_dispatch,
+    .finalize = gvir_stream_source_finalize,
+};
+
+/**
+ * gvir_stream_add_watch:
+ * @stream: the stream to watch
+ * @cond: the conditions to watch for (bitwise OR of #GVirStreamIOCondition)
+ * @func: the function to call when one of the conditions is reported
+ * @opaque: user data to pass to @func
+ * @notify: function to call on @opaque when the watch's callback is released
+ *
+ * Registers a watch on @stream and attaches it to the default main
+ * context.  The watch can be removed with g_source_remove().
+ *
+ * Returns: the event source id, or 0 if @stream or @func is NULL
+ */
+gint gvir_stream_add_watch(GVirStream *stream,
+                           GVirStreamIOCondition cond,
+                           GVirStreamIOFunc func,
+                           gpointer opaque,
+                           GDestroyNotify notify)
+{
+    GVirStreamPrivate *priv;
+    GVirStreamSource *source;
+    gint id;
+
+    /* Reject arguments that would otherwise crash here or at dispatch */
+    g_return_val_if_fail(stream != NULL, 0);
+    g_return_val_if_fail(func != NULL, 0);
+
+    priv = stream->priv;
+
+    source = (GVirStreamSource*)g_source_new(&gvir_stream_source_funcs,
+                                             sizeof(GVirStreamSource));
+    source->stream = stream;
+    source->cond = cond;
+
+    /* Track the watch, then fold its interest into the libvirt mask */
+    priv->sources = g_list_append(priv->sources, source);
+    gvir_stream_update_events(source->stream);
+
+    g_source_set_callback((GSource*)source, (GSourceFunc)func, opaque, notify);
+    g_source_attach((GSource*)source, g_main_context_default());
+
+    /* Read the id before dropping our own reference on the source */
+    id = g_source_get_id((GSource*)source);
+    g_source_unref((GSource*)source);
+
+    return id;
+}
diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h
index 5a1ee68..e0004b2 100644
--- a/libvirt-gobject/libvirt-gobject-stream.h
+++ b/libvirt-gobject/libvirt-gobject-stream.h
@@ -93,6 +93,23 @@ typedef gint (* GVirStreamSourceFunc)(GVirStream *stream,
GType gvir_stream_get_type(void);
GType gvir_stream_handle_get_type(void);
+typedef enum { /* Bit flags describing the I/O conditions a stream watch requests or is told about */
+ GVIR_STREAM_IO_CONDITION_READABLE = (1 << 0), /* corresponds to VIR_STREAM_EVENT_READABLE */
+ GVIR_STREAM_IO_CONDITION_WRITABLE = (1 << 1), /* corresponds to VIR_STREAM_EVENT_WRITABLE */
+ GVIR_STREAM_IO_CONDITION_HANGUP = (1 << 2), /* set when libvirt reports VIR_STREAM_EVENT_HANGUP */
+ GVIR_STREAM_IO_CONDITION_ERROR = (1 << 3), /* set when libvirt reports VIR_STREAM_EVENT_ERROR */
+} GVirStreamIOCondition;
+
+typedef gboolean (*GVirStreamIOFunc)(GVirStream *stream, /* Watch handler; its result is returned unchanged from the GSource dispatch hook */
+ GVirStreamIOCondition cond, /* the conditions currently pending for this watch */
+ gpointer opaque); /* user data supplied to gvir_stream_add_watch() */
+
+gint gvir_stream_add_watch(GVirStream *stream, /* Registers a main loop watch on @stream; returns the GLib source id, removable with g_source_remove() */
+ GVirStreamIOCondition cond, /* conditions to watch for */
+ GVirStreamIOFunc func, /* handler invoked with the pending conditions */
+ gpointer opaque, /* user data passed to @func */
+ GDestroyNotify notify); /* passed to g_source_set_callback() as the destroy notifier for @opaque */
+
gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer
user_data, GError **error);
gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable
*cancellable, GError **error);
diff --git a/libvirt-gobject/libvirt-gobject.sym b/libvirt-gobject/libvirt-gobject.sym
index 78b3935..6261865 100644
--- a/libvirt-gobject/libvirt-gobject.sym
+++ b/libvirt-gobject/libvirt-gobject.sym
@@ -126,6 +126,7 @@ LIBVIRT_GOBJECT_0.0.1 {
gvir_stream_get_type;
gvir_stream_receive_all;
gvir_stream_handle_get_type;
+ gvir_stream_add_watch;
local:
*;
--
1.7.6.4