Signed-off-by: Prerna Saxena <saxenap.ltc(a)gmail.com>
---
src/conf/domain_conf.h | 3 +
src/qemu/qemu_conf.h | 4 +
src/qemu/qemu_driver.c | 9 ++
src/qemu/qemu_event.c | 229 ++++++++++++++++++++++++++++++++++++++++++++++++
src/qemu/qemu_event.h | 1 -
src/qemu/qemu_process.c | 2 +
6 files changed, 247 insertions(+), 1 deletion(-)
diff --git a/src/conf/domain_conf.h b/src/conf/domain_conf.h
index a42efcf..7fe38e7 100644
--- a/src/conf/domain_conf.h
+++ b/src/conf/domain_conf.h
@@ -2496,6 +2496,9 @@ struct _virDomainObj {
unsigned long long original_memlock; /* Original RLIMIT_MEMLOCK, zero if no
* restore will be required later */
+
+ /* Pointer to per-VM Event Queue */
+ void *vmq;
};
typedef bool (*virDomainObjListACLFilter)(virConnectPtr conn,
diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h
index 13b6f81..e63dc98 100644
--- a/src/qemu/qemu_conf.h
+++ b/src/qemu/qemu_conf.h
@@ -33,6 +33,7 @@
# include "domain_conf.h"
# include "snapshot_conf.h"
# include "domain_event.h"
+# include "qemu_event.h"
# include "virthread.h"
# include "security/security_manager.h"
# include "virpci.h"
@@ -235,6 +236,9 @@ struct _virQEMUDriver {
/* Immutable pointer, self-locking APIs */
virDomainObjListPtr domains;
+ /* Immutable pointer, contains Qemu Driver Event List */
+ virQemuEventList *ev_list;
+
/* Immutable pointer */
char *qemuImgBinary;
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 7c6f167..8a005d0 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -52,6 +52,7 @@
#include "qemu_command.h"
#include "qemu_parse_command.h"
#include "qemu_cgroup.h"
+#include "qemu_event.h"
#include "qemu_hostdev.h"
#include "qemu_hotplug.h"
#include "qemu_monitor.h"
@@ -650,6 +651,14 @@ qemuStateInitialize(bool privileged,
if (!(qemu_driver->domains = virDomainObjListNew()))
goto error;
+    /* Init domain Async QMP events.  This is not an errno-based failure,
+     * so report it with virReportError(); virReportSystemError() would
+     * interpret its first argument as an errno value. */
+    qemu_driver->ev_list = virQemuEventListInit();
+    if (!qemu_driver->ev_list) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       _("Unable to initialize QMP event queues"));
+        goto error;
+    }
+
/* Init domain events */
qemu_driver->domainEventState = virObjectEventStateNew();
if (!qemu_driver->domainEventState)
diff --git a/src/qemu/qemu_event.c b/src/qemu/qemu_event.c
index e27ea0d..d52fad2 100644
--- a/src/qemu/qemu_event.c
+++ b/src/qemu/qemu_event.c
@@ -73,3 +73,232 @@ virQemuEventList* virQemuEventListInit(void)
return ev_list;
}
+
+/**
+ * virQemuVmEventListInit:
+ * @vm: domain object that receives the new queue
+ *
+ * Allocates an empty per-VM event queue, initializes its lock and
+ * stores the queue in @vm->vmq.
+ *
+ * Returns 0 on success; -1 if @vm is NULL, if the allocation fails or
+ * if the queue lock cannot be initialized.
+ */
+int virQemuVmEventListInit(virDomainObjPtr vm)
+{
+    virQemuVmEventQueue *vmq;
+
+    if (!vm)
+        return -1;
+
+    if (VIR_ALLOC(vmq) < 0)
+        return -1;
+
+    vmq->last = NULL;
+    vmq->head = NULL;
+
+    if (virMutexInit(&vmq->lock) != 0) {
+        /* Do not leak the queue when its lock cannot be set up. */
+        VIR_FREE(vmq);
+        return -1;
+    }
+
+    vm->vmq = vmq;
+    return 0;
+}
+/**
+ * virEnqueueVMEvent:
+ * @qlist: global event list
+ * @ev: event to queue; ev->vm must already own a per-VM queue
+ *
+ * Adds a new event to:
+ *  - the event queue of ev->vm
+ *  - the global event list @qlist
+ *
+ * A reference on ev->vm is taken for the global list entry.  The
+ * event id is the id of the current per-VM queue tail plus one, or 1
+ * when the per-VM queue is empty.
+ *
+ * Return : 0 (success)
+ *         -1 (failure)
+ */
+int virEnqueueVMEvent(virQemuEventList *qlist, qemuEventPtr ev)
+{
+    struct _qemuGlobalEventListElement *globalEntry;
+    virQemuVmEventQueue *vmq;
+    struct _qemuVmEventQueueElement *vmq_entry;
+
+    /* These must be checked separately from the vmq test below: the
+     * detailed message dereferences both ev and ev->vm. */
+    if (!qlist || !ev || !ev->vm) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       "No queue list instantiated. Dropping event");
+        return -1;
+    }
+
+    if (!ev->vm->vmq) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       "No queue list instantiated. "
+                       "Dropping event %d for Vm %s",
+                       ev->ev_type, ev->vm->def->name);
+        return -1;
+    }
+
+    if (VIR_ALLOC(globalEntry) < 0) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       "Allocation error. "
+                       "Dropping event %d for Vm %s",
+                       ev->ev_type, ev->vm->def->name);
+        return -1;
+    }
+
+    if (VIR_ALLOC(vmq_entry) < 0) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       "Allocation error. "
+                       "Dropping event %d for Vm %s",
+                       ev->ev_type, ev->vm->def->name);
+        VIR_FREE(globalEntry);
+        return -1;
+    }
+
+    vmq_entry->ev = ev;
+    vmq_entry->next = NULL;
+
+    virObjectRef(ev->vm);
+    globalEntry->vm = ev->vm;
+    globalEntry->next = NULL;
+    globalEntry->prev = NULL;
+
+    /* Lock order: per-VM queue lock first, then the global list lock.
+     * Note that this order needs to be maintained
+     * for dequeue too else ABBA deadlocks will happen */
+
+    /* Insert into per-Vm queue */
+    vmq = ev->vm->vmq;
+
+    virMutexLock(&(vmq->lock));
+    /* Test head, not last, for emptiness: last is only meaningful while
+     * the queue holds at least one element. */
+    if (vmq->head) {
+        vmq->last->next = vmq_entry;
+        vmq_entry->ev->ev_id = vmq->last->ev->ev_id + 1;
+    } else {
+        vmq->head = vmq_entry;
+        vmq_entry->ev->ev_id = 1;
+    }
+    vmq->last = vmq_entry;
+    globalEntry->ev_id = vmq_entry->ev->ev_id;
+
+    /* Insert the event into the global queue */
+    virMutexLock(&(qlist->lock));
+    if (qlist->last) {
+        qlist->last->next = globalEntry;
+        globalEntry->prev = qlist->last;
+    } else {
+        qlist->head = globalEntry;
+    }
+
+    qlist->last = globalEntry;
+    virMutexUnlock(&(qlist->lock));
+    virMutexUnlock(&(vmq->lock));
+
+    return 0;
+}
+
+/**
+ * virDequeueVMEvent: Dequeues the first event of this VM from :
+ *  - the global event table;
+ *  - the per-VM event table;
+ *
+ * @qlist: global event list
+ * @vm: domain whose oldest queued event is wanted
+ *
+ * Needs to be called with VM lock held. Else the event is deleted forever and
+ * cannot be picked up by any other worker thread.
+ *
+ * Returns the dequeued event, which the caller must free, or NULL if
+ * an argument is invalid or the per-VM queue is empty.
+ */
+qemuEventPtr virDequeueVMEvent(virQemuEventList *qlist, virDomainObjPtr vm)
+{
+    qemuEventPtr ret_ev;
+    struct _qemuVmEventQueue *cur_vmq;
+    struct _qemuVmEventQueueElement *vmq_entry;
+    struct _qemuGlobalEventListElement *iter;
+    const unsigned char *ref_uuid;
+
+    if (!qlist || !vm || !vm->vmq) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       "No queue list /VM/ event for this vm %s",
+                       vm ? vm->def->name : "(null)");
+        return NULL;
+    }
+
+    cur_vmq = vm->vmq;
+
+    /* Acquire a ref to first event from per-Vm event queue.
+     * Lock order matches enqueue: per-VM lock, then global lock. */
+    virMutexLock(&(cur_vmq->lock));
+    vmq_entry = cur_vmq->head;
+
+    if (vmq_entry == NULL) {
+        virMutexUnlock(&(cur_vmq->lock));
+        return NULL;
+    }
+    ref_uuid = vmq_entry->ev->vm->def->uuid;
+
+    /* Purge the event from global queue, and then from local queue.
+     * So that ev_ids are always consistent.
+     */
+    virMutexLock(&(qlist->lock));
+    for (iter = qlist->head; iter; iter = iter->next) {
+        /* The UUID is raw binary data and may contain zero bytes, so
+         * it must be compared with memcmp() rather than as a string. */
+        if (iter->vm == NULL ||
+            iter->ev_id != vmq_entry->ev->ev_id ||
+            memcmp(iter->vm->def->uuid, ref_uuid, VIR_UUID_BUFLEN) != 0)
+            continue;
+
+        /* Found the element, unlink it. */
+        if (iter->prev != NULL)
+            iter->prev->next = iter->next;
+        else
+            /* This was the first element */
+            qlist->head = iter->next;
+
+        if (iter->next != NULL)
+            iter->next->prev = iter->prev;
+        else
+            /* This was the last element */
+            qlist->last = iter->prev;
+        break;
+    }
+
+    /* Now remove this from per-Vm queue.  When the queue drains, the
+     * tail must be cleared too, otherwise it would keep pointing at
+     * the element freed below. */
+    cur_vmq->head = vmq_entry->next;
+    if (cur_vmq->head == NULL)
+        cur_vmq->last = NULL;
+    virMutexUnlock(&(qlist->lock));
+
+    virMutexUnlock(&(cur_vmq->lock));
+
+    ret_ev = vmq_entry->ev;
+    VIR_FREE(vmq_entry);
+    /* iter is NULL when no matching global entry was found. */
+    VIR_FREE(iter);
+
+    return ret_ev;
+}
+
+/**
+ * virEventWorkerScanQueue:
+ * @dummy: unused
+ * @opaque: the QEMU driver (virQEMUDriverPtr)
+ *
+ * Walks the global event list looking for the first entry whose VM
+ * lock can be taken without blocking, then processes every queued
+ * event of that VM and releases the VM lock again.  Returns without
+ * doing anything if the list is empty or no VM lock could be taken.
+ */
+void
+virEventWorkerScanQueue(void *dummy ATTRIBUTE_UNUSED, void *opaque)
+{
+    virQEMUDriverPtr driver = opaque;
+    virQemuEventList *qlist = driver->ev_list;
+    struct _qemuGlobalEventListElement *globalEntry;
+    virDomainObjPtr vm = NULL;
+    bool found = false;
+
+    /* The global list is modified under qlist->lock by enqueue and
+     * dequeue, so it must not be walked without that lock.  Only a
+     * non-blocking trylock is attempted on the VM while it is held. */
+    virMutexLock(&qlist->lock);
+
+    globalEntry = qlist->head;
+    if (!globalEntry) {
+        virMutexUnlock(&qlist->lock);
+        return;
+    }
+
+    VIR_WARN("Running event driver");
+
+    for (; globalEntry; globalEntry = globalEntry->next) {
+        vm = globalEntry->vm;
+        /* A zero return from virObjectTrylock() is treated as
+         * "lock acquired", matching the later virObjectUnlock(). */
+        if (vm != NULL && !virObjectTrylock(vm)) {
+            /* Keep the VM alive until it is unlocked below: consuming
+             * its events drops the references the queue held. */
+            virObjectRef(vm);
+            found = true;
+            break;
+        }
+        // Todo:Clear events for irrelevant VMs
+    }
+
+    virMutexUnlock(&qlist->lock);
+
+    // Scanned the entire list, but no worthy event found. Exit now.
+    if (!found)
+        return;
+
+    virDomainConsumeVMEvents(vm, opaque);
+
+    virObjectUnlock(vm);
+    virObjectUnref(vm);
+}
+
+/**
+ * virDomainConsumeVMEvents:
+ * @vm: domain whose queued events are processed
+ * @opaque: the QEMU driver (virQEMUDriverPtr), also passed to handlers
+ *
+ * Called under the VM lock.
+ *
+ * Dequeues the events of @vm one by one until its queue is empty.
+ * For each event the handler is run if one is set, the event is freed
+ * and one reference on @vm is dropped (virEnqueueVMEvent() takes one
+ * reference per queued event).
+ */
+void virDomainConsumeVMEvents(virDomainObjPtr vm, void *opaque)
+{
+    virQEMUDriverPtr driver = opaque;
+    qemuEventPtr evt;
+
+    while ((evt = virDequeueVMEvent(driver->ev_list, vm)) != NULL) {
+        VIR_WARN("Processing event %d vm %s", evt->ev_type, vm->def->name);
+        if (evt->handler)
+            (evt->handler)(evt, opaque);
+        VIR_FREE(evt);
+        virObjectUnref(vm);
+    }
+}
diff --git a/src/qemu/qemu_event.h b/src/qemu/qemu_event.h
index 9781795..4173834 100644
--- a/src/qemu/qemu_event.h
+++ b/src/qemu/qemu_event.h
@@ -219,6 +219,5 @@ int virQemuVmEventListInit(virDomainObjPtr vm);
int virEnqueueVMEvent(virQemuEventList *qlist, qemuEventPtr ev);
qemuEventPtr virDequeueVMEvent(virQemuEventList *qlist, virDomainObjPtr vm);
void virEventWorkerScanQueue(void *dummy, void *opaque);
-void virEventRunHandler(qemuEventPtr ev, void *opaque);
void virDomainConsumeVMEvents(virDomainObjPtr vm, void *opaque);
#endif
diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c
index 9f26dfc..8e6498e 100644
--- a/src/qemu/qemu_process.c
+++ b/src/qemu/qemu_process.c
@@ -6941,6 +6941,8 @@ qemuProcessReconnect(void *opaque)
goto error;
jobStarted = true;
+ if (virQemuVmEventListInit(obj) < 0)
+ goto error;
/* XXX If we ever gonna change pid file pattern, come up with
* some intelligence here to deal with old paths. */
if (!(priv->pidfile = virPidFileBuildPath(cfg->stateDir,
obj->def->name)))
--
2.9.5