Now that we have formatted messages flying through pipe back and
forth, we can start introducing support for other types of
messages. For instance, a type to represent a hole in
file/stream.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
daemon/stream.c | 21 ++++---
po/POTFILES.in | 1 -
src/iohelper/iohelper_message.c | 131 ++++++++++++++++++++++++++++++++++------
src/libvirt-stream.c | 1 +
tests/Makefile.am | 2 +-
5 files changed, 129 insertions(+), 27 deletions(-)
diff --git a/daemon/stream.c b/daemon/stream.c
index 22d7cf7..83f7310 100644
--- a/daemon/stream.c
+++ b/daemon/stream.c
@@ -833,14 +833,19 @@ daemonStreamHandleRead(virNetServerClientPtr client,
VIR_DEBUG("rv=%d inData=%d length=%llu", rv, inData, length);
if (rv < 0) {
- if (virNetServerProgramSendStreamError(remoteProgram,
- client,
- msg,
- &rerr,
- stream->procedure,
- stream->serial) < 0)
- goto cleanup;
- msg = NULL;
+ if (rv == -2) {
+ /* Unable to determine yet. Claim success. */
+ } else {
+ /* Proper error. */
+ if (virNetServerProgramSendStreamError(remoteProgram,
+ client,
+ msg,
+ &rerr,
+ stream->procedure,
+ stream->serial) < 0)
+ goto cleanup;
+ msg = NULL;
+ }
/* We're done with this call */
goto done;
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 7f40200..9f4866c 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -64,7 +64,6 @@ src/interface/interface_backend_netcf.c
src/interface/interface_backend_udev.c
src/internal.h
src/iohelper/iohelper.c
-src/iohelper/iohelper_message.c
src/libvirt-admin.c
src/libvirt-domain-snapshot.c
src/libvirt-domain.c
diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c
index d900c2f..02c0283 100644
--- a/src/iohelper/iohelper_message.c
+++ b/src/iohelper/iohelper_message.c
@@ -40,6 +40,7 @@ struct iohelperCtl {
virNetMessagePtr msg;
bool msgReadyRead;
bool msgReadyWrite;
+ unsigned long long skipLength;
};
typedef ssize_t (*readfunc)(int fd, void *buf, size_t count);
@@ -122,19 +123,20 @@ messageRecv(iohelperCtlPtr ctl)
{
virNetMessagePtr msg = ctl->msg;
readfunc readF = ctl->blocking ? saferead : read;
+ virNetStreamSkip data;
ctl->msgReadyRead = false;
- if (!msg->bufferLength) {
- msg->bufferLength = 4;
- if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0)
- return -1;
- }
-
while (true) {
ssize_t nread;
size_t want;
+ if (!msg->bufferLength) {
+ msg->bufferLength = 4;
+ if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0)
+ return -1;
+ }
+
want = msg->bufferLength - msg->bufferOffset;
reread:
@@ -164,7 +166,17 @@ messageRecv(iohelperCtlPtr ctl)
if (virNetMessageDecodeHeader(msg) < 0)
return -1;
- /* Here we would decode the payload someday */
+ if (msg->header.type == VIR_NET_STREAM_SKIP) {
+ if (virNetMessageDecodePayload(msg,
+ (xdrproc_t) xdr_virNetStreamSkip,
+ &data) < 0) {
+ return -1;
+ }
+
+ ctl->skipLength += data.length;
+ messageClear(ctl);
+ continue;
+ }
ctl->msgReadyRead = true;
return msg->bufferLength - msg->bufferOffset;
@@ -239,6 +251,12 @@ iohelperRead(iohelperCtlPtr ctl,
}
}
+ /* Should never happen, but things change. */
+ if (msg->header.type != VIR_NET_STREAM) {
+ errno = EAGAIN;
+ return -1;
+ }
+
if (want > msg->bufferLength - msg->bufferOffset)
want = msg->bufferLength - msg->bufferOffset;
@@ -312,21 +330,100 @@ iohelperWrite(iohelperCtlPtr ctl,
int
-iohelperSkip(iohelperCtlPtr ctl ATTRIBUTE_UNUSED,
- unsigned long long length ATTRIBUTE_UNUSED)
+iohelperSkip(iohelperCtlPtr ctl,
+ unsigned long long length)
{
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("sparse stream not supported"));
+ virNetMessagePtr msg = ctl->msg;
+ virNetStreamSkip data;
+
+ if (messageReadyRead(ctl)) {
+ /* This stream is used for reading. */
+ return 0;
+ }
+
+ if (!messageReadyWrite(ctl)) {
+ ssize_t nwritten;
+ /* Okay, the outgoing message is not fully sent. Try to
+ * finish the sending and recheck. */
+ if ((nwritten = messageSend(ctl)) < 0)
+ return -1;
+
+ if (!nwritten && errno != EAGAIN)
+ return 0;
+
+ if (!messageReadyWrite(ctl)) {
+ errno = EAGAIN;
+ return -2;
+ }
+ }
+
+ memset(&msg->header, 0, sizeof(msg->header));
+ msg->header.type = VIR_NET_STREAM_SKIP;
+ msg->header.status = VIR_NET_CONTINUE;
+
+ memset(&data, 0, sizeof(data));
+ data.length = length;
+
+ /* Encoding a message is fatal and we should discard any
+ * partially encoded message. */
+ if (virNetMessageEncodeHeader(msg) < 0)
+ goto error;
+
+ if (virNetMessageEncodePayload(msg,
+ (xdrproc_t) xdr_virNetStreamSkip,
+ &data) < 0)
+ goto error;
+
+ /* At this point, the message is successfully encoded. Don't
+ * discard it if something below fails. */
+ if (messageSend(ctl) < 0)
+ return -1;
+
+ return 0;
+
+ error:
+ messageClear(ctl);
return -1;
}
int
-iohelperInData(iohelperCtlPtr ctl ATTRIBUTE_UNUSED,
- int *inData ATTRIBUTE_UNUSED,
- unsigned long long *length ATTRIBUTE_UNUSED)
+iohelperInData(iohelperCtlPtr ctl,
+ int *inData,
+ unsigned long long *length)
{
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("sparse stream not supported"));
- return -1;
+ virNetMessagePtr msg;
+
+ /* Make sure we have a message waiting in the queue. */
+
+ if (!messageReadyRead(ctl)) {
+ ssize_t nread;
+ /* Okay, the incoming message is not fully read. Try to
+ * finish its receiving and recheck. */
+ if ((nread = messageRecv(ctl)) < 0)
+ return -1;
+
+ if (!nread && errno != EAGAIN) {
+ /* EOF */
+ *inData = *length = 0;
+ return 0;
+ }
+
+ if (!messageReadyRead(ctl)) {
+ errno = EAGAIN;
+ return -2;
+ }
+ }
+
+ if (ctl->skipLength) {
+ *inData = 0;
+ *length = ctl->skipLength;
+ ctl->skipLength = 0;
+ } else {
+ msg = ctl->msg;
+ *inData = 1;
+ *length = msg->bufferLength - msg->bufferOffset;
+ }
+
+ return 0;
}
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 2632d55..13cbbe5 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -491,6 +491,7 @@ virStreamHoleSize(virStreamPtr stream,
* and return 0.
*
* Returns 0 on success,
+ * -2 if unable to determine yet,
* -1 otherwise
*/
int
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a87de5f..aa35a6f 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1303,7 +1303,7 @@ iohelpermessagetest_SOURCES = \
iohelpermessagetest_CFLAGS = \
$(AM_CFLAGS) -I$(top_srcdir)/src/iohelper
iohelpermessagetest_LDADD = \
- $(LDADDS) ../src/libvirt-iohelper.la
+ $(LDADDS) ../src/libvirt-iohelper.la ../src/libvirt-net-rpc.la
endif WITH_LIBVIRTD
libshunload_la_SOURCES = shunloadhelper.c
--
2.8.4