Current libvirt qemu monitor can not handle many send command
requests and it does not need to as misc job conditions protect
him from this case. As there is an intention to replace nested
jobs concept by mere serialization at monitor level let's add
serialization to monitor as a preparation step.
Let's add job tracking code so that in case of request jams we can
get nice info on blockers just as domain job acquiring code does.
This patch introduces distinct mutex condition for serializing
queries. This way we can still wake up only senders on qemu
response and wake up waiters when sender finishes. Typically
we wake up only one waiter, in case of error conditions all
waiters will wakeup and fail.
---
src/qemu/qemu_monitor.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 66 insertions(+)
diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c
index 3f86887..f402300 100644
--- a/src/qemu/qemu_monitor.c
+++ b/src/qemu/qemu_monitor.c
@@ -43,6 +43,7 @@
#include "virprobe.h"
#include "virstring.h"
#include "virtime.h"
+#include "virthreadjob.h"
#ifdef WITH_DTRACE_PROBES
# include "libvirt_qemu_probes.h"
@@ -99,6 +100,11 @@ struct _qemuMonitor {
qemuMonitorReportDomainLogError logFunc;
void *logOpaque;
virFreeCallback logDestroy;
+
+ unsigned long long owner;
+ const char *ownerAPI;
+ unsigned long long started;
+ virCond queueCond;
};
/**
@@ -320,6 +326,7 @@ qemuMonitorDispose(void *obj)
virResetError(&mon->lastError);
virCondDestroy(&mon->notify);
+ virCondDestroy(&mon->queueCond);
VIR_FREE(mon->buffer);
virJSONValueFree(mon->options);
VIR_FREE(mon->balloonpath);
@@ -813,6 +820,11 @@ qemuMonitorOpenInternal(virDomainObjPtr vm,
_("cannot initialize monitor condition"));
goto cleanup;
}
+ if (virCondInit(&mon->queueCond) < 0) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("cannot initialize monitor queue condition"));
+ goto cleanup;
+ }
mon->fd = fd;
mon->hasSendFD = hasSendFD;
mon->vm = virObjectRef(vm);
@@ -986,12 +998,52 @@ qemuMonitorNextCommandID(qemuMonitorPtr mon)
return id;
}
+#define QEMU_COMMAND_WAIT_TIME (1000ull * 30)
int
qemuMonitorSend(qemuMonitorPtr mon,
qemuMonitorMessagePtr msg)
{
int ret = -1;
+ unsigned long long now;
+ unsigned long long then;
+
+ if (virTimeMillisNow(&now) < 0)
+ return -1;
+ then = now + QEMU_COMMAND_WAIT_TIME;
+
+ while (mon->msg && mon->lastError.code == VIR_ERR_OK) {
+ VIR_DEBUG("Waiting for monitor lock: mon=%p, msg=%s, fd=%d",
+ mon, msg->txBuffer, msg->txFD);
+
+ if (virCondWaitUntil(&mon->queueCond, &mon->parent.lock, then) <
0) {
+ if (errno == ETIMEDOUT) {
+ if (mon->ownerAPI) {
+ virReportError(VIR_ERR_OPERATION_TIMEOUT,
+ _("cannot acquire monitor lock (held by
%s)"),
+ mon->ownerAPI);
+ } else {
+ virReportError(VIR_ERR_OPERATION_TIMEOUT, "%s",
+ _("cannot acquire monitor lock"));
+ }
+ } else {
+ virReportSystemError(errno, "%s",
+ _("cannot acquire monitor lock"));
+ }
+
+ now = 0;
+ ignore_value(virTimeMillisNow(&now));
+ VIR_WARN("Cannot send message: mon=%p, msg=%s, fd=%d"
+ " blocker: msg=%s, fd=%d"
+ " (thread: %llu, API: %s," " duration: %llu)",
+ mon, msg->txBuffer, msg->txFD,
+ mon->msg->txBuffer, mon->msg->txFD,
+ mon->owner, NULLSTR(mon->ownerAPI),
+ now && mon->started ? (now - mon->started) / 1000 :
0);
+
+ return -1;
+ }
+ }
/* Check whether qemu quit unexpectedly */
if (mon->lastError.code != VIR_ERR_OK) {
@@ -1001,6 +1053,11 @@ qemuMonitorSend(qemuMonitorPtr mon,
return -1;
}
+ now = 0;
+ ignore_value(virTimeMillisNow(&now));
+ mon->ownerAPI = virThreadJobGet();
+ mon->owner = virThreadSelfID();
+ mon->started = now;
mon->msg = msg;
qemuMonitorUpdateWatch(mon);
@@ -1027,6 +1084,15 @@ qemuMonitorSend(qemuMonitorPtr mon,
cleanup:
mon->msg = NULL;
+ mon->ownerAPI = NULL;
+ mon->owner = 0;
+ mon->started = 0;
+
+ if (mon->lastError.code == VIR_ERR_OK)
+ virCondSignal(&mon->queueCond);
+ else
+ virCondBroadcast(&mon->queueCond);
+
qemuMonitorUpdateWatch(mon);
return ret;
--
1.8.3.1