All incoming messages are currently routed to the generic method
remoteDispatchClientRequest() for processing. To allow incoming
data stream messages to bypass this and be routed to a specific
location, this patch introduces the concept of dispatch filters.
* qemud/qemud.h: Add a qemud_client_filter struct and a callback
qemud_client_filter_func. Maintain a list of filters on every
struct qemud_client
* qemud/qemud.c: Move remoteDecodeClientMessageHeader() out of
qemudWorker() into qemudDispatchClientRead(). Check registered
message filters in qemudDispatchClientRead() to decide where
to send incoming messages for dispatch.
Signed-off-by: Daniel P. Berrange <berrange(a)redhat.com>
---
qemud/qemud.c | 28 ++++++++++++++++++++++++----
qemud/qemud.h | 16 ++++++++++++++++
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/qemud/qemud.c b/qemud/qemud.c
index 42bc00e..e393db4 100644
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -1457,8 +1457,7 @@ static void *qemudWorker(void *data)
/* This function drops the lock during dispatch,
* and re-acquires it before returning */
- if (remoteDecodeClientMessageHeader(msg) < 0 ||
- remoteDispatchClientRequest (server, client, msg) < 0) {
+ if (remoteDispatchClientRequest (server, client, msg) < 0) {
VIR_FREE(msg);
qemudDispatchClientFailure(client);
client->refs--;
@@ -1705,9 +1704,30 @@ readmore:
waiting for us */
goto readmore;
} else {
+ /* Grab the completed message */
+ struct qemud_client_message *msg = qemudClientMessageQueueServe(&client->rx);
+ struct qemud_client_filter *filter;
+
+ /* Decode the header so we can use it for routing decisions */
+ if (remoteDecodeClientMessageHeader(msg) < 0) {
+ VIR_FREE(msg);
+ qemudDispatchClientFailure(client);
+ }
+
+ /* Check if any filters match this message */
+ filter = client->filters;
+ while (filter) {
+ if ((filter->query)(msg, filter->opaque)) {
+ qemudClientMessageQueuePush(&filter->dx, msg);
+ msg = NULL;
+ break;
+ }
+ filter = filter->next;
+ }
+
/* Move completed message to the end of the dispatch queue */
- qemudClientMessageQueuePush(&client->dx, client->rx);
- client->rx = NULL;
+ if (msg)
+ qemudClientMessageQueuePush(&client->dx, msg);
client->nrequests++;
/* Possibly need to create another receive buffer */
diff --git a/qemud/qemud.h b/qemud/qemud.h
index 86b893d..abacbbb 100644
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -90,6 +90,19 @@ struct qemud_client_message {
struct qemud_client_message *next;
};
+/* Allow for filtering of incoming messages to a custom
+ * dispatch processing queue, instead of client->dx.
+ */
+typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque);
+struct qemud_client_filter {
+ qemud_client_filter_func query;
+ void *opaque;
+
+ struct qemud_client_message *dx;
+
+ struct qemud_client_filter *next;
+};
+
/* Stores the per-client connection state */
struct qemud_client {
virMutex lock;
@@ -134,6 +147,9 @@ struct qemud_client {
/* Zero or many messages waiting for transmit
* back to client, including async events */
struct qemud_client_message *tx;
+ /* Filters to capture messages that would otherwise
+ * end up on the 'dx' queue */
+ struct qemud_client_filter *filters;
/* This is only valid if a remote open call has been made on this
* connection, otherwise it will be NULL. Also if remote close is
--
1.6.2.5
--
|: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :|
|: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|