From: "Daniel P. Berrange" <berrange(a)redhat.com>
Refactor the RPC server dispatcher code so that if 'max_workers==0'
the entire server will run single-threaded. This is useful for
use cases where there will only ever be a single connected client
that serializes its requests.
Signed-off-by: Daniel P. Berrange <berrange(a)redhat.com>
---
src/rpc/virnetserver.c | 113 +++++++++++++++++++++++++++++-------------------
1 file changed, 68 insertions(+), 45 deletions(-)
diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
index 358666d..4a02aab 100644
--- a/src/rpc/virnetserver.c
+++ b/src/rpc/virnetserver.c
@@ -127,49 +127,64 @@ static void virNetServerUnlock(virNetServerPtr srv)
}
-static void virNetServerHandleJob(void *jobOpaque, void *opaque)
+static int virNetServerProcessMsg(virNetServerPtr srv,
+ virNetServerClientPtr client,
+ virNetServerProgramPtr prog,
+ virNetMessagePtr msg)
{
- virNetServerPtr srv = opaque;
- virNetServerJobPtr job = jobOpaque;
-
- VIR_DEBUG("server=%p client=%p message=%p prog=%p",
- srv, job->client, job->msg, job->prog);
-
- if (!job->prog) {
+ int ret = -1;
+ if (!prog) {
/* Only send back an error for type == CALL. Other
* message types are not expecting replies, so we
* must just log it & drop them
*/
- if (job->msg->header.type == VIR_NET_CALL ||
- job->msg->header.type == VIR_NET_CALL_WITH_FDS) {
- if (virNetServerProgramUnknownError(job->client,
- job->msg,
- &job->msg->header) < 0)
- goto error;
+ if (msg->header.type == VIR_NET_CALL ||
+ msg->header.type == VIR_NET_CALL_WITH_FDS) {
+ if (virNetServerProgramUnknownError(client,
+ msg,
+ &msg->header) < 0)
+ goto cleanup;
} else {
VIR_INFO("Dropping client mesage, unknown program %d version %d type %d proc %d",
- job->msg->header.prog, job->msg->header.vers,
- job->msg->header.type, job->msg->header.proc);
+ msg->header.prog, msg->header.vers,
+ msg->header.type, msg->header.proc);
/* Send a dummy reply to free up 'msg' & unblock client rx */
- virNetMessageClear(job->msg);
- job->msg->header.type = VIR_NET_REPLY;
- if (virNetServerClientSendMessage(job->client, job->msg) < 0)
- goto error;
+ virNetMessageClear(msg);
+ msg->header.type = VIR_NET_REPLY;
+ if (virNetServerClientSendMessage(client, msg) < 0)
+ goto cleanup;
}
- goto cleanup;
+ goto done;
}
- if (virNetServerProgramDispatch(job->prog,
+ if (virNetServerProgramDispatch(prog,
srv,
- job->client,
- job->msg) < 0)
+ client,
+ msg) < 0)
+ goto cleanup;
+
+done:
+ ret = 0;
+
+cleanup:
+ return ret;
+}
+
+static void virNetServerHandleJob(void *jobOpaque, void *opaque)
+{
+ virNetServerPtr srv = opaque;
+ virNetServerJobPtr job = jobOpaque;
+
+ VIR_DEBUG("server=%p client=%p message=%p prog=%p",
+ srv, job->client, job->msg, job->prog);
+
+ if (virNetServerProcessMsg(srv, job->client, job->prog, job->msg) < 0)
goto error;
virNetServerLock(srv);
virNetServerProgramFree(job->prog);
virNetServerUnlock(srv);
-cleanup:
virNetServerClientFree(job->client);
VIR_FREE(job);
return;
@@ -187,7 +202,6 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
void *opaque)
{
virNetServerPtr srv = opaque;
- virNetServerJobPtr job;
virNetServerProgramPtr prog = NULL;
unsigned int priority = 0;
size_t i;
@@ -196,34 +210,42 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
VIR_DEBUG("server=%p client=%p message=%p",
srv, client, msg);
- if (VIR_ALLOC(job) < 0) {
- virReportOOMError();
- return -1;
- }
-
- job->client = client;
- job->msg = msg;
-
virNetServerLock(srv);
for (i = 0 ; i < srv->nprograms ; i++) {
- if (virNetServerProgramMatches(srv->programs[i], job->msg)) {
+ if (virNetServerProgramMatches(srv->programs[i], msg)) {
prog = srv->programs[i];
break;
}
}
- if (prog) {
- virNetServerProgramRef(prog);
- job->prog = prog;
- priority = virNetServerProgramGetPriority(prog, msg->header.proc);
- }
+ if (srv->workers) {
+ virNetServerJobPtr job;
+
+ if (VIR_ALLOC(job) < 0) {
+ virReportOOMError();
+ goto cleanup;
+ }
- ret = virThreadPoolSendJob(srv->workers, priority, job);
+ job->client = client;
+ job->msg = msg;
+
+ if (prog) {
+ virNetServerProgramRef(prog);
+ job->prog = prog;
+ priority = virNetServerProgramGetPriority(prog, msg->header.proc);
+ }
- if (ret < 0) {
- VIR_FREE(job);
- virNetServerProgramFree(prog);
+ ret = virThreadPoolSendJob(srv->workers, priority, job);
+
+ if (ret < 0) {
+ VIR_FREE(job);
+ virNetServerProgramFree(prog);
+ }
+ } else {
+ ret = virNetServerProcessMsg(srv, client, prog, msg);
}
+
+cleanup:
virNetServerUnlock(srv);
return ret;
@@ -324,7 +346,8 @@ virNetServerPtr virNetServerNew(size_t min_workers,
srv->refs = 1;
- if (!(srv->workers = virThreadPoolNew(min_workers, max_workers,
+ if (max_workers &&
+ !(srv->workers = virThreadPoolNew(min_workers, max_workers,
priority_workers,
virNetServerHandleJob,
srv)))
--
1.7.10.4