[libvirt] [PATCH v2 1/2] json: add stream parser

Add function virJSONValueFromStream, which reads data from a stream and
passes it to the JSON parser. When the end of an object is reached, it
returns that object. To avoid reading from the stream one byte at a time,
it reads into a buffer (in a virJSONStreamParserState structure), which
must be passed to subsequent calls of this function. So if the end of one
object and the beginning of the next one are returned by a single
read() system call, virJSONValueFromStream handles it correctly.

Example of usage:

    virJSONValuePtr v;
    virJSONStreamParserState state;

    memset(&state, 0, sizeof(state));
    while (1) {
        v = virJSONValueFromStream(mon->fd, &state);
        if (v == (void *)-1)
            /* error */
            break;

        if (v == NULL)
            /* file descriptor has been closed */
            break;

        /* handle object 'v', then free it with virJSONValueFree() */
    }

I need such a function for the parallels driver. It caches information
about domains and needs some mechanism to update this cache. The
"prlsrvctl monitor" command waits for events forever and prints
information about each event to stdout in JSON format. So the parallels
driver can start a separate thread which reads from prlsrvctl's stdout
and updates the cache accordingly.

qemu_monitor_json solves the same task, but there each JSON object is
printed on a separate line. That is not possible in my case, because
some fields can contain line endings.

Signed-off-by: Dmitry Guryanov <dguryanov@parallels.com>
---
Changes:
 * add function to virJSONValueFromStream
 * fix bug with returning an object which ends just before end of file
 * fix bug with handling objects without any characters between them,
   when each read returns the data of one entire object.
src/libvirt_private.syms |    1 +
src/util/virjson.c       |  141 +++++++++++++++++++++++++++++++++++++++++++++++
src/util/virjson.h       |    9 +++
3 files changed, 151 insertions(+), 0 deletions(-)

diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 5cad990..567055c 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -1319,6 +1319,7 @@
 virJSONValueArrayGet;
 virJSONValueArraySize;
 virJSONValueFree;
+virJSONValueFromStream;
 virJSONValueFromString;
 virJSONValueGetBoolean;
 virJSONValueGetNumberDouble;
diff --git a/src/util/virjson.c b/src/util/virjson.c
index e6a3b1b..731293f 100644
--- a/src/util/virjson.c
+++ b/src/util/virjson.c
@@ -990,6 +990,147 @@ cleanup:
     return ret;
 }
 
+/*
+ * Read a single JSON value from the stream @fd.
+ *
+ * Data is read from @fd in chunks into @state->buf.  Bytes which
+ * have already been read but belong to the next value are kept in
+ * @state, so that a subsequent call with the same @state parses
+ * them first.  @state must be zero-initialized before the first
+ * call, and @fd must not be read by other means between calls.
+ *
+ * @fd: file descriptor, opened for reading
+ * @state: buffered data shared between consecutive calls on @fd
+ *
+ * Returns the parsed value on success (free it with
+ * virJSONValueFree), NULL when end of file is reached before a
+ * value starts, and (void *)-1 on error, including end of file in
+ * the middle of a value.
+ */
+virJSONValuePtr virJSONValueFromStream(int fd, virJSONStreamParserStatePtr state)
+{
+    yajl_handle hand;
+    virJSONParser parser = { NULL, NULL, 0 };
+    virJSONValuePtr value = (void *)-1;
+# ifndef WITH_YAJL2
+    yajl_parser_config cfg = { 1, 1 };
+# endif
+    ssize_t len;
+    int ret = 0;
+    bool done = false;
+    int i;
+
+# ifdef WITH_YAJL2
+    hand = yajl_alloc(&parserCallbacks, NULL, &parser);
+    if (hand) {
+        yajl_config(hand, yajl_allow_comments, 1);
+        yajl_config(hand, yajl_dont_validate_strings, 0);
+        /* Stop after the first value instead of failing on the
+         * bytes of the next one. */
+        yajl_config(hand, yajl_allow_trailing_garbage, 1);
+    }
+# else
+    hand = yajl_alloc(&parserCallbacks, &cfg, NULL, &parser);
+# endif
+    if (!hand) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       _("Unable to create JSON parser"));
+        goto cleanup;
+    }
+
+    do {
+        if (state->pos >= state->len) {
+            /* Everything buffered has been parsed, refill. */
+            state->pos = 0;
+            state->len = 0;
+            do {
+                len = read(fd, state->buf, sizeof(state->buf));
+            } while (len < 0 && errno == EINTR);
+
+            if (len < 0) {
+                virReportSystemError(errno, _("cannot read from fd '%d'"), fd);
+                virJSONValueFree(parser.head);
+                goto cleanup;
+            }
+
+            if (len == 0) {
+                if (!parser.head) {
+                    /* Nothing parsed: plain end of stream. */
+                    value = NULL;
+                    goto cleanup;
+                }
+
+                /* Only return the value if it is complete. */
+# ifdef WITH_YAJL2
+                ret = yajl_complete_parse(hand);
+# else
+                ret = yajl_parse_complete(hand);
+# endif
+                if (ret != yajl_status_ok) {
+                    virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                                   _("unexpected end of JSON stream"));
+                    virJSONValueFree(parser.head);
+                    goto cleanup;
+                }
+
+                value = parser.head;
+                goto cleanup;
+            }
+
+            state->len = len;
+        }
+
+        for (; state->pos < state->len; state->pos++) {
+            unsigned char *buf = (unsigned char *)&state->buf[state->pos];
+            /*
+             * yajl 2 has yajl_get_bytes_consumed, which would allow
+             * parsing larger chunks, but yajl 1.0.7 (RHEL 6) lacks
+             * it, so the parser is fed one byte at a time.
+             */
+            ret = yajl_parse(hand, buf, 1);
+# ifdef WITH_YAJL2
+            /* With yajl_allow_trailing_garbage the first byte after
+             * a complete value is accepted but not consumed, so a
+             * value is only returned once the byte following it (or
+             * end of file) has been read. */
+            if (ret == yajl_status_ok && yajl_get_bytes_consumed(hand) == 0) {
+                done = true;
+                /* state->pos points to the first byte after the value */
+                break;
+            } else if (ret != yajl_status_ok) {
+# else
+            if (ret == yajl_status_ok) {
+                done = true;
+                /* state->pos points to the last byte of the value */
+                state->pos++;
+                break;
+            } else if (ret != yajl_status_insufficient_data) {
+# endif
+                unsigned char *errstr = yajl_get_error(hand, 1, buf, 1);
+                virReportError(VIR_ERR_INTERNAL_ERROR,
+                               _("cannot parse json: %s"),
+                               (const char*) errstr);
+                VIR_FREE(errstr);
+                virJSONValueFree(parser.head);
+                goto cleanup;
+            }
+        }
+    } while (!done);
+
+    value = parser.head;
+
+cleanup:
+    /* yajl_free() does not accept a NULL handle */
+    if (hand)
+        yajl_free(hand);
+
+    for (i = 0; i < parser.nstate; i++)
+        VIR_FREE(parser.state[i].key);
+    VIR_FREE(parser.state);
+
+    VIR_DEBUG("result=%p", value);
+    return value;
+}
 
 static int virJSONValueToStringOne(virJSONValuePtr object,
                                    yajl_gen g)
diff --git a/src/util/virjson.h b/src/util/virjson.h
index 67f4398..aeb25ef 100644
--- a/src/util/virjson.h
+++ b/src/util/virjson.h
@@ -48,6 +48,8 @@ typedef virJSONObjectPair *virJSONObjectPairPtr;
 
 typedef struct _virJSONArray virJSONArray;
 typedef virJSONArray *virJSONArrayPtr;
+typedef struct _virJSONStreamParserState virJSONStreamParserState;
+typedef virJSONStreamParserState *virJSONStreamParserStatePtr;
 
 struct _virJSONObjectPair {
     char *key;
@@ -77,6 +79,12 @@ struct _virJSONValue {
     } data;
 };
 
+struct _virJSONStreamParserState {
+    char buf[1024]; /* data read from the stream */
+    size_t pos;     /* offset of the next byte to parse in buf */
+    size_t len;     /* number of valid bytes in buf */
+};
+
 void virJSONValueFree(virJSONValuePtr value);
 
 virJSONValuePtr virJSONValueNewString(const char *data);
@@ -134,5 +142,6 @@ int virJSONValueObjectAppendNull(virJSONValuePtr object, const char *key);
 virJSONValuePtr virJSONValueFromString(const char *jsonstring);
 char *virJSONValueToString(virJSONValuePtr object,
                            bool pretty);
+virJSONValuePtr virJSONValueFromStream(int fd, virJSONStreamParserStatePtr state);
 
 #endif /* __VIR_JSON_H_ */
-- 
1.7.1

Signed-off-by: Dmitry Guryanov <dguryanov@parallels.com>
---
tests/jsontest.c |  219 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 216 insertions(+), 3 deletions(-)

diff --git a/tests/jsontest.c b/tests/jsontest.c
index 98a6069..107d772 100644
--- a/tests/jsontest.c
+++ b/tests/jsontest.c
@@ -4,14 +4,23 @@
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
+#include <unistd.h>
+#include <poll.h>
+#include <signal.h>
+#include <sched.h>
 
 #include "internal.h"
 #include "virjson.h"
 #include "testutils.h"
+#include "vircommand.h"
+#include "virprocess.h"
+#include "virtime.h"
+#include "virfile.h"
 
 struct testInfo {
     const char *doc;
     bool pass;
+    size_t chunk;
 };
 
 
@@ -53,21 +62,197 @@ cleanup:
     return ret;
 }
 
+ATTRIBUTE_NORETURN static int
+testJSONReadProcess(int fd, int finishFd)
+{
+    int n = 0;
+    int exitcode = EXIT_FAILURE;
+    virJSONValuePtr v;
+    virJSONStreamParserState state;
+    int x;
+
+    /* tell the parent we are ready to read */
+    if (safewrite(finishFd, " ", 1) != 1) {
+        if (virTestGetVerbose())
+            perror("write");
+        _exit(exitcode);
+    }
+
+    /* There must be exactly two objects, each must have "valid"
+     * field with integer value */
+    memset(&state, 0, sizeof(state));
+    while (1) {
+        v = virJSONValueFromStream(fd, &state);
+
+        if (v == (void *)-1) {
+            if (virTestGetVerbose())
+                fprintf(stderr, "virJSONValueFromStream returned error\n");
+            goto cleanup;
+        }
+
+        if (v == NULL)
+            break;
+
+        n++;
+
+        if (virJSONValueObjectGetNumberInt(v, "valid", &x) < 0) {
+            if (virTestGetVerbose())
+                fprintf(stderr, "Parsed value in object %d doesn't have "
+                        "'valid' integer field\n", n);
+            virJSONValueFree(v);
+            goto cleanup;
+        }
+
+        virJSONValueFree(v);
+    }
+
+    if (n != 2) {
+        if (virTestGetVerbose())
+            fprintf(stderr, "Invalid number of objects: %d, must be 2\n", n);
+    } else {
+        exitcode = EXIT_SUCCESS;
+    }
+
+cleanup:
+    /* tell the parent we are done */
+    if (safewrite(finishFd, " ", 1) != 1) {
+        if (virTestGetVerbose())
+            perror("write");
+        _exit(exitcode);
+    }
+
+    VIR_FORCE_CLOSE(fd);
+    VIR_FORCE_CLOSE(finishFd);
+    _exit(exitcode);
+}
+
+/*
+ * This test creates a separate process, which reads JSON data
+ * from a pipe with help of virJSONValueFromStream function. It expects
+ * 2 objects, each must have 'valid' integer key. Parent process writes
+ * the document to the pipe in chunks of info->chunk bytes (all at once
+ * if it is 0) and checks the child's exit code.
+ */
+static int
+testJSONFromStream(const void *data)
+{
+    const struct testInfo *info = data;
+    int ret = -1;
+    int pret;
+    int pipefd[2];
+    int wpipefd[2];
+    ssize_t w;
+    pid_t pid;
+    struct pollfd pollfd;
+    int status;
+    size_t docLen, chunk, i;
+    char c;
+
+    if (pipe(pipefd) < 0) {
+        if (virTestGetVerbose())
+            perror("pipe");
+        return -1;
+    }
+
+    if (pipe(wpipefd) < 0) {
+        if (virTestGetVerbose())
+            perror("pipe");
+        goto cleanup;
+    }
+
+    if (virFork(&pid) < 0) {
+        if (virTestGetVerbose())
+            perror("fork");
+        goto cleanup2;
+    }
+
+    if (pid == 0) {
+        VIR_FORCE_CLOSE(pipefd[1]);
+        VIR_FORCE_CLOSE(wpipefd[0]);
+        testJSONReadProcess(pipefd[0], wpipefd[1]);
+        /* couldn't be reached */
+    }
+
+    /* Only the child writes to wpipefd; closing our copy lets read()
+     * and poll() see EOF instead of blocking if the child dies. */
+    VIR_FORCE_CLOSE(wpipefd[1]);
+
+    /* wait until the child is ready to read */
+    if (saferead(wpipefd[0], &c, 1) != 1) {
+        if (virTestGetVerbose())
+            perror("read");
+        goto cleanup2;
+    }
+
+    /* write test data */
+    docLen = strlen(info->doc);
+    chunk = info->chunk ? info->chunk : docLen;
+
+    for (i = 0; i < docLen; i += chunk) {
+        size_t len = docLen - i < chunk ? docLen - i : chunk;
+
+        w = safewrite(pipefd[1], info->doc + i, len);
+        if (w < 0) {
+            if (virTestGetVerbose())
+                perror("write");
+            goto cleanup2;
+        }
+
+        if ((size_t)w < len) {
+            if (virTestGetVerbose())
+                fprintf(stderr, "Couldn't write entire json string to the pipe\n");
+            goto cleanup2;
+        }
+
+        sched_yield();
+    }
+
+    VIR_FORCE_CLOSE(pipefd[1]);
+
+    /* wait for read process */
+    pollfd.fd = wpipefd[0];
+    pollfd.events = POLLIN;
+
+    pret = poll(&pollfd, 1, 1000);
+    if (pret < 0) {
+        if (virTestGetVerbose())
+            perror("poll");
+        goto cleanup2;
+    }
+
+    if (pret == 0) {
+        if (virTestGetVerbose())
+            fprintf(stderr, "timeout reached\n");
+        virProcessKill(pid, SIGTERM);
+    }
+
+    if (virProcessWait(pid, &status) == 0 && !WIFSIGNALED(status)
+        && WEXITSTATUS(status) == 0)
+        ret = 0;
+
+cleanup2:
+    VIR_FORCE_CLOSE(wpipefd[0]);
+    VIR_FORCE_CLOSE(wpipefd[1]);
+cleanup:
+    VIR_FORCE_CLOSE(pipefd[0]);
+    VIR_FORCE_CLOSE(pipefd[1]);
+    return ret;
+}
 
 static int
 mymain(void)
 {
     int ret = 0;
 
-#define DO_TEST_FULL(name, cmd, doc, pass)                      \
+#define DO_TEST_FULL(name, cmd, doc, pass, chunk)               \
     do {                                                        \
-        struct testInfo info = { doc, pass };                   \
+        struct testInfo info = { doc, pass, chunk };            \
         if (virtTestRun(name, 1, testJSON ## cmd, &info) < 0)   \
             ret = -1;                                           \
     } while (0)
 
 #define DO_TEST_PARSE(name, doc)                \
-    DO_TEST_FULL(name, FromString, doc, true)
+    DO_TEST_FULL(name, FromString, doc, true, 0)
 
     DO_TEST_PARSE("Simple", "{\"return\": {}, \"id\": \"libvirt-1\"}");
     DO_TEST_PARSE("NotSoSimple", "{\"QMP\": {\"version\": {\"qemu\":"
@@ -105,6 +290,34 @@ mymain(void)
                   "\"query-uuid\"}, {\"name\": \"query-migrate\"}, {\"name\": "
                   "\"query-balloon\"}], \"id\": \"libvirt-2\"}");
 
+#define DO_TEST_PARSE_STREAM(name, doc, chunk)          \
+    DO_TEST_FULL(name, FromStream, doc, true, chunk)
+
+    DO_TEST_PARSE_STREAM("StreamSimple", "{\"valid\": 10}{\"valid\": 10}", 1);
+
+    char *largeText;
+    size_t largeTextSize = 8192;
+    size_t pos = 0;
+
+    if (VIR_ALLOC_N(largeText, largeTextSize) < 0)
+        return EXIT_FAILURE;
+
+    memset(largeText, 0, largeTextSize);
+    pos += snprintf(largeText + pos, 64, "{");
+    while (pos < largeTextSize / 2 - 100)
+        pos += snprintf(largeText + pos, 64, "\"x%zu\": %zu, ", pos, pos);
+    pos += snprintf(largeText + pos, 64, "\"valid\": 1}");
+    /* Append a second copy of the object.  snprintf must not be given
+     * overlapping source and destination, so copy the bytes directly. */
+    memcpy(largeText + pos, largeText, pos);
+    pos *= 2;
+    largeText[pos] = '\0';
+
+    DO_TEST_PARSE_STREAM("StreamLargeChunks", largeText, largeTextSize);
+    DO_TEST_PARSE_STREAM("StreamSmallChunks", largeText, 1);
+
+    VIR_FREE(largeText);
+
     return (ret == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
 }
 
-- 
1.7.1
participants (1)
-
Dmitry Guryanov