This is a bit tricky to grasp. But lets try anyway. So we read
data from a pipe. Now, the data are not raw file data as they
used to be. But they are a formatted message (encoded
virNetMessage in this case). So until we've read it whole, we
must: a) continue reading its remainder b) if reading would
block and caller wants us to read nonblock, claim EAGAIN and
don't return any partially read data.
Now, another interesting thing may happen - we've read a big
message, decoded it but caller is reading just a small chunks. In
that case we must not read any new message but rather copy those
small chunks into caller's buffer until the whole message is
processed. Only after that we can read new message from the pipe
and process continues.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/fdstream.c | 2 +-
src/iohelper/iohelper_message.c | 137 +++++++++++++++++++++++++++++++++++++---
src/iohelper/iohelper_message.h | 3 +-
3 files changed, 131 insertions(+), 11 deletions(-)
diff --git a/src/fdstream.c b/src/fdstream.c
index dc41164..77b5586 100644
--- a/src/fdstream.c
+++ b/src/fdstream.c
@@ -646,7 +646,7 @@ static int virFDStreamOpenInternal(virStreamPtr st,
fdst->length = length;
fdst->formatted = formatted;
if (formatted &&
- !(fdst->ioCtl = iohelperCtlNew(fd)))
+ !(fdst->ioCtl = iohelperCtlNew(fd, false)))
goto error;
diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c
index 51b283d..fe2304b 100644
--- a/src/iohelper/iohelper_message.c
+++ b/src/iohelper/iohelper_message.c
@@ -22,8 +22,11 @@
#include <config.h>
#include "iohelper_message.h"
-#include "virobject.h"
+#include "viralloc.h"
+#include "virfile.h"
#include "virlog.h"
+#include "virnetmessage.h"
+#include "virobject.h"
#define VIR_FROM_THIS VIR_FROM_STREAMS
@@ -33,8 +36,13 @@ struct iohelperCtl {
virObject parent;
int fd;
+ bool blocking;
+ virNetMessagePtr msg;
+ bool msgReadyRead;
};
+typedef ssize_t (*readfunc)(int fd, void *buf, size_t count);
+
static virClassPtr iohelperCtlClass;
static void
@@ -42,7 +50,7 @@ iohelperCtlDispose(void *obj)
{
iohelperCtlPtr ctl = obj;
- VIR_DEBUG("obj = %p", ctl);
+ virNetMessageFree(ctl->msg);
}
static int iohelperCtlOnceInit(void)
@@ -59,7 +67,8 @@ static int iohelperCtlOnceInit(void)
VIR_ONCE_GLOBAL_INIT(iohelperCtl)
iohelperCtlPtr
-iohelperCtlNew(int fd)
+iohelperCtlNew(int fd,
+ bool blocking)
{
iohelperCtlPtr ret;
@@ -69,20 +78,130 @@ iohelperCtlNew(int fd)
if (!(ret = virObjectNew(iohelperCtlClass)))
return NULL;
+ if (!(ret->msg = virNetMessageNew(false)))
+ goto error;
+
ret->fd = fd;
+ ret->blocking = blocking;
+ ret->msgReadyRead = false;
return ret;
+
+ error:
+ virObjectUnref(ret);
+ return NULL;
+}
+
+
+static void
+messageClear(iohelperCtlPtr ctl)
+{
+ virNetMessageClear(ctl->msg);
+ ctl->msgReadyRead = false;
+}
+
+
+static inline bool
+messageReadyRead(iohelperCtlPtr ctl)
+{
+ return ctl->msgReadyRead;
+}
+
+
+static ssize_t
+messageRecv(iohelperCtlPtr ctl)
+{
+ virNetMessagePtr msg = ctl->msg;
+ readfunc readF = ctl->blocking ? saferead : read;
+
+ ctl->msgReadyRead = false;
+
+ if (!msg->bufferLength) {
+ msg->bufferLength = 4;
+ if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0)
+ return -1;
+ }
+
+ while (true) {
+ ssize_t nread;
+ size_t want;
+
+ want = msg->bufferLength - msg->bufferOffset;
+
+ reread:
+ errno = 0;
+ nread = readF(ctl->fd,
+ msg->buffer + msg->bufferOffset,
+ want);
+
+ if (nread < 0) {
+ if (errno == EINTR)
+ goto reread;
+ if (errno == EAGAIN)
+ return 0;
+ return -1;
+ } else if (nread == 0) {
+ /* EOF while reading */
+ return 0;
+ } else {
+ msg->bufferOffset += nread;
+ }
+
+ if (msg->bufferOffset == msg->bufferLength) {
+ if (msg->bufferOffset == 4) {
+ if (virNetMessageDecodeLength(msg) < 0)
+ return -1;
+ } else {
+ if (virNetMessageDecodeHeader(msg) < 0)
+ return -1;
+
+ /* Here we would decode the payload someday */
+
+ ctl->msgReadyRead = true;
+ return msg->bufferLength - msg->bufferOffset;
+ }
+ }
+ }
}
ssize_t
-iohelperRead(iohelperCtlPtr ctl ATTRIBUTE_UNUSED,
- char *bytes ATTRIBUTE_UNUSED,
- size_t nbytes ATTRIBUTE_UNUSED)
+iohelperRead(iohelperCtlPtr ctl,
+ char *bytes,
+ size_t nbytes)
{
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("sparse stream not supported"));
- return -1;
+ ssize_t want = nbytes;
+ virNetMessagePtr msg = ctl->msg;
+
+ if (!messageReadyRead(ctl)) {
+ ssize_t nread;
+ /* Okay, the incoming message is not fully read. Try to
+ * finish its receiving and recheck. */
+ if ((nread = messageRecv(ctl)) < 0)
+ return -1;
+
+ if (!nread && errno != EAGAIN)
+ return 0;
+
+ if (!messageReadyRead(ctl)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ }
+
+ if (want > msg->bufferLength - msg->bufferOffset)
+ want = msg->bufferLength - msg->bufferOffset;
+
+ memcpy(bytes,
+ msg->buffer + msg->bufferOffset,
+ want);
+
+ msg->bufferOffset += want;
+
+ if (msg->bufferOffset == msg->bufferLength)
+ messageClear(ctl);
+
+ return want;
}
diff --git a/src/iohelper/iohelper_message.h b/src/iohelper/iohelper_message.h
index 74afd49..68beef0 100644
--- a/src/iohelper/iohelper_message.h
+++ b/src/iohelper/iohelper_message.h
@@ -28,7 +28,8 @@
typedef struct iohelperCtl iohelperCtl;
typedef iohelperCtl *iohelperCtlPtr;
-iohelperCtlPtr iohelperCtlNew(int fd);
+iohelperCtlPtr iohelperCtlNew(int fd,
+ bool blocking);
ssize_t
iohelperRead(iohelperCtlPtr ctl,
--
2.8.4