[libvirt] [PATCH] Initial implementation of new job control api

This is my initial definition of a new internal job control api. I am working on this as a part of the google summer of code. These patches contain the core job control api and deal only with managing individual jobs. I am currently working on writing code using this api to manage jobs in domains, in such a way that I will be able to replace the current job control code in qemu and libxl. Ultimately I will use this to implement job control in the storage driver which is my ultimate goal for the summer of code. --- src/Makefile.am | 1 + src/util/virjobcontrol.c | 574 +++++++++++++++++++++++++++++++++++++++++++++++ src/util/virjobcontrol.h | 342 ++++++++++++++++++++++++++++ 3 files changed, 917 insertions(+) create mode 100644 src/util/virjobcontrol.c create mode 100644 src/util/virjobcontrol.h diff --git a/src/Makefile.am b/src/Makefile.am index 2b9ac61..77de0e7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -118,6 +118,7 @@ UTIL_SOURCES = \ util/virinitctl.c util/virinitctl.h \ util/viriptables.c util/viriptables.h \ util/viriscsi.c util/viriscsi.h \ + util/virjobcontrol.h util/virjobcontrol.c \ util/virjson.c util/virjson.h \ util/virkeycode.c util/virkeycode.h \ util/virkeyfile.c util/virkeyfile.h \ diff --git a/src/util/virjobcontrol.c b/src/util/virjobcontrol.c new file mode 100644 index 0000000..04a5246 --- /dev/null +++ b/src/util/virjobcontrol.c @@ -0,0 +1,574 @@ +/* + * virjobcontrol.c Core implementation of job control + * + * Copyright (C) 2014 Tucker DiNapoli + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: Tucker DiNapoli + */ + +#include <config.h> + +#include "virjobcontrol.h" +#include "viralloc.h" +#include "virtime.h" +#include "virlog.h" +VIR_LOG_INIT("virjobcontrol"); + +VIR_ENUM_IMPL(virJob, 4, + "none", + "query", + "modify", + "destroy", +); +/* + No files other then this and virjobcontrol.c should need to + have access to the core implmentation of jobs. The code in these + files is intended to serve as a base for job control independent of + drivers. +*/ + +#define LOCK_JOB(job) \ + virMutexLock(&job->lock) +#define UNLOCK_JOB(job) \ + virMutexUnlock(&job->lock) +#define LOCK_JOB_INFO(job) \ + virMutexLock(&job->info->lock) +#define UNLOCK_JOB_INFO(job) \ + virMutexUnlock(&job->info->lock) +#define GET_CURRENT_TIME(time) \ + if (virTimeMillisNow(&time) < 0) { \ + return -1; \ + } + + +#define CHECK_FLAG_ATOMIC(job, flag) (virAtomicIntGet(&job->flags) & VIR_JOB_FLAG_##flag) +#define CHECK_FLAG(job, flag) (job->flags & VIR_JOB_FLAG_##flag) +#define SET_FLAG_ATOMIC(job, flag) (virAtomicIntOr(&job->flags, VIR_JOB_FLAG_##flag)) +#define SET_FLAG(job, flag) (job->flags |= VIR_JOB_FLAG_##flag) +#define UNSET_FLAG_ATOMIC(job, flag) (virAtomicIntAnd(&job->flags, (~VIR_JOB_FLAG_##flag))) +#define UNSET_FLAG(job, flag) (job->flags &= (~VIR_JOB_FLAG_##flag)) +#define CLEAR_FLAGS_ATOMIC(job) (virAtomicIntSet(&job->flags, VIR_JOB_FLAG_NONE)) +#define CLEAR_FLAGS(job) (job->flags = VIR_JOB_FLAG_NONE) + +typedef struct _jobHashEntry { + virJobID id; + virJobObjPtr job; + struct _jobHashEntry *next; +} jobHashEntry; + +typedef struct _jobHash { + jobHashEntry **table; + size_t size; + size_t num_entries; + virRWLock lock; +} jobHash; + +/* Using this incures a cost on every call to a job control function + that uses the job control hash 
table, but it means that no one using + job control needs to call an initialization function to use it. + + The other option would be to have a function: + virJobControlInit(void) + { + return virOnce(job_once, jobControlInit); + } + and require anyone using job control to call it. + */ +static struct _jobHash *job_hash; /* Hash table that contians all current jobs */ +static int init_err = 0; +static virOnceControl job_once = VIR_ONCE_CONTROL_INITIALIZER; +static void +jobControlInit(void) +{ + if (VIR_ALLOC_QUIET(job_hash) < 0) { + init_err = 1; + } + if (virRWLockInit(&job_hash->lock) < 0) { + init_err = 1; + } + if (VIR_ALLOC_N_QUIET(job_hash->table, 16) <0) { + init_err = 1; + } +} + +/* global job id + + Because it gets incremented before it's first use 0 is + never a valid job id + + Only ever touched with atomic instructions. +*/ +static unsigned int global_id = 0; + + + +/* Simple hash table implementation for jobs. + It is keyed by job id's which are just integers so there isn't acutally + a hash function. Using the existing hash table in libvirt would be overkill. 
+*/ + +#ifndef VIR_JOB_REHASH_THRESHOLD +#define VIR_JOB_REHASH_THRESHOLD 0.8 +#endif + +#ifndef VIR_JOB_BUCKET_LEN +#define VIR_JOB_BUCKET_LEN 5 +#endif + +#ifndef VIR_JOB_GROWTH_FACTOR +#define VIR_JOB_GROWTH_FACTOR 2 +#endif + +#define READ_LOCK_TABLE(ht) virRWLockRead((virRWLockPtr)&ht->lock) +#define WRITE_LOCK_TABLE(ht) virRWLockWrite((virRWLockPtr)&ht->lock) +#define UNLOCK_TABLE(ht) virRWLockUnlock((virRWLockPtr)&ht->lock) + +static int jobHashRehash(jobHash *ht); +#define maybe_rehash(ht) \ + if (((double)ht->size*VIR_JOB_BUCKET_LEN)/(ht->num_entries) >= \ + VIR_JOB_REHASH_THRESHOLD){ \ + jobHashRehash(ht); \ + } + + +/* Doesn't lock ht, so should only be called with a write lock held on ht*/ +static int jobHashRehash(jobHash *ht); + +/* look for id in the hash table ht and return the entry associated with it + if id isn't it the table return null*/ +static jobHashEntry* +jobLookup(const jobHash *ht, virJobID id) +{ + if (virOnce(&job_once, jobControlInit) < 0) { + return NULL; + } + READ_LOCK_TABLE(ht); + int bucket = id % ht->size; + jobHashEntry *retval = NULL, *job_entry; + if (!ht->table[bucket]) { + goto end; + } + job_entry = ht->table[bucket]; + do { + if (job_entry->id == id) { + retval = job_entry; + goto end; + } + } while ((job_entry = job_entry->next)); + + end: + UNLOCK_TABLE(ht); + return retval; +} + +/* add job the hashtable of currently existing jobs, job should + have already been initialized via virJobObjInit. + returns 0 if job is already in the hash, + returns the id of the job if it was successfully added + returns -1 on error. 
+ */ +static int +jobHashAdd(jobHash *ht, virJobObjPtr job) +{ + if (virOnce(&job_once, jobControlInit) < 0) { + return -1; + } + virJobID id = job->id; + if (jobLookup(ht, id)) { /* check if job is in the table*/ + return 0; + } + int bucket = id % ht->size; + jobHashEntry *new_entry; + if (VIR_ALLOC_QUIET(new_entry) < 0) { + return -1; + } + jobHashEntry *last_entry = ht->table[bucket]; + *new_entry = (jobHashEntry) {.id = id, .job = job}; + + WRITE_LOCK_TABLE(ht); + if (!last_entry) { + ht->table[bucket] = new_entry; + } else { + while (last_entry->next) { + last_entry = last_entry->next; + } + last_entry->next = new_entry; + } + ht->num_entries++; + maybe_rehash(ht); + UNLOCK_TABLE(ht); + return id; +} + +/* Remove the job with the given id from the list of currently existing jobs, + this doesn't free/cleanup the actual job object, it just removes + the hash entry, this is called by virJobObjFree, so there shouldn't + be any reason to call it directly. + + return values are the same as for jobAlist add, 0 if no job found, + job id on success, -1 on some other error; +*/ +static int +jobHashRemove(jobHash *ht, virJobID id) +{ + int bucket = id % ht->size; + /*we can't just call jobLookup because we need the entry before the one + we want to free*/ + WRITE_LOCK_TABLE(ht); + jobHashEntry *entry = ht->table[bucket], *old_entry; + if (!entry) { + goto error; + } + if (entry->id != id) { + while (entry && entry->next->id != id) { + entry = entry->next; + } + } + if (!entry) { + goto error; + } + old_entry = entry->next; + entry->next = old_entry->next; + VIR_FREE(old_entry); + UNLOCK_TABLE(ht); + return id; + error: + UNLOCK_TABLE(ht); + return 0; +} +static int +jobHashRehash(jobHash *ht) +{ + size_t new_size = ht->size*VIR_JOB_GROWTH_FACTOR; + jobHashEntry **new_table; + if (VIR_ALLOC_N_QUIET(new_table, new_size)) { + return -1; + } + jobHashEntry **old_table = ht->table; + int index; + for (index = 0; index<ht->size; index++) { + jobHashEntry *old_entry = 
old_table[index]; + while (old_entry) { + int bucket = old_entry->id % new_size; + jobHashEntry *new_entry = new_table[bucket]; + if (!new_entry) { + new_table[bucket] = old_entry; + } else { + while (new_entry->next) { + new_entry = new_entry->next; + } + new_entry->next = old_entry; + } + old_entry = old_entry->next; + } + } + ht->size = new_size; + ht->table = new_table; + VIR_FREE(old_table); + return 1; +} + +#define jobIDFromJobInternal(job) (job->id) + +static inline virJobObjPtr +jobFromIDInternal(virJobID id) +{ + jobHashEntry *entry = jobLookup(job_hash, id); + if (entry) { + return entry->job; + } else { + return NULL; + } +} + +virJobObjPtr +virJobFromID(virJobID id) +{ + return jobFromIDInternal(id); + +} +virJobID +virJobIDFromJob(virJobObjPtr job) +{ + return jobIDFromJobInternal(job); +} + +int +virJobObjInit(virJobObjPtr job) +{ + /* This code checks to see if job was already initialized, + I don't know if this is needed or not.*/ + if (job->id != 0) { + VIR_DEBUG("job %d has already been initialized", job->id); + return 0; + } + job->id = virAtomicIntInc(&global_id); + job->maxJobsWaiting = INT_MAX; + + if (virCondInit(&job->cond) < 0) { + return -1; + } + + if (virMutexInit(&job->lock) < 0) { + virCondDestroy(&job->cond); + return -1; + } + + jobHashAdd(job_hash, job); + + return job->id; +} + +void +virJobObjFree(virJobObjPtr job) +{ + virCondDestroy(&job->cond); + jobHashRemove(job_hash, job->id); + VIR_FREE(job); +} + +void +virJobObjCleanup(virJobObjPtr job) +{ + + virCondDestroy(&job->cond); + jobHashRemove(job_hash, job->id); + memset(job, '\0', sizeof(virJobObj)); + return; +} + +int +virJobObjBegin(virJobObjPtr job, + virJobType type) +{ + LOCK_JOB(job); + if (CHECK_FLAG(job, ACTIVE)) { + VIR_DEBUG("Job %d is already running", job->id); + UNLOCK_JOB(job); + return 0; + } + VIR_DEBUG("Starting job %d with type %s", + job->id, virJobTypeToString(type)); + unsigned long long now; + if (job->id <= 0) { + goto error; /* job wasn't 
initialiazed*/ + } + if (virTimeMillisNow(&now) < 0) { + goto error; + } + job->type = type; + SET_FLAG(job, ACTIVE); + job->start = now; + job->owner = virThreadSelfID(); + return job->id; + error: + UNLOCK_JOB(job); + return -1; +} + +int +virJobObjEnd(virJobObjPtr job) +{ + if (job->type == VIR_JOB_NONE) { + return -1; + } + virJobObjReset(job); + virCondSignal(&job->cond); + + return job->id; +} + +/* There shouldn't be any threads waiting on job when this is called*/ +int +virJobObjReset(virJobObjPtr job) +{ + LOCK_JOB(job); + job->type = VIR_JOB_NONE; + job->flags = VIR_JOB_FLAG_NONE; + job->owner = 0; + job->jobsWaiting = 0; + UNLOCK_JOB(job); + + return job->id; +} + +int +virJobObjAbort(virJobObjPtr job) +{ + int retval; + LOCK_JOB(job); + if (CHECK_FLAG(job, ACTIVE) || CHECK_FLAG(job, SUSPENDED)) { + SET_FLAG(job, ABORTED); + retval = job->id; + } else { + retval = -1; + } + UNLOCK_JOB(job); + return retval; +} + +int +virJobObjAbortAndSignal(virJobObjPtr job) +{ + int retval; + LOCK_JOB(job); + if (CHECK_FLAG(job, ACTIVE) || CHECK_FLAG(job, SUSPENDED)) { + SET_FLAG(job, ABORTED); + retval = job->id; + virCondBroadcast(&job->cond); + } else { + retval = -1; + } + UNLOCK_JOB(job); + return retval; +} + +/* lock should be held by the calling function*/ +int +virJobObjWait(virJobObjPtr job, + virMutexPtr lock, + unsigned long long limit) +{ + + bool job_lock = false; + int retval; + if (CHECK_FLAG_ATOMIC(job, ACTIVE)) { /* if the job isn't active we're fine*/ + if (virAtomicIntInc(&job->jobsWaiting) > job->maxJobsWaiting) { + errno = 0; + goto error; + } + if (!lock) { + LOCK_JOB(job); + lock = &job->lock; + job_lock = true; + } + while (CHECK_FLAG_ATOMIC(job, ACTIVE)) { + if (limit) { + retval = virCondWaitUntil(&job->cond, lock, limit); + } else { + retval = virCondWait(&job->cond, lock); + } + if (retval < 0) { + goto error; + } + } + } + virAtomicIntDec(&job->jobsWaiting); + if (job_lock) { + UNLOCK_JOB(job); + } + return job->id; + + error: + 
virAtomicIntDec(&job->jobsWaiting); + if (job_lock) { + UNLOCK_JOB(job); + } + if (!errno) { + return -3; + } else if (errno == ETIMEDOUT) { + return -2; + } else { + return -1; + } +} + +int +virJobObjSuspend(virJobObjPtr job) +{ + if (!CHECK_FLAG_ATOMIC(job, ACTIVE)) { + return -1; + } + LOCK_JOB(job); + SET_FLAG(job, SUSPENDED); + UNSET_FLAG(job, ACTIVE); + UNLOCK_JOB(job); + return job->id; +} +int +virJobObjResume(virJobObjPtr job) +{ + if (!CHECK_FLAG_ATOMIC(job, SUSPENDED)) { + return -1; + } + LOCK_JOB(job); + UNSET_FLAG(job, SUSPENDED); + SET_FLAG(job, ACTIVE); + UNLOCK_JOB(job); + return job->id; +} +int +virJobObjResumeIfNotAborted(virJobObjPtr job) +{ + int retval; + LOCK_JOB(job); + if (!CHECK_FLAG(job, SUSPENDED)) { + retval = -1; + } else if (CHECK_FLAG(job, ABORTED)) { + retval = 0; + } else { + UNSET_FLAG(job, SUSPENDED); + SET_FLAG(job, ACTIVE); + retval = job->id; + } + UNLOCK_JOB(job); + return retval; +} + +void +virJobObjSetMaxWaiters(virJobObjPtr job, int max) +{ + virAtomicIntSet(&job->maxJobsWaiting, max); +} + +bool +virJobObjCheckAbort(virJobObjPtr job) +{ + return CHECK_FLAG_ATOMIC(job, ABORTED); +} +bool +virJobObjActive(virJobObjPtr job) +{ + return CHECK_FLAG_ATOMIC(job, ACTIVE); +} + +/* since we need to be able to return a negitive answer on error + the time difference we can return is only half of the maximum + possible time. This shouldn't pose any real issue. 
+*/ +long long +virJobObjCheckTime(virJobObjPtr job) +{ + if (!CHECK_FLAG(job, ACTIVE)) { + return 0; + } + unsigned long long now; + if (virTimeMillisNow(&now) < 0) { + return -1; + } + return now - job->start; +} + +void +virJobObjSignal(virJobObjPtr job, bool all) +{ + if (all) { + virCondBroadcast(&job->cond); + } else { + virCondSignal(&job->cond); + } +} diff --git a/src/util/virjobcontrol.h b/src/util/virjobcontrol.h new file mode 100644 index 0000000..235cc06 --- /dev/null +++ b/src/util/virjobcontrol.h @@ -0,0 +1,342 @@ +/* + * virjobcontrol.h Core implementation of job control + * + * Copyright (C) 2014 Tucker DiNapoli + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. 
+ * + * Author: Tucker DiNapoli + */ + + +#ifndef __JOB_CONTROL_H__ +#define __JOB_CONTROL_H__ + +#include <stdlib.h> + +#include "viratomic.h" +#include "virthread.h" +#include "virutil.h" + + +/* The general type of the job, specifically if it will modify + the object it is acting on.*/ +typedef enum { + VIR_JOB_NONE = 0, /*no job running*/ + VIR_JOB_QUERY = 0x1, /* job will not change object state (read-only)*/ + VIR_JOB_MODIFY = 0x2, /* job may change object state (read-write)*/ + VIR_JOB_DESTROY = 0x4, /* job will destroy the object it is acting on */ + VIR_JOB_LAST +} virJobType; +VIR_ENUM_DECL(virJob); + + +/* General metadata/flags for jobs + + more specific flags can be added for speciic drivers, + to do this the first value of the enum for the extra flags + should be set to (VIR_JOB_FLAG_LAST << 1) and each additional + flag should be set to the last flag shifted to the left by 1. + +*/ +typedef enum { + VIR_JOB_FLAG_NONE = 0x000, /* No job is active */ + VIR_JOB_FLAG_ACTIVE = 0x001, /* Job is active */ + /* These next flags are used to indicate how progress should be measured + and are currently unused + */ + VIR_JOB_FLAG_TIME_BOUND = 0x002, /* Job is bound by a specific ammount of time */ + VIR_JOB_FLAG_MEM_BOUND = 0x004, /* Job is bound by a specific ammount of memory */ + VIR_JOB_FLAG_FILE_BOUND = 0x008, /* Job is bound by a specific number of files */ + VIR_JOB_FLAG_DATA_BOUND = 0x010, /* Job is bound by a specific ammount of data */ + VIR_JOB_FLAG_UNBOUNDED = 0x020, /* Job has no specific bound */ + /* These flags indicate a job is no longer active but still exists + */ + VIR_JOB_FLAG_SUSPENDED = 0x040, /* Job is Suspended and can be resumed */ + VIR_JOB_FLAG_COMPLETED = 0x080, /* Job has finished, but isn't cleaned up */ + VIR_JOB_FLAG_FAILED = 0x100, /* Job hit error, but isn't cleaned up */ + VIR_JOB_FLAG_ABORTED = 0x200, /* Job was aborted, but isn't cleaned up */ + VIR_JOB_FLAG_LAST = 0x400 +} virJobFlag; + + +typedef int virJobID; /*signed so 
negitive values can be returned on error*/ +typedef struct _virJobObj virJobObj; +typedef virJobObj *virJobObjPtr; +typedef struct _virJobInfo virJobInfo; +typedef virJobInfo *virJobInfoPtr; + +struct _virJobObj { + /* this should only be locked when changing the job object, not while running a job + it's use is mostly optional, but it is needed for waiting on a job*/ + virMutex lock;/*should have a compile time option to enable/disable locking */ + virCond cond; /* Use to coordinate jobs (is this necessary?) */ + virJobType type; /* The type of the job */ + unsigned long long owner; /* Thread id which set current job */ + unsigned long long start; /* when the job started*/ + int jobsWaiting; /* number jobs waiting on cond */ + int maxJobsWaiting; /* max number of jobs that can wait on cond */ + /* information about the job, this is a private struct, access to information + about the job should be obtained by functions */ + virJobInfoPtr info; + virJobID id; /* the job id, constant, and unique */ + virJobFlag flags; /* The current state of the job */ + void *privateData; +}; + +/* Getting information on a currently running job is currently unimplmeneted and + any suggestions on what should go into the api for querying/setting job + progress information would be appreciated +*/ +typedef enum { + VIR_JOB_INFO_NONE = 0, + VIR_JOB_INFO_TIME, + VIR_JOB_INFO_MEM, + VIR_JOB_INFO_FILE, + VIR_JOB_INFO_DATA, + VIR_JOB_INFO_LAST +} virJobInfoType; + +struct _virJobInfo { + /* these fields are for compatability with existing job + information structs. 
+ */ + virDomainBlockJobInfoPtr blockJobInfoPtr; + virDomainJobInfoPtr domainJobInfoPtr; + + unsigned long long progressTotal; + unsigned long long dataTotal; + unsigned long long memTotal; + unsigned long long filesTotal; + unsigned long long timeTotal; + + unsigned long long progressProcessed; + unsigned long long timeProcessed; + unsigned long long memProcessed; + unsigned long long filesProcessed; + unsigned long long dataProcessed; + + unsigned long long progressRemaining; + unsigned long long dataRemaining; + unsigned long long memRemaining; + unsigned long long filesRemaining; + unsigned long long timeRemaining; +}; + +/** + * virJobObjInit: + * @job: Pointer to the job object to initialize + * + * Initialize the job object given, should be called before any other + * job function. + * + * Like all job functions it returns the job id on success and -1 on error + */ +int virJobObjInit(virJobObjPtr job); +/** + * virJobObjFree: + * @job: Pointer to the job object to free + * + * Cleanup/free all resources/memory assoicated with job + */ +void virJobObjFree(virJobObjPtr job); +/** + * virJobObjCleanup: + * @job: Pointer to the job object to cleanup + * + * Cleanup all resources assoicated with job, and zero out the + * corrsponding memory, but do not free it. + */ + +void virJobObjCleanup(virJobObjPtr job); +/** + * virJobObjBegin: + * @job: The job to begin + * @type: The type of job that is being started + * + * Marks job as active, sets the calling thread as the owner of the job, + * sets the job's start time to the current time and sets it's type to type. + * + * End job should be called after this, once the job is done + * + * Returns the job id of job on success, 0 if job is already active + * and -1 on error. + */ +int virJobObjBegin(virJobObjPtr job, + virJobType type); +/** + * virJobObjEnd: + * + * @job: The job to end + * + * Ends job, calls virJobObjReset and signals all threads waiting on job. 
+ * + * Ending a job does not invalidate the job object, and a new job can be + * started using the same job object, call virJobObjFree or virJobObjCleanup + * in order to destroy a job object. + * + * returns the job's id on success and -1 if the job was not active. + */ +int virJobObjEnd(virJobObjPtr job); +/** + * virJobObjReset: + * + * @job: The job to reset + * + * Clears all fields of job related to running a job. This does not + * clear the job id, any configurable parameters (currently just the + * maximum number of waiting threads), or the mutex/condition variable + * assoicated with the job. This is called internally by virJobObjEnd + * and there should be few reasons to call this explicitly. + */ +int virJobObjReset(virJobObjPtr job); +/** + * virJobObjAbort: + * @job: The job to abort + * + * Marks job as aborted, since jobs are asyncronous this doesn't actually + * stop the job. The abort status of a job can be checked by + * virJobObjCheckAbort. A suspended job can be aborted. + * + * returns the job id on success and -1 if + * job is not currently running/suspended. + */ +int virJobObjAbort(virJobObjPtr job); +/** + * virJobObjAbort: + * @job: The job to abort/signal waiters + * + * Behaves identically to virJobObjAbort except all threads waiting + * on job are signaled after the abort status is set. + */ +int virJobObjAbortAndSignal(virJobObjPtr job); +/** + * virJobObjSuspend: + * @job: The job to suspend + * + * Marks job as suspended, it is up to the caller of the function + * to actually save any state assoicated with the job + * + * This function returns the job's id on success and -1 if the job + * was not active. + */ +int virJobObjSuspend(virJobObjPtr job); +/** + * virJobObjResume: + * @job The job to resume + * + * Resume job, as with virJobObjSuspend it is up to the caller to + * insure that the work being done by job is actually restarted. 
+ * + * Since a job can be aborted while it is suspended the caller should + * check to see if job has been aborted, a convenience function + * virJobObjResumeIfNotAborted is provided. + * + * returns the job id if job was resumed and -1 if the job was not suspended. + */ +int virJobObjResume(virJobObjPtr job); + +/** + * virJobObjResumeIfNotAborted: + * @job The job to resume + * + * Behaves the same as virJobObjResume except it returns 0 and does not + * resume the job if the job was aborted while suspended. + */ +int virJobObjResumeIfNotAborted(virJobObjPtr job); + +/* returns -3 if max waiters is exceeded, -2 on timeout, -1 on other error*/ +/** + * virJobObjWait: + * @job: The job to wait on + * @lock: The lock to use in the call to virCondWait + * @limit: If not 0 the maximum ammount of time to wait (in milliseconds) + * + * This function waits for job to be completed, or to otherwise signal on it's + * condition variable. + * + * If lock is NULL the internal job lock will be used, otherwise lock should + * be held by the calling thread. + * (NOTE: I'm not sure if it's a good idea or not to use the internal lock) + * + * If limit is > 0 virCondWaitUntil is called instead of virCondWait with limit + * being used as the time parameter. + * + * If job is not currently active return successfully. + * + * Like all job functions returns the job's id on success. + * + * On Failure returns a negitive number to indicate the cause of failure + * -3 indicates the maximum number of threads were alread waiting on job + * -2 indicates that virCondWaitUntil timed out + * -1 indicates some other error in virCondWait/virCondWaitUntil + */ +int virJobObjWait(virJobObjPtr job, + virMutexPtr lock, + unsigned long long limit); +/* Should I provide a function to wait for a suspended job to resume? */ + +/** + * virJobObjSignal: + * @job: The job to signal from + * @all: If true signal all waiting threads, otherwise just signal one + * + * Signal a thread/threads waiting on job. 
In most cases waiting threads + * are signaled when needed internally, but this is provided if for + * some reason waiting threads need to be manually signaled. + */ + +void virJobObjSignal(virJobObjPtr job, bool all); + +/* accessor functions*/ +/** + * virJobObjCheckAbort: + * @job: The job whoes status should be checked + * + * Returns true if job has been aborted, false otherwise + */ +bool virJobObjCheckAbort(virJobObjPtr job); +/** + * virJobObjCheckTime: + * @job: The job whoes time should be checked + * + * Returns the time in milliseconds that job has been running for. + * returns 0 if job is not active and -1 if there is an error in + * getting the current time. + */ +long long virJobObjCheckTime(virJobObjPtr job); +/** + * virJobObjActive: + * @job: The job whoes status should be checked + * + * Returns true if job is currently active, false otherwise + */ +bool virJobObjActive(virJobObjPtr job); +/** + * virJobObjSetMaxWaiters: + * @job: The job to modify + * @max: The maximum number of threads to allow to wait on job at once + * + * Sets the maximum number of threads that can simultaneously wait on job. + * By default there is essentially no limit (in reality the limit is the + * maximum value that can be held by an int) + */ +void virJobObjSetMaxWaiters(virJobObjPtr job, int max); + +/* These convert between a job object and a job id. +*/ +virJobObjPtr virJobFromID(virJobID id); +virJobID virJobIDFromJob(virJobObjPtr job); +#endif -- 2.0.0

On Wed, Jun 18, 2014 at 05:59:47PM -0400, Tucker DiNapoli wrote:
This is my initial definition of a new internal job control API. I am working on this as part of the Google Summer of Code. These patches contain the core job control API and deal only with managing individual jobs. I am currently writing code that uses this API to manage jobs in domains, in such a way that I will be able to replace the current job control code in qemu and libxl. Ultimately, I will use this to implement job control in the storage driver, which is my main goal for the Summer of Code.
--- src/Makefile.am | 1 + src/util/virjobcontrol.c | 574 +++++++++++++++++++++++++++++++++++++++++++++++ src/util/virjobcontrol.h | 342 ++++++++++++++++++++++++++++ 3 files changed, 917 insertions(+) create mode 100644 src/util/virjobcontrol.c create mode 100644 src/util/virjobcontrol.h
diff --git a/src/Makefile.am b/src/Makefile.am index 2b9ac61..77de0e7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -118,6 +118,7 @@ UTIL_SOURCES = \ util/virinitctl.c util/virinitctl.h \ util/viriptables.c util/viriptables.h \ util/viriscsi.c util/viriscsi.h \ + util/virjobcontrol.h util/virjobcontrol.c \ util/virjson.c util/virjson.h \ util/virkeycode.c util/virkeycode.h \ util/virkeyfile.c util/virkeyfile.h \ diff --git a/src/util/virjobcontrol.c b/src/util/virjobcontrol.c new file mode 100644 index 0000000..04a5246 --- /dev/null +++ b/src/util/virjobcontrol.c @@ -0,0 +1,574 @@ +/* + * virjobcontrol.c Core implementation of job control + * + * Copyright (C) 2014 Tucker DiNapoli + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: Tucker DiNapoli + */ + +#include <config.h> + +#include "virjobcontrol.h" +#include "viralloc.h" +#include "virtime.h" +#include "virlog.h" +VIR_LOG_INIT("virjobcontrol"); + +VIR_ENUM_IMPL(virJob, 4,
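You should use VIR_JOB_LAST instead of the literal 4, for easy extension and proper size checking. I've just now discovered that running make syntax-check would tell you the same. Note that VIR_ENUM_IMPL maps its strings to consecutive values, so the bit-flag values in virJobType (VIR_JOB_DESTROY = 0x4, which makes VIR_JOB_LAST 5) won't line up with these four strings; the enum values need to be sequential for this to work.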
+ "none", + "query", + "modify", + "destroy", +);
These are probably general enough for now.
+/* + No files other then this and virjobcontrol.c should need to + have access to the core implmentation of jobs. The code in these + files is intended to serve as a base for job control independent of + drivers. +*/ + +#define LOCK_JOB(job) \ + virMutexLock(&job->lock) +#define UNLOCK_JOB(job) \ + virMutexUnlock(&job->lock) +#define LOCK_JOB_INFO(job) \ + virMutexLock(&job->info->lock) +#define UNLOCK_JOB_INFO(job) \ + virMutexUnlock(&job->info->lock)
We prefer having these as separate functions. Not only can you trace them better when debugging (for such purposes I use CFLAGS="-ggdb -O0"), but they also shouldn't add much overhead with proper compiler optimizations.
+#define GET_CURRENT_TIME(time) \ + if (virTimeMillisNow(&time) < 0) { \ + return -1; \ + } +
This creates code with two flaws. The first is readability: it can return from the function (skipping possible clean-ups, e.g. in future code) even though that's not obvious from the name. The second is that there is no need to put a semicolon after the macro (which confuses some editors). The usual workaround is: #define ASDF() \ do { \ asdf(); \ } while (0)
+ +#define CHECK_FLAG_ATOMIC(job, flag) (virAtomicIntGet(&job->flags) & VIR_JOB_FLAG_##flag) +#define CHECK_FLAG(job, flag) (job->flags & VIR_JOB_FLAG_##flag) +#define SET_FLAG_ATOMIC(job, flag) (virAtomicIntOr(&job->flags, VIR_JOB_FLAG_##flag)) +#define SET_FLAG(job, flag) (job->flags |= VIR_JOB_FLAG_##flag) +#define UNSET_FLAG_ATOMIC(job, flag) (virAtomicIntAnd(&job->flags, (~VIR_JOB_FLAG_##flag))) +#define UNSET_FLAG(job, flag) (job->flags &= (~VIR_JOB_FLAG_##flag)) +#define CLEAR_FLAGS_ATOMIC(job) (virAtomicIntSet(&job->flags, VIR_JOB_FLAG_NONE)) +#define CLEAR_FLAGS(job) (job->flags = VIR_JOB_FLAG_NONE) +
While the resulting code looks quite nice with all of these, I'm not convinced we want all of them. Is _ATOMIC even necessary? You're locking the structure anyway in some cases. Not using the atomic ops might be better, as Daniel suggested: https://www.redhat.com/archives/libvir-list/2014-June/msg00194.html
+typedef struct _jobHashEntry { + virJobID id; + virJobObjPtr job; + struct _jobHashEntry *next; +} jobHashEntry; + +typedef struct _jobHash { + jobHashEntry **table; + size_t size; + size_t num_entries; + virRWLock lock; +} jobHash; +
If there's going to be a lot of code that isn't related to the jobs themselves, such as this hash table, it might be worth moving it into a separate file (clearly stating that it's simplified and thus not a hash table per se).
+/* Using this incures a cost on every call to a job control function + that uses the job control hash table, but it means that no one using + job control needs to call an initialization function to use it. + + The other option would be to have a function: + virJobControlInit(void) + { + return virOnce(job_once, jobControlInit); + } + and require anyone using job control to call it. + */
Why not run the initialization once per binary that uses it? I don't expect it will be needed anywhere other than the daemon, or am I wrong? Maybe we can make it part of virInitialize() or similar. You're already running virOnce() in lookup() and hashadd() anyway.
+static struct _jobHash *job_hash; /* Hash table that contians all current jobs */ +static int init_err = 0; +static virOnceControl job_once = VIR_ONCE_CONTROL_INITIALIZER; +static void +jobControlInit(void) +{ + if (VIR_ALLOC_QUIET(job_hash) < 0) { + init_err = 1; + } + if (virRWLockInit(&job_hash->lock) < 0) { + init_err = 1; + } + if (VIR_ALLOC_N_QUIET(job_hash->table, 16) <0) { + init_err = 1; + }
You're not checking the value of init_err anywhere.
+} + +/* global job id + + Because it gets incremented before it's first use 0 is + never a valid job id + + Only ever touched with atomic instructions. +*/ +static unsigned int global_id = 0; +
What happens to this id when the daemon is restarted? It just occurred to me that we haven't even discussed how jobs will be handled when the daemon quits or restarts.
+ + +/* Simple hash table implementation for jobs. + It is keyed by job id's which are just integers so there isn't acutally + a hash function. Using the existing hash table in libvirt would be overkill. +*/ +
How about just keeping a sparse window (array) in which the jobs are ordered by their internal ID? You would avoid those linked lists and get an O(log(n)) lookup, which is not that bad with few jobs. We also have the _APPEND, _INSERT and other macros for handling array elements. If you really want O(1), I think using the hash table we already have is less overkill than defining another hash table in a different part of the code. And the functions you need to pass to virHashCreateFull() are trivial.
+#ifndef VIR_JOB_REHASH_THRESHOLD +#define VIR_JOB_REHASH_THRESHOLD 0.8 +#endif +
We require indenting nested preprocessor directives, one space per level after the '#'; in this case like this:

    #ifndef VIR_JOB_REHASH_THRESHOLD
    # define VIR_JOB_REHASH_THRESHOLD 0.8
    #endif

If you have cppi installed, 'make syntax-check' would have told you that. This applies to both files in their entirety.
+#ifndef VIR_JOB_BUCKET_LEN +#define VIR_JOB_BUCKET_LEN 5 +#endif + +#ifndef VIR_JOB_GROWTH_FACTOR +#define VIR_JOB_GROWTH_FACTOR 2 +#endif + +#define READ_LOCK_TABLE(ht) virRWLockRead((virRWLockPtr)&ht->lock) +#define WRITE_LOCK_TABLE(ht) virRWLockWrite((virRWLockPtr)&ht->lock) +#define UNLOCK_TABLE(ht) virRWLockUnlock((virRWLockPtr)&ht->lock) +
The same comment as for the job locking macros applies here. BTW, why are you casting the pointers? Also, having this in a function rather than a macro lets the compiler check pointer and type correctness. [...]
+/* Remove the job with the given id from the list of currently existing jobs, + this doesn't free/cleanup the actual job object, it just removes + the hash entry, this is called by virJobObjFree, so there shouldn't + be any reason to call it directly. + + return values are the same as for jobAlist add, 0 if no job found, + job id on success, -1 on some other error; +*/
There is no way this function returns -1, but that's nitpicking on my side about something that might change. However, having one error value is fine unless you really need to fine-tune the behaviour in the caller. That brings me to another thing I wanted to mention: you're not reporting errors anywhere in these files. Do you expect all the callers to share similar (or most probably the same) error reporting? If you report the errors here, the code in those callers will be much nicer and smaller.
+static int +jobHashRemove(jobHash *ht, virJobID id) +{ + int bucket = id % ht->size; + /*we can't just call jobLookup because we need the entry before the one + we want to free*/ + WRITE_LOCK_TABLE(ht); + jobHashEntry *entry = ht->table[bucket], *old_entry; + if (!entry) { + goto error; + } + if (entry->id != id) { + while (entry && entry->next->id != id) { + entry = entry->next; + } + } + if (!entry) { + goto error; + } + old_entry = entry->next; + entry->next = old_entry->next; + VIR_FREE(old_entry);
What if the job is locked and being worked on? The job should either be locked here, or you should use reference tracking (counting), which we have in virObjectLockable; you can make that the parent of the job object (see how it's used elsewhere in the code). Also, there should be strict rules for the order in which these elements are locked, so that most deadlocks are avoided (see for example src/qemu/THREADS.txt, where domain and driver locking is explained). This is one of the reasons why you should probably start with the libvirt hash table, concentrate on the job control and leave possible further optimizations for later. Otherwise we'll end up tuning a hash table implementation, which is not really part of the project. [...]
diff --git a/src/util/virjobcontrol.h b/src/util/virjobcontrol.h new file mode 100644 index 0000000..235cc06 --- /dev/null +++ b/src/util/virjobcontrol.h @@ -0,0 +1,342 @@ +/* + * virjobcontrol.h Core implementation of job control + * + * Copyright (C) 2014 Tucker DiNapoli + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: Tucker DiNapoli + */ + + +#ifndef __JOB_CONTROL_H__ +#define __JOB_CONTROL_H__ + +#include <stdlib.h> + +#include "viratomic.h" +#include "virthread.h" +#include "virutil.h" + + +/* The general type of the job, specifically if it will modify + the object it is acting on.*/ +typedef enum { + VIR_JOB_NONE = 0, /*no job running*/ + VIR_JOB_QUERY = 0x1, /* job will not change object state (read-only)*/ + VIR_JOB_MODIFY = 0x2, /* job may change object state (read-write)*/ + VIR_JOB_DESTROY = 0x4, /* job will destroy the object it is acting on */
Why is this 0x4? Did you mean to use it as a flag? I don't think there is a job type that could be more than one of these at once. Not specifying the values will make them consecutive, and when there are no holes, VIR_ENUM_IMPL will work as expected (that's probably why you had "4" in there, right?).
+ VIR_JOB_LAST +} virJobType; +VIR_ENUM_DECL(virJob); + + +/* General metadata/flags for jobs + + more specific flags can be added for speciic drivers, + to do this the first value of the enum for the extra flags + should be set to (VIR_JOB_FLAG_LAST << 1) and each additional + flag should be set to the last flag shifted to the left by 1. + +*/ +typedef enum { + VIR_JOB_FLAG_NONE = 0x000, /* No job is active */ + VIR_JOB_FLAG_ACTIVE = 0x001, /* Job is active */ + /* These next flags are used to indicate how progress should be measured + and are currently unused + */ + VIR_JOB_FLAG_TIME_BOUND = 0x002, /* Job is bound by a specific ammount of time */ + VIR_JOB_FLAG_MEM_BOUND = 0x004, /* Job is bound by a specific ammount of memory */ + VIR_JOB_FLAG_FILE_BOUND = 0x008, /* Job is bound by a specific number of files */ + VIR_JOB_FLAG_DATA_BOUND = 0x010, /* Job is bound by a specific ammount of data */ + VIR_JOB_FLAG_UNBOUNDED = 0x020, /* Job has no specific bound */ + /* These flags indicate a job is no longer active but still exists + */ + VIR_JOB_FLAG_SUSPENDED = 0x040, /* Job is Suspended and can be resumed */ + VIR_JOB_FLAG_COMPLETED = 0x080, /* Job has finished, but isn't cleaned up */ + VIR_JOB_FLAG_FAILED = 0x100, /* Job hit error, but isn't cleaned up */ + VIR_JOB_FLAG_ABORTED = 0x200, /* Job was aborted, but isn't cleaned up */ + VIR_JOB_FLAG_LAST = 0x400 +} virJobFlag; +
Some of these flags (e.g. SUSPENDED, COMPLETED, FAILED, ABORTED) are not really flags and might be separated into a 'status' field, for example.
+ +typedef int virJobID; /*signed so negitive values can be returned on error*/ +typedef struct _virJobObj virJobObj; +typedef virJobObj *virJobObjPtr; +typedef struct _virJobInfo virJobInfo; +typedef virJobInfo *virJobInfoPtr; + +struct _virJobObj { + /* this should only be locked when changing the job object, not while running a job + it's use is mostly optional, but it is needed for waiting on a job*/ + virMutex lock;/*should have a compile time option to enable/disable locking */
I guess this comment is here because most of the changes are atomic, am I right?
+ virCond cond; /* Use to coordinate jobs (is this necessary?) */
If it's not necessary now, leave it out and add it when it becomes necessary; that way we won't leave it in if it turns out not to be needed.
+ virJobType type; /* The type of the job */ + unsigned long long owner; /* Thread id which set current job */ + unsigned long long start; /* when the job started*/ + int jobsWaiting; /* number jobs waiting on cond */ + int maxJobsWaiting; /* max number of jobs that can wait on cond */ + /* information about the job, this is a private struct, access to information + about the job should be obtained by functions */
You can enforce that by defining the struct in the .c file only; that way any other file will have to use accessors, and the compiler will check that automagically.
+ virJobInfoPtr info; + virJobID id; /* the job id, constant, and unique */ + virJobFlag flags; /* The current state of the job */ + void *privateData; +}; + +/* Getting information on a currently running job is currently unimplmeneted and + any suggestions on what should go into the api for querying/setting job + progress information would be appreciated +*/
This fits better in the commit message, or in a note below the '---' line of the commit message.
+typedef enum { + VIR_JOB_INFO_NONE = 0, + VIR_JOB_INFO_TIME, + VIR_JOB_INFO_MEM, + VIR_JOB_INFO_FILE, + VIR_JOB_INFO_DATA, + VIR_JOB_INFO_LAST +} virJobInfoType; + +struct _virJobInfo { + /* these fields are for compatability with existing job + information structs. + */ + virDomainBlockJobInfoPtr blockJobInfoPtr; + virDomainJobInfoPtr domainJobInfoPtr; +
Shouldn't these be in the privateData then? Or will these be needed for every job?
+ unsigned long long progressTotal; + unsigned long long dataTotal; + unsigned long long memTotal; + unsigned long long filesTotal; + unsigned long long timeTotal; + + unsigned long long progressProcessed; + unsigned long long timeProcessed; + unsigned long long memProcessed; + unsigned long long filesProcessed; + unsigned long long dataProcessed; + + unsigned long long progressRemaining; + unsigned long long dataRemaining; + unsigned long long memRemaining; + unsigned long long filesRemaining; + unsigned long long timeRemaining;
I'm hesitant to have x{Total,Processed,Remaining} for each x, and to add new ones even when only some drivers will need them; however, this is internal and can be done later. It's also pre-existing, so redesigning it can wait (a lot).
+}; + +/** + * virJobObjInit: + * @job: Pointer to the job object to initialize + * + * Initialize the job object given, should be called before any other + * job function. + * + * Like all job functions it returns the job id on success and -1 on error + */ +int virJobObjInit(virJobObjPtr job); +/** + * virJobObjFree: + * @job: Pointer to the job object to free + * + * Cleanup/free all resources/memory assoicated with job + */ +void virJobObjFree(virJobObjPtr job); +/** + * virJobObjCleanup: + * @job: Pointer to the job object to cleanup + * + * Cleanup all resources assoicated with job, and zero out the + * corrsponding memory, but do not free it.
s/corrsponding/corresponding/

However, the rest is documentation and I only skimmed it. A few pointers, though: blank lines between the functions will make it more readable, and if you start the sentence about return values with "Return(s)", our documentation parser will understand it and use it as such (but that applies to the public API only).

Martin
participants (2)
-
Martin Kletzander
-
Tucker DiNapoli