
On Thu, Jan 30, 2020 at 03:51:05PM +0100, Pavel Hrdina wrote:
On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote:
To eliminate the dependency on GNULIB's poll impl, we need to change the RPC client code to use GMainLoop. We don't really want to use GIOChannel, but it provides the most convenient way to do socket event watches with Windows portability. The other alternative would be to use GSocket but that is a much more complex change affecting libvirt more broadly.
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> --- src/rpc/virnetclient.c | 215 ++++++++++++++++++++++------------------- 1 file changed, 113 insertions(+), 102 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 031a99711f..9069c57113 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client, * queue and close the client because we set client->wantClose. */
This comment should probably be updated to not reference threads.
I'm not sure what you mean here, as the comment looks still accurate to me.
if (client->haveTheBuck) { - char ignore = 1; - size_t len = sizeof(ignore); - - if (safewrite(client->wakeupSendFD, &ignore, len) != len) - VIR_ERROR(_("failed to wake up polling thread")); + g_main_loop_quit(client->eventLoop); } else { virNetClientIOEventLoopPassTheBuck(client, NULL); } @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client, #endif
+static gboolean +virNetClientIOEventTLS(int fd, + GIOCondition ev, + gpointer opaque); + +static gboolean +virNetClientTLSHandshake(virNetClientPtr client) +{ + GIOCondition ev; + int ret; + + ret = virNetTLSSessionHandshake(client->tls); + + if (ret <= 0) + return FALSE; + + if (virNetTLSSessionGetHandshakeStatus(client->tls) == + VIR_NET_TLS_HANDSHAKE_RECVING) + ev = G_IO_IN; + else + ev = G_IO_OUT; + + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), + ev, + client->eventCtx, + virNetClientIOEventTLS, client, NULL); + + return TRUE; +} + + +static gboolean +virNetClientIOEventTLS(int fd G_GNUC_UNUSED, + GIOCondition ev G_GNUC_UNUSED, + gpointer opaque) +{ + virNetClientPtr client = opaque; + + if (!virNetClientTLSHandshake(client)) + g_main_loop_quit(client->eventLoop); + + return G_SOURCE_REMOVE; +} + + +static gboolean +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED, + GIOCondition ev G_GNUC_UNUSED, + gpointer opaque) +{ + virNetClientPtr client = opaque; + + g_main_loop_quit(client->eventLoop); + + return G_SOURCE_REMOVE; +} + + int virNetClientSetTLSSession(virNetClientPtr client, virNetTLSContextPtr tls) { int ret; char buf[1]; int len; - struct pollfd fds[1];
#ifndef WIN32 sigset_t oldmask, blockedsigs; @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client,
virNetSocketSetTLSSession(client->sock, client->tls);
- for (;;) { - ret = virNetTLSSessionHandshake(client->tls); - - if (ret < 0) - goto error; - if (ret == 0) - break; - - fds[0].fd = virNetSocketGetFD(client->sock); - fds[0].revents = 0; - if (virNetTLSSessionGetHandshakeStatus(client->tls) == - VIR_NET_TLS_HANDSHAKE_RECVING) - fds[0].events = POLLIN; - else - fds[0].events = POLLOUT; - + virResetLastError(); + if (virNetClientTLSHandshake(client)) { #ifndef WIN32 /* Block SIGWINCH from interrupting poll in curses programs, * then restore the original signal mask again immediately @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client, ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); #endif /* !WIN32 */
- repoll: - ret = poll(fds, G_N_ELEMENTS(fds), -1); - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) - goto repoll; + g_main_loop_run(client->eventLoop);
#ifndef WIN32 ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); #endif /* !WIN32 */ }
+ if (virGetLastErrorCode() != VIR_ERR_OK) + goto error; + ret = virNetTLSContextCheckCertificate(tls, client->tls);
if (ret < 0) @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client, * etc. If we make the grade, it will send us a '\1' byte. */
- fds[0].fd = virNetSocketGetFD(client->sock); - fds[0].revents = 0; - fds[0].events = POLLIN; + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), + G_IO_IN, + client->eventCtx, + virNetClientIOEventTLSConfirm, client, NULL);
#ifndef WIN32 /* Block SIGWINCH from interrupting poll in curses programs */ ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); #endif /* !WIN32 */
- repoll2: - ret = poll(fds, G_N_ELEMENTS(fds), -1); - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) - goto repoll2; + g_main_loop_run(client->eventLoop);
#ifndef WIN32 ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client) static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call, void *opaque) { - struct pollfd *fd = opaque; + GIOCondition *ev = opaque;
if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX) - fd->events |= POLLIN; + *ev |= G_IO_IN; if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - fd->events |= POLLOUT; + *ev |= G_IO_OUT;
return false; } @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, }
+static gboolean +virNetClientIOEventFD(int fd G_GNUC_UNUSED, + GIOCondition ev, + gpointer opaque) +{ + GIOCondition *rev = opaque; + *rev = ev; + + return G_SOURCE_REMOVE; +} + + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) { - struct pollfd fds[2]; bool error = false; int closeReason; - int ret; - - fds[0].fd = virNetSocketGetFD(client->sock); - fds[1].fd = client->wakeupReadFD;
for (;;) { - char ignore; #ifndef WIN32 sigset_t oldmask, blockedsigs; #endif /* !WIN32 */ int timeout = -1; virNetMessagePtr msg = NULL; + GIOCondition ev = 0; + GIOCondition rev = 0;
/* If we have existing SASL decoded data we don't want to sleep in * the poll(), just check if any other FDs are also ready. @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (timeout == -1) timeout = virKeepAliveTimeout(client->keepalive);
- fds[0].events = fds[0].revents = 0; - fds[1].events = fds[1].revents = 0; - - fds[1].events = POLLIN; - /* Calculate poll events for calls */ virNetClientCallMatchPredicate(client->waitDispatch, virNetClientIOEventLoopPollEvents, - &fds[0]); + &ev);
/* We have to be prepared to receive stream data * regardless of whether any of the calls waiting * for dispatch are for streams. */ if (client->nstreams) - fds[0].events |= POLLIN; + ev |= G_IO_IN; + + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), + ev, + client->eventCtx, + virNetClientIOEventFD, &rev, NULL);
/* Release lock while poll'ing so other threads * can stuff themselves on the queue */ @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, sigaddset(&blockedsigs, SIGCHLD); # endif sigaddset(&blockedsigs, SIGPIPE); + ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); #endif /* !WIN32 */
- repoll: - ret = poll(fds, G_N_ELEMENTS(fds), timeout); - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) - goto repoll; + while (!rev) + g_main_context_iteration(client->eventCtx, TRUE);
Is there a reason why we don't use g_main_loop_run() here and use g_main_loop_quit() in virNetClientIOEventFD() the same way we use it in virNetClientIOEventTLSConfirm() ?
If I'm looking at the code correctly, the call to g_main_loop_quit() from virNetClientIO() — where we want to force other threads out of poll — would be ignored by g_main_context_iteration(). This would be a change in behavior from the old code, where the write to "client->wakeupSendFD" would make poll() wake up since it is listening on "client->wakeupReadFD" as well.
Yeah you are right here. Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|