[libvirt] [PATCH 0/3] Fix stream upload in new RPC code

These patches do three small fixes to the RPC code in libvirtd to make sure stream upload works correctly again

The virNetServerClient object had a hardcoded limit of 10 requests per client. Extend constructor to allow it to be passed in as a configurable variable. Wire this up to the 'max_client_requests' config parameter in libvirtd * daemon/libvirtd.c: Pass max_client_requests into services * src/rpc/virnetserverservice.c, src/rpc/virnetserverservice.h: Pass nrequests_client_max to clients * src/rpc/virnetserverclient.c, src/rpc/virnetserverclient.h: Allow configurable request limit --- daemon/libvirtd.c | 4 ++++ src/rpc/virnetserverclient.c | 3 ++- src/rpc/virnetserverclient.h | 1 + src/rpc/virnetserverservice.c | 6 ++++++ src/rpc/virnetserverservice.h | 2 ++ 5 files changed, 15 insertions(+), 1 deletions(-) diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index d1f80e4..118aa92 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -486,6 +486,7 @@ static int daemonSetupNetworking(virNetServerPtr srv, unix_sock_gid, config->auth_unix_rw, false, + config->max_client_requests, NULL))) goto error; if (sock_path_ro && @@ -494,6 +495,7 @@ static int daemonSetupNetworking(virNetServerPtr srv, unix_sock_gid, config->auth_unix_ro, true, + config->max_client_requests, NULL))) goto error; @@ -509,6 +511,7 @@ static int daemonSetupNetworking(virNetServerPtr srv, config->tcp_port, config->auth_tcp, false, + config->max_client_requests, NULL))) goto error; @@ -543,6 +546,7 @@ static int daemonSetupNetworking(virNetServerPtr srv, config->tls_port, config->auth_tls, false, + config->max_client_requests, ctxt))) { virNetTLSContextFree(ctxt); goto error; diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index 57a3446..aac4c3c 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -282,6 +282,7 @@ virNetServerClientCheckAccess(virNetServerClientPtr client) virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, int auth, bool readonly, + size_t nrequests_max, virNetTLSContextPtr tls) { virNetServerClientPtr client; @@ -301,7 +302,7 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, client->auth = auth; client->readonly = readonly; client->tlsCtxt = tls; - client->nrequests_max = 10; /* XXX */ + client->nrequests_max = nrequests_max; if (tls) virNetTLSContextRef(tls); diff --git a/src/rpc/virnetserverclient.h b/src/rpc/virnetserverclient.h index 0994890..66510c3 100644 --- a/src/rpc/virnetserverclient.h +++ b/src/rpc/virnetserverclient.h @@ -41,6 +41,7 @@ typedef int (*virNetServerClientFilterFunc)(virNetServerClientPtr client, virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock, int auth, bool readonly, + size_t nrequests_max, virNetTLSContextPtr tls); int virNetServerClientAddFilter(virNetServerClientPtr client, diff --git a/src/rpc/virnetserverservice.c b/src/rpc/virnetserverservice.c index e5a47b0..e84f72c 100644 --- a/src/rpc/virnetserverservice.c +++ b/src/rpc/virnetserverservice.c @@ -39,6 +39,7 @@ struct _virNetServerService { int auth; bool readonly; + size_t nrequests_client_max; virNetTLSContextPtr tls; @@ -65,6 +66,7 @@ static void virNetServerServiceAccept(virNetSocketPtr sock, if (!(client = virNetServerClientNew(clientsock, svc->auth, svc->readonly, + svc->nrequests_client_max, svc->tls))) goto error; @@ -88,6 +90,7 @@ virNetServerServicePtr virNetServerServiceNewTCP(const char *nodename, const char *service, int auth, bool readonly, + size_t nrequests_client_max, virNetTLSContextPtr tls) { virNetServerServicePtr svc; @@ -99,6 +102,7 @@ virNetServerServicePtr virNetServerServiceNewTCP(const char *nodename, svc->refs = 1; svc->auth = auth; svc->readonly = readonly; + svc->nrequests_client_max = nrequests_client_max; svc->tls = tls; if (tls) virNetTLSContextRef(tls); @@ -138,6 +142,7 @@ virNetServerServicePtr virNetServerServiceNewUNIX(const char *path, gid_t grp, int auth, bool readonly, + size_t nrequests_client_max, virNetTLSContextPtr tls) { virNetServerServicePtr svc; @@ -149,6 +154,7 @@ virNetServerServicePtr virNetServerServiceNewUNIX(const char *path, svc->refs = 1; svc->auth = auth; svc->readonly = readonly; + svc->nrequests_client_max = nrequests_client_max; svc->tls = tls; if (tls) virNetTLSContextRef(tls); diff --git a/src/rpc/virnetserverservice.h b/src/rpc/virnetserverservice.h index 378fa0b..9357598 100644 --- a/src/rpc/virnetserverservice.h +++ b/src/rpc/virnetserverservice.h @@ -40,12 +40,14 @@ virNetServerServicePtr virNetServerServiceNewTCP(const char *nodename, const char *service, int auth, bool readonly, + size_t nrequests_client_max, virNetTLSContextPtr tls); virNetServerServicePtr virNetServerServiceNewUNIX(const char *path, mode_t mask, gid_t grp, int auth, bool readonly, + size_t nrequests_client_max, virNetTLSContextPtr tls); int virNetServerServiceGetPort(virNetServerServicePtr svc); -- 1.7.4.4

On 06/30/2011 05:29 AM, Daniel P. Berrange wrote:
The virNetServerClient object had a hardcoded limit of 10 requests per client. Extend constructor to allow it to be passed in as a configurable variable. Wire this up to the 'max_client_requests' config parameter in libvirtd
* daemon/libvirtd.c: Pass max_client_requests into services * src/rpc/virnetserverservice.c, src/rpc/virnetserverservice.h: Pass nrequests_client_max to clients * src/rpc/virnetserverclient.c, src/rpc/virnetserverclient.h: Allow configurable request limit --- daemon/libvirtd.c | 4 ++++ src/rpc/virnetserverclient.c | 3 ++- src/rpc/virnetserverclient.h | 1 + src/rpc/virnetserverservice.c | 6 ++++++ src/rpc/virnetserverservice.h | 2 ++ 5 files changed, 15 insertions(+), 1 deletions(-)
ACK. -- Eric Blake eblake@redhat.com +1-801-349-2682 Libvirt virtualization library http://libvirt.org

To save on memory reallocation, virNetMessage instances that have been transmitted, may be reused for a subsequent incoming message. We forgot to clear out the old data of the message fully, which caused later confusion upon read. * src/rpc/virnetserverclient.c: memset entire message before reusing it --- src/rpc/virnetserverclient.c | 1 + 1 files changed, 1 insertions(+), 0 deletions(-) diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index aac4c3c..5c23cf2 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -837,6 +837,7 @@ virNetServerClientDispatchWrite(virNetServerClientPtr client) client->nrequests < client->nrequests_max) { /* Ready to recv more messages */ client->rx = msg; + memset(client->rx, 0, sizeof(*client->rx)); client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX; msg = NULL; client->nrequests++; -- 1.7.4.4

On 06/30/2011 05:29 AM, Daniel P. Berrange wrote:
To save on memory reallocation, virNetMessage instances that have been transmitted, may be reused for a subsequent incoming message. We forgot to clear out the old data of the message fully, which caused later confusion upon read.
* src/rpc/virnetserverclient.c: memset entire message before reusing it --- src/rpc/virnetserverclient.c | 1 + 1 files changed, 1 insertions(+), 0 deletions(-)
diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c index aac4c3c..5c23cf2 100644 --- a/src/rpc/virnetserverclient.c +++ b/src/rpc/virnetserverclient.c @@ -837,6 +837,7 @@ virNetServerClientDispatchWrite(virNetServerClientPtr client) client->nrequests < client->nrequests_max) { /* Ready to recv more messages */ client->rx = msg; + memset(client->rx, 0, sizeof(*client->rx)); client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
ACK. -- Eric Blake eblake@redhat.com +1-801-349-2682 Libvirt virtualization library http://libvirt.org

The stream code was reusing a stream message object before it was removed from the linked list of filtered messages. This caused any later queued messages to be completely lost. * daemon/stream.c: Delay reuse of stream message until after it is removed from the queue --- daemon/stream.c | 20 ++++++++++++++++---- 1 files changed, 16 insertions(+), 4 deletions(-) diff --git a/daemon/stream.c b/daemon/stream.c index e4fcf90..685cee2 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -456,10 +456,6 @@ daemonStreamHandleWriteData(virNetServerClientPtr client, /* Partial write, so indicate we have more todo later */ if (msg->bufferOffset < msg->bufferLength) return 1; - - /* A dummy 'send' just to free up 'msg' object */ - memset(msg, 0, sizeof(*msg)); - return virNetServerClientSendMessage(client, msg); } else if (ret == -2) { /* Blocking, so indicate we have more todo later */ return 1; @@ -603,6 +599,22 @@ daemonStreamHandleWrite(virNetServerClientPtr client, virNetServerClientMarkClose(client); return -1; } + + /* 'CONTINUE' messages don't send a reply (unless error + * occurred), so to release the 'msg' object we need to + * send a fake zero-length reply. Nothing actually gets + * onto the wire, but this causes the client to reset + * its active request count / throttling + */ + if (msg->header.status == VIR_NET_CONTINUE) { + memset(msg, 0, sizeof(*msg)); + msg->header.type = VIR_NET_REPLY; + if (virNetServerClientSendMessage(client, msg) < 0) { + virNetMessageFree(msg); + virNetServerClientMarkClose(client); + return -1; + } + } } return 0; -- 1.7.4.4

On 06/30/2011 05:29 AM, Daniel P. Berrange wrote:
The stream code was reusing a stream message object before it was removed from the linked list of filtered messages. This caused any later queued messages to be completely lost.
* daemon/stream.c: Delay reuse of stream message until after it is removed from the queue --- daemon/stream.c | 20 ++++++++++++++++---- 1 files changed, 16 insertions(+), 4 deletions(-)
ACK. -- Eric Blake eblake@redhat.com +1-801-349-2682 Libvirt virtualization library http://libvirt.org

On Thu, Jun 30, 2011 at 12:29:48PM +0100, Daniel P. Berrange wrote:
These patches do three small fixes to the RPC code in libvirtd to make sure stream upload works correctly again
FYI, this was discovered & validated using 6 test scripts I just added to the TCK to cover the different ways you can use the stream APIs: - Incremental writes of data to upload in blocking mode - Callback to pull data to upload in blocking mode - Event driven non-blocking writes of data to upload - Incremental reads of data to download in blocking mode - Callback to consume data to download in blocking mode - Event driven non-blocking reads of data to upload These streams tests are conveniently done using the recentish virStorageVolUpload/Download APIs. http://libvirt.org/git/?p=libvirt-tck.git;a=commitdiff;h=97c6a506cc6dda7bbed... Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
participants (2)
-
Daniel P. Berrange
-
Eric Blake