this helper listens on a unix socket which is provided to QEMU
for multifd migration to/from files.
The file descriptors are passed as command line parameters.
Signed-off-by: Claudio Fontana <cfontana(a)suse.de>
---
src/libvirt_private.syms | 1 +
src/util/meson.build | 13 +++
src/util/multifd-helper.c | 205 ++++++++++++++++++++++++++++++++++++++
src/util/virthread.c | 5 +
src/util/virthread.h | 1 +
5 files changed, 225 insertions(+)
create mode 100644 src/util/multifd-helper.c
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 8a3e5f7f7c..cd279b8713 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -3427,6 +3427,7 @@ virThreadCreateFull;
virThreadID;
virThreadIsSelf;
virThreadJoin;
+virThreadJoinRet;
virThreadMaxName;
virThreadSelf;
virThreadSelfID;
diff --git a/src/util/meson.build b/src/util/meson.build
index 58001a1699..8ea74ff9e8 100644
--- a/src/util/meson.build
+++ b/src/util/meson.build
@@ -179,6 +179,12 @@ io_helper_sources = [
'runio.h',
]
+multifd_helper_sources = [
+ 'multifd-helper.c',
+ 'runio.c',
+ 'runio.h',
+]
+
virt_util_lib = static_library(
'virt_util',
[
@@ -216,6 +222,13 @@ if conf.has('WITH_LIBVIRTD')
dtrace_gen_headers,
],
}
+ virt_helpers += {
+ 'name': 'libvirt_multifd_helper',
+ 'sources': [
+ files(multifd_helper_sources),
+ dtrace_gen_headers,
+ ],
+ }
endif
util_inc_dir = include_directories('.')
diff --git a/src/util/multifd-helper.c b/src/util/multifd-helper.c
new file mode 100644
index 0000000000..fab28bdb79
--- /dev/null
+++ b/src/util/multifd-helper.c
@@ -0,0 +1,205 @@
+/*
+ * multifd-helper.c: listens on Unix socket to perform I/O to multiple files
+ *
+ * Copyright (C) 2022 SUSE LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see
+ * <
http://www.gnu.org/licenses/>.
+ *
+ * This has been written to support QEMU multifd migration to file,
+ * allowing better use of cpu resources to speed up the save/restore.
+ */
+
+#include <config.h>
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "virthread.h"
+#include "virfile.h"
+#include "virerror.h"
+#include "virstring.h"
+#include "virgettext.h"
+#include "runio.h"
+
+#define VIR_FROM_THIS VIR_FROM_STORAGE
+
+typedef struct _multiFdConnData multiFdConnData;
+struct _multiFdConnData {
+ int clientfd;
+ int filefd;
+ int oflags;
+ const char *path;
+ virThread tid;
+
+ off_t total;
+};
+
+typedef struct _multiFdThreadArgs multiFdThreadArgs;
+struct _multiFdThreadArgs {
+ int nchannels;
+ multiFdConnData *conn; /* contains main fd + nchannels */
+ const char *sun_path; /* unix socket name to use for the server */
+
+ off_t total;
+};
+
+static void clientThreadFunc(void *a)
+{
+ multiFdConnData *c = a;
+ c->total = runIO(c->path, c->filefd, c->oflags, c->clientfd,
c->clientfd);
+}
+
+static void serverThreadFunc(void *a)
+{
+ struct sockaddr_un serv_addr;
+ multiFdThreadArgs *args = a;
+ int i;
+ int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ const char buf[1] = {'R'};
+
+ args->total = -1;
+
+ if (sockfd < 0) {
+ virReportSystemError(errno, "%s", _("serverThread: socket()
failed"));
+ return;
+ }
+ /* initialize socket structure */
+ memset(&serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sun_family = AF_UNIX;
+ strncpy(serv_addr.sun_path, args->sun_path, sizeof(serv_addr.sun_path) - 1);
+ unlink(args->sun_path);
+ if (bind(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
+ virReportSystemError(errno, "%s", _("serverThread: bind()
failed"));
+ goto cleanup;
+ }
+ if (listen(sockfd, args->nchannels + 1) < 0) {
+ virReportSystemError(errno, "%s", _("serverThread: listen()
failed"));
+ goto cleanup;
+ }
+
+ /* signal that the server is ready */
+ if (safewrite(1, &buf, 1) != 1) {
+ virReportSystemError(errno, "%s", _("serverThread: safewrite
failed"));
+ goto cleanup;
+ }
+
+ for (i = 0; i < args->nchannels + 1; i++) {
+ /* Wait for incoming connection. */
+ multiFdConnData *c = &args->conn[i];
+ if ((c->clientfd = accept(sockfd, NULL, NULL)) < 0) {
+ virReportSystemError(errno, "%s", _("serverThread: accept()
failed"));
+ goto cleanup;
+ }
+ if (virThreadCreate(&c->tid, true, &clientThreadFunc, c) < 0) {
+ virReportSystemError(errno, "%s", _("serverThread: client
thread creation failed"));
+ goto cleanup;
+ }
+ }
+
+ args->total = 0;
+ for (i = 0; i < args->nchannels + 1; i++) {
+ multiFdConnData *c = &args->conn[i];
+ if (virThreadJoinRet(&args->conn[i].tid) < 0) {
+ args->total = -1;
+ } else if (args->total >= 0) {
+ args->total += c->total;
+ }
+ if (VIR_CLOSE(c->clientfd) < 0) {
+ args->total = -1;
+ }
+ }
+
+ cleanup:
+ for (i = 0; i < args->nchannels + 1; i++) {
+ multiFdConnData *c = &args->conn[i];
+ VIR_FORCE_CLOSE(c->clientfd);
+ }
+ if (VIR_CLOSE(sockfd) < 0)
+ args->total = -1;
+}
+
+static const char *program_name;
+
+G_GNUC_NORETURN static void
+usage(int status)
+{
+ if (status) {
+ fprintf(stderr, _("%s: try --help for more details"), program_name);
+ } else {
+ fprintf(stderr, _("Usage: %s UNIX_SOCNAME N MAINFD FD0 FD1 ... FDn"),
program_name);
+ }
+ exit(status);
+}
+
+int
+main(int argc, char **argv)
+{
+ virThread tid;
+ multiFdThreadArgs args = { 0 };
+ int i;
+
+ program_name = argv[0];
+
+ if (virGettextInitialize() < 0 ||
+ virErrorInitialize() < 0) {
+ fprintf(stderr, _("%s: initialization failed"), program_name);
+ exit(EXIT_FAILURE);
+ }
+
+ if (argc > 1 && STREQ(argv[1], "--help"))
+ usage(EXIT_SUCCESS);
+ if (argc < 4)
+ usage(EXIT_FAILURE);
+
+ args.sun_path = argv[1];
+ if (virStrToLong_i(argv[2], NULL, 10, &args.nchannels) < 0)
+ fprintf(stderr, _("%s: malformed number of channels N %s"),
program_name, argv[2]);
+
+ if (argc < 4 + args.nchannels)
+ usage(EXIT_FAILURE);
+
+ args.conn = g_new0(multiFdConnData, args.nchannels + 1);
+
+ for (i = 3; i < 3 + args.nchannels + 1; i++) {
+ multiFdConnData *c = &args.conn[i - 3];
+
+ if (virStrToLong_i(argv[i], NULL, 10, &c->filefd) < 0) {
+ fprintf(stderr, _("%s: malformed FD %s"), program_name, argv[i]);
+ usage(EXIT_FAILURE);
+ }
+#ifndef F_GETFL
+#error "multifd-helper requires F_GETFL parameter of fcntl"
+#endif
+ c->oflags = fcntl(c->filefd, F_GETFL);
+ }
+
+ if (virThreadCreate(&tid, true, serverThreadFunc, &args) < 0) {
+ virReportSystemError(errno, _("%s: failed to create server thread"),
program_name);
+ exit(EXIT_FAILURE);
+ }
+
+ if (virThreadJoinRet(&tid) < 0)
+ exit(EXIT_FAILURE);
+
+ if (args.total < 0)
+ exit(EXIT_FAILURE);
+
+ exit(EXIT_SUCCESS);
+}
diff --git a/src/util/virthread.c b/src/util/virthread.c
index 5422bb74fd..0f6c6a68fa 100644
--- a/src/util/virthread.c
+++ b/src/util/virthread.c
@@ -348,6 +348,11 @@ void virThreadJoin(virThread *thread)
pthread_join(thread->thread, NULL);
}
+int virThreadJoinRet(virThread *thread)
+{
+ return pthread_join(thread->thread, NULL);
+}
+
void virThreadCancel(virThread *thread)
{
pthread_cancel(thread->thread);
diff --git a/src/util/virthread.h b/src/util/virthread.h
index 23abe0b6c9..5cecb9bd8a 100644
--- a/src/util/virthread.h
+++ b/src/util/virthread.h
@@ -89,6 +89,7 @@ int virThreadCreateFull(virThread *thread,
void virThreadSelf(virThread *thread);
bool virThreadIsSelf(virThread *thread);
void virThreadJoin(virThread *thread);
+int virThreadJoinRet(virThread *thread);
size_t virThreadMaxName(void);
--
2.34.1