[libvirt] [PATCH 0/9] rpc: make client streams multi-thread safer

Nikolay Shirokovskiy (9): rpc: fix race on stream abort/finish and server side abort rpc: use single function to send stream messages rpc: remove unused virNetClientSendNoReply rpc: fix propagation of errors from server rpc: add mising locking in virNetClientStreamRecvHole rpc: client: incapsulate error checks rpc: client: don't set incomingEOF on errors rpc: client stream: dispose private data on stream dispose rpc: client: stream: fix multi thread abort/finish src/datatypes.c | 2 + src/datatypes.h | 1 + src/libvirt_remote.syms | 6 +- src/remote/remote_driver.c | 27 ++------ src/rpc/gendispatch.pl | 3 +- src/rpc/virnetclient.c | 146 ++++++++++++++++++++++--------------------- src/rpc/virnetclient.h | 6 +- src/rpc/virnetclientstream.c | 110 ++++++++++++++++++++++++-------- src/rpc/virnetclientstream.h | 17 ++++- 9 files changed, 188 insertions(+), 130 deletions(-) -- 1.8.3.1

Stream abort/finish can hang because we can receive abort message from server and yet sent abort/finish message to server. The latter will not be answered ever because after server sends abort message it forgets the stream and messages for unknown stream are simply ignored. We check for stream error at the very beginning of remoteStreamFinish/remoteStreamAbort but stream error can be set after the check in another thread operating on stream. Let's check for stream error under client lock similar to what's done in [1]. [1] 833b901cb: stream: Check for stream EOF Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 30 ++++++++++++++++++------------ src/rpc/virnetclientstream.c | 2 +- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 40d45b9..7aa5223 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -2216,20 +2216,26 @@ int virNetClientSendWithReplyStream(virNetClientPtr client, virNetMessagePtr msg, virNetClientStreamPtr st) { - int ret; + int ret = -1; + virObjectLock(client); - /* Other thread might have already received - * stream EOF so we don't want sent anything. - * Server won't respond anyway. - */ - if (virNetClientStreamEOF(st)) { - virObjectUnlock(client); - return 0; + + if (virNetClientStreamRaiseError(st)) + goto cleanup; + + /* Check for EOF only if we are going to wait for incoming data */ + if (!msg->bufferLength && virNetClientStreamEOF(st)) { + ret = 0; + goto cleanup; } - ret = virNetClientSendInternal(client, msg, true, false); + if (virNetClientSendInternal(client, msg, true, false) < 0) + goto cleanup; + + ret = 0; + + cleanup: virObjectUnlock(client); - if (ret < 0) - return -1; - return 0; + + return ret; } diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index a17da31..3b0db52 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -350,7 +350,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, if (virNetMessageEncodePayloadRaw(msg, NULL, 0) < 0) goto error; - if (virNetClientSendWithReply(client, msg) < 0) + if (virNetClientSendWithReplyStream(client, msg, st) < 0) goto error; } -- 1.8.3.1

In next patches we'll add stream state checks to this function that applicable to all call paths. This is handy place because we hold client lock here. Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/libvirt_remote.syms | 2 +- src/rpc/virnetclient.c | 13 ++++++++----- src/rpc/virnetclient.h | 6 +++--- src/rpc/virnetclientstream.c | 12 ++++-------- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 9a33626..704f7ea 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -39,8 +39,8 @@ virNetClientRemoteAddrStringSASL; virNetClientRemoveStream; virNetClientSendNonBlock; virNetClientSendNoReply; +virNetClientSendStream; virNetClientSendWithReply; -virNetClientSendWithReplyStream; virNetClientSetCloseCallback; virNetClientSetTLSSession; diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 7aa5223..29c4dc5 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -2205,18 +2205,21 @@ int virNetClientSendNonBlock(virNetClientPtr client, /* * @msg: a message allocated on heap or stack * - * Send a message synchronously, and wait for the reply synchronously + * Send a message synchronously, and wait for the reply synchronously if + * message is dummy (just to wait for incoming data) or abort/finish message. * * The caller is responsible for free'ing @msg if it was allocated * on the heap * * Returns 0 on success, -1 on failure */ -int virNetClientSendWithReplyStream(virNetClientPtr client, - virNetMessagePtr msg, - virNetClientStreamPtr st) +int virNetClientSendStream(virNetClientPtr client, + virNetMessagePtr msg, + virNetClientStreamPtr st) { int ret = -1; + bool expectReply = !msg->bufferLength || + msg->header.status != VIR_NET_CONTINUE; virObjectLock(client); @@ -2229,7 +2232,7 @@ int virNetClientSendWithReplyStream(virNetClientPtr client, goto cleanup; } - if (virNetClientSendInternal(client, msg, true, false) < 0) + if (virNetClientSendInternal(client, msg, expectReply, false) < 0) goto cleanup; ret = 0; diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index 39a6176..12ac2b5 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -115,9 +115,9 @@ int virNetClientSendNoReply(virNetClientPtr client, int virNetClientSendNonBlock(virNetClientPtr client, virNetMessagePtr msg); -int virNetClientSendWithReplyStream(virNetClientPtr client, - virNetMessagePtr msg, - virNetClientStreamPtr st); +int virNetClientSendStream(virNetClientPtr client, + virNetMessagePtr msg, + virNetClientStreamPtr st); # ifdef WITH_SASL void virNetClientSetSASLSession(virNetClientPtr client, diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 3b0db52..65aa583 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -343,17 +343,13 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, if (status == VIR_NET_CONTINUE) { if (virNetMessageEncodePayloadRaw(msg, data, nbytes) < 0) goto error; - - if (virNetClientSendNoReply(client, msg) < 0) - goto error; } else { if (virNetMessageEncodePayloadRaw(msg, NULL, 0) < 0) goto error; - - if (virNetClientSendWithReplyStream(client, msg, st) < 0) - goto error; } + if (virNetClientSendStream(client, msg, st) < 0) + goto error; virNetMessageFree(msg); @@ -500,7 +496,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virObjectUnlock(st); - ret = virNetClientSendWithReplyStream(client, msg, st); + ret = virNetClientSendStream(client, msg, st); virObjectLock(st); virNetMessageFree(msg); @@ -627,7 +623,7 @@ virNetClientStreamSendHole(virNetClientStreamPtr st, &data) < 0) goto cleanup; - if (virNetClientSendNoReply(client, msg) < 0) + if (virNetClientSendStream(client, msg, st) < 0) goto cleanup; ret = 0; -- 1.8.3.1

Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/libvirt_remote.syms | 1 - src/rpc/virnetclient.c | 22 ---------------------- 2 files changed, 23 deletions(-) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 704f7ea..88745f2 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -38,7 +38,6 @@ virNetClientRegisterKeepAlive; virNetClientRemoteAddrStringSASL; virNetClientRemoveStream; virNetClientSendNonBlock; -virNetClientSendNoReply; virNetClientSendStream; virNetClientSendWithReply; virNetClientSetCloseCallback; diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 29c4dc5..c102cdc 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -2160,28 +2160,6 @@ int virNetClientSendWithReply(virNetClientPtr client, /* - * @msg: a message allocated on heap or stack - * - * Send a message synchronously, without any reply - * - * The caller is responsible for free'ing @msg if it was allocated - * on the heap - * - * Returns 0 on success, -1 on failure - */ -int virNetClientSendNoReply(virNetClientPtr client, - virNetMessagePtr msg) -{ - int ret; - virObjectLock(client); - ret = virNetClientSendInternal(client, msg, false, false); - virObjectUnlock(client); - if (ret < 0) - return -1; - return 0; -} - -/* * @msg: a message allocated on the heap. * * Send a message asynchronously, without any reply -- 1.8.3.1

On 2/7/19 1:58 PM, Nikolay Shirokovskiy wrote:
Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/libvirt_remote.syms | 1 - src/rpc/virnetclient.c | 22 ---------------------- 2 files changed, 23 deletions(-)
Forgot to remove it from the header file. Michal

Stream server error is not propagated if thread does not have the buck. In case we have the buck we are ok due to the code added in [1]. Let's check for stream error on all paths. Now we don't need to raise error in virNetClientCallDispatchStream. Old code reported error only if the first message in wait queue awaits reply. It is odd as depends on wait queue situation. For example if we have only TX message in queue and in one iteration loop both send the message and receive error then thread sending TX message did not receive the error. Next if we have RX message (first) and TX message (second) in queue and in one iteration loop both send the TX message and receive error then thread sending TX message received error. In short it was inconsistent. Let's report error whenever we received it and for every type of message as it makes sense to report errors as early as possible. [1] 16c6e2b41: Fix propagation of RPC errors from streams Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index c102cdc..fcc2e80 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1230,9 +1230,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) if (thecall && thecall->expectReply) { VIR_DEBUG("Got a synchronous error"); - /* Raise error now, so that this call will see it immediately */ - if (!virNetClientStreamRaiseError(st)) - VIR_DEBUG("unable to raise synchronous error"); thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; } return 0; @@ -1947,16 +1944,11 @@ static int virNetClientIO(virNetClientPtr client, */ virNetClientIOUpdateCallback(client, false); - virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); if (client->sock) virNetClientIOUpdateCallback(client, true); - if (rv == 0 && - virGetLastErrorCode()) - rv = -1; - cleanup: VIR_DEBUG("All done with our call head=%p call=%p rv=%d", client->waitDispatch, thiscall, rv); @@ -2213,6 +2205,9 @@ int virNetClientSendStream(virNetClientPtr client, if (virNetClientSendInternal(client, msg, expectReply, false) < 0) goto cleanup; + if (virNetClientStreamRaiseError(st)) + goto cleanup; + ret = 0; cleanup: -- 1.8.3.1

Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclientstream.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 65aa583..136ed16 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -644,8 +644,12 @@ virNetClientStreamRecvHole(virNetClientPtr client ATTRIBUTE_UNUSED, return -1; } + virObjectLock(st); + *length = st->holeLength; st->holeLength = 0; + + virObjectUnlock(st); return 0; } -- 1.8.3.1

Checking virNetClientStreamRaiseError without client lock is racy which is fixed in [1] for example. Thus let's remove such checks when we are sending message to server. And in other cases (like virNetClientStreamRecvHole for example) let's move the check into client stream code. virNetClientStreamRecvPacket already have stream lock so we could introduce another error checking function like virNetClientStreamRaiseErrorLocked but as error is set when both client and stream lock are hold we can remove locking from virNetClientStreamRaiseError because all callers hold either client or stream lock. Also let's split virNetClientStreamRaiseErrorLocked into checking state function and checking message send status function. They are same yet. [1] 1b6a29c21: rpc: fix race on stream abort/finish and server side abort Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/libvirt_remote.syms | 3 ++- src/remote/remote_driver.c | 16 ---------------- src/rpc/virnetclient.c | 4 ++-- src/rpc/virnetclientstream.c | 45 ++++++++++++++++++++++++++++++++++---------- src/rpc/virnetclientstream.h | 5 ++++- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 88745f2..98586d1 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -54,6 +54,8 @@ virNetClientProgramNew; # rpc/virnetclientstream.h +virNetClientStreamCheckSendStatus; +virNetClientStreamCheckState; virNetClientStreamEOF; virNetClientStreamEventAddCallback; virNetClientStreamEventRemoveCallback; @@ -61,7 +63,6 @@ virNetClientStreamEventUpdateCallback; virNetClientStreamMatches; virNetClientStreamNew; virNetClientStreamQueuePacket; -virNetClientStreamRaiseError; virNetClientStreamRecvHole; virNetClientStreamRecvPacket; virNetClientStreamSendHole; diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 058e4c9..1ff55e2 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5600,9 +5600,6 @@ remoteStreamSend(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv; - if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5634,9 +5631,6 @@ remoteStreamRecvFlags(virStreamPtr st, virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1); - if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5676,9 +5670,6 @@ remoteStreamSendHole(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv; - if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5709,9 +5700,6 @@ remoteStreamRecvHole(virStreamPtr st, virCheckFlags(0, -1); - if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5834,9 +5822,6 @@ remoteStreamCloseInt(virStreamPtr st, bool streamAbort) remoteDriverLock(priv); - if (virNetClientStreamRaiseError(privst)) - goto cleanup; - priv->localUses++; remoteDriverUnlock(priv); @@ -5849,7 +5834,6 @@ remoteStreamCloseInt(virStreamPtr st, bool streamAbort) remoteDriverLock(priv); priv->localUses--; - cleanup: virNetClientRemoveStream(priv->client, privst); virObjectUnref(privst); st->privateData = NULL; diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index fcc2e80..70192a9 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -2193,7 +2193,7 @@ int virNetClientSendStream(virNetClientPtr client, virObjectLock(client); - if (virNetClientStreamRaiseError(st)) + if (virNetClientStreamCheckState(st) < 0) goto cleanup; /* Check for EOF only if we are going to wait for incoming data */ @@ -2205,7 +2205,7 @@ int virNetClientSendStream(virNetClientPtr client, if (virNetClientSendInternal(client, msg, expectReply, false) < 0) goto cleanup; - if (virNetClientStreamRaiseError(st)) + if (virNetClientStreamCheckSendStatus(st, msg) < 0) goto cleanup; ret = 0; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 136ed16..a7a7824 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -184,14 +184,9 @@ bool virNetClientStreamMatches(virNetClientStreamPtr st, } -bool virNetClientStreamRaiseError(virNetClientStreamPtr st) +static +void virNetClientStreamRaiseError(virNetClientStreamPtr st) { - virObjectLock(st); - if (st->err.code == VIR_ERR_OK) { - virObjectUnlock(st); - return false; - } - virRaiseErrorFull(__FILE__, __FUNCTION__, __LINE__, st->err.domain, st->err.code, @@ -202,8 +197,31 @@ bool virNetClientStreamRaiseError(virNetClientStreamPtr st) st->err.int1, st->err.int2, "%s", st->err.message ? st->err.message : _("Unknown error")); - virObjectUnlock(st); - return true; +} + + +/* MUST be called under stream or client lock */ +int virNetClientStreamCheckState(virNetClientStreamPtr st) +{ + if (st->err.code != VIR_ERR_OK) { + virNetClientStreamRaiseError(st); + return -1; + } + + return 0; +} + + +/* MUST be called under stream or client lock */ +int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, + virNetMessagePtr msg ATTRIBUTE_UNUSED) +{ + if (st->err.code != VIR_ERR_OK) { + virNetClientStreamRaiseError(st); + return -1; + } + + return 0; } @@ -474,7 +492,9 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virObjectLock(st); reread: - if (!st->rx && !st->incomingEOF) { + if (virNetClientStreamCheckState(st) < 0) { + goto cleanup; + } else if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -646,6 +666,11 @@ virNetClientStreamRecvHole(virNetClientPtr client ATTRIBUTE_UNUSED, virObjectLock(st); + if (virNetClientStreamCheckState(st) < 0) { + virObjectUnlock(st); + return -1; + } + *length = st->holeLength; st->holeLength = 0; diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index d137932..d81ec60 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -36,7 +36,10 @@ virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, unsigned serial, bool allowSkip); -bool virNetClientStreamRaiseError(virNetClientStreamPtr st); +int virNetClientStreamCheckState(virNetClientStreamPtr st); + +int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, + virNetMessagePtr msg); int virNetClientStreamSetError(virNetClientStreamPtr st, virNetMessagePtr msg); -- 1.8.3.1

On 2/7/19 1:58 PM, Nikolay Shirokovskiy wrote:
Checking virNetClientStreamRaiseError without client lock is racy which is fixed in [1] for example. Thus let's remove such checks when we are sending message to server. And in other cases (like virNetClientStreamRecvHole for example) let's move the check into client stream code.
virNetClientStreamRecvPacket already have stream lock so we could introduce another error checking function like virNetClientStreamRaiseErrorLocked but as error is set when both client and stream lock are hold we can remove locking from virNetClientStreamRaiseError because all callers hold either client or stream lock.
Also let's split virNetClientStreamRaiseErrorLocked into checking state function and checking message send status function. They are same yet.
[1] 1b6a29c21: rpc: fix race on stream abort/finish and server side abort
Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/libvirt_remote.syms | 3 ++- src/remote/remote_driver.c | 16 ---------------- src/rpc/virnetclient.c | 4 ++-- src/rpc/virnetclientstream.c | 45 ++++++++++++++++++++++++++++++++++---------- src/rpc/virnetclientstream.h | 5 ++++- 5 files changed, 43 insertions(+), 30 deletions(-)
diff --git a/src/libvirt_remote.syms b/src/libvirt_remote.syms index 88745f2..98586d1 100644 --- a/src/libvirt_remote.syms +++ b/src/libvirt_remote.syms @@ -54,6 +54,8 @@ virNetClientProgramNew;
# rpc/virnetclientstream.h +virNetClientStreamCheckSendStatus; +virNetClientStreamCheckState; virNetClientStreamEOF; virNetClientStreamEventAddCallback; virNetClientStreamEventRemoveCallback; @@ -61,7 +63,6 @@ virNetClientStreamEventUpdateCallback; virNetClientStreamMatches; virNetClientStreamNew; virNetClientStreamQueuePacket; -virNetClientStreamRaiseError; virNetClientStreamRecvHole; virNetClientStreamRecvPacket; virNetClientStreamSendHole; diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 058e4c9..1ff55e2 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5600,9 +5600,6 @@ remoteStreamSend(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv;
- if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5634,9 +5631,6 @@ remoteStreamRecvFlags(virStreamPtr st,
virCheckFlags(VIR_STREAM_RECV_STOP_AT_HOLE, -1);
- if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5676,9 +5670,6 @@ remoteStreamSendHole(virStreamPtr st, virNetClientStreamPtr privst = st->privateData; int rv;
- if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5709,9 +5700,6 @@ remoteStreamRecvHole(virStreamPtr st,
virCheckFlags(0, -1);
- if (virNetClientStreamRaiseError(privst)) - return -1; - remoteDriverLock(priv); priv->localUses++; remoteDriverUnlock(priv); @@ -5834,9 +5822,6 @@ remoteStreamCloseInt(virStreamPtr st, bool streamAbort)
remoteDriverLock(priv);
- if (virNetClientStreamRaiseError(privst)) - goto cleanup; - priv->localUses++; remoteDriverUnlock(priv);
@@ -5849,7 +5834,6 @@ remoteStreamCloseInt(virStreamPtr st, bool streamAbort) remoteDriverLock(priv); priv->localUses--;
- cleanup: virNetClientRemoveStream(priv->client, privst); virObjectUnref(privst); st->privateData = NULL; diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index fcc2e80..70192a9 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -2193,7 +2193,7 @@ int virNetClientSendStream(virNetClientPtr client,
virObjectLock(client);
- if (virNetClientStreamRaiseError(st)) + if (virNetClientStreamCheckState(st) < 0) goto cleanup;
/* Check for EOF only if we are going to wait for incoming data */ @@ -2205,7 +2205,7 @@ int virNetClientSendStream(virNetClientPtr client, if (virNetClientSendInternal(client, msg, expectReply, false) < 0) goto cleanup;
- if (virNetClientStreamRaiseError(st)) + if (virNetClientStreamCheckSendStatus(st, msg) < 0) goto cleanup;
ret = 0; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 136ed16..a7a7824 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -184,14 +184,9 @@ bool virNetClientStreamMatches(virNetClientStreamPtr st, }
-bool virNetClientStreamRaiseError(virNetClientStreamPtr st) +static +void virNetClientStreamRaiseError(virNetClientStreamPtr st) { - virObjectLock(st); - if (st->err.code == VIR_ERR_OK) { - virObjectUnlock(st); - return false; - } - virRaiseErrorFull(__FILE__, __FUNCTION__, __LINE__, st->err.domain, st->err.code, @@ -202,8 +197,31 @@ bool virNetClientStreamRaiseError(virNetClientStreamPtr st) st->err.int1, st->err.int2, "%s", st->err.message ? st->err.message : _("Unknown error")); - virObjectUnlock(st); - return true; +} + + +/* MUST be called under stream or client lock */
This sounds very fishy. There should be one exact lock that needs to be locked. I understand why you've written it this way though. It's because virNetClientStreamRecvPacket() unlocks the stream just before calling virNetClientSendStream() which then locks the client. I'll post a separate patch for that.
+int virNetClientStreamCheckState(virNetClientStreamPtr st) +{ + if (st->err.code != VIR_ERR_OK) { + virNetClientStreamRaiseError(st); + return -1; + } + + return 0; +} + + +/* MUST be called under stream or client lock */ +int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, + virNetMessagePtr msg ATTRIBUTE_UNUSED) +{ + if (st->err.code != VIR_ERR_OK) { + virNetClientStreamRaiseError(st); + return -1; + } + + return 0; }
@@ -474,7 +492,9 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, virObjectLock(st);
reread: - if (!st->rx && !st->incomingEOF) { + if (virNetClientStreamCheckState(st) < 0) { + goto cleanup; + } else if (!st->rx && !st->incomingEOF) {
I'd prefer if these were two separate if-s. Also, ultimately this would be a while() or do-while() loop.
virNetMessagePtr msg; int ret;
Michal

This mixing errors and EOF condition in one flag is odd. Instead let's check st->err.code where appropriate. Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclientstream.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index a7a7824..713307c 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -86,7 +86,7 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents); - if (((st->rx || st->incomingEOF) && + if (((st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -108,7 +108,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->rx || st->incomingEOF)) + (st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) @@ -272,7 +272,6 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, st->err.int1 = err.int1; st->err.int2 = err.int2; - st->incomingEOF = true; virNetClientStreamEventTimerUpdate(st); ret = 0; -- 1.8.3.1

If we call virStreamFinish and virStreamAbort from 2 distinct threads for example we can have access to freed memory. Because when virStreamFinish finishes for example virStreamAbort yet to be finished and it access virNetClientStreamPtr object in stream->privateData. Also it does not make sense to clear @driver field. After stream is finished/aborted it is better to have appropriate error message instead of "unsupported error". This commit reverts [1] or virNetClientStreamPtr and virStreamPtr will never be unrefed due to cyclic dependency. Before this patch we don't have leaks because all execution paths we call virStreamFinish or virStreamAbort. [1] 8b6ffe40 : virNetClientStreamNew: Track origin stream Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/datatypes.c | 2 ++ src/datatypes.h | 1 + src/remote/remote_driver.c | 11 ++++------- src/rpc/gendispatch.pl | 3 ++- src/rpc/virnetclientstream.c | 7 +------ src/rpc/virnetclientstream.h | 3 +-- 6 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/datatypes.c b/src/datatypes.c index 09f63d9..be9b528 100644 --- a/src/datatypes.c +++ b/src/datatypes.c @@ -763,6 +763,8 @@ virStreamDispose(void *obj) virStreamPtr st = obj; VIR_DEBUG("release dev %p", st); + if (st->ff) + st->ff(st->privateData); virObjectUnref(st->conn); } diff --git a/src/datatypes.h b/src/datatypes.h index 529b340..1201567 100644 --- a/src/datatypes.h +++ b/src/datatypes.h @@ -665,6 +665,7 @@ struct _virStream { virStreamDriverPtr driver; void *privateData; + virFreeCallback ff; }; /** diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 1ff55e2..2861ee6 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5835,9 +5835,6 @@ remoteStreamCloseInt(virStreamPtr st, bool streamAbort) priv->localUses--; virNetClientRemoveStream(priv->client, privst); - virObjectUnref(privst); - st->privateData = NULL; - st->driver = NULL; remoteDriverUnlock(priv); return ret; @@ -6177,8 +6174,7 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, memset(&args, 0, sizeof(args)); memset(&ret, 0, sizeof(ret)); - if (!(netst = virNetClientStreamNew(st, - priv->remoteProgram, + if (!(netst = virNetClientStreamNew(priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3, priv->counter, false))) @@ -6191,6 +6187,7 @@ remoteDomainMigratePrepareTunnel3(virConnectPtr dconn, st->driver = &remoteStreamDrv; st->privateData = netst; + st->ff = virObjectFreeCallback; args.cookie_in.cookie_in_val = (char *)cookiein; args.cookie_in.cookie_in_len = cookieinlen; @@ -7142,8 +7139,7 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, goto cleanup; } - if (!(netst = virNetClientStreamNew(st, - priv->remoteProgram, + if (!(netst = virNetClientStreamNew(priv->remoteProgram, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, priv->counter, false))) @@ -7156,6 +7152,7 @@ remoteDomainMigratePrepareTunnel3Params(virConnectPtr dconn, st->driver = &remoteStreamDrv; st->privateData = netst; + st->ff = virObjectFreeCallback; if (call(dconn, priv, 0, REMOTE_PROC_DOMAIN_MIGRATE_PREPARE_TUNNEL3_PARAMS, (xdrproc_t) xdr_remote_domain_migrate_prepare_tunnel3_params_args, diff --git a/src/rpc/gendispatch.pl b/src/rpc/gendispatch.pl index ce4db5d..ae3a42c 100755 --- a/src/rpc/gendispatch.pl +++ b/src/rpc/gendispatch.pl @@ -1804,7 +1804,7 @@ elsif ($mode eq "client") { if ($call->{streamflag} ne "none") { print "\n"; - print " if (!(netst = virNetClientStreamNew(st, priv->remoteProgram, $call->{constname}, priv->counter, sparse)))\n"; + print " if (!(netst = virNetClientStreamNew(priv->remoteProgram, $call->{constname}, priv->counter, sparse)))\n"; print " goto done;\n"; print "\n"; print " if (virNetClientAddStream(priv->client, netst) < 0) {\n"; @@ -1814,6 +1814,7 @@ elsif ($mode eq "client") { print "\n"; print " st->driver = &remoteStreamDrv;\n"; print " st->privateData = netst;\n"; + print " st->ff = virObjectFreeCallback;\n"; } if ($call->{ProcName} eq "SupportsFeature") { diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 713307c..cfdaa74 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -34,8 +34,6 @@ VIR_LOG_INIT("rpc.netclientstream"); struct _virNetClientStream { virObjectLockable parent; - virStreamPtr stream; /* Reverse pointer to parent stream */ - virNetClientProgramPtr prog; int proc; unsigned serial; @@ -133,8 +131,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) } -virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, - virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, int proc, unsigned serial, bool allowSkip) @@ -147,7 +144,6 @@ virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, if (!(st = virObjectLockableNew(virNetClientStreamClass))) return NULL; - st->stream = virObjectRef(stream); st->prog = virObjectRef(prog); st->proc = proc; st->serial = serial; @@ -167,7 +163,6 @@ void virNetClientStreamDispose(void *obj) virNetMessageFree(msg); } virObjectUnref(st->prog); - virObjectUnref(st->stream); } bool virNetClientStreamMatches(virNetClientStreamPtr st, diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index d81ec60..49b74bc 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -30,8 +30,7 @@ typedef virNetClientStream *virNetClientStreamPtr; typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, int events, void *opaque); -virNetClientStreamPtr virNetClientStreamNew(virStreamPtr stream, - virNetClientProgramPtr prog, +virNetClientStreamPtr virNetClientStreamNew(virNetClientProgramPtr prog, int proc, unsigned serial, bool allowSkip); -- 1.8.3.1

If 2 threads call abort for example then one of them will hang because client will send 2 abort messages and server will reply only on first of them, the second will be ignored. And on server reply client changes the state only one of abort message to complete, the second will hang forever. There are other similar issues. We should complete all messages waiting reply if we got error or expected abort/finish reply from server. Also if one thread send finish and another abort one of them will win the race and server will either abort or finish stream. If stream is aborted then thread requested finishing should report error. In order to archive this let's keep stream closing reason in @closed field. If we receive VIR_NET_OK message for stream then stream is finished if oldest (closest to queue end) message in stream queue is finish message and stream is aborted if oldest message is abort message. Otherwise it is protocol error. By the way we need to fix case of receiving VIR_NET_CONTINUE message. Now we take oldest message in queue and check if this is dummy message. If one thread first sends abort and second thread then receives data then oldest message is abort message and second thread won't be notified when data arrives. Let's find oldest dummy message instead. Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 74 ++++++++++++++++++++++++++++---------------- src/rpc/virnetclientstream.c | 47 +++++++++++++++++++++++++--- src/rpc/virnetclientstream.h | 9 ++++++ 3 files changed, 100 insertions(+), 30 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 70192a9..64855fb 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1158,6 +1158,19 @@ static int virNetClientCallDispatchMessage(virNetClientPtr client) return 0; } +static void virNetClientCallCompleteAllWaitingReply(virNetClientPtr client) +{ + virNetClientCallPtr call; + + for (call = client->waitDispatch; call; call = call->next) { + if (call->msg->header.prog == client->msg.header.prog && + call->msg->header.vers == client->msg.header.vers && + call->msg->header.serial == client->msg.header.serial && + call->expectReply) + call->mode = VIR_NET_CLIENT_MODE_COMPLETE; + } +} + static int virNetClientCallDispatchStream(virNetClientPtr client) { size_t i; @@ -1181,16 +1194,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) return 0; } - /* Finish/Abort are synchronous, so also see if there's an - * (optional) call waiting for this stream packet */ - thecall = client->waitDispatch; - while (thecall && - !(thecall->msg->header.prog == client->msg.header.prog && - thecall->msg->header.vers == client->msg.header.vers && - thecall->msg->header.serial == client->msg.header.serial)) - thecall = thecall->next; - - VIR_DEBUG("Found call %p", thecall); /* Status is either * - VIR_NET_OK - no payload for streams @@ -1202,25 +1205,47 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) if (virNetClientStreamQueuePacket(st, &client->msg) < 0) return -1; - if (thecall && thecall->expectReply) { - if (thecall->msg->header.status == VIR_NET_CONTINUE) { - VIR_DEBUG("Got a synchronous confirm"); - thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; - } else { - VIR_DEBUG("Not completing call with status %d", thecall->msg->header.status); - } + /* Find oldest dummy message waiting for incoming data. */ + for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { + if (thecall->msg->header.prog == client->msg.header.prog && + thecall->msg->header.vers == client->msg.header.vers && + thecall->msg->header.serial == client->msg.header.serial && + thecall->expectReply && + thecall->msg->header.status == VIR_NET_CONTINUE) + break; + } + + if (thecall) { + VIR_DEBUG("Got a new incoming stream data"); + thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; } return 0; } case VIR_NET_OK: - if (thecall && thecall->expectReply) { - VIR_DEBUG("Got a synchronous confirm"); - thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; - } else { + /* Find oldest abort/finish message. */ + for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { + if (thecall->msg->header.prog == client->msg.header.prog && + thecall->msg->header.vers == client->msg.header.vers && + thecall->msg->header.serial == client->msg.header.serial && + thecall->expectReply && + thecall->msg->header.status != VIR_NET_CONTINUE) + break; + } + + if (!thecall) { VIR_DEBUG("Got unexpected async stream finish confirmation"); return -1; } + + VIR_DEBUG("Got a synchronous abort/finish confirm"); + + virNetClientStreamSetClosed(st, + thecall->msg->header.status == VIR_NET_OK ? + VIR_NET_CLIENT_STREAM_CLOSED_FINISHED : + VIR_NET_CLIENT_STREAM_CLOSED_ABORTED); + + virNetClientCallCompleteAllWaitingReply(client); return 0; case VIR_NET_ERROR: @@ -1228,10 +1253,7 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) if (virNetClientStreamSetError(st, &client->msg) < 0) return -1; - if (thecall && thecall->expectReply) { - VIR_DEBUG("Got a synchronous error"); - thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; - } + virNetClientCallCompleteAllWaitingReply(client); return 0; default: @@ -2205,7 +2227,7 @@ int virNetClientSendStream(virNetClientPtr client, if (virNetClientSendInternal(client, msg, expectReply, false) < 0) goto cleanup; - if (virNetClientStreamCheckSendStatus(st, msg) < 0) + if (expectReply && virNetClientStreamCheckSendStatus(st, msg) < 0) goto cleanup; ret = 0; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index cfdaa74..583cd369 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,6 +49,7 @@ struct _virNetClientStream { */ virNetMessagePtr rx; bool incomingEOF; + int closed; /* enum virNetClientStreamClosed */ bool allowSkip; long long holeLength; /* Size of incoming hole in stream. */ @@ -84,7 +85,7 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents); - if (((st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK) && + if (((st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK || st->closed) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -106,7 +107,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK)) + (st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK || st->closed)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) @@ -203,23 +204,61 @@ int virNetClientStreamCheckState(virNetClientStreamPtr st) return -1; } + if (st->closed) { + virReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("stream is closed")); + return -1; + } + return 0; } -/* MUST be called under stream or client lock */ +/* MUST be called under stream or client lock. This should + * be called only for message that expect reply. */ int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, - virNetMessagePtr msg ATTRIBUTE_UNUSED) + virNetMessagePtr msg) { if (st->err.code != VIR_ERR_OK) { virNetClientStreamRaiseError(st); return -1; } + /* We can not check if the message is dummy in a usual way + * by checking msg->bufferLength because at this point message payload + * is cleared. As caller must not call this function for messages + * not expecting reply we can check for dummy messages just by status. + */ + if (msg->header.status == VIR_NET_CONTINUE) { + if (st->closed) { + virReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("stream is closed")); + return -1; + } + return 0; + } else if (msg->header.status == VIR_NET_OK && + st->closed != VIR_NET_CLIENT_STREAM_CLOSED_FINISHED) { + virReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("stream aborted by another thread")); + return -1; + } + return 0; } +void virNetClientStreamSetClosed(virNetClientStreamPtr st, + int closed) +{ + virObjectLock(st); + + st->closed = closed; + virNetClientStreamEventTimerUpdate(st); + + virObjectUnlock(st); +} + + int virNetClientStreamSetError(virNetClientStreamPtr st, virNetMessagePtr msg) { diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 49b74bc..cb28428 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -27,6 +27,12 @@ typedef struct _virNetClientStream virNetClientStream; typedef virNetClientStream *virNetClientStreamPtr; +typedef enum { + VIR_NET_CLIENT_STREAM_CLOSED_NOT = 0, + VIR_NET_CLIENT_STREAM_CLOSED_FINISHED, + VIR_NET_CLIENT_STREAM_CLOSED_ABORTED, +} virNetClientStreamClosed; + typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, int events, void *opaque); @@ -43,6 +49,9 @@ int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, int virNetClientStreamSetError(virNetClientStreamPtr st, virNetMessagePtr msg); +void virNetClientStreamSetClosed(virNetClientStreamPtr st, + int closed); + bool virNetClientStreamMatches(virNetClientStreamPtr st, virNetMessagePtr msg); -- 1.8.3.1

On 2/7/19 1:58 PM, Nikolay Shirokovskiy wrote:
If 2 threads call abort for example then one of them will hang because client will send 2 abort messages and server will reply only on first of them, the second will be ignored. And on server reply client changes the state only one of abort message to complete, the second will hang forever. There are other similar issues.
We should complete all messages waiting reply if we got error or expected abort/finish reply from server. Also if one thread send finish and another abort one of them will win the race and server will either abort or finish stream. If stream is aborted then thread requested finishing should report error. In order to archive this let's keep stream closing reason in @closed field. If we receive VIR_NET_OK message for stream then stream is finished if oldest (closest to queue end) message in stream queue is finish message and stream is aborted if oldest message is abort message. Otherwise it is protocol error.
By the way we need to fix case of receiving VIR_NET_CONTINUE message. Now we take oldest message in queue and check if this is dummy message. If one thread first sends abort and second thread then receives data then oldest message is abort message and second thread won't be notified when data arrives. Let's find oldest dummy message instead.
Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 74 ++++++++++++++++++++++++++++---------------- src/rpc/virnetclientstream.c | 47 +++++++++++++++++++++++++--- src/rpc/virnetclientstream.h | 9 ++++++ 3 files changed, 100 insertions(+), 30 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 70192a9..64855fb 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1158,6 +1158,19 @@ static int virNetClientCallDispatchMessage(virNetClientPtr client) return 0; }
+static void virNetClientCallCompleteAllWaitingReply(virNetClientPtr client) +{ + virNetClientCallPtr call; + + for (call = client->waitDispatch; call; call = call->next) { + if (call->msg->header.prog == client->msg.header.prog && + call->msg->header.vers == client->msg.header.vers && + call->msg->header.serial == client->msg.header.serial && + call->expectReply) + call->mode = VIR_NET_CLIENT_MODE_COMPLETE; + } +} + static int virNetClientCallDispatchStream(virNetClientPtr client) { size_t i; @@ -1181,16 +1194,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) return 0; }
- /* Finish/Abort are synchronous, so also see if there's an - * (optional) call waiting for this stream packet */ - thecall = client->waitDispatch; - while (thecall && - !(thecall->msg->header.prog == client->msg.header.prog && - thecall->msg->header.vers == client->msg.header.vers && - thecall->msg->header.serial == client->msg.header.serial)) - thecall = thecall->next; - - VIR_DEBUG("Found call %p", thecall);
/* Status is either * - VIR_NET_OK - no payload for streams @@ -1202,25 +1205,47 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) if (virNetClientStreamQueuePacket(st, &client->msg) < 0) return -1;
- if (thecall && thecall->expectReply) { - if (thecall->msg->header.status == VIR_NET_CONTINUE) { - VIR_DEBUG("Got a synchronous confirm"); - thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; - } else { - VIR_DEBUG("Not completing call with status %d", thecall->msg->header.status); - } + /* Find oldest dummy message waiting for incoming data. */ + for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { + if (thecall->msg->header.prog == client->msg.header.prog && + thecall->msg->header.vers == client->msg.header.vers && + thecall->msg->header.serial == client->msg.header.serial && + thecall->expectReply && + thecall->msg->header.status == VIR_NET_CONTINUE) + break; + } + + if (thecall) { + VIR_DEBUG("Got a new incoming stream data"); + thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; } return 0; }
case VIR_NET_OK: - if (thecall && thecall->expectReply) { - VIR_DEBUG("Got a synchronous confirm"); - thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; - } else { + /* Find oldest abort/finish message. */ + for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { + if (thecall->msg->header.prog == client->msg.header.prog && + thecall->msg->header.vers == client->msg.header.vers && + thecall->msg->header.serial == client->msg.header.serial && + thecall->expectReply && + thecall->msg->header.status != VIR_NET_CONTINUE) + break; + } + + if (!thecall) { VIR_DEBUG("Got unexpected async stream finish confirmation"); return -1; } + + VIR_DEBUG("Got a synchronous abort/finish confirm"); + + virNetClientStreamSetClosed(st, + thecall->msg->header.status == VIR_NET_OK ? + VIR_NET_CLIENT_STREAM_CLOSED_FINISHED : + VIR_NET_CLIENT_STREAM_CLOSED_ABORTED); + + virNetClientCallCompleteAllWaitingReply(client); return 0;
case VIR_NET_ERROR: @@ -1228,10 +1253,7 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) if (virNetClientStreamSetError(st, &client->msg) < 0) return -1;
- if (thecall && thecall->expectReply) { - VIR_DEBUG("Got a synchronous error"); - thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE; - } + virNetClientCallCompleteAllWaitingReply(client); return 0;
default: @@ -2205,7 +2227,7 @@ int virNetClientSendStream(virNetClientPtr client, if (virNetClientSendInternal(client, msg, expectReply, false) < 0) goto cleanup;
- if (virNetClientStreamCheckSendStatus(st, msg) < 0) + if (expectReply && virNetClientStreamCheckSendStatus(st, msg) < 0) goto cleanup;
ret = 0; diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index cfdaa74..583cd369 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,6 +49,7 @@ struct _virNetClientStream { */ virNetMessagePtr rx; bool incomingEOF; + int closed; /* enum virNetClientStreamClosed */
bool allowSkip; long long holeLength; /* Size of incoming hole in stream. */ @@ -84,7 +85,7 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
- if (((st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK) && + if (((st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK || st->closed) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -106,7 +107,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK)) + (st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK || st->closed)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) @@ -203,23 +204,61 @@ int virNetClientStreamCheckState(virNetClientStreamPtr st) return -1; }
+ if (st->closed) { + virReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("stream is closed")); + return -1; + } + return 0; }
-/* MUST be called under stream or client lock */ +/* MUST be called under stream or client lock. This should + * be called only for message that expect reply. */ int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, - virNetMessagePtr msg ATTRIBUTE_UNUSED) + virNetMessagePtr msg) { if (st->err.code != VIR_ERR_OK) { virNetClientStreamRaiseError(st); return -1; }
+ /* We can not check if the message is dummy in a usual way + * by checking msg->bufferLength because at this point message payload + * is cleared. As caller must not call this function for messages + * not expecting reply we can check for dummy messages just by status. + */ + if (msg->header.status == VIR_NET_CONTINUE) { + if (st->closed) { + virReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("stream is closed")); + return -1; + } + return 0; + } else if (msg->header.status == VIR_NET_OK && + st->closed != VIR_NET_CLIENT_STREAM_CLOSED_FINISHED) { + virReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("stream aborted by another thread")); + return -1; + } + return 0; }
+void virNetClientStreamSetClosed(virNetClientStreamPtr st, + int closed) +{ + virObjectLock(st); + + st->closed = closed; + virNetClientStreamEventTimerUpdate(st); + + virObjectUnlock(st); +} + + int virNetClientStreamSetError(virNetClientStreamPtr st, virNetMessagePtr msg) { diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 49b74bc..cb28428 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -27,6 +27,12 @@ typedef struct _virNetClientStream virNetClientStream; typedef virNetClientStream *virNetClientStreamPtr;
+typedef enum { + VIR_NET_CLIENT_STREAM_CLOSED_NOT = 0, + VIR_NET_CLIENT_STREAM_CLOSED_FINISHED, + VIR_NET_CLIENT_STREAM_CLOSED_ABORTED, +} virNetClientStreamClosed; + typedef void (*virNetClientStreamEventCallback)(virNetClientStreamPtr stream, int events, void *opaque);
@@ -43,6 +49,9 @@ int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, int virNetClientStreamSetError(virNetClientStreamPtr st, virNetMessagePtr msg);
+void virNetClientStreamSetClosed(virNetClientStreamPtr st, + int closed);
It's okay to use virNetClientStreamClosed instead of int here. This is not a public API, we can rely on compiler doing its job here.
+ bool virNetClientStreamMatches(virNetClientStreamPtr st, virNetMessagePtr msg);
Michal

On 2/7/19 1:58 PM, Nikolay Shirokovskiy wrote:
Nikolay Shirokovskiy (9): rpc: fix race on stream abort/finish and server side abort rpc: use single function to send stream messages rpc: remove unused virNetClientSendNoReply rpc: fix propagation of errors from server rpc: add mising locking in virNetClientStreamRecvHole rpc: client: incapsulate error checks rpc: client: don't set incomingEOF on errors rpc: client stream: dispose private data on stream dispose rpc: client: stream: fix multi thread abort/finish
src/datatypes.c | 2 + src/datatypes.h | 1 + src/libvirt_remote.syms | 6 +- src/remote/remote_driver.c | 27 ++------ src/rpc/gendispatch.pl | 3 +- src/rpc/virnetclient.c | 146 ++++++++++++++++++++++--------------------- src/rpc/virnetclient.h | 6 +- src/rpc/virnetclientstream.c | 110 ++++++++++++++++++++++++-------- src/rpc/virnetclientstream.h | 17 ++++- 9 files changed, 188 insertions(+), 130 deletions(-)
ACK series. I'm fixing all the small nits I've found and pushing. I've also done some testing and nothing broke :-) Michal

On 08.02.2019 19:19, Michal Privoznik wrote:
On 2/7/19 1:58 PM, Nikolay Shirokovskiy wrote:
Nikolay Shirokovskiy (9): rpc: fix race on stream abort/finish and server side abort rpc: use single function to send stream messages rpc: remove unused virNetClientSendNoReply rpc: fix propagation of errors from server rpc: add mising locking in virNetClientStreamRecvHole rpc: client: incapsulate error checks rpc: client: don't set incomingEOF on errors rpc: client stream: dispose private data on stream dispose rpc: client: stream: fix multi thread abort/finish
src/datatypes.c | 2 + src/datatypes.h | 1 + src/libvirt_remote.syms | 6 +- src/remote/remote_driver.c | 27 ++------ src/rpc/gendispatch.pl | 3 +- src/rpc/virnetclient.c | 146 ++++++++++++++++++++++--------------------- src/rpc/virnetclient.h | 6 +- src/rpc/virnetclientstream.c | 110 ++++++++++++++++++++++++-------- src/rpc/virnetclientstream.h | 17 ++++- 9 files changed, 188 insertions(+), 130 deletions(-)
ACK series. I'm fixing all the small nits I've found and pushing.
I've also done some testing and nothing broke :-)
Thanx! Nikolay
participants (2)
-
Michal Privoznik
-
Nikolay Shirokovskiy