Allow interleaved parallel writes to a single file,
using a record size equal to the I/O buffer size (1 MiB).
Signed-off-by: Claudio Fontana <cfontana(a)suse.de>
---
src/util/iohelper.c | 3 +
src/util/virfile.c | 151 +++++++++++++++++++++++++++++---------------
src/util/virfile.h | 2 +
3 files changed, 106 insertions(+), 50 deletions(-)
diff --git a/src/util/iohelper.c b/src/util/iohelper.c
index 055540c8c4..dcbdda366f 100644
--- a/src/util/iohelper.c
+++ b/src/util/iohelper.c
@@ -85,6 +85,9 @@ main(int argc, char **argv)
if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0)
goto error;
+ if (VIR_CLOSE(fd) < 0)
+ goto error;
+
return 0;
error:
diff --git a/src/util/virfile.c b/src/util/virfile.c
index 201d7f4e64..f9ae7d94c4 100644
--- a/src/util/virfile.c
+++ b/src/util/virfile.c
@@ -4761,6 +4761,9 @@ struct runIOParams {
const char *fdinname;
int fdout;
const char *fdoutname;
+ int idx;
+ int nchannels;
+ off_t total;
};
/**
@@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p)
off_t total = 0;
size_t buflen = 1024*1024;
char *buf = virFileDirectBufferNew(&base, buflen);
+ int diskfd = p.isWrite ? p.fdout : p.fdin;
if (!buf) {
virReportSystemError(errno, _("Failed to allocate aligned memory in function
%s"), __FUNCTION__);
return -5;
}
-
+ if (p.idx >= 0) {
+ if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) {
+ virReportSystemError(errno, "%s", _("Failed to lseek to file
channel offset"));
+ return -6;
+ }
+ }
while (1) {
ssize_t got;
@@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p)
break;
total += got;
-
+ if (p.idx >= 0 && !p.isWrite && total > p.total) {
+ /* do not write more than p.total bytes to the socket for this channel, according to CLIA */
+ off_t difference = total - p.total;
+ got -= difference;
+ total -= difference;
+ }
/* handle last write size align in direct case */
if (got < buflen && p.isDirect && p.isWrite) {
ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got);
@@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p)
virReportSystemError(errno, _("Unable to write %s"),
p.fdoutname);
return -3;
}
- if (!p.isBlockDev) {
+ if (!p.isBlockDev && p.idx < 0) {
off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR);
if (off < 0) {
virReportSystemError(errno, "%s", _("Failed to lseek
to get current file offset"));
@@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p)
}
if (nwritten > got) {
off -= nwritten - got;
+ total -= nwritten - got;
}
if (ftruncate(p.fdout, off) < 0) {
virReportSystemError(errno, _("Unable to truncate %s"),
p.fdoutname);
@@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p)
virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
return -3;
}
+ if (p.idx >= 0) {
+ if (!p.isWrite && total >= p.total) {
+ /* done for this channel */
+ break;
+ }
+ /* move channel cursor to the next record */
+ if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) {
+ virReportSystemError(errno, "%s", _("Failed to lseek to
next channel record"));
+ return -7;
+ }
+ }
}
return total;
}
/**
- * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
- *
- * @disk_fd: the already open regular file or block device
- * @disk_path: the pathname corresponding to disk_fd (for error reporting)
- * @remote_fd: the pipe or socket
- * Use -1 to auto-choose between STDIN or STDOUT.
- * @remote_path: the pathname corresponding to remote_fd (for error reporting)
- *
- * Note that the direction of the transfer is detected based on the @disk_fd
- * file access mode (man 2 open). Therefore @disk_fd must be opened with
- * O_RDONLY or O_WRONLY. O_RDWR is not supported.
- *
- * virFileDiskCopy always closes the file descriptor disk_fd,
- * and any error during close(2) is reported and considered a failure.
- *
- * Returns: bytes transferred or < 0 on failure.
+ * virFileDiskCopyChannel: like virFileDiskCopy, but with channel-interleaved read/write
+ * ...
+ * @idx: channel index
+ * @nchannels: total number of channels
*/
off_t
-virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char
*remote_path)
+virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char
*remote_path,
+ int idx, int nchannels, off_t total)
{
- int ret = -1;
- off_t total = 0;
+ off_t new_total = -1;
struct stat sb;
struct runIOParams p;
int oflags = -1;
+ if ((nchannels == 0) ||
+ (nchannels > 0 && idx >= nchannels) ||
+ (nchannels > 0 && idx < 0) ||
+ (nchannels < 0 && idx >= 0)) {
+ virReportSystemError(EINVAL, "%s", _("Invalid channel
arguments"));
+ goto out;
+ }
+ p.idx = idx;
+ p.nchannels = nchannels;
+ p.total = total;
+
oflags = fcntl(disk_fd, F_GETFL);
if (oflags < 0) {
virReportSystemError(errno,
_("unable to determine access mode of %s"),
disk_path);
- goto cleanup;
+ goto out;
}
if (fstat(disk_fd, &sb) < 0) {
virReportSystemError(errno,
_("unable to stat file descriptor %d path %s"),
disk_fd, disk_path);
- goto cleanup;
+ goto out;
}
p.isBlockDev = S_ISBLK(sb.st_mode);
p.isDirect = O_DIRECT && (oflags & O_DIRECT);
@@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd,
const char *r
default:
virReportSystemError(EINVAL, _("Unable to process file with flags
%d"),
(oflags & O_ACCMODE));
- goto cleanup;
+ goto out;
}
if (!p.isBlockDev && p.isDirect) {
off_t off = lseek(disk_fd, 0, SEEK_CUR);
if (off < 0) {
virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable
file"));
- goto cleanup;
+ goto out;
}
if (virFileDirectAlign(off) != off) {
/* we could write some zeroes, but maybe it is safer to just fail */
virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with
unaligned file pointer"));
- goto cleanup;
+ goto out;
}
}
- total = runIOCopy(p);
- if (total < 0)
- goto cleanup;
-
- /* Ensure all data is written */
- if (virFileDataSync(p.fdout) < 0) {
- if (errno != EINVAL && errno != EROFS) {
- /* fdatasync() may fail on some special FDs, e.g. pipes */
- virReportSystemError(errno, _("unable to fsync %s"), p.fdoutname);
- goto cleanup;
+ new_total = runIOCopy(p);
+ if (new_total < 0)
+ goto out;
+
+ if (p.idx < 0 && p.isWrite) {
+ /* without channels we can run the fdatasync here */
+ if (virFileDataSync(disk_fd) < 0) {
+ if (errno != EINVAL && errno != EROFS) {
+ virReportSystemError(errno, _("unable to fsyncdata %s"),
p.fdoutname);
+ new_total = -1;
+ goto out;
+ }
}
}
- ret = 0;
-
- cleanup:
- if (VIR_CLOSE(disk_fd) < 0 && ret == 0) {
- virReportSystemError(errno, _("Unable to close %s"), disk_path);
- ret = -1;
- }
- return ret;
+ out:
+ return new_total;
}
#else /* WIN32 */
off_t
-virFileDiskCopy(int disk_fd G_GNUC_UNUSED,
- const char *disk_path G_GNUC_UNUSED,
- int remote_fd G_GNUC_UNUSED,
- const char *remote_path G_GNUC_UNUSED)
+virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED,
+ const char *disk_path G_GNUC_UNUSED,
+ int remote_fd G_GNUC_UNUSED,
+ const char *remote_path G_GNUC_UNUSED,
+ int idx G_GNUC_UNUSED,
+ int nchannels G_GNUC_UNUSED,
+ off_t total G_GNUC_UNUSED)
{
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("virFileDiskCopy unsupported on this platform"));
+ _("virFileDiskCopyChannel unsupported on this platform"));
return -1;
}
#endif /* WIN32 */
+
+/**
+ * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
+ *
+ * @disk_fd: the already open regular file or block device
+ * @disk_path: the pathname corresponding to disk_fd (for error reporting)
+ * @remote_fd: the pipe or socket
+ * Use -1 to auto-choose between STDIN or STDOUT.
+ * @remote_path: the pathname corresponding to remote_fd (for error reporting)
+ *
+ * Note that the direction of the transfer is detected based on the @disk_fd
+ * file access mode (man 2 open). Therefore @disk_fd must be opened with
+ * O_RDONLY or O_WRONLY. O_RDWR is not supported.
+ *
+ * virFileDiskCopy does not close the file descriptor disk_fd;
+ * the caller is responsible for closing it and handling any close(2) error.
+ *
+ * Returns: bytes transferred or < 0 on failure.
+ */
+
+off_t
+virFileDiskCopy(int disk_fd, const char *disk_path,
+ int remote_fd, const char *remote_path)
+{
+ return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path,
+ -1, -1, 0);
+}
diff --git a/src/util/virfile.h b/src/util/virfile.h
index 844261e0a4..4d75389c84 100644
--- a/src/util/virfile.h
+++ b/src/util/virfile.h
@@ -394,3 +394,5 @@ int virFileSetCOW(const char *path,
virTristateBool state);
off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char
*remote_path);
+off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const
char *remote_path,
+ int idx, int nchannels, off_t total);
--
2.26.2