On 05/16/2017 10:04 AM, Michal Privoznik wrote:
Basically, what is needed here is to introduce new message type
for the messages passed between the event loop callbacks and the
worker thread that does all the I/O. The idea is that instead of
a queue of read buffers we will have a queue where "hole of size
X" messages appear. That way the even loop callbacks can just
s/even/event/
check the head of the queue and see if the worker thread is in
data or a hole section and how long the section is.
Signed-off-by: Michal Privoznik <mprivozn(a)redhat.com>
---
src/storage/storage_util.c | 4 +-
src/util/virfdstream.c | 239 ++++++++++++++++++++++++++++++++++++++++-----
src/util/virfdstream.h | 1 +
3 files changed, 220 insertions(+), 24 deletions(-)
[...]
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 4b42939e7..ba209025a 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
[...]
static ssize_t
virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+ bool sparse,
const int fdin,
const int fdout,
const char *fdinname,
const char *fdoutname,
+ size_t *dataLen,
size_t buflen)
{
virFDStreamMsgPtr msg = NULL;
+ int inData = 0;
+ long long sectionLen = 0;
char *buf = NULL;
ssize_t got;
+ if (sparse && *dataLen == 0) {
+ if (virFileInData(fdin, &inData, &sectionLen) < 0)
+ goto error;
+
+ if (inData)
+ *dataLen = sectionLen;
+ }
+
if (VIR_ALLOC(msg) < 0)
goto error;
- if (VIR_ALLOC_N(buf, buflen) < 0)
- goto error;
-
- if ((got = saferead(fdin, buf, buflen)) < 0) {
- virReportSystemError(errno,
- _("Unable to read %s"),
- fdinname);
- goto error;
+ if (sparse && *dataLen == 0) {
+ msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
+ msg->stream.hole.len = sectionLen;
+ got = sectionLen;
+
+ /* HACK. The message queue is one directional. So caller
HACK or "By design"
+ * cannot make us skip the hole. Do that for them instead. */
+ if (sectionLen &&
+ lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
+ virReportSystemError(errno,
+ _("unable to seek in %s"),
+ fdinname);
+ goto error;
+ }
[...]
+static int
+virFDStreamSendHole(virStreamPtr st,
+ long long length,
+ unsigned int flags)
+{
+ virFDStreamDataPtr fdst = st->privateData;
+ virFDStreamMsgPtr msg = NULL;
+ off_t off;
+ int ret = -1;
+
+ virCheckFlags(0, -1);
+
+ virObjectLock(fdst);
+ if (fdst->length) {
+ if (length > fdst->length - fdst->offset)
+ length = fdst->length - fdst->offset;
+ fdst->offset += length;
+ }
+
+ if (fdst->thread) {
+ /* Things are a bit complicated here. But bear with me. If FDStream is
s/But bear with me.//
+ * in a read mode, then if the message at the queue head is HOLE, just
+ * pop it. The thread has lseek()-ed anyway. If however, the FDStream
However, if the FDStream
Reviewed-by: John Ferlan <jferlan(a)redhat.com>
John
+ * is in write mode, then tell the thread to do the lseek() for us.
+ * Under no circumstances we can do the lseek() ourselves here. We
+ * might mess up file position for the thread. */
+ if (fdst->threadDoRead) {
+ msg = fdst->msg;
+ if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Invalid stream hole"));
+ goto cleanup;
+ }
+
+ virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
+ } else {
+ if (VIR_ALLOC(msg) < 0)
+ goto cleanup;
+
+ msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
+ msg->stream.hole.len = length;
+ virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
+ msg = NULL;
+ }
[...]