On Tue, Jan 13, 2009 at 05:45:43PM +0000, Daniel P. Berrange wrote:
Historically libvirtd was single threaded, serializing all
requests across clients. A recent patch allowed multiple
threads, so multiple clients could run in parallel, but the
requests of any single client were still serialized.
This patch removes that final restriction, allowing a single
client to have multiple in-flight RPC requests & replies.
Each client now has 3 variables
- rx: zero or one. If ready for, or in process of reading
a message this will be non-null. If we're throttling the
client requests, it'll be NULL. Once completely read, moved
to the 'dx' queue.
- dx: zero or many. Requests read off wire currently waiting
to be picked up for processing by a worker thread. Once a
worker is available the message is removed from the 'dx'
queue for duration of processing. A reply is put on the
'tx' queue once a call is finished
- tx: zero or many. Replies in process of, or ready to be,
sent back to a client. Also includes any asynchronous
event notifications to be sent.
The 'max_client_requests' configuration parameter controls
how many RPC request+reply calls from a single client can be
processed in parallel. Once this limit is reached, no more
requests will be read off the wire until a reply has been
completely transmitted.
Each request requires up to 256 KB of memory, thus memory usage
for I/O is bounded by 'max_client_requests * max_clients * 256k'
Compatibility:
- old client -> old server - everything serialized
- old client -> new server - client never sends a new
request until its first is finished, so effectively
serialized, and no compatibility problems
- new client -> old server - client sends many requests
without waiting for replies. The server will only
read and process one at a time, so effectively
serialized, and no compatibility problems
- new client -> new server - fully parallelized
The code has been stress tested by running 500 concurrent
clients and fixing the crashes, deadlocks and memory leaks.
Seems reasonably robust now.
libvirtd.aug | 2
libvirtd.conf | 16 +
qemud.c | 630 ++++++++++++++++++++++++++++++++----------------------
qemud.h | 66 +++--
remote.c | 48 +++-
test_libvirtd.aug | 26 ++
6 files changed, 502 insertions(+), 286 deletions(-)
Updated patch to remove the s/X_OK/R_OK/ change that's now in
CVS
Daniel
diff --git a/qemud/libvirtd.aug b/qemud/libvirtd.aug
--- a/qemud/libvirtd.aug
+++ b/qemud/libvirtd.aug
@@ -53,6 +53,8 @@ module Libvirtd =
let processing_entry = int_entry "min_workers"
| int_entry "max_workers"
| int_entry "max_clients"
+ | int_entry "max_requests"
+ | int_entry "max_client_requests"
let logging_entry = int_entry "log_level"
| str_entry "log_filters"
diff --git a/qemud/libvirtd.conf b/qemud/libvirtd.conf
--- a/qemud/libvirtd.conf
+++ b/qemud/libvirtd.conf
@@ -247,6 +247,22 @@
#min_workers = 5
#max_workers = 20
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impacts
+# memory usage: currently each request requires 256 KB of
+# memory, so by default up to 5 MB (20 * 256 KB) is used.
+#
+# XXX this isn't actually enforced yet, only the per-client
+# limit is used so far
+#max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameters.
+#max_client_requests = 5
+
#################################################################
#
# Logging controls
diff --git a/qemud/qemud.c b/qemud/qemud.c
--- a/qemud/qemud.c
+++ b/qemud/qemud.c
@@ -138,6 +138,11 @@ static int min_workers = 5;
static int max_workers = 20;
static int max_clients = 20;
+/* Total number of 'in-process' RPC calls allowed across all clients.
+ * NB: only read from the config file so far, not yet enforced */
+static int max_requests = 20;
+/* Number of 'in-process' RPC calls allowed for a single client; once
+ * reached, no further requests are read off that client's socket */
+static int max_client_requests = 5;
+
#define DH_BITS 1024
static sig_atomic_t sig_errors = 0;
@@ -162,9 +167,36 @@ static void sig_handler(int sig, siginfo
static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudRegisterClientEvent(struct qemud_server *server,
- struct qemud_client *client,
- int removeFirst);
+
+
+/*
+ * Append msg to the tail of the singly linked list *queue.
+ * msg->next is not modified, so callers are expected to pass
+ * a message that is already detached from any other queue.
+ */
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+ struct qemud_client_message *msg)
+{
+ struct qemud_client_message **link = queue;
+
+ /* Follow the 'next' links until we reach the empty slot at the end */
+ while (*link)
+ link = &(*link)->next;
+
+ *link = msg;
+}
+
+/*
+ * Remove and return the message at the head of *queue, or NULL
+ * if the queue is empty. The returned message is detached (its
+ * 'next' link is cleared) so it can be pushed onto another queue.
+ */
+static struct qemud_client_message *
+qemudClientMessageQueuePop(struct qemud_client_message **queue)
+{
+ struct qemud_client_message *tmp = *queue;
+
+ /* An empty queue yields NULL; tmp must not be dereferenced then */
+ if (tmp) {
+ *queue = tmp->next;
+ tmp->next = NULL;
+ }
+
+ return tmp;
+}
static int
remoteCheckCertFile(const char *type, const char *file)
@@ -1042,6 +1074,8 @@ remoteCheckCertificate (gnutls_session_t
static int
remoteCheckAccess (struct qemud_client *client)
{
+ struct qemud_client_message *confirm;
+
/* Verify client certificate. */
if (remoteCheckCertificate (client->tlssession) == -1) {
VIR_ERROR0(_("remoteCheckCertificate: "
@@ -1051,14 +1085,25 @@ remoteCheckAccess (struct qemud_client *
"is set so the bad certificate is ignored"));
}
+ /* client->tx is overwritten with the confirmation byte below,
+ * so anything already queued there would be lost */
+ if (client->tx) {
+ VIR_INFO("%s",
+ _("client had unexpected data pending tx after access check"));
+ return -1;
+ }
+
+ if (VIR_ALLOC(confirm) < 0)
+ return -1;
+
/* Checks have succeeded. Write a '\1' byte back to the client to
* indicate this (otherwise the socket is abruptly closed).
* (NB. The '\1' byte is sent in an encrypted record).
*/
- client->bufferLength = 1;
- client->bufferOffset = 0;
- client->buffer[0] = '\1';
- client->mode = QEMUD_MODE_TX_PACKET;
+ confirm->async = 1;
+ confirm->bufferLength = 1;
+ confirm->bufferOffset = 0;
+ confirm->buffer[0] = '\1';
+
+ client->tx = confirm;
return 0;
}
@@ -1084,6 +1129,7 @@ int qemudGetSocketIdentity(int fd, uid_t
}
#endif
+
static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket *sock) {
int fd;
struct sockaddr_storage addr;
@@ -1099,7 +1145,7 @@ static int qemudDispatchServer(struct qe
}
if (server->nclients >= max_clients) {
- VIR_ERROR0(_("Too many active clients, dropping connection"));
+ VIR_ERROR(_("Too many active clients (%d), dropping connection"),
max_clients);
close(fd);
return -1;
}
@@ -1137,6 +1183,12 @@ static int qemudDispatchServer(struct qe
client->addrlen = addrlen;
client->server = server;
+ /* Prepare one for packet receive */
+ if (VIR_ALLOC(client->rx) < 0)
+ goto cleanup;
+ client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+
#if HAVE_POLKIT
/* Only do policy checks for non-root - allow root user
through with no checks, as a fail-safe - root can easily
@@ -1158,9 +1210,7 @@ static int qemudDispatchServer(struct qe
#endif
if (client->type != QEMUD_SOCK_TYPE_TLS) {
- client->mode = QEMUD_MODE_RX_HEADER;
- client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
-
+ /* Plain socket, so prepare to read first message */
if (qemudRegisterClientEvent (server, client, 0) < 0)
goto cleanup;
} else {
@@ -1180,12 +1230,12 @@ static int qemudDispatchServer(struct qe
if (remoteCheckAccess (client) == -1)
goto cleanup;
+ /* Handshake & cert check OK, so prepare to read first message */
if (qemudRegisterClientEvent(server, client, 0) < 0)
goto cleanup;
} else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
- /* Most likely. */
- client->mode = QEMUD_MODE_TLS_HANDSHAKE;
- client->bufferLength = -1;
+ /* Most likely, need to do more handshake data */
+ client->handshake = 1;
if (qemudRegisterClientEvent (server, client, 0) < 0)
goto cleanup;
@@ -1204,7 +1254,8 @@ static int qemudDispatchServer(struct qe
if (client &&
client->tlssession) gnutls_deinit (client->tlssession);
close (fd);
- free (client);
+ /* client may still be NULL here (note the check above), so only
+ * release its receive buffer when the client was allocated */
+ if (client)
+ VIR_FREE(client->rx);
+ VIR_FREE(client);
return -1;
}
@@ -1216,8 +1267,7 @@ static int qemudDispatchServer(struct qe
* We keep the libvirt connection open until any async
* jobs have finished, then clean it up elsehwere
*/
-static void qemudDispatchClientFailure(struct qemud_server *server ATTRIBUTE_UNUSED,
- struct qemud_client *client) {
+void qemudDispatchClientFailure(struct qemud_client *client) {
virEventRemoveHandleImpl(client->watch);
/* Deregister event delivery callback */
@@ -1242,7 +1292,7 @@ static struct qemud_client *qemudPending
int i;
for (i = 0 ; i < server->nclients ; i++) {
virMutexLock(&server->clients[i]->lock);
- if (server->clients[i]->mode == QEMUD_MODE_WAIT_DISPATCH) {
+ if (server->clients[i]->dx) {
/* Delibrately don't unlock client - caller wants the lock */
return server->clients[i];
}
@@ -1256,8 +1306,9 @@ static void *qemudWorker(void *data)
struct qemud_server *server = data;
while (1) {
- struct qemud_client *client;
- int len;
+ struct qemud_client *client = NULL;
+ struct qemud_client_message *reply;
+
virMutexLock(&server->lock);
while ((client = qemudPendingJob(server)) == NULL) {
if (virCondWait(&server->job, &server->lock) < 0) {
@@ -1268,55 +1319,64 @@ static void *qemudWorker(void *data)
virMutexUnlock(&server->lock);
/* We own a locked client now... */
- client->mode = QEMUD_MODE_IN_DISPATCH;
client->refs++;
- if ((len = remoteDispatchClientRequest (server, client)) == 0)
- qemudDispatchClientFailure(server, client);
+ /* Remove our message from the dispatch queue while we use it; the
+ * same buffer is then filled with the reply and queued on 'tx' */
+ reply = qemudClientMessageQueuePop(&client->dx);
- /* Set up the output buffer. */
- client->mode = QEMUD_MODE_TX_PACKET;
- client->bufferLength = len;
- client->bufferOffset = 0;
+ /* This function drops the lock during dispatch,
+ * and re-acquires it before returning */
+ if (remoteDispatchClientRequest (server, client, reply) < 0) {
+ VIR_FREE(reply);
+ qemudDispatchClientFailure(client);
+ client->refs--;
+ virMutexUnlock(&client->lock);
+ continue;
+ }
+
+ /* Put reply on end of tx queue to send out */
+ qemudClientMessageQueuePush(&client->tx, reply);
if (qemudRegisterClientEvent(server, client, 1) < 0)
- qemudDispatchClientFailure(server, client);
+ qemudDispatchClientFailure(client);
client->refs--;
virMutexUnlock(&client->lock);
- virMutexUnlock(&server->lock);
}
}
-static int qemudClientReadBuf(struct qemud_server *server,
- struct qemud_client *client,
+/*
+ * Read data into buffer using wire decoding (plain or TLS).
+ *
+ * Returns the number of bytes read, 0 if the read would block or
+ * was interrupted (try again on the next poll), or -1 on error or
+ * EOF, in which case the client has already been failed.
+ */
+static int qemudClientReadBuf(struct qemud_client *client,
char *data, unsigned len) {
int ret;
/*qemudDebug ("qemudClientRead: len = %d", len);*/
if (!client->tlssession) {
- if ((ret = read (client->fd, data, len)) <= 0) {
- if (ret == 0 || errno != EAGAIN) {
- if (ret != 0)
- VIR_ERROR(_("read: %s"), strerror (errno));
- qemudDispatchClientFailure(server, client);
- }
+ ret = read (client->fd, data, len);
+ if (ret == -1 && (errno == EAGAIN ||
+ errno == EINTR))
+ return 0;
+ if (ret <= 0) {
+ if (ret != 0)
+ VIR_ERROR(_("read: %s"), strerror (errno));
+ qemudDispatchClientFailure(client);
return -1;
}
} else {
ret = gnutls_record_recv (client->tlssession, data, len);
- if (qemudRegisterClientEvent (server, client, 1) < 0)
- qemudDispatchClientFailure (server, client);
- else if (ret <= 0) {
- if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
- ret != GNUTLS_E_INTERRUPTED)) {
- if (ret != 0)
- VIR_ERROR(_("gnutls_record_recv: %s"),
- gnutls_strerror (ret));
- qemudDispatchClientFailure (server, client);
- }
+
+ /* GnuTLS reports "would block" and "interrupted" as negative
+ * error codes; neither is fatal, so retry on the next poll */
+ if (ret < 0 && (ret == GNUTLS_E_AGAIN ||
+ ret == GNUTLS_E_INTERRUPTED))
+ return 0;
+ if (ret <= 0) {
+ if (ret != 0)
+ VIR_ERROR(_("gnutls_record_recv: %s"),
+ gnutls_strerror (ret));
+ qemudDispatchClientFailure(client);
return -1;
}
}
@@ -1324,21 +1384,26 @@ static int qemudClientReadBuf(struct qem
return ret;
}
-static int qemudClientReadPlain(struct qemud_server *server,
- struct qemud_client *client) {
+/*
+ * Read bytes off the wire (plain or TLS, but without SASL
+ * decoding) into the pending rx message and advance its offset.
+ * Returns the byte count, 0 if the read would block, or -1 on
+ * error.
+ */
+static int qemudClientReadPlain(struct qemud_client *client) {
int ret;
- ret = qemudClientReadBuf(server, client,
- client->buffer + client->bufferOffset,
- client->bufferLength - client->bufferOffset);
- if (ret < 0)
- return ret;
- client->bufferOffset += ret;
- return 0;
+ struct qemud_client_message *msg = client->rx;
+
+ ret = qemudClientReadBuf(client,
+ msg->buffer + msg->bufferOffset,
+ msg->bufferLength - msg->bufferOffset);
+ if (ret > 0)
+ msg->bufferOffset += ret;
+
+ return ret;
}
#if HAVE_SASL
-static int qemudClientReadSASL(struct qemud_server *server,
- struct qemud_client *client) {
+/*
+ * Read data into buffer decoding with SASL
+ */
+static int qemudClientReadSASL(struct qemud_client *client) {
int got, want;
/* We're doing a SSF data read, so now its times to ensure
@@ -1350,30 +1415,33 @@ static int qemudClientReadSASL(struct qe
/* Need to read some more data off the wire */
if (client->saslDecoded == NULL) {
+ int ret;
char encoded[8192];
int encodedLen = sizeof(encoded);
- encodedLen = qemudClientReadBuf(server, client, encoded, encodedLen);
+ encodedLen = qemudClientReadBuf(client, encoded, encodedLen);
- if (encodedLen < 0)
- return -1;
+ /* -1: error, client already failed; 0: would block, so there
+ * is nothing to decode yet */
+ if (encodedLen <= 0)
+ return encodedLen;
- sasl_decode(client->saslconn, encoded, encodedLen,
- &client->saslDecoded, &client->saslDecodedLength);
+ ret = sasl_decode(client->saslconn, encoded, encodedLen,
+ &client->saslDecoded, &client->saslDecodedLength);
+ if (ret != SASL_OK) {
+ /* The caller just returns on -1 without failing the client,
+ * so fail it here rather than leave it stuck mid-message */
+ VIR_ERROR(_("sasl_decode: %s"), sasl_errstring(ret, NULL, NULL));
+ qemudDispatchClientFailure(client);
+ return -1;
+ }
client->saslDecodedOffset = 0;
}
/* Some buffered decoded data to return now */
got = client->saslDecodedLength - client->saslDecodedOffset;
- want = client->bufferLength - client->bufferOffset;
+ want = client->rx->bufferLength - client->rx->bufferOffset;
if (want > got)
want = got;
- memcpy(client->buffer + client->bufferOffset,
+ memcpy(client->rx->buffer + client->rx->bufferOffset,
client->saslDecoded + client->saslDecodedOffset, want);
client->saslDecodedOffset += want;
- client->bufferOffset += want;
+ client->rx->bufferOffset += want;
if (client->saslDecodedOffset == client->saslDecodedLength) {
client->saslDecoded = NULL;
@@ -1384,132 +1452,125 @@ static int qemudClientReadSASL(struct qe
}
#endif
-static int qemudClientRead(struct qemud_server *server,
- struct qemud_client *client) {
+/*
+ * Read the next chunk of data off the wire into client->rx,
+ * decoding through SASL when a SASL SSF layer is active for
+ * reads. Returns -1 on error; otherwise whatever the selected
+ * reader returns (for the plain reader: bytes read, or 0 if the
+ * read would block).
+ */
+static int qemudClientRead(struct qemud_client *client) {
#if HAVE_SASL
if (client->saslSSF & QEMUD_SASL_SSF_READ)
- return qemudClientReadSASL(server, client);
+ return qemudClientReadSASL(client);
else
#endif
- return qemudClientReadPlain(server, client);
+ return qemudClientReadPlain(client);
}
-static void qemudDispatchClientRead(struct qemud_server *server, struct qemud_client *client) {
- unsigned int len;
+/*
+ * Read data until we get a complete message to process.
+ *
+ * The fixed-size length word is read into client->rx first; once it
+ * is complete, rx->bufferLength is extended by the payload size and
+ * reading continues. A completed message is moved to the tail of the
+ * 'dx' queue and a worker is signalled. A fresh rx buffer is only
+ * allocated while nrequests < max_client_requests (throttling).
+ */
+static void qemudDispatchClientRead(struct qemud_server *server,
+ struct qemud_client *client) {
/*qemudDebug ("qemudDispatchClientRead: mode = %d", client->mode);*/
- switch (client->mode) {
- case QEMUD_MODE_RX_HEADER: {
+ /* While throttled there is no rx buffer to read into */
+ if (!client->rx)
+ return;
+
+readmore:
+ if (qemudClientRead(client) < 0)
+ return; /* Error */
+
+ if (client->rx->bufferOffset < client->rx->bufferLength)
+ return; /* Not read enough yet, or read would block */
+
+ /* Either done with length word header */
+ if (client->rx->bufferLength == REMOTE_MESSAGE_HEADER_XDR_LEN) {
+ int len;
XDR x;
- if (qemudClientRead(server, client) < 0)
- return; /* Error, or blocking */
+ xdrmem_create(&x, client->rx->buffer, client->rx->bufferLength, XDR_DECODE);
- if (client->bufferOffset < client->bufferLength)
- return; /* Not read enough */
-
- xdrmem_create(&x, client->buffer, client->bufferLength, XDR_DECODE);
-
- if (!xdr_u_int(&x, &len)) {
+ if (!xdr_int(&x, &len)) {
xdr_destroy (&x);
DEBUG0("Failed to decode packet length");
- qemudDispatchClientFailure(server, client);
+ qemudDispatchClientFailure(client);
return;
}
xdr_destroy (&x);
- if (len > REMOTE_MESSAGE_MAX) {
- DEBUG("Packet length %u too large", len);
- qemudDispatchClientFailure(server, client);
- return;
- }
- /* Length include length of the length field itself, so
- * check minimum size requirements */
- if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) {
- DEBUG("Packet length %u too small", len);
- qemudDispatchClientFailure(server, client);
- return;
- }
+ /* The length word counts its own size, so anything not larger
+ * than the header is bogus. Check this before subtracting so a
+ * hostile negative length cannot overflow 'len'. */
+ if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) {
+ DEBUG("Packet length %d too small", len);
+ qemudDispatchClientFailure(client);
+ return;
+ }
+
+ /* Remaining payload still to be read after the length word */
+ len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+ if (len > REMOTE_MESSAGE_MAX) {
+ DEBUG("Packet length %d too large", len);
+ qemudDispatchClientFailure(client);
+ return;
+ }
- client->mode = QEMUD_MODE_RX_PAYLOAD;
- client->bufferLength = len - REMOTE_MESSAGE_HEADER_XDR_LEN;
- client->bufferOffset = 0;
+ /* Prepare to read rest of message */
+ client->rx->bufferLength += len;
if (qemudRegisterClientEvent(server, client, 1) < 0) {
- qemudDispatchClientFailure(server, client);
+ qemudDispatchClientFailure(client);
return;
}
- /* Fall through */
- }
+ /* Try and read payload immediately instead of going back
+ into poll() because chances are the data is already
+ waiting for us */
+ goto readmore;
+ } else {
+ /* Move completed message to the end of the dispatch queue */
+ qemudClientMessageQueuePush(&client->dx, client->rx);
+ client->rx = NULL;
+ client->nrequests++;
- case QEMUD_MODE_RX_PAYLOAD: {
- if (qemudClientRead(server, client) < 0)
- return; /* Error, or blocking */
+ /* Possibly need to create another receive buffer */
+ if ((client->nrequests < max_client_requests &&
+ VIR_ALLOC(client->rx) < 0)) {
+ qemudDispatchClientFailure(client);
+ } else {
+ if (client->rx)
+ client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
- if (client->bufferOffset < client->bufferLength)
- return; /* Not read enough */
-
- client->mode = QEMUD_MODE_WAIT_DISPATCH;
- if (qemudRegisterClientEvent(server, client, 1) < 0)
- qemudDispatchClientFailure(server, client);
-
- virCondSignal(&server->job);
-
- break;
- }
-
- case QEMUD_MODE_TLS_HANDSHAKE: {
- int ret;
-
- /* Continue the handshake. */
- ret = gnutls_handshake (client->tlssession);
- if (ret == 0) {
- /* Finished. Next step is to check the certificate. */
- if (remoteCheckAccess (client) == -1)
- qemudDispatchClientFailure (server, client);
- else if (qemudRegisterClientEvent (server, client, 1) < 0)
- qemudDispatchClientFailure (server, client);
- } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
- VIR_ERROR(_("TLS handshake failed: %s"),
- gnutls_strerror (ret));
- qemudDispatchClientFailure (server, client);
- } else {
- if (qemudRegisterClientEvent (server ,client, 1) < 0)
- qemudDispatchClientFailure (server, client);
+ if (qemudRegisterClientEvent(server, client, 1) < 0)
+ qemudDispatchClientFailure(client);
+ else
+ /* Tell one of the workers to get on with it... */
+ virCondSignal(&server->job);
}
-
- break;
- }
-
- default:
- DEBUG("Got unexpected data read while in %d mode", client->mode);
- qemudDispatchClientFailure(server, client);
}
}
-static int qemudClientWriteBuf(struct qemud_server *server,
- struct qemud_client *client,
+/*
+ * Send a chunk of data using wire encoding (plain or TLS)
+ */
+static int qemudClientWriteBuf(struct qemud_client *client,
const char *data, int len) {
int ret;
if (!client->tlssession) {
- if ((ret = safewrite(client->fd, data, len)) == -1) {
+ if ((ret = write(client->fd, data, len)) == -1) {
+ if (errno == EAGAIN || errno == EINTR)
+ return 0;
VIR_ERROR(_("write: %s"), strerror (errno));
- qemudDispatchClientFailure(server, client);
+ qemudDispatchClientFailure(client);
return -1;
}
} else {
ret = gnutls_record_send (client->tlssession, data, len);
- if (qemudRegisterClientEvent (server, client, 1) < 0)
- qemudDispatchClientFailure (server, client);
- else if (ret < 0) {
- if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
- VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
- qemudDispatchClientFailure (server, client);
- }
+ if (ret < 0) {
+ if (ret == GNUTLS_E_INTERRUPTED ||
+ ret == GNUTLS_E_AGAIN)
+ return 0;
+
+ VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
+ qemudDispatchClientFailure(client);
return -1;
}
}
@@ -1517,42 +1578,49 @@ static int qemudClientWriteBuf(struct qe
}
-static int qemudClientWritePlain(struct qemud_server *server,
- struct qemud_client *client) {
- int ret = qemudClientWriteBuf(server, client,
- client->buffer + client->bufferOffset,
- client->bufferLength - client->bufferOffset);
- if (ret < 0)
- return -1;
- client->bufferOffset += ret;
- return 0;
+/*
+ * Send the unsent remainder of the message at the head of
+ * client->tx without SASL encoding, advancing its offset.
+ * Returns the byte count, 0 if the write would block, or -1
+ * on error.
+ */
+static int qemudClientWritePlain(struct qemud_client *client) {
+ struct qemud_client_message *msg = client->tx;
+ int ret = qemudClientWriteBuf(client,
+ msg->buffer + msg->bufferOffset,
+ msg->bufferLength - msg->bufferOffset);
+ if (ret > 0)
+ msg->bufferOffset += ret;
+ return ret;
}
#if HAVE_SASL
-static int qemudClientWriteSASL(struct qemud_server *server,
- struct qemud_client *client) {
+/*
+ * Send client->tx using SASL encoding
+ */
+static int qemudClientWriteSASL(struct qemud_client *client) {
int ret;
/* Not got any pending encoded data, so we need to encode raw stuff */
if (client->saslEncoded == NULL) {
int err;
err = sasl_encode(client->saslconn,
- client->buffer + client->bufferOffset,
- client->bufferLength - client->bufferOffset,
+ client->tx->buffer + client->tx->bufferOffset,
+ client->tx->bufferLength -
client->tx->bufferOffset,
&client->saslEncoded,
&client->saslEncodedLength);
+ if (err != SASL_OK)
+ return -1;
+
client->saslEncodedOffset = 0;
}
/* Send some of the encoded stuff out on the wire */
- ret = qemudClientWriteBuf(server, client,
+ ret = qemudClientWriteBuf(client,
client->saslEncoded + client->saslEncodedOffset,
client->saslEncodedLength -
client->saslEncodedOffset);
- if (ret < 0)
- return -1;
+ if (ret <= 0)
+ return ret; /* -1 error, 0 == egain */
/* Note how much we sent */
client->saslEncodedOffset += ret;
@@ -1561,77 +1629,101 @@ static int qemudClientWriteSASL(struct q
if (client->saslEncodedOffset == client->saslEncodedLength) {
client->saslEncoded = NULL;
client->saslEncodedOffset = client->saslEncodedLength = 0;
- client->bufferOffset = client->bufferLength;
+
+ /* Mark as complete, so caller detects completion */
+ client->tx->bufferOffset = client->tx->bufferLength;
}
- return 0;
+ return ret;
}
#endif
-static int qemudClientWrite(struct qemud_server *server,
- struct qemud_client *client) {
+/*
+ * Send part or all of the message at the head of client->tx in a
+ * single write, encoding through SASL when a SASL SSF layer is
+ * active for writes. Returns -1 on error, 0 if the write would
+ * block, otherwise a positive count of bytes written.
+ */
+static int qemudClientWrite(struct qemud_client *client) {
#if HAVE_SASL
if (client->saslSSF & QEMUD_SASL_SSF_WRITE)
- return qemudClientWriteSASL(server, client);
+ return qemudClientWriteSASL(client);
else
#endif
- return qemudClientWritePlain(server, client);
+ return qemudClientWritePlain(client);
}
-void
+/*
+ * Send queued client->tx messages, in order, until the queue is
+ * empty or the socket would block.
+ *
+ * Each fully sent message is popped off the queue; finishing a
+ * non-async message completes an RPC call and decrements
+ * nrequests. If reading was throttled, the freed-up buffer is
+ * recycled as the new rx buffer.
+ */
+static void
qemudDispatchClientWrite(struct qemud_server *server,
struct qemud_client *client) {
- switch (client->mode) {
- case QEMUD_MODE_TX_PACKET: {
- if (qemudClientWrite(server, client) < 0)
- return;
-
- if (client->bufferOffset == client->bufferLength) {
- if (client->closing) {
- qemudDispatchClientFailure (server, client);
- } else {
- /* Done writing, switch back to receive */
- client->mode = QEMUD_MODE_RX_HEADER;
- client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
- client->bufferOffset = 0;
-
- if (qemudRegisterClientEvent (server, client, 1) < 0)
- qemudDispatchClientFailure (server, client);
- }
- }
- /* Still writing */
- break;
- }
-
- case QEMUD_MODE_TLS_HANDSHAKE: {
+ while (client->tx) {
int ret;
- /* Continue the handshake. */
- ret = gnutls_handshake (client->tlssession);
- if (ret == 0) {
- /* Finished. Next step is to check the certificate. */
- if (remoteCheckAccess (client) == -1)
- qemudDispatchClientFailure (server, client);
- else if (qemudRegisterClientEvent (server, client, 1))
- qemudDispatchClientFailure (server, client);
- } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
- VIR_ERROR(_("TLS handshake failed: %s"), gnutls_strerror (ret));
- qemudDispatchClientFailure (server, client);
- } else {
- if (qemudRegisterClientEvent (server, client, 1))
- qemudDispatchClientFailure (server, client);
+ ret = qemudClientWrite(client);
+ if (ret < 0) {
+ /* NOTE(review): qemudClientWriteBuf already fails the client
+ * on I/O errors before returning -1, so this may be a second
+ * failure call for the same client - confirm that
+ * qemudDispatchClientFailure is safe to repeat */
+ qemudDispatchClientFailure(client);
+ return;
}
+ if (ret == 0)
+ return; /* Would block on write EAGAIN */
- break;
- }
+ if (client->tx->bufferOffset == client->tx->bufferLength) {
+ struct qemud_client_message *reply;
- default:
- DEBUG("Got unexpected data write while in %d mode", client->mode);
- qemudDispatchClientFailure(server, client);
+ /* Get finished reply from head of tx queue */
+ reply = qemudClientMessageQueuePop(&client->tx);
+
+ /* If it's not an async message, then we have
+ * just completed an RPC request */
+ if (!reply->async)
+ client->nrequests--;
+
+ /* If reading was throttled and we're back under the limit,
+ * recycle this buffer as the next rx message; else free it */
+ if (!client->rx &&
+ client->nrequests < max_client_requests) {
+ /* Reset message record for next RX attempt */
+ client->rx = reply;
+ client->rx->bufferOffset = 0;
+ client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+ } else {
+ VIR_FREE(reply);
+ }
+
+ /* Once the client has been failed there is nothing left to
+ * do: stop instead of writing the rest of the queue to it */
+ if (client->closing ||
+ qemudRegisterClientEvent (server, client, 1) < 0) {
+ qemudDispatchClientFailure(client);
+ return;
+ }
+ }
}
}
+/*
+ * Continue a TLS handshake that previously could not complete
+ * immediately. Once it finishes, the client leaves handshake mode
+ * so later events are routed to normal rx/tx processing, and the
+ * client certificate is checked.
+ */
+static void
+qemudDispatchClientHandshake(struct qemud_server *server,
+ struct qemud_client *client) {
+ int ret;
+ /* Continue the handshake. */
+ ret = gnutls_handshake (client->tlssession);
+ if (ret == 0) {
+ /* Handshake done: clear the flag before re-registering so the
+ * event mask is computed from the rx/tx queues instead of the
+ * handshake direction, and events stop being routed here */
+ client->handshake = 0;
+
+ /* Finished. Next step is to check the certificate. */
+ if (remoteCheckAccess (client) == -1)
+ qemudDispatchClientFailure(client);
+ else if (qemudRegisterClientEvent (server, client, 1))
+ qemudDispatchClientFailure(client);
+ } else if (ret == GNUTLS_E_AGAIN ||
+ ret == GNUTLS_E_INTERRUPTED) {
+ /* Carry on waiting for more handshake. Update
+ the events just in case handshake data flow
+ direction has changed */
+ if (qemudRegisterClientEvent (server, client, 1))
+ qemudDispatchClientFailure(client);
+ } else {
+ /* Fatal error in handshake */
+ VIR_ERROR(_("TLS handshake failed: %s"),
+ gnutls_strerror (ret));
+ qemudDispatchClientFailure(client);
+ }
+}
static void
qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
@@ -1642,59 +1734,66 @@ qemudDispatchClientEvent(int watch, int
virMutexLock(&server->lock);
for (i = 0 ; i < server->nclients ; i++) {
+ virMutexLock(&server->clients[i]->lock);
if (server->clients[i]->watch == watch) {
client = server->clients[i];
break;
}
+ virMutexUnlock(&server->clients[i]->lock);
}
+ virMutexUnlock(&server->lock);
+
if (!client) {
- virMutexUnlock(&server->lock);
return;
}
- virMutexLock(&client->lock);
- virMutexUnlock(&server->lock);
+ if (client->fd != fd) {
+ virMutexUnlock(&client->lock);
+ return;
+ }
- if (client->fd != fd)
- return;
+ /* I/O readiness: a TLS handshake in progress handles both
+ * directions itself; otherwise service writes, then reads */
+ if (events & (VIR_EVENT_HANDLE_WRITABLE |
+ VIR_EVENT_HANDLE_READABLE)) {
+ if (client->handshake) {
+ qemudDispatchClientHandshake(server, client);
+ } else {
+ if (events & VIR_EVENT_HANDLE_WRITABLE)
+ qemudDispatchClientWrite(server, client);
+ /* NOTE(review): '==' (unlike the '&' above) means a read only
+ * happens when READABLE is the sole event, so READABLE|WRITABLE
+ * defers the read to the next poll round. If '&' was intended,
+ * first confirm that reading is safe after the write path has
+ * failed the client. */
+ if (events == VIR_EVENT_HANDLE_READABLE)
+ qemudDispatchClientRead(server, client);
+ }
+ }
- if (events == VIR_EVENT_HANDLE_WRITABLE)
- qemudDispatchClientWrite(server, client);
- else if (events == VIR_EVENT_HANDLE_READABLE)
- qemudDispatchClientRead(server, client);
- else
- qemudDispatchClientFailure(server, client);
+ /* NB, will get HANGUP + READABLE at same time upon
+ * disconnect */
+ if (events & (VIR_EVENT_HANDLE_ERROR |
+ VIR_EVENT_HANDLE_HANGUP))
+ qemudDispatchClientFailure(client);
+
virMutexUnlock(&client->lock);
}
-static int qemudRegisterClientEvent(struct qemud_server *server,
- struct qemud_client *client,
- int update) {
- int mode;
- switch (client->mode) {
- case QEMUD_MODE_TLS_HANDSHAKE:
+int qemudRegisterClientEvent(struct qemud_server *server,
+ struct qemud_client *client,
+ int update) {
+ int mode = 0;
+
+ if (client->handshake) {
if (gnutls_record_get_direction (client->tlssession) == 0)
- mode = VIR_EVENT_HANDLE_READABLE;
+ mode |= VIR_EVENT_HANDLE_READABLE;
else
- mode = VIR_EVENT_HANDLE_WRITABLE;
- break;
+ mode |= VIR_EVENT_HANDLE_WRITABLE;
+ } else {
+ /* If there is a message on the rx queue then
+ * we're wanting more input */
+ if (client->rx)
+ mode |= VIR_EVENT_HANDLE_READABLE;
- case QEMUD_MODE_RX_HEADER:
- case QEMUD_MODE_RX_PAYLOAD:
- mode = VIR_EVENT_HANDLE_READABLE;
- break;
-
- case QEMUD_MODE_TX_PACKET:
- mode = VIR_EVENT_HANDLE_WRITABLE;
- break;
-
- case QEMUD_MODE_WAIT_DISPATCH:
- mode = 0;
- break;
-
- default:
- return -1;
+ /* If there are one or more messages to send back to client,
+ then monitor for writability on socket */
+ if (client->tx)
+ mode |= VIR_EVENT_HANDLE_WRITABLE;
}
if (update) {
@@ -1760,6 +1859,29 @@ static void qemudInactiveTimer(int timer
}
}
+/* Pop and free every message on the given queue, leaving it empty */
+static void qemudClientMessageQueueFree(struct qemud_client_message **queue) {
+ while (*queue) {
+ struct qemud_client_message *msg = qemudClientMessageQueuePop(queue);
+ VIR_FREE(msg);
+ }
+}
+
+/*
+ * Release a client: drop every message still queued on rx, dx and
+ * tx, close its libvirt connection if one was opened, destroy its
+ * lock and free the client record itself.
+ */
+static void qemudFreeClient(struct qemud_client *client) {
+ qemudClientMessageQueueFree(&client->rx);
+ qemudClientMessageQueueFree(&client->dx);
+ qemudClientMessageQueueFree(&client->tx);
+
+ if (client->conn)
+ virConnectClose(client->conn);
+ virMutexDestroy(&client->lock);
+ VIR_FREE(client);
+}
+
static int qemudRunLoop(struct qemud_server *server) {
int timerid = -1;
int ret = -1, i;
@@ -1796,8 +1918,11 @@ static int qemudRunLoop(struct qemud_ser
}
virMutexUnlock(&server->lock);
- if (qemudOneLoop() < 0)
+ if (qemudOneLoop() < 0) {
+ virMutexLock(&server->lock);
+ DEBUG0("Loop iteration error, exiting\n");
break;
+ }
virMutexLock(&server->lock);
reprocess:
@@ -1808,17 +1933,18 @@ static int qemudRunLoop(struct qemud_ser
&& server->clients[i]->refs == 0;
virMutexUnlock(&server->clients[i]->lock);
if (inactive) {
- if (server->clients[i]->conn)
- virConnectClose(server->clients[i]->conn);
- virMutexDestroy(&server->clients[i]->lock);
- VIR_FREE(server->clients[i]);
+ qemudFreeClient(server->clients[i]);
server->nclients--;
- if (i < server->nclients) {
+ if (i < server->nclients)
memmove(server->clients + i,
server->clients + i + 1,
- server->nclients - i);
- goto reprocess;
+ sizeof (*server->clients) * (server->nclients - i));
+
+ if (VIR_REALLOC_N(server->clients,
+ server->nclients) < 0) {
+ ; /* ignore */
}
+ goto reprocess;
}
}
@@ -1843,6 +1969,7 @@ static int qemudRunLoop(struct qemud_ser
pthread_join(thread, NULL);
virMutexLock(&server->lock);
}
-free(server->workers);
+ /* VIR_FREE also resets the pointer, so no separate free() needed */
+ VIR_FREE(server->workers);
virMutexUnlock(&server->lock);
@@ -2223,6 +2350,9 @@ remoteReadConfigFile (struct qemud_serve
GET_CONF_INT (conf, filename, max_workers);
GET_CONF_INT (conf, filename, max_clients);
+ GET_CONF_INT (conf, filename, max_requests);
+ GET_CONF_INT (conf, filename, max_client_requests);
+
virConfFree (conf);
return 0;
diff --git a/qemud/qemud.h b/qemud/qemud.h
--- a/qemud/qemud.h
+++ b/qemud/qemud.h
@@ -65,15 +65,6 @@
#define qemudDebug DEBUG
-enum qemud_mode {
- QEMUD_MODE_RX_HEADER, /* Receiving the fixed length RPC header data */
- QEMUD_MODE_RX_PAYLOAD, /* Receiving the variable length RPC payload data */
- QEMUD_MODE_WAIT_DISPATCH, /* Message received, waiting for worker to process */
- QEMUD_MODE_IN_DISPATCH, /* RPC call being processed */
- QEMUD_MODE_TX_PACKET, /* Transmitting reply to RPC call */
- QEMUD_MODE_TLS_HANDSHAKE, /* Performing TLS handshake */
-};
-
/* Whether we're passing reads & writes through a sasl SSF */
enum qemud_sasl_ssf {
QEMUD_SASL_SSF_NONE = 0,
@@ -87,6 +78,18 @@ enum qemud_sock_type {
QEMUD_SOCK_TYPE_TLS = 2,
};
+/* A message buffer queued on a client's rx, dx or tx list */
+struct qemud_client_message {
+ /* Large enough for the length word plus a maximum-size payload */
+ char buffer [REMOTE_MESSAGE_MAX + REMOTE_MESSAGE_HEADER_XDR_LEN];
+ unsigned int bufferLength; /* Bytes expected / to be sent */
+ unsigned int bufferOffset; /* Bytes transferred so far */
+
+ /* Set for messages that are not replies to an RPC call (e.g.
+ * async event notifications); these are not counted in
+ * nrequests. Unsigned so that storing 1 reads back as 1,
+ * not -1 as with a plain 'int' bit-field. */
+ unsigned int async : 1;
+
+ struct qemud_client_message *next; /* Queue link */
+};
+
/* Stores the per-client connection state */
struct qemud_client {
virMutex lock;
@@ -97,7 +100,6 @@ struct qemud_client {
int watch;
int readonly:1;
int closing:1;
- enum qemud_mode mode;
struct sockaddr_storage addr;
socklen_t addrlen;
@@ -105,6 +107,7 @@ struct qemud_client {
int type; /* qemud_sock_type */
gnutls_session_t tlssession;
int auth;
+ int handshake : 1; /* If we're in progress for TLS handshake */
#if HAVE_SASL
sasl_conn_t *saslconn;
int saslSSF;
@@ -117,12 +120,20 @@ struct qemud_client {
char *saslUsername;
#endif
- unsigned int incomingSerial;
- unsigned int outgoingSerial;
-
- char buffer [REMOTE_MESSAGE_MAX];
- unsigned int bufferLength;
- unsigned int bufferOffset;
+ /* Count of messages in 'dx' or 'tx' queue,
+ * i.e. RPC calls in progress. Does not count
+ * async events, which are not used for
+ * throttling calculations */
+ int nrequests;
+ /* Zero or one message being received. Zero if
+ * nrequests >= max_client_requests and throttling */
+ struct qemud_client_message *rx;
+ /* Zero or many messages waiting for a worker
+ * to process them */
+ struct qemud_client_message *dx;
+ /* Zero or many messages waiting for transmit
+ * back to client, including async events */
+ struct qemud_client_message *tx;
/* This is only valid if a remote open call has been made on this
* connection, otherwise it will be NULL. Also if remote close is
@@ -181,16 +192,20 @@ void qemudLog(int priority, const char *
int qemudSetCloseExec(int fd);
int qemudSetNonBlock(int fd);
-unsigned int
+int
remoteDispatchClientRequest (struct qemud_server *server,
- struct qemud_client *client);
+ struct qemud_client *client,
+ struct qemud_client_message *req);
-void qemudDispatchClientWrite(struct qemud_server *server,
- struct qemud_client *client);
+int qemudRegisterClientEvent(struct qemud_server *server,
+ struct qemud_client *client,
+ int update);
-#if HAVE_POLKIT
-int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
-#endif
+void qemudDispatchClientFailure(struct qemud_client *client);
+
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+ struct qemud_client_message *msg);
int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
virDomainPtr dom,
@@ -198,4 +213,9 @@ int remoteRelayDomainEvent (virConnectPt
int detail,
void *opaque);
+
+#if HAVE_POLKIT
+int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
#endif
+
+#endif
diff --git a/qemud/remote.c b/qemud/remote.c
--- a/qemud/remote.c
+++ b/qemud/remote.c
@@ -111,6 +111,7 @@ static const dispatch_data const dispatc
/* Prototypes */
static void
remoteDispatchDomainEventSend (struct qemud_client *client,
+ struct qemud_client_message *msg,
virDomainPtr dom,
int event,
int detail);
@@ -219,9 +220,10 @@ remoteDispatchConnError (remote_error *r
* Server object is unlocked
* Client object is locked
*/
-unsigned int
+int
remoteDispatchClientRequest (struct qemud_server *server,
- struct qemud_client *client)
+ struct qemud_client *client,
+ struct qemud_client_message *msg)
{
XDR xdr;
remote_message_header req, rep;
@@ -237,7 +239,10 @@ remoteDispatchClientRequest (struct qemu
memset(&rerr, 0, sizeof rerr);
/* Parse the header. */
- xdrmem_create (&xdr, client->buffer, client->bufferLength, XDR_DECODE);
+ xdrmem_create (&xdr,
+ msg->buffer + REMOTE_MESSAGE_HEADER_XDR_LEN,
+ msg->bufferLength - REMOTE_MESSAGE_HEADER_XDR_LEN,
+ XDR_DECODE);
if (!xdr_remote_message_header (&xdr, &req))
goto fatal_error;
@@ -333,7 +338,7 @@ rpc_error:
rep.status = rv < 0 ? REMOTE_ERROR : REMOTE_OK;
/* Serialise the return header. */
- xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+ xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
len = 0; /* We'll come back and write this later. */
if (!xdr_int (&xdr, &len)) {
@@ -368,13 +373,17 @@ rpc_error:
goto fatal_error;
xdr_destroy (&xdr);
- return len;
+
+ msg->bufferLength = len;
+ msg->bufferOffset = 0;
+
+ return 0;
fatal_error:
/* Seriously bad stuff happened, so we'll kill off this client
and not send back any RPC error */
xdr_destroy (&xdr);
- return 0;
+ return -1;
}
int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
@@ -386,9 +395,20 @@ int remoteRelayDomainEvent (virConnectPt
struct qemud_client *client = opaque;
REMOTE_DEBUG("Relaying domain event %d %d", event, detail);
- if(client) {
- remoteDispatchDomainEventSend (client, dom, event, detail);
- qemudDispatchClientWrite(client->server,client);
+ if (client) {
+ struct qemud_client_message *ev;
+
+ if (VIR_ALLOC(ev) < 0)
+ return -1;
+
+ virMutexLock(&client->lock);
+
+ remoteDispatchDomainEventSend (client, ev, dom, event, detail);
+
+ if (qemudRegisterClientEvent(client->server, client, 1) < 0)
+ qemudDispatchClientFailure(client);
+
+ virMutexUnlock(&client->lock);
}
return 0;
}
@@ -4202,6 +4222,7 @@ remoteDispatchDomainEventsDeregister (st
static void
remoteDispatchDomainEventSend (struct qemud_client *client,
+ struct qemud_client_message *msg,
virDomainPtr dom,
int event,
int detail)
@@ -4222,7 +4243,7 @@ remoteDispatchDomainEventSend (struct qe
rep.status = REMOTE_OK;
/* Serialise the return header and event. */
- xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+ xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
len = 0; /* We'll come back and write this later. */
if (!xdr_int (&xdr, &len)) {
@@ -4263,9 +4284,10 @@ remoteDispatchDomainEventSend (struct qe
xdr_destroy (&xdr);
/* Send it. */
- client->mode = QEMUD_MODE_TX_PACKET;
- client->bufferLength = len;
- client->bufferOffset = 0;
+ msg->async = 1;
+ msg->bufferLength = len;
+ msg->bufferOffset = 0;
+ qemudClientMessageQueuePush(&client->tx, msg);
}
/*----- Helpers. -----*/
diff --git a/qemud/test_libvirtd.aug b/qemud/test_libvirtd.aug
--- a/qemud/test_libvirtd.aug
+++ b/qemud/test_libvirtd.aug
@@ -246,6 +246,19 @@ max_clients = 20
# of clients allowed
min_workers = 5
max_workers = 20
+
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impact
+# memory usage, currently each request requires 256 KB of
+# memory. So by default upto 5 MB of memory is used
+max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameter
+max_client_requests = 5
"
test Libvirtd.lns get conf =
@@ -499,3 +512,16 @@ max_workers = 20
{ "#comment" = "of clients allowed"}
{ "min_workers" = "5" }
{ "max_workers" = "20" }
+ { "#empty" }
+ { "#comment" = "Total global limit on concurrent RPC calls. Should be" }
+ { "#comment" = "at least as large as max_workers. Beyond this, RPC requests" }
+ { "#comment" = "will be read into memory and queued. This directly impact" }
+ { "#comment" = "memory usage, currently each request requires 256 KB of" }
+ { "#comment" = "memory. So by default upto 5 MB of memory is used" }
+ { "max_requests" = "20" }
+ { "#empty" }
+ { "#comment" = "Limit on concurrent requests from a single client" }
+ { "#comment" = "connection. To avoid one client monopolizing the server" }
+ { "#comment" = "this should be a small fraction of the global max_requests" }
+ { "#comment" = "and max_workers parameter" }
+ { "max_client_requests" = "5" }
--
|: Red Hat, Engineering, London -o-
http://people.redhat.com/berrange/ :|
|:
http://libvirt.org -o-
http://virt-manager.org -o-
http://ovirt.org :|
|:
http://autobuild.org -o-
http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|