summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/wsrep-API/v26/examples/node
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt26
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/README.md81
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/ctx.h34
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/log.c100
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/log.h69
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/main.c146
-rwxr-xr-xwsrep-lib/wsrep-API/v26/examples/node/node.sh40
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/options.c291
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/options.h48
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/socket.c304
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/socket.h72
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/sst.c372
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/sst.h39
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/stats.c215
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/stats.h35
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/store.c1044
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/store.h125
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/trx.c155
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/trx.h50
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/worker.c197
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/worker.h66
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/wsrep.c479
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/wsrep.h92
23 files changed, 4080 insertions, 0 deletions
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt b/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt
new file mode 100644
index 00000000..d018afde
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt
@@ -0,0 +1,26 @@
+# Copyright (c) 2019, Codership Oy. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+FILE(GLOB SRC
+ "*.h"
+ "*.c"
+ )
+
+ADD_EXECUTABLE(node ${SRC})
+
+TARGET_LINK_LIBRARIES(node wsrep dl pthread)
+
+CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/node.sh
+ ${CMAKE_CURRENT_BINARY_DIR}/node.sh COPYONLY)
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/README.md b/wsrep-lib/wsrep-API/v26/examples/node/README.md
new file mode 100644
index 00000000..4a07c149
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/README.md
@@ -0,0 +1,81 @@
+# wsrep API node application
+
+## Overview
+
+This is a simple application to demonstrate the usage of wsrep API. It
+deliberately does nothing useful in order to present as concentrated and
+concise API usage as possible.
+
+The program is deliberately written in C to demonstrate the naked API usage.
+For C++ example see a much more advanced integration library at
+https://github.com/codership/wsrep-lib
+
+## High level architecture
+
+Process-wise the program consists of an endless main loop that periodically
+samples and prints performance stats and a configurable number of "master" and
+"slave" threads, with master threads loop executing "transactions" and
+replicating resulting "write sets" and slave threads receiving and processing
+the write sets from other nodes.
+
+Object-wise the program is composed of two main objects: `store` and `wsrep`.
+'store' object contains application "state" and generates and commits changes
+to the state. `wsrep` object contains cluster context and provides interface
+to it. Changes generated by `store` are replicated and certified through
+`wsrep` and then committed to `store`.
+
+## Unit descriptions (in alphabetical order)
+
+#### ctx.h
+A small header to declare the application context structure.
+
+#### log.*
+Implements logging functionality for the application AND
+**a logging callback** for the wsrep provider.
+
+#### main.c
+Defines `main()` routine that initializes storage and wsrep provider, starts
+the worker threads and loops in a statistics collection loop. Even though it is
+not designed to return it still shows the deinitialization order.
+
+#### options.*
+Implements reading configuration options from the command line, does not have
+anything related to wsrep API, but shows which additional parameters must be
+configured for the program to make use of wsrep clustering.
+
+#### socket.*
+Network sockets boilerplate code for setting TCP connections between processes
+(for SST). Has nothing wsrep-related and can be ignored.
+
+#### sst.*
+Defines **SST callbacks** for the wsrep provider and shows how to asynchronously
+implement state snapshot transfer (yes, you don't want to spend eternity in
+callbacks).
+
+#### stats.*
+Implements performance stats collecting function for the main loop. While it is
+an absolutely optional provider functionality, still it shows how to use that.
+
+#### store.*
+Defines the `store` object that pretends to store and modify some data in a
+"transactional" manner. It provides the caller that intends to do a change with
+a *change data* and a *key* for replication and certification.
+
+#### trx.*
+Defines routines to process local and replicated transactions.
+
+#### worker.*
+Implements worker thread pool functinality. Worker threads run routines defined
+in 'trx.*'. Also implements **apply callback** for the wsrep provider.
+
+#### wsrep.*
+Maintains wsrep cluster context: provider instance and cluster membership view.
+While there is little use for the latter in this primitive application, still
+it shows **connected and view callbacks** usage. But mostly, for this
+application its purpose is to initialize the provider, connect to the cluster
+and offer access to initialized provider for other parts of the program.
+
+## Example usage
+```
+./node -f /tmp/galera/0 -v /tmp/galera/0/galera/lib/libgalera_smm.so -o 'pc.weight=2;evs.send_window=2;evs.user_send_window=1;gcache.recover=no' -s 8 -m 16
+```
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/ctx.h b/wsrep-lib/wsrep-API/v26/examples/node/ctx.h
new file mode 100644
index 00000000..01653554
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/ctx.h
@@ -0,0 +1,34 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines application context for wsrep provider
+ */
+
+#ifndef NODE_CTX_H
+#define NODE_CTX_H
+
+#include "store.h"
+#include "wsrep.h"
+
+struct node_ctx
+{
+ node_wsrep_t* wsrep;
+ node_store_t* store;
+ const struct node_options* opts;
+};
+
+#endif /* NODE_CTX_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/log.c b/wsrep-lib/wsrep-API/v26/examples/node/log.c
new file mode 100644
index 00000000..71f4705c
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/log.c
@@ -0,0 +1,100 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "log.h"
+
+#include <stdio.h> // fprintf(), fflush()
+#include <sys/time.h> // gettimeofday()
+#include <time.h> // localtime_r()
+#include <stdarg.h> // va_start(), va_end()
+
+wsrep_log_level_t node_log_max_level = WSREP_LOG_INFO;
+
+static const char* log_level_str[WSREP_LOG_DEBUG + 2] =
+{
+ "FATAL: ",
+ "ERROR: ",
+ " WARN: ",
+ " INFO: ",
+ "DEBUG: ",
+ "XXXXX: "
+};
+
+static inline void
+log_timestamp_and_log(const char* const prefix, // source of msg
+ int const severity,
+ const char* const msg)
+{
+ struct tm date;
+ struct timeval time;
+
+ gettimeofday(&time, NULL);
+ localtime_r (&time.tv_sec, &date);
+
+ FILE* log_file = stderr;
+ fprintf(log_file,
+ "%04d-%02d-%02d %02d:%02d:%02d.%03d " /* timestamp fmt */
+ "[%s] %s%s\n", /* [prefix] severity msg */
+ date.tm_year + 1900, date.tm_mon + 1, date.tm_mday,
+ date.tm_hour, date.tm_min, date.tm_sec,
+ (int)time.tv_usec / 1000,
+ prefix, log_level_str[severity], msg
+ );
+
+ fflush (log_file);
+}
+
+void
+node_log_cb(wsrep_log_level_t const severity, const char* const msg)
+{
+ /* REPLICATION: let provider log messages be prefixed with 'wsrep'*/
+ log_timestamp_and_log("wsrep", severity, msg);
+}
+
+void
+node_log(wsrep_log_level_t const severity,
+ const char* const file,
+ const char* const function,
+ int const line,
+ ...)
+{
+ va_list ap;
+
+ char string[2048];
+ int max_string = sizeof(string);
+ char* str = string;
+
+ /* provide file:func():line info only if debug logging is on */
+ if (NODE_DO_LOG_DEBUG) {
+ int const len = snprintf(str, (size_t)max_string, "%s:%s():%d: ",
+ file, function, line);
+ str += len;
+ max_string -= len;
+ }
+
+ va_start(ap, line);
+ {
+ const char* format = va_arg (ap, const char*);
+
+ if (max_string > 0 && NULL != format) {
+ vsnprintf (str, (size_t)max_string, format, ap);
+ }
+ }
+ va_end(ap);
+
+ /* actual logging */
+ log_timestamp_and_log(" node", severity, string);
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/log.h b/wsrep-lib/wsrep-API/v26/examples/node/log.h
new file mode 100644
index 00000000..09404f26
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/log.h
@@ -0,0 +1,69 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines logging macros for the application and
+ * a logger callback for the wsrep provider.
+ */
+
+#ifndef NODE_LOG_H
+#define NODE_LOG_H
+
+#include "../../wsrep_api.h"
+
+/**
+ * REPLICATION: a logger callback for wsrep provider
+ */
+extern void
+node_log_cb(wsrep_log_level_t severity, const char* message);
+
+/**
+ * Applicaton log function intended to be used through the macros defined below.
+ * For simplicity it uses log levels defined by wsrep API, but it does not have
+ * to. */
+extern void
+node_log (wsrep_log_level_t level,
+ const char* file,
+ const char* function,
+ const int line,
+ ...);
+
+/**
+ * This variable made global to avoid calling node_log() when debug logging
+ * is disabled. */
+extern wsrep_log_level_t node_log_max_level;
+#define NODE_DO_LOG_DEBUG (WSREP_LOG_DEBUG <= node_log_max_level)
+
+/**
+ * Base logging macro that records current file, function and line number */
+#define NODE_LOG(level, ...)\
+ node_log(level, __FILE__, __func__, __LINE__, __VA_ARGS__, NULL)
+
+/**
+ * @name Logging macros.
+ * Must be implemented as macros to report the location of the code where
+ * they are called.
+ */
+/*@{*/
+#define NODE_FATAL(...) NODE_LOG(WSREP_LOG_FATAL, __VA_ARGS__, NULL)
+#define NODE_ERROR(...) NODE_LOG(WSREP_LOG_ERROR, __VA_ARGS__, NULL)
+#define NODE_WARN(...) NODE_LOG(WSREP_LOG_WARN, __VA_ARGS__, NULL)
+#define NODE_INFO(...) NODE_LOG(WSREP_LOG_INFO, __VA_ARGS__, NULL)
+#define NODE_DEBUG(...) if (NODE_DO_LOG_DEBUG) \
+ { NODE_LOG(WSREP_LOG_DEBUG, __VA_ARGS__, NULL); }
+/*@}*/
+
+#endif /* NODE_LOG_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/main.c b/wsrep-lib/wsrep-API/v26/examples/node/main.c
new file mode 100644
index 00000000..f3124042
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/main.c
@@ -0,0 +1,146 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "ctx.h"
+#include "log.h"
+#include "options.h"
+#include "stats.h"
+#include "worker.h"
+#include "wsrep.h"
+
+#include <errno.h>
+#include <signal.h> // sigaction()
+#include <string.h> // strerror()
+
+static void
+signal_handler(int const signum)
+{
+ NODE_INFO("Got signal %d. Terminating.", signum);
+}
+
+static void
+install_signal_handler(void)
+{
+ sigset_t sa_mask;
+ sigemptyset(&sa_mask);
+
+ struct sigaction const act =
+ {
+ .sa_handler = signal_handler,
+ .sa_mask = sa_mask,
+ .sa_flags = (int)SA_RESETHAND
+ };
+
+ if (sigaction(SIGINT /* Ctrl-C */, &act, NULL))
+ {
+ NODE_INFO("sigaction() failed: %d (%s)", errno, strerror(errno));
+ abort();
+ }
+}
+
+int main(int argc, char* argv[])
+{
+ install_signal_handler();
+
+ struct node_options opts;
+ int err = node_options_read(argc, argv, &opts);
+ if (err)
+ {
+ NODE_FATAL("Failed to read command line opritons: %d (%s)",
+ err, strerror(err));
+ return err;
+ }
+
+ struct node_ctx node;
+ node.opts = &opts;
+
+ /* REPLICATION: before connecting to cluster we need to initialize our
+ * storage to know our current position (GTID) */
+ node.store = node_store_open(&opts);
+ if (!node.store)
+ {
+ NODE_FATAL("Failed to open node store");
+ return 1;
+ }
+
+ wsrep_gtid_t current_gtid;
+ node_store_gtid(node.store, &current_gtid);
+
+ /* REPLICATION: complete initialization of application context
+ * (including provider itself) */
+ node.wsrep = node_wsrep_init(&opts, &current_gtid, &node);
+ if (!node.wsrep)
+ {
+ NODE_FATAL("Failed to initialize wsrep provider");
+ return 1;
+ }
+
+ /* REPLICATION: now we can connect to the cluster and start receiving
+ * replication events */
+ if (node_wsrep_connect(node.wsrep, opts.address, opts.bootstrap) !=
+ WSREP_OK)
+ {
+ NODE_FATAL("Failed to connect to primary component");
+ return 1;
+ }
+
+ /* REPLICATION: and start processing replicaiton events */
+ struct node_worker_pool* slave_pool =
+ node_worker_start(&node, NODE_WORKER_SLAVE, (size_t)opts.slaves);
+ if (!slave_pool)
+ {
+ NODE_FATAL("Failed to create slave worker pool");
+ return 1;
+ }
+
+ /* REPLICATION: now that replicaton events are being processed we can
+ * wait to sync with the cluster */
+ if (!node_wsrep_wait_synced(node.wsrep))
+ {
+ NODE_ERROR("Failed to wait fir SYNCED event");
+ return 1;
+ }
+
+ NODE_INFO("Synced with cluster");
+
+ /* REPLICATION: now we can start replicate own events */
+ struct node_worker_pool* master_pool =
+ node_worker_start(&node, NODE_WORKER_MASTER, (size_t)opts.masters);
+ if (opts.masters > 0 && !master_pool)
+ {
+ NODE_FATAL("Failed to create master worker pool");
+ return 1;
+ }
+
+ node_stats_loop(&node, (int)opts.period);
+
+ /* REPLICATON: to shut down we go in the opposite order:
+ * first - disconnect from the cluster to signal master threads
+ * to exit loop,
+ * second - join master and slave threads,
+ * third - close provider once not in use */
+ node_wsrep_disconnect(node.wsrep);
+
+ node_worker_stop(master_pool);
+ node_worker_stop(slave_pool);
+
+ node_wsrep_close(node.wsrep);
+
+ /* and finally, when the storage can no longer be disturbed, close it */
+ node_store_close(node.store);
+
+ return 0;
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/node.sh b/wsrep-lib/wsrep-API/v26/examples/node/node.sh
new file mode 100755
index 00000000..40e0a498
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/node.sh
@@ -0,0 +1,40 @@
+#!/bin/sh -eu
+
+NODE_ID=$1
+
+NODE_NAME=${NODE_NAME:-$NODE_ID}
+
+NODE_DIR=${NODE_DIR:-/tmp/node/$NODE_NAME}
+rm -rf $NODE_DIR/*
+mkdir -p $NODE_DIR
+
+NODE_OPT=${NODE_OPT:-}
+
+NODE_HOST=${NODE_HOST:-localhost}
+NODE_PORT=${NODE_PORT:-$((10000 + $NODE_ID))}
+
+NODE_CLIENTS=${NODE_CLIENTS:-1}
+NODE_APPLIERS=${NODE_APPLIERS:-1}
+
+NODE_ADDR=${NODE_ADDR:-}
+
+NODE_BIN=${NODE_BIN:-$(dirname $0)/node}
+
+# convert possible relative path to absolute path
+NODE_PROVIDER=$(realpath $NODE_PROVIDER)
+
+set -x
+
+$NODE_BIN \
+-v "$NODE_PROVIDER" \
+-n "$NODE_NAME" \
+-f "$NODE_DIR" \
+-o "$NODE_OPT" \
+-t "$NODE_HOST" \
+-p $NODE_PORT \
+-s $NODE_APPLIERS \
+-m $NODE_CLIENTS \
+-d 10 \
+-a "$NODE_ADDR"
+
+set +x
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/options.c b/wsrep-lib/wsrep-API/v26/examples/node/options.c
new file mode 100644
index 00000000..0bd08ffb
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/options.c
@@ -0,0 +1,291 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "options.h"
+
+#include <ctype.h> // isspace()
+#include <errno.h>
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h> // strtol()
+#include <string.h> // strcmp()
+
+/*
+ * getopt_long() declarations begin
+ */
+
+#define OPTS_NA no_argument
+#define OPTS_RA required_argument
+#define OPTS_OA optional_argument
+
+typedef enum opt
+{
+ OPTS_NOOPT = 0,
+ OPTS_ADDRESS = 'a',
+ OPTS_BOOTSTRAP = 'b',
+ OPTS_DELAY = 'd',
+ OPTS_DATA_DIR = 'f',
+ OPTS_HELP = 'h',
+ OPTS_PERIOD = 'i',
+ OPTS_MASTERS = 'm',
+ OPTS_NAME = 'n',
+ OPTS_OPTIONS = 'o',
+ OPTS_BASE_PORT = 'p',
+ OPTS_RECORDS = 'r',
+ OPTS_SLAVES = 's',
+ OPTS_BASE_HOST = 't',
+ OPTS_PROVIDER = 'v',
+ OPTS_WS_SIZE = 'w',
+ OPTS_OPS = 'x'
+}
+ opt_t;
+
+static struct option s_opts[] =
+{
+ { "address", OPTS_RA, NULL, OPTS_ADDRESS },
+ { "bootstrap", OPTS_NA, NULL, OPTS_BOOTSTRAP },
+ { "delay", OPTS_RA, NULL, OPTS_DELAY },
+ { "storage", OPTS_RA, NULL, OPTS_DATA_DIR },
+ { "help", OPTS_NA, NULL, OPTS_HELP },
+ { "period", OPTS_RA, NULL, OPTS_PERIOD },
+ { "masters", OPTS_RA, NULL, OPTS_MASTERS },
+ { "name", OPTS_RA, NULL, OPTS_NAME },
+ { "options", OPTS_RA, NULL, OPTS_OPTIONS, },
+ { "base-port", OPTS_RA, NULL, OPTS_BASE_PORT },
+ { "records", OPTS_RA, NULL, OPTS_RECORDS },
+ { "slaves", OPTS_RA, NULL, OPTS_SLAVES },
+ { "base-host", OPTS_RA, NULL, OPTS_BASE_HOST },
+ { "provider", OPTS_RA, NULL, OPTS_PROVIDER },
+ { "size", OPTS_RA, NULL, OPTS_WS_SIZE },
+ { "ops", OPTS_RA, NULL, OPTS_OPS },
+ { NULL, 0, NULL, 0 }
+};
+
+static const char* opts_string = "a:d:f:hi:m:n:o:p:r:s:t:v:w:x:";
+
+/*
+ * getopt_long() declarations end
+ */
+
+static const struct node_options opts_defaults =
+{
+ .provider = "none",
+ .address = "",
+ .options = "",
+ .name = "unnamed",
+ .data_dir = ".",
+ .base_host = "localhost",
+ .masters = 0,
+ .slaves = 1,
+ .ws_size = 1024,
+ .records = 1024*1024,
+ .delay = 0,
+ .base_port = 4567,
+ .period = 10,
+ .operations= 1,
+ .bootstrap = true
+};
+
+static void
+opts_print_help(FILE* out, const char* prog_name)
+{
+ fprintf(
+ out,
+ "Usage: %s [OPTION...]\n"
+ "\n"
+ " -h, --help this thing.\n"
+ " -v, --provider=PATH a path to wsrep provider library file.\n"
+ " -a, --address=STRING list of node addresses in the group.\n"
+ " If not set the node assumes that it is the first\n"
+ " node in the group (default)\n"
+ " -o, --options=STRING a string of wsrep provider options.\n"
+ " -n, --name=STRING human-readable node name.\n"
+ " -f, --data-dir=PATH a directory to save working data in.\n"
+ " Should be private to the process.\n"
+ " -t, --base-host=ADDRESS address of this node at which other members can\n"
+ " connect to it\n"
+ " -p, --base-port=NUM base port which the node shall listen for\n"
+ " connections from other members. This port will be\n"
+ " used for replication, port+1 for IST and port+2\n"
+ " for SST. Default: 4567\n"
+ " -m, --masters=NUM number of concurrent master workers.\n"
+ " -s, --slaves=NUM number of concurrent slave workers.\n"
+ " (can't be less than 1)\n"
+ " -w, --size=NUM desirable size of the resulting writesets\n"
+ " (approximate lower boundary). Default: 1K\n"
+ " -r, --records=NUM number of records in the store. Default: 1M\n"
+ " -x, --ops=NUM number of operations per transaction. Default: 1\n"
+ " -d, --delay=NUM delay in milliseconds between \"commits\"\n"
+ " (per master thread).\n"
+ " -b, --bootstrap bootstrap the cluster with this node.\n"
+ " Default: 'Yes' if --address is not given, 'No'\n"
+ " otherwise.\n"
+ " -i, --period period in seconds between performance stats output\n"
+ "\n"
+ , prog_name);
+}
+
+static void
+opts_print_config(FILE* out, const struct node_options* opts)
+{
+ fprintf(
+ out,
+ "Continuing with the following configuration:\n"
+ "provider: %s\n"
+ "address: %s\n"
+ "options: %s\n"
+ "name: %s\n"
+ "data dir: %s\n"
+ "base addr: %s:%ld\n"
+ "masters: %ld\n"
+ "slaves: %ld\n"
+ "writeset size: %ld bytes\n"
+ "records: %ld\n"
+ "operations: %ld\n"
+ "commit delay: %ld ms\n"
+ "stats period: %ld s\n"
+ "bootstrap: %s\n"
+ ,
+ opts->provider, opts->address, opts->options, opts->name, opts->data_dir,
+ opts->base_host, opts->base_port,
+ opts->masters, opts->slaves, opts->ws_size, opts->records,
+ opts->operations,
+ opts->delay, opts->period, opts->bootstrap ? "Yes" : "No"
+ );
+}
+
+static int
+opts_check_conversion(int cond, const char* ptr, int idx)
+{
+ if (!cond || errno || (*ptr != '\0' && !isspace(*ptr)))
+ {
+ fprintf(stderr, "Bad value for %s option.\n", s_opts[idx].name);
+ return EINVAL;
+ }
+ return 0;
+}
+
+int
+node_options_read(int argc, char* argv[], struct node_options* opts)
+{
+ *opts = opts_defaults;
+
+ int opt = 0;
+ int opt_idx = 0;
+ char* endptr;
+ int ret = 0;
+
+ bool address_given = false;
+ bool bootstrap_given = false;
+
+ while ((opt = getopt_long(argc, argv, opts_string, s_opts, &opt_idx)) != -1)
+ {
+ switch (opt)
+ {
+ case OPTS_ADDRESS:
+ address_given = strcmp(opts->address, optarg);
+ opts->address = optarg;
+ break;
+ case OPTS_BOOTSTRAP:
+ bootstrap_given = true;
+ opts->bootstrap = true;
+ break;
+ case OPTS_DELAY:
+ opts->delay = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(opts->delay >= 0, endptr, opt_idx)))
+ goto err;
+ break;
+ case OPTS_DATA_DIR:
+ opts->data_dir = optarg;
+ break;
+ case OPTS_HELP:
+ ret = 1;
+ goto help;
+ case OPTS_PERIOD:
+ opts->period = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(opts->period > 0, endptr, opt_idx)))
+ goto err;
+ break;
+ case OPTS_MASTERS:
+ opts->masters = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(opts->masters >= 0, endptr,
+ opt_idx)))
+ goto err;
+ break;
+ case OPTS_NAME:
+ opts->name = optarg;
+ break;
+ case OPTS_OPTIONS:
+ opts->options = optarg;
+ break;
+ case OPTS_BASE_PORT:
+ opts->base_port = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(
+ opts->base_port > 0 && opts->base_port < 65536,
+ endptr, opt_idx)))
+ goto err;
+ break;
+ case OPTS_RECORDS:
+ opts->records = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(opts->records >= 0, endptr,
+ opt_idx)))
+ goto err;
+ break;
+ case OPTS_SLAVES:
+ opts->slaves = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(opts->slaves > 0, endptr, opt_idx)))
+ goto err;
+ break;
+ case OPTS_BASE_HOST:
+ opts->base_host = optarg;
+ break;
+ case OPTS_PROVIDER:
+ opts->provider = optarg;
+ break;
+ case OPTS_WS_SIZE:
+ opts->ws_size = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(opts->ws_size > 0, endptr,
+ opt_idx)))
+ goto err;
+ break;
+ case OPTS_OPS:
+ opts->operations = strtol(optarg, &endptr, 10);
+ if ((ret = opts_check_conversion(opts->operations >= 1, endptr,
+ opt_idx)))
+ goto err;
+ break;
+ default:
+ ret = EINVAL;
+ }
+ }
+
+help:
+ if (ret) {
+ opts_print_help(stderr, argv[0]);
+ }
+ else
+ {
+ if (!bootstrap_given)
+ {
+ opts->bootstrap = !address_given;
+ }
+ opts_print_config(stdout, opts);
+ opts->delay *= 1000; /* convert to microseconds for usleep() */
+ }
+
+err:
+ return ret;
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/options.h b/wsrep-lib/wsrep-API/v26/examples/node/options.h
new file mode 100644
index 00000000..62172281
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/options.h
@@ -0,0 +1,48 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines options interface
+ */
+
+#ifndef NODE_OPTIONS_H
+#define NODE_OPTIONS_H
+
+#include <stdbool.h>
+
+struct node_options
+{
+ const char* provider; // path to wsrep provider
+ const char* address; // wsrep cluster address string
+ const char* options; // wsrep option string
+ const char* name; // node name (for logging purposes)
+ const char* data_dir; // name of the storage file
+ const char* base_host;// host own address
+ long masters; // number of master threads
+ long slaves; // number of slave threads
+ long ws_size; // desired writeset size
+ long records; // total number of records
+ long delay; // delay between commits
+ long base_port;// base port to use
+ long period; // statistics output interval
+ long operations;// number of "statements" in a "transaction"
+ bool bootstrap;// bootstrap the cluster with this node
+};
+
+extern int
+node_options_read(int argc, char* argv[], struct node_options* opts);
+
+#endif /* NODE_OPTIONS_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/socket.c b/wsrep-lib/wsrep-API/v26/examples/node/socket.c
new file mode 100644
index 00000000..377abcaf
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/socket.c
@@ -0,0 +1,304 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "socket.h"
+
+#include "log.h"
+
+#include <assert.h>
+#include <ctype.h> // isspace()
+#include <errno.h>
+#include <limits.h> // USHRT_MAX
+#include <netdb.h> // struct addrinfo
+#include <stdio.h> // snprintf()
+#include <string.h> // strerror()
+#include <sys/socket.h> // bind(), connect(), accept(), send(), recv()
+
+struct node_socket
+{
+ int fd;
+};
+
+/**
+ * Initializes addrinfo from the separate host address and port arguments
+ *
+ * Requires calling freeaddrinfo() later
+ *
+ * @param[in] host - if NULL, will be initialized for listening
+ * @param[in] port
+ *
+ * @return struct addrinfo* or NULL in case of error
+ */
+static struct addrinfo*
+socket_get_addrinfo2(const char* const host,
+ uint16_t const port)
+{
+ struct addrinfo const hints =
+ {
+ .ai_flags = AI_PASSIVE | /** will be ignored if host is not NULL */
+ AI_NUMERICSERV, /** service is a numeric port */
+ .ai_family = AF_UNSPEC, /** either IPv4 or IPv6 */
+ .ai_socktype = SOCK_STREAM, /** STREAM or DGRAM */
+ .ai_protocol = 0,
+ .ai_addrlen = 0,
+ .ai_addr = NULL,
+ .ai_canonname = NULL,
+ .ai_next = NULL
+ };
+
+ char service[6];
+ snprintf(service, sizeof(service), "%hu", port);
+
+ struct addrinfo* info;
+ int err = getaddrinfo(host, service, &hints, &info);
+ if (err)
+ {
+ NODE_ERROR("Failed to resolve '%s': %d (%s)",
+ host, err, gai_strerror(err));
+ return NULL;
+ }
+
+ return info;
+}
+
+/**
+ * Initializes addrinfo from single address and port string
+ * The port is expected to be in numerical form and appended to the host address
+ * via colon.
+ *
+ * Requires calling freeaddrinfo() later
+ *
+ * @param[in] addr full address specification, including port
+ *
+ * @return struct addrinfo* or NULL in case of error
+ */
+static struct addrinfo*
+socket_get_addrinfo1(const char* const addr)
+{
+ int const addr_len = (int)strlen(addr);
+ char* const addr_buf = strdup(addr);
+ if (!addr_buf)
+ {
+ NODE_ERROR("strdup(%s) failed: %d (%s)", addr, errno, strerror(errno));
+ return NULL;
+ }
+
+ struct addrinfo* res = NULL;
+ long port;
+ char* endptr;
+
+ int i;
+ for (i = addr_len - 1; i >= 0; i--)
+ {
+ if (addr_buf[i] == ':') break;
+ }
+
+ if (addr_buf[i] != ':')
+ {
+ NODE_ERROR("Malformed address:port string: '%s'", addr);
+ goto end;
+ }
+
+ addr_buf[i] = '\0';
+ port = strtol(addr_buf + i + 1, &endptr, 10);
+
+ if (port <= 0 || port > USHRT_MAX || errno ||
+ (*endptr != '\0' && !isspace(*endptr)))
+ {
+ NODE_ERROR("Malformed/invalid port: '%s'. Errno: %d (%s)",
+ addr_buf + i + 1, errno, strerror(errno));
+ goto end;
+ }
+
+ res = socket_get_addrinfo2(strlen(addr_buf) > 0 ? addr_buf : NULL,
+ (uint16_t)port);
+end:
+ free(addr_buf);
+ return res;
+}
+
+static struct node_socket*
+socket_create(int const fd)
+{
+ assert(fd > 0);
+
+ struct node_socket* res = calloc(1, sizeof(struct node_socket));
+ if (res)
+ {
+ res->fd = fd;
+ }
+ else
+ {
+ NODE_ERROR("Failed to allocate struct node_socket: %d (%s)",
+ errno, strerror(errno));
+ close(fd);
+ }
+
+ return res;
+}
+
+/**
+ * Definition of function type with the signature of bind() and connect()
+ */
+typedef int (*socket_act_fun_t) (int sfd,
+ const struct sockaddr* addr,
+ socklen_t addrlen);
+
+static int
+socket_bind_and_listen(int const sfd,
+ const struct sockaddr* const addr,
+ socklen_t const addrlen)
+{
+ int ret = bind(sfd, addr, addrlen);
+
+ if (!ret)
+ ret = listen(sfd, SOMAXCONN);
+
+ return ret;
+}
+
+/**
+ * A "template" method to do the "right thing" with the addrinfo and create a
+ * socket from it. The "right thing" would normally be bind and listen for
+ * a server socket OR connect for a client socket.
+ *
+ * @param[in] info addrinfo list, swallowed and deallocated
+ * @param[in] action_fun the "right thing" to do on socket and struct sockaddr
+ * @param[in] action_str action description to be printed in the error message
+ * @param[in] orig_host host address to be pronted in the error message
+ * @param[in] orig_port port to be printed in the error message, if orig_host
+ * string contains the port, this parameter should be 0
+ *
+ * The last three parameters are for diagnostic puposes only. orig_host and
+ * orig_port are supposed to be what were used to obtain addrinfo.
+ *
+ * @return new struct node_socket.
+ */
+static struct node_socket*
+socket_from_addrinfo(struct addrinfo* const info,
+ socket_act_fun_t const action_fun,
+ const char* const action_str,
+ const char* const orig_host,
+ uint16_t const orig_port)
+{
+ int sfd;
+ int err = 0;
+
+ /* Iterate over addrinfo list and try to apply action_fun on the resulting
+ * socket. Once successful, break loop. */
+ struct addrinfo* addr;
+ for (addr = info; addr != NULL; addr = addr->ai_next)
+ {
+ sfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+ if (sfd == -1)
+ {
+ err = errno;
+ continue;
+ }
+
+ if (action_fun(sfd, addr->ai_addr, addr->ai_addrlen) == 0) break;
+
+ err = errno;
+ close(sfd);
+ }
+
+ freeaddrinfo(info); /* no longer needed */
+
+ if (!addr)
+ {
+ NODE_ERROR("Failed to %s to '%s%s%.0hu': %d (%s)",
+ action_str,
+ orig_host ? orig_host : "", orig_port > 0 ? ":" : "",
+ orig_port > 0 ? orig_port : 0, /* won't be printed if 0 */
+ err, strerror(err));
+ return NULL;
+ }
+
+ assert(sfd > 0);
+ return socket_create(sfd);
+}
+
+struct node_socket*
+node_socket_listen(const char* const host, uint16_t const port)
+{
+ struct addrinfo* const info = socket_get_addrinfo2(host, port);
+ if (!info) return NULL;
+
+ return socket_from_addrinfo(info, socket_bind_and_listen,
+ "bind a listening socket", host, port);
+}
+
+struct node_socket*
+node_socket_connect(const char* const addr_str)
+{
+ struct addrinfo* const info = socket_get_addrinfo1(addr_str);
+ if (!info) return NULL;
+
+ return socket_from_addrinfo(info, connect, "connect", addr_str, 0);
+}
+
+struct node_socket*
+node_socket_accept(struct node_socket* socket)
+{
+ int sfd = accept(socket->fd, NULL, NULL);
+
+ if (sfd < 0)
+ {
+ NODE_ERROR("Failed to accept connection: %d (%s)",
+ errno, strerror(errno));
+ return NULL;
+ }
+
+ return socket_create(sfd);
+}
+
+int
+node_socket_send_bytes(node_socket_t* socket, const void* buf, size_t len)
+{
+ ssize_t const ret = send(socket->fd, buf, len, MSG_NOSIGNAL);
+
+ if (ret != (ssize_t)len)
+ {
+ NODE_ERROR("Failed to send %zu bytes: %d (%s)", errno, strerror(errno));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+node_socket_recv_bytes(node_socket_t* socket, void* buf, size_t len)
+{
+ ssize_t const ret = recv(socket->fd, buf, len, MSG_WAITALL);
+
+ if (ret != (ssize_t)len)
+ {
+ NODE_ERROR("Failed to recv %zu bytes: %d (%s)", errno, strerror(errno));
+ return -1;
+ }
+
+ return 0;
+}
+
+void
+node_socket_close(node_socket_t* socket)
+{
+ if (!socket) return;
+
+ if (socket->fd > 0) close(socket->fd);
+
+ free(socket);
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/socket.h b/wsrep-lib/wsrep-API/v26/examples/node/socket.h
new file mode 100644
index 00000000..3a77eff3
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/socket.h
@@ -0,0 +1,72 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit implements auxiliary networking functions (for SST purposes)
+ * It has nothing wsrep related and is not of general purpose.
+ */
+
+#ifndef NODE_SOCKET_H
+#define NODE_SOCKET_H
+
+#include <stddef.h> // size_t
+#include <stdint.h> // uint16_t
+
+typedef struct node_socket node_socket_t;
+
+/**
+ * Open listening socket at a given address
+ *
+ * @return listening socket
+ */
+extern node_socket_t*
+node_socket_listen(const char* host, uint16_t port);
+
+/**
+ * Connect to a given address.
+ *
+ * @return connected socket
+ */
+extern node_socket_t*
+node_socket_connect(const char* addr);
+
+/**
+ * Wait for connection on a listening socket
+ * @return connected socket
+ */
+extern node_socket_t*
+node_socket_accept(node_socket_t* s);
+
+/**
+ * Send a given number of bytes
+ * @return 0 or a negative error code
+ */
+extern int
+node_socket_send_bytes(node_socket_t* s, const void* buf, size_t len);
+
+/**
+ * Receive a given number of bytes
+ * @return 0 or a negative error code
+ */
+extern int
+node_socket_recv_bytes(node_socket_t* s, void* buf, size_t len);
+
+/**
+ * Release all recources associated with the socket */
+extern void
+node_socket_close(node_socket_t* s);
+
+#endif /* NODE_SOCKET_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/sst.c b/wsrep-lib/wsrep-API/v26/examples/node/sst.c
new file mode 100644
index 00000000..e93534ef
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/sst.c
@@ -0,0 +1,372 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "sst.h"
+
+#include "ctx.h"
+#include "log.h"
+#include "socket.h"
+
+#include <arpa/inet.h> // htonl()
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h> // snprintf()
+#include <stdlib.h> // abort()
+#include <string.h> // strdup()
+#include <unistd.h> // usleep()
+
+/**
+ * Helper: creates detached thread */
+static int
+sst_create_thread(void* (*thread_routine) (void*),
+ void* const thread_arg)
+{
+ pthread_t thr;
+ pthread_attr_t attr;
+ int ret = pthread_attr_init(&attr);
+ ret = ret ? ret : pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
+ ret = ret ? ret : pthread_create(&thr, &attr, thread_routine, thread_arg);
+ return ret;
+}
+
+/**
+ * Helper: creates detached thread and waits for it to call
+ * sst_sync_with_parent() */
+static void
+sst_create_and_sync(const char* const role,
+ pthread_mutex_t* const mtx,
+ pthread_cond_t* const cond,
+ void* (*thread_routine) (void*),
+ void* const thread_arg)
+{
+ int ret = pthread_mutex_lock(mtx);
+ if (ret)
+ {
+ NODE_FATAL("Failed to lock %s mutex: %d (%s)", role, ret, strerror(ret));
+ abort();
+ }
+
+ ret = sst_create_thread(thread_routine, thread_arg);
+ if (ret)
+ {
+ NODE_FATAL("Failed to create detached %s thread: %d (%s)",
+ role, ret, strerror(ret));
+ abort();
+ }
+
+ ret = pthread_cond_wait(cond, mtx);
+ if (ret)
+ {
+ NODE_FATAL("Failed to synchronize with %s thread: %d (%s)",
+ role, ret, strerror(ret));
+ abort();
+ }
+
+ pthread_mutex_unlock(mtx);
+}
+
+/**
+ * Helper: syncs with parent thread and allows it to continue and return
+ * asynchronously */
+static void
+sst_sync_with_parent(const char* role,
+ pthread_mutex_t* mtx,
+ pthread_cond_t* cond)
+{
+ int ret = pthread_mutex_lock(mtx);
+ if (ret)
+ {
+ NODE_FATAL("Failed to lock %s mutex: %d (%s)", role, ret, strerror(ret));
+ abort();
+ }
+
+ NODE_INFO("Initialized %s thread", role);
+
+ pthread_cond_signal(cond);
+ pthread_mutex_unlock(mtx);
+}
+
+static pthread_mutex_t sst_joiner_mtx = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t sst_joiner_cond = PTHREAD_COND_INITIALIZER;
+
+struct sst_joiner_ctx
+{
+ struct node_ctx* node;
+ node_socket_t* socket;
+};
+
+/**
+ * waits for SST completion and signals the provider to continue */
+static void*
+sst_joiner_thread(void* ctx)
+{
+ assert(ctx);
+
+ struct node_ctx* const node = ((struct sst_joiner_ctx*)ctx)->node;
+ node_socket_t* const listen = ((struct sst_joiner_ctx*)ctx)->socket;
+ ctx = NULL; /* may be unusable after next statement */
+
+ /* this allows parent callback to return */
+ sst_sync_with_parent("JOINER", &sst_joiner_mtx, &sst_joiner_cond);
+
+ wsrep_gtid_t state_gtid = WSREP_GTID_UNDEFINED;
+ int err = -1;
+
+ /* REPLICATION: wait for donor to connect and send the state snapshot */
+ node_socket_t* const connected = node_socket_accept(listen);
+ if (!connected) goto end;
+
+ uint32_t state_len;
+ err = node_socket_recv_bytes(connected, &state_len, sizeof(state_len));
+ if (err) goto end;
+
+ state_len = ntohl(state_len);
+ if (state_len > 0)
+ {
+ /* REPLICATION: get the state of state_len size */
+ void* state = malloc(state_len);
+ if (state)
+ {
+ err = node_socket_recv_bytes(connected, state, state_len);
+ if (err)
+ {
+ free(state);
+ goto end;
+ }
+
+ /* REPLICATION: install the newly received state. */
+ err = node_store_init_state(node->store, state, state_len);
+ free(state);
+ if (err) goto end;
+ }
+ else
+ {
+ NODE_ERROR("Failed to allocate %zu bytes for state snapshot.",
+ state_len);
+ err = -ENOMEM;
+ goto end;
+ }
+ }
+ else
+ {
+ /* REPLICATION: it was a bypass, the node will receive missing data via
+ * IST. It starts with the state it currently has. */
+ }
+
+ /* REPLICATION: find gtid of the received state to report to provider */
+ node_store_gtid(node->store, &state_gtid);
+
+end:
+ assert(err <= 0);
+ node_socket_close(connected);
+ node_socket_close(listen);
+
+ /* REPLICATION: tell provider that SST is received */
+ wsrep_status_t sst_ret;
+ wsrep_t* const wsrep = node_wsrep_provider(node->wsrep);
+ sst_ret = wsrep->sst_received(wsrep, &state_gtid, NULL, err);
+
+ if (WSREP_OK != sst_ret)
+ {
+ NODE_FATAL("Failed to report completion of SST: %d", sst_ret);
+ abort();
+ }
+
+ return NULL;
+}
+
+enum wsrep_cb_status
+node_sst_request_cb (void* const app_ctx,
+ void** const sst_req,
+ size_t* const sst_req_len)
+{
+ static int const SST_PORT_OFFSET = 2;
+
+ assert(app_ctx);
+ struct node_ctx* const node = app_ctx;
+ const struct node_options* const opts = node->opts;
+
+ char* sst_str = NULL;
+
+ /* REPLICATION: 1. prepare the node to receive SST */
+ uint16_t const sst_port = (uint16_t)(opts->base_port + SST_PORT_OFFSET);
+ size_t const sst_len = strlen(opts->base_host)
+ + 1 /* ':' */ + 5 /* max port len */ + 1 /* \0 */;
+ sst_str = malloc(sst_len);
+ if (!sst_str)
+ {
+ NODE_ERROR("Failed to allocate %zu bytes for SST request", sst_len);
+ goto end;
+ }
+
+ /* write in request the address at which we listen */
+ int ret = snprintf(sst_str, sst_len, "%s:%hu", opts->base_host, sst_port);
+ if (ret < 0 || (size_t)ret >= sst_len)
+ {
+ free(sst_str);
+ sst_str = NULL;
+ NODE_ERROR("Failed to write a SST request");
+ goto end;
+ }
+
+ node_socket_t* const socket = node_socket_listen(NULL, sst_port);
+ if (!socket)
+ {
+ free(sst_str);
+ sst_str = NULL;
+ NODE_ERROR("Failed to listen at %s", sst_str);
+ goto end;
+ }
+
+ /* REPLICATION 2. start the "joiner" thread that will wait for SST and
+ * report its success to provider, and syncronize with it. */
+ struct sst_joiner_ctx ctx =
+ {
+ .node = node,
+ .socket = socket
+ };
+ sst_create_and_sync("JOINER", &sst_joiner_mtx, &sst_joiner_cond,
+ sst_joiner_thread, &ctx);
+
+ NODE_INFO("Waiting for SST at %s", sst_str);
+
+end:
+ if (sst_str)
+ {
+ *sst_req = sst_str;
+ *sst_req_len = strlen(sst_str) + 1;
+ }
+ else
+ {
+ *sst_req = NULL;
+ *sst_req_len = 0;
+ return WSREP_CB_FAILURE;
+ }
+
+ /* REPLICATION 3. return SST request to provider */
+ return WSREP_CB_SUCCESS;
+}
+
+static pthread_mutex_t sst_donor_mtx = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t sst_donor_cond = PTHREAD_COND_INITIALIZER;
+
+struct sst_donor_ctx
+{
+ wsrep_gtid_t state;
+ struct node_ctx* node;
+ node_socket_t* socket;
+ wsrep_bool_t bypass;
+};
+
+/**
+ * donates SST and signals provider that it is done. */
+static void*
+sst_donor_thread(void* const args)
+{
+ struct sst_donor_ctx const ctx = *(struct sst_donor_ctx*)args;
+
+ int err = 0;
+ const void* state;
+ size_t state_len;
+
+ if (ctx.bypass)
+ {
+ /* REPLICATION: if bypass is true, there is no need to send snapshot,
+ * just signal the joiner that snapshot is not needed and
+ * it can proceed to apply IST. We'll do it by sending 0
+ * for the size of snapshot */
+ state = NULL;
+ state_len = 0;
+ }
+ else
+ {
+ /* REPLICATION: if bypass is false, we need to send a full state snapshot
+ * Get hold of the state, which is currently just GTID
+ * NOTICE that while parent is waiting, the store is in a
+ * quiescent state, provider blocking any modifications. */
+ err = node_store_acquire_state(ctx.node->store, &state, &state_len);
+ if (state_len > UINT32_MAX) err = -ERANGE;
+ }
+
+ /* REPLICATION: after getting hold of the state we can allow parent callback
+ * to return and the node to resume its normal operation */
+ sst_sync_with_parent("DONOR", &sst_donor_mtx, &sst_donor_cond);
+
+ if (err >= 0)
+ {
+ uint32_t tmp = htonl((uint32_t)state_len);
+ err = node_socket_send_bytes(ctx.socket, &tmp, sizeof(tmp));
+ }
+
+ if (state_len != 0)
+ {
+ if (err >= 0)
+ {
+ assert(state);
+ err = node_socket_send_bytes(ctx.socket, state, state_len);
+ }
+
+ node_store_release_state(ctx.node->store);
+ }
+
+ node_socket_close(ctx.socket);
+
+ /* REPLICATION: signal provider the success of the operation */
+ wsrep_t* const wsrep = node_wsrep_provider(ctx.node->wsrep);
+ wsrep->sst_sent(wsrep, &ctx.state, err);
+
+ return NULL;
+}
+
+enum wsrep_cb_status
+node_sst_donate_cb (void* const app_ctx,
+ void* const recv_ctx,
+ const wsrep_buf_t* const str_msg,
+ const wsrep_gtid_t* const state_id,
+ const wsrep_buf_t* const state,
+ wsrep_bool_t const bypass)
+{
+ (void)recv_ctx;
+ (void)state;
+
+ struct sst_donor_ctx ctx =
+ {
+ .node = app_ctx,
+ .state = *state_id,
+ .bypass = bypass
+ };
+
+ /* we are expecting a human-readable 0-terminated string */
+ void* p = memchr(str_msg->ptr, '\0', str_msg->len);
+ if (!p)
+ {
+ NODE_ERROR("Received a badly formed State Transfer Request.");
+ /* REPLICATION: in case of a failure we return the status to provider, so
+ * that the joining node can be notified of it by cluster */
+ return WSREP_CB_FAILURE;
+ }
+
+ const char* addr = str_msg->ptr;
+ ctx.socket = node_socket_connect(addr);
+
+ if (!ctx.socket) return WSREP_CB_FAILURE;
+
+ sst_create_and_sync("DONOR", &sst_donor_mtx, &sst_donor_cond,
+ sst_donor_thread, &ctx);
+
+ return WSREP_CB_SUCCESS;
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/sst.h b/wsrep-lib/wsrep-API/v26/examples/node/sst.h
new file mode 100644
index 00000000..7006a1b6
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/sst.h
@@ -0,0 +1,39 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines SST interface
+ */
+
+#ifndef NODE_SST_H
+#define NODE_SST_H
+
+#include "../../wsrep_api.h"
+
+extern enum wsrep_cb_status
+node_sst_request_cb (void* app_ctx,
+ void** sst_req,
+ size_t* sst_req_len);
+
+extern enum wsrep_cb_status
+node_sst_donate_cb (void* app_ctx,
+ void* recv_ctx,
+ const wsrep_buf_t* str_msg,
+ const wsrep_gtid_t* state_id,
+ const wsrep_buf_t* state,
+ wsrep_bool_t bypass);
+
+#endif /* NODE_SST_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/stats.c b/wsrep-lib/wsrep-API/v26/examples/node/stats.c
new file mode 100644
index 00000000..4b02240f
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/stats.c
@@ -0,0 +1,215 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "stats.h"
+
+#include "log.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h> // snprintf()
+#include <stdlib.h> // abort()
+#include <string.h> // strcmp()
+#include <unistd.h> // usleep()
+
+enum
+{
+ STATS_REPL_BYTE,
+ STATS_REPL_WS,
+ STATS_RECV_BYTE,
+ STATS_RECV_WS,
+ STATS_TOTAL_BYTE,
+ STATS_TOTAL_WS,
+ STATS_CERT_FAILS,
+ STATS_STORE_FAILS,
+ STATS_FC_PAUSED,
+ STATS_MAX
+};
+
+static const char* const stats_legend[STATS_MAX] =
+{
+ " repl(B/s)",
+ " repl(W/s)",
+ " recv(B/s)",
+ " recv(W/s)",
+ "total(B/s)",
+ "total(W/s)",
+ " cert.fail",
+ " stor.fail",
+ " paused(%)"
+};
+
+/* stats IDs in provider output - provider dependent, here we use Galera's */
+static const char* const galera_ids[STATS_MAX] =
+{
+ "replicated_bytes", /**< STATS_REPL_BYTE */
+ "replicated", /**< STATS_REPL_WS */
+ "received_bytes", /**< STATS_RECV_BYTE */
+ "received", /**< STATS_RECV_WS */
+ "", /**< STATS_TOTAL_BYTE */
+ "", /**< STATS_TOTAL_WS */
+ "local_cert_failures", /**< STATS_CERT_FAILS */
+ "", /**< STATS_STORE_FAILS */
+ "flow_control_paused_ns" /**< STATS_FC_PAUSED */
+};
+
+/* maps local stats IDs to provider stat IDs */
+static int stats_galera_map[STATS_MAX];
+
+/**
+ * Helper to map provider stats to own stats set */
+static void
+stats_establish_mapping(wsrep_t* const wsrep)
+{
+ int const magic_map = -1;
+ size_t i;
+ for (i = 0; i < sizeof(stats_galera_map)/sizeof(stats_galera_map[0]); i++)
+ {
+ stats_galera_map[i] = magic_map; /* initialize map array */
+ }
+
+ struct wsrep_stats_var* const stats = wsrep->stats_get(wsrep);
+
+ /* to compensate for STATS_TOTAL_* and STATS_STORE_FAILS having no
+ * counterparts */
+ int mapped = 3;
+
+ i = 0;
+ while (stats[i].name) /* stats array is terminated by Null name */
+ {
+ int j;
+ for (j = 0; j < STATS_MAX; j++)
+ {
+ if (magic_map == stats_galera_map[j] /* j-th member still unset */
+ &&
+ !strcmp(stats[i].name, galera_ids[j]))
+ {
+ stats_galera_map[j] = (int)i;
+ mapped++;
+ if (STATS_MAX == mapped) /* all mapped */ goto out;
+ }
+ }
+
+ i++;
+ }
+
+out:
+ wsrep->stats_free(wsrep, stats);
+}
+
+static void
+stats_get(node_store_t* const store, wsrep_t* const wsrep, long long stats[])
+{
+ stats[STATS_STORE_FAILS] = node_store_read_view_failures(store);
+
+ struct wsrep_stats_var* const ret = wsrep->stats_get(wsrep);
+ if (!ret)
+ {
+ NODE_FATAL("wsrep::stats_get() call failed.");
+ abort();
+ }
+
+ int i;
+ for (i = 0; i < STATS_MAX; i++)
+ {
+ int j = stats_galera_map[i];
+ if (j >= 0)
+ {
+ assert(WSREP_VAR_INT64 == ret[j].type);
+ stats[i] = ret[j].value._int64;
+ }
+ }
+
+ wsrep->stats_free(wsrep, ret);
+
+ // totals are just sums
+ stats[STATS_TOTAL_BYTE] = stats[STATS_REPL_BYTE] + stats[STATS_RECV_BYTE];
+ stats[STATS_TOTAL_WS ] = stats[STATS_REPL_WS ] + stats[STATS_RECV_WS ];
+}
+
+static void
+stats_print(long long bef[], long long aft[], double period)
+{
+ double rate[STATS_MAX];
+ int i;
+ for (i = 0; i < STATS_MAX; i++)
+ {
+ rate[i] = (double)(aft[i] - bef[i])/period;
+ }
+ rate[STATS_FC_PAUSED] /= 1.0e+07; // nanoseconds to % of seconds
+
+ char str[256];
+ int written = 0;
+
+ /* first line write legend */
+ for (i = 0; i < STATS_MAX; i++)
+ {
+ size_t const space_left = sizeof(str) - (size_t)written;
+ written += snprintf(&str[written], space_left, "%s", stats_legend[i]);
+ }
+
+ str[written] = '\n';
+ written++;
+
+ /* second line write values */
+ for (i = 0; i < STATS_MAX; i++)
+ {
+ size_t const space_left = sizeof(str) - (size_t)written;
+ long long const value = (long long)rate[i];
+ written += snprintf(&str[written], space_left, " %9lld", value);
+ }
+
+ str[written] = '\0';
+
+ /* use logging macro for timestamp */
+ NODE_INFO("\n%s", str);
+}
+
+void
+node_stats_loop(const struct node_ctx* const node, int const period)
+{
+ double const period_sec = period;
+ useconds_t const period_usec = (useconds_t)period * 1000000;
+
+ wsrep_t* const wsrep = node_wsrep_provider(node->wsrep);
+ stats_establish_mapping(wsrep);
+
+ long long stats1[STATS_MAX];
+ long long stats2[STATS_MAX];
+
+ stats_get(node->store, wsrep, stats1);
+
+ while (1)
+ {
+ if (usleep(period_usec)) break;
+ stats_get(node->store, wsrep, stats2);
+ stats_print(stats1, stats2, period_sec);
+
+ if (usleep(period_usec)) break;
+ stats_get(node->store, wsrep, stats1);
+ stats_print(stats2, stats1, period_sec);
+ }
+
+ if (EINTR != errno)
+ {
+ NODE_ERROR("Unexpected usleep(%lld) error: %d (%s)",
+ (long long)period_usec, errno, strerror(errno));
+ }
+ else
+ {
+ /* interrupted by signal */
+ }
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/stats.h b/wsrep-lib/wsrep-API/v26/examples/node/stats.h
new file mode 100644
index 00000000..f7ab7ef4
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/stats.h
@@ -0,0 +1,35 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines performance statistics loop
+ */
+
+#ifndef NODE_STATS_H
+#define NODE_STATS_H
+
+#include "ctx.h"
+
+/**
+ * Prints out statistics with a given period.
+ *
+ * @param[in] node node context
+ * @param[in] period in seconds
+ */
+extern void
+node_stats_loop(const struct node_ctx* node, int period);
+
+#endif /* NODE_STATS_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/store.c b/wsrep-lib/wsrep-API/v26/examples/node/store.c
new file mode 100644
index 00000000..1dc2d6c1
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/store.c
@@ -0,0 +1,1044 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "store.h"
+
+#include "log.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h> // ptrdiff_t
+#include <stdint.h> // uintptr_t
+#include <stdlib.h> // abort()
+#include <string.h> // memset()
+
+#define DECLARE_SERIALIZE_INT(INTTYPE) \
+ static inline size_t \
+ store_serialize_##INTTYPE(void* const to, INTTYPE##_t const from) \
+ { \
+ memcpy(to, &from, sizeof(from)); /* for simplicity ignore endianness */ \
+ return sizeof(from); \
+ }
+
+DECLARE_SERIALIZE_INT(uint32);
+DECLARE_SERIALIZE_INT(int64);
+
+#define DECLARE_DESERIALIZE_INT(INTTYPE) \
+ static inline size_t \
+ store_deserialize_##INTTYPE(INTTYPE##_t* const to, const void* const from) \
+ { \
+ memcpy(to, from, sizeof(*to)); /* for simplicity ignore endianness */ \
+ return sizeof(*to); \
+ }
+
+DECLARE_DESERIALIZE_INT(uint32);
+DECLARE_DESERIALIZE_INT(int64);
+
+typedef struct record
+{
+ wsrep_seqno_t version;
+ uint32_t value;
+ /* this order ensures that there is no padding between the members */
+}
+record_t;
+
+#define STORE_RECORD_SIZE \
+ (sizeof(((record_t*)(NULL))->version) + sizeof(((record_t*)(NULL))->value))
+
+static inline size_t
+store_record_set(void* const base,
+ size_t const index,
+ const record_t* const record)
+{
+ char* const position = (char*)base + index*STORE_RECORD_SIZE;
+ memcpy(position, record, STORE_RECORD_SIZE);
+ return STORE_RECORD_SIZE;
+}
+
+static inline size_t
+store_record_get(const void* const base,
+ size_t const index,
+ record_t* const record)
+{
+ const char* const position = (const char*)base + index*STORE_RECORD_SIZE;
+ memcpy(record, position, STORE_RECORD_SIZE);
+ return STORE_RECORD_SIZE;
+}
+
+static inline bool
+store_record_equal(const record_t* const lhs, const record_t* const rhs)
+{
+ return (lhs->version == rhs->version) && (lhs->value == rhs->value);
+}
+
+/* transaction context */
+struct store_trx_op
+{
+ /* Normally what we'd need for transaction context is the record index and
+ * new record value. Here we also save read view snapshot (rec_from & rec_to)
+ * to
+ * 1. test provider certification correctness if provider supports read view
+ * 2. if not, detect conflicts at a store level. */
+ record_t rec_from;
+ record_t rec_to;
+ uint32_t idx_from;
+ uint32_t idx_to;
+ uint32_t new_value;
+ uint32_t size; /* nominal "size" of operation to manipulate on-the-wire
+ * writeset size. */
+};
+
+#define STORE_OP_SIZE (STORE_RECORD_SIZE + STORE_RECORD_SIZE + \
+ sizeof(((struct store_trx_op*)NULL)->idx_from) + \
+ sizeof(((struct store_trx_op*)NULL)->idx_to) + \
+ sizeof(((struct store_trx_op*)NULL)->new_value) + \
+ sizeof(((struct store_trx_op*)NULL)->size))
+
+struct store_trx_ctx
+{
+ wsrep_gtid_t rv_gtid;
+ size_t ops_num;
+ struct store_trx_op* ops;
+};
+
+static inline bool
+store_trx_add_op(struct store_trx_ctx* const trx)
+{
+ struct store_trx_op* const new_ops =
+ realloc(trx->ops, sizeof(struct store_trx_op)*(trx->ops_num + 1));
+
+ if (new_ops)
+ {
+ trx->ops = new_ops;
+#ifndef NDEBUG
+ memset(&trx->ops[trx->ops_num], 0, sizeof(*trx->ops));
+#endif
+ trx->ops_num++;
+ }
+
+ return (NULL == new_ops);
+}
+
+struct store_trx_entry
+{
+ bool used;
+ struct store_trx_ctx ctx;
+};
+
+typedef wsrep_uuid_t member_t;
+
+struct node_store
+{
+ wsrep_gtid_t gtid;
+ pthread_mutex_t gtid_mtx;
+ wsrep_trx_id_t trx_id;
+ pthread_mutex_t trx_id_mtx;
+ char* snapshot;
+ member_t* members;
+ void* records;
+ size_t op_size;
+ long read_view_fails;
+ uint32_t members_num;
+ uint32_t records_num;
+ uint32_t entries_mask;
+ bool read_view_support; // read view support by cluster
+ /* trx pool piggybacked */
+};
+
+node_store_t*
+node_store_open(const struct node_options* const opts)
+{
+ /* make the size of trx pool the next highest power of 2 over the total
+ * number of workers */
+ uint32_t trx_pool_mask = (uint32_t)(opts->masters + opts->slaves);
+ if (trx_pool_mask > 0)
+ {
+ trx_pool_mask -= 1;
+ trx_pool_mask |= trx_pool_mask >> 1;
+ trx_pool_mask |= trx_pool_mask >> 2;
+ trx_pool_mask |= trx_pool_mask >> 4;
+ trx_pool_mask |= trx_pool_mask >> 8;
+ trx_pool_mask |= trx_pool_mask >> 16;
+ }
+ assert(((trx_pool_mask + 1) & trx_pool_mask) == 0); // 2^n - 1
+
+ size_t const desired_op_size = (size_t)(opts->ws_size/opts->operations);
+ size_t const op_size = (desired_op_size > STORE_OP_SIZE ?
+ desired_op_size : STORE_OP_SIZE);
+
+ /* since the number of workers will never change, we can allocate trx pool
+ * together with the main store struc */
+ size_t const store_alloc_size = sizeof(struct node_store) +
+ /* op_size - additional buffer for op serialization per trx */
+ (sizeof(struct store_trx_entry) + op_size)*(trx_pool_mask + 1);
+
+ struct node_store* const ret = malloc(store_alloc_size);
+
+ if (ret)
+ {
+ memset(ret, 0, store_alloc_size);
+ ret->records = malloc((size_t)opts->records * STORE_RECORD_SIZE);
+
+ if (ret->records)
+ {
+ ret->gtid = WSREP_GTID_UNDEFINED;
+ pthread_mutex_init(&ret->gtid_mtx, NULL);
+ pthread_mutex_init(&ret->trx_id_mtx, NULL);
+ ret->op_size = op_size;
+ ret->records_num = (uint32_t)opts->records;
+ ret->entries_mask = trx_pool_mask;
+
+ uint32_t i;
+ for (i = 0; i < ret->records_num; i++)
+ {
+ /* keep state in serialized form for easy snapshotting */
+ struct record const record = { WSREP_SEQNO_UNDEFINED, i };
+ store_record_set(ret->records, i, &record);
+ }
+
+ return ret;
+ }
+ else
+ {
+ free(ret);
+ }
+ }
+
+ return NULL;
+}
+
+void
+node_store_close(struct node_store* const store)
+{
+ assert(store);
+ assert(store->records);
+ pthread_mutex_destroy(&store->gtid_mtx);
+ pthread_mutex_destroy(&store->trx_id_mtx);
+ free(store->records);
+ free(store->members);
+ free(store);
+}
+
+#define STORE_MUTEX_LOCK(mtx) \
+ { \
+ int err = pthread_mutex_lock(mtx); \
+ if (err) \
+ { \
+ NODE_FATAL("Failed to lock " #mtx ": %d (%s)", \
+ err, strerror(err)); \
+ abort(); \
+ } \
+ }
+
+static inline struct store_trx_entry*
+store_get_trx_entry(struct node_store* const store, wsrep_trx_id_t const trx_id)
+{
+ return (struct store_trx_entry*)
+ ((char*)(store + 1) + (trx_id & store->entries_mask)*
+ (sizeof(struct store_trx_entry) + store->op_size));
+}
+
+static inline struct store_trx_ctx*
+store_get_trx_ctx(struct node_store* const store, wsrep_trx_id_t const trx_id)
+{
+ return &(store_get_trx_entry(store, trx_id)->ctx);
+}
+
+static inline wsrep_trx_id_t
+store_new_trx_id(struct node_store* const store)
+{
+ wsrep_trx_id_t ret;
+ struct store_trx_entry* trx;
+
+ STORE_MUTEX_LOCK(&store->trx_id_mtx);
+
+ do
+ {
+ store->trx_id++;
+ trx = store_get_trx_entry(store, store->trx_id);
+ }
+ while (trx->used);
+ trx->used = true;
+ ret = store->trx_id;
+
+ pthread_mutex_unlock(&store->trx_id_mtx);
+
+ memset(&trx->ctx, 0, sizeof(trx->ctx));
+
+ return ret;
+}
+
+static inline void
+store_free_trx_id(struct node_store* const store, wsrep_trx_id_t const trx_id)
+{
+ struct store_trx_entry* const trx = store_get_trx_entry(store, trx_id);
+ assert(trx->used);
+ free(trx->ctx.ops);
+
+ STORE_MUTEX_LOCK(&store->trx_id_mtx);
+
+ trx->used = false;
+
+ pthread_mutex_unlock(&store->trx_id_mtx);
+}
+
+/**
+ * deserializes membership from snapshot */
+static int
+store_new_members(const char* ptr, const char* const endptr,
+ uint32_t* const num, member_t** const memb)
+{
+ ptr += store_deserialize_uint32(num, ptr);
+
+ if (*num < 2)
+ {
+ NODE_ERROR("Bogus number of members %u", *num);
+ return -1;
+ }
+
+ int ret = (int)sizeof(*num);
+
+ size_t const msize = sizeof(member_t) * *num;
+ if ((endptr - ptr) < (ptrdiff_t)msize)
+ {
+ NODE_ERROR("State snapshot does not contain all membership: "
+ "%zd < %zu", endptr - ptr, msize);
+ return -1;
+ }
+
+ *memb = calloc(*num, sizeof(member_t));
+ if (!*memb)
+ {
+ NODE_ERROR("Could not allocate new membership");
+ return -ENOMEM;
+ }
+
+ memcpy(*memb, ptr, msize);
+
+ return ret + (int)msize;
+}
+
+/**
+ * deserializes records from snapshot */
+static int
+store_new_records(const char* ptr, const char* const endptr,
+ uint32_t* const num, void** const rec)
+{
+ ptr += store_deserialize_uint32(num, ptr);
+
+ int ret = (int)sizeof(*num);
+ if (!*num)
+ {
+ *rec = NULL;
+ return ret;
+ }
+
+ size_t const rsize = STORE_RECORD_SIZE * *num;
+ if ((endptr - ptr) < (ptrdiff_t)rsize)
+ {
+ NODE_ERROR("State snapshot does not contain all records: "
+ "%zu < %zu", endptr - ptr, rsize);
+ return -1;
+ }
+
+ *rec = malloc(rsize);
+ if (!*rec)
+ {
+ NODE_ERROR("Could not allocate new records");
+ return -ENOMEM;
+ }
+
+ memcpy(*rec, ptr, rsize);
+
+ return ret + (int)rsize;
+}
+
+int
+node_store_init_state(struct node_store* const store,
+ const void* const state,
+ size_t const state_len)
+{
+ /* First, deserialize and prepare new state */
+ if (state_len <= sizeof(member_t)*2 /* at least two members */ +
+ WSREP_UUID_STR_LEN + 1 /* : */ + 1 /* seqno */ + 1 /* \0 */)
+ {
+ NODE_ERROR("State snapshot too short: %zu", state_len);
+ return -1;
+ }
+
+ wsrep_gtid_t state_gtid;
+ int ret;
+ ret = wsrep_gtid_scan(state, state_len, &state_gtid);
+ if (ret < 0)
+ {
+ char state_str[WSREP_GTID_STR_LEN + 1] = { 0, };
+ memcpy(state_str, state, sizeof(state_str) - 1);
+ NODE_ERROR("Could not find valid GTID in the received data: %s",
+ state_str);
+ return -1;
+ }
+
+ ret++; /* \0 */
+ if ((state_len - (size_t)ret) < sizeof(uint32_t))
+ {
+ NODE_ERROR("State snapshot does not contain the number of members");
+ return -1;
+ }
+
+ const char* ptr = ((char*)state);
+ const char* const endptr = ptr + state_len;
+ ptr += ret;
+
+ uint32_t m_num;
+ member_t* new_members;
+ ret = store_new_members(ptr, endptr, &m_num, &new_members);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ ptr += ret;
+
+ bool const read_view_support = ptr[0];
+ ptr += 1;
+
+ uint32_t r_num;
+ void* new_records;
+ ret = store_new_records(ptr, endptr, &r_num, &new_records);
+ if (ret < 0)
+ {
+ free(new_members);
+ return ret;
+ }
+ ptr += ret;
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ /* just a sanity check */
+ if (0 == wsrep_uuid_compare(&state_gtid.uuid, &store->gtid.uuid) &&
+ state_gtid.seqno < store->gtid.seqno)
+ {
+ NODE_ERROR("Received snapshot that is in the past: my seqno %lld,"
+ " received seqno: %lld",
+ (long long)store->gtid.seqno, (long long)state_gtid.seqno);
+ free(new_members);
+ free(new_records);
+ ret = -1;
+ }
+ else
+ {
+ free(store->members);
+ store->members_num = m_num;
+ store->members = new_members;
+ free(store->records);
+ store->records_num = r_num;
+ store->records = new_records;
+ store->gtid = state_gtid;
+ store->read_view_support = read_view_support;
+ ret = 0;
+ }
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+
+ return ret;
+}
+
+int
+node_store_acquire_state(node_store_t* const store,
+ const void** const state,
+ size_t* const state_len)
+{
+ int ret = 0;
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ if (!store->snapshot)
+ {
+ size_t const memb_len = store->members_num * sizeof(member_t);
+ size_t const rec_len = store->records_num * STORE_RECORD_SIZE;
+ size_t const buf_len = WSREP_GTID_STR_LEN + 1
+ + sizeof(uint32_t) + memb_len
+ + 1 /* read view support */
+ + sizeof(uint32_t) + rec_len;
+
+ store->snapshot = malloc(buf_len);
+
+ if (store->snapshot)
+ {
+ char* ptr = store->snapshot;
+
+ /* state GTID */
+ ret = wsrep_gtid_print(&store->gtid, ptr, buf_len);
+ if (ret > 0)
+ {
+ NODE_INFO("");
+ assert((size_t)ret < buf_len);
+
+ ptr[ret] = '\0';
+ ret++;
+ ptr += ret;
+ assert((size_t)ret < buf_len);
+
+ /* membership */
+ ptr += store_serialize_uint32(ptr, store->members_num);
+ ret += (int)sizeof(uint32_t);
+ assert((size_t)ret + memb_len < buf_len);
+ memcpy(ptr, store->members, memb_len);
+ ptr += memb_len;
+ ret += (int)memb_len;
+ assert((size_t)ret + sizeof(uint32_t) <= buf_len);
+
+ /* read view support */
+ ptr[0] = store->read_view_support;
+ ptr += 1;
+ ret += 1;
+
+ /* records */
+ ptr += store_serialize_uint32(ptr, store->records_num);
+ ret += (int)sizeof(uint32_t);
+ assert((size_t)ret + rec_len < buf_len);
+ memcpy(ptr, store->records, rec_len);
+ ret += (int)rec_len;
+ assert((size_t)ret <= buf_len);
+ }
+ else
+ {
+ NODE_ERROR("Failed to record GTID: %d (%s)", ret,strerror(-ret));
+ free(store->snapshot);
+ store->snapshot = 0;
+ }
+ }
+ else
+ {
+ NODE_ERROR("Failed to allocate snapshot buffer of size %zu",buf_len);
+ ret = -ENOMEM;
+ }
+ }
+ else
+ {
+ assert(0); /* provider should prevent such situation */
+ ret = -EAGAIN;
+ }
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+
+ if (ret > 0)
+ {
+ NODE_INFO("\n\nPrepared snapshot of %u records\n\n", store->records_num);
+ *state = store->snapshot;
+ *state_len = (size_t)ret;
+ ret = 0;
+ }
+
+ return ret;
+}
+
+void
+node_store_release_state(node_store_t* const store)
+{
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ assert(store->snapshot);
+ free(store->snapshot);
+ store->snapshot = 0;
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+}
+
+int
+node_store_update_membership(struct node_store* const store,
+ const wsrep_view_info_t* const v)
+{
+ assert(store);
+ assert(WSREP_VIEW_PRIMARY == v->status);
+ assert(v->memb_num > 0);
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ bool const continuation = v->state_id.seqno == store->gtid.seqno + 1 &&
+ 0 == wsrep_uuid_compare(&v->state_id.uuid, &store->gtid.uuid);
+
+ bool const initialization = WSREP_SEQNO_UNDEFINED == store->gtid.seqno &&
+ 0 == wsrep_uuid_compare(&WSREP_UUID_UNDEFINED, &store->gtid.uuid);
+
+ if (!(continuation || initialization))
+ {
+ char store_str[WSREP_GTID_STR_LEN + 1] = { 0, };
+ wsrep_gtid_print(&store->gtid, store_str, sizeof(store_str));
+ char view_str[WSREP_GTID_STR_LEN + 1] = { 0, };
+ wsrep_gtid_print(&v->state_id, view_str, sizeof(view_str));
+
+ NODE_FATAL("Attempt to initialize store GTID from incompatible view:\n"
+ "\tstore: %s\n"
+ "\tview: %s",
+ store_str, view_str);
+ abort();
+ }
+
+ wsrep_uuid_t* const new_members = calloc(sizeof(wsrep_uuid_t),
+ (size_t)v->memb_num);
+ if (!new_members)
+ {
+ NODE_FATAL("Could not allocate new members array");
+ abort();
+ }
+
+ int i;
+ for (i = 0; i < v->memb_num; i++)
+ {
+ new_members[i] = v->members[i].id;
+ }
+
+ /* REPLICATION: at this point we should compare old and new memberships and
+ * rollback all streaming transactions from the partitioned
+ * members, if any. But we don't support it in this program yet.
+ */
+
+ free(store->members);
+
+ store->members = new_members;
+ store->members_num = (uint32_t)v->memb_num;
+ store->gtid = v->state_id;
+ store->read_view_support = (v->capabilities & WSREP_CAP_SNAPSHOT);
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+
+ return 0;
+}
+
+void
+node_store_gtid(struct node_store* const store,
+ wsrep_gtid_t* const gtid)
+{
+ assert(store);
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ *gtid = store->gtid;
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+}
+
+
+static inline void
+store_serialize_op(void* const buf, const struct store_trx_op* const op)
+{
+ char* ptr = buf;
+ ptr += store_record_set(ptr, 0, &op->rec_from);
+ ptr += store_record_set(ptr, 0, &op->rec_to);
+ ptr += store_serialize_uint32(ptr, op->idx_from);
+ ptr += store_serialize_uint32(ptr, op->idx_to);
+ ptr += store_serialize_uint32(ptr, op->new_value);
+ store_serialize_uint32(ptr, op->size);
+}
+
+static inline void
+store_deserialize_op(struct store_trx_op* const op, const void* const buf)
+{
+ const char* ptr = buf;
+ ptr += store_record_get(ptr, 0, &op->rec_from);
+ ptr += store_record_get(ptr, 0, &op->rec_to);
+ ptr += store_deserialize_uint32(&op->idx_from, ptr);
+ ptr += store_deserialize_uint32(&op->idx_to, ptr);
+ ptr += store_deserialize_uint32(&op->new_value, ptr);
+ store_deserialize_uint32(&op->size, ptr);
+}
+
+static inline void
+store_serialize_gtid(void* const buf, const wsrep_gtid_t* const gtid)
+{
+ char* ptr = buf;
+ memcpy(ptr, &gtid->uuid, sizeof(gtid->uuid));
+ ptr += sizeof(gtid->uuid);
+ store_serialize_int64(ptr, gtid->seqno);
+}
+
+static inline void
+store_deserialize_gtid(wsrep_gtid_t* const gtid, const void* const buf)
+{
+ const char* ptr = buf;
+ memcpy(&gtid->uuid, ptr, sizeof(gtid->uuid));
+ ptr += sizeof(gtid->uuid);
+ store_deserialize_int64(&gtid->seqno, ptr);
+}
+
+#define STORE_GTID_SIZE (sizeof(((wsrep_gtid_t*)(NULL))->uuid) + sizeof(int64_t))
+
+int
+node_store_execute(node_store_t* const store,
+ wsrep_t* const wsrep,
+ wsrep_ws_handle_t* const ws_handle)
+{
+ assert(store);
+
+ if (0 == ws_handle->trx_id)
+ {
+ assert(sizeof(ws_handle->trx_id) >= sizeof(uintptr_t));
+ ws_handle->trx_id = store_new_trx_id(store);
+ }
+
+ struct store_trx_ctx* trx = store_get_trx_ctx(store, ws_handle->trx_id);
+ if (store_trx_add_op(trx)) return -ENOMEM;
+ struct store_trx_op* const op = &trx->ops[trx->ops_num - 1];
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ if (1 == trx->ops_num)
+ {
+ /* First operation, save ID of the read view of the transaction */
+ trx->rv_gtid = store->gtid;
+ }
+
+ /* Transaction op: copy value from one random record to another... */
+ op->idx_from = (uint32_t)rand() % store->records_num;
+ op->idx_to = (uint32_t)rand() % store->records_num;
+ store_record_get(store->records, op->idx_from, &op->rec_from);
+ store_record_get(store->records, op->idx_to, &op->rec_to);
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+
+ wsrep_status_t ret = WSREP_TRX_FAIL;
+
+ if (op->rec_from.version > trx->rv_gtid.seqno ||
+ op->rec_to.version > trx->rv_gtid.seqno)
+ {
+ /* transaction read view changed, trx needs to be restarted */
+#if 0
+ NODE_INFO("Transaction read view changed: %lld -> %lld, returning %d",
+ (long long)trx->rv_gtid.seqno,
+ (long long)(op->rec_from.version > op->rec_to.version ?
+ op->rec_from.version : op->rec_to.version),
+ ret);
+#endif
+ goto error;
+ }
+
+ /* Transaction op: ... and modify it somehow, e.g. increment by 1 */
+ op->new_value = op->rec_from.value + 1;
+
+ if (1 == trx->ops_num) // first trx operation
+ {
+ /* REPLICATION: Since this application does not implement record locks,
+ * it needs to establish read view for each transaction for
+ * a proper conflict detection and transaction isolation.
+ * Otherwose we'll need to implement record versioning */
+ if (store->read_view_support)
+ {
+ ret = wsrep->assign_read_view(wsrep, ws_handle, &trx->rv_gtid);
+ if (ret)
+ {
+ NODE_ERROR("wsrep::assign_read_view(%lld) failed: %d",
+ trx->rv_gtid.seqno, ret);
+ goto error;
+ }
+ }
+
+ /* Record read view in the writeset for debugging purposes */
+ assert(store->op_size > STORE_GTID_SIZE);
+ store_serialize_gtid(trx + 1, &trx->rv_gtid);
+ wsrep_buf_t ws = { .ptr = trx + 1, .len = STORE_GTID_SIZE };
+ ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED,
+ true);
+ if (ret)
+ {
+ NODE_ERROR("wsrep::append_data(rv_gtid) failed: %d", ret);
+ goto error;
+ }
+ }
+
+ /* REPLICATION: append keys touched by the operation
+ *
+ * NOTE: depending on data access granularity some applications may require
+ * multipart keys, e.g. <schema>:<table>:<row> in a SQL database.
+ * Single part keys match hashtables and key-value stores.
+ * Below we have two different single-part keys which reference two
+ * different records. */
+ uint32_t key_val;
+ wsrep_buf_t key_part = { .ptr = &key_val, .len = sizeof(key_val) };
+ wsrep_key_t ws_key = { .key_parts = &key_part, .key_parts_num = 1 };
+
+ /* REPLICATION: Key 1 - the key of the source, unchanged record */
+ store_serialize_uint32(&key_val, op->idx_from);
+ ret = wsrep->append_key(wsrep, ws_handle,
+ &ws_key,
+ 1, /* single key */
+ WSREP_KEY_REFERENCE,
+ true /* provider shall make a copy of the key */);
+ if (ret)
+ {
+ NODE_ERROR("wsrep::append_key(REFERENCE) failed: %d", ret);
+ goto error;
+ }
+
+ /* REPLICATION: Key 2 - the key of the record we want to update */
+ store_serialize_uint32(&key_val, op->idx_to);
+ ret = wsrep->append_key(wsrep, ws_handle,
+ &ws_key,
+ 1, /* single key */
+ WSREP_KEY_UPDATE,
+ true /* provider shall make a copy of the key */);
+ if (ret)
+ {
+ NODE_ERROR("wsrep::append_key(UPDATE) failed: %d", ret);
+ goto error;
+ }
+
+ /* REPLICATION: append transaction operation to the "writeset"
+ * (WS buffer was allocated together with trx context above) */
+ assert(store->op_size >= STORE_OP_SIZE);
+ assert(store->op_size == (uint32_t)store->op_size);
+ op->size = (uint32_t)store->op_size;
+ store_serialize_op(trx + 1, op);
+ wsrep_buf_t ws = { .ptr = trx + 1, .len = store->op_size };
+ ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED, true);
+
+ if (!ret) return 0;
+
+ NODE_ERROR("wsrep::append_data(op) failed: %d", ret);
+
+error:
+ store_free_trx_id(store, ws_handle->trx_id);
+
+ return ret;
+}
+
+int
+node_store_apply(node_store_t* const store,
+ wsrep_trx_id_t* const trx_id,
+ const wsrep_buf_t* const ws)
+{
+ assert(store);
+ (void)store;
+
+ *trx_id = store_new_trx_id(store);
+ struct store_trx_ctx* const trx = store_get_trx_ctx(store, *trx_id);
+
+ /* prepare trx context for commit */
+ const char* ptr = ws->ptr;
+ size_t left = ws->len;
+
+ /* at least one operation should be there */
+ assert(left >= STORE_GTID_SIZE + STORE_OP_SIZE);
+
+ if (left >= STORE_GTID_SIZE)
+ {
+ store_deserialize_gtid(&trx->rv_gtid, ptr);
+ left -= STORE_GTID_SIZE;
+ ptr += STORE_GTID_SIZE;
+ }
+
+ while (left >= STORE_OP_SIZE)
+ {
+ if (store_trx_add_op(trx))
+ {
+ store_free_trx_id(store,*trx_id); /* "rollback": release resources */
+ return -ENOMEM;
+ }
+ struct store_trx_op* const op = &trx->ops[trx->ops_num - 1];
+
+ store_deserialize_op(op, ptr);
+ assert(op->idx_to <= store->records_num);
+
+ left -= op->size;
+ ptr += op->size;
+ }
+
+ if (left != 0)
+ {
+ NODE_FATAL("Failed to process last (%d/%zu) bytes of the writeset.",
+ (int)left, ws->len);
+ abort();
+ }
+
+ return 0;
+}
+
+static uint32_t const store_fnv32_seed = 2166136261;
+
+static inline uint32_t
+store_fnv32a(const void* buf, size_t const len, uint32_t seed)
+{
+ static uint32_t const fnv32_prime = 16777619;
+ const uint8_t* bp = (const uint8_t*)buf;
+ const uint8_t* const be = bp + len;
+
+ while (bp < be)
+ {
+ seed ^= *bp++;
+ seed *= fnv32_prime;
+ }
+
+ return seed;
+}
+
+
+static void
+store_checksum_state(node_store_t* store)
+{
+ uint32_t res = store_fnv32_seed;
+ uint32_t i;
+
+ for (i = 0; i < store->members_num; i++)
+ {
+ res = store_fnv32a(&store->members[i], sizeof(*store->members), res);
+ }
+
+ res = store_fnv32a(store->records, store->records_num * STORE_RECORD_SIZE,
+ res);
+
+ res = store_fnv32a(&store->gtid.uuid, sizeof(store->gtid.uuid), res);
+
+ wsrep_seqno_t s;
+ store_serialize_int64(&s, store->gtid.seqno);
+ res = store_fnv32a(&s, sizeof(s), res);
+
+ NODE_INFO("\n\n\tSeqno: %lld; state hash: %#010x\n",
+ (long long)store->gtid.seqno, res);
+}
+
+static inline void
+store_update_gtid(node_store_t* const store, const wsrep_gtid_t* ws_gtid)
+{
+ assert(0 == wsrep_uuid_compare(&store->gtid.uuid, &ws_gtid->uuid));
+
+ store->gtid.seqno++;
+
+ if (store->gtid.seqno != ws_gtid->seqno)
+ {
+ NODE_FATAL("Out of order commit: expected %lld, got %lld",
+ store->gtid.seqno, ws_gtid->seqno);
+ abort();
+ }
+
+ static wsrep_seqno_t const period = 0x000fffff; /* ~1M */
+ if (0 == (store->gtid.seqno & period))
+ {
+ store_checksum_state(store);
+ }
+}
+
+void
+node_store_commit(node_store_t* const store,
+ wsrep_trx_id_t const trx_id,
+ const wsrep_gtid_t* const ws_gtid)
+{
+ assert(store);
+ assert(trx_id);
+
+ struct store_trx_ctx* const trx = store_get_trx_ctx(store, trx_id);
+
+ bool const check_read_view_snapshot =
+#ifdef NDEBUG
+ !store->read_view_support;
+#else
+ 1;
+#endif /* NDEBUG */
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ store_update_gtid(store, ws_gtid);
+
+ /* First loop is to check if we can commit all operations if provider
+ * does not support read view or for debugging puposes */
+ size_t i;
+ if (check_read_view_snapshot)
+ {
+ for (i = 0; i < trx->ops_num; i++)
+ {
+ struct store_trx_op* const op = &trx->ops[i];
+
+ record_t from, to;
+ store_record_get(store->records, op->idx_from, &from);
+ store_record_get(store->records, op->idx_to, &to);
+
+ if (!store_record_equal(&op->rec_from, &from) ||
+ !store_record_equal(&op->rec_to, &to))
+ {
+ /* read view changed since transaction was executed,
+ * can't commit */
+ assert(op->rec_from.version <= from.version);
+ assert(op->rec_to.version <= to.version);
+ if (op->rec_from.version == from.version)
+ assert(op->rec_from.value == from.value);
+ if (op->rec_to.version == to.version)
+ assert(op->rec_to.value == to.value);
+ if (store->read_view_support) abort();
+
+ store->read_view_fails++;
+
+ NODE_INFO("Read view changed at commit time, rollback trx");
+
+ goto error;
+ }
+ }
+ }
+
+ /* Second loop is to actually modify the dataset */
+ for (i = 0; i < trx->ops_num; i++)
+ {
+ struct store_trx_op* const op = &trx->ops[i];
+
+ record_t const new_record =
+ { .version = ws_gtid->seqno, .value = op->new_value };
+
+ store_record_set(store->records, op->idx_to, &new_record);
+ }
+
+error:
+ pthread_mutex_unlock(&store->gtid_mtx);
+
+ store_free_trx_id(store, trx_id);
+}
+
+void
+node_store_rollback(node_store_t* const store,
+ wsrep_trx_id_t const trx_id)
+{
+ assert(store);
+ assert(trx_id);
+
+ store_free_trx_id(store, trx_id);
+}
+
+void
+node_store_update_gtid(node_store_t* const store,
+ const wsrep_gtid_t* const ws_gtid)
+{
+ assert(store);
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ store_update_gtid(store, ws_gtid);
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+}
+
+long
+node_store_read_view_failures(node_store_t* const store)
+{
+ assert(store);
+
+ long ret;
+
+ STORE_MUTEX_LOCK(&store->gtid_mtx);
+
+ ret = store->read_view_fails;;
+
+ pthread_mutex_unlock(&store->gtid_mtx);
+
+ return ret;
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/store.h b/wsrep-lib/wsrep-API/v26/examples/node/store.h
new file mode 100644
index 00000000..51da74d7
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/store.h
@@ -0,0 +1,125 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines simple "transactional storage engine" interface
+ */
+
+#ifndef NODE_STORE_H
+#define NODE_STORE_H
+
+#include "options.h"
+
+#include "../../wsrep_api.h"
+
+typedef struct node_store node_store_t;
+
+/**
+ * open a store and optionally assocoate a file with it */
+extern node_store_t*
+node_store_open(const struct node_options* opts);
+
+/**
+ * close store and deallocate associated resources */
+extern void
+node_store_close(node_store_t* store);
+
+/**
+ * initialize store with a state */
+extern int
+node_store_init_state(node_store_t* store, const void* state, size_t state_len);
+
+/**
+ * Return a pointer to state snapshot that is guaranteed to be unchanged
+ * until node_store_release_state() is called.
+ *
+ * @param[out] state pointer to state snapshot
+ * @param[out] state_len soze of state snapshot
+ */
+extern int
+node_store_acquire_state(node_store_t* store,
+ const void** state, size_t* state_len);
+
+/**
+ * release state */
+extern void
+node_store_release_state(node_store_t* store);
+
+/**
+ * inform store about new membership */
+extern int
+node_store_update_membership(node_store_t* store, const wsrep_view_info_t* v);
+
+/**
+ * get the current GTID (last committed) */
+extern void
+node_store_gtid(node_store_t* store, wsrep_gtid_t* gtid);
+
+/**
+ * execute and prepare local transaction in store and return its key and write
+ * set.
+ *
+ * This operation allocates resources that must be freed with either
+ * node_store_commit() or node_store_rollback()
+ *
+ * @param[in] wsrep provider handle
+ * @param[out] ws_handle reference to the resulting write set in the provider
+ */
+extern int
+node_store_execute(node_store_t* store,
+ wsrep_t* wsrep,
+ wsrep_ws_handle_t* ws_handle);
+
+/**
+ * apply and prepare foreign write set received from replication
+ *
+ * This operation allocates resources that must be freed with either
+ * node_store_commit() or node_store_rollback()
+ *
+ * @param[out] trx_id locally unique transaction ID
+ * @param[in] ws foreign transaction write set
+ */
+extern int
+node_store_apply(node_store_t* store,
+ wsrep_trx_id_t* trx_id,
+ const wsrep_buf_t* ws);
+
+/**
+ * commit prepared transaction identified by trx_id */
+extern void
+node_store_commit(node_store_t* store,
+ wsrep_trx_id_t trx_id,
+ const wsrep_gtid_t* ws_gtid);
+
+/**
+ * rollback prepared transaction identified by trx_id */
+extern void
+node_store_rollback(node_store_t* store,
+ wsrep_trx_id_t trx_id);
+
+/**
+ * update storage GTID for transactions that had to be skipped/rolled back */
+extern void
+node_store_update_gtid(node_store_t* store,
+ const wsrep_gtid_t* ws_gtid);
+
+/**
+ * @return the number of store read view snapshot check failures at commit time.
+ * (should be zero if provider implements assign_read_view() call) */
+extern long
+node_store_read_view_failures(node_store_t* store);
+
+#endif /* NODE_STORE_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/trx.c b/wsrep-lib/wsrep-API/v26/examples/node/trx.c
new file mode 100644
index 00000000..afdcada4
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/trx.c
@@ -0,0 +1,155 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "trx.h"
+#include "log.h"
+
+#include <assert.h>
+#include <errno.h> // ENOMEM, etc.
+#include <stdbool.h>
+
+wsrep_status_t
+node_trx_execute(node_store_t* const store,
+ wsrep_t* const wsrep,
+ wsrep_conn_id_t const conn_id,
+ int ops_num)
+{
+ wsrep_status_t cert = WSREP_OK; // for cleanup
+
+ static unsigned int const ws_flags =
+ WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END; // atomic trx
+ wsrep_trx_meta_t ws_meta;
+ wsrep_status_t ret = WSREP_OK;
+
+ /* prepare simple transaction and obtain a writeset handle for it */
+ wsrep_ws_handle_t ws_handle = { 0, NULL };
+ while (ops_num--)
+ {
+ if (0 != (ret = node_store_execute(store, wsrep, &ws_handle)))
+ {
+#if 0
+ NODE_INFO("master [%d]: node_store_execute() returned %d",
+ conn_id, ret);
+#endif
+ ret = WSREP_TRX_FAIL;
+ goto cleanup;
+ }
+ }
+
+ /* REPLICATION: (replicate and) certify the writeset (pointed to by
+ * ws_handle) with the cluster */
+ cert = wsrep->certify(wsrep, conn_id, &ws_handle, ws_flags, &ws_meta);
+
+ if (WSREP_BF_ABORT == cert)
+ {
+ /* REPLICATION: transaction was signaled to abort due to multi-master
+ * conflict. It must rollback immediately: it blocks
+ * transaction that was ordered earlier and will never
+ * be able to enter commit order. */
+ node_store_rollback(store, ws_handle.trx_id);
+ }
+
+ /* REPLICATION: writeset was totally ordered, need to enter commit order */
+ if (ws_meta.gtid.seqno > 0)
+ {
+ ret = wsrep->commit_order_enter(wsrep, &ws_handle, &ws_meta);
+ if (ret)
+ {
+ NODE_ERROR("master [%d]: wsrep::commit_order_enter(%lld) failed: "
+ "%d", (long long)(ws_meta.gtid.seqno), ret);
+ goto cleanup;
+ }
+
+ /* REPLICATION: inside commit monitor
+ * Note: we commit transaction only if certification succeded */
+ if (WSREP_OK == cert)
+ node_store_commit(store, ws_handle.trx_id, &ws_meta.gtid);
+ else
+ node_store_update_gtid(store, &ws_meta.gtid);
+
+ ret = wsrep->commit_order_leave(wsrep, &ws_handle, &ws_meta, NULL);
+ if (ret)
+ {
+ NODE_ERROR("master [%d]: wsrep::commit_order_leave(%lld) failed: "
+ "%d", (long long)(ws_meta.gtid.seqno), ret);
+ goto cleanup;
+ }
+ }
+ else
+ {
+ assert(cert);
+ }
+
+cleanup:
+ /* REPLICATION: if wsrep->certify() returned anything else but WSREP_OK
+ * transaction must roll back. BF aborted trx already did it. */
+ if (cert && WSREP_BF_ABORT != cert)
+ node_store_rollback(store, ws_handle.trx_id);
+
+ /* NOTE: this application follows the approach that resources must be freed
+ * at the same level where they were allocated, so it is assumed that
+ * ws_key and ws were deallocated in either commit or rollback calls.*/
+
+ /* REPLICATION: release provider resources associated with the trx */
+ wsrep->release(wsrep, &ws_handle);
+
+ return ret ? ret : cert;
+}
+
+wsrep_status_t
+node_trx_apply(node_store_t* const store,
+ wsrep_t* const wsrep,
+ const wsrep_ws_handle_t* const ws_handle,
+ const wsrep_trx_meta_t* const ws_meta,
+ const wsrep_buf_t* const ws)
+{
+ /* no business being here if event was not ordered */
+ assert(ws_meta->gtid.seqno > 0);
+
+ wsrep_trx_id_t trx_id;
+ wsrep_buf_t err_buf = { NULL, 0 };
+ int app_err;
+ if (ws)
+ {
+ app_err = node_store_apply(store, &trx_id, ws);
+ if (app_err)
+ {
+ /* REPLICATION: if applying failed, prepare an error buffer with
+ * sufficient error specification */
+ err_buf.ptr = &app_err; // suppose error code is enough
+ err_buf.len = sizeof(app_err);
+ }
+ }
+ else /* ws failed certification and should be skipped */
+ {
+ /* just some non-0 code to choose node_store_update_gtid() below */
+ app_err = 1;
+ }
+
+ wsrep_status_t ret;
+ ret = wsrep->commit_order_enter(wsrep, ws_handle, ws_meta);
+ if (ret) {
+ node_store_rollback(store, trx_id);
+ return ret;
+ }
+
+ if (!app_err) node_store_commit(store, trx_id, &ws_meta->gtid);
+ else node_store_update_gtid(store, &ws_meta->gtid);
+
+ ret = wsrep->commit_order_leave(wsrep, ws_handle, ws_meta, &err_buf);
+
+ return ret;
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/trx.h b/wsrep-lib/wsrep-API/v26/examples/node/trx.h
new file mode 100644
index 00000000..e1d763a1
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/trx.h
@@ -0,0 +1,50 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines "transaction" interface
+ */
+
+#ifndef NODE_TRX_H
+#define NODE_TRX_H
+
+#include "store.h"
+
+#include "../../wsrep_api.h"
+
+/**
+ * executes and replicates local transaction
+ */
+extern wsrep_status_t
+node_trx_execute(node_store_t* store,
+ wsrep_t* wsrep,
+ wsrep_conn_id_t conn_id,
+ int ops_num);
+
+/**
+ * applies and commits slave write set
+ *
+ * @param ws replicated event writeset. NULL if it failed certification (and so
+ * must be skipped, but it was ordered, so store GTID must be updated)
+ */
+extern wsrep_status_t
+node_trx_apply(node_store_t* store,
+ wsrep_t* wsrep,
+ const wsrep_ws_handle_t* ws_handle,
+ const wsrep_trx_meta_t* ws_meta,
+ const wsrep_buf_t* ws);
+
+#endif /* NODE_TRX_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/worker.c b/wsrep-lib/wsrep-API/v26/examples/node/worker.c
new file mode 100644
index 00000000..e9901ad8
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/worker.c
@@ -0,0 +1,197 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "worker.h"
+
+#include "log.h"
+#include "options.h"
+#include "trx.h"
+#include "wsrep.h"
+
+#include <assert.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <string.h> // strerror()
+
+struct node_worker
+{
+ struct node_ctx* node;
+ pthread_t thread_id;
+ size_t id;
+ bool exit;
+};
+
+enum wsrep_cb_status
+node_worker_apply_cb(void* const recv_ctx,
+ const wsrep_ws_handle_t* const ws_handle,
+ uint32_t const ws_flags,
+ const wsrep_buf_t* const ws,
+ const wsrep_trx_meta_t* const ws_meta,
+ wsrep_bool_t* const exit_loop)
+{
+ assert(recv_ctx);
+
+ struct node_worker* const worker = recv_ctx;
+
+ wsrep_status_t const ret = node_trx_apply(
+ worker->node->store,
+ node_wsrep_provider(worker->node->wsrep),
+ ws_handle,
+ ws_meta,
+ ws_flags & WSREP_FLAG_ROLLBACK ? NULL : ws);
+
+ *exit_loop = worker->exit;
+
+ return WSREP_OK == ret ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE;
+}
+
+
+static void*
+worker_slave(void* recv_ctx)
+{
+ struct node_worker* const worker = recv_ctx;
+ wsrep_t* const wsrep = node_wsrep_provider(worker->node->wsrep);
+
+ wsrep_status_t const ret = wsrep->recv(wsrep, worker);
+
+ if (WSREP_OK != ret)
+ {
+ NODE_ERROR("slave worker [%zu] exited with error %d.", worker->id, ret);
+ }
+
+ return NULL;
+}
+
+static void*
+worker_master(void* send_ctx)
+{
+ struct node_worker* const worker = send_ctx;
+ struct node_ctx* const node = worker->node;
+ wsrep_t* const wsrep = node_wsrep_provider(node->wsrep);
+
+ assert(node->opts->ws_size > 0);
+
+ wsrep_status_t ret;
+
+ do
+ {
+ /* REPLICATION: we should not perform any local writes until the node
+ * is synced with the cluster. */
+ if (!node_wsrep_wait_synced(node->wsrep))
+ {
+ NODE_ERROR("master worker [%zu] failed waiting for SYNCED state.",
+ worker->id);
+ break;
+ }
+
+ /* REPLICATION: the node is now synced */
+
+ do
+ {
+ ret = node_trx_execute(node->store,
+ wsrep,
+ worker->id,
+ (int)node->opts->operations);
+ }
+ while(WSREP_OK == ret // success
+ || (WSREP_TRX_FAIL == ret // certification failed, trx rolled back
+ && (usleep(10000),true)) // retry after short sleep
+ );
+ }
+ while (WSREP_CONN_FAIL == ret); // provider in bad state (e.g. non-Primary)
+
+ return NULL;
+}
+
+struct node_worker_pool
+{
+ size_t size; // size of the pool (nu,ber of nodes)
+ struct node_worker worker[1]; // worker context array;
+};
+
+struct node_worker_pool*
+node_worker_start(struct node_ctx* const ctx,
+ node_worker_type_t const type,
+ size_t const size)
+{
+ assert(ctx);
+
+ if (0 == size) return NULL;
+
+ const char* const type_str = type == NODE_WORKER_SLAVE ? "slave" : "master";
+
+ size_t const alloc_size =
+ sizeof(struct node_worker_pool) +
+ sizeof(struct node_worker) * (size - 1);
+
+ struct node_worker_pool* const ret = malloc(alloc_size);
+
+ if (ret)
+ {
+ void* (* const routine) (void*) =
+ type == NODE_WORKER_SLAVE ? worker_slave : worker_master;
+
+ size_t i;
+ for (i = 0; i < size; i++)
+ {
+ struct node_worker* const worker = &ret->worker[i];
+ worker->node = ctx;
+ worker->id = i;
+ worker->exit = false;
+
+ int const err = pthread_create(&worker->thread_id,
+ NULL,
+ routine,
+ worker);
+ if (err)
+ {
+ NODE_ERROR("Failed to start %s worker[%zu]: %d (%s)",
+ type_str, i, err, strerror(err));
+ if (0 == i)
+ {
+ free(ret);
+ return NULL;
+ }
+ else
+ {
+ break; // some threads have started,
+ // need to return to close them first
+ }
+ }
+ }
+
+ ret->size = i;
+ }
+ else
+ {
+ NODE_ERROR("Failed to allocate %zu bytes for the %s worker pool",
+ alloc_size, type_str);
+ }
+
+ return ret;
+}
+
+void
+node_worker_stop(struct node_worker_pool* pool)
+{
+ size_t i;
+ for (i = 0; pool && i < pool->size; i++)
+ {
+ pthread_join(pool->worker[i].thread_id, NULL);
+ }
+
+ free(pool);
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/worker.h b/wsrep-lib/wsrep-API/v26/examples/node/worker.h
new file mode 100644
index 00000000..7ae06423
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/worker.h
@@ -0,0 +1,66 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines worker thread interface
+ */
+
+#ifndef NODE_WORKER_H
+#define NODE_WORKER_H
+
+#include "ctx.h"
+
+#include "../../wsrep_api.h"
+
+/**
+ * REPLICATION: a callback to apply and commit slave replication events */
+extern enum wsrep_cb_status
+node_worker_apply_cb(void* recv_ctx,
+ const wsrep_ws_handle_t* ws_handle,
+ uint32_t ws_flags,
+ const wsrep_buf_t* ws,
+ const wsrep_trx_meta_t* ws_meta,
+ wsrep_bool_t* exit_loop);
+
+typedef enum node_worker_type
+{
+ NODE_WORKER_SLAVE,
+ NODE_WORKER_MASTER
+}
+ node_worker_type_t;
+
+struct node_worker_pool;
+
+/**
+ * Starts the required number of workier threads of a given type
+ *
+ * @param[in] ctx application context
+ * @param[in] type of a worker
+ * @param[in] number of workers
+ *
+ * @return worker pool handle
+ */
+extern struct node_worker_pool*
+node_worker_start(struct node_ctx* ctx,
+ node_worker_type_t type,
+ size_t number);
+
+/**
+ * Stops workers in a pool and deallocates respective resources */
+extern void
+node_worker_stop(struct node_worker_pool* pool);
+
+#endif /* NODE_WORKER_H */
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c
new file mode 100644
index 00000000..6cea6d90
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c
@@ -0,0 +1,479 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "wsrep.h"
+
+#include "log.h"
+#include "sst.h"
+#include "store.h"
+#include "worker.h"
+
+#include <assert.h>
+#include <stdio.h> // snprintf()
+#include <stdlib.h> // abort()
+#include <string.h> // strcasecmp()
+
+struct node_wsrep
+{
+ wsrep_t* instance; // wsrep provider instance
+
+ struct wsrep_view
+ {
+ pthread_mutex_t mtx;
+ wsrep_gtid_t state_id;
+ wsrep_view_status_t status;
+ wsrep_cap_t capabilities;
+ int proto_ver;
+ int memb_num;
+ int my_idx;
+ wsrep_member_info_t* members;
+ }
+ view;
+
+ struct
+ {
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+ int value;
+ }
+ synced;
+
+ bool bootstrap; // shall this node bootstrap a primary view?
+};
+
+static struct node_wsrep s_wsrep =
+{
+ .instance = NULL,
+ .view =
+ {
+ .mtx = PTHREAD_MUTEX_INITIALIZER,
+ .state_id = {{{ 0, }}, WSREP_SEQNO_UNDEFINED },
+ .status = WSREP_VIEW_DISCONNECTED,
+ .capabilities = 0,
+ .proto_ver = -1,
+ .memb_num = 0,
+ .my_idx = -1,
+ .members = NULL
+ },
+ .synced =
+ {
+ .mtx = PTHREAD_MUTEX_INITIALIZER,
+ .cond = PTHREAD_COND_INITIALIZER,
+ .value = 0
+ },
+ .bootstrap = false
+};
+
+static const char* wsrep_view_status_str[WSREP_VIEW_MAX] =
+{
+ "PRIMARY",
+ "NON-PRIMARY",
+ "DISCONNECTED"
+};
+
+#define WSREP_CAPABILITIES_MAX ((int)sizeof(wsrep_cap_t) * 8) // bitmask
+static const char* wsrep_capabilities_str[WSREP_CAPABILITIES_MAX] =
+{
+ "MULTI-MASTER",
+ "CERTIFICATION",
+ "PA",
+ "REPLAY",
+ "TOI",
+ "PAUSE",
+ "CAUSAL-READS",
+ "CAUSAL-TRX",
+ "INCREMENTAL",
+ "SESSION-LOCKS",
+ "DISTRIBUTED-LOCKS",
+ "CONSISTENCY-CHECK",
+ "UNORDERED",
+ "ANNOTATION",
+ "PREORDERED",
+ "STREAMING",
+ "SNAPSHOT",
+ "NBO",
+ NULL,
+};
+
+/**
+ * REPLICATION: callback is called by provider when the node connects to group.
+ * This happens out-of-order, before the node receives a state
+ * transfer and syncs with the cluster. Unless application requires
+ * it it can be empty. We however want to know the GTID of the
+ * group out of order for SST tricks, so we record it out of order.
+ */
+static enum wsrep_cb_status
+wsrep_connected_cb(void* const x,
+ const wsrep_view_info_t* const v)
+{
+ char gtid_str[WSREP_GTID_STR_LEN + 1];
+ wsrep_gtid_print(&v->state_id, gtid_str, sizeof(gtid_str));
+
+ NODE_INFO("connect_cb(): Connected at %s to %s group of %d member(s)",
+ gtid_str, wsrep_view_status_str[v->status], v->memb_num);
+
+ struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep;
+
+ if (pthread_mutex_lock(&wsrep->view.mtx))
+ {
+ NODE_FATAL("Failed to lock VIEW mutex");
+ abort();
+ }
+
+ wsrep->view.state_id = v->state_id;
+
+ pthread_mutex_unlock(&wsrep->view.mtx);
+
+ return WSREP_CB_SUCCESS;
+}
+
+/**
+ * logs view data */
+static void
+wsrep_log_view(const struct wsrep_view* v)
+{
+ char gtid[WSREP_GTID_STR_LEN + 1];
+ wsrep_gtid_print(&v->state_id, gtid, sizeof(gtid));
+ gtid[WSREP_GTID_STR_LEN] = '\0';
+
+ char caps[256];
+ int written = 0;
+ size_t space_left = sizeof(caps);
+ int i;
+ for (i = 0; i < WSREP_CAPABILITIES_MAX && space_left > 0; i++)
+ {
+ wsrep_cap_t const f = 1u << i;
+
+ if (!(f & v->capabilities)) continue;
+
+ if (wsrep_capabilities_str[i])
+ {
+ written += snprintf(&caps[written], space_left, "%s|",
+ wsrep_capabilities_str[i]);
+ }
+ else
+ {
+ written += snprintf(&caps[written], space_left, "%d|", i);
+ }
+
+ space_left = sizeof(caps) - (size_t)written;
+ }
+ caps[written ? written - 1 : 0] = '\0'; // overwrite last '|'
+
+ char members_list[1024];
+ written = 0;
+ space_left = sizeof(members_list);
+ for (i = 0; i < v->memb_num && space_left > 0; i++)
+ {
+ wsrep_member_info_t* m = &v->members[i];
+ char uuid[WSREP_UUID_STR_LEN + 1];
+ wsrep_uuid_print(&m->id, uuid, sizeof(uuid));
+ uuid[WSREP_UUID_STR_LEN] = '\0';
+
+ written += snprintf(&members_list[written], space_left,
+ "%s%d: %s '%s' incoming:'%s'\n",
+ v->my_idx == i ? " * " : " ", i,
+ uuid, m->name, m->incoming);
+
+ space_left = sizeof(members_list) - (size_t)written;
+ }
+ members_list[written ? written - 1 : 0] = '\0'; // overwrite the last '\n'
+
+ NODE_INFO(
+ "New view received:\n"
+ "state: %s (%s)\n"
+ "capabilities: %s\n"
+ "protocol version: %d\n"
+ "members(%d)%s%s",
+ gtid, wsrep_view_status_str[v->status],
+ caps,
+ v->proto_ver,
+ v->memb_num, v->memb_num ? ":\n" : "", members_list);
+}
+
+/**
+ * REPLICATION: callback is called when the node needs to process cluster
+ * view change. The callback is called in "total order isolation",
+ * so all the preceding replication events will be processed
+ * strictly before the call and all subsequent - striclty after.
+ */
+static enum wsrep_cb_status
+wsrep_view_cb(void* const x,
+ void* const r,
+ const wsrep_view_info_t* const v,
+ const char* const state,
+ size_t const state_len)
+{
+ (void)r;
+ (void)state;
+ (void)state_len;
+
+ struct node_ctx* const node = x;
+
+ if (WSREP_VIEW_PRIMARY == v->status)
+ {
+ /* REPLICATION: membership change is a totally ordered event and as such
+ * should be a part of the state, like changes to the
+ * database. */
+ int err = node_store_update_membership(node->store, v);
+ if (err)
+ {
+ NODE_FATAL("Failed to update membership in store: %d (%s)",
+ err, strerror(-err));
+ abort();
+ }
+ }
+
+ enum wsrep_cb_status ret = WSREP_CB_SUCCESS;
+ struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep;
+
+ if (pthread_mutex_lock(&wsrep->view.mtx))
+ {
+ NODE_FATAL("Failed to lock VIEW mutex");
+ abort();
+ }
+
+ /* below we'll just copy the data for future reference (if need be): */
+
+ size_t const memb_size = (size_t)v->memb_num * sizeof(wsrep_member_info_t);
+ void* const tmp = realloc(wsrep->view.members, memb_size);
+ if (memb_size > 0 && !tmp)
+ {
+ NODE_ERROR("Could not allocate memory for a new view: %zu bytes",
+ memb_size);
+ ret = WSREP_CB_FAILURE;
+ goto cleanup;
+ }
+ else
+ {
+ wsrep->view.members = tmp;
+ if (memb_size) memcpy(wsrep->view.members, &v->members[0], memb_size);
+ }
+
+ wsrep->view.state_id = v->state_id;
+ wsrep->view.status = v->status;
+ wsrep->view.capabilities = v->capabilities;
+ wsrep->view.proto_ver = v->proto_ver;
+ wsrep->view.memb_num = v->memb_num;
+ wsrep->view.my_idx = v->my_idx;
+
+ /* and now log the info */
+
+ wsrep_log_view(&wsrep->view);
+
+cleanup:
+ pthread_mutex_unlock(&wsrep->view.mtx);
+
+ return ret;
+}
+
+/**
+ * REPLICATION: callback is called by provider when the node becomes SYNCED */
+static enum wsrep_cb_status
+wsrep_synced_cb(void* const x)
+{
+ struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep;
+
+ if (pthread_mutex_lock(&wsrep->synced.mtx))
+ {
+ NODE_FATAL("Failed to lock SYNCED mutex");
+ abort();
+ }
+
+ if (wsrep->synced.value == 0)
+ {
+ NODE_INFO("become SYNCED");
+ wsrep->synced.value = 1;
+ pthread_cond_broadcast(&wsrep->synced.cond);
+ }
+
+ pthread_mutex_unlock(&wsrep->synced.mtx);
+
+ return WSREP_CB_SUCCESS;
+}
+
+struct node_wsrep*
+node_wsrep_init(const struct node_options* const opts,
+ const wsrep_gtid_t* const current_gtid,
+ void* const app_ctx)
+{
+ if (s_wsrep.instance != NULL) return NULL; // already initialized
+
+ wsrep_status_t err;
+ err = wsrep_load(opts->provider, &s_wsrep.instance, node_log_cb);
+ if (WSREP_OK != err)
+ {
+ if (strcasecmp(opts->provider, WSREP_NONE))
+ {
+ NODE_ERROR("wsrep_load(%s) failed: %s (%d).",
+ opts->provider, strerror(err), err);
+ }
+ else
+ {
+ NODE_ERROR("Initializing dummy provider failed: %s (%d).",
+ strerror(err), err);
+ }
+ return NULL;
+ }
+
+ char base_addr[256];
+ snprintf(base_addr, sizeof(base_addr) - 1, "%s:%ld",
+ opts->base_host, opts->base_port);
+
+ struct wsrep_init_args args =
+ {
+ .app_ctx = app_ctx,
+
+ .node_name = opts->name,
+ .node_address = base_addr,
+ .node_incoming = "", // we don't accept client connections
+ .data_dir = opts->data_dir,
+ .options = opts->options,
+ .proto_ver = 0, // this is the first version of the application
+ // so the first version of the writeset protocol
+ .state_id = current_gtid,
+ .state = NULL, // unused
+
+ .logger_cb = node_log_cb,
+ .connected_cb = wsrep_connected_cb,
+ .view_cb = wsrep_view_cb,
+ .synced_cb = wsrep_synced_cb,
+ .encrypt_cb = NULL, // not implemented ATM
+
+ .apply_cb = node_worker_apply_cb,
+ .unordered_cb = NULL, // not needed now
+
+ .sst_request_cb = node_sst_request_cb,
+ .sst_donate_cb = node_sst_donate_cb
+ };
+
+ wsrep_t* wsrep = s_wsrep.instance;
+
+ err = wsrep->init(wsrep, &args);
+
+ if (WSREP_OK != err)
+ {
+ NODE_ERROR("wsrep::init() failed: %d, must shutdown", err);
+ node_wsrep_close(&s_wsrep);
+ return NULL;
+ }
+
+ return &s_wsrep;
+}
+
+wsrep_status_t
+node_wsrep_connect(struct node_wsrep* const wsrep,
+ const char* const address,
+ bool const bootstrap)
+{
+ wsrep->bootstrap = bootstrap;
+ wsrep_status_t err = wsrep->instance->connect(wsrep->instance,
+ "wsrep_cluster",
+ address,
+ NULL,
+ wsrep->bootstrap);
+
+ if (WSREP_OK != err)
+ {
+ NODE_ERROR("wsrep::connect(%s) failed: %d, must shutdown",
+ address, err);
+ node_wsrep_close(wsrep);
+ }
+
+ return err;
+}
+
+void
+node_wsrep_disconnect(struct node_wsrep* const wsrep)
+{
+ if (pthread_mutex_lock(&wsrep->synced.mtx))
+ {
+ NODE_FATAL("Failed to lock SYNCED mutex");
+ abort();
+ }
+ wsrep->synced.value = -1; /* this will signal master threads to exit */
+ pthread_cond_broadcast(&wsrep->synced.cond);
+ pthread_mutex_unlock(&wsrep->synced.mtx);
+
+ wsrep_status_t const err = wsrep->instance->disconnect(wsrep->instance);
+
+ if (err)
+ {
+ /* REPLICATION: unless connection is not closed, slave threads will
+ * never return. */
+ NODE_FATAL("Failed to close wsrep connection: %d", err);
+ abort();
+ }
+}
+
+void
+node_wsrep_close(struct node_wsrep* const wsrep)
+{
+ if (pthread_mutex_lock(&wsrep->view.mtx))
+ {
+ NODE_FATAL("Failed to lock VIEW mutex");
+ abort();
+ }
+ assert(0 == wsrep->view.memb_num); // the node must be disconneted
+ assert(NULL == wsrep->view.members);
+ free(wsrep->view.members);
+ wsrep->view.members = NULL;
+ pthread_mutex_unlock(&wsrep->view.mtx);
+
+ wsrep->instance->free(wsrep->instance);
+ wsrep->instance = NULL;
+}
+
+bool
+node_wsrep_wait_synced(struct node_wsrep* const wsrep)
+{
+ if (pthread_mutex_lock(&wsrep->synced.mtx))
+ {
+ NODE_FATAL("Failed to lock SYNCED mutex");
+ abort();
+ }
+
+ while (wsrep->synced.value == 0)
+ {
+ pthread_cond_wait(&wsrep->synced.cond, &wsrep->synced.mtx);
+ }
+
+ bool const ret = wsrep->synced.value > 0;
+
+ pthread_mutex_unlock(&wsrep->synced.mtx);
+
+ return ret;
+}
+
+void
+node_wsrep_connected_gtid(struct node_wsrep* wsrep, wsrep_gtid_t* gtid)
+{
+ if (pthread_mutex_lock(&wsrep->view.mtx))
+ {
+ NODE_FATAL("Failed to lock VIEW mutex");
+ abort();
+ }
+
+ *gtid = wsrep->view.state_id;
+
+ pthread_mutex_unlock(&wsrep->view.mtx);
+}
+
+wsrep_t*
+node_wsrep_provider(struct node_wsrep* wsrep)
+{
+ return wsrep->instance;
+}
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h
new file mode 100644
index 00000000..75c7eac3
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h
@@ -0,0 +1,92 @@
+/* Copyright (c) 2019, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file This unit defines various helpers to manage wsrep provider
+ */
+
+#ifndef NODE_WSREP_H
+#define NODE_WSREP_H
+
+#include "options.h"
+
+#include "../../wsrep_api.h"
+
+#include <pthread.h>
+#include <stdbool.h>
+
+typedef struct node_wsrep node_wsrep_t;
+
+/**
+ * loads and initializes wsrep provider for further usage
+ *
+ * @param[in] opts program options
+ * @param[in] current_gtid GTID corresponding to the current node state
+ * @param[in] app_ctx application context to be passed to callbacks
+ *
+ * @return initialized object pointer
+ */
+extern node_wsrep_t*
+node_wsrep_init(const struct node_options* opts,
+ const wsrep_gtid_t* current_gtid,
+ void* app_ctx);
+
+/**
+ * connects to primary component
+ *
+ * @param[in] wsrep wsrep context
+ * @param[in] address address to connect at (provider specific)
+ * @param[in] bootsstrap bootstrap primary component if there's none
+ *
+ * @return wsrep status code
+ */
+extern wsrep_status_t
+node_wsrep_connect(node_wsrep_t* wsrep,
+ const char* address,
+ bool bootstrap);
+
+/**
+ * disconnects from primary component
+ */
+extern void
+node_wsrep_disconnect(node_wsrep_t* wsrep);
+
+/**
+ * deinitializes and unloads wsrep provider
+ */
+extern void
+node_wsrep_close(node_wsrep_t* wsrep);
+
+/**
+ * waits for the node to become SYNCED
+ *
+ * @return true if node is synced, false in any other event.
+ */
+extern bool
+node_wsrep_wait_synced(node_wsrep_t* wsrep);
+
+/**
+ * @param[in] wsrep context
+ * @param[out] gtid of the current view */
+extern void
+node_wsrep_connected_gtid(node_wsrep_t* wsrep, wsrep_gtid_t* gtid);
+
+/**
+ * @return wsrep provider instance */
+extern wsrep_t*
+node_wsrep_provider(node_wsrep_t* wsrep);
+
+#endif /* NODE_WSREP_H */