diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
commit | a175314c3e5827eb193872241446f2f8f5c9d33c (patch) | |
tree | cd3d60ca99ae00829c52a6ca79150a5b6e62528b /wsrep-lib/wsrep-API/v26/examples | |
parent | Initial commit. (diff) | |
download | mariadb-10.5-a175314c3e5827eb193872241446f2f8f5c9d33c.tar.xz mariadb-10.5-a175314c3e5827eb193872241446f2f8f5c9d33c.zip |
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'wsrep-lib/wsrep-API/v26/examples')
26 files changed, 4381 insertions, 0 deletions
diff --git a/wsrep-lib/wsrep-API/v26/examples/CMakeLists.txt b/wsrep-lib/wsrep-API/v26/examples/CMakeLists.txt new file mode 100644 index 00000000..e6e33b78 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) 2019, Codership Oy. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +ADD_EXECUTABLE(listener listener.c) +TARGET_LINK_LIBRARIES(listener wsrep dl pthread) + +ADD_SUBDIRECTORY(node) diff --git a/wsrep-lib/wsrep-API/v26/examples/README.md b/wsrep-lib/wsrep-API/v26/examples/README.md new file mode 100644 index 00000000..b1b20744 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/README.md @@ -0,0 +1,14 @@ +## wsrep API usage examples + +### 1. Listener +Is a simple program that connects and listens to replication events in +an existing cluster. + +Usage example (starting listener on the same host as the rest of the cluster): +``` +$ ./listener /path_to/libgalera_smm.so gcomm://localhost:4567?gmcast.listen_addr=tcp://127.0.0.1:9999 cluster_name +``` + +### 2. Node +Is a more complex program which implements most of wsrep node functionality +and can form clusters in itself. diff --git a/wsrep-lib/wsrep-API/v26/examples/listener.c b/wsrep-lib/wsrep-API/v26/examples/listener.c new file mode 100644 index 00000000..9fc881fe --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/listener.c @@ -0,0 +1,268 @@ +/* Copyright (C) 2012 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Example of wsrep event listener. Outputs description of received + * events to stdout. To get a general picture you should start with + * main() function. */ + +#include <wsrep_api.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <signal.h> +#include <pthread.h> + +/*! This is global application context, it will be used by wsrep callbacks */ +struct application_context +{}; + +static struct application_context global_ctx; + +/*! This is receiving thread context, it will be used by wsrep callbacks */ +struct receiver_context +{ + char msg[4096]; +}; + +/* wsrep provider handle (global for simplicty) */ +static wsrep_t* wsrep = NULL; + +/*! This is a logger callback which library will be using to log events. */ +static void +logger_cb (wsrep_log_level_t level __attribute__((unused)), const char* msg) +{ + fprintf (stderr, "WSREP: %s\n", msg); +} + +/*! This will be called on cluster view change (nodes joining, leaving, etc.). + * Each view change is the point where application may be pronounced out of + * sync with the current cluster view and need state transfer. + * It is guaranteed that no other callbacks are called concurrently with it. */ +static wsrep_cb_status_t +view_cb (void* app_ctx __attribute__((unused)), + void* recv_ctx __attribute__((unused)), + const wsrep_view_info_t* view, + const char* state __attribute__((unused)), + size_t state_len __attribute__((unused))) +{ + printf ("New cluster membership view: %d nodes, my index is %d, " + "global seqno: %lld\n", + view->memb_num, view->my_idx, (long long)view->state_id.seqno); + + return WSREP_CB_SUCCESS; +} + +/*! This will be called on cluster view change (nodes joining, leaving, etc.). + * Each view change is the point where application may be pronounced out of + * sync with the current cluster view and need state transfer. + * It is guaranteed that no other callbacks are called concurrently with it. */ +static wsrep_cb_status_t +sst_request_cb (void* app_ctx __attribute__((unused)), + void** sst_req, + size_t* sst_req_len) +{ + /* For simplicity we're skipping state transfer by using magic string + * as a state transfer request. + * This node will not be considered JOINED (having full state) + * by other cluster members. */ + *sst_req = strdup(WSREP_STATE_TRANSFER_NONE); + + if (*sst_req) + *sst_req_len = strlen(*sst_req) + 1; + else + *sst_req_len = 0; + + return WSREP_CB_SUCCESS; +} + +/*! This is called to "apply" writeset. + * If writesets don't conflict on keys, it may be called concurrently to + * utilize several CPU cores. */ +static wsrep_cb_status_t +apply_cb (void* recv_ctx, + const wsrep_ws_handle_t* ws_handle __attribute__((unused)), + uint32_t flags __attribute__((unused)), + const wsrep_buf_t* ws __attribute__((unused)), + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop __attribute__((unused))) +{ + struct receiver_context* ctx = (struct receiver_context*)recv_ctx; + + snprintf (ctx->msg, sizeof(ctx->msg), + "Got writeset %lld, size %zu", (long long)meta->gtid.seqno, + ws->len); + + bool const commit = flags & (WSREP_FLAG_TRX_END | WSREP_FLAG_ROLLBACK); + + wsrep->commit_order_enter(wsrep, ws_handle, meta); + if (commit) puts(ctx->msg); + wsrep->commit_order_leave(wsrep, ws_handle, meta, NULL); + + return WSREP_CB_SUCCESS; +} + +/* The following callbacks are stubs and not used in this example. */ +static wsrep_cb_status_t +unordered_cb(void* recv_ctx __attribute__((unused)), + const wsrep_buf_t* data __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +static wsrep_cb_status_t +sst_donate_cb (void* app_ctx __attribute__((unused)), + void* recv_ctx __attribute__((unused)), + const wsrep_buf_t* msg __attribute__((unused)), + const wsrep_gtid_t* state_id __attribute__((unused)), + const wsrep_buf_t* state __attribute__((unused)), + wsrep_bool_t bypass __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +static wsrep_cb_status_t synced_cb (void* app_ctx __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +/* This is the listening thread. It blocks in wsrep::recv() call until + * disconnect from cluster. It will apply and commit writesets through the + * callbacks defined avbove. */ +static void* +recv_thread (void* arg) +{ + struct receiver_context* ctx = (struct receiver_context*)arg; + + wsrep_status_t rc = wsrep->recv(wsrep, ctx); + + fprintf (stderr, "Receiver exited with code %d", rc); + + return NULL; +} + +/* This is a signal handler to demonstrate graceful cluster leave. */ +static void +graceful_leave (int signum) +{ + printf ("Got signal %d, exiting...\n", signum); + wsrep->disconnect(wsrep); +} + +int main (int const argc, char* argv[]) +{ + if (argc < 4 || argc > 5) + { + fprintf (stderr, "Usage: %s </path/to/wsrep/provider> <wsrep URI> " + "<cluster name> [own address]\n", argv[0]); + exit (EXIT_FAILURE); + } + + const char* const wsrep_provider = argv[1]; + const char* const wsrep_uri = argv[2]; + const char* const cluster_name = argv[3]; + const char* const own_address = argc == 5 ? argv[4] : "localhost"; + + /* Now let's load and initialize provider */ + wsrep_status_t rc = wsrep_load (wsrep_provider, &wsrep, logger_cb); + if (WSREP_OK != rc) + { + fprintf (stderr, "Failed to load wsrep provider '%s'\n",wsrep_provider); + exit (EXIT_FAILURE); + } + + wsrep_gtid_t state_id = { WSREP_UUID_UNDEFINED, WSREP_SEQNO_UNDEFINED }; + + /* wsrep provider initialization arguments */ + struct wsrep_init_args wsrep_args = + { + .app_ctx = &global_ctx, + + .node_name = "example listener", + .node_address = own_address, + .node_incoming = "", + .data_dir = ".", // working directory + .options = "", + .proto_ver = 127, // maximum supported application event protocol + + .state_id = &state_id, + .state = NULL, + + .logger_cb = logger_cb, + .view_cb = view_cb, + .sst_request_cb = sst_request_cb, + .encrypt_cb = NULL, + .apply_cb = apply_cb, + .unordered_cb = unordered_cb, + .sst_donate_cb = sst_donate_cb, + .synced_cb = synced_cb + }; + + rc = wsrep->init(wsrep, &wsrep_args); + if (WSREP_OK != rc) + { + fprintf (stderr, "wsrep::init() failed: %d\n", rc); + exit (EXIT_FAILURE); + } + + /* Connect to cluster */ + rc = wsrep->connect(wsrep, cluster_name, wsrep_uri, "", 0); + if (0 != rc) + { + if (rc < 0) + fprintf (stderr, "wsrep::connect(%s, %s) failed: %d (%s)\n", + cluster_name, wsrep_uri, rc, strerror(-(int)rc)); + else + fprintf (stderr, "wsrep::connect() failed: %d\n", rc); + + exit (EXIT_FAILURE); + } + + /* Now let's start several listening threads*/ + int const num_threads = 4; + struct receiver_context thread_ctx[num_threads]; + pthread_t threads[num_threads]; + + int i; + for (i = 0; i < num_threads; i++) + { + int err = pthread_create ( + &threads[i], NULL, recv_thread, &thread_ctx[i]); + + if (err) + { + fprintf (stderr, "Failed to start thread %d: %d (%s)", + i, err, strerror(err)); + exit (EXIT_FAILURE); + } + } + + signal (SIGTERM, graceful_leave); + signal (SIGINT, graceful_leave); + + /* Listening threads are now running and receiving writesets. Wait for them + * to join. Threads will join after signal handler closes wsrep connection*/ + for (i = 0; i < num_threads; i++) + { + pthread_join (threads[i], NULL); + } + + /* Unload provider after nobody uses it any more. */ + wsrep_unload (wsrep); + + return 0; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt b/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt new file mode 100644 index 00000000..d018afde --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright (c) 2019, Codership Oy. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +FILE(GLOB SRC + "*.h" + "*.c" + ) + +ADD_EXECUTABLE(node ${SRC}) + +TARGET_LINK_LIBRARIES(node wsrep dl pthread) + +CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/node.sh + ${CMAKE_CURRENT_BINARY_DIR}/node.sh COPYONLY) diff --git a/wsrep-lib/wsrep-API/v26/examples/node/README.md b/wsrep-lib/wsrep-API/v26/examples/node/README.md new file mode 100644 index 00000000..4a07c149 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/README.md @@ -0,0 +1,81 @@ +# wsrep API node application + +## Overview + +This is a simple application to demonstrate the usage of wsrep API. It +deliberately does nothing useful in order to present as concentrated and +concise API usage as possible. + +The program is deliberately written in C to demonstrate the naked API usage. +For C++ example see a much more advanced integration library at +https://github.com/codership/wsrep-lib + +## High level architecture + +Process-wise the program consists of an endless main loop that periodically +samples and prints performance stats and a configurable number of "master" and +"slave" threads, with master threads loop executing "transactions" and +replicating resulting "write sets" and slave threads receiving and processing +the write sets from other nodes. + +Object-wise the program is composed of two main objects: `store` and `wsrep`. +'store' object contains application "state" and generates and commits changes +to the state. `wsrep` object contains cluster context and provides interface +to it. Changes generated by `store` are replicated and certified through +`wsrep` and then committed to `store`. + +## Unit descriptions (in alphabetical order) + +#### ctx.h +A small header to declare the application context structure. + +#### log.* +Implements logging functionality for the application AND +**a logging callback** for the wsrep provider. + +#### main.c +Defines `main()` routine that initializes storage and wsrep provider, starts +the worker threads and loops in a statistics collection loop. Even though it is +not designed to return it still shows the deinitialization order. + +#### options.* +Implements reading configuration options from the command line, does not have +anything related to wsrep API, but shows which additional parameters must be +configured for the program to make use of wsrep clustering. + +#### socket.* +Network sockets boilerplate code for setting TCP connections between processes +(for SST). Has nothing wsrep-related and can be ignored. + +#### sst.* +Defines **SST callbacks** for the wsrep provider and shows how to asynchronously +implement state snapshot transfer (yes, you don't want to spend eternity in +callbacks). + +#### stats.* +Implements performance stats collecting function for the main loop. While it is +an absolutely optional provider functionality, still it shows how to use that. + +#### store.* +Defines the `store` object that pretends to store and modify some data in a +"transactional" manner. It provides the caller that intends to do a change with +a *change data* and a *key* for replication and certification. + +#### trx.* +Defines routines to process local and replicated transactions. + +#### worker.* +Implements worker thread pool functinality. Worker threads run routines defined +in 'trx.*'. Also implements **apply callback** for the wsrep provider. + +#### wsrep.* +Maintains wsrep cluster context: provider instance and cluster membership view. +While there is little use for the latter in this primitive application, still +it shows **connected and view callbacks** usage. But mostly, for this +application its purpose is to initialize the provider, connect to the cluster +and offer access to initialized provider for other parts of the program. + +## Example usage +``` +./node -f /tmp/galera/0 -v /tmp/galera/0/galera/lib/libgalera_smm.so -o 'pc.weight=2;evs.send_window=2;evs.user_send_window=1;gcache.recover=no' -s 8 -m 16 +``` diff --git a/wsrep-lib/wsrep-API/v26/examples/node/ctx.h b/wsrep-lib/wsrep-API/v26/examples/node/ctx.h new file mode 100644 index 00000000..01653554 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/ctx.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines application context for wsrep provider + */ + +#ifndef NODE_CTX_H +#define NODE_CTX_H + +#include "store.h" +#include "wsrep.h" + +struct node_ctx +{ + node_wsrep_t* wsrep; + node_store_t* store; + const struct node_options* opts; +}; + +#endif /* NODE_CTX_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/log.c b/wsrep-lib/wsrep-API/v26/examples/node/log.c new file mode 100644 index 00000000..71f4705c --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/log.c @@ -0,0 +1,100 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "log.h" + +#include <stdio.h> // fprintf(), fflush() +#include <sys/time.h> // gettimeofday() +#include <time.h> // localtime_r() +#include <stdarg.h> // va_start(), va_end() + +wsrep_log_level_t node_log_max_level = WSREP_LOG_INFO; + +static const char* log_level_str[WSREP_LOG_DEBUG + 2] = +{ + "FATAL: ", + "ERROR: ", + " WARN: ", + " INFO: ", + "DEBUG: ", + "XXXXX: " +}; + +static inline void +log_timestamp_and_log(const char* const prefix, // source of msg + int const severity, + const char* const msg) +{ + struct tm date; + struct timeval time; + + gettimeofday(&time, NULL); + localtime_r (&time.tv_sec, &date); + + FILE* log_file = stderr; + fprintf(log_file, + "%04d-%02d-%02d %02d:%02d:%02d.%03d " /* timestamp fmt */ + "[%s] %s%s\n", /* [prefix] severity msg */ + date.tm_year + 1900, date.tm_mon + 1, date.tm_mday, + date.tm_hour, date.tm_min, date.tm_sec, + (int)time.tv_usec / 1000, + prefix, log_level_str[severity], msg + ); + + fflush (log_file); +} + +void +node_log_cb(wsrep_log_level_t const severity, const char* const msg) +{ + /* REPLICATION: let provider log messages be prefixed with 'wsrep'*/ + log_timestamp_and_log("wsrep", severity, msg); +} + +void +node_log(wsrep_log_level_t const severity, + const char* const file, + const char* const function, + int const line, + ...) +{ + va_list ap; + + char string[2048]; + int max_string = sizeof(string); + char* str = string; + + /* provide file:func():line info only if debug logging is on */ + if (NODE_DO_LOG_DEBUG) { + int const len = snprintf(str, (size_t)max_string, "%s:%s():%d: ", + file, function, line); + str += len; + max_string -= len; + } + + va_start(ap, line); + { + const char* format = va_arg (ap, const char*); + + if (max_string > 0 && NULL != format) { + vsnprintf (str, (size_t)max_string, format, ap); + } + } + va_end(ap); + + /* actual logging */ + log_timestamp_and_log(" node", severity, string); +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/log.h b/wsrep-lib/wsrep-API/v26/examples/node/log.h new file mode 100644 index 00000000..09404f26 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/log.h @@ -0,0 +1,69 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines logging macros for the application and + * a logger callback for the wsrep provider. + */ + +#ifndef NODE_LOG_H +#define NODE_LOG_H + +#include "../../wsrep_api.h" + +/** + * REPLICATION: a logger callback for wsrep provider + */ +extern void +node_log_cb(wsrep_log_level_t severity, const char* message); + +/** + * Applicaton log function intended to be used through the macros defined below. + * For simplicity it uses log levels defined by wsrep API, but it does not have + * to. */ +extern void +node_log (wsrep_log_level_t level, + const char* file, + const char* function, + const int line, + ...); + +/** + * This variable made global to avoid calling node_log() when debug logging + * is disabled. */ +extern wsrep_log_level_t node_log_max_level; +#define NODE_DO_LOG_DEBUG (WSREP_LOG_DEBUG <= node_log_max_level) + +/** + * Base logging macro that records current file, function and line number */ +#define NODE_LOG(level, ...)\ + node_log(level, __FILE__, __func__, __LINE__, __VA_ARGS__, NULL) + +/** + * @name Logging macros. + * Must be implemented as macros to report the location of the code where + * they are called. + */ +/*@{*/ +#define NODE_FATAL(...) NODE_LOG(WSREP_LOG_FATAL, __VA_ARGS__, NULL) +#define NODE_ERROR(...) NODE_LOG(WSREP_LOG_ERROR, __VA_ARGS__, NULL) +#define NODE_WARN(...) NODE_LOG(WSREP_LOG_WARN, __VA_ARGS__, NULL) +#define NODE_INFO(...) NODE_LOG(WSREP_LOG_INFO, __VA_ARGS__, NULL) +#define NODE_DEBUG(...) if (NODE_DO_LOG_DEBUG) \ + { NODE_LOG(WSREP_LOG_DEBUG, __VA_ARGS__, NULL); } +/*@}*/ + +#endif /* NODE_LOG_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/main.c b/wsrep-lib/wsrep-API/v26/examples/node/main.c new file mode 100644 index 00000000..f3124042 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/main.c @@ -0,0 +1,146 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "ctx.h" +#include "log.h" +#include "options.h" +#include "stats.h" +#include "worker.h" +#include "wsrep.h" + +#include <errno.h> +#include <signal.h> // sigaction() +#include <string.h> // strerror() + +static void +signal_handler(int const signum) +{ + NODE_INFO("Got signal %d. Terminating.", signum); +} + +static void +install_signal_handler(void) +{ + sigset_t sa_mask; + sigemptyset(&sa_mask); + + struct sigaction const act = + { + .sa_handler = signal_handler, + .sa_mask = sa_mask, + .sa_flags = (int)SA_RESETHAND + }; + + if (sigaction(SIGINT /* Ctrl-C */, &act, NULL)) + { + NODE_INFO("sigaction() failed: %d (%s)", errno, strerror(errno)); + abort(); + } +} + +int main(int argc, char* argv[]) +{ + install_signal_handler(); + + struct node_options opts; + int err = node_options_read(argc, argv, &opts); + if (err) + { + NODE_FATAL("Failed to read command line opritons: %d (%s)", + err, strerror(err)); + return err; + } + + struct node_ctx node; + node.opts = &opts; + + /* REPLICATION: before connecting to cluster we need to initialize our + * storage to know our current position (GTID) */ + node.store = node_store_open(&opts); + if (!node.store) + { + NODE_FATAL("Failed to open node store"); + return 1; + } + + wsrep_gtid_t current_gtid; + node_store_gtid(node.store, ¤t_gtid); + + /* REPLICATION: complete initialization of application context + * (including provider itself) */ + node.wsrep = node_wsrep_init(&opts, ¤t_gtid, &node); + if (!node.wsrep) + { + NODE_FATAL("Failed to initialize wsrep provider"); + return 1; + } + + /* REPLICATION: now we can connect to the cluster and start receiving + * replication events */ + if (node_wsrep_connect(node.wsrep, opts.address, opts.bootstrap) != + WSREP_OK) + { + NODE_FATAL("Failed to connect to primary component"); + return 1; + } + + /* REPLICATION: and start processing replicaiton events */ + struct node_worker_pool* slave_pool = + node_worker_start(&node, NODE_WORKER_SLAVE, (size_t)opts.slaves); + if (!slave_pool) + { + NODE_FATAL("Failed to create slave worker pool"); + return 1; + } + + /* REPLICATION: now that replicaton events are being processed we can + * wait to sync with the cluster */ + if (!node_wsrep_wait_synced(node.wsrep)) + { + NODE_ERROR("Failed to wait fir SYNCED event"); + return 1; + } + + NODE_INFO("Synced with cluster"); + + /* REPLICATION: now we can start replicate own events */ + struct node_worker_pool* master_pool = + node_worker_start(&node, NODE_WORKER_MASTER, (size_t)opts.masters); + if (opts.masters > 0 && !master_pool) + { + NODE_FATAL("Failed to create master worker pool"); + return 1; + } + + node_stats_loop(&node, (int)opts.period); + + /* REPLICATON: to shut down we go in the opposite order: + * first - disconnect from the cluster to signal master threads + * to exit loop, + * second - join master and slave threads, + * third - close provider once not in use */ + node_wsrep_disconnect(node.wsrep); + + node_worker_stop(master_pool); + node_worker_stop(slave_pool); + + node_wsrep_close(node.wsrep); + + /* and finally, when the storage can no longer be disturbed, close it */ + node_store_close(node.store); + + return 0; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/node.sh b/wsrep-lib/wsrep-API/v26/examples/node/node.sh new file mode 100755 index 00000000..40e0a498 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/node.sh @@ -0,0 +1,40 @@ +#!/bin/sh -eu + +NODE_ID=$1 + +NODE_NAME=${NODE_NAME:-$NODE_ID} + +NODE_DIR=${NODE_DIR:-/tmp/node/$NODE_NAME} +rm -rf $NODE_DIR/* +mkdir -p $NODE_DIR + +NODE_OPT=${NODE_OPT:-} + +NODE_HOST=${NODE_HOST:-localhost} +NODE_PORT=${NODE_PORT:-$((10000 + $NODE_ID))} + +NODE_CLIENTS=${NODE_CLIENTS:-1} +NODE_APPLIERS=${NODE_APPLIERS:-1} + +NODE_ADDR=${NODE_ADDR:-} + +NODE_BIN=${NODE_BIN:-$(dirname $0)/node} + +# convert possible relative path to absolute path +NODE_PROVIDER=$(realpath $NODE_PROVIDER) + +set -x + +$NODE_BIN \ +-v "$NODE_PROVIDER" \ +-n "$NODE_NAME" \ +-f "$NODE_DIR" \ +-o "$NODE_OPT" \ +-t "$NODE_HOST" \ +-p $NODE_PORT \ +-s $NODE_APPLIERS \ +-m $NODE_CLIENTS \ +-d 10 \ +-a "$NODE_ADDR" + +set +x diff --git a/wsrep-lib/wsrep-API/v26/examples/node/options.c b/wsrep-lib/wsrep-API/v26/examples/node/options.c new file mode 100644 index 00000000..0bd08ffb --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/options.c @@ -0,0 +1,291 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "options.h" + +#include <ctype.h> // isspace() +#include <errno.h> +#include <getopt.h> +#include <stdio.h> +#include <stdlib.h> // strtol() +#include <string.h> // strcmp() + +/* + * getopt_long() declarations begin + */ + +#define OPTS_NA no_argument +#define OPTS_RA required_argument +#define OPTS_OA optional_argument + +typedef enum opt +{ + OPTS_NOOPT = 0, + OPTS_ADDRESS = 'a', + OPTS_BOOTSTRAP = 'b', + OPTS_DELAY = 'd', + OPTS_DATA_DIR = 'f', + OPTS_HELP = 'h', + OPTS_PERIOD = 'i', + OPTS_MASTERS = 'm', + OPTS_NAME = 'n', + OPTS_OPTIONS = 'o', + OPTS_BASE_PORT = 'p', + OPTS_RECORDS = 'r', + OPTS_SLAVES = 's', + OPTS_BASE_HOST = 't', + OPTS_PROVIDER = 'v', + OPTS_WS_SIZE = 'w', + OPTS_OPS = 'x' +} + opt_t; + +static struct option s_opts[] = +{ + { "address", OPTS_RA, NULL, OPTS_ADDRESS }, + { "bootstrap", OPTS_NA, NULL, OPTS_BOOTSTRAP }, + { "delay", OPTS_RA, NULL, OPTS_DELAY }, + { "storage", OPTS_RA, NULL, OPTS_DATA_DIR }, + { "help", OPTS_NA, NULL, OPTS_HELP }, + { "period", OPTS_RA, NULL, OPTS_PERIOD }, + { "masters", OPTS_RA, NULL, OPTS_MASTERS }, + { "name", OPTS_RA, NULL, OPTS_NAME }, + { "options", OPTS_RA, NULL, OPTS_OPTIONS, }, + { "base-port", OPTS_RA, NULL, OPTS_BASE_PORT }, + { "records", OPTS_RA, NULL, OPTS_RECORDS }, + { "slaves", OPTS_RA, NULL, OPTS_SLAVES }, + { "base-host", OPTS_RA, NULL, OPTS_BASE_HOST }, + { "provider", OPTS_RA, NULL, OPTS_PROVIDER }, + { "size", OPTS_RA, NULL, OPTS_WS_SIZE }, + { "ops", OPTS_RA, NULL, OPTS_OPS }, + { NULL, 0, NULL, 0 } +}; + +static const char* opts_string = "a:d:f:hi:m:n:o:p:r:s:t:v:w:x:"; + +/* + * getopt_long() declarations end + */ + +static const struct node_options opts_defaults = +{ + .provider = "none", + .address = "", + .options = "", + .name = "unnamed", + .data_dir = ".", + .base_host = "localhost", + .masters = 0, + .slaves = 1, + .ws_size = 1024, + .records = 1024*1024, + .delay = 0, + .base_port = 4567, + .period = 10, + .operations= 1, + .bootstrap = true +}; + +static void +opts_print_help(FILE* out, const char* prog_name) +{ + fprintf( + out, + "Usage: %s [OPTION...]\n" + "\n" + " -h, --help this thing.\n" + " -v, --provider=PATH a path to wsrep provider library file.\n" + " -a, --address=STRING list of node addresses in the group.\n" + " If not set the node assumes that it is the first\n" + " node in the group (default)\n" + " -o, --options=STRING a string of wsrep provider options.\n" + " -n, --name=STRING human-readable node name.\n" + " -f, --data-dir=PATH a directory to save working data in.\n" + " Should be private to the process.\n" + " -t, --base-host=ADDRESS address of this node at which other members can\n" + " connect to it\n" + " -p, --base-port=NUM base port which the node shall listen for\n" + " connections from other members. This port will be\n" + " used for replication, port+1 for IST and port+2\n" + " for SST. Default: 4567\n" + " -m, --masters=NUM number of concurrent master workers.\n" + " -s, --slaves=NUM number of concurrent slave workers.\n" + " (can't be less than 1)\n" + " -w, --size=NUM desirable size of the resulting writesets\n" + " (approximate lower boundary). Default: 1K\n" + " -r, --records=NUM number of records in the store. Default: 1M\n" + " -x, --ops=NUM number of operations per transaction. Default: 1\n" + " -d, --delay=NUM delay in milliseconds between \"commits\"\n" + " (per master thread).\n" + " -b, --bootstrap bootstrap the cluster with this node.\n" + " Default: 'Yes' if --address is not given, 'No'\n" + " otherwise.\n" + " -i, --period period in seconds between performance stats output\n" + "\n" + , prog_name); +} + +static void +opts_print_config(FILE* out, const struct node_options* opts) +{ + fprintf( + out, + "Continuing with the following configuration:\n" + "provider: %s\n" + "address: %s\n" + "options: %s\n" + "name: %s\n" + "data dir: %s\n" + "base addr: %s:%ld\n" + "masters: %ld\n" + "slaves: %ld\n" + "writeset size: %ld bytes\n" + "records: %ld\n" + "operations: %ld\n" + "commit delay: %ld ms\n" + "stats period: %ld s\n" + "bootstrap: %s\n" + , + opts->provider, opts->address, opts->options, opts->name, opts->data_dir, + opts->base_host, opts->base_port, + opts->masters, opts->slaves, opts->ws_size, opts->records, + opts->operations, + opts->delay, opts->period, opts->bootstrap ? "Yes" : "No" + ); +} + +static int +opts_check_conversion(int cond, const char* ptr, int idx) +{ + if (!cond || errno || (*ptr != '\0' && !isspace(*ptr))) + { + fprintf(stderr, "Bad value for %s option.\n", s_opts[idx].name); + return EINVAL; + } + return 0; +} + +int +node_options_read(int argc, char* argv[], struct node_options* opts) +{ + *opts = opts_defaults; + + int opt = 0; + int opt_idx = 0; + char* endptr; + int ret = 0; + + bool address_given = false; + bool bootstrap_given = false; + + while ((opt = getopt_long(argc, argv, opts_string, s_opts, &opt_idx)) != -1) + { + switch (opt) + { + case OPTS_ADDRESS: + address_given = strcmp(opts->address, optarg); + opts->address = optarg; + break; + case OPTS_BOOTSTRAP: + bootstrap_given = true; + opts->bootstrap = true; + break; + case OPTS_DELAY: + opts->delay = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->delay >= 0, endptr, opt_idx))) + goto err; + break; + case OPTS_DATA_DIR: + opts->data_dir = optarg; + break; + case OPTS_HELP: + ret = 1; + goto help; + case OPTS_PERIOD: + opts->period = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->period > 0, endptr, opt_idx))) + goto err; + break; + case OPTS_MASTERS: + opts->masters = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->masters >= 0, endptr, + opt_idx))) + goto err; + break; + case OPTS_NAME: + opts->name = optarg; + break; + case OPTS_OPTIONS: + opts->options = optarg; + break; + case OPTS_BASE_PORT: + opts->base_port = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion( + opts->base_port > 0 && opts->base_port < 65536, + endptr, opt_idx))) + goto err; + break; + case OPTS_RECORDS: + opts->records = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->records >= 0, endptr, + opt_idx))) + goto err; + break; + case OPTS_SLAVES: + opts->slaves = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->slaves > 0, endptr, opt_idx))) + goto err; + break; + case OPTS_BASE_HOST: + opts->base_host = optarg; + break; + case OPTS_PROVIDER: + opts->provider = optarg; + break; + case OPTS_WS_SIZE: + opts->ws_size = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->ws_size > 0, endptr, + opt_idx))) + goto err; + break; + case OPTS_OPS: + opts->operations = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->operations >= 1, endptr, + opt_idx))) + goto err; + break; + default: + ret = EINVAL; + } + } + +help: + if (ret) { + opts_print_help(stderr, argv[0]); + } + else + { + if (!bootstrap_given) + { + opts->bootstrap = !address_given; + } + opts_print_config(stdout, opts); + opts->delay *= 1000; /* convert to microseconds for usleep() */ + } + +err: + return ret; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/options.h b/wsrep-lib/wsrep-API/v26/examples/node/options.h new file mode 100644 index 00000000..62172281 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/options.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines options interface + */ + +#ifndef NODE_OPTIONS_H +#define NODE_OPTIONS_H + +#include <stdbool.h> + +struct node_options +{ + const char* provider; // path to wsrep provider + const char* address; // wsrep cluster address string + const char* options; // wsrep option string + const char* name; // node name (for logging purposes) + const char* data_dir; // name of the storage file + const char* base_host;// host own address + long masters; // number of master threads + long slaves; // number of slave threads + long ws_size; // desired writeset size + long records; // total number of records + long delay; // delay between commits + long base_port;// base port to use + long period; // statistics output interval + long operations;// number of "statements" in a "transaction" + bool bootstrap;// bootstrap the cluster with this node +}; + +extern int +node_options_read(int argc, char* argv[], struct node_options* opts); + +#endif /* NODE_OPTIONS_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/socket.c b/wsrep-lib/wsrep-API/v26/examples/node/socket.c new file mode 100644 index 00000000..377abcaf --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/socket.c @@ -0,0 +1,304 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "socket.h" + +#include "log.h" + +#include <assert.h> +#include <ctype.h> // isspace() +#include <errno.h> +#include <limits.h> // USHRT_MAX +#include <netdb.h> // struct addrinfo +#include <stdio.h> // snprintf() +#include <string.h> // strerror() +#include <sys/socket.h> // bind(), connect(), accept(), send(), recv() + +struct node_socket +{ + int fd; +}; + +/** + * Initializes addrinfo from the separate host address and port arguments + * + * Requires calling freeaddrinfo() later + * + * @param[in] host - if NULL, will be initialized for listening + * @param[in] port + * + * @return struct addrinfo* or NULL in case of error + */ +static struct addrinfo* +socket_get_addrinfo2(const char* const host, + uint16_t const port) +{ + struct addrinfo const hints = + { + .ai_flags = AI_PASSIVE | /** will be ignored if host is not NULL */ + AI_NUMERICSERV, /** service is a numeric port */ + .ai_family = AF_UNSPEC, /** either IPv4 or IPv6 */ + .ai_socktype = SOCK_STREAM, /** STREAM or DGRAM */ + .ai_protocol = 0, + .ai_addrlen = 0, + .ai_addr = NULL, + .ai_canonname = NULL, + .ai_next = NULL + }; + + char service[6]; + snprintf(service, sizeof(service), "%hu", port); + + struct addrinfo* info; + int err = getaddrinfo(host, service, &hints, &info); + if (err) + { + NODE_ERROR("Failed to resolve '%s': %d (%s)", + host, err, gai_strerror(err)); + return NULL; + } + + return info; +} + +/** + * Initializes addrinfo from single address and port string + * The port is expected to be in numerical form and appended to the host address + * via colon. + * + * Requires calling freeaddrinfo() later + * + * @param[in] addr full address specification, including port + * + * @return struct addrinfo* or NULL in case of error + */ +static struct addrinfo* +socket_get_addrinfo1(const char* const addr) +{ + int const addr_len = (int)strlen(addr); + char* const addr_buf = strdup(addr); + if (!addr_buf) + { + NODE_ERROR("strdup(%s) failed: %d (%s)", addr, errno, strerror(errno)); + return NULL; + } + + struct addrinfo* res = NULL; + long port; + char* endptr; + + int i; + for (i = addr_len - 1; i >= 0; i--) + { + if (addr_buf[i] == ':') break; + } + + if (addr_buf[i] != ':') + { + NODE_ERROR("Malformed address:port string: '%s'", addr); + goto end; + } + + addr_buf[i] = '\0'; + port = strtol(addr_buf + i + 1, &endptr, 10); + + if (port <= 0 || port > USHRT_MAX || errno || + (*endptr != '\0' && !isspace(*endptr))) + { + NODE_ERROR("Malformed/invalid port: '%s'. Errno: %d (%s)", + addr_buf + i + 1, errno, strerror(errno)); + goto end; + } + + res = socket_get_addrinfo2(strlen(addr_buf) > 0 ? addr_buf : NULL, + (uint16_t)port); +end: + free(addr_buf); + return res; +} + +static struct node_socket* +socket_create(int const fd) +{ + assert(fd > 0); + + struct node_socket* res = calloc(1, sizeof(struct node_socket)); + if (res) + { + res->fd = fd; + } + else + { + NODE_ERROR("Failed to allocate struct node_socket: %d (%s)", + errno, strerror(errno)); + close(fd); + } + + return res; +} + +/** + * Definition of function type with the signature of bind() and connect() + */ +typedef int (*socket_act_fun_t) (int sfd, + const struct sockaddr* addr, + socklen_t addrlen); + +static int +socket_bind_and_listen(int const sfd, + const struct sockaddr* const addr, + socklen_t const addrlen) +{ + int ret = bind(sfd, addr, addrlen); + + if (!ret) + ret = listen(sfd, SOMAXCONN); + + return ret; +} + +/** + * A "template" method to do the "right thing" with the addrinfo and create a + * socket from it. The "right thing" would normally be bind and listen for + * a server socket OR connect for a client socket. + * + * @param[in] info addrinfo list, swallowed and deallocated + * @param[in] action_fun the "right thing" to do on socket and struct sockaddr + * @param[in] action_str action description to be printed in the error message + * @param[in] orig_host host address to be pronted in the error message + * @param[in] orig_port port to be printed in the error message, if orig_host + * string contains the port, this parameter should be 0 + * + * The last three parameters are for diagnostic puposes only. orig_host and + * orig_port are supposed to be what were used to obtain addrinfo. + * + * @return new struct node_socket. + */ +static struct node_socket* +socket_from_addrinfo(struct addrinfo* const info, + socket_act_fun_t const action_fun, + const char* const action_str, + const char* const orig_host, + uint16_t const orig_port) +{ + int sfd; + int err = 0; + + /* Iterate over addrinfo list and try to apply action_fun on the resulting + * socket. Once successful, break loop. */ + struct addrinfo* addr; + for (addr = info; addr != NULL; addr = addr->ai_next) + { + sfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (sfd == -1) + { + err = errno; + continue; + } + + if (action_fun(sfd, addr->ai_addr, addr->ai_addrlen) == 0) break; + + err = errno; + close(sfd); + } + + freeaddrinfo(info); /* no longer needed */ + + if (!addr) + { + NODE_ERROR("Failed to %s to '%s%s%.0hu': %d (%s)", + action_str, + orig_host ? orig_host : "", orig_port > 0 ? ":" : "", + orig_port > 0 ? orig_port : 0, /* won't be printed if 0 */ + err, strerror(err)); + return NULL; + } + + assert(sfd > 0); + return socket_create(sfd); +} + +struct node_socket* +node_socket_listen(const char* const host, uint16_t const port) +{ + struct addrinfo* const info = socket_get_addrinfo2(host, port); + if (!info) return NULL; + + return socket_from_addrinfo(info, socket_bind_and_listen, + "bind a listening socket", host, port); +} + +struct node_socket* +node_socket_connect(const char* const addr_str) +{ + struct addrinfo* const info = socket_get_addrinfo1(addr_str); + if (!info) return NULL; + + return socket_from_addrinfo(info, connect, "connect", addr_str, 0); +} + +struct node_socket* +node_socket_accept(struct node_socket* socket) +{ + int sfd = accept(socket->fd, NULL, NULL); + + if (sfd < 0) + { + NODE_ERROR("Failed to accept connection: %d (%s)", + errno, strerror(errno)); + return NULL; + } + + return socket_create(sfd); +} + +int +node_socket_send_bytes(node_socket_t* socket, const void* buf, size_t len) +{ + ssize_t const ret = send(socket->fd, buf, len, MSG_NOSIGNAL); + + if (ret != (ssize_t)len) + { + NODE_ERROR("Failed to send %zu bytes: %d (%s)", errno, strerror(errno)); + return -1; + } + + return 0; +} + +int +node_socket_recv_bytes(node_socket_t* socket, void* buf, size_t len) +{ + ssize_t const ret = recv(socket->fd, buf, len, MSG_WAITALL); + + if (ret != (ssize_t)len) + { + NODE_ERROR("Failed to recv %zu bytes: %d (%s)", errno, strerror(errno)); + return -1; + } + + return 0; +} + +void +node_socket_close(node_socket_t* socket) +{ + if (!socket) return; + + if (socket->fd > 0) close(socket->fd); + + free(socket); +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/socket.h b/wsrep-lib/wsrep-API/v26/examples/node/socket.h new file mode 100644 index 00000000..3a77eff3 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/socket.h @@ -0,0 +1,72 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit implements auxiliary networking functions (for SST purposes) + * It has nothing wsrep related and is not of general purpose. + */ + +#ifndef NODE_SOCKET_H +#define NODE_SOCKET_H + +#include <stddef.h> // size_t +#include <stdint.h> // uint16_t + +typedef struct node_socket node_socket_t; + +/** + * Open listening socket at a given address + * + * @return listening socket + */ +extern node_socket_t* +node_socket_listen(const char* host, uint16_t port); + +/** + * Connect to a given address. + * + * @return connected socket + */ +extern node_socket_t* +node_socket_connect(const char* addr); + +/** + * Wait for connection on a listening socket + * @return connected socket + */ +extern node_socket_t* +node_socket_accept(node_socket_t* s); + +/** + * Send a given number of bytes + * @return 0 or a negative error code + */ +extern int +node_socket_send_bytes(node_socket_t* s, const void* buf, size_t len); + +/** + * Receive a given number of bytes + * @return 0 or a negative error code + */ +extern int +node_socket_recv_bytes(node_socket_t* s, void* buf, size_t len); + +/** + * Release all recources associated with the socket */ +extern void +node_socket_close(node_socket_t* s); + +#endif /* NODE_SOCKET_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/sst.c b/wsrep-lib/wsrep-API/v26/examples/node/sst.c new file mode 100644 index 00000000..e93534ef --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/sst.c @@ -0,0 +1,372 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "sst.h" + +#include "ctx.h" +#include "log.h" +#include "socket.h" + +#include <arpa/inet.h> // htonl() +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <stdio.h> // snprintf() +#include <stdlib.h> // abort() +#include <string.h> // strdup() +#include <unistd.h> // usleep() + +/** + * Helper: creates detached thread */ +static int +sst_create_thread(void* (*thread_routine) (void*), + void* const thread_arg) +{ + pthread_t thr; + pthread_attr_t attr; + int ret = pthread_attr_init(&attr); + ret = ret ? ret : pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); + ret = ret ? ret : pthread_create(&thr, &attr, thread_routine, thread_arg); + return ret; +} + +/** + * Helper: creates detached thread and waits for it to call + * sst_sync_with_parent() */ +static void +sst_create_and_sync(const char* const role, + pthread_mutex_t* const mtx, + pthread_cond_t* const cond, + void* (*thread_routine) (void*), + void* const thread_arg) +{ + int ret = pthread_mutex_lock(mtx); + if (ret) + { + NODE_FATAL("Failed to lock %s mutex: %d (%s)", role, ret, strerror(ret)); + abort(); + } + + ret = sst_create_thread(thread_routine, thread_arg); + if (ret) + { + NODE_FATAL("Failed to create detached %s thread: %d (%s)", + role, ret, strerror(ret)); + abort(); + } + + ret = pthread_cond_wait(cond, mtx); + if (ret) + { + NODE_FATAL("Failed to synchronize with %s thread: %d (%s)", + role, ret, strerror(ret)); + abort(); + } + + pthread_mutex_unlock(mtx); +} + +/** + * Helper: syncs with parent thread and allows it to continue and return + * asynchronously */ +static void +sst_sync_with_parent(const char* role, + pthread_mutex_t* mtx, + pthread_cond_t* cond) +{ + int ret = pthread_mutex_lock(mtx); + if (ret) + { + NODE_FATAL("Failed to lock %s mutex: %d (%s)", role, ret, strerror(ret)); + abort(); + } + + NODE_INFO("Initialized %s thread", role); + + pthread_cond_signal(cond); + pthread_mutex_unlock(mtx); +} + +static pthread_mutex_t sst_joiner_mtx = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t sst_joiner_cond = PTHREAD_COND_INITIALIZER; + +struct sst_joiner_ctx +{ + struct node_ctx* node; + node_socket_t* socket; +}; + +/** + * waits for SST completion and signals the provider to continue */ +static void* +sst_joiner_thread(void* ctx) +{ + assert(ctx); + + struct node_ctx* const node = ((struct sst_joiner_ctx*)ctx)->node; + node_socket_t* const listen = ((struct sst_joiner_ctx*)ctx)->socket; + ctx = NULL; /* may be unusable after next statement */ + + /* this allows parent callback to return */ + sst_sync_with_parent("JOINER", &sst_joiner_mtx, &sst_joiner_cond); + + wsrep_gtid_t state_gtid = WSREP_GTID_UNDEFINED; + int err = -1; + + /* REPLICATION: wait for donor to connect and send the state snapshot */ + node_socket_t* const connected = node_socket_accept(listen); + if (!connected) goto end; + + uint32_t state_len; + err = node_socket_recv_bytes(connected, &state_len, sizeof(state_len)); + if (err) goto end; + + state_len = ntohl(state_len); + if (state_len > 0) + { + /* REPLICATION: get the state of state_len size */ + void* state = malloc(state_len); + if (state) + { + err = node_socket_recv_bytes(connected, state, state_len); + if (err) + { + free(state); + goto end; + } + + /* REPLICATION: install the newly received state. */ + err = node_store_init_state(node->store, state, state_len); + free(state); + if (err) goto end; + } + else + { + NODE_ERROR("Failed to allocate %zu bytes for state snapshot.", + state_len); + err = -ENOMEM; + goto end; + } + } + else + { + /* REPLICATION: it was a bypass, the node will receive missing data via + * IST. It starts with the state it currently has. */ + } + + /* REPLICATION: find gtid of the received state to report to provider */ + node_store_gtid(node->store, &state_gtid); + +end: + assert(err <= 0); + node_socket_close(connected); + node_socket_close(listen); + + /* REPLICATION: tell provider that SST is received */ + wsrep_status_t sst_ret; + wsrep_t* const wsrep = node_wsrep_provider(node->wsrep); + sst_ret = wsrep->sst_received(wsrep, &state_gtid, NULL, err); + + if (WSREP_OK != sst_ret) + { + NODE_FATAL("Failed to report completion of SST: %d", sst_ret); + abort(); + } + + return NULL; +} + +enum wsrep_cb_status +node_sst_request_cb (void* const app_ctx, + void** const sst_req, + size_t* const sst_req_len) +{ + static int const SST_PORT_OFFSET = 2; + + assert(app_ctx); + struct node_ctx* const node = app_ctx; + const struct node_options* const opts = node->opts; + + char* sst_str = NULL; + + /* REPLICATION: 1. prepare the node to receive SST */ + uint16_t const sst_port = (uint16_t)(opts->base_port + SST_PORT_OFFSET); + size_t const sst_len = strlen(opts->base_host) + + 1 /* ':' */ + 5 /* max port len */ + 1 /* \0 */; + sst_str = malloc(sst_len); + if (!sst_str) + { + NODE_ERROR("Failed to allocate %zu bytes for SST request", sst_len); + goto end; + } + + /* write in request the address at which we listen */ + int ret = snprintf(sst_str, sst_len, "%s:%hu", opts->base_host, sst_port); + if (ret < 0 || (size_t)ret >= sst_len) + { + free(sst_str); + sst_str = NULL; + NODE_ERROR("Failed to write a SST request"); + goto end; + } + + node_socket_t* const socket = node_socket_listen(NULL, sst_port); + if (!socket) + { + free(sst_str); + sst_str = NULL; + NODE_ERROR("Failed to listen at %s", sst_str); + goto end; + } + + /* REPLICATION 2. start the "joiner" thread that will wait for SST and + * report its success to provider, and syncronize with it. */ + struct sst_joiner_ctx ctx = + { + .node = node, + .socket = socket + }; + sst_create_and_sync("JOINER", &sst_joiner_mtx, &sst_joiner_cond, + sst_joiner_thread, &ctx); + + NODE_INFO("Waiting for SST at %s", sst_str); + +end: + if (sst_str) + { + *sst_req = sst_str; + *sst_req_len = strlen(sst_str) + 1; + } + else + { + *sst_req = NULL; + *sst_req_len = 0; + return WSREP_CB_FAILURE; + } + + /* REPLICATION 3. return SST request to provider */ + return WSREP_CB_SUCCESS; +} + +static pthread_mutex_t sst_donor_mtx = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t sst_donor_cond = PTHREAD_COND_INITIALIZER; + +struct sst_donor_ctx +{ + wsrep_gtid_t state; + struct node_ctx* node; + node_socket_t* socket; + wsrep_bool_t bypass; +}; + +/** + * donates SST and signals provider that it is done. */ +static void* +sst_donor_thread(void* const args) +{ + struct sst_donor_ctx const ctx = *(struct sst_donor_ctx*)args; + + int err = 0; + const void* state; + size_t state_len; + + if (ctx.bypass) + { + /* REPLICATION: if bypass is true, there is no need to send snapshot, + * just signal the joiner that snapshot is not needed and + * it can proceed to apply IST. We'll do it by sending 0 + * for the size of snapshot */ + state = NULL; + state_len = 0; + } + else + { + /* REPLICATION: if bypass is false, we need to send a full state snapshot + * Get hold of the state, which is currently just GTID + * NOTICE that while parent is waiting, the store is in a + * quiescent state, provider blocking any modifications. */ + err = node_store_acquire_state(ctx.node->store, &state, &state_len); + if (state_len > UINT32_MAX) err = -ERANGE; + } + + /* REPLICATION: after getting hold of the state we can allow parent callback + * to return and the node to resume its normal operation */ + sst_sync_with_parent("DONOR", &sst_donor_mtx, &sst_donor_cond); + + if (err >= 0) + { + uint32_t tmp = htonl((uint32_t)state_len); + err = node_socket_send_bytes(ctx.socket, &tmp, sizeof(tmp)); + } + + if (state_len != 0) + { + if (err >= 0) + { + assert(state); + err = node_socket_send_bytes(ctx.socket, state, state_len); + } + + node_store_release_state(ctx.node->store); + } + + node_socket_close(ctx.socket); + + /* REPLICATION: signal provider the success of the operation */ + wsrep_t* const wsrep = node_wsrep_provider(ctx.node->wsrep); + wsrep->sst_sent(wsrep, &ctx.state, err); + + return NULL; +} + +enum wsrep_cb_status +node_sst_donate_cb (void* const app_ctx, + void* const recv_ctx, + const wsrep_buf_t* const str_msg, + const wsrep_gtid_t* const state_id, + const wsrep_buf_t* const state, + wsrep_bool_t const bypass) +{ + (void)recv_ctx; + (void)state; + + struct sst_donor_ctx ctx = + { + .node = app_ctx, + .state = *state_id, + .bypass = bypass + }; + + /* we are expecting a human-readable 0-terminated string */ + void* p = memchr(str_msg->ptr, '\0', str_msg->len); + if (!p) + { + NODE_ERROR("Received a badly formed State Transfer Request."); + /* REPLICATION: in case of a failure we return the status to provider, so + * that the joining node can be notified of it by cluster */ + return WSREP_CB_FAILURE; + } + + const char* addr = str_msg->ptr; + ctx.socket = node_socket_connect(addr); + + if (!ctx.socket) return WSREP_CB_FAILURE; + + sst_create_and_sync("DONOR", &sst_donor_mtx, &sst_donor_cond, + sst_donor_thread, &ctx); + + return WSREP_CB_SUCCESS; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/sst.h b/wsrep-lib/wsrep-API/v26/examples/node/sst.h new file mode 100644 index 00000000..7006a1b6 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/sst.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines SST interface + */ + +#ifndef NODE_SST_H +#define NODE_SST_H + +#include "../../wsrep_api.h" + +extern enum wsrep_cb_status +node_sst_request_cb (void* app_ctx, + void** sst_req, + size_t* sst_req_len); + +extern enum wsrep_cb_status +node_sst_donate_cb (void* app_ctx, + void* recv_ctx, + const wsrep_buf_t* str_msg, + const wsrep_gtid_t* state_id, + const wsrep_buf_t* state, + wsrep_bool_t bypass); + +#endif /* NODE_SST_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/stats.c b/wsrep-lib/wsrep-API/v26/examples/node/stats.c new file mode 100644 index 00000000..4b02240f --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/stats.c @@ -0,0 +1,215 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "stats.h" + +#include "log.h" + +#include <assert.h> +#include <errno.h> +#include <stdio.h> // snprintf() +#include <stdlib.h> // abort() +#include <string.h> // strcmp() +#include <unistd.h> // usleep() + +enum +{ + STATS_REPL_BYTE, + STATS_REPL_WS, + STATS_RECV_BYTE, + STATS_RECV_WS, + STATS_TOTAL_BYTE, + STATS_TOTAL_WS, + STATS_CERT_FAILS, + STATS_STORE_FAILS, + STATS_FC_PAUSED, + STATS_MAX +}; + +static const char* const stats_legend[STATS_MAX] = +{ + " repl(B/s)", + " repl(W/s)", + " recv(B/s)", + " recv(W/s)", + "total(B/s)", + "total(W/s)", + " cert.fail", + " stor.fail", + " paused(%)" +}; + +/* stats IDs in provider output - provider dependent, here we use Galera's */ +static const char* const galera_ids[STATS_MAX] = +{ + "replicated_bytes", /**< STATS_REPL_BYTE */ + "replicated", /**< STATS_REPL_WS */ + "received_bytes", /**< STATS_RECV_BYTE */ + "received", /**< STATS_RECV_WS */ + "", /**< STATS_TOTAL_BYTE */ + "", /**< STATS_TOTAL_WS */ + "local_cert_failures", /**< STATS_CERT_FAILS */ + "", /**< STATS_STORE_FAILS */ + "flow_control_paused_ns" /**< STATS_FC_PAUSED */ +}; + +/* maps local stats IDs to provider stat IDs */ +static int stats_galera_map[STATS_MAX]; + +/** + * Helper to map provider stats to own stats set */ +static void +stats_establish_mapping(wsrep_t* const wsrep) +{ + int const magic_map = -1; + size_t i; + for (i = 0; i < sizeof(stats_galera_map)/sizeof(stats_galera_map[0]); i++) + { + stats_galera_map[i] = magic_map; /* initialize map array */ + } + + struct wsrep_stats_var* const stats = wsrep->stats_get(wsrep); + + /* to compensate for STATS_TOTAL_* and STATS_STORE_FAILS having no + * counterparts */ + int mapped = 3; + + i = 0; + while (stats[i].name) /* stats array is terminated by Null name */ + { + int j; + for (j = 0; j < STATS_MAX; j++) + { + if (magic_map == stats_galera_map[j] /* j-th member still unset */ + && + !strcmp(stats[i].name, galera_ids[j])) + { + stats_galera_map[j] = (int)i; + mapped++; + if (STATS_MAX == mapped) /* all mapped */ goto out; + } + } + + i++; + } + +out: + wsrep->stats_free(wsrep, stats); +} + +static void +stats_get(node_store_t* const store, wsrep_t* const wsrep, long long stats[]) +{ + stats[STATS_STORE_FAILS] = node_store_read_view_failures(store); + + struct wsrep_stats_var* const ret = wsrep->stats_get(wsrep); + if (!ret) + { + NODE_FATAL("wsrep::stats_get() call failed."); + abort(); + } + + int i; + for (i = 0; i < STATS_MAX; i++) + { + int j = stats_galera_map[i]; + if (j >= 0) + { + assert(WSREP_VAR_INT64 == ret[j].type); + stats[i] = ret[j].value._int64; + } + } + + wsrep->stats_free(wsrep, ret); + + // totals are just sums + stats[STATS_TOTAL_BYTE] = stats[STATS_REPL_BYTE] + stats[STATS_RECV_BYTE]; + stats[STATS_TOTAL_WS ] = stats[STATS_REPL_WS ] + stats[STATS_RECV_WS ]; +} + +static void +stats_print(long long bef[], long long aft[], double period) +{ + double rate[STATS_MAX]; + int i; + for (i = 0; i < STATS_MAX; i++) + { + rate[i] = (double)(aft[i] - bef[i])/period; + } + rate[STATS_FC_PAUSED] /= 1.0e+07; // nanoseconds to % of seconds + + char str[256]; + int written = 0; + + /* first line write legend */ + for (i = 0; i < STATS_MAX; i++) + { + size_t const space_left = sizeof(str) - (size_t)written; + written += snprintf(&str[written], space_left, "%s", stats_legend[i]); + } + + str[written] = '\n'; + written++; + + /* second line write values */ + for (i = 0; i < STATS_MAX; i++) + { + size_t const space_left = sizeof(str) - (size_t)written; + long long const value = (long long)rate[i]; + written += snprintf(&str[written], space_left, " %9lld", value); + } + + str[written] = '\0'; + + /* use logging macro for timestamp */ + NODE_INFO("\n%s", str); +} + +void +node_stats_loop(const struct node_ctx* const node, int const period) +{ + double const period_sec = period; + useconds_t const period_usec = (useconds_t)period * 1000000; + + wsrep_t* const wsrep = node_wsrep_provider(node->wsrep); + stats_establish_mapping(wsrep); + + long long stats1[STATS_MAX]; + long long stats2[STATS_MAX]; + + stats_get(node->store, wsrep, stats1); + + while (1) + { + if (usleep(period_usec)) break; + stats_get(node->store, wsrep, stats2); + stats_print(stats1, stats2, period_sec); + + if (usleep(period_usec)) break; + stats_get(node->store, wsrep, stats1); + stats_print(stats2, stats1, period_sec); + } + + if (EINTR != errno) + { + NODE_ERROR("Unexpected usleep(%lld) error: %d (%s)", + (long long)period_usec, errno, strerror(errno)); + } + else + { + /* interrupted by signal */ + } +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/stats.h b/wsrep-lib/wsrep-API/v26/examples/node/stats.h new file mode 100644 index 00000000..f7ab7ef4 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/stats.h @@ -0,0 +1,35 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines performance statistics loop + */ + +#ifndef NODE_STATS_H +#define NODE_STATS_H + +#include "ctx.h" + +/** + * Prints out statistics with a given period. + * + * @param[in] node node context + * @param[in] period in seconds + */ +extern void +node_stats_loop(const struct node_ctx* node, int period); + +#endif /* NODE_STATS_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/store.c b/wsrep-lib/wsrep-API/v26/examples/node/store.c new file mode 100644 index 00000000..1dc2d6c1 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/store.c @@ -0,0 +1,1044 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "store.h" + +#include "log.h" + +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <stdbool.h> +#include <stddef.h> // ptrdiff_t +#include <stdint.h> // uintptr_t +#include <stdlib.h> // abort() +#include <string.h> // memset() + +#define DECLARE_SERIALIZE_INT(INTTYPE) \ + static inline size_t \ + store_serialize_##INTTYPE(void* const to, INTTYPE##_t const from) \ + { \ + memcpy(to, &from, sizeof(from)); /* for simplicity ignore endianness */ \ + return sizeof(from); \ + } + +DECLARE_SERIALIZE_INT(uint32); +DECLARE_SERIALIZE_INT(int64); + +#define DECLARE_DESERIALIZE_INT(INTTYPE) \ + static inline size_t \ + store_deserialize_##INTTYPE(INTTYPE##_t* const to, const void* const from) \ + { \ + memcpy(to, from, sizeof(*to)); /* for simplicity ignore endianness */ \ + return sizeof(*to); \ + } + +DECLARE_DESERIALIZE_INT(uint32); +DECLARE_DESERIALIZE_INT(int64); + +typedef struct record +{ + wsrep_seqno_t version; + uint32_t value; + /* this order ensures that there is no padding between the members */ +} +record_t; + +#define STORE_RECORD_SIZE \ + (sizeof(((record_t*)(NULL))->version) + sizeof(((record_t*)(NULL))->value)) + +static inline size_t +store_record_set(void* const base, + size_t const index, + const record_t* const record) +{ + char* const position = (char*)base + index*STORE_RECORD_SIZE; + memcpy(position, record, STORE_RECORD_SIZE); + return STORE_RECORD_SIZE; +} + +static inline size_t +store_record_get(const void* const base, + size_t const index, + record_t* const record) +{ + const char* const position = (const char*)base + index*STORE_RECORD_SIZE; + memcpy(record, position, STORE_RECORD_SIZE); + return STORE_RECORD_SIZE; +} + +static inline bool +store_record_equal(const record_t* const lhs, const record_t* const rhs) +{ + return (lhs->version == rhs->version) && (lhs->value == rhs->value); +} + +/* transaction context */ +struct store_trx_op +{ + /* Normally what we'd need for transaction context is the record index and + * new record value. Here we also save read view snapshot (rec_from & rec_to) + * to + * 1. test provider certification correctness if provider supports read view + * 2. if not, detect conflicts at a store level. */ + record_t rec_from; + record_t rec_to; + uint32_t idx_from; + uint32_t idx_to; + uint32_t new_value; + uint32_t size; /* nominal "size" of operation to manipulate on-the-wire + * writeset size. */ +}; + +#define STORE_OP_SIZE (STORE_RECORD_SIZE + STORE_RECORD_SIZE + \ + sizeof(((struct store_trx_op*)NULL)->idx_from) + \ + sizeof(((struct store_trx_op*)NULL)->idx_to) + \ + sizeof(((struct store_trx_op*)NULL)->new_value) + \ + sizeof(((struct store_trx_op*)NULL)->size)) + +struct store_trx_ctx +{ + wsrep_gtid_t rv_gtid; + size_t ops_num; + struct store_trx_op* ops; +}; + +static inline bool +store_trx_add_op(struct store_trx_ctx* const trx) +{ + struct store_trx_op* const new_ops = + realloc(trx->ops, sizeof(struct store_trx_op)*(trx->ops_num + 1)); + + if (new_ops) + { + trx->ops = new_ops; +#ifndef NDEBUG + memset(&trx->ops[trx->ops_num], 0, sizeof(*trx->ops)); +#endif + trx->ops_num++; + } + + return (NULL == new_ops); +} + +struct store_trx_entry +{ + bool used; + struct store_trx_ctx ctx; +}; + +typedef wsrep_uuid_t member_t; + +struct node_store +{ + wsrep_gtid_t gtid; + pthread_mutex_t gtid_mtx; + wsrep_trx_id_t trx_id; + pthread_mutex_t trx_id_mtx; + char* snapshot; + member_t* members; + void* records; + size_t op_size; + long read_view_fails; + uint32_t members_num; + uint32_t records_num; + uint32_t entries_mask; + bool read_view_support; // read view support by cluster + /* trx pool piggybacked */ +}; + +node_store_t* +node_store_open(const struct node_options* const opts) +{ + /* make the size of trx pool the next highest power of 2 over the total + * number of workers */ + uint32_t trx_pool_mask = (uint32_t)(opts->masters + opts->slaves); + if (trx_pool_mask > 0) + { + trx_pool_mask -= 1; + trx_pool_mask |= trx_pool_mask >> 1; + trx_pool_mask |= trx_pool_mask >> 2; + trx_pool_mask |= trx_pool_mask >> 4; + trx_pool_mask |= trx_pool_mask >> 8; + trx_pool_mask |= trx_pool_mask >> 16; + } + assert(((trx_pool_mask + 1) & trx_pool_mask) == 0); // 2^n - 1 + + size_t const desired_op_size = (size_t)(opts->ws_size/opts->operations); + size_t const op_size = (desired_op_size > STORE_OP_SIZE ? + desired_op_size : STORE_OP_SIZE); + + /* since the number of workers will never change, we can allocate trx pool + * together with the main store struc */ + size_t const store_alloc_size = sizeof(struct node_store) + + /* op_size - additional buffer for op serialization per trx */ + (sizeof(struct store_trx_entry) + op_size)*(trx_pool_mask + 1); + + struct node_store* const ret = malloc(store_alloc_size); + + if (ret) + { + memset(ret, 0, store_alloc_size); + ret->records = malloc((size_t)opts->records * STORE_RECORD_SIZE); + + if (ret->records) + { + ret->gtid = WSREP_GTID_UNDEFINED; + pthread_mutex_init(&ret->gtid_mtx, NULL); + pthread_mutex_init(&ret->trx_id_mtx, NULL); + ret->op_size = op_size; + ret->records_num = (uint32_t)opts->records; + ret->entries_mask = trx_pool_mask; + + uint32_t i; + for (i = 0; i < ret->records_num; i++) + { + /* keep state in serialized form for easy snapshotting */ + struct record const record = { WSREP_SEQNO_UNDEFINED, i }; + store_record_set(ret->records, i, &record); + } + + return ret; + } + else + { + free(ret); + } + } + + return NULL; +} + +void +node_store_close(struct node_store* const store) +{ + assert(store); + assert(store->records); + pthread_mutex_destroy(&store->gtid_mtx); + pthread_mutex_destroy(&store->trx_id_mtx); + free(store->records); + free(store->members); + free(store); +} + +#define STORE_MUTEX_LOCK(mtx) \ + { \ + int err = pthread_mutex_lock(mtx); \ + if (err) \ + { \ + NODE_FATAL("Failed to lock " #mtx ": %d (%s)", \ + err, strerror(err)); \ + abort(); \ + } \ + } + +static inline struct store_trx_entry* +store_get_trx_entry(struct node_store* const store, wsrep_trx_id_t const trx_id) +{ + return (struct store_trx_entry*) + ((char*)(store + 1) + (trx_id & store->entries_mask)* + (sizeof(struct store_trx_entry) + store->op_size)); +} + +static inline struct store_trx_ctx* +store_get_trx_ctx(struct node_store* const store, wsrep_trx_id_t const trx_id) +{ + return &(store_get_trx_entry(store, trx_id)->ctx); +} + +static inline wsrep_trx_id_t +store_new_trx_id(struct node_store* const store) +{ + wsrep_trx_id_t ret; + struct store_trx_entry* trx; + + STORE_MUTEX_LOCK(&store->trx_id_mtx); + + do + { + store->trx_id++; + trx = store_get_trx_entry(store, store->trx_id); + } + while (trx->used); + trx->used = true; + ret = store->trx_id; + + pthread_mutex_unlock(&store->trx_id_mtx); + + memset(&trx->ctx, 0, sizeof(trx->ctx)); + + return ret; +} + +static inline void +store_free_trx_id(struct node_store* const store, wsrep_trx_id_t const trx_id) +{ + struct store_trx_entry* const trx = store_get_trx_entry(store, trx_id); + assert(trx->used); + free(trx->ctx.ops); + + STORE_MUTEX_LOCK(&store->trx_id_mtx); + + trx->used = false; + + pthread_mutex_unlock(&store->trx_id_mtx); +} + +/** + * deserializes membership from snapshot */ +static int +store_new_members(const char* ptr, const char* const endptr, + uint32_t* const num, member_t** const memb) +{ + ptr += store_deserialize_uint32(num, ptr); + + if (*num < 2) + { + NODE_ERROR("Bogus number of members %u", *num); + return -1; + } + + int ret = (int)sizeof(*num); + + size_t const msize = sizeof(member_t) * *num; + if ((endptr - ptr) < (ptrdiff_t)msize) + { + NODE_ERROR("State snapshot does not contain all membership: " + "%zd < %zu", endptr - ptr, msize); + return -1; + } + + *memb = calloc(*num, sizeof(member_t)); + if (!*memb) + { + NODE_ERROR("Could not allocate new membership"); + return -ENOMEM; + } + + memcpy(*memb, ptr, msize); + + return ret + (int)msize; +} + +/** + * deserializes records from snapshot */ +static int +store_new_records(const char* ptr, const char* const endptr, + uint32_t* const num, void** const rec) +{ + ptr += store_deserialize_uint32(num, ptr); + + int ret = (int)sizeof(*num); + if (!*num) + { + *rec = NULL; + return ret; + } + + size_t const rsize = STORE_RECORD_SIZE * *num; + if ((endptr - ptr) < (ptrdiff_t)rsize) + { + NODE_ERROR("State snapshot does not contain all records: " + "%zu < %zu", endptr - ptr, rsize); + return -1; + } + + *rec = malloc(rsize); + if (!*rec) + { + NODE_ERROR("Could not allocate new records"); + return -ENOMEM; + } + + memcpy(*rec, ptr, rsize); + + return ret + (int)rsize; +} + +int +node_store_init_state(struct node_store* const store, + const void* const state, + size_t const state_len) +{ + /* First, deserialize and prepare new state */ + if (state_len <= sizeof(member_t)*2 /* at least two members */ + + WSREP_UUID_STR_LEN + 1 /* : */ + 1 /* seqno */ + 1 /* \0 */) + { + NODE_ERROR("State snapshot too short: %zu", state_len); + return -1; + } + + wsrep_gtid_t state_gtid; + int ret; + ret = wsrep_gtid_scan(state, state_len, &state_gtid); + if (ret < 0) + { + char state_str[WSREP_GTID_STR_LEN + 1] = { 0, }; + memcpy(state_str, state, sizeof(state_str) - 1); + NODE_ERROR("Could not find valid GTID in the received data: %s", + state_str); + return -1; + } + + ret++; /* \0 */ + if ((state_len - (size_t)ret) < sizeof(uint32_t)) + { + NODE_ERROR("State snapshot does not contain the number of members"); + return -1; + } + + const char* ptr = ((char*)state); + const char* const endptr = ptr + state_len; + ptr += ret; + + uint32_t m_num; + member_t* new_members; + ret = store_new_members(ptr, endptr, &m_num, &new_members); + if (ret < 0) + { + return ret; + } + ptr += ret; + + bool const read_view_support = ptr[0]; + ptr += 1; + + uint32_t r_num; + void* new_records; + ret = store_new_records(ptr, endptr, &r_num, &new_records); + if (ret < 0) + { + free(new_members); + return ret; + } + ptr += ret; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + /* just a sanity check */ + if (0 == wsrep_uuid_compare(&state_gtid.uuid, &store->gtid.uuid) && + state_gtid.seqno < store->gtid.seqno) + { + NODE_ERROR("Received snapshot that is in the past: my seqno %lld," + " received seqno: %lld", + (long long)store->gtid.seqno, (long long)state_gtid.seqno); + free(new_members); + free(new_records); + ret = -1; + } + else + { + free(store->members); + store->members_num = m_num; + store->members = new_members; + free(store->records); + store->records_num = r_num; + store->records = new_records; + store->gtid = state_gtid; + store->read_view_support = read_view_support; + ret = 0; + } + + pthread_mutex_unlock(&store->gtid_mtx); + + return ret; +} + +int +node_store_acquire_state(node_store_t* const store, + const void** const state, + size_t* const state_len) +{ + int ret = 0; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + if (!store->snapshot) + { + size_t const memb_len = store->members_num * sizeof(member_t); + size_t const rec_len = store->records_num * STORE_RECORD_SIZE; + size_t const buf_len = WSREP_GTID_STR_LEN + 1 + + sizeof(uint32_t) + memb_len + + 1 /* read view support */ + + sizeof(uint32_t) + rec_len; + + store->snapshot = malloc(buf_len); + + if (store->snapshot) + { + char* ptr = store->snapshot; + + /* state GTID */ + ret = wsrep_gtid_print(&store->gtid, ptr, buf_len); + if (ret > 0) + { + NODE_INFO(""); + assert((size_t)ret < buf_len); + + ptr[ret] = '\0'; + ret++; + ptr += ret; + assert((size_t)ret < buf_len); + + /* membership */ + ptr += store_serialize_uint32(ptr, store->members_num); + ret += (int)sizeof(uint32_t); + assert((size_t)ret + memb_len < buf_len); + memcpy(ptr, store->members, memb_len); + ptr += memb_len; + ret += (int)memb_len; + assert((size_t)ret + sizeof(uint32_t) <= buf_len); + + /* read view support */ + ptr[0] = store->read_view_support; + ptr += 1; + ret += 1; + + /* records */ + ptr += store_serialize_uint32(ptr, store->records_num); + ret += (int)sizeof(uint32_t); + assert((size_t)ret + rec_len < buf_len); + memcpy(ptr, store->records, rec_len); + ret += (int)rec_len; + assert((size_t)ret <= buf_len); + } + else + { + NODE_ERROR("Failed to record GTID: %d (%s)", ret,strerror(-ret)); + free(store->snapshot); + store->snapshot = 0; + } + } + else + { + NODE_ERROR("Failed to allocate snapshot buffer of size %zu",buf_len); + ret = -ENOMEM; + } + } + else + { + assert(0); /* provider should prevent such situation */ + ret = -EAGAIN; + } + + pthread_mutex_unlock(&store->gtid_mtx); + + if (ret > 0) + { + NODE_INFO("\n\nPrepared snapshot of %u records\n\n", store->records_num); + *state = store->snapshot; + *state_len = (size_t)ret; + ret = 0; + } + + return ret; +} + +void +node_store_release_state(node_store_t* const store) +{ + STORE_MUTEX_LOCK(&store->gtid_mtx); + + assert(store->snapshot); + free(store->snapshot); + store->snapshot = 0; + + pthread_mutex_unlock(&store->gtid_mtx); +} + +int +node_store_update_membership(struct node_store* const store, + const wsrep_view_info_t* const v) +{ + assert(store); + assert(WSREP_VIEW_PRIMARY == v->status); + assert(v->memb_num > 0); + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + bool const continuation = v->state_id.seqno == store->gtid.seqno + 1 && + 0 == wsrep_uuid_compare(&v->state_id.uuid, &store->gtid.uuid); + + bool const initialization = WSREP_SEQNO_UNDEFINED == store->gtid.seqno && + 0 == wsrep_uuid_compare(&WSREP_UUID_UNDEFINED, &store->gtid.uuid); + + if (!(continuation || initialization)) + { + char store_str[WSREP_GTID_STR_LEN + 1] = { 0, }; + wsrep_gtid_print(&store->gtid, store_str, sizeof(store_str)); + char view_str[WSREP_GTID_STR_LEN + 1] = { 0, }; + wsrep_gtid_print(&v->state_id, view_str, sizeof(view_str)); + + NODE_FATAL("Attempt to initialize store GTID from incompatible view:\n" + "\tstore: %s\n" + "\tview: %s", + store_str, view_str); + abort(); + } + + wsrep_uuid_t* const new_members = calloc(sizeof(wsrep_uuid_t), + (size_t)v->memb_num); + if (!new_members) + { + NODE_FATAL("Could not allocate new members array"); + abort(); + } + + int i; + for (i = 0; i < v->memb_num; i++) + { + new_members[i] = v->members[i].id; + } + + /* REPLICATION: at this point we should compare old and new memberships and + * rollback all streaming transactions from the partitioned + * members, if any. But we don't support it in this program yet. + */ + + free(store->members); + + store->members = new_members; + store->members_num = (uint32_t)v->memb_num; + store->gtid = v->state_id; + store->read_view_support = (v->capabilities & WSREP_CAP_SNAPSHOT); + + pthread_mutex_unlock(&store->gtid_mtx); + + return 0; +} + +void +node_store_gtid(struct node_store* const store, + wsrep_gtid_t* const gtid) +{ + assert(store); + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + *gtid = store->gtid; + + pthread_mutex_unlock(&store->gtid_mtx); +} + + +static inline void +store_serialize_op(void* const buf, const struct store_trx_op* const op) +{ + char* ptr = buf; + ptr += store_record_set(ptr, 0, &op->rec_from); + ptr += store_record_set(ptr, 0, &op->rec_to); + ptr += store_serialize_uint32(ptr, op->idx_from); + ptr += store_serialize_uint32(ptr, op->idx_to); + ptr += store_serialize_uint32(ptr, op->new_value); + store_serialize_uint32(ptr, op->size); +} + +static inline void +store_deserialize_op(struct store_trx_op* const op, const void* const buf) +{ + const char* ptr = buf; + ptr += store_record_get(ptr, 0, &op->rec_from); + ptr += store_record_get(ptr, 0, &op->rec_to); + ptr += store_deserialize_uint32(&op->idx_from, ptr); + ptr += store_deserialize_uint32(&op->idx_to, ptr); + ptr += store_deserialize_uint32(&op->new_value, ptr); + store_deserialize_uint32(&op->size, ptr); +} + +static inline void +store_serialize_gtid(void* const buf, const wsrep_gtid_t* const gtid) +{ + char* ptr = buf; + memcpy(ptr, >id->uuid, sizeof(gtid->uuid)); + ptr += sizeof(gtid->uuid); + store_serialize_int64(ptr, gtid->seqno); +} + +static inline void +store_deserialize_gtid(wsrep_gtid_t* const gtid, const void* const buf) +{ + const char* ptr = buf; + memcpy(>id->uuid, ptr, sizeof(gtid->uuid)); + ptr += sizeof(gtid->uuid); + store_deserialize_int64(>id->seqno, ptr); +} + +#define STORE_GTID_SIZE (sizeof(((wsrep_gtid_t*)(NULL))->uuid) + sizeof(int64_t)) + +int +node_store_execute(node_store_t* const store, + wsrep_t* const wsrep, + wsrep_ws_handle_t* const ws_handle) +{ + assert(store); + + if (0 == ws_handle->trx_id) + { + assert(sizeof(ws_handle->trx_id) >= sizeof(uintptr_t)); + ws_handle->trx_id = store_new_trx_id(store); + } + + struct store_trx_ctx* trx = store_get_trx_ctx(store, ws_handle->trx_id); + if (store_trx_add_op(trx)) return -ENOMEM; + struct store_trx_op* const op = &trx->ops[trx->ops_num - 1]; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + if (1 == trx->ops_num) + { + /* First operation, save ID of the read view of the transaction */ + trx->rv_gtid = store->gtid; + } + + /* Transaction op: copy value from one random record to another... */ + op->idx_from = (uint32_t)rand() % store->records_num; + op->idx_to = (uint32_t)rand() % store->records_num; + store_record_get(store->records, op->idx_from, &op->rec_from); + store_record_get(store->records, op->idx_to, &op->rec_to); + + pthread_mutex_unlock(&store->gtid_mtx); + + wsrep_status_t ret = WSREP_TRX_FAIL; + + if (op->rec_from.version > trx->rv_gtid.seqno || + op->rec_to.version > trx->rv_gtid.seqno) + { + /* transaction read view changed, trx needs to be restarted */ +#if 0 + NODE_INFO("Transaction read view changed: %lld -> %lld, returning %d", + (long long)trx->rv_gtid.seqno, + (long long)(op->rec_from.version > op->rec_to.version ? + op->rec_from.version : op->rec_to.version), + ret); +#endif + goto error; + } + + /* Transaction op: ... and modify it somehow, e.g. increment by 1 */ + op->new_value = op->rec_from.value + 1; + + if (1 == trx->ops_num) // first trx operation + { + /* REPLICATION: Since this application does not implement record locks, + * it needs to establish read view for each transaction for + * a proper conflict detection and transaction isolation. + * Otherwose we'll need to implement record versioning */ + if (store->read_view_support) + { + ret = wsrep->assign_read_view(wsrep, ws_handle, &trx->rv_gtid); + if (ret) + { + NODE_ERROR("wsrep::assign_read_view(%lld) failed: %d", + trx->rv_gtid.seqno, ret); + goto error; + } + } + + /* Record read view in the writeset for debugging purposes */ + assert(store->op_size > STORE_GTID_SIZE); + store_serialize_gtid(trx + 1, &trx->rv_gtid); + wsrep_buf_t ws = { .ptr = trx + 1, .len = STORE_GTID_SIZE }; + ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED, + true); + if (ret) + { + NODE_ERROR("wsrep::append_data(rv_gtid) failed: %d", ret); + goto error; + } + } + + /* REPLICATION: append keys touched by the operation + * + * NOTE: depending on data access granularity some applications may require + * multipart keys, e.g. <schema>:<table>:<row> in a SQL database. + * Single part keys match hashtables and key-value stores. + * Below we have two different single-part keys which reference two + * different records. */ + uint32_t key_val; + wsrep_buf_t key_part = { .ptr = &key_val, .len = sizeof(key_val) }; + wsrep_key_t ws_key = { .key_parts = &key_part, .key_parts_num = 1 }; + + /* REPLICATION: Key 1 - the key of the source, unchanged record */ + store_serialize_uint32(&key_val, op->idx_from); + ret = wsrep->append_key(wsrep, ws_handle, + &ws_key, + 1, /* single key */ + WSREP_KEY_REFERENCE, + true /* provider shall make a copy of the key */); + if (ret) + { + NODE_ERROR("wsrep::append_key(REFERENCE) failed: %d", ret); + goto error; + } + + /* REPLICATION: Key 2 - the key of the record we want to update */ + store_serialize_uint32(&key_val, op->idx_to); + ret = wsrep->append_key(wsrep, ws_handle, + &ws_key, + 1, /* single key */ + WSREP_KEY_UPDATE, + true /* provider shall make a copy of the key */); + if (ret) + { + NODE_ERROR("wsrep::append_key(UPDATE) failed: %d", ret); + goto error; + } + + /* REPLICATION: append transaction operation to the "writeset" + * (WS buffer was allocated together with trx context above) */ + assert(store->op_size >= STORE_OP_SIZE); + assert(store->op_size == (uint32_t)store->op_size); + op->size = (uint32_t)store->op_size; + store_serialize_op(trx + 1, op); + wsrep_buf_t ws = { .ptr = trx + 1, .len = store->op_size }; + ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED, true); + + if (!ret) return 0; + + NODE_ERROR("wsrep::append_data(op) failed: %d", ret); + +error: + store_free_trx_id(store, ws_handle->trx_id); + + return ret; +} + +int +node_store_apply(node_store_t* const store, + wsrep_trx_id_t* const trx_id, + const wsrep_buf_t* const ws) +{ + assert(store); + (void)store; + + *trx_id = store_new_trx_id(store); + struct store_trx_ctx* const trx = store_get_trx_ctx(store, *trx_id); + + /* prepare trx context for commit */ + const char* ptr = ws->ptr; + size_t left = ws->len; + + /* at least one operation should be there */ + assert(left >= STORE_GTID_SIZE + STORE_OP_SIZE); + + if (left >= STORE_GTID_SIZE) + { + store_deserialize_gtid(&trx->rv_gtid, ptr); + left -= STORE_GTID_SIZE; + ptr += STORE_GTID_SIZE; + } + + while (left >= STORE_OP_SIZE) + { + if (store_trx_add_op(trx)) + { + store_free_trx_id(store,*trx_id); /* "rollback": release resources */ + return -ENOMEM; + } + struct store_trx_op* const op = &trx->ops[trx->ops_num - 1]; + + store_deserialize_op(op, ptr); + assert(op->idx_to <= store->records_num); + + left -= op->size; + ptr += op->size; + } + + if (left != 0) + { + NODE_FATAL("Failed to process last (%d/%zu) bytes of the writeset.", + (int)left, ws->len); + abort(); + } + + return 0; +} + +static uint32_t const store_fnv32_seed = 2166136261; + +static inline uint32_t +store_fnv32a(const void* buf, size_t const len, uint32_t seed) +{ + static uint32_t const fnv32_prime = 16777619; + const uint8_t* bp = (const uint8_t*)buf; + const uint8_t* const be = bp + len; + + while (bp < be) + { + seed ^= *bp++; + seed *= fnv32_prime; + } + + return seed; +} + + +static void +store_checksum_state(node_store_t* store) +{ + uint32_t res = store_fnv32_seed; + uint32_t i; + + for (i = 0; i < store->members_num; i++) + { + res = store_fnv32a(&store->members[i], sizeof(*store->members), res); + } + + res = store_fnv32a(store->records, store->records_num * STORE_RECORD_SIZE, + res); + + res = store_fnv32a(&store->gtid.uuid, sizeof(store->gtid.uuid), res); + + wsrep_seqno_t s; + store_serialize_int64(&s, store->gtid.seqno); + res = store_fnv32a(&s, sizeof(s), res); + + NODE_INFO("\n\n\tSeqno: %lld; state hash: %#010x\n", + (long long)store->gtid.seqno, res); +} + +static inline void +store_update_gtid(node_store_t* const store, const wsrep_gtid_t* ws_gtid) +{ + assert(0 == wsrep_uuid_compare(&store->gtid.uuid, &ws_gtid->uuid)); + + store->gtid.seqno++; + + if (store->gtid.seqno != ws_gtid->seqno) + { + NODE_FATAL("Out of order commit: expected %lld, got %lld", + store->gtid.seqno, ws_gtid->seqno); + abort(); + } + + static wsrep_seqno_t const period = 0x000fffff; /* ~1M */ + if (0 == (store->gtid.seqno & period)) + { + store_checksum_state(store); + } +} + +void +node_store_commit(node_store_t* const store, + wsrep_trx_id_t const trx_id, + const wsrep_gtid_t* const ws_gtid) +{ + assert(store); + assert(trx_id); + + struct store_trx_ctx* const trx = store_get_trx_ctx(store, trx_id); + + bool const check_read_view_snapshot = +#ifdef NDEBUG + !store->read_view_support; +#else + 1; +#endif /* NDEBUG */ + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + store_update_gtid(store, ws_gtid); + + /* First loop is to check if we can commit all operations if provider + * does not support read view or for debugging puposes */ + size_t i; + if (check_read_view_snapshot) + { + for (i = 0; i < trx->ops_num; i++) + { + struct store_trx_op* const op = &trx->ops[i]; + + record_t from, to; + store_record_get(store->records, op->idx_from, &from); + store_record_get(store->records, op->idx_to, &to); + + if (!store_record_equal(&op->rec_from, &from) || + !store_record_equal(&op->rec_to, &to)) + { + /* read view changed since transaction was executed, + * can't commit */ + assert(op->rec_from.version <= from.version); + assert(op->rec_to.version <= to.version); + if (op->rec_from.version == from.version) + assert(op->rec_from.value == from.value); + if (op->rec_to.version == to.version) + assert(op->rec_to.value == to.value); + if (store->read_view_support) abort(); + + store->read_view_fails++; + + NODE_INFO("Read view changed at commit time, rollback trx"); + + goto error; + } + } + } + + /* Second loop is to actually modify the dataset */ + for (i = 0; i < trx->ops_num; i++) + { + struct store_trx_op* const op = &trx->ops[i]; + + record_t const new_record = + { .version = ws_gtid->seqno, .value = op->new_value }; + + store_record_set(store->records, op->idx_to, &new_record); + } + +error: + pthread_mutex_unlock(&store->gtid_mtx); + + store_free_trx_id(store, trx_id); +} + +void +node_store_rollback(node_store_t* const store, + wsrep_trx_id_t const trx_id) +{ + assert(store); + assert(trx_id); + + store_free_trx_id(store, trx_id); +} + +void +node_store_update_gtid(node_store_t* const store, + const wsrep_gtid_t* const ws_gtid) +{ + assert(store); + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + store_update_gtid(store, ws_gtid); + + pthread_mutex_unlock(&store->gtid_mtx); +} + +long +node_store_read_view_failures(node_store_t* const store) +{ + assert(store); + + long ret; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + ret = store->read_view_fails;; + + pthread_mutex_unlock(&store->gtid_mtx); + + return ret; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/store.h b/wsrep-lib/wsrep-API/v26/examples/node/store.h new file mode 100644 index 00000000..51da74d7 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/store.h @@ -0,0 +1,125 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines simple "transactional storage engine" interface + */ + +#ifndef NODE_STORE_H +#define NODE_STORE_H + +#include "options.h" + +#include "../../wsrep_api.h" + +typedef struct node_store node_store_t; + +/** + * open a store and optionally assocoate a file with it */ +extern node_store_t* +node_store_open(const struct node_options* opts); + +/** + * close store and deallocate associated resources */ +extern void +node_store_close(node_store_t* store); + +/** + * initialize store with a state */ +extern int +node_store_init_state(node_store_t* store, const void* state, size_t state_len); + +/** + * Return a pointer to state snapshot that is guaranteed to be unchanged + * until node_store_release_state() is called. + * + * @param[out] state pointer to state snapshot + * @param[out] state_len soze of state snapshot + */ +extern int +node_store_acquire_state(node_store_t* store, + const void** state, size_t* state_len); + +/** + * release state */ +extern void +node_store_release_state(node_store_t* store); + +/** + * inform store about new membership */ +extern int +node_store_update_membership(node_store_t* store, const wsrep_view_info_t* v); + +/** + * get the current GTID (last committed) */ +extern void +node_store_gtid(node_store_t* store, wsrep_gtid_t* gtid); + +/** + * execute and prepare local transaction in store and return its key and write + * set. + * + * This operation allocates resources that must be freed with either + * node_store_commit() or node_store_rollback() + * + * @param[in] wsrep provider handle + * @param[out] ws_handle reference to the resulting write set in the provider + */ +extern int +node_store_execute(node_store_t* store, + wsrep_t* wsrep, + wsrep_ws_handle_t* ws_handle); + +/** + * apply and prepare foreign write set received from replication + * + * This operation allocates resources that must be freed with either + * node_store_commit() or node_store_rollback() + * + * @param[out] trx_id locally unique transaction ID + * @param[in] ws foreign transaction write set + */ +extern int +node_store_apply(node_store_t* store, + wsrep_trx_id_t* trx_id, + const wsrep_buf_t* ws); + +/** + * commit prepared transaction identified by trx_id */ +extern void +node_store_commit(node_store_t* store, + wsrep_trx_id_t trx_id, + const wsrep_gtid_t* ws_gtid); + +/** + * rollback prepared transaction identified by trx_id */ +extern void +node_store_rollback(node_store_t* store, + wsrep_trx_id_t trx_id); + +/** + * update storage GTID for transactions that had to be skipped/rolled back */ +extern void +node_store_update_gtid(node_store_t* store, + const wsrep_gtid_t* ws_gtid); + +/** + * @return the number of store read view snapshot check failures at commit time. + * (should be zero if provider implements assign_read_view() call) */ +extern long +node_store_read_view_failures(node_store_t* store); + +#endif /* NODE_STORE_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/trx.c b/wsrep-lib/wsrep-API/v26/examples/node/trx.c new file mode 100644 index 00000000..afdcada4 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/trx.c @@ -0,0 +1,155 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "trx.h" +#include "log.h" + +#include <assert.h> +#include <errno.h> // ENOMEM, etc. +#include <stdbool.h> + +wsrep_status_t +node_trx_execute(node_store_t* const store, + wsrep_t* const wsrep, + wsrep_conn_id_t const conn_id, + int ops_num) +{ + wsrep_status_t cert = WSREP_OK; // for cleanup + + static unsigned int const ws_flags = + WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END; // atomic trx + wsrep_trx_meta_t ws_meta; + wsrep_status_t ret = WSREP_OK; + + /* prepare simple transaction and obtain a writeset handle for it */ + wsrep_ws_handle_t ws_handle = { 0, NULL }; + while (ops_num--) + { + if (0 != (ret = node_store_execute(store, wsrep, &ws_handle))) + { +#if 0 + NODE_INFO("master [%d]: node_store_execute() returned %d", + conn_id, ret); +#endif + ret = WSREP_TRX_FAIL; + goto cleanup; + } + } + + /* REPLICATION: (replicate and) certify the writeset (pointed to by + * ws_handle) with the cluster */ + cert = wsrep->certify(wsrep, conn_id, &ws_handle, ws_flags, &ws_meta); + + if (WSREP_BF_ABORT == cert) + { + /* REPLICATION: transaction was signaled to abort due to multi-master + * conflict. It must rollback immediately: it blocks + * transaction that was ordered earlier and will never + * be able to enter commit order. */ + node_store_rollback(store, ws_handle.trx_id); + } + + /* REPLICATION: writeset was totally ordered, need to enter commit order */ + if (ws_meta.gtid.seqno > 0) + { + ret = wsrep->commit_order_enter(wsrep, &ws_handle, &ws_meta); + if (ret) + { + NODE_ERROR("master [%d]: wsrep::commit_order_enter(%lld) failed: " + "%d", (long long)(ws_meta.gtid.seqno), ret); + goto cleanup; + } + + /* REPLICATION: inside commit monitor + * Note: we commit transaction only if certification succeded */ + if (WSREP_OK == cert) + node_store_commit(store, ws_handle.trx_id, &ws_meta.gtid); + else + node_store_update_gtid(store, &ws_meta.gtid); + + ret = wsrep->commit_order_leave(wsrep, &ws_handle, &ws_meta, NULL); + if (ret) + { + NODE_ERROR("master [%d]: wsrep::commit_order_leave(%lld) failed: " + "%d", (long long)(ws_meta.gtid.seqno), ret); + goto cleanup; + } + } + else + { + assert(cert); + } + +cleanup: + /* REPLICATION: if wsrep->certify() returned anything else but WSREP_OK + * transaction must roll back. BF aborted trx already did it. */ + if (cert && WSREP_BF_ABORT != cert) + node_store_rollback(store, ws_handle.trx_id); + + /* NOTE: this application follows the approach that resources must be freed + * at the same level where they were allocated, so it is assumed that + * ws_key and ws were deallocated in either commit or rollback calls.*/ + + /* REPLICATION: release provider resources associated with the trx */ + wsrep->release(wsrep, &ws_handle); + + return ret ? ret : cert; +} + +wsrep_status_t +node_trx_apply(node_store_t* const store, + wsrep_t* const wsrep, + const wsrep_ws_handle_t* const ws_handle, + const wsrep_trx_meta_t* const ws_meta, + const wsrep_buf_t* const ws) +{ + /* no business being here if event was not ordered */ + assert(ws_meta->gtid.seqno > 0); + + wsrep_trx_id_t trx_id; + wsrep_buf_t err_buf = { NULL, 0 }; + int app_err; + if (ws) + { + app_err = node_store_apply(store, &trx_id, ws); + if (app_err) + { + /* REPLICATION: if applying failed, prepare an error buffer with + * sufficient error specification */ + err_buf.ptr = &app_err; // suppose error code is enough + err_buf.len = sizeof(app_err); + } + } + else /* ws failed certification and should be skipped */ + { + /* just some non-0 code to choose node_store_update_gtid() below */ + app_err = 1; + } + + wsrep_status_t ret; + ret = wsrep->commit_order_enter(wsrep, ws_handle, ws_meta); + if (ret) { + node_store_rollback(store, trx_id); + return ret; + } + + if (!app_err) node_store_commit(store, trx_id, &ws_meta->gtid); + else node_store_update_gtid(store, &ws_meta->gtid); + + ret = wsrep->commit_order_leave(wsrep, ws_handle, ws_meta, &err_buf); + + return ret; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/trx.h b/wsrep-lib/wsrep-API/v26/examples/node/trx.h new file mode 100644 index 00000000..e1d763a1 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/trx.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines "transaction" interface + */ + +#ifndef NODE_TRX_H +#define NODE_TRX_H + +#include "store.h" + +#include "../../wsrep_api.h" + +/** + * executes and replicates local transaction + */ +extern wsrep_status_t +node_trx_execute(node_store_t* store, + wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + int ops_num); + +/** + * applies and commits slave write set + * + * @param ws replicated event writeset. NULL if it failed certification (and so + * must be skipped, but it was ordered, so store GTID must be updated) + */ +extern wsrep_status_t +node_trx_apply(node_store_t* store, + wsrep_t* wsrep, + const wsrep_ws_handle_t* ws_handle, + const wsrep_trx_meta_t* ws_meta, + const wsrep_buf_t* ws); + +#endif /* NODE_TRX_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/worker.c b/wsrep-lib/wsrep-API/v26/examples/node/worker.c new file mode 100644 index 00000000..e9901ad8 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/worker.c @@ -0,0 +1,197 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "worker.h" + +#include "log.h" +#include "options.h" +#include "trx.h" +#include "wsrep.h" + +#include <assert.h> +#include <pthread.h> +#include <stdbool.h> +#include <string.h> // strerror() + +struct node_worker +{ + struct node_ctx* node; + pthread_t thread_id; + size_t id; + bool exit; +}; + +enum wsrep_cb_status +node_worker_apply_cb(void* const recv_ctx, + const wsrep_ws_handle_t* const ws_handle, + uint32_t const ws_flags, + const wsrep_buf_t* const ws, + const wsrep_trx_meta_t* const ws_meta, + wsrep_bool_t* const exit_loop) +{ + assert(recv_ctx); + + struct node_worker* const worker = recv_ctx; + + wsrep_status_t const ret = node_trx_apply( + worker->node->store, + node_wsrep_provider(worker->node->wsrep), + ws_handle, + ws_meta, + ws_flags & WSREP_FLAG_ROLLBACK ? NULL : ws); + + *exit_loop = worker->exit; + + return WSREP_OK == ret ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE; +} + + +static void* +worker_slave(void* recv_ctx) +{ + struct node_worker* const worker = recv_ctx; + wsrep_t* const wsrep = node_wsrep_provider(worker->node->wsrep); + + wsrep_status_t const ret = wsrep->recv(wsrep, worker); + + if (WSREP_OK != ret) + { + NODE_ERROR("slave worker [%zu] exited with error %d.", worker->id, ret); + } + + return NULL; +} + +static void* +worker_master(void* send_ctx) +{ + struct node_worker* const worker = send_ctx; + struct node_ctx* const node = worker->node; + wsrep_t* const wsrep = node_wsrep_provider(node->wsrep); + + assert(node->opts->ws_size > 0); + + wsrep_status_t ret; + + do + { + /* REPLICATION: we should not perform any local writes until the node + * is synced with the cluster. */ + if (!node_wsrep_wait_synced(node->wsrep)) + { + NODE_ERROR("master worker [%zu] failed waiting for SYNCED state.", + worker->id); + break; + } + + /* REPLICATION: the node is now synced */ + + do + { + ret = node_trx_execute(node->store, + wsrep, + worker->id, + (int)node->opts->operations); + } + while(WSREP_OK == ret // success + || (WSREP_TRX_FAIL == ret // certification failed, trx rolled back + && (usleep(10000),true)) // retry after short sleep + ); + } + while (WSREP_CONN_FAIL == ret); // provider in bad state (e.g. non-Primary) + + return NULL; +} + +struct node_worker_pool +{ + size_t size; // size of the pool (nu,ber of nodes) + struct node_worker worker[1]; // worker context array; +}; + +struct node_worker_pool* +node_worker_start(struct node_ctx* const ctx, + node_worker_type_t const type, + size_t const size) +{ + assert(ctx); + + if (0 == size) return NULL; + + const char* const type_str = type == NODE_WORKER_SLAVE ? "slave" : "master"; + + size_t const alloc_size = + sizeof(struct node_worker_pool) + + sizeof(struct node_worker) * (size - 1); + + struct node_worker_pool* const ret = malloc(alloc_size); + + if (ret) + { + void* (* const routine) (void*) = + type == NODE_WORKER_SLAVE ? worker_slave : worker_master; + + size_t i; + for (i = 0; i < size; i++) + { + struct node_worker* const worker = &ret->worker[i]; + worker->node = ctx; + worker->id = i; + worker->exit = false; + + int const err = pthread_create(&worker->thread_id, + NULL, + routine, + worker); + if (err) + { + NODE_ERROR("Failed to start %s worker[%zu]: %d (%s)", + type_str, i, err, strerror(err)); + if (0 == i) + { + free(ret); + return NULL; + } + else + { + break; // some threads have started, + // need to return to close them first + } + } + } + + ret->size = i; + } + else + { + NODE_ERROR("Failed to allocate %zu bytes for the %s worker pool", + alloc_size, type_str); + } + + return ret; +} + +void +node_worker_stop(struct node_worker_pool* pool) +{ + size_t i; + for (i = 0; pool && i < pool->size; i++) + { + pthread_join(pool->worker[i].thread_id, NULL); + } + + free(pool); +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/worker.h b/wsrep-lib/wsrep-API/v26/examples/node/worker.h new file mode 100644 index 00000000..7ae06423 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/worker.h @@ -0,0 +1,66 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines worker thread interface + */ + +#ifndef NODE_WORKER_H +#define NODE_WORKER_H + +#include "ctx.h" + +#include "../../wsrep_api.h" + +/** + * REPLICATION: a callback to apply and commit slave replication events */ +extern enum wsrep_cb_status +node_worker_apply_cb(void* recv_ctx, + const wsrep_ws_handle_t* ws_handle, + uint32_t ws_flags, + const wsrep_buf_t* ws, + const wsrep_trx_meta_t* ws_meta, + wsrep_bool_t* exit_loop); + +typedef enum node_worker_type +{ + NODE_WORKER_SLAVE, + NODE_WORKER_MASTER +} + node_worker_type_t; + +struct node_worker_pool; + +/** + * Starts the required number of workier threads of a given type + * + * @param[in] ctx application context + * @param[in] type of a worker + * @param[in] number of workers + * + * @return worker pool handle + */ +extern struct node_worker_pool* +node_worker_start(struct node_ctx* ctx, + node_worker_type_t type, + size_t number); + +/** + * Stops workers in a pool and deallocates respective resources */ +extern void +node_worker_stop(struct node_worker_pool* pool); + +#endif /* NODE_WORKER_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c new file mode 100644 index 00000000..6cea6d90 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c @@ -0,0 +1,479 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "wsrep.h" + +#include "log.h" +#include "sst.h" +#include "store.h" +#include "worker.h" + +#include <assert.h> +#include <stdio.h> // snprintf() +#include <stdlib.h> // abort() +#include <string.h> // strcasecmp() + +struct node_wsrep +{ + wsrep_t* instance; // wsrep provider instance + + struct wsrep_view + { + pthread_mutex_t mtx; + wsrep_gtid_t state_id; + wsrep_view_status_t status; + wsrep_cap_t capabilities; + int proto_ver; + int memb_num; + int my_idx; + wsrep_member_info_t* members; + } + view; + + struct + { + pthread_mutex_t mtx; + pthread_cond_t cond; + int value; + } + synced; + + bool bootstrap; // shall this node bootstrap a primary view? +}; + +static struct node_wsrep s_wsrep = +{ + .instance = NULL, + .view = + { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .state_id = {{{ 0, }}, WSREP_SEQNO_UNDEFINED }, + .status = WSREP_VIEW_DISCONNECTED, + .capabilities = 0, + .proto_ver = -1, + .memb_num = 0, + .my_idx = -1, + .members = NULL + }, + .synced = + { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER, + .value = 0 + }, + .bootstrap = false +}; + +static const char* wsrep_view_status_str[WSREP_VIEW_MAX] = +{ + "PRIMARY", + "NON-PRIMARY", + "DISCONNECTED" +}; + +#define WSREP_CAPABILITIES_MAX ((int)sizeof(wsrep_cap_t) * 8) // bitmask +static const char* wsrep_capabilities_str[WSREP_CAPABILITIES_MAX] = +{ + "MULTI-MASTER", + "CERTIFICATION", + "PA", + "REPLAY", + "TOI", + "PAUSE", + "CAUSAL-READS", + "CAUSAL-TRX", + "INCREMENTAL", + "SESSION-LOCKS", + "DISTRIBUTED-LOCKS", + "CONSISTENCY-CHECK", + "UNORDERED", + "ANNOTATION", + "PREORDERED", + "STREAMING", + "SNAPSHOT", + "NBO", + NULL, +}; + +/** + * REPLICATION: callback is called by provider when the node connects to group. + * This happens out-of-order, before the node receives a state + * transfer and syncs with the cluster. Unless application requires + * it it can be empty. We however want to know the GTID of the + * group out of order for SST tricks, so we record it out of order. + */ +static enum wsrep_cb_status +wsrep_connected_cb(void* const x, + const wsrep_view_info_t* const v) +{ + char gtid_str[WSREP_GTID_STR_LEN + 1]; + wsrep_gtid_print(&v->state_id, gtid_str, sizeof(gtid_str)); + + NODE_INFO("connect_cb(): Connected at %s to %s group of %d member(s)", + gtid_str, wsrep_view_status_str[v->status], v->memb_num); + + struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep; + + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + + wsrep->view.state_id = v->state_id; + + pthread_mutex_unlock(&wsrep->view.mtx); + + return WSREP_CB_SUCCESS; +} + +/** + * logs view data */ +static void +wsrep_log_view(const struct wsrep_view* v) +{ + char gtid[WSREP_GTID_STR_LEN + 1]; + wsrep_gtid_print(&v->state_id, gtid, sizeof(gtid)); + gtid[WSREP_GTID_STR_LEN] = '\0'; + + char caps[256]; + int written = 0; + size_t space_left = sizeof(caps); + int i; + for (i = 0; i < WSREP_CAPABILITIES_MAX && space_left > 0; i++) + { + wsrep_cap_t const f = 1u << i; + + if (!(f & v->capabilities)) continue; + + if (wsrep_capabilities_str[i]) + { + written += snprintf(&caps[written], space_left, "%s|", + wsrep_capabilities_str[i]); + } + else + { + written += snprintf(&caps[written], space_left, "%d|", i); + } + + space_left = sizeof(caps) - (size_t)written; + } + caps[written ? written - 1 : 0] = '\0'; // overwrite last '|' + + char members_list[1024]; + written = 0; + space_left = sizeof(members_list); + for (i = 0; i < v->memb_num && space_left > 0; i++) + { + wsrep_member_info_t* m = &v->members[i]; + char uuid[WSREP_UUID_STR_LEN + 1]; + wsrep_uuid_print(&m->id, uuid, sizeof(uuid)); + uuid[WSREP_UUID_STR_LEN] = '\0'; + + written += snprintf(&members_list[written], space_left, + "%s%d: %s '%s' incoming:'%s'\n", + v->my_idx == i ? " * " : " ", i, + uuid, m->name, m->incoming); + + space_left = sizeof(members_list) - (size_t)written; + } + members_list[written ? written - 1 : 0] = '\0'; // overwrite the last '\n' + + NODE_INFO( + "New view received:\n" + "state: %s (%s)\n" + "capabilities: %s\n" + "protocol version: %d\n" + "members(%d)%s%s", + gtid, wsrep_view_status_str[v->status], + caps, + v->proto_ver, + v->memb_num, v->memb_num ? ":\n" : "", members_list); +} + +/** + * REPLICATION: callback is called when the node needs to process cluster + * view change. The callback is called in "total order isolation", + * so all the preceding replication events will be processed + * strictly before the call and all subsequent - striclty after. + */ +static enum wsrep_cb_status +wsrep_view_cb(void* const x, + void* const r, + const wsrep_view_info_t* const v, + const char* const state, + size_t const state_len) +{ + (void)r; + (void)state; + (void)state_len; + + struct node_ctx* const node = x; + + if (WSREP_VIEW_PRIMARY == v->status) + { + /* REPLICATION: membership change is a totally ordered event and as such + * should be a part of the state, like changes to the + * database. */ + int err = node_store_update_membership(node->store, v); + if (err) + { + NODE_FATAL("Failed to update membership in store: %d (%s)", + err, strerror(-err)); + abort(); + } + } + + enum wsrep_cb_status ret = WSREP_CB_SUCCESS; + struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep; + + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + + /* below we'll just copy the data for future reference (if need be): */ + + size_t const memb_size = (size_t)v->memb_num * sizeof(wsrep_member_info_t); + void* const tmp = realloc(wsrep->view.members, memb_size); + if (memb_size > 0 && !tmp) + { + NODE_ERROR("Could not allocate memory for a new view: %zu bytes", + memb_size); + ret = WSREP_CB_FAILURE; + goto cleanup; + } + else + { + wsrep->view.members = tmp; + if (memb_size) memcpy(wsrep->view.members, &v->members[0], memb_size); + } + + wsrep->view.state_id = v->state_id; + wsrep->view.status = v->status; + wsrep->view.capabilities = v->capabilities; + wsrep->view.proto_ver = v->proto_ver; + wsrep->view.memb_num = v->memb_num; + wsrep->view.my_idx = v->my_idx; + + /* and now log the info */ + + wsrep_log_view(&wsrep->view); + +cleanup: + pthread_mutex_unlock(&wsrep->view.mtx); + + return ret; +} + +/** + * REPLICATION: callback is called by provider when the node becomes SYNCED */ +static enum wsrep_cb_status +wsrep_synced_cb(void* const x) +{ + struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep; + + if (pthread_mutex_lock(&wsrep->synced.mtx)) + { + NODE_FATAL("Failed to lock SYNCED mutex"); + abort(); + } + + if (wsrep->synced.value == 0) + { + NODE_INFO("become SYNCED"); + wsrep->synced.value = 1; + pthread_cond_broadcast(&wsrep->synced.cond); + } + + pthread_mutex_unlock(&wsrep->synced.mtx); + + return WSREP_CB_SUCCESS; +} + +struct node_wsrep* +node_wsrep_init(const struct node_options* const opts, + const wsrep_gtid_t* const current_gtid, + void* const app_ctx) +{ + if (s_wsrep.instance != NULL) return NULL; // already initialized + + wsrep_status_t err; + err = wsrep_load(opts->provider, &s_wsrep.instance, node_log_cb); + if (WSREP_OK != err) + { + if (strcasecmp(opts->provider, WSREP_NONE)) + { + NODE_ERROR("wsrep_load(%s) failed: %s (%d).", + opts->provider, strerror(err), err); + } + else + { + NODE_ERROR("Initializing dummy provider failed: %s (%d).", + strerror(err), err); + } + return NULL; + } + + char base_addr[256]; + snprintf(base_addr, sizeof(base_addr) - 1, "%s:%ld", + opts->base_host, opts->base_port); + + struct wsrep_init_args args = + { + .app_ctx = app_ctx, + + .node_name = opts->name, + .node_address = base_addr, + .node_incoming = "", // we don't accept client connections + .data_dir = opts->data_dir, + .options = opts->options, + .proto_ver = 0, // this is the first version of the application + // so the first version of the writeset protocol + .state_id = current_gtid, + .state = NULL, // unused + + .logger_cb = node_log_cb, + .connected_cb = wsrep_connected_cb, + .view_cb = wsrep_view_cb, + .synced_cb = wsrep_synced_cb, + .encrypt_cb = NULL, // not implemented ATM + + .apply_cb = node_worker_apply_cb, + .unordered_cb = NULL, // not needed now + + .sst_request_cb = node_sst_request_cb, + .sst_donate_cb = node_sst_donate_cb + }; + + wsrep_t* wsrep = s_wsrep.instance; + + err = wsrep->init(wsrep, &args); + + if (WSREP_OK != err) + { + NODE_ERROR("wsrep::init() failed: %d, must shutdown", err); + node_wsrep_close(&s_wsrep); + return NULL; + } + + return &s_wsrep; +} + +wsrep_status_t +node_wsrep_connect(struct node_wsrep* const wsrep, + const char* const address, + bool const bootstrap) +{ + wsrep->bootstrap = bootstrap; + wsrep_status_t err = wsrep->instance->connect(wsrep->instance, + "wsrep_cluster", + address, + NULL, + wsrep->bootstrap); + + if (WSREP_OK != err) + { + NODE_ERROR("wsrep::connect(%s) failed: %d, must shutdown", + address, err); + node_wsrep_close(wsrep); + } + + return err; +} + +void +node_wsrep_disconnect(struct node_wsrep* const wsrep) +{ + if (pthread_mutex_lock(&wsrep->synced.mtx)) + { + NODE_FATAL("Failed to lock SYNCED mutex"); + abort(); + } + wsrep->synced.value = -1; /* this will signal master threads to exit */ + pthread_cond_broadcast(&wsrep->synced.cond); + pthread_mutex_unlock(&wsrep->synced.mtx); + + wsrep_status_t const err = wsrep->instance->disconnect(wsrep->instance); + + if (err) + { + /* REPLICATION: unless connection is not closed, slave threads will + * never return. */ + NODE_FATAL("Failed to close wsrep connection: %d", err); + abort(); + } +} + +void +node_wsrep_close(struct node_wsrep* const wsrep) +{ + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + assert(0 == wsrep->view.memb_num); // the node must be disconneted + assert(NULL == wsrep->view.members); + free(wsrep->view.members); + wsrep->view.members = NULL; + pthread_mutex_unlock(&wsrep->view.mtx); + + wsrep->instance->free(wsrep->instance); + wsrep->instance = NULL; +} + +bool +node_wsrep_wait_synced(struct node_wsrep* const wsrep) +{ + if (pthread_mutex_lock(&wsrep->synced.mtx)) + { + NODE_FATAL("Failed to lock SYNCED mutex"); + abort(); + } + + while (wsrep->synced.value == 0) + { + pthread_cond_wait(&wsrep->synced.cond, &wsrep->synced.mtx); + } + + bool const ret = wsrep->synced.value > 0; + + pthread_mutex_unlock(&wsrep->synced.mtx); + + return ret; +} + +void +node_wsrep_connected_gtid(struct node_wsrep* wsrep, wsrep_gtid_t* gtid) +{ + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + + *gtid = wsrep->view.state_id; + + pthread_mutex_unlock(&wsrep->view.mtx); +} + +wsrep_t* +node_wsrep_provider(struct node_wsrep* wsrep) +{ + return wsrep->instance; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h new file mode 100644 index 00000000..75c7eac3 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h @@ -0,0 +1,92 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines various helpers to manage wsrep provider + */ + +#ifndef NODE_WSREP_H +#define NODE_WSREP_H + +#include "options.h" + +#include "../../wsrep_api.h" + +#include <pthread.h> +#include <stdbool.h> + +typedef struct node_wsrep node_wsrep_t; + +/** + * loads and initializes wsrep provider for further usage + * + * @param[in] opts program options + * @param[in] current_gtid GTID corresponding to the current node state + * @param[in] app_ctx application context to be passed to callbacks + * + * @return initialized object pointer + */ +extern node_wsrep_t* +node_wsrep_init(const struct node_options* opts, + const wsrep_gtid_t* current_gtid, + void* app_ctx); + +/** + * connects to primary component + * + * @param[in] wsrep wsrep context + * @param[in] address address to connect at (provider specific) + * @param[in] bootsstrap bootstrap primary component if there's none + * + * @return wsrep status code + */ +extern wsrep_status_t +node_wsrep_connect(node_wsrep_t* wsrep, + const char* address, + bool bootstrap); + +/** + * disconnects from primary component + */ +extern void +node_wsrep_disconnect(node_wsrep_t* wsrep); + +/** + * deinitializes and unloads wsrep provider + */ +extern void +node_wsrep_close(node_wsrep_t* wsrep); + +/** + * waits for the node to become SYNCED + * + * @return true if node is synced, false in any other event. + */ +extern bool +node_wsrep_wait_synced(node_wsrep_t* wsrep); + +/** + * @param[in] wsrep context + * @param[out] gtid of the current view */ +extern void +node_wsrep_connected_gtid(node_wsrep_t* wsrep, wsrep_gtid_t* gtid); + +/** + * @return wsrep provider instance */ +extern wsrep_t* +node_wsrep_provider(node_wsrep_t* wsrep); + +#endif /* NODE_WSREP_H */ |