Implement virStreamSkip and virStreamInData callbacks. These
callbacks do no magic, just skip a hole or detect whether we are
in a data section of a file or in a hole and how much bytes can
we read until section changes.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/storage/storage_util.c | 4 +-
src/util/virfdstream.c | 234 +++++++++++++++++++++++++++++++++++++++++----
src/util/virfdstream.h | 1 +
3 files changed, 216 insertions(+), 23 deletions(-)
diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c
index a2d89af..3576435 100644
--- a/src/storage/storage_util.c
+++ b/src/storage/storage_util.c
@@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn
ATTRIBUTE_UNUSED,
/* Not using O_CREAT because the file is required to already exist at
* this point */
ret = virFDStreamOpenBlockDevice(stream, target_path,
- offset, len, O_WRONLY);
+ offset, len, false, O_WRONLY);
cleanup:
VIR_FREE(path);
@@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn
ATTRIBUTE_UNUSED,
}
ret = virFDStreamOpenBlockDevice(stream, target_path,
- offset, len, O_RDONLY);
+ offset, len, false, O_RDONLY);
cleanup:
VIR_FREE(path);
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index efd9199..e9b5962 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream");
typedef enum {
VIR_FDSTREAM_MSG_TYPE_DATA,
+ VIR_FDSTREAM_MSG_TYPE_SKIP,
} virFDStreamMsgType;
typedef struct _virFDStreamMsg virFDStreamMsg;
@@ -66,6 +67,9 @@ struct _virFDStreamMsg {
size_t len;
size_t offset;
} data;
+ struct {
+ size_t len;
+ } skip;
} stream;
};
@@ -175,6 +179,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg)
case VIR_FDSTREAM_MSG_TYPE_DATA:
VIR_FREE(msg->stream.data.buf);
break;
+ case VIR_FDSTREAM_MSG_TYPE_SKIP:
+ /* nada */
+ break;
}
VIR_FREE(msg);
@@ -361,6 +368,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
struct _virFDStreamThreadData {
virStreamPtr st;
size_t length;
+ bool sparse;
int fdin;
char *fdinname;
int fdout;
@@ -383,32 +391,66 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
static ssize_t
virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+ bool sparse,
const int fdin,
const char *fdinname,
+ size_t *dataLen,
size_t buflen)
{
virFDStreamMsgPtr msg = NULL;
+ int inData = 0;
+ unsigned long long sectionLen = 0;
char *buf = NULL;
ssize_t got;
+ if (sparse && *dataLen == 0) {
+ if (virFileInData(fdin, &inData, §ionLen) < 0)
+ goto error;
+
+ if (inData)
+ *dataLen = sectionLen;
+ }
+
if (VIR_ALLOC(msg) < 0)
goto error;
- if (VIR_ALLOC_N(buf, buflen) < 0)
- goto error;
-
- if ((got = saferead(fdin, buf, buflen)) < 0) {
- virReportSystemError(errno,
- _("Unable to read %s"),
- fdinname);
- goto error;
+ if (sparse && *dataLen == 0) {
+ msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP;
+ msg->stream.skip.len = sectionLen;
+ got = sectionLen;
+
+ /* HACK. The message queue is one directional. So caller
+ * cannot make us skip the hole. Do that for them instead. */
+ if (sectionLen &&
+ lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
+ virReportSystemError(errno,
+ _("unable to seek in %s"),
+ fdinname);
+ goto error;
+ }
+ } else {
+ if (sparse &&
+ buflen > *dataLen)
+ buflen = *dataLen;
+
+ if (VIR_ALLOC_N(buf, buflen) < 0)
+ goto error;
+
+ if ((got = saferead(fdin, buf, buflen)) < 0) {
+ virReportSystemError(errno,
+ _("Unable to read %s"),
+ fdinname);
+ goto error;
+ }
+
+ msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+ msg->stream.data.buf = buf;
+ msg->stream.data.len = got;
+ buf = NULL;
+ if (sparse)
+ *dataLen -= got;
}
- msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
- msg->stream.data.buf = buf;
- msg->stream.data.len = got;
- buf = NULL;
-
virFDStreamMsgQueuePush(fdst, msg);
msg = NULL;
@@ -423,11 +465,13 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
static ssize_t
virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+ bool sparse,
const int fdout,
const char *fdoutname)
{
ssize_t got;
virFDStreamMsgPtr msg = fdst->msg;
+ off_t off;
bool pop = false;
switch (msg->type) {
@@ -446,6 +490,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
pop = msg->stream.data.offset == msg->stream.data.len;
break;
+
+ case VIR_FDSTREAM_MSG_TYPE_SKIP:
+ if (!sparse) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("unexpected stream skip"));
+ return -1;
+ }
+
+ got = msg->stream.skip.len;
+ off = lseek(fdout, got, SEEK_CUR);
+ if (off == (off_t) -1) {
+ virReportSystemError(errno,
+ _("unable to seek in %s"),
+ fdoutname);
+ return -1;
+ }
+
+ if (ftruncate(fdout, off) < 0) {
+ virReportSystemError(errno,
+ _("unable to truncate %s"),
+ fdoutname);
+ return -1;
+ }
+
+ pop = true;
+ break;
}
if (pop) {
@@ -463,6 +533,7 @@ virFDStreamThread(void *opaque)
virFDStreamThreadDataPtr data = opaque;
virStreamPtr st = data->st;
size_t length = data->length;
+ bool sparse = data->sparse;
int fdin = data->fdin;
char *fdinname = data->fdinname;
int fdout = data->fdout;
@@ -471,6 +542,7 @@ virFDStreamThread(void *opaque)
bool doRead = fdst->threadDoRead;
size_t buflen = 256 * 1024;
size_t total = 0;
+ size_t dataLen = 0;
virObjectRef(fdst);
virObjectLock(fdst);
@@ -505,9 +577,9 @@ virFDStreamThread(void *opaque)
}
if (doRead)
- got = virFDStreamThreadDoRead(fdst, fdin, fdinname, buflen);
+ got = virFDStreamThreadDoRead(fdst, sparse, fdin, fdinname, &dataLen,
buflen);
else
- got = virFDStreamThreadDoWrite(fdst, fdout, fdoutname);
+ got = virFDStreamThreadDoWrite(fdst, sparse, fdout, fdoutname);
if (got < 0)
goto error;
@@ -773,6 +845,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t
nbytes)
}
}
+ /* Shortcut, if the stream is in the trailing hole,
+ * return 0 immediately. */
+ if (msg->type == VIR_FDSTREAM_MSG_TYPE_SKIP &&
+ msg->stream.skip.len == 0) {
+ ret = 0;
+ goto cleanup;
+ }
+
if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
/* Nope, nope, I'm outta here */
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -823,11 +903,120 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t
nbytes)
}
+static int
+virFDStreamSkip(virStreamPtr st,
+ unsigned long long length)
+{
+ virFDStreamDataPtr fdst = st->privateData;
+ virFDStreamMsgPtr msg = NULL;
+ off_t off;
+ int ret = -1;
+
+ virObjectLock(fdst);
+ if (fdst->length) {
+ if (length > fdst->length - fdst->offset)
+ length = fdst->length - fdst->offset;
+ fdst->offset += length;
+ }
+
+ if (fdst->thread) {
+ /* Things are a bit complicated here. But bear with me. If FDStream is
+ * in a read mode, then if the message at the queue head is SKIP, just
+ * pop it. The thread has lseek()-ed anyway. If however, the FDStream
+ * is in write mode, then tell the thread to do the lseek() for us.
+ * Under no circumstances we can do the lseek() ourselves here. We
+ * might mess up file position for the thread. */
+ if (fdst->threadDoRead) {
+ msg = fdst->msg;
+ if (msg->type != VIR_FDSTREAM_MSG_TYPE_SKIP) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Invalid stream skip"));
+ goto cleanup;
+ }
+
+ virFDStreamMsgQueuePop(fdst);
+ } else {
+ if (VIR_ALLOC(msg) < 0)
+ goto cleanup;
+
+ msg->type = VIR_FDSTREAM_MSG_TYPE_SKIP;
+ msg->stream.skip.len = length;
+ virFDStreamMsgQueuePush(fdst, msg);
+ msg = NULL;
+ }
+ } else {
+ off = lseek(fdst->fd, length, SEEK_CUR);
+ if (off == (off_t) -1) {
+ virReportSystemError(errno, "%s",
+ _("unable to seek"));
+ goto cleanup;
+ }
+
+ if (ftruncate(fdst->fd, off) < 0) {
+ virReportSystemError(errno, "%s",
+ _("unable to truncate"));
+ goto cleanup;
+ }
+ }
+
+ ret = 0;
+ cleanup:
+ virObjectUnlock(fdst);
+ virFDStreamMsgFree(msg);
+ return ret;
+}
+
+
+static int
+virFDStreamInData(virStreamPtr st,
+ int *inData,
+ unsigned long long *length)
+{
+ virFDStreamDataPtr fdst = st->privateData;
+ int ret = -1;
+
+ virObjectLock(fdst);
+
+ if (fdst->thread) {
+ virFDStreamMsgPtr msg;
+
+ while (!(msg = fdst->msg)) {
+ if (fdst->threadQuit) {
+ *inData = *length = 0;
+ ret = 0;
+ goto cleanup;
+ } else {
+ virObjectUnlock(fdst);
+ virCondSignal(&fdst->threadCond);
+ virObjectLock(fdst);
+ }
+ }
+
+ if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) {
+ *inData = 1;
+ *length = msg->stream.data.len - msg->stream.data.offset;
+ } else {
+ *inData = 0;
+ *length = msg->stream.skip.len;
+ }
+ ret = 0;
+ } else {
+ ret = virFileInData(fdst->fd, inData, length);
+ }
+
+ cleanup:
+ virObjectUnlock(fdst);
+ return ret;
+}
+
+
static virStreamDriver virFDStreamDrv = {
.streamSend = virFDStreamWrite,
.streamRecv = virFDStreamRead,
.streamFinish = virFDStreamClose,
.streamAbort = virFDStreamAbort,
+ .streamSkip = virFDStreamSkip,
+ .streamInData = virFDStreamInData,
.streamEventAddCallback = virFDStreamAddCallback,
.streamEventUpdateCallback = virFDStreamUpdateCallback,
.streamEventRemoveCallback = virFDStreamRemoveCallback
@@ -969,7 +1158,8 @@ virFDStreamOpenFileInternal(virStreamPtr st,
unsigned long long length,
int oflags,
int mode,
- bool forceIOHelper)
+ bool forceIOHelper,
+ bool sparse)
{
int fd = -1;
struct stat sb;
@@ -1026,6 +1216,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
threadData->st = virObjectRef(st);
threadData->length = length;
+ threadData->sparse = sparse;
if ((oflags & O_ACCMODE) == O_RDONLY) {
threadData->fdin = fd;
@@ -1067,7 +1258,7 @@ int virFDStreamOpenFile(virStreamPtr st,
}
return virFDStreamOpenFileInternal(st, path,
offset, length,
- oflags, 0, false);
+ oflags, 0, false, false);
}
int virFDStreamCreateFile(virStreamPtr st,
@@ -1080,7 +1271,7 @@ int virFDStreamCreateFile(virStreamPtr st,
return virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, mode,
- false);
+ false, false);
}
#ifdef HAVE_CFMAKERAW
@@ -1096,7 +1287,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
if (virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, 0,
- false) < 0)
+ false, false) < 0)
return -1;
fdst = st->privateData;
@@ -1133,7 +1324,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
return virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, 0,
- false);
+ false, false);
}
#endif /* !HAVE_CFMAKERAW */
@@ -1141,11 +1332,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
+ bool sparse,
int oflags)
{
return virFDStreamOpenFileInternal(st, path,
offset, length,
- oflags, 0, true);
+ oflags, 0, true, sparse);
}
int virFDStreamSetInternalCloseCb(virStreamPtr st,
diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h
index 34c4c3f..887c991 100644
--- a/src/util/virfdstream.h
+++ b/src/util/virfdstream.h
@@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
const char *path,
unsigned long long offset,
unsigned long long length,
+ bool sparse,
int oflags);
int virFDStreamSetInternalCloseCb(virStreamPtr st,
--
2.10.2