[libvirt] [PATCH 0/9] Misc bug fixes to new RPC code

This is a series of bug fixes to the new RPC code that I have made since posting the original series, which is now merged. Most of these fixes relate to the handling of I/O streams. Patches 1 and 9 also affect the original code prior to the RPC rewrite, and should be backported by people maintaining old branches.

If a callback being invoked from a stream issues a virStreamAbort operation, the stream data will be free'd but the callback will then stil try to use this. Delay free'ing of the stream data when a callback is dispatching * src/fdstream.c: Delay stream free when callback is active --- src/fdstream.c | 33 +++++++++++++++++++++++---------- 1 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/fdstream.c b/src/fdstream.c index c1ad787..182b6fa 100644 --- a/src/fdstream.c +++ b/src/fdstream.c @@ -56,8 +56,9 @@ struct virFDStreamData { unsigned long long length; int watch; - unsigned int cbRemoved; - unsigned int dispatching; + bool cbRemoved; + bool dispatching; + bool closed; virStreamEventCallback cb; void *opaque; virFreeCallback ff; @@ -85,7 +86,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream) virEventRemoveHandle(fdst->watch); if (fdst->dispatching) - fdst->cbRemoved = 1; + fdst->cbRemoved = true; else if (fdst->ff) (fdst->ff)(fdst->opaque); @@ -138,6 +139,7 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED, virStreamEventCallback cb; void *cbopaque; virFreeCallback ff; + bool closed; if (!fdst) return; @@ -151,16 +153,22 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED, cb = fdst->cb; cbopaque = fdst->opaque; ff = fdst->ff; - fdst->dispatching = 1; + fdst->dispatching = true; virMutexUnlock(&fdst->lock); cb(stream, events, cbopaque); virMutexLock(&fdst->lock); - fdst->dispatching = 0; + fdst->dispatching = false; if (fdst->cbRemoved && ff) (ff)(cbopaque); + closed = fdst->closed; virMutexUnlock(&fdst->lock); + + if (closed) { + virMutexDestroy(&fdst->lock); + VIR_FREE(fdst); + } } static int @@ -196,7 +204,7 @@ virFDStreamAddCallback(virStreamPtr st, goto cleanup; } - fdst->cbRemoved = 0; + fdst->cbRemoved = false; fdst->cb = cb; fdst->opaque = opaque; fdst->ff = ff; @@ -252,13 +260,18 @@ virFDStreamClose(virStreamPtr st) ret = -1; } virCommandFree(fdst->cmd); + fdst->cmd = NULL; } - st->privateData = NULL; - 
virMutexUnlock(&fdst->lock); - virMutexDestroy(&fdst->lock); - VIR_FREE(fdst); + if (fdst->dispatching) { + fdst->closed = true; + virMutexUnlock(&fdst->lock); + } else { + virMutexUnlock(&fdst->lock); + virMutexDestroy(&fdst->lock); + VIR_FREE(fdst); + } return ret; } -- 1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
If a callback being invoked from a stream issues a virStreamAbort operation, the stream data will be free'd but the callback will then stil try to use this. Delay free'ing of the stream data when
s/stil/still/
a callback is dispatching
* src/fdstream.c: Delay stream free when callback is active
---
 src/fdstream.c |   33 +++++++++++++++++++++++----------
 1 files changed, 23 insertions(+), 10 deletions(-)

     cb(stream, events, cbopaque);
 
     virMutexLock(&fdst->lock);
-    fdst->dispatching = 0;
+    fdst->dispatching = false;
     if (fdst->cbRemoved && ff)
         (ff)(cbopaque);
cb is called without locks, while ff is still under the lock. No change from previous status quo, but I wonder if ff should ever be moved outside the lock. ACK. -- Eric Blake eblake@redhat.com +1-801-349-2682 Libvirt virtualization library http://libvirt.org

On Tue, Jun 28, 2011 at 11:39:41AM -0600, Eric Blake wrote:
On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
If a callback being invoked from a stream issues a virStreamAbort operation, the stream data will be free'd but the callback will then stil try to use this. Delay free'ing of the stream data when
s/stil/still/
a callback is dispatching
* src/fdstream.c: Delay stream free when callback is active
---
 src/fdstream.c |   33 +++++++++++++++++++++++----------
 1 files changed, 23 insertions(+), 10 deletions(-)

     cb(stream, events, cbopaque);
 
     virMutexLock(&fdst->lock);
-    fdst->dispatching = 0;
+    fdst->dispatching = false;
     if (fdst->cbRemoved && ff)
         (ff)(cbopaque);
cb is called without locks, while ff is still under the lock. No change from previous status quo, but I wonder if ff should ever be moved outside the lock.
For the way the free callback is used, I don't think it matters.

Daniel
-- 
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
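
For readers following the first patch, the deferred-free idea can be sketched in isolation. This is a minimal single-threaded illustration, not the libvirt code: `stream_data`, `stream_close`, `stream_dispatch`, `closing_cb` and `frees_done` are hypothetical names, and the mutex that the real patch holds around the flag updates is left out for brevity.

```c
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for the per-stream private data. */
struct stream_data {
    bool dispatching;   /* a callback is currently running */
    bool closed;        /* close was requested during dispatch */
};

typedef void (*stream_cb)(struct stream_data **slot);

static int frees_done;  /* counts frees so the behaviour can be observed */

/* Close: free immediately unless a callback is in flight. */
static void stream_close(struct stream_data **slot)
{
    struct stream_data *d = *slot;

    if (!d)
        return;
    *slot = NULL;
    if (d->dispatching) {
        d->closed = true;       /* the dispatcher frees it afterwards */
    } else {
        free(d);
        frees_done++;
    }
}

/* Dispatch: run the callback, then honour any deferred close. */
static void stream_dispatch(struct stream_data **slot, stream_cb cb)
{
    struct stream_data *d = *slot;

    if (!d)
        return;
    d->dispatching = true;
    cb(slot);
    d->dispatching = false;     /* safe: d was not freed by the callback */
    if (d->closed) {
        free(d);
        frees_done++;
    }
}

/* A callback that closes the stream from inside the dispatch. */
static void closing_cb(struct stream_data **slot)
{
    stream_close(slot);
}
```

With this shape, a callback that closes the stream during dispatch leaves the data alive until `stream_dispatch` has finished touching it, which is the use-after-free the patch description refers to.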

If a streams error is raised, virNetClientIOEventLoop returns 0, but an error is set. Check for this and propagate it if present

* src/rpc/virnetclient.c: Propagate streams error
---
 src/rpc/virnetclient.c |    5 +++++
 1 files changed, 5 insertions(+), 0 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index ded1e12..dc0ce51 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1090,10 +1090,15 @@ static int virNetClientIO(virNetClientPtr client,
      */
     virNetSocketUpdateIOCallback(client->sock, 0);
 
+    virResetLastError();
     rv = virNetClientIOEventLoop(client, thiscall);
 
     virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE);
 
+    if (rv == 0 &&
+        virGetLastError())
+        rv = -1;
+
 cleanup:
     VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv);
     return rv;
-- 
1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
If a streams error is raised, virNetClientIOEventLoop returns 0, but an error is set. Check for this and propagate it if present
* src/rpc/virnetclient.c: Propagate streams error
---
 src/rpc/virnetclient.c |    5 +++++
 1 files changed, 5 insertions(+), 0 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index ded1e12..dc0ce51 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -1090,10 +1090,15 @@ static int virNetClientIO(virNetClientPtr client,
      */
     virNetSocketUpdateIOCallback(client->sock, 0);
 
+    virResetLastError();
     rv = virNetClientIOEventLoop(client, thiscall);
 
     virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE);
 
+    if (rv == 0 &&
+        virGetLastError())
+        rv = -1;
+
 cleanup:
     VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv);
     return rv;
ACK.
-- Eric Blake
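
The pattern in this patch (clear any stale error, run the loop, then treat "returned 0 but an error is set" as failure) can be illustrated on its own. The sketch below is an assumption-laden toy: `last_error`, `reset_last_error`, `get_last_error`, `event_loop` and `do_io` are hypothetical stand-ins, not libvirt functions, and the error slot is a plain global rather than thread-local state.

```c
#include <stddef.h>

/* Hypothetical global error slot; a real library would keep this per thread. */
static const char *last_error;

static void reset_last_error(void) { last_error = NULL; }
static const char *get_last_error(void) { return last_error; }

/* Hypothetical event loop: it returns 0 even when a stream error
 * was recorded while it ran. */
static int event_loop(int raise_stream_error)
{
    if (raise_stream_error)
        last_error = "stream aborted";
    return 0;
}

/* Wrapper with the same shape as the fix: clear stale errors first so
 * that an old error is not mistaken for a new one, then convert
 * "success with an error set" into a failure return. */
static int do_io(int raise_stream_error)
{
    int rv;

    reset_last_error();
    rv = event_loop(raise_stream_error);
    if (rv == 0 && get_last_error())
        rv = -1;
    return rv;
}
```

The reset before the loop matters as much as the check after it: without it, an error left over from an earlier call would make a clean run look like a failure.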

The RPC client treats failure to register a socket watch as non-fatal, since we do not mandate that a libvirt client application provide an event loop implementation. It is thus inappropriate to log a message at VIR_LOG_WARN

* src/rpc/virnetsocket.c: Lower logging level
---
 src/rpc/virnetsocket.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/src/rpc/virnetsocket.c b/src/rpc/virnetsocket.c
index 68d7de7..96d2dfd 100644
--- a/src/rpc/virnetsocket.c
+++ b/src/rpc/virnetsocket.c
@@ -1066,7 +1066,7 @@ int virNetSocketAddIOCallback(virNetSocketPtr sock,
                                          virNetSocketEventHandle,
                                          sock,
                                          NULL)) < 0) {
-        VIR_WARN("Failed to register watch on socket %p", sock);
+        VIR_DEBUG("Failed to register watch on socket %p", sock);
         return -1;
     }
     sock->func = func;
-- 
1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
The RPC client treats failure to register a socket watch as non-fatal, since we do not mandate that a libvirt client application provide an event loop implementation. It is thus inappropriate to log a message at VIR_LOG_WARN

* src/rpc/virnetsocket.c: Lower logging level
---
 src/rpc/virnetsocket.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/src/rpc/virnetsocket.c b/src/rpc/virnetsocket.c
index 68d7de7..96d2dfd 100644
--- a/src/rpc/virnetsocket.c
+++ b/src/rpc/virnetsocket.c
@@ -1066,7 +1066,7 @@ int virNetSocketAddIOCallback(virNetSocketPtr sock,
                                          virNetSocketEventHandle,
                                          sock,
                                          NULL)) < 0) {
-        VIR_WARN("Failed to register watch on socket %p", sock);
+        VIR_DEBUG("Failed to register watch on socket %p", sock);
ACK.
-- Eric Blake

On stream completion it is neccessary to send back a message with an empty payload. The message header was not being filled out correctly, since we were not writing any payload. Add a method for encoding an empty payload which updates the message headers correctly.

* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add a virNetMessageEncodePayloadEmpty method
* src/rpc/virnetserverprogram.c: Write empty payload on stream completion
---
 src/rpc/virnetmessage.c       |   25 +++++++++++++++++++++++++
 src/rpc/virnetmessage.h       |    2 ++
 src/rpc/virnetserverprogram.c |    5 ++++-
 3 files changed, 31 insertions(+), 1 deletions(-)

diff --git a/src/rpc/virnetmessage.c b/src/rpc/virnetmessage.c
index 62cdbc3..a7a25b1 100644
--- a/src/rpc/virnetmessage.c
+++ b/src/rpc/virnetmessage.c
@@ -334,6 +334,31 @@ error:
 }
 
 
+int virNetMessageEncodePayloadEmpty(virNetMessagePtr msg)
+{
+    XDR xdr;
+    unsigned int msglen;
+
+    /* Re-encode the length word. */
+    VIR_DEBUG("Encode length as %zu", msg->bufferOffset);
+    xdrmem_create(&xdr, msg->buffer, VIR_NET_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE);
+    msglen = msg->bufferOffset;
+    if (!xdr_u_int(&xdr, &msglen)) {
+        virNetError(VIR_ERR_RPC, "%s", _("Unable to encode message length"));
+        goto error;
+    }
+    xdr_destroy(&xdr);
+
+    msg->bufferLength = msg->bufferOffset;
+    msg->bufferOffset = 0;
+    return 0;
+
+error:
+    xdr_destroy(&xdr);
+    return -1;
+}
+
+
 void virNetMessageSaveError(virNetMessageErrorPtr rerr)
 {
     /* This func may be called several times & the first
diff --git a/src/rpc/virnetmessage.h b/src/rpc/virnetmessage.h
index 9215112..2aae3f6 100644
--- a/src/rpc/virnetmessage.h
+++ b/src/rpc/virnetmessage.h
@@ -78,6 +78,8 @@ int virNetMessageEncodePayloadRaw(virNetMessagePtr msg,
                                   const char *buf,
                                   size_t len)
     ATTRIBUTE_NONNULL(1) ATTRIBUTE_RETURN_CHECK;
+int virNetMessageEncodePayloadEmpty(virNetMessagePtr msg)
+    ATTRIBUTE_NONNULL(1) ATTRIBUTE_RETURN_CHECK;
 
 void virNetMessageSaveError(virNetMessageErrorPtr rerr)
     ATTRIBUTE_NONNULL(1);
diff --git a/src/rpc/virnetserverprogram.c b/src/rpc/virnetserverprogram.c
index 0d1577a..4afed64 100644
--- a/src/rpc/virnetserverprogram.c
+++ b/src/rpc/virnetserverprogram.c
@@ -433,8 +433,11 @@ int virNetServerProgramSendStreamData(virNetServerProgramPtr prog,
 
         if (virNetMessageEncodePayloadRaw(msg, data, len) < 0)
             return -1;
-        VIR_DEBUG("Total %zu", msg->bufferOffset);
+    } else {
+        if (virNetMessageEncodePayloadEmpty(msg) < 0)
+            return -1;
     }
+    VIR_DEBUG("Total %zu", msg->bufferOffset);
 
     return virNetServerClientSendMessage(client, msg);
 }
-- 
1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
On stream completion it is neccessary to send back a
s/neccessary/necessary/ (must be one of your finger-memory typos ;)
message with an empty payload. The message header was not being filled out correctly, since we were not writing any payload. Add a method for encoding an empty payload which updates the message headers correctly.
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add a virNetMessageEncodePayloadEmpty method
* src/rpc/virnetserverprogram.c: Write empty payload on stream completion
---
 src/rpc/virnetmessage.c       |   25 +++++++++++++++++++++++++
 src/rpc/virnetmessage.h       |    2 ++
 src/rpc/virnetserverprogram.c |    5 ++++-
 3 files changed, 31 insertions(+), 1 deletions(-)
ACK.
-- Eric Blake
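
To make the "re-encode the length word" step concrete, here is a small standalone sketch. It relies on one fact about XDR, that an unsigned int is encoded as four big-endian bytes, and otherwise uses hypothetical names (`struct message`, `put_be32`, `encode_payload_empty`, a fixed 64-byte buffer) that do not come from libvirt. It writes the bytes by hand instead of calling the XDR library, purely to keep the example self-contained.

```c
#include <stddef.h>
#include <stdint.h>

#define LEN_WORD 4  /* XDR encodes an unsigned int as 4 big-endian bytes */

/* Hypothetical cut-down message: buf holds length word + header + payload. */
struct message {
    unsigned char buf[64];
    size_t offset;   /* bytes written so far, including the length word */
    size_t length;   /* total bytes ready to send */
};

/* Store v at p in big-endian byte order. */
static void put_be32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char)(v >> 24);
    p[1] = (unsigned char)(v >> 16);
    p[2] = (unsigned char)(v >> 8);
    p[3] = (unsigned char)v;
}

/* Finish a message that carries no payload: rewrite the length word so
 * it covers exactly what has been written, then rewind for sending. */
static int encode_payload_empty(struct message *msg)
{
    if (msg->offset < LEN_WORD || msg->offset > sizeof(msg->buf))
        return -1;
    put_be32(msg->buf, (uint32_t)msg->offset);
    msg->length = msg->offset;
    msg->offset = 0;
    return 0;
}
```

The point of the sketch is the ordering: the length word at the front of the buffer is only correct once it is rewritten after the header has been encoded, even when nothing follows the header.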

Improve log messages issued when encountering a bogus message length to include the actual length and the limit violated

* src/rpc/virnetmessage.c: Improve log messages
---
 src/rpc/virnetmessage.c |   10 ++++++----
 1 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/rpc/virnetmessage.c b/src/rpc/virnetmessage.c
index a7a25b1..0725491 100644
--- a/src/rpc/virnetmessage.c
+++ b/src/rpc/virnetmessage.c
@@ -101,8 +101,9 @@ int virNetMessageDecodeLength(virNetMessagePtr msg)
     msg->bufferOffset = xdr_getpos(&xdr);
 
     if (len < VIR_NET_MESSAGE_LEN_MAX) {
-        virNetError(VIR_ERR_RPC, "%s",
-                    _("packet received from server too small"));
+        virNetError(VIR_ERR_RPC,
+                    _("packet %d bytes received from server too small, want %d"),
+                    len, VIR_NET_MESSAGE_LEN_MAX);
         goto cleanup;
     }
 
@@ -110,8 +111,9 @@ int virNetMessageDecodeLength(virNetMessagePtr msg)
     len -= VIR_NET_MESSAGE_LEN_MAX;
 
     if (len > VIR_NET_MESSAGE_MAX) {
-        virNetError(VIR_ERR_RPC, "%s",
-                    _("packet received from server too large"));
+        virNetError(VIR_ERR_RPC,
+                    _("packet %d bytes received from server too large, want %d"),
+                    len, VIR_NET_MESSAGE_MAX);
         goto cleanup;
     }
 
-- 
1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
Improve log messages issued when encountering a bogus message length to include the actual length and the limit violated
* src/rpc/virnetmessage.c: Improve log messages
---
 src/rpc/virnetmessage.c |   10 ++++++----
 1 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/rpc/virnetmessage.c b/src/rpc/virnetmessage.c
index a7a25b1..0725491 100644
--- a/src/rpc/virnetmessage.c
+++ b/src/rpc/virnetmessage.c
@@ -101,8 +101,9 @@ int virNetMessageDecodeLength(virNetMessagePtr msg)
     msg->bufferOffset = xdr_getpos(&xdr);
 
     if (len < VIR_NET_MESSAGE_LEN_MAX) {
-        virNetError(VIR_ERR_RPC, "%s",
-                    _("packet received from server too small"));
+        virNetError(VIR_ERR_RPC,
+                    _("packet %d bytes received from server too small, want %d"),
+                    len, VIR_NET_MESSAGE_LEN_MAX);
ACK.
-- Eric Blake

When a filter steals an RPC message, that message must not be freed, except by the filter code itself

* src/rpc/virnetserverclient.c: Don't free stolen RPC messages
---
 src/rpc/virnetserverclient.c |    6 +++++-
 1 files changed, 5 insertions(+), 1 deletions(-)

diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
index 327b121..57a3446 100644
--- a/src/rpc/virnetserverclient.c
+++ b/src/rpc/virnetserverclient.c
@@ -723,13 +723,17 @@ readmore:
         filter = client->filters;
         while (filter) {
             int ret = filter->func(client, msg, filter->opaque);
-            if (ret < 0 || ret > 0) {
+            if (ret < 0) {
                 virNetMessageFree(msg);
                 msg = NULL;
                 if (ret < 0)
                     client->wantClose = true;
                 break;
             }
+            if (ret > 0) {
+                msg = NULL;
+                break;
+            }
 
             filter = filter->next;
         }
-- 
1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
When a filter steals an RPC message, that message must not be freed, except by the filter code itself
* src/rpc/virnetserverclient.c: Don't free stolen RPC messages
---
 src/rpc/virnetserverclient.c |    6 +++++-
 1 files changed, 5 insertions(+), 1 deletions(-)
ACK.
-- Eric Blake
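
The ownership rule behind this patch can be shown with a toy filter chain. The sketch assumes the return convention visible in the diff (negative for error, zero for "not interested", positive for "message stolen"); the types and functions (`struct msg`, `struct filter`, `run_filters`, `steal_filter`, `pass_filter`) are hypothetical and much simpler than the real server client code.

```c
#include <stdlib.h>

struct msg { int id; };

/* Assumed filter contract: <0 error, 0 not interested, >0 message stolen. */
typedef int (*filter_fn)(struct msg *m, void *opaque);

struct filter {
    filter_fn func;
    void *opaque;
    struct filter *next;
};

/* Returns the message if no filter took it, NULL otherwise.
 * *want_close is set when a filter reports an error. */
static struct msg *run_filters(struct filter *f, struct msg *m, int *want_close)
{
    for (; f; f = f->next) {
        int ret = f->func(m, f->opaque);

        if (ret < 0) {      /* error: the caller still owns the message */
            free(m);
            *want_close = 1;
            return NULL;
        }
        if (ret > 0)        /* stolen: the filter owns it now, do not free */
            return NULL;
    }
    return m;
}

/* Example filter that steals every message into *opaque. */
static int steal_filter(struct msg *m, void *opaque)
{
    *(struct msg **)opaque = m;
    return 1;
}

/* Example filter that ignores every message. */
static int pass_filter(struct msg *m, void *opaque)
{
    (void)m;
    (void)opaque;
    return 0;
}
```

Splitting the two non-zero cases is the whole fix: both stop the chain, but only the error case may free the message.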

* src/rpc/virnetclientstream.c: Avoid referencing NULL
---
 src/rpc/virnetclientstream.c |   21 ++++++++++++++++-----
 1 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 44c9acf..99c7b41 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -217,13 +217,24 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
     } else {
         st->err.code = err.code;
     }
-    st->err.message = *err.message;
-    *err.message = NULL;
+    if (err.message) {
+        st->err.message = *err.message;
+        *err.message = NULL;
+    }
     st->err.domain = err.domain;
     st->err.level = err.level;
-    st->err.str1 = *err.str1;
-    st->err.str2 = *err.str2;
-    st->err.str3 = *err.str3;
+    if (err.str1) {
+        st->err.str1 = *err.str1;
+        *err.str1 = NULL;
+    }
+    if (err.str2) {
+        st->err.str2 = *err.str2;
+        *err.str2 = NULL;
+    }
+    if (err.str3) {
+        st->err.str3 = *err.str3;
+        *err.str3 = NULL;
+    }
     st->err.int1 = err.int1;
     st->err.int2 = err.int2;
 
-- 
1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
* src/rpc/virnetclientstream.c: Avoid referencing NULL
---
 src/rpc/virnetclientstream.c |   21 ++++++++++++++++-----
 1 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 44c9acf..99c7b41 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -217,13 +217,24 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
     } else {
         st->err.code = err.code;
     }
-    st->err.message = *err.message;
-    *err.message = NULL;
+    if (err.message) {
+        st->err.message = *err.message;
+        *err.message = NULL;
+    }
ACK.
-- Eric Blake
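
The repeated `if (field) { copy; clear; }` blocks in this patch all follow one pattern: the decoded error holds each optional string as a pointer to a string pointer, which is NULL when the field is absent. A hedged sketch of that pattern as a helper follows; `take_optional_string` is a hypothetical name and is not part of the patch.

```c
#include <stddef.h>

/* Move an optional string out of a decoded structure.
 * src is NULL when the field is absent; otherwise *src is the string.
 * Ownership passes to *dst, and *src is cleared so that freeing the
 * source structure later cannot release the string a second time. */
static void take_optional_string(char **dst, char **src)
{
    if (src) {
        *dst = *src;
        *src = NULL;
    }
}
```

Note that the patch adds the clearing step for str1, str2 and str3 as well as the NULL checks; before it, only the message field was cleared after being copied.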

The client stream object can be used independantly of the virNetClientPtr object, so must have full locking of its own and not rely on any caller. * src/remote/remote_driver.c: Remove locking around stream callback * src/rpc/virnetclientstream.c: Add locking to all APIs and callbacks --- src/remote/remote_driver.c | 3 - src/rpc/virnetclientstream.c | 112 +++++++++++++++++++++++++++++++++--------- 2 files changed, 89 insertions(+), 26 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 2ac87c8..bb686c8 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -3254,11 +3254,8 @@ static void remoteStreamEventCallback(virNetClientStreamPtr stream ATTRIBUTE_UNU void *opaque) { struct remoteStreamCallbackData *cbdata = opaque; - struct private_data *priv = cbdata->st->conn->privateData; - remoteDriverUnlock(priv); (cbdata->cb)(cbdata->st, events, cbdata->opaque); - remoteDriverLock(priv); } diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 99c7b41..9da5aee 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -28,6 +28,7 @@ #include "virterror_internal.h" #include "logging.h" #include "event.h" +#include "threads.h" #define VIR_FROM_THIS VIR_FROM_RPC #define virNetError(code, ...) 
\ @@ -35,6 +36,8 @@ __FUNCTION__, __LINE__, __VA_ARGS__) struct _virNetClientStream { + virMutex lock; + virNetClientProgramPtr prog; int proc; unsigned serial; @@ -53,7 +56,6 @@ struct _virNetClientStream { size_t incomingOffset; size_t incomingLength; - virNetClientStreamEventCallback cb; void *cbOpaque; virFreeCallback cbFree; @@ -89,7 +91,8 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virNetClientStreamPtr st = opaque; int events = 0; - /* XXX we need a mutex on 'st' to protect this callback */ + + virMutexLock(&st->lock); if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && @@ -106,12 +109,15 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) virFreeCallback cbFree = st->cbFree; st->cbDispatch = 1; + virMutexUnlock(&st->lock); (cb)(st, events, cbOpaque); + virMutexLock(&st->lock); st->cbDispatch = 0; if (!st->cb && cbFree) (cbFree)(cbOpaque); } + virMutexUnlock(&st->lock); } @@ -134,30 +140,45 @@ virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, return NULL; } - virNetClientProgramRef(prog); - st->refs = 1; st->prog = prog; st->proc = proc; st->serial = serial; + if (virMutexInit(&st->lock) < 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("cannot initialize mutex")); + VIR_FREE(st); + return NULL; + } + + virNetClientProgramRef(prog); + return st; } void virNetClientStreamRef(virNetClientStreamPtr st) { + virMutexLock(&st->lock); st->refs++; + virMutexUnlock(&st->lock); } void virNetClientStreamFree(virNetClientStreamPtr st) { + virMutexLock(&st->lock); st->refs--; - if (st->refs > 0) + if (st->refs > 0) { + virMutexUnlock(&st->lock); return; + } + + virMutexUnlock(&st->lock); virResetError(&st->err); VIR_FREE(st->incoming); + virMutexDestroy(&st->lock); virNetClientProgramFree(st->prog); VIR_FREE(st); } @@ -165,18 +186,24 @@ void virNetClientStreamFree(virNetClientStreamPtr st) bool virNetClientStreamMatches(virNetClientStreamPtr st, virNetMessagePtr msg) { + bool match = 
false; + virMutexLock(&st->lock); if (virNetClientProgramMatches(st->prog, msg) && st->proc == msg->header.proc && st->serial == msg->header.serial) - return 1; - return 0; + match = true; + virMutexUnlock(&st->lock); + return match; } bool virNetClientStreamRaiseError(virNetClientStreamPtr st) { - if (st->err.code == VIR_ERR_OK) + virMutexLock(&st->lock); + if (st->err.code == VIR_ERR_OK) { + virMutexUnlock(&st->lock); return false; + } virRaiseErrorFull(__FILE__, __FUNCTION__, __LINE__, st->err.domain, @@ -188,7 +215,7 @@ bool virNetClientStreamRaiseError(virNetClientStreamPtr st) st->err.int1, st->err.int2, "%s", st->err.message ? st->err.message : _("Unknown error")); - + virMutexUnlock(&st->lock); return true; } @@ -199,6 +226,8 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, virNetMessageError err; int ret = -1; + virMutexLock(&st->lock); + if (st->err.code != VIR_ERR_OK) VIR_DEBUG("Overwriting existing stream error %s", NULLSTR(st->err.message)); @@ -242,6 +271,7 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, cleanup: xdr_free((xdrproc_t)xdr_virNetMessageError, (void*)&err); + virMutexUnlock(&st->lock); return ret; } @@ -249,15 +279,18 @@ cleanup: int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { - size_t avail = st->incomingLength - st->incomingOffset; - size_t need = msg->bufferLength - msg->bufferOffset; + int ret = -1; + size_t need; + virMutexLock(&st->lock); + need = msg->bufferLength - msg->bufferOffset; + size_t avail = st->incomingLength - st->incomingOffset; if (need > avail) { size_t extra = need - avail; if (VIR_REALLOC_N(st->incoming, st->incomingLength + extra) < 0) { VIR_DEBUG("Out of memory handling stream data"); - return -1; + goto cleanup; } st->incomingLength += extra; } @@ -269,7 +302,12 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, VIR_DEBUG("Stream incoming data offset %zu length %zu", st->incomingOffset, st->incomingLength); - return 0; + + ret = 0; + 
+cleanup: + virMutexUnlock(&st->lock); + return ret; } @@ -286,6 +324,8 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, if (!(msg = virNetMessageNew())) return -1; + virMutexLock(&st->lock); + msg->header.prog = virNetClientProgramGetProgram(st->prog); msg->header.vers = virNetClientProgramGetVersion(st->prog); msg->header.status = status; @@ -293,6 +333,8 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, msg->header.serial = st->serial; msg->header.proc = st->proc; + virMutexUnlock(&st->lock); + if (virNetMessageEncodeHeader(msg) < 0) goto error; @@ -329,6 +371,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, int rv = -1; VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); + virMutexLock(&st->lock); if (!st->incomingOffset) { virNetMessagePtr msg; int ret; @@ -351,8 +394,9 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, msg->header.proc = st->proc; VIR_DEBUG("Dummy packet to wait for stream data"); + virMutexUnlock(&st->lock); ret = virNetClientSend(client, msg, true); - + virMutexLock(&st->lock); virNetMessageFree(msg); if (ret < 0) @@ -380,6 +424,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virNetClientStreamEventTimerUpdate(st); cleanup: + virMutexUnlock(&st->lock); return rv; } @@ -390,20 +435,23 @@ int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, void *opaque, virFreeCallback ff) { + int ret = -1; + + virMutexLock(&st->lock); if (st->cb) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("multiple stream callbacks not supported")); - return 1; + goto cleanup; } - virNetClientStreamRef(st); + st->refs++; if ((st->cbTimer = virEventAddTimeout(-1, virNetClientStreamEventTimer, st, virNetClientStreamEventTimerFree)) < 0) { - virNetClientStreamFree(st); - return -1; + st->refs--; + goto cleanup; } st->cb = cb; @@ -413,31 +461,45 @@ int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, virNetClientStreamEventTimerUpdate(st); - 
return 0; + ret = 0; + +cleanup: + virMutexUnlock(&st->lock); + return ret; } int virNetClientStreamEventUpdateCallback(virNetClientStreamPtr st, int events) { + int ret = -1; + + virMutexLock(&st->lock); if (!st->cb) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("no stream callback registered")); - return -1; + goto cleanup; } st->cbEvents = events; virNetClientStreamEventTimerUpdate(st); - return 0; + ret = 0; + +cleanup: + virMutexUnlock(&st->lock); + return ret; } int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st) { + int ret = -1; + + virMutexUnlock(&st->lock); if (!st->cb) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("no stream callback registered")); - return -1; + goto cleanup; } if (!st->cbDispatch && @@ -449,5 +511,9 @@ int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st) st->cbEvents = 0; virEventRemoveTimeout(st->cbTimer); - return 0; + ret = 0; + +cleanup: + virMutexUnlock(&st->lock); + return ret; } -- 1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
The client stream object can be used independantly of the
s/independantly/independently/
virNetClientPtr object, so must have full locking of its own and not rely on any caller.
* src/remote/remote_driver.c: Remove locking around stream callback * src/rpc/virnetclientstream.c: Add locking to all APIs and callbacks --- src/remote/remote_driver.c | 3 - src/rpc/virnetclientstream.c | 112 +++++++++++++++++++++++++++++++++--------- 2 files changed, 89 insertions(+), 26 deletions(-)
@@ -390,20 +435,23 @@ int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, void *opaque, virFreeCallback ff) { + int ret = -1; + + virMutexLock(&st->lock); if (st->cb) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("multiple stream callbacks not supported")); - return 1; + goto cleanup; }
- virNetClientStreamRef(st); + st->refs++; if ((st->cbTimer = virEventAddTimeout(-1, virNetClientStreamEventTimer, st, virNetClientStreamEventTimerFree)) < 0) { - virNetClientStreamFree(st); - return -1; + st->refs--; + goto cleanup; }
This increments st->refs on success,
int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st) { + int ret = -1; + + virMutexUnlock(&st->lock); if (!st->cb) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("no stream callback registered")); - return -1; + goto cleanup; }
if (!st->cbDispatch &&
but nothing here ever decrements it. That looks like the only flaw; so ACK if you fix things to guarantee no leaked st on a paired callback add/remove.
-- Eric Blake

On Tue, Jun 28, 2011 at 12:12:53PM -0600, Eric Blake wrote:
On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
The client stream object can be used independantly of the
s/independantly/independently/
virNetClientPtr object, so must have full locking of its own and not rely on any caller.
* src/remote/remote_driver.c: Remove locking around stream callback * src/rpc/virnetclientstream.c: Add locking to all APIs and callbacks --- src/remote/remote_driver.c | 3 - src/rpc/virnetclientstream.c | 112 +++++++++++++++++++++++++++++++++--------- 2 files changed, 89 insertions(+), 26 deletions(-)
@@ -390,20 +435,23 @@ int virNetClientStreamEventAddCallback(virNetClientStreamPtr st, void *opaque, virFreeCallback ff) { + int ret = -1; + + virMutexLock(&st->lock); if (st->cb) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("multiple stream callbacks not supported")); - return 1; + goto cleanup; }
- virNetClientStreamRef(st); + st->refs++; if ((st->cbTimer = virEventAddTimeout(-1, virNetClientStreamEventTimer, st, virNetClientStreamEventTimerFree)) < 0) { - virNetClientStreamFree(st); - return -1; + st->refs--; + goto cleanup; }
This increments st->refs on success,
int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st) { + int ret = -1; + + virMutexUnlock(&st->lock); if (!st->cb) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("no stream callback registered")); - return -1; + goto cleanup; }
if (!st->cbDispatch &&
but nothing here ever decrements it.
The decrement is done asynchronously by the free callback, virNetClientStreamEventTimerFree.

Daniel
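
The reference-counting arrangement described in this reply can be sketched as follows. Everything here is a hypothetical model rather than libvirt code: `pending_timer` is a one-slot stand-in for the event loop's timer table, and `event_loop_reap` stands in for whatever point the event loop chooses to run the free callback of a removed timeout. Locking is omitted.

```c
#include <stdlib.h>

struct stream { int refs; int has_cb; };

typedef void (*free_fn)(void *opaque);

/* Hypothetical one-slot timer registry standing in for the event loop. */
static struct { void *opaque; free_fn ff; int active; } pending_timer;

static void stream_unref(struct stream *st)
{
    if (--st->refs == 0)
        free(st);
}

/* Free callback registered with the timer: drops the timer's reference. */
static void timer_free_cb(void *opaque)
{
    stream_unref(opaque);
}

static int add_callback(struct stream *st)
{
    if (st->has_cb)
        return -1;
    st->refs++;                     /* reference held on behalf of the timer */
    pending_timer.opaque = st;
    pending_timer.ff = timer_free_cb;
    pending_timer.active = 1;
    st->has_cb = 1;
    return 0;
}

static int remove_callback(struct stream *st)
{
    if (!st->has_cb)
        return -1;
    st->has_cb = 0;
    pending_timer.active = 0;       /* only marks removal; no unref here */
    return 0;
}

/* Later, the event loop reaps removed timers and runs their free callback. */
static void event_loop_reap(void)
{
    if (!pending_timer.active && pending_timer.ff) {
        free_fn ff = pending_timer.ff;

        pending_timer.ff = NULL;
        ff(pending_timer.opaque);
    }
}
```

In this model the count stays raised between `remove_callback` and the reap, which is why the remove path in the patch contains no matching decrement.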

When the remote client receives end of file on the stream it never invokes the stream callback. Applications relying on async event driven I/O will thus never see the EOF condition on the stream * src/rpc/virnetclient.c, src/rpc/virnetclientstream.c: Ensure EOF is dispatched --- src/rpc/virnetclient.c | 3 -- src/rpc/virnetclientstream.c | 43 ++++++++++++++++++++++++----------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index dc0ce51..39bdf14 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -580,9 +580,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) if (thecall && thecall->expectReply) { VIR_DEBUG("Got sync data packet completion"); thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; - } else { - // XXX - //remoteStreamEventTimerUpdate(privst); } return 0; } diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 9da5aee..d5efab1 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -55,6 +55,7 @@ struct _virNetClientStream { char *incoming; size_t incomingOffset; size_t incomingLength; + bool incomingEOF; virNetClientStreamEventCallback cb; void *cbOpaque; @@ -73,7 +74,7 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); - if ((st->incomingOffset && + if (((st->incomingOffset || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -96,7 +97,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - st->incomingOffset) + (st->incomingOffset || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) @@ -284,24 +285,30 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, 
virMutexLock(&st->lock); need = msg->bufferLength - msg->bufferOffset; - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; + if (need) { + size_t avail = st->incomingLength - st->incomingOffset; + if (need > avail) { + size_t extra = need - avail; + if (VIR_REALLOC_N(st->incoming, + st->incomingLength + extra) < 0) { + VIR_DEBUG("Out of memory handling stream data"); + goto cleanup; + } + st->incomingLength += extra; } - st->incomingLength += extra; - } - memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); + memcpy(st->incoming + st->incomingOffset, + msg->buffer + msg->bufferOffset, + msg->bufferLength - msg->bufferOffset); + st->incomingOffset += (msg->bufferLength - msg->bufferOffset); + } else { + st->incomingEOF = true; + } - VIR_DEBUG("Stream incoming data offset %zu length %zu", - st->incomingOffset, st->incomingLength); + VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", + st->incomingOffset, st->incomingLength, + st->incomingEOF); + virNetClientStreamEventTimerUpdate(st); ret = 0; @@ -372,7 +379,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); virMutexLock(&st->lock); - if (!st->incomingOffset) { + if (!st->incomingOffset && !st->incomingEOF) { virNetMessagePtr msg; int ret; -- 1.7.4.4

On 06/28/2011 11:01 AM, Daniel P. Berrange wrote:
When the remote client receives end of file on the stream it never invokes the stream callback. Applications relying on async event driven I/O will thus never see the EOF condition on the stream
* src/rpc/virnetclient.c, src/rpc/virnetclientstream.c: Ensure EOF is dispatched
---
 src/rpc/virnetclient.c       |    3 --
 src/rpc/virnetclientstream.c |   43 ++++++++++++++++++++++++-----------------
 2 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index dc0ce51..39bdf14 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -580,9 +580,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
         if (thecall && thecall->expectReply) {
             VIR_DEBUG("Got sync data packet completion");
             thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
-        } else {
-            // XXX
-            //remoteStreamEventTimerUpdate(privst);
Not strictly related, but doesn't hurt. ACK.
-- Eric Blake
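
The readiness rule introduced by the final patch (a stream counts as readable when it has buffered data or has seen EOF) can be reduced to a few lines. This is a sketch under assumptions: `struct stream`, `queue_packet`, `pending_events` and the `EV_*` constants are hypothetical simplifications, and a zero-length packet is taken to mean EOF as in the patch.

```c
#include <stdbool.h>
#include <stddef.h>

#define EV_READABLE 1
#define EV_WRITABLE 2

struct stream {
    size_t incoming_offset;  /* bytes buffered and not yet read */
    bool incoming_eof;       /* an empty packet has been received */
    int cb_events;           /* events the application asked for */
};

/* Queue an incoming packet of len bytes; a zero-length packet marks EOF. */
static void queue_packet(struct stream *st, size_t len)
{
    if (len)
        st->incoming_offset += len;
    else
        st->incoming_eof = true;
}

/* Events that should be delivered to the application callback now. */
static int pending_events(const struct stream *st)
{
    int events = 0;

    if ((st->cb_events & EV_READABLE) &&
        (st->incoming_offset || st->incoming_eof))
        events |= EV_READABLE;
    if (st->cb_events & EV_WRITABLE)
        events |= EV_WRITABLE;
    return events;
}
```

Without the EOF term in the condition, a stream that ends with no buffered data would never report readable, so an event-driven application would wait indefinitely.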
Participants (2): Daniel P. Berrange, Eric Blake