Allows reading asynchronously from a stream with GVirInputStream.
This is modelled after GSocket.
---
libvirt-gobject/Makefile.am | 2 +
libvirt-gobject/libvirt-gobject-connection.c | 2 +-
libvirt-gobject/libvirt-gobject-input-stream.c | 239 ++++++++++++++++++++++++
libvirt-gobject/libvirt-gobject-input-stream.h | 68 +++++++
libvirt-gobject/libvirt-gobject-stream.c | 130 +++++++++++++-
libvirt-gobject/libvirt-gobject-stream.h | 10 +-
6 files changed, 443 insertions(+), 8 deletions(-)
create mode 100644 libvirt-gobject/libvirt-gobject-input-stream.c
create mode 100644 libvirt-gobject/libvirt-gobject-input-stream.h
diff --git a/libvirt-gobject/Makefile.am b/libvirt-gobject/Makefile.am
index 8147db2..7013675 100644
--- a/libvirt-gobject/Makefile.am
+++ b/libvirt-gobject/Makefile.am
@@ -40,6 +40,8 @@ libvirt_gobject_1_0_la_HEADERS = \
libvirt_gobject_1_0_la_SOURCES = \
$(libvirt_gobject_1_0_la_HEADERS) \
libvirt-gobject-enums.c \
+ libvirt-gobject-input-stream.c \
+ libvirt-gobject-input-stream.h \
$(GOBJECT_SOURCE_FILES)
libvirt_gobject_1_0_la_CFLAGS = \
-DDATADIR="\"$(datadir)\"" \
diff --git a/libvirt-gobject/libvirt-gobject-connection.c
b/libvirt-gobject/libvirt-gobject-connection.c
index 5fc0a9e..95cd878 100644
--- a/libvirt-gobject/libvirt-gobject-connection.c
+++ b/libvirt-gobject/libvirt-gobject-connection.c
@@ -1151,7 +1151,7 @@ GVirStream *gvir_connection_get_stream(GVirConnection *self,
klass = GVIR_CONNECTION_GET_CLASS(self);
g_return_val_if_fail(klass->stream_new, NULL);
- virStreamPtr st = virStreamNew(self->priv->conn, flags);
+ virStreamPtr st = virStreamNew(self->priv->conn, flags | VIR_STREAM_NONBLOCK);
return klass->stream_new(self, st);
}
diff --git a/libvirt-gobject/libvirt-gobject-input-stream.c
b/libvirt-gobject/libvirt-gobject-input-stream.c
new file mode 100644
index 0000000..a76d670
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-input-stream.c
@@ -0,0 +1,239 @@
+/*
+ * libvirt-gobject-input-stream.c: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Authors: Daniel P. Berrange <berrange(a)redhat.com>
+ * Marc-André Lureau <marcandre.lureau(a)redhat.com>
+ */
+
+#include <config.h>
+
+#include <libvirt/virterror.h>
+#include <string.h>
+
+#include "libvirt-glib/libvirt-glib.h"
+#include "libvirt-gobject/libvirt-gobject.h"
+#include "libvirt-gobject-input-stream.h"
+
+extern gboolean debugFlag;
+
+/* Emit a g_debug() message only when the global debugFlag is set. */
+#define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0)
+
+#define gvir_input_stream_get_type _gvir_input_stream_get_type
+G_DEFINE_TYPE(GVirInputStream, gvir_input_stream, G_TYPE_INPUT_STREAM);
+
+/*
+ * GObject property identifiers for GVirInputStream.
+ * PROP_0 is never installed; it only occupies the value 0 so that
+ * PROP_STREAM (the "stream" property) starts at 1.
+ */
+enum
+{
+ PROP_0,
+ PROP_STREAM
+};
+
+/* Instance-private state of a GVirInputStream. */
+struct _GVirInputStreamPrivate
+{
+ /* Backing stream; set through the "stream" property, unreffed in finalize. */
+ GVirStream *stream;
+
+ /* pending operation metadata */
+ /* Async result of the read in flight; NULL when no read is pending. */
+ GSimpleAsyncResult *result;
+ /* Optional cancellable of the pending read; a reference is held while set. */
+ GCancellable *cancellable;
+ /* Caller-supplied destination buffer of the pending read. */
+ gpointer buffer;
+ /* Maximum number of bytes to read into buffer. */
+ gsize count;
+};
+
+/*
+ * GObject get_property implementation.
+ * Only PROP_STREAM is readable; any other id triggers the standard
+ * invalid-property warning.
+ */
+static void gvir_input_stream_get_property(GObject *object,
+                                           guint prop_id,
+                                           GValue *value,
+                                           GParamSpec *pspec)
+{
+    GVirInputStream *self = GVIR_INPUT_STREAM(object);
+
+    if (prop_id == PROP_STREAM) {
+        g_value_set_object(value, self->priv->stream);
+        return;
+    }
+
+    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+}
+
+/*
+ * GObject set_property implementation.
+ * PROP_STREAM stores a new reference to the supplied GVirStream; any
+ * other id triggers the standard invalid-property warning.
+ */
+static void gvir_input_stream_set_property(GObject *object,
+                                           guint prop_id,
+                                           const GValue *value,
+                                           GParamSpec *pspec)
+{
+    GVirInputStream *self = GVIR_INPUT_STREAM(object);
+
+    if (prop_id != PROP_STREAM) {
+        G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+        return;
+    }
+
+    /* g_value_dup_object() hands back an owned reference (or NULL). */
+    self->priv->stream = g_value_dup_object(value);
+}
+
+/*
+ * GObject finalize implementation: drops the reference on the backing
+ * GVirStream, then chains up to the parent class.
+ */
+static void gvir_input_stream_finalize(GObject *object)
+{
+    GVirInputStreamPrivate *priv = GVIR_INPUT_STREAM(object)->priv;
+    GObjectClass *parent_class = G_OBJECT_CLASS(gvir_input_stream_parent_class);
+
+    if (priv->stream != NULL)
+        g_object_unref(priv->stream);
+
+    if (parent_class->finalize != NULL)
+        parent_class->finalize(object);
+}
+
+/*
+ * gvir_input_stream_read_ready:
+ * @st: the libvirt stream that raised the event (unused)
+ * @events: mask of VIR_STREAM_EVENT_* flags that fired
+ * @opaque: the GVirInputStream registered by gvir_input_stream_read_async()
+ *
+ * libvirt stream event callback. Performs the read recorded by
+ * gvir_input_stream_read_async() and completes the pending
+ * GSimpleAsyncResult with either the byte count or the error.
+ */
+static void
+gvir_input_stream_read_ready(G_GNUC_UNUSED virStreamPtr st,
+                             int events, void *opaque)
+{
+    GVirInputStream *stream = GVIR_INPUT_STREAM(opaque);
+    GVirInputStreamPrivate *priv = stream->priv;
+    GSimpleAsyncResult *simple;
+    GError *error = NULL;
+    gssize result;
+
+    g_return_if_fail(events & VIR_STREAM_EVENT_READABLE);
+
+    /*
+     * The event callback is only removed by gvir_input_stream_read_finish(),
+     * so it can fire again after the pending read has already completed.
+     * Without a pending result there is no valid buffer to read into.
+     */
+    if (priv->result == NULL)
+        return;
+
+    result = gvir_stream_receive(priv->stream, priv->buffer, priv->count,
+                                 priv->cancellable, &error);
+
+    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+        /* Readable was signalled but nothing could be read: keep waiting. */
+        g_warn_if_reached();
+        g_error_free(error); /* not handed to the result, so free it here */
+        return;
+    }
+
+    /* Detach the result first so a new read can be queued from the callback. */
+    simple = priv->result;
+    priv->result = NULL;
+
+    if (result >= 0)
+        g_simple_async_result_set_op_res_gssize(simple, result);
+
+    if (error)
+        g_simple_async_result_take_error(simple, error); /* takes ownership */
+
+    if (priv->cancellable) {
+        g_object_unref(priv->cancellable);
+        priv->cancellable = NULL;
+    }
+
+    g_simple_async_result_complete(simple);
+    g_object_unref(simple);
+}
+
+/*
+ * GInputStream read_async implementation.
+ *
+ * Registers gvir_input_stream_read_ready() for readable events on the
+ * underlying libvirt stream and records the request (result, cancellable,
+ * buffer, count) so the callback can perform the read. If the callback
+ * cannot be registered, an error is reported to @callback from an idle
+ * handler instead. Only one read may be pending at a time.
+ */
+static void gvir_input_stream_read_async(GInputStream *stream,
+                                         void *buffer,
+                                         gsize count,
+                                         G_GNUC_UNUSED int io_priority,
+                                         GCancellable *cancellable,
+                                         GAsyncReadyCallback callback,
+                                         gpointer user_data)
+{
+    GVirInputStream *self = GVIR_INPUT_STREAM(stream);
+    GVirInputStreamPrivate *priv;
+    virStreamPtr handle;
+    int rc;
+
+    g_return_if_fail(GVIR_IS_INPUT_STREAM(stream));
+    g_return_if_fail(self->priv->result == NULL);
+
+    priv = self->priv;
+
+    /* The "handle" property returns its own copy, released below. */
+    g_object_get(priv->stream, "handle", &handle, NULL);
+
+    rc = virStreamEventAddCallback(handle, VIR_STREAM_EVENT_READABLE,
+                                   gvir_input_stream_read_ready, stream, NULL);
+    if (rc >= 0) {
+        priv->result = g_simple_async_result_new(G_OBJECT(stream),
+                                                 callback, user_data,
+                                                 gvir_input_stream_read_async);
+        priv->cancellable = cancellable ? g_object_ref(cancellable) : NULL;
+        priv->buffer = buffer;
+        priv->count = count;
+    } else {
+        g_simple_async_report_error_in_idle(G_OBJECT(stream),
+                                            callback,
+                                            user_data,
+                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                                            "Couldn't add event callback %s",
+                                            G_STRFUNC);
+    }
+
+    virStreamFree(handle);
+}
+
+
+/*
+ * GInputStream read_finish implementation.
+ *
+ * Returns the byte count stored in @result by the read-ready callback and
+ * removes the event callback installed by gvir_input_stream_read_async().
+ * @error is not touched here.
+ */
+static gssize gvir_input_stream_read_finish(GInputStream *stream,
+                                            GAsyncResult *result,
+                                            G_GNUC_UNUSED GError **error)
+{
+    GVirInputStream *self = GVIR_INPUT_STREAM(stream);
+    GSimpleAsyncResult *simple;
+    virStreamPtr handle;
+    gssize nread;
+
+    g_return_val_if_fail(GVIR_IS_INPUT_STREAM(stream), -1);
+
+    simple = G_SIMPLE_ASYNC_RESULT(result);
+    g_warn_if_fail(g_simple_async_result_get_source_tag(simple) == gvir_input_stream_read_async);
+    nread = g_simple_async_result_get_op_res_gssize(simple);
+
+    /* Fetch our own copy of the handle just long enough to drop the callback. */
+    g_object_get(self->priv->stream, "handle", &handle, NULL);
+    virStreamEventRemoveCallback(handle);
+    virStreamFree(handle);
+
+    return nread;
+}
+
+
+/*
+ * Class initializer: reserves the private struct, wires the GObject and
+ * GInputStream virtual methods, and installs the construct-only "stream"
+ * property.
+ */
+static void gvir_input_stream_class_init(GVirInputStreamClass *klass)
+{
+    GObjectClass *object_class = G_OBJECT_CLASS(klass);
+    GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS(klass);
+    GParamSpec *stream_pspec;
+
+    g_type_class_add_private(klass, sizeof(GVirInputStreamPrivate));
+
+    object_class->finalize = gvir_input_stream_finalize;
+    object_class->get_property = gvir_input_stream_get_property;
+    object_class->set_property = gvir_input_stream_set_property;
+
+    /* No synchronous read is provided; only the async pair is implemented. */
+    istream_class->read_fn = NULL;
+    istream_class->read_async = gvir_input_stream_read_async;
+    istream_class->read_finish = gvir_input_stream_read_finish;
+
+    stream_pspec = g_param_spec_object("stream",
+                                       "stream",
+                                       "GVirStream",
+                                       GVIR_TYPE_STREAM,
+                                       G_PARAM_CONSTRUCT_ONLY |
+                                       G_PARAM_READWRITE |
+                                       G_PARAM_STATIC_STRINGS);
+    g_object_class_install_property(object_class, PROP_STREAM, stream_pspec);
+}
+
+/* Instance initializer: caches the pointer to the instance-private data. */
+static void gvir_input_stream_init(GVirInputStream *stream)
+{
+    GVirInputStreamPrivate *priv;
+
+    priv = G_TYPE_INSTANCE_GET_PRIVATE(stream, GVIR_TYPE_INPUT_STREAM,
+                                       GVirInputStreamPrivate);
+    stream->priv = priv;
+}
+
+/*
+ * _gvir_input_stream_new:
+ * @stream: the GVirStream to read from
+ *
+ * Creates a GVirInputStream whose "stream" property is set to @stream.
+ */
+GVirInputStream* _gvir_input_stream_new(GVirStream *stream)
+{
+    gpointer instance;
+
+    instance = g_object_new(GVIR_TYPE_INPUT_STREAM, "stream", stream, NULL);
+
+    return GVIR_INPUT_STREAM(instance);
+}
diff --git a/libvirt-gobject/libvirt-gobject-input-stream.h
b/libvirt-gobject/libvirt-gobject-input-stream.h
new file mode 100644
index 0000000..e8002b9
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-input-stream.h
@@ -0,0 +1,68 @@
+/*
+ * libvirt-gobject-input-stream.h: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Authors: Daniel P. Berrange <berrange(a)redhat.com>
+ * Marc-André Lureau <marcandre.lureau(a)redhat.com>
+ */
+
+/* Refuse direct inclusion unless building the library itself. */
+#if !defined(__LIBVIRT_GOBJECT_H__) && !defined(LIBVIRT_GOBJECT_BUILD)
+#error "Only <libvirt-gobject/libvirt-gobject.h> can be included directly."
+#endif
+
+#ifndef __LIBVIRT_GOBJECT_INPUT_STREAM_H__
+#define __LIBVIRT_GOBJECT_INPUT_STREAM_H__
+
+#include <gio/gio.h>
+#include "libvirt-gobject-stream.h"
+
+G_BEGIN_DECLS
+
+/* Standard GObject type, cast and type-check macros for GVirInputStream. */
+#define GVIR_TYPE_INPUT_STREAM            (_gvir_input_stream_get_type ())
+#define GVIR_INPUT_STREAM(inst)           (G_TYPE_CHECK_INSTANCE_CAST ((inst), \
+                                           GVIR_TYPE_INPUT_STREAM, GVirInputStream))
+#define GVIR_INPUT_STREAM_CLASS(class)    (G_TYPE_CHECK_CLASS_CAST ((class), \
+                                           GVIR_TYPE_INPUT_STREAM, GVirInputStreamClass))
+#define GVIR_IS_INPUT_STREAM(inst)        (G_TYPE_CHECK_INSTANCE_TYPE ((inst), \
+                                           GVIR_TYPE_INPUT_STREAM))
+#define GVIR_IS_INPUT_STREAM_CLASS(class) (G_TYPE_CHECK_CLASS_TYPE ((class), \
+                                           GVIR_TYPE_INPUT_STREAM))
+#define GVIR_INPUT_STREAM_GET_CLASS(inst) (G_TYPE_INSTANCE_GET_CLASS ((inst), \
+                                           GVIR_TYPE_INPUT_STREAM, GVirInputStreamClass))
+
+typedef struct _GVirInputStreamPrivate GVirInputStreamPrivate;
+typedef struct _GVirInputStreamClass GVirInputStreamClass;
+typedef struct _GVirInputStream GVirInputStream;
+
+/* Class structure: adds no members of its own to GInputStreamClass. */
+struct _GVirInputStreamClass
+{
+ GInputStreamClass parent_class;
+};
+
+/* Instance structure: a GInputStream plus a pointer to its private data. */
+struct _GVirInputStream
+{
+ GInputStream parent_instance;
+ GVirInputStreamPrivate *priv;
+};
+
+/*
+ * Type getter and constructor. NOTE(review): the underscore prefix suggests
+ * these are meant to stay out of the public API - confirm against the
+ * library's symbol export list.
+ */
+GType _gvir_input_stream_get_type (void) G_GNUC_CONST;
+GVirInputStream * _gvir_input_stream_new (GVirStream *stream);
+
+G_END_DECLS
+
+#endif /* __LIBVIRT_GOBJECT_INPUT_STREAM_H__ */
diff --git a/libvirt-gobject/libvirt-gobject-stream.c
b/libvirt-gobject/libvirt-gobject-stream.c
index 519d733..88e3a40 100644
--- a/libvirt-gobject/libvirt-gobject-stream.c
+++ b/libvirt-gobject/libvirt-gobject-stream.c
@@ -30,6 +30,8 @@
#include "libvirt-glib/libvirt-glib.h"
#include "libvirt-gobject/libvirt-gobject.h"
+#include "libvirt-gobject/libvirt-gobject-input-stream.h"
+
extern gboolean debugFlag;
#define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); }
while (0)
@@ -39,10 +41,12 @@ extern gboolean debugFlag;
struct _GVirStreamPrivate
{
- virStreamPtr handle;
+ virStreamPtr handle;
+ GInputStream *input_stream;
+ gboolean in_dispose;
};
-G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_OBJECT);
+G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
enum {
@@ -60,6 +64,71 @@ gvir_stream_error_quark(void)
return g_quark_from_static_string("vir-g-stream");
}
+
+/*
+ * GIOStream get_input_stream implementation.
+ * The GVirInputStream is created on first request and cached in the
+ * private data; the returned pointer is the cached instance.
+ */
+static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream)
+{
+    GVirStream *self = GVIR_STREAM(io_stream);
+    GVirStreamPrivate *priv = self->priv;
+
+    if (!priv->input_stream)
+        priv->input_stream = (GInputStream *)_gvir_input_stream_new(self);
+
+    return priv->input_stream;
+}
+
+
+/*
+ * GIOStream close_fn implementation.
+ * Closes the cached input stream, if one was created, ignoring any error
+ * from that close. The underlying libvirt stream is not closed here, so
+ * the function always reports success and never sets @error.
+ */
+static gboolean gvir_stream_close(GIOStream *io_stream,
+                                  GCancellable *cancellable,
+                                  G_GNUC_UNUSED GError **error)
+{
+    GVirStreamPrivate *priv = GVIR_STREAM(io_stream)->priv;
+
+    if (priv->input_stream != NULL)
+        g_input_stream_close(priv->input_stream, cancellable, NULL);
+
+    if (!priv->in_dispose) {
+        /* FIXME: really close the stream? */
+    }
+
+    return TRUE;
+}
+
+
+/*
+ * GIOStream close_async implementation.
+ * Runs the class close_fn synchronously, then reports either its error or
+ * a successful completion to @callback from an idle handler.
+ */
+static void gvir_stream_close_async(GIOStream *stream,
+                                    G_GNUC_UNUSED int io_priority,
+                                    GCancellable *cancellable,
+                                    GAsyncReadyCallback callback,
+                                    gpointer user_data)
+{
+    GIOStreamClass *klass = G_IO_STREAM_GET_CLASS(stream);
+    GSimpleAsyncResult *simple;
+    GError *close_error = NULL;
+    gboolean closed = TRUE;
+
+    /* close is not blocked, just do it! */
+    if (klass->close_fn != NULL)
+        closed = klass->close_fn(stream, cancellable, &close_error);
+
+    if (!closed) {
+        g_simple_async_report_take_gerror_in_idle(G_OBJECT(stream),
+                                                  callback, user_data,
+                                                  close_error);
+        return;
+    }
+
+    simple = g_simple_async_result_new(G_OBJECT(stream),
+                                       callback,
+                                       user_data,
+                                       gvir_stream_close_async);
+    g_simple_async_result_complete_in_idle(simple);
+    g_object_unref(simple);
+}
+
+
+/*
+ * GIOStream close_finish implementation.
+ * Always reports success; none of the arguments are inspected.
+ */
+static gboolean
+gvir_stream_close_finish(G_GNUC_UNUSED GIOStream *stream,
+ G_GNUC_UNUSED GAsyncResult *result,
+ G_GNUC_UNUSED GError **error)
+{
+ return TRUE;
+}
+
+
static void gvir_stream_get_property(GObject *object,
guint prop_id,
GValue *value,
@@ -107,6 +176,9 @@ static void gvir_stream_finalize(GObject *object)
DEBUG("Finalize GVirStream=%p", self);
+ if (self->priv->input_stream)
+ g_object_unref(self->priv->input_stream);
+
if (priv->handle) {
if (virStreamFinish(priv->handle) < 0)
g_critical("cannot finish stream");
@@ -120,12 +192,18 @@ static void gvir_stream_finalize(GObject *object)
static void gvir_stream_class_init(GVirStreamClass *klass)
{
- GObjectClass *object_class = G_OBJECT_CLASS (klass);
+ GObjectClass *object_class = G_OBJECT_CLASS(klass);
+ GIOStreamClass *stream_class = G_IO_STREAM_CLASS(klass);
object_class->finalize = gvir_stream_finalize;
object_class->get_property = gvir_stream_get_property;
object_class->set_property = gvir_stream_set_property;
+ stream_class->get_input_stream = gvir_stream_get_input_stream;
+ stream_class->close_fn = gvir_stream_close;
+ stream_class->close_async = gvir_stream_close_async;
+ stream_class->close_finish = gvir_stream_close_finish;
+
g_object_class_install_property(object_class,
PROP_HANDLE,
g_param_spec_boxed("handle",
@@ -170,6 +248,50 @@ GType gvir_stream_handle_get_type(void)
return handle_type;
}
+/**
+ * gvir_stream_receive:
+ * @stream: the stream
+ * @buffer: a buffer to read data into (which should be at least @size
+ *          bytes long).
+ * @size: the number of bytes you want to read from the stream
+ * @cancellable: (allow-none): a %GCancellable or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Receive data (up to @size bytes) from a stream.
+ * On error -1 is returned and @error is set accordingly.
+ *
+ * gvir_stream_receive() can return any number of bytes, up to
+ * @size. If more than @size bytes have been received, the additional
+ * data will be returned in future calls to gvir_stream_receive().
+ *
+ * If there is no data available, -1 is returned and @error is set to
+ * %G_IO_ERROR_WOULD_BLOCK. If @cancellable has already been cancelled,
+ * -1 is returned and @error is set to %G_IO_ERROR_CANCELLED.
+ *
+ * Returns: Number of bytes read, or 0 if the end of stream reached,
+ * or -1 on error.
+ */
+gssize gvir_stream_receive(GVirStream *self, gchar *buffer, gsize size,
+                           GCancellable *cancellable, GError **error)
+{
+    int got;
+
+    g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
+    g_return_val_if_fail(buffer != NULL, -1);
+
+    if (g_cancellable_set_error_if_cancelled(cancellable, error))
+        return -1;
+
+    got = virStreamRecv(self->priv->handle, buffer, size);
+    if (got >= 0)
+        return got;
+
+    if (got == -2) {
+        /*
+         * -2 is the "would block" result of virStreamRecv(). A literal
+         * message is used because g_set_error() requires a non-NULL format.
+         */
+        g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                            "virStreamRecv call would block");
+    } else {
+        g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                    "Got virStreamRecv error in %s", G_STRFUNC);
+    }
+
+    /* Collapse libvirt's -1/-2 into the documented -1; @error tells them apart. */
+    return -1;
+}
struct stream_sink_helper {
GVirStream *self;
@@ -197,7 +319,7 @@ stream_sink(virStreamPtr st G_GNUC_UNUSED,
* requested data sink. This is simply a convenient alternative
* to virStreamRecv, for apps that do blocking-I/o.
*/
-gint
+gssize
gvir_stream_receive_all(GVirStream *self, GVirStreamSinkFunc func, gpointer user_data,
GError **err)
{
struct stream_sink_helper helper = {
diff --git a/libvirt-gobject/libvirt-gobject-stream.h
b/libvirt-gobject/libvirt-gobject-stream.h
index 5181e24..35526db 100644
--- a/libvirt-gobject/libvirt-gobject-stream.h
+++ b/libvirt-gobject/libvirt-gobject-stream.h
@@ -28,6 +28,9 @@
#ifndef __LIBVIRT_GOBJECT_STREAM_H__
#define __LIBVIRT_GOBJECT_STREAM_H__
+#include <glib-object.h>
+#include <gio/gio.h>
+
G_BEGIN_DECLS
#define GVIR_TYPE_STREAM (gvir_stream_get_type ())
@@ -45,7 +48,7 @@ typedef struct _GVirStreamClass GVirStreamClass;
struct _GVirStream
{
- GObject parent;
+ GIOStream parent_instance;
GVirStreamPrivate *priv;
@@ -54,7 +57,7 @@ struct _GVirStream
struct _GVirStreamClass
{
- GObjectClass parent_class;
+ GIOStreamClass parent_class;
gpointer padding[20];
};
@@ -76,7 +79,8 @@ typedef gint (* GVirStreamSinkFunc) (GVirStream *stream,
GType gvir_stream_get_type(void);
GType gvir_stream_handle_get_type(void);
-gint gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer
user_data, GError **err);
+gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer
user_data, GError **err);
+gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable
*cancellable, GError **error);
G_END_DECLS
--
1.7.6.2