The virDomainBlockPull* family of commands is enabled by the
'block_stream' and 'info block_stream' qemu monitor commands.
* src/qemu/qemu_driver.c src/qemu/qemu_monitor_text.[ch]: implement disk
streaming by using the block_stream and info block_stream text monitor commands
* src/qemu/qemu_monitor_json.[ch]: implement the same commands using the QMP
monitor (block_stream and query-block-stream)
Signed-off-by: Adam Litke <agl(a)us.ibm.com>
---
src/qemu/qemu_driver.c | 112 +++++++++++++++++++++++++++++++-
src/qemu/qemu_monitor.c | 16 +++++
src/qemu/qemu_monitor.h | 13 ++++
src/qemu/qemu_monitor_json.c | 118 ++++++++++++++++++++++++++++++++++
src/qemu/qemu_monitor_json.h | 5 ++
src/qemu/qemu_monitor_text.c | 145 ++++++++++++++++++++++++++++++++++++++++++
src/qemu/qemu_monitor_text.h | 5 ++
7 files changed, 410 insertions(+), 4 deletions(-)
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index cfbe199..481b168 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -7080,6 +7080,110 @@ cleanup:
return ret;
}
+/**
+ * qemuDiskPathToAlias:
+ * @vm: domain object whose disk definitions are searched
+ * @path: disk source path to look up
+ *
+ * Walks the disks of @vm and, for the first one whose source path is
+ * equal to @path, builds the string "drive-<alias>" from that disk's
+ * device alias.
+ *
+ * Returns a string allocated by virAsprintf() that the caller must
+ * release with VIR_FREE(), or NULL after an error has been reported
+ * (allocation failure, or no disk with a matching source path).
+ */
+static char *
+qemuDiskPathToAlias(virDomainObjPtr vm, const char *path)
+{
+    int i;
+    char *ret = NULL;
+
+    for (i = 0 ; i < vm->def->ndisks ; i++) {
+        virDomainDiskDefPtr disk = vm->def->disks[i];
+
+        /* Disks without a source path can never match. */
+        if (disk->src == NULL || !STREQ(disk->src, path))
+            continue;
+
+        if (virAsprintf(&ret, "drive-%s", disk->info.alias) < 0) {
+            virReportOOMError();
+            return NULL;
+        }
+        break;
+    }
+
+    if (!ret) {
+        qemuReportError(VIR_ERR_INVALID_ARG,
+                        "%s", _("No device found for specified path"));
+    }
+    return ret;
+}
+
+/**
+ * qemuDomainBlockPullImpl:
+ * @dom: domain the request is addressed to
+ * @path: source path of the disk to operate on
+ * @info: progress structure handed to the monitor layer (NULL is passed
+ *        by the callers that do not want progress information)
+ * @mode: one of the BLOCK_PULL_MODE values
+ *
+ * Shared implementation behind the four block pull driver entry points.
+ * It looks up the domain by UUID, requires it to be active, translates
+ * @path into a drive alias and issues qemuMonitorBlockPull() inside a
+ * job / monitor section.
+ *
+ * Returns the result of qemuMonitorBlockPull(), or -1 if an earlier step
+ * failed (an error has been reported in that case).
+ */
+static int
+qemuDomainBlockPullImpl(virDomainPtr dom, const char *path,
+                        virDomainBlockPullInfoPtr info,
+                        int mode)
+{
+    struct qemud_driver *driver = dom->conn->privateData;
+    virDomainObjPtr vm = NULL;
+    qemuDomainObjPrivatePtr priv;
+    char uuidstr[VIR_UUID_STRING_BUFLEN];
+    const char *device = NULL;
+    int ret = -1;
+
+    qemuDriverLock(driver);
+    virUUIDFormat(dom->uuid, uuidstr);
+    vm = virDomainFindByUUID(&driver->domains, dom->uuid);
+    if (!vm) {
+        qemuReportError(VIR_ERR_NO_DOMAIN,
+                        _("no domain with matching uuid '%s'"), uuidstr);
+        goto cleanup;
+    }
+
+    if (!virDomainObjIsActive(vm)) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID,
+                        "%s", _("domain is not running"));
+        goto cleanup;
+    }
+
+    /* The helper reports its own error when no disk matches. */
+    device = qemuDiskPathToAlias(vm, path);
+    if (!device)
+        goto cleanup;
+
+    if (qemuDomainObjBeginJobWithDriver(driver, vm) < 0)
+        goto cleanup;
+    qemuDomainObjEnterMonitorWithDriver(driver, vm);
+    priv = vm->privateData;
+    ret = qemuMonitorBlockPull(priv->mon, device, info, mode);
+    qemuDomainObjExitMonitorWithDriver(driver, vm);
+
+    /* A zero return from EndJob is treated as "vm must not be touched
+     * any more": clearing the pointer makes cleanup skip the unlock. */
+    if (qemuDomainObjEndJob(vm) == 0)
+        vm = NULL;
+
+cleanup:
+    /* device is an allocated string owned by this function. */
+    VIR_FREE(device);
+    if (vm)
+        virDomainObjUnlock(vm);
+    qemuDriverUnlock(driver);
+    return ret;
+}
+
+/*
+ * Driver entry point registered as domainBlockPull.  After running
+ * @flags through virCheckFlags(0, -1), it forwards the request to
+ * qemuDomainBlockPullImpl() with BLOCK_PULL_MODE_ONE and returns that
+ * function's result.
+ */
+static int
+qemuDomainBlockPull(virDomainPtr dom,
+                    const char *path,
+                    virDomainBlockPullInfoPtr info,
+                    unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    rc = qemuDomainBlockPullImpl(dom, path, info, BLOCK_PULL_MODE_ONE);
+    return rc;
+}
+
+/*
+ * Driver entry point registered as domainBlockPullAll.  After running
+ * @flags through virCheckFlags(0, -1), it forwards the request to
+ * qemuDomainBlockPullImpl() with BLOCK_PULL_MODE_ALL and no progress
+ * structure, and returns that function's result.
+ */
+static int
+qemuDomainBlockPullAll(virDomainPtr dom,
+                       const char *path,
+                       unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    rc = qemuDomainBlockPullImpl(dom, path, NULL, BLOCK_PULL_MODE_ALL);
+    return rc;
+}
+
+/*
+ * Driver entry point registered as domainBlockPullAbort.  After running
+ * @flags through virCheckFlags(0, -1), it forwards the request to
+ * qemuDomainBlockPullImpl() with BLOCK_PULL_MODE_ABORT and no progress
+ * structure, and returns that function's result.
+ */
+static int
+qemuDomainBlockPullAbort(virDomainPtr dom,
+                         const char *path,
+                         unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    rc = qemuDomainBlockPullImpl(dom, path, NULL, BLOCK_PULL_MODE_ABORT);
+    return rc;
+}
+
+/*
+ * Driver entry point registered as domainGetBlockPullInfo.  After
+ * running @flags through virCheckFlags(0, -1), it forwards the request
+ * to qemuDomainBlockPullImpl() with BLOCK_PULL_MODE_INFO so that @info
+ * is filled by the monitor layer, and returns that function's result.
+ */
+static int
+qemuDomainGetBlockPullInfo(virDomainPtr dom,
+                           const char *path,
+                           virDomainBlockPullInfoPtr info,
+                           unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    rc = qemuDomainBlockPullImpl(dom, path, info, BLOCK_PULL_MODE_INFO);
+    return rc;
+}
static virDriver qemuDriver = {
VIR_DRV_QEMU,
@@ -7192,10 +7296,10 @@ static virDriver qemuDriver = {
qemuDomainSnapshotDelete, /* domainSnapshotDelete */
qemuDomainMonitorCommand, /* qemuDomainMonitorCommand */
qemuDomainOpenConsole, /* domainOpenConsole */
- NULL, /* domainBlockPull */
- NULL, /* domainBlockPullAll */
- NULL, /* domainBlockPullAbort */
- NULL, /* domainGetBlockPullInfo */
+ qemuDomainBlockPull, /* domainBlockPull */
+ qemuDomainBlockPullAll, /* domainBlockPullAll */
+ qemuDomainBlockPullAbort, /* domainBlockPullAbort */
+ qemuDomainGetBlockPullInfo, /* domainGetBlockPullInfo */
};
diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c
index f89038e..60e4ee2 100644
--- a/src/qemu/qemu_monitor.c
+++ b/src/qemu/qemu_monitor.c
@@ -2228,3 +2228,19 @@ int qemuMonitorArbitraryCommand(qemuMonitorPtr mon,
ret = qemuMonitorTextArbitraryCommand(mon, cmd, reply);
return ret;
}
+
+/*
+ * Dispatches a block pull request to the JSON or the text monitor
+ * implementation, depending on mon->json, and returns whatever the
+ * selected implementation returns.  All arguments are passed through
+ * unchanged.
+ */
+int qemuMonitorBlockPull(qemuMonitorPtr mon,
+                         const char *path,
+                         virDomainBlockPullInfoPtr info,
+                         int mode)
+{
+    VIR_DEBUG("mon=%p, path=%p, info=%p, mode=%i", mon, path, info, mode);
+
+    if (mon->json)
+        return qemuMonitorJSONBlockPull(mon, path, info, mode);
+
+    return qemuMonitorTextBlockPull(mon, path, info, mode);
+}
diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h
index c90219b..62875a3 100644
--- a/src/qemu/qemu_monitor.h
+++ b/src/qemu/qemu_monitor.h
@@ -423,6 +423,19 @@ int qemuMonitorArbitraryCommand(qemuMonitorPtr mon,
char **reply,
bool hmp);
+/* Operation selector passed as the 'mode' argument of the block pull
+ * monitor functions. */
+typedef enum {
+    /* plain block_stream on one device; the reply is parsed for progress */
+    BLOCK_PULL_MODE_ONE   = 0,
+    /* block_stream with the "all" option (-a on the text monitor) */
+    BLOCK_PULL_MODE_ALL   = 1,
+    /* block_stream with the "stop" option (-s on the text monitor) */
+    BLOCK_PULL_MODE_ABORT = 2,
+    /* progress query: query-block-stream / "info block_stream" */
+    BLOCK_PULL_MODE_INFO  = 3,
+} BLOCK_PULL_MODE;
+
+
+/* Runs the block pull operation selected by @mode (a BLOCK_PULL_MODE
+ * value) through whichever monitor protocol @mon uses. */
+int qemuMonitorBlockPull(qemuMonitorPtr mon, const char *path,
+                         virDomainBlockPullInfoPtr info, int mode);
+
/**
* When running two dd process and using <> redirection, we need a
* shell that will not truncate files. These two strings serve that
diff --git a/src/qemu/qemu_monitor_json.c b/src/qemu/qemu_monitor_json.c
index 20a78e1..bc491e0 100644
--- a/src/qemu/qemu_monitor_json.c
+++ b/src/qemu/qemu_monitor_json.c
@@ -2513,3 +2513,121 @@ cleanup:
return ret;
}
+
+/*
+ * qemuMonitorJSONGetBlockPullInfoOne:
+ * @entry: JSON object describing one stream
+ * @device: device name the caller is interested in
+ * @info: filled from the "offset" (cur) and "len" (end) members
+ *
+ * Returns 0 when @entry describes @device and both numbers were read.
+ * Returns -1 otherwise: silently when @entry belongs to another device,
+ * and after reporting an error when a required member is missing.
+ */
+static int qemuMonitorJSONGetBlockPullInfoOne(virJSONValuePtr entry,
+                                              const char *device,
+                                              virDomainBlockPullInfoPtr info)
+{
+    const char *this_dev;
+
+    if ((this_dev = virJSONValueObjectGetString(entry, "device")) == NULL) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'device'"));
+        return -1;
+    }
+
+    /* Not the stream we are looking for; no error is raised here. */
+    if (!STREQ(this_dev, device))
+        return -1;
+
+    if (virJSONValueObjectGetNumberUlong(entry, "offset", &info->cur) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'offset'"));
+        return -1;
+    }
+
+    if (virJSONValueObjectGetNumberUlong(entry, "len", &info->end) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'len'"));
+        return -1;
+    }
+    return 0;
+}
+
+/** qemuMonitorJSONGetBlockPullInfo:
+ * @reply: monitor reply whose "return" member carries the progress data
+ * @device: device name to look for
+ * @info: receives the progress of @device; must not be NULL
+ *
+ * Parse Block Pull information.
+ * The "return" member can be a JSON array of objects or just an object;
+ * the first entry accepted by qemuMonitorJSONGetBlockPullInfoOne() wins.
+ *
+ * Returns 0 when @info was filled.  Returns -1 otherwise; an error is
+ * reported in every failure case except a NULL @info.
+ */
+static int qemuMonitorJSONGetBlockPullInfo(virJSONValuePtr reply,
+                                           const char *device,
+                                           virDomainBlockPullInfoPtr info)
+{
+    virJSONValuePtr data;
+    int nr_results, i;
+
+    if (!info)
+        return -1;
+
+    if ((data = virJSONValueObjectGet(reply, "return")) == NULL) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("reply was missing block_pull progress information"));
+        return -1;
+    }
+
+    /* Single-object form: it either is our device or nothing is. */
+    if (data->type == VIR_JSON_TYPE_OBJECT) {
+        if (qemuMonitorJSONGetBlockPullInfoOne(data, device, info) == 0)
+            return 0;
+        goto not_found;
+    }
+
+    if (data->type != VIR_JSON_TYPE_ARRAY) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("unrecognized format of block pull information"));
+        return -1;
+    }
+
+    if ((nr_results = virJSONValueArraySize(data)) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("unable to determine array size"));
+        return -1;
+    }
+
+    for (i = 0; i < nr_results; i++) {
+        virJSONValuePtr entry = virJSONValueArrayGet(data, i);
+        if (qemuMonitorJSONGetBlockPullInfoOne(entry, device, info) == 0)
+            return 0;
+    }
+
+not_found:
+    qemuReportError(VIR_ERR_OPERATION_FAILED, "%s",
+                    _("No associated information for the specified disk"));
+    return -1;
+}
+
+
+/*
+ * qemuMonitorJSONBlockPull:
+ * @mon: monitor to talk to
+ * @device: drive alias the command is addressed to
+ * @info: progress structure, used only by the modes that parse a reply
+ * @mode: one of the BLOCK_PULL_MODE values
+ *
+ * Builds the JSON command for @mode, sends it and checks the reply for
+ * an error.  For BLOCK_PULL_MODE_ONE and BLOCK_PULL_MODE_INFO the reply
+ * is additionally parsed into @info.
+ *
+ * Returns 0 on success and a negative value on failure.  An unknown
+ * @mode, or a command that could not be built, yields -1.
+ */
+int qemuMonitorJSONBlockPull(qemuMonitorPtr mon,
+                             const char *device,
+                             virDomainBlockPullInfoPtr info,
+                             int mode)
+{
+    int ret = -1;
+    virJSONValuePtr cmd = NULL;
+    virJSONValuePtr reply = NULL;
+    int parse_info = 0;
+
+    switch (mode) {
+    case BLOCK_PULL_MODE_ONE:
+        cmd = qemuMonitorJSONMakeCommand("block_stream",
+                                         "s:device", device,
+                                         NULL);
+        parse_info = 1;
+        break;
+    case BLOCK_PULL_MODE_ALL:
+        cmd = qemuMonitorJSONMakeCommand("block_stream",
+                                         "s:device", device,
+                                         "b:all", 1,
+                                         NULL);
+        break;
+    case BLOCK_PULL_MODE_ABORT:
+        cmd = qemuMonitorJSONMakeCommand("block_stream",
+                                         "s:device", device,
+                                         "b:stop", 1,
+                                         NULL);
+        break;
+    case BLOCK_PULL_MODE_INFO:
+        cmd = qemuMonitorJSONMakeCommand("query-block-stream", NULL);
+        parse_info = 1;
+        break;
+    default:
+        /* cmd stays NULL and is rejected below */
+        break;
+    }
+
+    if (!cmd)
+        return -1;
+
+    ret = qemuMonitorJSONCommand(mon, cmd, &reply);
+
+    if (ret == 0)
+        ret = qemuMonitorJSONCheckError(cmd, reply);
+
+    if (ret == 0 && parse_info)
+        ret = qemuMonitorJSONGetBlockPullInfo(reply, device, info);
+
+    virJSONValueFree(cmd);
+    virJSONValueFree(reply);
+    return ret;
+}
diff --git a/src/qemu/qemu_monitor_json.h b/src/qemu/qemu_monitor_json.h
index 086f0e1..129fbbe 100644
--- a/src/qemu/qemu_monitor_json.h
+++ b/src/qemu/qemu_monitor_json.h
@@ -204,4 +204,9 @@ int qemuMonitorJSONArbitraryCommand(qemuMonitorPtr mon,
char **reply_str,
bool hmp);
+/* JSON monitor implementation of the block pull operations; @mode is
+ * one of the BLOCK_PULL_MODE values. */
+int qemuMonitorJSONBlockPull(qemuMonitorPtr mon, const char *device,
+                             virDomainBlockPullInfoPtr info, int mode);
+
#endif /* QEMU_MONITOR_JSON_H */
diff --git a/src/qemu/qemu_monitor_text.c b/src/qemu/qemu_monitor_text.c
index 53781c8..c4c469e 100644
--- a/src/qemu/qemu_monitor_text.c
+++ b/src/qemu/qemu_monitor_text.c
@@ -2628,3 +2628,148 @@ int qemuMonitorTextArbitraryCommand(qemuMonitorPtr mon, const char *cmd,
return ret;
}
+
+/*
+ * qemuMonitorTextParseBlockPullOne:
+ * @text: start of one line of monitor output
+ * @device: device name to look for
+ * @info: if non-NULL, receives cur/end when the line is about @device
+ * @next: set to the start of the following line when this line is well
+ *        formed but describes another device, NULL in every other case
+ *        (including when there is no following line)
+ *
+ * Each active stream appears on its own line in the following format:
+ *   Streaming device <device>: Completed <cur> of <end> bytes
+ *
+ * Returns 0 when the line describes @device, -EAGAIN when it is well
+ * formed but describes another device, -EINVAL when it does not have
+ * the expected format (or @text/@device is NULL), and -1 when @next is
+ * NULL.
+ */
+static int qemuMonitorTextParseBlockPullOne(const char *text,
+                                            const char *device,
+                                            virDomainBlockPullInfoPtr info,
+                                            const char **next)
+{
+    static const char completed[] = ": Completed ";
+    static const char of[] = " of ";
+    static const char bytes[] = " bytes";
+    virDomainBlockPullInfo tmp;
+    const char *eol;
+    size_t devlen;
+    char *p;
+    int mismatch = 0;
+
+    if (next == NULL)
+        return -1;
+    *next = NULL;
+
+    /* A caller that keeps iterating after the last line hands in NULL. */
+    if (text == NULL || device == NULL)
+        return -EINVAL;
+
+    if ((text = STRSKIP(text, "Streaming device ")) == NULL)
+        return -EINVAL;
+
+    /* The name must be followed directly by ':'; a bare prefix compare
+     * would let "drive-x1" match a line about "drive-x10". */
+    devlen = strlen(device);
+    if (!STREQLEN(text, device, devlen) || text[devlen] != ':')
+        mismatch = 1;
+
+    if ((text = strstr(text, completed)) == NULL)
+        return -EINVAL;
+    text += strlen(completed);
+
+    if (virStrToLong_ull(text, &p, 10, &tmp.cur))
+        return -EINVAL;
+    text = p;
+
+    if (!STRPREFIX(text, of))
+        return -EINVAL;
+    text += strlen(of);
+
+    if (virStrToLong_ull(text, &p, 10, &tmp.end))
+        return -EINVAL;
+    text = p;
+
+    if (!STRPREFIX(text, bytes))
+        return -EINVAL;
+    text += strlen(bytes);
+
+    if (mismatch) {
+        /* Step over the rest of this line (this also skips a '\r' that
+         * may precede the newline).  *next stays NULL when nothing
+         * follows, so the caller can tell that the output is exhausted. */
+        eol = strchr(text, '\n');
+        if (eol != NULL && eol[1] != '\0')
+            *next = eol + 1;
+        return -EAGAIN;
+    }
+
+    if (info) {
+        info->cur = tmp.cur;
+        info->end = tmp.end;
+    }
+    return 0;
+}
+
+/*
+ * qemuMonitorTextParseBlockPull:
+ * @text: complete reply of a block_stream / info block_stream command
+ * @device: device name the caller is interested in
+ * @info: optional; receives the progress of @device
+ *
+ * Recognizes the known error replies first, accepts an empty reply as
+ * success, and otherwise walks the reply line by line until a line
+ * about @device is found.
+ *
+ * Returns 0 on success (including the empty reply, in which case @info
+ * is left untouched) and -1 after reporting an error.
+ */
+static int qemuMonitorTextParseBlockPull(const char *text,
+                                         const char *device,
+                                         virDomainBlockPullInfoPtr info)
+{
+    const char *next = NULL;
+    int ret = 0;
+
+    /* Check error: Device not found */
+    if (strstr(text, "Device '") && strstr(text, "' not found")) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                        _("Device not found"));
+        return -1;
+    }
+
+    /* Check if we have exceeded the number of simultaneous streams */
+    if (strstr(text, "Device '") && strstr(text, "' is in use")) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED, "%s",
+                        _("Another streaming operation is in progress"));
+        return -1;
+    }
+
+    /* Check error: Non-existent stream */
+    if (strstr(text, "No such process") ||
+        strstr(text, "No active stream")) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                        _("No active stream"));
+        return -1;
+    }
+
+    /* No output indicates success for BlockPullAll and BlockPullAbort */
+    if (STREQ(text, ""))
+        return 0;
+
+    /* Now try to parse lines of block_stream output.  The line parser
+     * leaves 'next' NULL once there is no further line, so stop there
+     * instead of handing it a NULL pointer. */
+    do {
+        ret = qemuMonitorTextParseBlockPullOne(text, device, info, &next);
+        text = next;
+    } while (ret == -EAGAIN && text != NULL);
+
+    if (ret != 0) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED, "%s",
+                        _("No associated information for the specified disk"));
+        ret = -1;
+    }
+    return ret;
+}
+
+/*
+ * qemuMonitorTextBlockPull:
+ * @mon: monitor to talk to
+ * @device: drive alias the command is addressed to
+ * @info: optional progress structure handed to the reply parser
+ * @mode: one of the BLOCK_PULL_MODE values
+ *
+ * Builds the text monitor command for @mode, runs it and hands the
+ * reply to qemuMonitorTextParseBlockPull().  The reply is inspected for
+ * every mode: the parser is what recognizes monitor error text, and it
+ * accepts the empty reply as success.
+ *
+ * Returns 0 on success, -1 on failure.  An error is reported for every
+ * failure except an unknown @mode.
+ */
+int qemuMonitorTextBlockPull(qemuMonitorPtr mon,
+                             const char *device,
+                             virDomainBlockPullInfoPtr info,
+                             int mode)
+{
+    char *cmd = NULL;
+    char *reply = NULL;
+    int ret = -1;
+    int rc;
+
+    switch (mode) {
+    case BLOCK_PULL_MODE_ONE:
+        rc = virAsprintf(&cmd, "block_stream %s", device);
+        break;
+    case BLOCK_PULL_MODE_ALL:
+        rc = virAsprintf(&cmd, "block_stream -a %s", device);
+        break;
+    case BLOCK_PULL_MODE_ABORT:
+        rc = virAsprintf(&cmd, "block_stream -s %s", device);
+        break;
+    case BLOCK_PULL_MODE_INFO:
+        rc = virAsprintf(&cmd, "info block_stream");
+        break;
+    default:
+        return -1;
+    }
+
+    if (rc < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    if (qemuMonitorHMPCommand(mon, cmd, &reply) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR,
+                        "%s", _("cannot run monitor command"));
+        goto cleanup;
+    }
+
+    /* Skipping the parser for ALL/ABORT would let replies such as
+     * "Device '...' not found" pass as success. */
+    if (qemuMonitorTextParseBlockPull(reply, device, info) != 0)
+        goto cleanup;
+
+    ret = 0;
+
+cleanup:
+    VIR_FREE(cmd);
+    VIR_FREE(reply);
+    return ret;
+}
diff --git a/src/qemu/qemu_monitor_text.h b/src/qemu/qemu_monitor_text.h
index 0838a2b..e72f654 100644
--- a/src/qemu/qemu_monitor_text.h
+++ b/src/qemu/qemu_monitor_text.h
@@ -198,4 +198,9 @@ int qemuMonitorTextDeleteSnapshot(qemuMonitorPtr mon, const char *name);
int qemuMonitorTextArbitraryCommand(qemuMonitorPtr mon, const char *cmd,
char **reply);
+/* Text monitor implementation of the block pull operations; @mode is
+ * one of the BLOCK_PULL_MODE values. */
+int qemuMonitorTextBlockPull(qemuMonitorPtr mon, const char *device,
+                             virDomainBlockPullInfoPtr info, int mode);
+
#endif /* QEMU_MONITOR_TEXT_H */
--
1.7.3