From: "Daniel P. Berrange" <berrange(a)redhat.com>
Directly messing around with the linked list is potentially
dangerous. Introduce some helper APIs to deal with list
manipulating the list
* src/rpc/virnetclient.c: Create linked list handlers
---
src/rpc/virnetclient.c | 207 +++++++++++++++++++++++++++++++++---------------
1 files changed, 144 insertions(+), 63 deletions(-)
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 4b7d4a9..463dc5a 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -110,6 +110,97 @@ static void virNetClientIncomingEvent(virNetSocketPtr sock,
int events,
void *opaque);
+/* Append a call to the end of the list */
+static void virNetClientCallQueue(virNetClientCallPtr *head,
+ virNetClientCallPtr call)
+{
+ virNetClientCallPtr tmp = *head;
+ while (tmp && tmp->next) {
+ tmp = tmp->next;
+ }
+ if (tmp)
+ tmp->next = call;
+ else
+ *head = call;
+ call->next = NULL;
+}
+
+#if 0
+/* Obtain a call from the head of the list */
+static virNetClientCallPtr virNetClientCallServe(virNetClientCallPtr *head)
+{
+ virNetClientCallPtr tmp = *head;
+ if (tmp)
+ *head = tmp->next;
+ else
+ *head = NULL;
+ tmp->next = NULL;
+ return tmp;
+}
+#endif
+
+/* Remove a call from anywhere in the list */
+static void virNetClientCallRemove(virNetClientCallPtr *head,
+ virNetClientCallPtr call)
+{
+ virNetClientCallPtr tmp = *head;
+ virNetClientCallPtr prev = NULL;
+ while (tmp) {
+ if (tmp == call) {
+ if (prev)
+ prev->next = tmp->next;
+ else
+ *head = tmp->next;
+ tmp->next = NULL;
+ return;
+ }
+ prev = tmp;
+ tmp = tmp->next;
+ }
+}
+
+/* Predicate returns true if matches */
+typedef bool (*virNetClientCallPredicate)(virNetClientCallPtr call, void *opaque);
+
+/* Remove a list of calls from the list based on a predicate */
+static void virNetClientCallRemovePredicate(virNetClientCallPtr *head,
+ virNetClientCallPredicate pred,
+ void *opaque)
+{
+ virNetClientCallPtr tmp = *head;
+ virNetClientCallPtr prev = NULL;
+ while (tmp) {
+ virNetClientCallPtr next = tmp->next;
+ tmp->next = NULL; /* Temp unlink */
+ if (pred(tmp, opaque)) {
+ if (prev)
+ prev->next = next;
+ else
+ *head = next;
+ } else {
+ tmp->next = next; /* Reverse temp unlink */
+ prev = tmp;
+ }
+ tmp = next;
+ }
+}
+
+/* Returns true if the predicate matches at least one call in the list */
+static bool virNetClientCallMatchPredicate(virNetClientCallPtr head,
+ virNetClientCallPredicate pred,
+ void *opaque)
+{
+ virNetClientCallPtr tmp = head;
+ while (tmp) {
+ if (pred(tmp, opaque)) {
+ return true;
+ }
+ tmp = tmp->next;
+ }
+ return false;
+}
+
+
static void virNetClientEventFree(void *opaque)
{
virNetClientPtr client = opaque;
@@ -896,6 +987,42 @@ virNetClientIOHandleInput(virNetClientPtr client)
}
+static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
+ void *opaque)
+{
+ struct pollfd *fd = opaque;
+
+ if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
+ fd->events |= POLLIN;
+ if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
+ fd->events |= POLLOUT;
+
+ return true;
+}
+
+
+static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
+ void *opaque)
+{
+ virNetClientCallPtr thiscall = opaque;
+
+ if (call == thiscall)
+ return false;
+
+ if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE)
+ return false;
+
+ /*
+ * ...they won't actually wakeup until
+ * we release our mutex a short while
+ * later...
+ */
+ VIR_DEBUG("Waking up sleeping call %p", call);
+ virCondSignal(&call->cond);
+
+ return true;
+}
+
/*
* Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck
@@ -911,8 +1038,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
fds[1].fd = client->wakeupReadFD;
for (;;) {
- virNetClientCallPtr tmp = client->waitDispatch;
- virNetClientCallPtr prev;
char ignore;
sigset_t oldmask, blockedsigs;
int timeout = -1;
@@ -928,14 +1053,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
fds[1].events = fds[1].revents = 0;
fds[1].events = POLLIN;
- while (tmp) {
- if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
- fds[0].events |= POLLIN;
- if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
- fds[0].events |= POLLOUT;
- tmp = tmp->next;
- }
+ /* Calculate poll events for calls */
+ virNetClientCallMatchPredicate(client->waitDispatch,
+ virNetClientIOEventLoopPollEvents,
+ &fds[0]);
/* We have to be prepared to receive stream data
* regardless of whether any of the calls waiting
@@ -1008,37 +1130,16 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
/* Iterate through waiting threads and if
* any are complete then tell 'em to wakeup
*/
- tmp = client->waitDispatch;
- prev = NULL;
- while (tmp) {
- if (tmp != thiscall &&
- tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
- /* Take them out of the list */
- if (prev)
- prev->next = tmp->next;
- else
- client->waitDispatch = tmp->next;
-
- /* And wake them up....
- * ...they won't actually wakeup until
- * we release our mutex a short while
- * later...
- */
- VIR_DEBUG("Waking up sleep %p %p", tmp,
client->waitDispatch);
- virCondSignal(&tmp->cond);
- } else {
- prev = tmp;
- }
- tmp = tmp->next;
- }
+ virNetClientCallRemovePredicate(&client->waitDispatch,
+ virNetClientIOEventLoopRemoveDone,
+ thiscall);
/* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
- /* We're at head of the list already, so
- * remove us
- */
- client->waitDispatch = thiscall->next;
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
+
VIR_DEBUG("Giving up the buck %p %p", thiscall,
client->waitDispatch);
+
/* See if someone else is still waiting
* and if so, then pass the buck ! */
if (client->waitDispatch) {
@@ -1058,7 +1159,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
error:
- client->waitDispatch = thiscall->next;
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall,
client->waitDispatch);
/* See if someone else is still waiting
* and if so, then pass the buck ! */
@@ -1119,22 +1220,13 @@ static int virNetClientIO(virNetClientPtr client,
/* Check to see if another thread is dispatching */
if (client->waitDispatch) {
- /* Stick ourselves on the end of the wait queue */
- virNetClientCallPtr tmp = client->waitDispatch;
char ignore = 1;
- while (tmp && tmp->next)
- tmp = tmp->next;
- if (tmp)
- tmp->next = thiscall;
- else
- client->waitDispatch = thiscall;
+ /* Stick ourselves on the end of the wait queue */
+ virNetClientCallQueue(&client->waitDispatch, thiscall);
/* Force other thread to wakeup from poll */
if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) !=
sizeof(ignore)) {
- if (tmp)
- tmp->next = NULL;
- else
- client->waitDispatch = NULL;
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
virReportSystemError(errno, "%s",
_("failed to wake up polling thread"));
return -1;
@@ -1143,17 +1235,7 @@ static int virNetClientIO(virNetClientPtr client,
VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
/* Go to sleep while other thread is working... */
if (virCondWait(&thiscall->cond, &client->lock) < 0) {
- if (client->waitDispatch == thiscall) {
- client->waitDispatch = thiscall->next;
- } else {
- tmp = client->waitDispatch;
- while (tmp && tmp->next &&
- tmp->next != thiscall) {
- tmp = tmp->next;
- }
- if (tmp && tmp->next == thiscall)
- tmp->next = thiscall->next;
- }
+ virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("failed to wait on condition"));
return -1;
@@ -1177,10 +1259,9 @@ static int virNetClientIO(virNetClientPtr client,
}
/* Grr, someone passed the buck onto us ... */
-
} else {
- /* We're first to catch the buck */
- client->waitDispatch = thiscall;
+ /* We're the first to arrive */
+ virNetClientCallQueue(&client->waitDispatch, thiscall);
}
VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);
--
1.7.6.4