This patch causes the fdstream driver to call the stream event callback
if virStreamAbort() is issued on a stream using this driver. Without
this, streams could not be aborted from the daemon, as the daemon's remote
handler installs a callback to watch for stream errors as its only means
of detecting changes in the stream.
* src/fdstream.c:
- modify close function to call stream event callback
---
Diff to v2:
- fixed typos
src/fdstream.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 50 insertions(+), 6 deletions(-)
diff --git a/src/fdstream.c b/src/fdstream.c
index 841f979..35f5135 100644
--- a/src/fdstream.c
+++ b/src/fdstream.c
@@ -1,5 +1,5 @@
/*
- * fdstream.h: generic streams impl for file descriptors
+ * fdstream.c: generic streams impl for file descriptors
*
* Copyright (C) 2009-2011 Red Hat, Inc.
*
@@ -55,6 +55,7 @@ struct virFDStreamData {
unsigned long long length;
int watch;
+ int events; /* events the stream callback is subscribed for */
bool cbRemoved;
bool dispatching;
bool closed;
@@ -62,6 +63,10 @@ struct virFDStreamData {
void *opaque;
virFreeCallback ff;
+ /* don't call the abort callback more than once */
+ bool abortCallbackCalled;
+ bool abortCallbackDispatching;
+
virMutex lock;
};
@@ -92,6 +97,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream)
fdst->watch = 0;
fdst->ff = NULL;
fdst->cb = NULL;
+ fdst->events = 0;
fdst->opaque = NULL;
ret = 0;
@@ -120,6 +126,7 @@ static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
}
virEventUpdateHandle(fdst->watch, events);
+ fdst->events = events;
ret = 0;
@@ -214,6 +221,8 @@ virFDStreamAddCallback(virStreamPtr st,
fdst->cb = cb;
fdst->opaque = opaque;
fdst->ff = ff;
+ fdst->events = events;
+ fdst->abortCallbackCalled = false;
virStreamRef(st);
ret = 0;
@@ -223,20 +232,43 @@ cleanup:
return ret;
}
-
static int
-virFDStreamClose(virStreamPtr st)
+virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
{
- struct virFDStreamData *fdst = st->privateData;
+ struct virFDStreamData *fdst;
int ret;
VIR_DEBUG("st=%p", st);
- if (!fdst)
+ if (!st || !(fdst = st->privateData) || fdst->abortCallbackDispatching)
return 0;
virMutexLock(&fdst->lock);
+ /* aborting the stream, ensure the callback is called if it's
+ * registered for stream error event */
+ if (streamAbort &&
+ fdst->cb &&
+ (fdst->events & (VIR_STREAM_EVENT_READABLE |
+ VIR_STREAM_EVENT_WRITABLE))) {
+ /* don't enter this function accidentally from the callback again */
+ if (fdst->abortCallbackCalled) {
+ virMutexUnlock(&fdst->lock);
+ return 0;
+ }
+
+ fdst->abortCallbackCalled = true;
+ fdst->abortCallbackDispatching = true;
+ virMutexUnlock(&fdst->lock);
+
+ /* call failure callback, poll reports nothing on a closed fd */
+ (fdst->cb)(st, VIR_STREAM_EVENT_ERROR, fdst->opaque);
+
+ virMutexLock(&fdst->lock);
+ fdst->abortCallbackDispatching = false;
+ }
+
+ /* mutex locked */
ret = VIR_CLOSE(fdst->fd);
if (fdst->cmd) {
char buf[1024];
@@ -286,6 +318,18 @@ virFDStreamClose(virStreamPtr st)
return ret;
}
+static int
+virFDStreamClose(virStreamPtr st)
+{
+ return virFDStreamCloseInt(st, false);
+}
+
+static int
+virFDStreamAbort(virStreamPtr st)
+{
+ return virFDStreamCloseInt(st, true);
+}
+
static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
{
struct virFDStreamData *fdst = st->privateData;
@@ -392,7 +436,7 @@ static virStreamDriver virFDStreamDrv = {
.streamSend = virFDStreamWrite,
.streamRecv = virFDStreamRead,
.streamFinish = virFDStreamClose,
- .streamAbort = virFDStreamClose,
+ .streamAbort = virFDStreamAbort,
.streamAddCallback = virFDStreamAddCallback,
.streamUpdateCallback = virFDStreamUpdateCallback,
.streamRemoveCallback = virFDStreamRemoveCallback
--
1.7.3.4