The virDomainBlockPull* family of commands is implemented on top of the
following HMP/QMP commands: 'block_stream', 'block_job_cancel',
'info block-jobs' / 'query-block-jobs', and
'block_job_set_speed'.
* src/qemu/qemu_driver.c src/qemu/qemu_monitor_text.[ch]: implement disk
streaming by using the proper qemu monitor commands.
* src/qemu/qemu_monitor_json.[ch]: implement commands using the qmp monitor
Signed-off-by: Adam Litke <agl(a)us.ibm.com>
---
src/qemu/qemu_driver.c | 113 +++++++++++++++++++++++++++++
src/qemu/qemu_monitor.c | 18 +++++
src/qemu/qemu_monitor.h | 13 ++++
src/qemu/qemu_monitor_json.c | 147 ++++++++++++++++++++++++++++++++++++++
src/qemu/qemu_monitor_json.h | 5 ++
src/qemu/qemu_monitor_text.c | 162 ++++++++++++++++++++++++++++++++++++++++++
src/qemu/qemu_monitor_text.h | 6 ++
7 files changed, 464 insertions(+), 0 deletions(-)
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 8870e33..0f556a9 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -8493,6 +8493,115 @@ cleanup:
return ret;
}
+/**
+ * qemuDiskPathToAlias:
+ * @vm: domain whose disk definitions are searched
+ * @path: source path of the disk to look up
+ *
+ * Scans the block- and file-backed disks of @vm and, for the first one
+ * whose source equals @path, builds the string "drive-<alias>".
+ *
+ * Returns a newly allocated string which the caller must free, or NULL
+ * with an error reported if allocation fails or no disk matches @path.
+ */
+static char *
+qemuDiskPathToAlias(virDomainObjPtr vm, const char *path) {
+    int i;
+    char *ret = NULL;
+
+    for (i = 0 ; i < vm->def->ndisks ; i++) {
+        virDomainDiskDefPtr disk = vm->def->disks[i];
+
+        /* Only block- and file-backed disks are considered. */
+        if (disk->type != VIR_DOMAIN_DISK_TYPE_BLOCK &&
+            disk->type != VIR_DOMAIN_DISK_TYPE_FILE)
+            continue;
+
+        if (disk->src != NULL && STREQ(disk->src, path)) {
+            if (virAsprintf(&ret, "drive-%s", disk->info.alias) < 0) {
+                virReportOOMError();
+                return NULL;
+            }
+            break;
+        }
+    }
+
+    if (!ret) {
+        qemuReportError(VIR_ERR_INVALID_ARG,
+                        "%s", _("No device found for specified path"));
+    }
+    return ret;
+}
+
+/**
+ * qemuDomainBlockJobImpl:
+ * @dom: domain to operate on
+ * @path: source path of the disk the block job applies to
+ * @bandwidth: bandwidth value forwarded to the monitor
+ * @info: where job information is stored, or NULL
+ * @mode: one of the BLOCK_JOB_* values selecting the operation
+ *
+ * Common implementation behind the public block job entry points.
+ * Looks up the domain, translates @path into a drive alias, and runs
+ * the requested operation on the monitor inside a QEMU_JOB_MODIFY job.
+ *
+ * Returns the result of qemuMonitorBlockJob(), or -1 if the domain or
+ * disk cannot be found, the domain is not running, or the job cannot
+ * be started.
+ */
+static int
+qemuDomainBlockJobImpl(virDomainPtr dom, const char *path,
+                       unsigned long bandwidth, virDomainBlockJobInfoPtr info,
+                       int mode)
+{
+    struct qemud_driver *driver = dom->conn->privateData;
+    virDomainObjPtr vm = NULL;
+    qemuDomainObjPrivatePtr priv;
+    char uuidstr[VIR_UUID_STRING_BUFLEN];
+    const char *device = NULL;
+    int ret = -1;
+
+    qemuDriverLock(driver);
+    virUUIDFormat(dom->uuid, uuidstr);
+    vm = virDomainFindByUUID(&driver->domains, dom->uuid);
+    if (!vm) {
+        qemuReportError(VIR_ERR_NO_DOMAIN,
+                        _("no domain with matching uuid '%s'"), uuidstr);
+        goto cleanup;
+    }
+
+    if (!virDomainObjIsActive(vm)) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID,
+                        "%s", _("domain is not running"));
+        goto cleanup;
+    }
+
+    device = qemuDiskPathToAlias(vm, path);
+    if (!device) {
+        goto cleanup;
+    }
+
+    if (qemuDomainObjBeginJobWithDriver(driver, vm, QEMU_JOB_MODIFY) < 0)
+        goto cleanup;
+
+    /* NOTE(review): acquiring the job may wait with the domain lock
+     * released, so the guest can go away in the meantime; re-check
+     * liveness before touching priv->mon.  Confirm against the driver's
+     * threading rules. */
+    if (!virDomainObjIsActive(vm)) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID,
+                        "%s", _("domain is not running"));
+        goto endjob;
+    }
+
+    ignore_value(qemuDomainObjEnterMonitorWithDriver(driver, vm));
+    priv = vm->privateData;
+    ret = qemuMonitorBlockJob(priv->mon, device, bandwidth, info, mode);
+    qemuDomainObjExitMonitorWithDriver(driver, vm);
+
+endjob:
+    /* When EndJob returns 0 the domain object must not be unlocked
+     * below, so forget the pointer. */
+    if (qemuDomainObjEndJob(driver, vm) == 0)
+        vm = NULL;
+
+cleanup:
+    VIR_FREE(device);
+    if (vm)
+        virDomainObjUnlock(vm);
+    qemuDriverUnlock(driver);
+    return ret;
+}
+
+/* Driver entry point: cancel the block job running on the disk at @path.
+ * No flags are accepted; -1 is returned if any are set. */
+static int
+qemuDomainBlockJobAbort(virDomainPtr dom, const char *path, unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    /* Aborting needs neither a bandwidth nor an info buffer. */
+    rc = qemuDomainBlockJobImpl(dom, path, 0, NULL, BLOCK_JOB_ABORT);
+    return rc;
+}
+
+/* Driver entry point: query the block job on the disk at @path and store
+ * the result in @info.  No flags are accepted; -1 is returned if any
+ * are set. */
+static int
+qemuDomainGetBlockJobInfo(virDomainPtr dom, const char *path,
+                          virDomainBlockJobInfoPtr info, unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    /* A query carries no bandwidth, only the output buffer. */
+    rc = qemuDomainBlockJobImpl(dom, path, 0, info, BLOCK_JOB_INFO);
+    return rc;
+}
+
+/* Driver entry point: set the bandwidth limit of the block job on the
+ * disk at @path.  No flags are accepted; -1 is returned if any are set. */
+static int
+qemuDomainBlockJobSetSpeed(virDomainPtr dom, const char *path,
+                           unsigned long bandwidth, unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    rc = qemuDomainBlockJobImpl(dom, path, bandwidth, NULL, BLOCK_JOB_SPEED);
+    return rc;
+}
+
+/* Driver entry point: start a block pull (streaming) job on the disk at
+ * @path, forwarding @bandwidth.  No flags are accepted; -1 is returned
+ * if any are set. */
+static int
+qemuDomainBlockPull(virDomainPtr dom, const char *path, unsigned long bandwidth,
+                    unsigned int flags)
+{
+    int rc;
+
+    virCheckFlags(0, -1);
+
+    rc = qemuDomainBlockJobImpl(dom, path, bandwidth, NULL, BLOCK_JOB_PULL);
+    return rc;
+}
static virDriver qemuDriver = {
.no = VIR_DRV_QEMU,
@@ -8619,6 +8728,10 @@ static virDriver qemuDriver = {
.domainMigratePerform3 = qemuDomainMigratePerform3, /* 0.9.2 */
.domainMigrateFinish3 = qemuDomainMigrateFinish3, /* 0.9.2 */
.domainMigrateConfirm3 = qemuDomainMigrateConfirm3, /* 0.9.2 */
+ .domainBlockJobAbort = qemuDomainBlockJobAbort, /* 0.9.4 */
+ .domainGetBlockJobInfo = qemuDomainGetBlockJobInfo, /* 0.9.4 */
+ .domainBlockJobSetSpeed = qemuDomainBlockJobSetSpeed, /* 0.9.4 */
+ .domainBlockPull = qemuDomainBlockPull, /* 0.9.4 */
};
diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c
index 3a30a15..5c048eb 100644
--- a/src/qemu/qemu_monitor.c
+++ b/src/qemu/qemu_monitor.c
@@ -2427,3 +2427,21 @@ int qemuMonitorScreendump(qemuMonitorPtr mon,
ret = qemuMonitorTextScreendump(mon, file);
return ret;
}
+
+/**
+ * qemuMonitorBlockJob:
+ * @mon: monitor to send the command on
+ * @device: drive alias the block job applies to
+ * @bandwidth: bandwidth value forwarded to the implementation
+ * @info: where job information is stored, or NULL
+ * @mode: one of the BLOCK_JOB_* values selecting the operation
+ *
+ * Logs the request and dispatches it to the JSON or text monitor
+ * implementation depending on mon->json.
+ *
+ * Returns whatever the selected implementation returns.
+ */
+int qemuMonitorBlockJob(qemuMonitorPtr mon,
+                        const char *device,
+                        unsigned long bandwidth,
+                        virDomainBlockJobInfoPtr info,
+                        int mode)
+{
+    /* mode is a plain int holding a small enum value: print it in
+     * decimal rather than with the unsigned octal conversion. */
+    VIR_DEBUG("mon=%p, device=%p, bandwidth=%lu, info=%p, mode=%d",
+              mon, device, bandwidth, info, mode);
+
+    if (mon->json)
+        return qemuMonitorJSONBlockJob(mon, device, bandwidth, info, mode);
+
+    return qemuMonitorTextBlockJob(mon, device, bandwidth, info, mode);
+}
diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h
index f246d21..c5d27ef 100644
--- a/src/qemu/qemu_monitor.h
+++ b/src/qemu/qemu_monitor.h
@@ -447,6 +447,19 @@ int qemuMonitorInjectNMI(qemuMonitorPtr mon);
int qemuMonitorScreendump(qemuMonitorPtr mon,
const char *file);
+/* Operation selector passed as the 'mode' argument of
+ * qemuMonitorBlockJob().  Enumerators number upward from zero. */
+typedef enum {
+    BLOCK_JOB_ABORT,    /* 0: cancel the job on a device */
+    BLOCK_JOB_INFO,     /* 1: query active jobs */
+    BLOCK_JOB_SPEED,    /* 2: set the job's speed limit */
+    BLOCK_JOB_PULL,     /* 3: start streaming on a device */
+} BLOCK_JOB_CMD;
+
+/* Run the block job operation selected by mode (a BLOCK_JOB_CMD value)
+ * on device, using the JSON or text monitor as appropriate. */
+int qemuMonitorBlockJob(qemuMonitorPtr mon,
+ const char *device,
+ unsigned long bandwidth,
+ virDomainBlockJobInfoPtr info,
+ int mode);
+
/**
* When running two dd process and using <> redirection, we need a
* shell that will not truncate files. These two strings serve that
diff --git a/src/qemu/qemu_monitor_json.c b/src/qemu/qemu_monitor_json.c
index 4db2b78..e7163bb 100644
--- a/src/qemu/qemu_monitor_json.c
+++ b/src/qemu/qemu_monitor_json.c
@@ -2717,3 +2717,150 @@ int qemuMonitorJSONScreendump(qemuMonitorPtr mon,
virJSONValueFree(reply);
return ret;
}
+
+/**
+ * qemuMonitorJSONGetBlockJobInfoOne:
+ * @entry: one JSON object from the block job reply array
+ * @device: drive alias being looked for
+ * @info: filled in when @entry describes @device
+ *
+ * Reads the "device", "type", "speed", "offset" and "len" members of
+ * @entry.  The speed is divided by 1024 * 1024 before being stored as
+ * the bandwidth.
+ *
+ * Returns 0 if @entry is for @device and @info was filled in.  Returns
+ * -1 if @entry is for another device (no error is reported) or if a
+ * required member is missing (an error is reported).
+ */
+static int qemuMonitorJSONGetBlockJobInfoOne(virJSONValuePtr entry,
+                                             const char *device,
+                                             virDomainBlockJobInfoPtr info)
+{
+    const char *this_dev;
+    const char *type;
+    unsigned long long speed_bytes;
+
+    if ((this_dev = virJSONValueObjectGetString(entry, "device")) == NULL) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'device'"));
+        return -1;
+    }
+    /* Not the device we are after: let the caller try the next entry. */
+    if (!STREQ(this_dev, device))
+        return -1;
+
+    type = virJSONValueObjectGetString(entry, "type");
+    if (!type) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'type'"));
+        return -1;
+    }
+    if (STREQ(type, "stream"))
+        info->type = VIR_DOMAIN_BLOCK_JOB_TYPE_PULL;
+    else
+        info->type = VIR_DOMAIN_BLOCK_JOB_TYPE_UNKNOWN;
+
+    if (virJSONValueObjectGetNumberUlong(entry, "speed", &speed_bytes) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'speed'"));
+        return -1;
+    }
+    info->bandwidth = speed_bytes / 1024ULL / 1024ULL;
+
+    if (virJSONValueObjectGetNumberUlong(entry, "offset", &info->cur) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'offset'"));
+        return -1;
+    }
+
+    if (virJSONValueObjectGetNumberUlong(entry, "len", &info->end) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("entry was missing 'len'"));
+        return -1;
+    }
+    return 0;
+}
+
+/**
+ * qemuMonitorJSONGetBlockJobInfo:
+ * @reply: monitor reply to parse
+ * @device: drive alias whose job is wanted
+ * @info: filled in when a job for @device is found
+ *
+ * Parse Block Job information.
+ * The reply is a JSON array of objects, one per active job.
+ *
+ * Returns 1 if a job for @device was found and @info filled in, 0 if
+ * no entry matched, and -1 if @info is NULL or the reply is malformed.
+ */
+static int qemuMonitorJSONGetBlockJobInfo(virJSONValuePtr reply,
+                                          const char *device,
+                                          virDomainBlockJobInfoPtr info)
+{
+    virJSONValuePtr data;
+    int nr_results, i;
+
+    if (!info)
+        return -1;
+
+    if ((data = virJSONValueObjectGet(reply, "return")) == NULL) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("reply was missing return data"));
+        return -1;
+    }
+
+    if (data->type != VIR_JSON_TYPE_ARRAY) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("unrecognized format of block job information"));
+        return -1;
+    }
+
+    if ((nr_results = virJSONValueArraySize(data)) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("unable to determine array size"));
+        return -1;
+    }
+
+    /* Stop at the first entry that describes the requested device. */
+    for (i = 0; i < nr_results; i++) {
+        virJSONValuePtr entry = virJSONValueArrayGet(data, i);
+        if (qemuMonitorJSONGetBlockJobInfoOne(entry, device, info) == 0)
+            return 1;
+    }
+
+    return 0;
+}
+
+
+/**
+ * qemuMonitorJSONBlockJob:
+ * @mon: monitor to send the command on
+ * @device: drive alias the block job applies to
+ * @bandwidth: bandwidth; multiplied by 1024 * 1024 before being sent
+ * @info: where job information is stored (BLOCK_JOB_INFO only)
+ * @mode: one of the BLOCK_JOB_* values selecting the operation
+ *
+ * Builds and sends the QMP command matching @mode and maps known QMP
+ * error classes to libvirt errors.  For BLOCK_JOB_PULL with a non-zero
+ * @bandwidth, a BLOCK_JOB_SPEED request is issued after the stream has
+ * been started successfully.
+ *
+ * Returns -1 on failure.  On success returns 0, except for
+ * BLOCK_JOB_INFO where the result of qemuMonitorJSONGetBlockJobInfo()
+ * is returned.
+ */
+int qemuMonitorJSONBlockJob(qemuMonitorPtr mon,
+                            const char *device,
+                            unsigned long bandwidth,
+                            virDomainBlockJobInfoPtr info,
+                            int mode)
+{
+    int ret = -1;
+    virJSONValuePtr cmd = NULL;
+    virJSONValuePtr reply = NULL;
+
+    if (mode == BLOCK_JOB_ABORT)
+        cmd = qemuMonitorJSONMakeCommand("block_job_cancel",
+                                         "s:device", device, NULL);
+    else if (mode == BLOCK_JOB_INFO)
+        cmd = qemuMonitorJSONMakeCommand("query-block-jobs", NULL);
+    else if (mode == BLOCK_JOB_SPEED)
+        cmd = qemuMonitorJSONMakeCommand("block_job_set_speed",
+                                         "s:device", device,
+                                         "U:value", bandwidth * 1024ULL * 1024ULL,
+                                         NULL);
+    else if (mode == BLOCK_JOB_PULL)
+        cmd = qemuMonitorJSONMakeCommand("block_stream",
+                                         "s:device", device, NULL);
+
+    /* Unknown mode or failure to build the command. */
+    if (!cmd)
+        return -1;
+
+    ret = qemuMonitorJSONCommand(mon, cmd, &reply);
+
+    if (ret == 0 && virJSONValueObjectHasKey(reply, "error")) {
+        if (qemuMonitorJSONHasError(reply, "DeviceNotActive"))
+            qemuReportError(VIR_ERR_OPERATION_INVALID,
+                            _("No active operation on device: %s"), device);
+        else if (qemuMonitorJSONHasError(reply, "DeviceInUse"))
+            qemuReportError(VIR_ERR_OPERATION_FAILED,
+                            _("Device %s in use"), device);
+        else if (qemuMonitorJSONHasError(reply, "NotSupported"))
+            qemuReportError(VIR_ERR_OPERATION_INVALID,
+                            _("Operation is not supported for device: %s"), device);
+        else
+            qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                            _("Unexpected error"));
+        ret = -1;
+    }
+
+    if (ret == 0 && mode == BLOCK_JOB_INFO)
+        ret = qemuMonitorJSONGetBlockJobInfo(reply, device, info);
+
+    /* The stream command carries no speed; apply it as a second step. */
+    if (ret == 0 && mode == BLOCK_JOB_PULL && bandwidth != 0)
+        ret = qemuMonitorJSONBlockJob(mon, device, bandwidth, NULL,
+                                      BLOCK_JOB_SPEED);
+
+    virJSONValueFree(cmd);
+    virJSONValueFree(reply);
+    return ret;
+}
diff --git a/src/qemu/qemu_monitor_json.h b/src/qemu/qemu_monitor_json.h
index 380e26a..1804390 100644
--- a/src/qemu/qemu_monitor_json.h
+++ b/src/qemu/qemu_monitor_json.h
@@ -220,5 +220,10 @@ int qemuMonitorJSONInjectNMI(qemuMonitorPtr mon);
int qemuMonitorJSONScreendump(qemuMonitorPtr mon,
const char *file);
+/* JSON (QMP) implementation of the block job operations; mode is one of
+ * the BLOCK_JOB_* values. */
+int qemuMonitorJSONBlockJob(qemuMonitorPtr mon,
+ const char *device,
+ unsigned long bandwidth,
+ virDomainBlockJobInfoPtr info,
+ int mode);
#endif /* QEMU_MONITOR_JSON_H */
diff --git a/src/qemu/qemu_monitor_text.c b/src/qemu/qemu_monitor_text.c
index 0965a08..c7632e2 100644
--- a/src/qemu/qemu_monitor_text.c
+++ b/src/qemu/qemu_monitor_text.c
@@ -2785,3 +2785,165 @@ cleanup:
VIR_FREE(cmd);
return ret;
}
+
+/**
+ * qemuMonitorTextParseBlockJobOne:
+ * @text: start of one line of block job output
+ * @device: drive alias whose job is wanted
+ * @info: filled in when the line describes @device (may be NULL)
+ * @next: set to the start of the following line when the line is for
+ *        another device and more text follows; NULL otherwise
+ *
+ * Parses a single line of block job output.
+ *
+ * Returns 1 if the line describes @device, -EAGAIN if it is well formed
+ * but describes another device, -EINVAL if it cannot be parsed, and -1
+ * if @next is NULL.
+ */
+static int qemuMonitorTextParseBlockJobOne(const char *text,
+                                           const char *device,
+                                           virDomainBlockJobInfoPtr info,
+                                           const char **next)
+{
+    virDomainBlockJobInfo tmp;
+    char *p;
+    unsigned long long speed_bytes;
+    size_t device_len;
+    const char *eol;
+    int mismatch = 0;
+
+    if (next == NULL)
+        return -1;
+    *next = NULL;
+
+    /*
+     * Each active stream will appear on its own line in the following format:
+     * Streaming device <device>: Completed <cur> of <end> bytes, speed limit
+     * <speed> bytes/s
+     */
+    if ((text = STRSKIP(text, "Streaming device ")) == NULL)
+        return -EINVAL;
+
+    /* Require the whole name to match: a bare prefix comparison would
+     * make "drive-x1" match a line about "drive-x10". */
+    device_len = strlen(device);
+    if (!STREQLEN(text, device, device_len) || text[device_len] != ':')
+        mismatch = 1;
+
+    if ((text = strstr(text, ": Completed ")) == NULL)
+        return -EINVAL;
+    text += strlen(": Completed ");
+
+    if (virStrToLong_ull(text, &p, 10, &tmp.cur))
+        return -EINVAL;
+    text = p;
+
+    if ((text = STRSKIP(text, " of ")) == NULL)
+        return -EINVAL;
+
+    if (virStrToLong_ull(text, &p, 10, &tmp.end))
+        return -EINVAL;
+    text = p;
+
+    if ((text = STRSKIP(text, " bytes, speed limit ")) == NULL)
+        return -EINVAL;
+
+    if (virStrToLong_ull(text, &p, 10, &speed_bytes))
+        return -EINVAL;
+    text = p;
+
+    if ((text = STRSKIP(text, " bytes/s")) == NULL)
+        return -EINVAL;
+
+    if (mismatch) {
+        /* Hand back the start of the following line, if there is one,
+         * so the caller can keep searching.  A trailing newline with
+         * nothing after it counts as the end of the output. */
+        eol = strchr(text, '\n');
+        if (eol != NULL && eol[1] != '\0')
+            *next = eol + 1;
+        return -EAGAIN;
+    }
+
+    if (info) {
+        info->cur = tmp.cur;
+        info->end = tmp.end;
+        info->bandwidth = speed_bytes / 1024ULL / 1024ULL;
+        info->type = VIR_DOMAIN_BLOCK_JOB_TYPE_PULL;
+    }
+    return 1;
+}
+
+/**
+ * qemuMonitorTextParseBlockJob:
+ * @text: full reply of a block job monitor command
+ * @device: drive alias the command was issued for
+ * @info: filled in when a job for @device is found (may be NULL)
+ *
+ * Recognizes the known error messages, the "No active jobs" answer and
+ * the empty reply, and otherwise parses the reply line by line as block
+ * job information.
+ *
+ * Returns 1 if job information for @device was found, 0 on success
+ * without job information, and -1 with an error reported on failure.
+ */
+static int qemuMonitorTextParseBlockJob(const char *text,
+                                        const char *device,
+                                        virDomainBlockJobInfoPtr info)
+{
+    const char *next = NULL;
+    int ret = 0;
+
+    /* Check error: Device not found */
+    if (strstr(text, "Device '") && strstr(text, "' not found")) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID, "%s", _("Device not found"));
+        return -1;
+    }
+
+    /* Check error: Job already active on this device */
+    if (strstr(text, "Device '") && strstr(text, "' is in use")) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED, _("Device %s in use"),
+                        device);
+        return -1;
+    }
+
+    /* Check error: Stop non-existent job */
+    if (strstr(text, "has not been activated")) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID,
+                        _("No active operation on device: %s"), device);
+        return -1;
+    }
+
+    /* This is not an error condition, there are just no results to report. */
+    if (strstr(text, "No active jobs")) {
+        return 0;
+    }
+
+    /* Check for unsupported operation */
+    if (strstr(text, "Operation is not supported")) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID,
+                        _("Operation is not supported for device: %s"), device);
+        return -1;
+    }
+
+    /* No output indicates success for Pull, JobAbort, and JobSetSpeed */
+    if (STREQ(text, ""))
+        return 0;
+
+    /* Now try to parse BlockJobInfo */
+    do {
+        ret = qemuMonitorTextParseBlockJobOne(text, device, info, &next);
+        text = next;
+    } while (text && ret == -EAGAIN);
+
+    /* Every line parsed but none was about this device: like the
+     * "No active jobs" case, there is simply nothing to report. */
+    if (ret == -EAGAIN)
+        return 0;
+
+    if (ret < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("unable to parse block job information"));
+        return -1;
+    }
+    return ret;
+}
+
+/**
+ * qemuMonitorTextBlockJob:
+ * @mon: monitor to send the command on
+ * @device: drive alias the block job applies to
+ * @bandwidth: bandwidth; multiplied by 1024 * 1024 before being sent
+ * @info: where job information is stored, or NULL
+ * @mode: one of the BLOCK_JOB_* values selecting the operation
+ *
+ * Builds and sends the HMP command matching @mode and parses the reply.
+ * For BLOCK_JOB_PULL with a non-zero @bandwidth, a BLOCK_JOB_SPEED
+ * request is issued after the stream has been started successfully,
+ * matching what the JSON implementation does.
+ *
+ * Returns -1 on failure (including an unknown @mode), otherwise the
+ * result of qemuMonitorTextParseBlockJob().
+ */
+int qemuMonitorTextBlockJob(qemuMonitorPtr mon,
+                            const char *device,
+                            unsigned long bandwidth,
+                            virDomainBlockJobInfoPtr info,
+                            int mode)
+{
+    char *cmd = NULL;
+    char *reply = NULL;
+    int ret;
+
+    if (mode == BLOCK_JOB_ABORT)
+        ret = virAsprintf(&cmd, "block_job_cancel %s", device);
+    else if (mode == BLOCK_JOB_INFO)
+        ret = virAsprintf(&cmd, "info block-jobs");
+    else if (mode == BLOCK_JOB_SPEED)
+        ret = virAsprintf(&cmd, "block_job_set_speed %s %llu", device,
+                          bandwidth * 1024ULL * 1024ULL);
+    else if (mode == BLOCK_JOB_PULL)
+        ret = virAsprintf(&cmd, "block_stream %s", device);
+    else
+        return -1;
+
+    if (ret < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    if (qemuMonitorHMPCommand(mon, cmd, &reply) < 0) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR,
+                        "%s", _("cannot run monitor command"));
+        ret = -1;
+        goto cleanup;
+    }
+
+    ret = qemuMonitorTextParseBlockJob(reply, device, info);
+
+    /* The stream command carries no speed; apply it as a second step so
+     * a bandwidth passed with a pull request is not silently dropped. */
+    if (ret == 0 && mode == BLOCK_JOB_PULL && bandwidth != 0)
+        ret = qemuMonitorTextBlockJob(mon, device, bandwidth, NULL,
+                                      BLOCK_JOB_SPEED);
+
+cleanup:
+    VIR_FREE(cmd);
+    VIR_FREE(reply);
+    return ret;
+}
diff --git a/src/qemu/qemu_monitor_text.h b/src/qemu/qemu_monitor_text.h
index e53f693..9a1c7c0 100644
--- a/src/qemu/qemu_monitor_text.h
+++ b/src/qemu/qemu_monitor_text.h
@@ -213,4 +213,10 @@ int qemuMonitorTextInjectNMI(qemuMonitorPtr mon);
int qemuMonitorTextScreendump(qemuMonitorPtr mon, const char *file);
+/* Text (HMP) implementation of the block job operations; mode is one of
+ * the BLOCK_JOB_* values. */
+int qemuMonitorTextBlockJob(qemuMonitorPtr mon,
+ const char *device,
+ unsigned long bandwidth,
+ virDomainBlockJobInfoPtr info,
+ int mode);
+
#endif /* QEMU_MONITOR_TEXT_H */
--
1.7.3