[libvirt] [PATCH 0/2] rpc: client: stream bugfix and improvement

Nikolay Shirokovskiy (2): rpc: client: fix race on stream error and stream creation rpc: client: stream: notify streams of closing connection src/rpc/virnetclient.c | 13 ++++++++++--- src/rpc/virnetclientstream.c | 30 ++++++++++++++++++++++++++++-- src/rpc/virnetclientstream.h | 2 ++ 3 files changed, 40 insertions(+), 5 deletions(-) -- 1.8.3.1

The message of the API call that creates a stream and the stream itself share the same RPC serial. This can lead to issues. If the stream gets an error immediately after creation, the error notification can be delivered before the API call reply arrives. This is possible because the reply and the error message are sent from different threads (the RPC worker thread and the event loop thread, respectively). As we don't check the message type in virNetClientCallCompleteAllWaitingReply, we complete the API call, which leads to an API call error [1] as there is no actual reply. Later, when the reply arrives, the connection is closed due to a protocol error (see the check in virNetClientCallDispatchReply). Let's fix virNetClientCallCompleteAllWaitingReply. Queue inspection on an arriving VIR_NET_CONTINUE message in virNetClientCallDispatchStream is safe because there we also check the status field. Queue inspection on an arriving VIR_NET_OK is safe too, as this message cannot arrive before we call virStreamAbort (or virStreamFinish), which is not possible before the API call that creates the stream returns. But just to be sure, let's add a message type check in these places too. 
[1] error: internal error: Unexpected message type 0 Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 64855fb..0393587 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1163,7 +1163,8 @@ static void virNetClientCallCompleteAllWaitingReply(virNetClientPtr client) virNetClientCallPtr call; for (call = client->waitDispatch; call; call = call->next) { - if (call->msg->header.prog == client->msg.header.prog && + if (call->msg->header.type == VIR_NET_STREAM && + call->msg->header.prog == client->msg.header.prog && call->msg->header.vers == client->msg.header.vers && call->msg->header.serial == client->msg.header.serial && call->expectReply) @@ -1207,7 +1208,8 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) /* Find oldest dummy message waiting for incoming data. */ for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { - if (thecall->msg->header.prog == client->msg.header.prog && + if (thecall->msg->header.type == VIR_NET_STREAM && + thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply && @@ -1225,7 +1227,8 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) case VIR_NET_OK: /* Find oldest abort/finish message. */ for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { - if (thecall->msg->header.prog == client->msg.header.prog && + if (thecall->msg->header.type == VIR_NET_STREAM && + thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply && -- 1.8.3.1

Below is a patch that simulates the race. Let's add a delay on replying to virDomainOpenConsole and report an error for the console stream immediately. For this, let's use the IO helper thread for the console fd stream. With this patch you'll get the following error in an interactive virsh session (note that a generated file with stubs is also patched): ######################################################### Welcome to virsh, the virtualization interactive terminal. Type: 'help' for help with commands 'quit' to quit virsh # console centos2 Connected to domain centos2 Escape character is ^] error: internal error: Unexpected message type 0 virsh # error: Disconnected from qemu:///system due to I/O error diff --git a/src/conf/virchrdev.c b/src/conf/virchrdev.c index 5090a67..740845c 100644 --- a/src/conf/virchrdev.c +++ b/src/conf/virchrdev.c @@ -413,7 +413,7 @@ int virChrdevOpen(virChrdevsPtr devs, /* open the character device */ switch (source->type) { case VIR_DOMAIN_CHR_TYPE_PTY: - if (virFDStreamOpenPTY(st, path, 0, 0, O_RDWR) < 0) + if (virFDStreamOpenPTY(st, path, 0, 0, O_RDONLY) < 0) goto error; break; case VIR_DOMAIN_CHR_TYPE_UNIX: diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 1bc43e2..6968f95 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -434,6 +434,9 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst, char *buf = NULL; ssize_t got; + virReportError(VIR_ERR_INTERNAL_ERROR, _("immediate stream error")); + goto error; + if (sparse && *dataLen == 0) { if (virFileInData(fdin, &inData, &sectionLen) < 0) goto error; @@ -1376,11 +1379,13 @@ int virFDStreamOpenPTY(virStreamPtr st, if (virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false, false) < 0) + true, false) < 0) return -1; fdst = st->privateData; + return 0; + if (tcgetattr(fdst->fd, &rawattr) < 0) { virReportSystemError(errno, _("unable to get tty attributes: %s"), --- a/src/remote/remote_daemon_dispatch_stubs.h +++ b/src/remote/remote_daemon_dispatch_stubs.h @@ 
-8936,6 +8936,8 @@ if (daemonAddClientStream(client, stream, true) < 0) goto cleanup; + sleep(3); + rv = 0; cleanup:

On 2/15/19 9:10 AM, Nikolay Shirokovskiy wrote:
Message of API call that creates stream and stream itself have same rpc serial. This can lead to issues. If stream got error immediately after creation then notification can be delivered before API call reply arrived. This is possible because the reply and the error message are sent from different threads (rpc worker thread and event loop thread respectively). As we don't check for message type in virNetClientCallCompleteAllWaitingReply we complete the API call which leads to API call error [1] as there is no actual reply. Later when reply arrives connection will be closed due to protocol error (see check in virNetClientCallDispatchReply). Let's fix virNetClientCallCompleteAllWaitingReply.
Queue inspection on arriving VIR_NET_CONTINUE message in virNetClientCallDispatchStream is safe because there we check for status field also. Queue inspection on arriving VIR_NET_OK is safe too as this message can not arrive before we call virFinishAbort(Finish) which is not possible before API call that creates streams returns. But just to be sure let's add checking message type in these places too.
[1] error: internal error: Unexpected message type 0
Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-)
The commit message was hard to read, but I know it's an "English is not the first language" type of thing.
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 64855fb..0393587 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1163,7 +1163,8 @@ static void virNetClientCallCompleteAllWaitingReply(virNetClientPtr client) virNetClientCallPtr call;
for (call = client->waitDispatch; call; call = call->next) { - if (call->msg->header.prog == client->msg.header.prog && + if (call->msg->header.type == VIR_NET_STREAM &&
What about VIR_NET_STREAM_HOLE ? Hopefully someone with greater working knowledge of the processing of this code can help out here. I guess I'm just concerned about some odd processing case that may have expected to be called, but now wouldn't be because of this change. BTW: any thoughts on perhaps extracting out the common parts between the three places changed into its own helper/method? e.g.: thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply and then the follow-up patch to add the additional '.type' checks? John
+ call->msg->header.prog == client->msg.header.prog && call->msg->header.vers == client->msg.header.vers && call->msg->header.serial == client->msg.header.serial && call->expectReply) @@ -1207,7 +1208,8 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
/* Find oldest dummy message waiting for incoming data. */ for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { - if (thecall->msg->header.prog == client->msg.header.prog && + if (thecall->msg->header.type == VIR_NET_STREAM && + thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply && @@ -1225,7 +1227,8 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) case VIR_NET_OK: /* Find oldest abort/finish message. */ for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { - if (thecall->msg->header.prog == client->msg.header.prog && + if (thecall->msg->header.type == VIR_NET_STREAM && + thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply &&

On 22.02.2019 18:15, John Ferlan wrote:
On 2/15/19 9:10 AM, Nikolay Shirokovskiy wrote:
Message of API call that creates stream and stream itself have same rpc serial. This can lead to issues. If stream got error immediately after creation then notification can be delivered before API call reply arrived. This is possible because the reply and the error message are sent from different threads (rpc worker thread and event loop thread respectively). As we don't check for message type in virNetClientCallCompleteAllWaitingReply we complete the API call which leads to API call error [1] as there is no actual reply. Later when reply arrives connection will be closed due to protocol error (see check in virNetClientCallDispatchReply). Let's fix virNetClientCallCompleteAllWaitingReply.
Queue inspection on arriving VIR_NET_CONTINUE message in virNetClientCallDispatchStream is safe because there we check for status field also. Queue inspection on arriving VIR_NET_OK is safe too as this message can not arrive before we call virFinishAbort(Finish) which is not possible before API call that creates streams returns. But just to be sure let's add checking message type in these places too.
[1] error: internal error: Unexpected message type 0
Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-)
The commit message was hard to read, but I know it's an english is not the first language type thing.
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 64855fb..0393587 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -1163,7 +1163,8 @@ static void virNetClientCallCompleteAllWaitingReply(virNetClientPtr client) virNetClientCallPtr call;
for (call = client->waitDispatch; call; call = call->next) { - if (call->msg->header.prog == client->msg.header.prog && + if (call->msg->header.type == VIR_NET_STREAM &&
What about VIR_NET_STREAM_HOLE ?
Here and in the other places we care only about the "dummy recv"/finish/abort messages (only such messages have the expectReply flag set), and they all have type VIR_NET_STREAM.
Hopefully someone with greater working knowledge of the processing of this code can help out here. I guess I'm just concerned about some odd processing case that may have expected to be called, but now wouldn't be because of this change.
BTW: any thoughts to perhaps extracting out the common parts between the three places changed into it's own helper/method? e.g.:
thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply
and then the follow-up patch to add the additional '.type' checks?
I'm ok. Nikolay
+ call->msg->header.prog == client->msg.header.prog && call->msg->header.vers == client->msg.header.vers && call->msg->header.serial == client->msg.header.serial && call->expectReply) @@ -1207,7 +1208,8 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
/* Find oldest dummy message waiting for incoming data. */ for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { - if (thecall->msg->header.prog == client->msg.header.prog && + if (thecall->msg->header.type == VIR_NET_STREAM && + thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply && @@ -1225,7 +1227,8 @@ static int virNetClientCallDispatchStream(virNetClientPtr client) case VIR_NET_OK: /* Find oldest abort/finish message. */ for (thecall = client->waitDispatch; thecall; thecall = thecall->next) { - if (thecall->msg->header.prog == client->msg.header.prog && + if (thecall->msg->header.type == VIR_NET_STREAM && + thecall->msg->header.prog == client->msg.header.prog && thecall->msg->header.vers == client->msg.header.vers && thecall->msg->header.serial == client->msg.header.serial && thecall->expectReply &&

Streams are not yet notified when the underlying connection is closed. As a result, if we are running 'virsh console' and libvirtd is killed, we get message [1] because virsh tracks the connection status, but virsh itself won't exit because it doesn't get a notification that the stream is broken. Only after we press a key and virsh tries to write that key code to the stream do we get an error. So let's add a notification that the stream's underlying connection is closed. [1] error: Disconnected from qemu:///system due to end of file Signed-off-by: Nikolay Shirokovskiy <nshirokovskiy@virtuozzo.com> --- src/rpc/virnetclient.c | 4 ++++ src/rpc/virnetclientstream.c | 30 ++++++++++++++++++++++++++++-- src/rpc/virnetclientstream.h | 2 ++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 0393587..3fa0320 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -785,6 +785,7 @@ static void virNetClientCloseLocked(virNetClientPtr client) { virKeepAlivePtr ka; + size_t i; VIR_DEBUG("client=%p, sock=%p, reason=%d", client, client->sock, client->closeReason); @@ -825,6 +826,9 @@ virNetClientCloseLocked(virNetClientPtr client) virObjectLock(client); virObjectUnref(client); } + + for (i = 0; i < client->nstreams; i++) + virNetClientStreamSetClientClosed(client->streams[i]); } static void virNetClientCloseInternal(virNetClientPtr client, diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 834c448..de0c961 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -50,6 +50,7 @@ struct _virNetClientStream { virNetMessagePtr rx; bool incomingEOF; virNetClientStreamClosed closed; + bool clientClosed; bool allowSkip; long long holeLength; /* Size of incoming hole in stream. 
*/ @@ -85,7 +86,11 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents); - if (((st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK || st->closed) && + if (((st->rx || + st->incomingEOF || + st->err.code != VIR_ERR_OK || + st->closed || + st->clientClosed) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -107,7 +112,11 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->rx || st->incomingEOF || st->err.code != VIR_ERR_OK || st->closed)) + (st->rx || + st->incomingEOF || + st->err.code != VIR_ERR_OK || + st->closed || + st->clientClosed)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) @@ -204,6 +213,12 @@ int virNetClientStreamCheckState(virNetClientStreamPtr st) return -1; } + if (st->clientClosed) { + virReportError(VIR_ERR_OPERATION_FAILED, "%s", + _("client socket is closed")); + return -1; + } + if (st->closed) { virReportError(VIR_ERR_OPERATION_FAILED, "%s", _("stream is closed")); @@ -247,6 +262,17 @@ int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, } +void virNetClientStreamSetClientClosed(virNetClientStreamPtr st) +{ + virObjectLock(st); + + st->clientClosed = true; + virNetClientStreamEventTimerUpdate(st); + + virObjectUnlock(st); +} + + void virNetClientStreamSetClosed(virNetClientStreamPtr st, virNetClientStreamClosed closed) { diff --git a/src/rpc/virnetclientstream.h b/src/rpc/virnetclientstream.h index 33a2af3..f1df4e2 100644 --- a/src/rpc/virnetclientstream.h +++ b/src/rpc/virnetclientstream.h @@ -49,6 +49,8 @@ int virNetClientStreamCheckSendStatus(virNetClientStreamPtr st, int virNetClientStreamSetError(virNetClientStreamPtr st, virNetMessagePtr msg); +void virNetClientStreamSetClientClosed(virNetClientStreamPtr st); + void 
virNetClientStreamSetClosed(virNetClientStreamPtr st, virNetClientStreamClosed closed); -- 1.8.3.1

ping On 15.02.2019 17:10, Nikolay Shirokovskiy wrote:
Nikolay Shirokovskiy (2): rpc: client: fix race on stream error and stream creation rpc: client: stream: notify streams of closing connection
src/rpc/virnetclient.c | 13 ++++++++++--- src/rpc/virnetclientstream.c | 30 ++++++++++++++++++++++++++++-- src/rpc/virnetclientstream.h | 2 ++ 3 files changed, 40 insertions(+), 5 deletions(-)
participants (2)
-
John Ferlan
-
Nikolay Shirokovskiy