Add function virJSONValueFromStream, which reads data from
a stream and passes it to the JSON parser. When the end of an object
is reached, it returns that object.
To avoid reading from the stream one byte at a time, it reads into
a buffer (in a virJSONStreamParserState structure), which should
be passed to each subsequent call of this function. So if the end
of one object and the beginning of the next object have been
read by a single system call, virJSONValueFromStream handles
it correctly.
example of usage:
virJSONValuePtr v;
virJSONStreamParserState state;
memset(&state, 0, sizeof(state));
while (1) {
v = virJSONValueFromStream(mon->fd, &state);
if (v == (void *)-1)
/* error */
break;
if (v == NULL)
/* file descriptor has been closed */
break;
/* handle object 'v' */
}
I need such a function for the parallels driver. It caches info
about domains and needs some mechanism to update this cache.
There is a "prlsrvctl monitor" command which waits for events
forever and prints info about events to stdout in JSON format.
So the parallels driver could start a separate thread which will
read from prlsrvctl's stdout and update the cache accordingly.
The same task exists in qemu_monitor_json, but there each JSON object
is printed on a separate line. That is not possible in my case,
because some fields could contain line endings.
---
src/libvirt_private.syms | 1 +
src/util/virjson.c | 110 ++++++++++++++++++++++++++++++++++++++++++++++
src/util/virjson.h | 8 +++
3 files changed, 119 insertions(+), 0 deletions(-)
* This patch is unchanged since v2
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index b449293..0db2a0d 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -1391,6 +1391,7 @@ virJSONValueArrayGet;
virJSONValueArraySize;
virJSONValueFree;
virJSONValueFromString;
+virJSONValueFromStream;
virJSONValueGetBoolean;
virJSONValueGetNumberDouble;
virJSONValueGetNumberInt;
diff --git a/src/util/virjson.c b/src/util/virjson.c
index 3a6f520..9559a26 100644
--- a/src/util/virjson.c
+++ b/src/util/virjson.c
@@ -1022,6 +1022,116 @@ cleanup:
return ret;
}
+/**
+ * virJSONValueFromStream:
+ * @fd: file descriptor, opened for reading
+ * @state: buffer state carried between calls; it must be zero-filled
+ *         before the first call and passed back unchanged afterwards
+ *
+ * Read a single JSON value from @fd.  Bytes that have already been read
+ * from the descriptor but belong to the next value are kept in @state,
+ * so that a subsequent call parses them before reading again.  The
+ * buffer is handled as NUL-terminated text, so a NUL byte in the stream
+ * cuts the current chunk short.
+ *
+ * Returns the parsed value on success, NULL if end of file is reached
+ * before a new value starts, (void *)-1 on error (parser allocation
+ * failure, read failure, parse error or end of file inside a value).
+ */
+
+virJSONValuePtr virJSONValueFromStream(int fd, virJSONStreamParserStatePtr state)
+{
+    yajl_handle hand;
+    virJSONParser parser = { NULL, NULL, 0 };
+    virJSONValuePtr value = (void *)-1;
+# ifndef WITH_YAJL2
+    yajl_parser_config cfg = { 1, 1 };
+# endif
+    ssize_t len;
+    size_t avail;
+    int ret = 0;
+    int i;
+    bool done = false;
+
+# ifdef WITH_YAJL2
+    hand = yajl_alloc(&parserCallbacks, NULL, &parser);
+    if (hand) {
+        yajl_config(hand, yajl_allow_comments, 1);
+        yajl_config(hand, yajl_dont_validate_strings, 0);
+        yajl_config(hand, yajl_allow_trailing_garbage, 1);
+    }
+# else
+    hand = yajl_alloc(&parserCallbacks, &cfg, NULL, &parser);
+# endif
+    if (!hand) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       _("Unable to create JSON parser"));
+        goto cleanup;
+    }
+
+    do {
+        if (state->buf[state->pos] == '\0') {
+            /* Buffer exhausted: refill it, always leaving a final NUL. */
+            state->pos = 0;
+            memset(state->buf, 0, sizeof(state->buf));
+            do {
+                len = read(fd, state->buf, sizeof(state->buf) - 1);
+            } while (len < 0 && errno == EINTR);
+
+            if (len < 0) {
+                virReportSystemError(errno, _("cannot read from fd '%d'"), fd);
+                goto cleanup;
+            }
+
+            if (len == 0) {
+                /* Containers still open on the parser's state stack mean
+                 * the stream ended inside an unfinished value; do not
+                 * hand the half-built tree to the caller. */
+                if (parser.nstate > 0) {
+                    virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                                   _("cannot parse json: premature end of data"));
+                    goto cleanup;
+                }
+                value = parser.head;
+                goto cleanup;
+            }
+        }
+
+        /* The buffer is not modified while parsing, so measure it once. */
+        avail = strlen(state->buf);
+        for (; state->pos < avail; state->pos++) {
+            unsigned char *buf = (unsigned char *)&state->buf[state->pos];
+            /*
+             * Newer yajl has the useful function yajl_get_bytes_consumed,
+             * which would allow parsing by larger chunks.  But rhel-6 has
+             * version 1.0.7, which doesn't have it, so the parser is fed
+             * one byte at a time.
+             */
+            ret = yajl_parse(hand, buf, 1);
+# ifdef WITH_YAJL2
+            if (ret == 0 && yajl_get_bytes_consumed(hand) == 0) {
+                done = true;
+                /* state->pos points to the first symbol after current
+                 * object */
+                break;
+            } else if (ret != 0) {
+# else
+            if (ret == 0) {
+                done = true;
+                /* state->pos points to the last symbol of the
+                 * current object */
+                state->pos++;
+                break;
+            } else if (ret != yajl_status_insufficient_data) {
+# endif
+                unsigned char *errstr = yajl_get_error(hand, 1, buf, 1);
+                virReportError(VIR_ERR_INTERNAL_ERROR,
+                               _("cannot parse json: %s"),
+                               (const char*) errstr);
+                VIR_FREE(errstr);
+                goto cleanup;
+            }
+        }
+    } while (!done);
+
+    value = parser.head;
+
+cleanup:
+    /* hand is NULL when yajl_alloc failed. */
+    if (hand) {
+        yajl_free(hand);
+    }
+
+    /* On any failure drop the partially built tree in one place and
+     * clear the pointer so it is not logged after being freed. */
+    if (value == (void *)-1) {
+        virJSONValueFree(parser.head);
+        parser.head = NULL;
+    }
+
+    for (i = 0 ; i < parser.nstate ; i++) {
+        VIR_FREE(parser.state[i].key);
+    }
+    VIR_FREE(parser.state);
+
+    VIR_DEBUG("result=%p", parser.head);
+    return value;
+}
static int virJSONValueToStringOne(virJSONValuePtr object,
yajl_gen g)
diff --git a/src/util/virjson.h b/src/util/virjson.h
index db11396..1f5ce38 100644
--- a/src/util/virjson.h
+++ b/src/util/virjson.h
@@ -48,6 +48,8 @@ typedef virJSONObjectPair *virJSONObjectPairPtr;
typedef struct _virJSONArray virJSONArray;
typedef virJSONArray *virJSONArrayPtr;
+typedef struct _virJSONStreamParserState virJSONStreamParserState;
+typedef virJSONStreamParserState *virJSONStreamParserStatePtr;
struct _virJSONObjectPair {
char *key;
@@ -77,6 +79,11 @@ struct _virJSONValue {
} data;
};
+struct _virJSONStreamParserState { /* carry-over buffer for virJSONValueFromStream; zero-fill before first use */
+ char buf[1024]; /* NUL-terminated chunk read from the stream */
+ size_t pos; /* offset in buf of the next byte to hand to the parser */
+};
+
void virJSONValueFree(virJSONValuePtr value);
virJSONValuePtr virJSONValueNewString(const char *data);
@@ -138,5 +145,6 @@ int virJSONValueObjectRemoveKey(virJSONValuePtr object, const char
*key,
virJSONValuePtr virJSONValueFromString(const char *jsonstring);
char *virJSONValueToString(virJSONValuePtr object,
bool pretty);
+virJSONValuePtr virJSONValueFromStream(int fd, virJSONStreamParserStatePtr state);
#endif /* __VIR_JSON_H_ */
--
1.7.1