[libvirt] [PATCH 0/4] Introduce jobs for storage driver

Disk operations can take ages to finish. Therefore users might want to abort such a job, e.g. because the host is going down for maintenance. This patch set tries to allow this kind of behaviour. The inspiration was taken from the qemu driver. How it works: An API that is known to run for a long time, e.g. volume allocation or wiping, has to find the pool which the volume belongs to. If the pool is found, it is locked. During this time the storage driver is locked. Now the API prepares for the operation. Ideally, it would be sufficient to have only the volume locked, but that requires far more changes to the code. Just before the API is about to take the long-term action, storageBeginJob[WithPool] is called. This waits on a condition specific to the volume. Similar to the QEMU driver, the wait is limited in time; the timeout is set to 30 seconds. Upon successful job acquisition, a counter in the pool is incremented so the pool is protected from being destroyed or undefined; then the pool is unlocked. After the API finishes its work, it calls storageEndJob[WithPool], which resets the job structure in the volume, re-locks the pool, decrements the counter and broadcasts on the condition. Meanwhile, if the long-term action is done internally, it should be done in a loop so we can check whether a job abort has been signalled. If it is done by an external program, we can virCommandAbort() it. Several improvements can be made, but I think this is good for a start. NB, I've implemented job aborting for both APIs using streams; however, I wasn't very successful with aborting a stream from the daemon side.
Michal Privoznik (4): storage: Introduce virStorageVolAbortJob virsh: Expose virStorageVolAbortJob storage: Implement jobs for storage driver storage: Implement virStorageVolAbortJob include/libvirt/libvirt.h.in | 3 + src/conf/storage_conf.c | 12 ++ src/conf/storage_conf.h | 29 +++- src/driver.h | 5 + src/libvirt.c | 49 +++++ src/libvirt_private.syms | 4 + src/libvirt_public.syms | 1 + src/remote/remote_driver.c | 1 + src/remote/remote_protocol.x | 8 +- src/remote_protocol-structs | 5 + src/storage/storage_backend.c | 53 +++--- src/storage/storage_backend_fs.c | 8 +- src/storage/storage_driver.c | 408 ++++++++++++++++++++++++++++++-------- src/storage/storage_driver.h | 7 + tools/virsh.c | 39 ++++ 15 files changed, 516 insertions(+), 116 deletions(-) -- 1.7.8.5

This API can be used to terminate long running jobs on a volume like its building, resizing, wiping. Moreover, like virDomainAbortJob() calling this API will block until job has either completed or aborted. --- include/libvirt/libvirt.h.in | 3 ++ src/driver.h | 5 ++++ src/libvirt.c | 49 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + src/remote/remote_driver.c | 1 + src/remote/remote_protocol.x | 8 ++++++- src/remote_protocol-structs | 5 ++++ 7 files changed, 71 insertions(+), 1 deletions(-) diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 7d41642..77ec3f0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -2513,6 +2513,9 @@ int virStorageVolResize (virStorageVolPtr vol, unsigned long long capacity, unsigned int flags); +int virStorageVolAbortJob (virStorageVolPtr vol, + unsigned int flags); + /** * virKeycodeSet: diff --git a/src/driver.h b/src/driver.h index 03d249b..7845b06 100644 --- a/src/driver.h +++ b/src/driver.h @@ -1314,6 +1314,10 @@ typedef int unsigned int flags); typedef int + (*virDrvStorageVolAbortJob) (virStorageVolPtr vol, + unsigned int flags); + +typedef int (*virDrvStoragePoolIsActive)(virStoragePoolPtr pool); typedef int (*virDrvStoragePoolIsPersistent)(virStoragePoolPtr pool); @@ -1377,6 +1381,7 @@ struct _virStorageDriver { virDrvStorageVolResize volResize; virDrvStoragePoolIsActive poolIsActive; virDrvStoragePoolIsPersistent poolIsPersistent; + virDrvStorageVolAbortJob volAbortJob; }; # ifdef WITH_LIBVIRTD diff --git a/src/libvirt.c b/src/libvirt.c index e916aa0..8ce3234 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -13343,6 +13343,55 @@ error: } /** + * virStorageVolAbortJob: + * @vol: pointer to storage volume + * @flags: extra flags; not used yet, so callers should always pass 0 + * + * Requests that the current background job be aborted at the soonest + * opportunity. This will block until the job has either completed, + * or aborted. 
+ * + * Returns: 0 in case of success + * -1 otherwise + */ +int +virStorageVolAbortJob(virStorageVolPtr vol, + unsigned int flags) +{ + virConnectPtr conn; + VIR_DEBUG("vol=%p flags=%x", vol, flags); + + virResetLastError(); + + if (!VIR_IS_STORAGE_VOL(vol)) { + virLibStorageVolError(VIR_ERR_INVALID_STORAGE_VOL, __FUNCTION__); + virDispatchError(NULL); + return -1; + } + + conn = vol->conn; + + if (conn->flags & VIR_CONNECT_RO) { + virLibConnError(VIR_ERR_OPERATION_DENIED, __FUNCTION__); + goto error; + } + + if (conn->storageDriver && conn->storageDriver->volAbortJob) { + int ret; + ret = conn->storageDriver->volAbortJob(vol, flags); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(VIR_ERR_NO_SUPPORT, __FUNCTION__); + +error: + virDispatchError(vol->conn); + return -1; +} + +/** * virNodeNumOfDevices: * @conn: pointer to the hypervisor connection * @cap: capability name diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index 46c13fb..cd3e2a6 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -532,6 +532,7 @@ LIBVIRT_0.9.10 { LIBVIRT_0.9.11 { global: virDomainPMWakeup; + virStorageVolAbortJob; } LIBVIRT_0.9.10; # .... define new API here using predicted next version number .... 
diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 031167a..3534ac0 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -5013,6 +5013,7 @@ static virStorageDriver storage_driver = { .volResize = remoteStorageVolResize, /* 0.9.10 */ .poolIsActive = remoteStoragePoolIsActive, /* 0.7.3 */ .poolIsPersistent = remoteStoragePoolIsPersistent, /* 0.7.3 */ + .volAbortJob = remoteStorageVolAbortJob, /* 0.9.11 */ }; static virSecretDriver secret_driver = { diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index 4d845e7..014eade 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -1754,6 +1754,11 @@ struct remote_storage_vol_resize_args { unsigned int flags; }; +struct remote_storage_vol_abort_job_args { + remote_nonnull_storage_vol vol; + unsigned int flags; +}; + /* Node driver calls: */ struct remote_node_num_of_devices_args { @@ -2765,7 +2770,8 @@ enum remote_procedure { REMOTE_PROC_DOMAIN_SET_METADATA = 264, /* autogen autogen */ REMOTE_PROC_DOMAIN_GET_METADATA = 265, /* autogen autogen */ REMOTE_PROC_DOMAIN_BLOCK_REBASE = 266, /* autogen autogen */ - REMOTE_PROC_DOMAIN_PM_WAKEUP = 267 /* autogen autogen */ + REMOTE_PROC_DOMAIN_PM_WAKEUP = 267, /* autogen autogen */ + REMOTE_PROC_STORAGE_VOL_ABORT_JOB = 268 /* autogen autogen */ /* * Notice how the entries are grouped in sets of 10 ? 
diff --git a/src/remote_protocol-structs b/src/remote_protocol-structs index 8f882b7..c80d5f0 100644 --- a/src/remote_protocol-structs +++ b/src/remote_protocol-structs @@ -1317,6 +1317,10 @@ struct remote_storage_vol_resize_args { uint64_t capacity; u_int flags; }; +struct remote_storage_vol_abort_job_args { + remote_nonnull_storage_vol vol; + u_int flags; +}; struct remote_node_num_of_devices_args { remote_string cap; u_int flags; @@ -2178,4 +2182,5 @@ enum remote_procedure { REMOTE_PROC_DOMAIN_GET_METADATA = 265, REMOTE_PROC_DOMAIN_BLOCK_REBASE = 266, REMOTE_PROC_DOMAIN_PM_WAKEUP = 267, + REMOTE_PROC_STORAGE_VOL_ABORT_JOB = 268, }; -- 1.7.8.5

On Tue, Mar 13, 2012 at 03:35:29PM +0100, Michal Privoznik wrote:
This API can be used to terminate long running jobs on a volume like its building, resizing, wiping. Moreover, like virDomainAbortJob() calling this API will block until job has either completed or aborted. --- include/libvirt/libvirt.h.in | 3 ++ src/driver.h | 5 ++++ src/libvirt.c | 49 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + src/remote/remote_driver.c | 1 + src/remote/remote_protocol.x | 8 ++++++- src/remote_protocol-structs | 5 ++++ 7 files changed, 71 insertions(+), 1 deletions(-)
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 7d41642..77ec3f0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -2513,6 +2513,9 @@ int virStorageVolResize (virStorageVolPtr vol, unsigned long long capacity, unsigned int flags);
+int virStorageVolAbortJob (virStorageVolPtr vol, + unsigned int flags); +
No virStorageVolGetJobInfo() API to go with it? IMHO we should have both, so we mirror the virDomain job API design. Regards, Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|

On 13.03.2012 15:48, Daniel P. Berrange wrote:
On Tue, Mar 13, 2012 at 03:35:29PM +0100, Michal Privoznik wrote:
This API can be used to terminate long running jobs on a volume like its building, resizing, wiping. Moreover, like virDomainAbortJob() calling this API will block until job has either completed or aborted. --- include/libvirt/libvirt.h.in | 3 ++ src/driver.h | 5 ++++ src/libvirt.c | 49 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + src/remote/remote_driver.c | 1 + src/remote/remote_protocol.x | 8 ++++++- src/remote_protocol-structs | 5 ++++ 7 files changed, 71 insertions(+), 1 deletions(-)
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index 7d41642..77ec3f0 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -2513,6 +2513,9 @@ int virStorageVolResize (virStorageVolPtr vol, unsigned long long capacity, unsigned int flags);
+int virStorageVolAbortJob (virStorageVolPtr vol, + unsigned int flags); +
No virStorageVolGetJobInfo() API to go with it? IMHO we should have both, so we mirror the virDomain job API design.
Regards, Daniel
Yeah, virStorageVolGetJobInfo() is one of the improvements I mention in the cover letter. But I've decided not to implement it for now, as another huge bunch of code would have to be rewritten, making this patch set unbearably big. But if it is a show stopper, I can rewrite it and post a v2.

via new virsh command 'vol-jobabort'. Currently, it accepts only volume specification as argument. --- tools/virsh.c | 39 +++++++++++++++++++++++++++++++++++++++ 1 files changed, 39 insertions(+), 0 deletions(-) diff --git a/tools/virsh.c b/tools/virsh.c index 630b77f..00668ff 100644 --- a/tools/virsh.c +++ b/tools/virsh.c @@ -12497,6 +12497,44 @@ cmdVolPath(vshControl *ctl, const vshCmd *cmd) return true; } +/* + * "vol-jobabort" command + */ +static const vshCmdInfo info_vol_jobabort[] = { + {"help", N_("abort active volume job")}, + {"desc", N_("Aborts the currently running volume job")}, + {NULL, NULL} +}; + +static const vshCmdOptDef opts_vol_jobabort[] = { + {"vol", VSH_OT_DATA, VSH_OFLAG_REQ, N_("volume name or key")}, + {"pool", VSH_OT_STRING, 0, N_("pool name or uuid")}, + {NULL, 0, 0, NULL} +}; + +static bool +cmdVolJobAbort(vshControl *ctl, const vshCmd *cmd) +{ + virStorageVolPtr vol; + bool ret = false; + + if (!vshConnectionUsability(ctl, ctl->conn)) + return false; + + if (!(vol = vshCommandOptVol(ctl, cmd, "vol", "pool", NULL))) { + return false; + } + + if (virStorageVolAbortJob(vol, 0) < 0) + goto cleanup; + + vshPrint(ctl, _("Job successfully aborted")); + ret = true; + +cleanup: + virStorageVolFree(vol); + return ret; +} /* * "secret-define" command @@ -17239,6 +17277,7 @@ static const vshCmdDef storageVolCmds[] = { {"vol-download", cmdVolDownload, opts_vol_download, info_vol_download, 0}, {"vol-dumpxml", cmdVolDumpXML, opts_vol_dumpxml, info_vol_dumpxml, 0}, {"vol-info", cmdVolInfo, opts_vol_info, info_vol_info, 0}, + {"vol-jobabort", cmdVolJobAbort, opts_vol_jobabort, info_vol_jobabort, 0}, {"vol-key", cmdVolKey, opts_vol_key, info_vol_key, 0}, {"vol-list", cmdVolList, opts_vol_list, info_vol_list, 0}, {"vol-name", cmdVolName, opts_vol_name, info_vol_name, 0}, -- 1.7.8.5

Simply, when we are about to take an action which might take ages, like allocating new volumes, wiping, etc. increment a counter of jobs in pool object and unlock it. We don't want to hold the pool locked during long term actions. --- src/conf/storage_conf.c | 12 ++ src/conf/storage_conf.h | 22 +++- src/libvirt_private.syms | 1 + src/storage/storage_driver.c | 284 +++++++++++++++++++++++++++++++----------- 4 files changed, 246 insertions(+), 73 deletions(-) diff --git a/src/conf/storage_conf.c b/src/conf/storage_conf.c index bdf6218..e378ceb 100644 --- a/src/conf/storage_conf.c +++ b/src/conf/storage_conf.c @@ -251,6 +251,8 @@ virStorageVolDefFree(virStorageVolDefPtr def) { if (!def) return; + virMutexDestroy(&def->lock); + ignore_value(virCondDestroy(&def->job.cond)); VIR_FREE(def->name); VIR_FREE(def->key); @@ -978,6 +980,16 @@ virStorageVolDefParseXML(virStoragePoolDefPtr pool, return NULL; } + if (virMutexInit(&ret->lock) < 0) { + virReportSystemError(errno, "%s", _("Cannot init mutex")); + goto cleanup; + } + + if (virCondInit(&ret->job.cond) < 0) { + virReportSystemError(errno, "%s", _("Cannot init cond")); + goto cleanup; + } + ret->name = virXPathString("string(./name)", ctxt); if (ret->name == NULL) { virStorageReportError(VIR_ERR_XML_ERROR, diff --git a/src/conf/storage_conf.h b/src/conf/storage_conf.h index 1ef9295..481c806 100644 --- a/src/conf/storage_conf.h +++ b/src/conf/storage_conf.h @@ -83,6 +83,25 @@ struct _virStorageVolTarget { }; +enum virStorageVolJob { + VIR_STORAGE_VOL_JOB_NONE = 0, /* no job */ + VIR_STORAGE_VOL_JOB_BUILD, + VIR_STORAGE_VOL_JOB_DELETE, + VIR_STORAGE_VOL_JOB_REFRESH, + VIR_STORAGE_VOL_JOB_WIPE, + VIR_STORAGE_VOL_JOB_RESIZE, + VIR_STORAGE_VOL_JOB_DOWNLOAD, + VIR_STORAGE_VOL_JOB_UPLOAD, + + VIR_STORAGE_VOL_JOB_LAST +}; + +struct virStorageVolJobObj { + virCond cond; + enum virStorageVolJob active; + unsigned long long start; +}; + typedef struct _virStorageVolDef virStorageVolDef; typedef virStorageVolDef 
*virStorageVolDefPtr; struct _virStorageVolDef { @@ -90,7 +109,8 @@ struct _virStorageVolDef { char *key; int type; /* virStorageVolType enum */ - unsigned int building; + virMutex lock; + struct virStorageVolJobObj job; unsigned long long allocation; /* bytes */ unsigned long long capacity; /* bytes */ diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 1f55f5d..fef9d5a 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -565,6 +565,7 @@ virFDStreamOpen; virFDStreamConnectUNIX; virFDStreamOpenFile; virFDStreamCreateFile; +virFDStreamSetInternalCloseCb; # hash.h diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c index 66811ce..7f3dfcd 100644 --- a/src/storage/storage_driver.c +++ b/src/storage/storage_driver.c @@ -48,6 +48,7 @@ #include "virfile.h" #include "fdstream.h" #include "configmake.h" +#include "virtime.h" #define VIR_FROM_THIS VIR_FROM_STORAGE @@ -65,6 +66,111 @@ static void storageDriverUnlock(virStorageDriverStatePtr driver) } static void +storageResetJob(virStorageVolDefPtr vol) +{ + struct virStorageVolJobObj *job = &vol->job; + + job->active = VIR_STORAGE_VOL_JOB_NONE; + job->start = 0; +} + +/* wait max. 
30 seconds for job ackquire */ +#define STORAGE_JOB_WAIT_TIME (1000ull * 30) + +static int +storageBeginJobInternal(virStorageDriverStatePtr driver ATTRIBUTE_UNUSED, + virStoragePoolObjPtr pool, + bool pool_locked, + virStorageVolDefPtr vol, + enum virStorageVolJob job) +{ + unsigned long long now; + unsigned long long then; + + if (virTimeMillisNow(&now) < 0) + return -1; + + then = now + STORAGE_JOB_WAIT_TIME; + + while (vol->job.active != VIR_STORAGE_VOL_JOB_NONE) { + if (virCondWaitUntil(&vol->job.cond, &vol->lock, then) < 0) + goto error; + } + + VIR_DEBUG("Starting job %d", job); + storageResetJob(vol); + vol->job.active = job; + vol->job.start = now; + + if (pool_locked) { + pool->asyncjobs++; + virStoragePoolObjUnlock(pool); + } + + return 0; + +error: + if (errno == ETIMEDOUT) + virStorageReportError(VIR_ERR_OPERATION_TIMEOUT, "%s", + _("cannot acquire state change lock")); + else + virReportSystemError(errno, "%s", + _("cannot acquire job mutex")); + if (pool_locked) + virStoragePoolObjLock(pool); + return -1; +} + +static int +storageBeginJob(virStorageDriverStatePtr driver, + virStorageVolDefPtr vol, + enum virStorageVolJob job) +{ + return storageBeginJobInternal(driver, NULL, false, vol, job); +} + +static int +storageBeginJobWithPool(virStorageDriverStatePtr driver, + virStoragePoolObjPtr pool, + virStorageVolDefPtr vol, + enum virStorageVolJob job) +{ + return storageBeginJobInternal(driver, pool, true, vol, job); +} + +static void +storageEndJobInternal(virStorageDriverStatePtr driver, + virStoragePoolObjPtr pool, + bool pool_locked, + virStorageVolDefPtr vol) +{ + VIR_DEBUG("Stopping job %d", vol->job.active); + storageResetJob(vol); + if (pool_locked) { + storageDriverLock(driver); + virStoragePoolObjLock(pool); + storageDriverUnlock(driver); + pool->asyncjobs--; + } + virCondBroadcast(&vol->job.cond); +} + +static void +storageEndJob(virStorageDriverStatePtr driver, + virStorageVolDefPtr vol) +{ + return storageEndJobInternal(driver, NULL, 
false, vol); +} + +static void +storageEndJobWithPool(virStorageDriverStatePtr driver, + virStoragePoolObjPtr pool, + virStorageVolDefPtr vol) +{ + return storageEndJobInternal(driver, pool, true, vol); +} + +static void storageDriverAutostart(virStorageDriverStatePtr driver) { unsigned int i; @@ -1365,18 +1471,16 @@ storageVolumeCreateXML(virStoragePoolPtr obj, memcpy(buildvoldef, voldef, sizeof(*voldef)); /* Drop the pool lock during volume allocation */ - pool->asyncjobs++; - voldef->building = 1; - virStoragePoolObjUnlock(pool); + if (storageBeginJobWithPool(driver, pool, voldef, + VIR_STORAGE_VOL_JOB_BUILD) < 0) { + VIR_FREE(buildvoldef); + goto cleanup; + } buildret = backend->buildVol(obj->conn, pool, buildvoldef); - storageDriverLock(driver); - virStoragePoolObjLock(pool); - storageDriverUnlock(driver); - voldef->building = 0; - pool->asyncjobs--; + storageEndJobWithPool(driver, pool, voldef); voldef = NULL; VIR_FREE(buildvoldef); @@ -1416,7 +1520,7 @@ storageVolumeCreateXMLFrom(virStoragePoolPtr obj, virStorageBackendPtr backend; virStorageVolDefPtr origvol = NULL, newvol = NULL; virStorageVolPtr ret = NULL, volobj = NULL; - int buildret; + int buildret = -1; virCheckFlags(0, NULL); @@ -1490,17 +1594,21 @@ storageVolumeCreateXMLFrom(virStoragePoolPtr obj, goto cleanup; } - if (origvol->building) { - virStorageReportError(VIR_ERR_OPERATION_INVALID, - _("volume '%s' is still being allocated."), - origvol->name); - goto cleanup; + if (backend->refreshVol) { + int refreshVolRet; + + if (storageBeginJobWithPool(driver, pool, origvol, + VIR_STORAGE_VOL_JOB_REFRESH) < 0) + goto cleanup; + + refreshVolRet = backend->refreshVol(obj->conn, pool, origvol); + + storageEndJobWithPool(driver, pool, origvol); + + if (refreshVolRet < 0) + goto cleanup; } - if (backend->refreshVol && - backend->refreshVol(obj->conn, pool, origvol) < 0) - goto cleanup; - if (VIR_REALLOC_N(pool->volumes.objs, pool->volumes.count+1) < 0) { virReportOOMError(); @@ -1517,34 +1625,23 @@ 
storageVolumeCreateXMLFrom(virStoragePoolPtr obj, newvol->key); /* Drop the pool lock during volume allocation */ - pool->asyncjobs++; - origvol->building = 1; - newvol->building = 1; - virStoragePoolObjUnlock(pool); + if (storageBeginJobWithPool(driver, pool, newvol, VIR_STORAGE_VOL_JOB_BUILD) < 0) + goto cleanup; - if (origpool) { - origpool->asyncjobs++; - virStoragePoolObjUnlock(origpool); - } + if ((origpool && storageBeginJobWithPool(driver, origpool, origvol, VIR_STORAGE_VOL_JOB_BUILD) < 0) || + (!origpool && storageBeginJob(driver, origvol, VIR_STORAGE_VOL_JOB_BUILD) < 0)) + goto endjob; buildret = backend->buildVolFrom(obj->conn, pool, newvol, origvol, flags); - storageDriverLock(driver); - virStoragePoolObjLock(pool); - if (origpool) - virStoragePoolObjLock(origpool); - storageDriverUnlock(driver); - - origvol->building = 0; - newvol->building = 0; newvol = NULL; - pool->asyncjobs--; - if (origpool) { - origpool->asyncjobs--; - virStoragePoolObjUnlock(origpool); - origpool = NULL; - } + if (origpool) + storageEndJobWithPool(driver, origpool, origvol); + else + storageEndJob(driver, origvol); +endjob: + storageEndJobWithPool(driver, pool, newvol); if (buildret < 0) { virStoragePoolObjUnlock(pool); @@ -1569,6 +1666,23 @@ cleanup: return ret; } +struct storageVolumeStreamData { + virStorageDriverStatePtr driver; + virStorageVolDefPtr vol; +}; + +static void +storageVolumeStreamEndJob(virStreamPtr stream ATTRIBUTE_UNUSED, + void *opaque) +{ + struct storageVolumeStreamData *data = opaque; + virStorageDriverStatePtr driver = data->driver; + virStorageVolDefPtr vol = data->vol; + + VIR_FREE(data); + + storageEndJob(driver, vol); +} static int storageVolumeDownload(virStorageVolPtr obj, @@ -1580,6 +1694,7 @@ storageVolumeDownload(virStorageVolPtr obj, virStorageDriverStatePtr driver = obj->conn->storagePrivateData; virStoragePoolObjPtr pool = NULL; virStorageVolDefPtr vol = NULL; + struct storageVolumeStreamData *stream_data = NULL; int ret = -1; 
virCheckFlags(0, -1); @@ -1609,21 +1724,34 @@ storageVolumeDownload(virStorageVolPtr obj, goto out; } - if (vol->building) { - virStorageReportError(VIR_ERR_OPERATION_INVALID, - _("volume '%s' is still being allocated."), - vol->name); + if (storageBeginJob(driver, vol, VIR_STORAGE_VOL_JOB_DOWNLOAD) < 0) goto out; - } if (virFDStreamOpenFile(stream, vol->target.path, offset, length, O_RDONLY) < 0) - goto out; + goto endjob; + + if (VIR_ALLOC(stream_data) < 0) { + virReportOOMError(); + goto endjob; + } + + stream_data->driver = driver; + stream_data->vol = vol; + + if (virFDStreamSetInternalCloseCb(stream, + storageVolumeStreamEndJob, + stream_data, NULL) < 0) + goto endjob; ret = 0; + goto out; +endjob: + storageEndJob(driver, vol); + VIR_FREE(stream_data); out: if (pool) virStoragePoolObjUnlock(pool); @@ -1642,6 +1770,7 @@ storageVolumeUpload(virStorageVolPtr obj, virStorageDriverStatePtr driver = obj->conn->storagePrivateData; virStoragePoolObjPtr pool = NULL; virStorageVolDefPtr vol = NULL; + struct storageVolumeStreamData *stream_data = NULL; int ret = -1; virCheckFlags(0, -1); @@ -1671,12 +1800,8 @@ storageVolumeUpload(virStorageVolPtr obj, goto out; } - if (vol->building) { - virStorageReportError(VIR_ERR_OPERATION_INVALID, - _("volume '%s' is still being allocated."), - vol->name); + if (storageBeginJob(driver, vol, VIR_STORAGE_VOL_JOB_DOWNLOAD) < 0) goto out; - } /* Not using O_CREAT because the file is required to * already exist at this point */ @@ -1684,10 +1809,27 @@ storageVolumeUpload(virStorageVolPtr obj, vol->target.path, offset, length, O_WRONLY) < 0) - goto out; + goto endjob; + + if (VIR_ALLOC(stream_data) < 0) { + virReportOOMError(); + goto endjob; + } + + stream_data->driver = driver; + stream_data->vol = vol; + + if (virFDStreamSetInternalCloseCb(stream, + storageVolumeStreamEndJob, + stream_data, NULL) < 0) + goto endjob; ret = 0; + goto out; +endjob: + storageEndJob(driver, vol); + VIR_FREE(stream_data); out: if (pool) 
virStoragePoolObjUnlock(pool); @@ -1737,13 +1879,6 @@ storageVolumeResize(virStorageVolPtr obj, goto out; } - if (vol->building) { - virStorageReportError(VIR_ERR_OPERATION_INVALID, - _("volume '%s' is still being allocated."), - vol->name); - goto out; - } - if (flags & VIR_STORAGE_VOL_RESIZE_DELTA) { abs_capacity = vol->capacity + capacity; flags &= ~VIR_STORAGE_VOL_RESIZE_DELTA; @@ -1771,9 +1906,14 @@ storageVolumeResize(virStorageVolPtr obj, goto out; } + if (storageBeginJobWithPool(driver, pool, vol, VIR_STORAGE_VOL_JOB_RESIZE) < 0) + goto out; + if (backend->resizeVol(obj->conn, pool, vol, abs_capacity, flags) < 0) goto out; + storageEndJobWithPool(driver, pool, vol); + vol->capacity = abs_capacity; ret = 0; @@ -2028,17 +2168,16 @@ storageVolumeWipePattern(virStorageVolPtr obj, goto out; } - if (vol->building) { - virStorageReportError(VIR_ERR_OPERATION_INVALID, - _("volume '%s' is still being allocated."), - vol->name); + if (storageBeginJobWithPool(driver, pool, vol, VIR_STORAGE_VOL_JOB_WIPE) < 0) goto out; - } if (storageVolumeWipeInternal(vol, algorithm) == -1) { + storageEndJobWithPool(driver, pool, vol); goto out; } + storageEndJobWithPool(driver, pool, vol); + ret = 0; out: @@ -2066,6 +2205,7 @@ storageVolumeDelete(virStorageVolPtr obj, virStorageVolDefPtr vol = NULL; unsigned int i; int ret = -1; + int deleteVolRet; storageDriverLock(driver); pool = virStoragePoolObjFindByName(&driver->pools, obj->pool); @@ -2095,13 +2235,6 @@ storageVolumeDelete(virStorageVolPtr obj, goto cleanup; } - if (vol->building) { - virStorageReportError(VIR_ERR_OPERATION_INVALID, - _("volume '%s' is still being allocated."), - vol->name); - goto cleanup; - } - if (!backend->deleteVol) { virStorageReportError(VIR_ERR_NO_SUPPORT, "%s", _("storage pool does not support vol deletion")); @@ -2109,7 +2242,14 @@ storageVolumeDelete(virStorageVolPtr obj, goto cleanup; } - if (backend->deleteVol(obj->conn, pool, vol, flags) < 0) + if (storageBeginJobWithPool(driver, pool, vol, + 
VIR_STORAGE_VOL_JOB_DELETE) < 0) + goto cleanup; + + deleteVolRet = backend->deleteVol(obj->conn, pool, vol, flags); + + storageEndJobWithPool(driver, pool, vol); + if (deleteVolRet < 0) goto cleanup; for (i = 0 ; i < pool->volumes.count ; i++) { -- 1.7.8.5

This implies breaking up some jobs into cycles during which we check for job abortion request. The virStorageVolAbortJob API will then just set request and wait until job is released. If a job was, however, interrupted it should fail with VIR_ERR_OPERATION_ABORTED error. --- src/conf/storage_conf.h | 7 ++ src/libvirt_private.syms | 3 + src/storage/storage_backend.c | 53 +++++++++------- src/storage/storage_backend_fs.c | 8 ++- src/storage/storage_driver.c | 128 ++++++++++++++++++++++++++++++++----- src/storage/storage_driver.h | 7 ++ 6 files changed, 162 insertions(+), 44 deletions(-) diff --git a/src/conf/storage_conf.h b/src/conf/storage_conf.h index 481c806..ee25e34 100644 --- a/src/conf/storage_conf.h +++ b/src/conf/storage_conf.h @@ -28,6 +28,7 @@ # include "util.h" # include "storage_encryption_conf.h" # include "threads.h" +# include "command.h" # include <libxml/tree.h> @@ -100,6 +101,12 @@ struct virStorageVolJobObj { virCond cond; enum virStorageVolJob active; unsigned long long start; + + bool abort_requested; /* abort was requested */ + virCommandPtr cmd; /* if we are running external program, + store pointer here too, so we can + virCommandAbort it if necessary */ + virStreamPtr stream; /* stream to abort */ }; typedef struct _virStorageVolDef virStorageVolDef; diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index fef9d5a..cec7a94 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -1017,6 +1017,9 @@ virStorageVolDefParseFile; virStorageVolDefParseNode; virStorageVolDefParseString; +# storage_driver.h +virStorageVolJobSetCommand; +virStorageVolJobSetStream; # storage_encryption_conf.h virStorageEncryptionFormat; diff --git a/src/storage/storage_backend.c b/src/storage/storage_backend.c index caac2f8..2e82c03 100644 --- a/src/storage/storage_backend.c +++ b/src/storage/storage_backend.c @@ -53,6 +53,7 @@ #include "internal.h" #include "secret_conf.h" #include "uuid.h" +#include "storage_driver.h" #include 
"storage_file.h" #include "storage_backend.h" #include "logging.h" @@ -106,8 +107,6 @@ static virStorageBackendPtr backends[] = { NULL }; -static int track_allocation_progress = 0; - enum { TOOL_QEMU_IMG, TOOL_KVM_IMG, @@ -202,6 +201,15 @@ virStorageBackendCopyToFD(virStorageVolDefPtr vol, goto cleanup; } + + if (vol->job.abort_requested || + inputvol->job.abort_requested) { + virStorageReportError(VIR_ERR_OPERATION_ABORTED, "%s", + _("Job abort requested")); + ret = -1; + goto cleanup; + } + } while ((amtleft -= interval) > 0); } @@ -325,34 +333,31 @@ createRawFile(int fd, virStorageVolDefPtr vol, } if (remain) { - if (track_allocation_progress) { + while (remain) { + /* Allocate in chunks of 512MiB: big-enough chunk + * size and takes approx. 9s on ext3. A progress + * update every 9s is a fair-enough trade-off + */ + unsigned long long bytes = 512 * 1024 * 1024; - while (remain) { - /* Allocate in chunks of 512MiB: big-enough chunk - * size and takes approx. 9s on ext3. A progress - * update every 9s is a fair-enough trade-off - */ - unsigned long long bytes = 512 * 1024 * 1024; - - if (bytes > remain) - bytes = remain; - if (safezero(fd, vol->allocation - remain, bytes) < 0) { - ret = -errno; - virReportSystemError(errno, _("cannot fill file '%s'"), - vol->target.path); - goto cleanup; - } - remain -= bytes; - } - } else { /* No progress bars to be shown */ - if (safezero(fd, 0, remain) < 0) { + if (bytes > remain) + bytes = remain; + if (safezero(fd, vol->allocation - remain, bytes) < 0) { ret = -errno; virReportSystemError(errno, _("cannot fill file '%s'"), vol->target.path); goto cleanup; } - } + remain -= bytes; + if (vol->job.abort_requested) { + virStorageReportError(VIR_ERR_OPERATION_ABORTED, + "%s", + _("Job abort requested")); + ret = -1; + goto cleanup; + } + } } if (fsync(fd) < 0) { @@ -782,6 +787,7 @@ virStorageBackendCreateQemuImg(virConnectPtr conn, goto cleanup; cmd = virCommandNew(create_tool); + virStorageVolJobSetCommand(vol, cmd); if 
(inputvol) { virCommandAddArgList(cmd, "convert", "-f", inputType, "-O", type, @@ -898,6 +904,7 @@ virStorageBackendCreateQcowCreate(virConnectPtr conn ATTRIBUTE_UNUSED, } cmd = virCommandNewArgList("qcow-create", size, vol->target.path, NULL); + virStorageVolJobSetCommand(vol, cmd); ret = virStorageBackendCreateExecCommand(pool, vol, cmd); virCommandFree(cmd); diff --git a/src/storage/storage_backend_fs.c b/src/storage/storage_backend_fs.c index 1af12e6..2d16be5 100644 --- a/src/storage/storage_backend_fs.c +++ b/src/storage/storage_backend_fs.c @@ -44,6 +44,7 @@ #include "virterror_internal.h" #include "storage_backend_fs.h" #include "storage_conf.h" +#include "storage_driver.h" #include "storage_file.h" #include "command.h" #include "memory.h" @@ -1188,7 +1189,7 @@ virStorageBackendFileSystemVolRefresh(virConnectPtr conn, } static int -virStorageBackendFilesystemResizeQemuImg(const char *path, +virStorageBackendFilesystemResizeQemuImg(virStorageVolDefPtr vol, unsigned long long capacity) { int ret = -1; @@ -1207,7 +1208,8 @@ virStorageBackendFilesystemResizeQemuImg(const char *path, } cmd = virCommandNew(img_tool); - virCommandAddArgList(cmd, "resize", path, NULL); + virStorageVolJobSetCommand(vol, cmd); + virCommandAddArgList(cmd, "resize", vol->target.path, NULL); virCommandAddArgFormat(cmd, "%llu", capacity); ret = virCommandRun(cmd, NULL); @@ -1233,7 +1235,7 @@ virStorageBackendFileSystemVolResize(virConnectPtr conn ATTRIBUTE_UNUSED, if (vol->target.format == VIR_STORAGE_FILE_RAW) return virStorageFileResize(vol->target.path, capacity); else - return virStorageBackendFilesystemResizeQemuImg(vol->target.path, + return virStorageBackendFilesystemResizeQemuImg(vol, capacity); } diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c index 7f3dfcd..7e11c74 100644 --- a/src/storage/storage_driver.c +++ b/src/storage/storage_driver.c @@ -72,6 +72,9 @@ storageResetJob(virStorageVolDefPtr vol) job->active = VIR_STORAGE_VOL_JOB_NONE; job->start = 0; 
+ job->abort_requested = false; + job->cmd = NULL; + job->stream = NULL; } /* wait max. 30 seconds for job ackquire */ @@ -92,11 +95,17 @@ storageBeginJobInternal(virStorageDriverStatePtr driver ATTRIBUTE_UNUSED, then = now + STORAGE_JOB_WAIT_TIME; - while (vol->job.active != VIR_STORAGE_VOL_JOB_NONE) { + while (vol->job.active != VIR_STORAGE_VOL_JOB_NONE && + !vol->job.abort_requested) { if (virCondWaitUntil(&vol->job.cond, &vol->lock, then) < 0) goto error; } + if (vol->job.abort_requested) { + VIR_DEBUG("JobAbort requested. Not starting a job"); + return -1; + } + VIR_DEBUG("Starting job %d", job); storageResetJob(vol); vol->job.active = job; @@ -170,6 +179,24 @@ storageEndJobWithPool(virStorageDriverStatePtr driver, return storageEndJobInternal(driver, pool, true, vol); } +void virStorageVolJobSetCommand(virStorageVolDefPtr vol, + virCommandPtr cmd) +{ + if (!vol || !cmd) + return; + + vol->job.cmd = cmd; +} + +void virStorageVolJobSetStream(virStorageVolDefPtr vol, + virStreamPtr stream) +{ + if (!vol || !stream) + return; + + vol->job.stream = stream; +} + static void storageDriverAutostart(virStorageDriverStatePtr driver) { unsigned int i; @@ -1456,35 +1483,19 @@ storageVolumeCreateXML(virStoragePoolPtr obj, if (backend->buildVol) { int buildret; - virStorageVolDefPtr buildvoldef = NULL; - - if (VIR_ALLOC(buildvoldef) < 0) { - virReportOOMError(); - voldef = NULL; - goto cleanup; - } - - /* Make a shallow copy of the 'defined' volume definition, since the - * original allocation value will change as the user polls 'info', - * but we only need the initial requested values - */ - memcpy(buildvoldef, voldef, sizeof(*voldef)); /* Drop the pool lock during volume allocation */ if (storageBeginJobWithPool(driver, pool, voldef, VIR_STORAGE_VOL_JOB_BUILD) < 0) { - VIR_FREE(buildvoldef); goto cleanup; } - buildret = backend->buildVol(obj->conn, pool, buildvoldef); + buildret = backend->buildVol(obj->conn, pool, voldef); storageEndJobWithPool(driver, pool, voldef); 
voldef = NULL; - VIR_FREE(buildvoldef); - if (buildret < 0) { virStoragePoolObjUnlock(pool); storageVolumeDelete(volobj, 0); @@ -1727,6 +1738,8 @@ storageVolumeDownload(virStorageVolPtr obj, if (storageBeginJob(driver, vol, VIR_STORAGE_VOL_JOB_DOWNLOAD) < 0) goto out; + virStorageVolJobSetStream(vol, stream); + if (virFDStreamOpenFile(stream, vol->target.path, offset, length, @@ -1803,6 +1816,8 @@ storageVolumeUpload(virStorageVolPtr obj, if (storageBeginJob(driver, vol, VIR_STORAGE_VOL_JOB_DOWNLOAD) < 0) goto out; + virStorageVolJobSetStream(vol, stream); + /* Not using O_CREAT because the file is required to * already exist at this point */ if (virFDStreamOpenFile(stream, @@ -2007,6 +2022,13 @@ storageWipeExtent(virStorageVolDefPtr vol, *bytes_wiped += written; remaining -= written; + + if (vol->job.abort_requested) { + virStorageReportError(VIR_ERR_OPERATION_ABORTED, "%s", + _("Job abort requested")); + ret = -1; + goto out; + } } if (fdatasync(fd) < 0) { @@ -2088,6 +2110,7 @@ storageVolumeWipeInternal(virStorageVolDefPtr def, algorithm); } cmd = virCommandNew(SCRUB); + virStorageVolJobSetCommand(def, cmd); virCommandAddArgList(cmd, "-f", "-p", alg_char, def->target.path, NULL); @@ -2426,6 +2449,74 @@ cleanup: return ret; } +static int +storageVolAbortJob(virStorageVolPtr obj, + unsigned int flags) +{ + virStorageDriverStatePtr driver = obj->conn->storagePrivateData; + virStoragePoolObjPtr pool; + virStorageVolDefPtr vol; + int ret = -1; + + virCheckFlags(0, -1); + + storageDriverLock(driver); + pool = virStoragePoolObjFindByName(&driver->pools, obj->pool); + storageDriverUnlock(driver); + if (!pool) { + virStorageReportError(VIR_ERR_NO_STORAGE_POOL, + "%s", _("no storage pool with matching uuid")); + goto cleanup; + } + + if (!virStoragePoolObjIsActive(pool)) { + virStorageReportError(VIR_ERR_OPERATION_INVALID, + "%s", _("storage pool is not active")); + goto cleanup; + } + + vol = virStorageVolDefFindByName(pool, obj->name); + + if (!vol) { + 
virStorageReportError(VIR_ERR_NO_STORAGE_VOL, + _("no storage vol with matching name '%s'"), + obj->name); + goto cleanup; + } + + if (vol->job.active == VIR_STORAGE_VOL_JOB_NONE) { + virStorageReportError(VIR_ERR_OPERATION_INVALID, + _("there is no job running on '%s'"), + obj->name); + goto cleanup; + } + + vol->job.abort_requested = true; + virStoragePoolObjUnlock(pool); + + /* If we use external program kill it */ + virCommandAbort(vol->job.cmd); + if (vol->job.stream) + virStreamAbort(vol->job.stream); + + while (vol->job.active) { + if (virCondWait(&vol->job.cond, &vol->lock) < 0) { + virReportSystemError(errno, "%s", + _("unable to wait on condition")); + goto cleanup; + } + } + + virStoragePoolObjLock(pool); + + ret = 0; + +cleanup: + if (pool) + virStoragePoolObjUnlock(pool); + return ret; +} + static virStorageDriver storageDriver = { .name = "storage", .open = storageOpen, /* 0.4.0 */ @@ -2467,6 +2558,7 @@ static virStorageDriver storageDriver = { .volGetXMLDesc = storageVolumeGetXMLDesc, /* 0.4.0 */ .volGetPath = storageVolumeGetPath, /* 0.4.0 */ .volResize = storageVolumeResize, /* 0.9.10 */ + .volAbortJob = storageVolAbortJob, /* 0.9.11 */ .poolIsActive = storagePoolIsActive, /* 0.7.3 */ .poolIsPersistent = storagePoolIsPersistent, /* 0.7.3 */ diff --git a/src/storage/storage_driver.h b/src/storage/storage_driver.h index b3e2554..2220c90 100644 --- a/src/storage/storage_driver.h +++ b/src/storage/storage_driver.h @@ -25,7 +25,14 @@ # define __VIR_STORAGE_DRIVER_H__ # include "storage_conf.h" +# include "command.h" +# include "fdstream.h" int storageRegister(void); +void virStorageVolJobSetCommand(virStorageVolDefPtr vol, + virCommandPtr cmd); +void virStorageVolJobSetStream(virStorageVolDefPtr vol, + virStreamPtr stream); + #endif /* __VIR_STORAGE_DRIVER_H__ */ -- 1.7.8.5
participants (2)
-
Daniel P. Berrange
-
Michal Privoznik