diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:04:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:04:16 +0000 |
commit | a68fb2d8219f6bccc573009600e9f23e89226a5e (patch) | |
tree | d742d35d14ae816e99293d2b01face30e9f3a46b /wsrep-lib/wsrep-API/v26/examples/listener.c | |
parent | Initial commit. (diff) | |
download | mariadb-10.6-upstream.tar.xz mariadb-10.6-upstream.zip |
Adding upstream version 1:10.6.11.upstream/1%10.6.11upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'wsrep-lib/wsrep-API/v26/examples/listener.c')
-rw-r--r-- | wsrep-lib/wsrep-API/v26/examples/listener.c | 268 |
1 files changed, 268 insertions, 0 deletions
diff --git a/wsrep-lib/wsrep-API/v26/examples/listener.c b/wsrep-lib/wsrep-API/v26/examples/listener.c new file mode 100644 index 00000000..9fc881fe --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/listener.c @@ -0,0 +1,268 @@ +/* Copyright (C) 2012 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Example of wsrep event listener. Outputs description of received + * events to stdout. To get a general picture you should start with + * main() function. */ + +#include <wsrep_api.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <signal.h> +#include <pthread.h> + +/*! This is global application context, it will be used by wsrep callbacks */ +struct application_context +{}; + +static struct application_context global_ctx; + +/*! This is receiving thread context, it will be used by wsrep callbacks */ +struct receiver_context +{ + char msg[4096]; +}; + +/* wsrep provider handle (global for simplicty) */ +static wsrep_t* wsrep = NULL; + +/*! This is a logger callback which library will be using to log events. */ +static void +logger_cb (wsrep_log_level_t level __attribute__((unused)), const char* msg) +{ + fprintf (stderr, "WSREP: %s\n", msg); +} + +/*! This will be called on cluster view change (nodes joining, leaving, etc.). + * Each view change is the point where application may be pronounced out of + * sync with the current cluster view and need state transfer. + * It is guaranteed that no other callbacks are called concurrently with it. */ +static wsrep_cb_status_t +view_cb (void* app_ctx __attribute__((unused)), + void* recv_ctx __attribute__((unused)), + const wsrep_view_info_t* view, + const char* state __attribute__((unused)), + size_t state_len __attribute__((unused))) +{ + printf ("New cluster membership view: %d nodes, my index is %d, " + "global seqno: %lld\n", + view->memb_num, view->my_idx, (long long)view->state_id.seqno); + + return WSREP_CB_SUCCESS; +} + +/*! This will be called on cluster view change (nodes joining, leaving, etc.). + * Each view change is the point where application may be pronounced out of + * sync with the current cluster view and need state transfer. + * It is guaranteed that no other callbacks are called concurrently with it. */ +static wsrep_cb_status_t +sst_request_cb (void* app_ctx __attribute__((unused)), + void** sst_req, + size_t* sst_req_len) +{ + /* For simplicity we're skipping state transfer by using magic string + * as a state transfer request. + * This node will not be considered JOINED (having full state) + * by other cluster members. */ + *sst_req = strdup(WSREP_STATE_TRANSFER_NONE); + + if (*sst_req) + *sst_req_len = strlen(*sst_req) + 1; + else + *sst_req_len = 0; + + return WSREP_CB_SUCCESS; +} + +/*! This is called to "apply" writeset. + * If writesets don't conflict on keys, it may be called concurrently to + * utilize several CPU cores. */ +static wsrep_cb_status_t +apply_cb (void* recv_ctx, + const wsrep_ws_handle_t* ws_handle __attribute__((unused)), + uint32_t flags __attribute__((unused)), + const wsrep_buf_t* ws __attribute__((unused)), + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop __attribute__((unused))) +{ + struct receiver_context* ctx = (struct receiver_context*)recv_ctx; + + snprintf (ctx->msg, sizeof(ctx->msg), + "Got writeset %lld, size %zu", (long long)meta->gtid.seqno, + ws->len); + + bool const commit = flags & (WSREP_FLAG_TRX_END | WSREP_FLAG_ROLLBACK); + + wsrep->commit_order_enter(wsrep, ws_handle, meta); + if (commit) puts(ctx->msg); + wsrep->commit_order_leave(wsrep, ws_handle, meta, NULL); + + return WSREP_CB_SUCCESS; +} + +/* The following callbacks are stubs and not used in this example. */ +static wsrep_cb_status_t +unordered_cb(void* recv_ctx __attribute__((unused)), + const wsrep_buf_t* data __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +static wsrep_cb_status_t +sst_donate_cb (void* app_ctx __attribute__((unused)), + void* recv_ctx __attribute__((unused)), + const wsrep_buf_t* msg __attribute__((unused)), + const wsrep_gtid_t* state_id __attribute__((unused)), + const wsrep_buf_t* state __attribute__((unused)), + wsrep_bool_t bypass __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +static wsrep_cb_status_t synced_cb (void* app_ctx __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +/* This is the listening thread. It blocks in wsrep::recv() call until + * disconnect from cluster. It will apply and commit writesets through the + * callbacks defined avbove. */ +static void* +recv_thread (void* arg) +{ + struct receiver_context* ctx = (struct receiver_context*)arg; + + wsrep_status_t rc = wsrep->recv(wsrep, ctx); + + fprintf (stderr, "Receiver exited with code %d", rc); + + return NULL; +} + +/* This is a signal handler to demonstrate graceful cluster leave. */ +static void +graceful_leave (int signum) +{ + printf ("Got signal %d, exiting...\n", signum); + wsrep->disconnect(wsrep); +} + +int main (int const argc, char* argv[]) +{ + if (argc < 4 || argc > 5) + { + fprintf (stderr, "Usage: %s </path/to/wsrep/provider> <wsrep URI> " + "<cluster name> [own address]\n", argv[0]); + exit (EXIT_FAILURE); + } + + const char* const wsrep_provider = argv[1]; + const char* const wsrep_uri = argv[2]; + const char* const cluster_name = argv[3]; + const char* const own_address = argc == 5 ? argv[4] : "localhost"; + + /* Now let's load and initialize provider */ + wsrep_status_t rc = wsrep_load (wsrep_provider, &wsrep, logger_cb); + if (WSREP_OK != rc) + { + fprintf (stderr, "Failed to load wsrep provider '%s'\n",wsrep_provider); + exit (EXIT_FAILURE); + } + + wsrep_gtid_t state_id = { WSREP_UUID_UNDEFINED, WSREP_SEQNO_UNDEFINED }; + + /* wsrep provider initialization arguments */ + struct wsrep_init_args wsrep_args = + { + .app_ctx = &global_ctx, + + .node_name = "example listener", + .node_address = own_address, + .node_incoming = "", + .data_dir = ".", // working directory + .options = "", + .proto_ver = 127, // maximum supported application event protocol + + .state_id = &state_id, + .state = NULL, + + .logger_cb = logger_cb, + .view_cb = view_cb, + .sst_request_cb = sst_request_cb, + .encrypt_cb = NULL, + .apply_cb = apply_cb, + .unordered_cb = unordered_cb, + .sst_donate_cb = sst_donate_cb, + .synced_cb = synced_cb + }; + + rc = wsrep->init(wsrep, &wsrep_args); + if (WSREP_OK != rc) + { + fprintf (stderr, "wsrep::init() failed: %d\n", rc); + exit (EXIT_FAILURE); + } + + /* Connect to cluster */ + rc = wsrep->connect(wsrep, cluster_name, wsrep_uri, "", 0); + if (0 != rc) + { + if (rc < 0) + fprintf (stderr, "wsrep::connect(%s, %s) failed: %d (%s)\n", + cluster_name, wsrep_uri, rc, strerror(-(int)rc)); + else + fprintf (stderr, "wsrep::connect() failed: %d\n", rc); + + exit (EXIT_FAILURE); + } + + /* Now let's start several listening threads*/ + int const num_threads = 4; + struct receiver_context thread_ctx[num_threads]; + pthread_t threads[num_threads]; + + int i; + for (i = 0; i < num_threads; i++) + { + int err = pthread_create ( + &threads[i], NULL, recv_thread, &thread_ctx[i]); + + if (err) + { + fprintf (stderr, "Failed to start thread %d: %d (%s)", + i, err, strerror(err)); + exit (EXIT_FAILURE); + } + } + + signal (SIGTERM, graceful_leave); + signal (SIGINT, graceful_leave); + + /* Listening threads are now running and receiving writesets. Wait for them + * to join. Threads will join after signal handler closes wsrep connection*/ + for (i = 0; i < num_threads; i++) + { + pthread_join (threads[i], NULL); + } + + /* Unload provider after nobody uses it any more. */ + wsrep_unload (wsrep); + + return 0; +} |