On Thu, Jan 30, 2020 at 03:51:05PM +0100, Pavel Hrdina wrote:
On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote:
> To eliminate the dependancy on GNULIB's poll impl, we need
> to change the RPC client code to use GMainLoop. We don't
> really want to use GIOChannel, but it provides the most
> convenient way to do socket event watches with Windows
> portability. The other alternative would be to use GSocket
> but that is a much more complex change affecting libvirt
> more broadly.
>
> Signed-off-by: Daniel P. Berrangé <berrange(a)redhat.com>
> ---
> src/rpc/virnetclient.c | 215 ++++++++++++++++++++++-------------------
> 1 file changed, 113 insertions(+), 102 deletions(-)
>
> diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
> index 031a99711f..9069c57113 100644
> --- a/src/rpc/virnetclient.c
> +++ b/src/rpc/virnetclient.c
> @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client,
> * queue and close the client because we set client->wantClose.
> */
This comment should be probably updated to not reference threads.
I'm not sure what you mean here, as the comment looks still
accurate to me.
> if (client->haveTheBuck) {
> - char ignore = 1;
> - size_t len = sizeof(ignore);
> -
> - if (safewrite(client->wakeupSendFD, &ignore, len) != len)
> - VIR_ERROR(_("failed to wake up polling thread"));
> + g_main_loop_quit(client->eventLoop);
> } else {
> virNetClientIOEventLoopPassTheBuck(client, NULL);
> }
> @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client,
> #endif
>
>
> +static gboolean
> +virNetClientIOEventTLS(int fd,
> + GIOCondition ev,
> + gpointer opaque);
> +
> +static gboolean
> +virNetClientTLSHandshake(virNetClientPtr client)
> +{
> + GIOCondition ev;
> + int ret;
> +
> + ret = virNetTLSSessionHandshake(client->tls);
> +
> + if (ret <= 0)
> + return FALSE;
> +
> + if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> + VIR_NET_TLS_HANDSHAKE_RECVING)
> + ev = G_IO_IN;
> + else
> + ev = G_IO_OUT;
> +
> + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> + ev,
> + client->eventCtx,
> + virNetClientIOEventTLS, client, NULL);
> +
> + return TRUE;
> +}
> +
> +
> +static gboolean
> +virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
> + GIOCondition ev G_GNUC_UNUSED,
> + gpointer opaque)
> +{
> + virNetClientPtr client = opaque;
> +
> + if (!virNetClientTLSHandshake(client))
> + g_main_loop_quit(client->eventLoop);
> +
> + return G_SOURCE_REMOVE;
> +}
> +
> +
> +static gboolean
> +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
> + GIOCondition ev G_GNUC_UNUSED,
> + gpointer opaque)
> +{
> + virNetClientPtr client = opaque;
> +
> + g_main_loop_quit(client->eventLoop);
> +
> + return G_SOURCE_REMOVE;
> +}
> +
> +
> int virNetClientSetTLSSession(virNetClientPtr client,
> virNetTLSContextPtr tls)
> {
> int ret;
> char buf[1];
> int len;
> - struct pollfd fds[1];
>
> #ifndef WIN32
> sigset_t oldmask, blockedsigs;
> @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client,
>
> virNetSocketSetTLSSession(client->sock, client->tls);
>
> - for (;;) {
> - ret = virNetTLSSessionHandshake(client->tls);
> -
> - if (ret < 0)
> - goto error;
> - if (ret == 0)
> - break;
> -
> - fds[0].fd = virNetSocketGetFD(client->sock);
> - fds[0].revents = 0;
> - if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> - VIR_NET_TLS_HANDSHAKE_RECVING)
> - fds[0].events = POLLIN;
> - else
> - fds[0].events = POLLOUT;
> -
> + virResetLastError();
> + if (virNetClientTLSHandshake(client)) {
> #ifndef WIN32
> /* Block SIGWINCH from interrupting poll in curses programs,
> * then restore the original signal mask again immediately
> @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> #endif /* !WIN32 */
>
> - repoll:
> - ret = poll(fds, G_N_ELEMENTS(fds), -1);
> - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> - goto repoll;
> + g_main_loop_run(client->eventLoop);
>
> #ifndef WIN32
> ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> #endif /* !WIN32 */
> }
>
> + if (virGetLastErrorCode() != VIR_ERR_OK)
> + goto error;
> +
> ret = virNetTLSContextCheckCertificate(tls, client->tls);
>
> if (ret < 0)
> @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> * etc. If we make the grade, it will send us a '\1' byte.
> */
>
> - fds[0].fd = virNetSocketGetFD(client->sock);
> - fds[0].revents = 0;
> - fds[0].events = POLLIN;
> + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> + G_IO_IN,
> + client->eventCtx,
> + virNetClientIOEventTLSConfirm, client, NULL);
>
> #ifndef WIN32
> /* Block SIGWINCH from interrupting poll in curses programs */
> ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> #endif /* !WIN32 */
>
> - repoll2:
> - ret = poll(fds, G_N_ELEMENTS(fds), -1);
> - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> - goto repoll2;
> + g_main_loop_run(client->eventLoop);
>
> #ifndef WIN32
> ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client)
> static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
> void *opaque)
> {
> - struct pollfd *fd = opaque;
> + GIOCondition *ev = opaque;
>
> if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
> - fd->events |= POLLIN;
> + *ev |= G_IO_IN;
> if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
> - fd->events |= POLLOUT;
> + *ev |= G_IO_OUT;
>
> return false;
> }
> @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> }
>
>
> +static gboolean
> +virNetClientIOEventFD(int fd G_GNUC_UNUSED,
> + GIOCondition ev,
> + gpointer opaque)
> +{
> + GIOCondition *rev = opaque;
> + *rev = ev;
> +
> + return G_SOURCE_REMOVE;
> +}
> +
> +
> /*
> * Process all calls pending dispatch/receive until we
> * get a reply to our own call. Then quit and pass the buck
> @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> static int virNetClientIOEventLoop(virNetClientPtr client,
> virNetClientCallPtr thiscall)
> {
> - struct pollfd fds[2];
> bool error = false;
> int closeReason;
> - int ret;
> -
> - fds[0].fd = virNetSocketGetFD(client->sock);
> - fds[1].fd = client->wakeupReadFD;
>
> for (;;) {
> - char ignore;
> #ifndef WIN32
> sigset_t oldmask, blockedsigs;
> #endif /* !WIN32 */
> int timeout = -1;
> virNetMessagePtr msg = NULL;
> + GIOCondition ev = 0;
> + GIOCondition rev = 0;
>
> /* If we have existing SASL decoded data we don't want to sleep in
> * the poll(), just check if any other FDs are also ready.
> @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> if (timeout == -1)
> timeout = virKeepAliveTimeout(client->keepalive);
>
> - fds[0].events = fds[0].revents = 0;
> - fds[1].events = fds[1].revents = 0;
> -
> - fds[1].events = POLLIN;
> -
> /* Calculate poll events for calls */
> virNetClientCallMatchPredicate(client->waitDispatch,
> virNetClientIOEventLoopPollEvents,
> - &fds[0]);
> + &ev);
>
> /* We have to be prepared to receive stream data
> * regardless of whether any of the calls waiting
> * for dispatch are for streams.
> */
> if (client->nstreams)
> - fds[0].events |= POLLIN;
> + ev |= G_IO_IN;
> +
> + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> + ev,
> + client->eventCtx,
> + virNetClientIOEventFD, &rev, NULL);
>
> /* Release lock while poll'ing so other threads
> * can stuff themselves on the queue */
> @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> sigaddset(&blockedsigs, SIGCHLD);
> # endif
> sigaddset(&blockedsigs, SIGPIPE);
> +
> ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> #endif /* !WIN32 */
>
> - repoll:
> - ret = poll(fds, G_N_ELEMENTS(fds), timeout);
> - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> - goto repoll;
> + while (!rev)
> + g_main_context_iteration(client->eventCtx, TRUE);
Is there a reason why we don't use g_main_loop_run() here and use
g_main_loop_quit() in virNetClientIOEventFD() the same way we use it
in virNetClientIOEventTLSConfirm() ?
If I'm looking at the code correctly the call to g_main_loop_quit() from
virNetClientIO() where we want to force other threads from poll would be
ignored by the g_main_context_iteration(). This would be a change in
behavior from the old core where the write to "client->wakeupSendFD"
would make the poll() function wake since it is listening on
"client->wakeupReadFD" as well.
Yeah you are right here.
Regards,
Daniel
--
|:
https://berrange.com -o-
https://www.flickr.com/photos/dberrange :|
|:
https://libvirt.org -o-
https://fstop138.berrange.com :|
|:
https://entangle-photo.org -o-
https://www.instagram.com/dberrange :|