/* Copyright (C) 2012 Codership Oy This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /*! @file Example of wsrep event listener. Outputs description of received * events to stdout. To get a general picture you should start with * main() function. */ #include #include #include #include #include #include #include /*! This is global application context, it will be used by wsrep callbacks */ struct application_context {}; static struct application_context global_ctx; /*! This is receiving thread context, it will be used by wsrep callbacks */ struct receiver_context { char msg[4096]; }; /* wsrep provider handle (global for simplicty) */ static wsrep_t* wsrep = NULL; /*! This is a logger callback which library will be using to log events. */ static void logger_cb (wsrep_log_level_t level __attribute__((unused)), const char* msg) { fprintf (stderr, "WSREP: %s\n", msg); } /*! This will be called on cluster view change (nodes joining, leaving, etc.). * Each view change is the point where application may be pronounced out of * sync with the current cluster view and need state transfer. * It is guaranteed that no other callbacks are called concurrently with it. */ static wsrep_cb_status_t view_cb (void* app_ctx __attribute__((unused)), void* recv_ctx __attribute__((unused)), const wsrep_view_info_t* view, const char* state __attribute__((unused)), size_t state_len __attribute__((unused))) { printf ("New cluster membership view: %d nodes, my index is %d, " "global seqno: %lld\n", view->memb_num, view->my_idx, (long long)view->state_id.seqno); return WSREP_CB_SUCCESS; } /*! This will be called on cluster view change (nodes joining, leaving, etc.). * Each view change is the point where application may be pronounced out of * sync with the current cluster view and need state transfer. * It is guaranteed that no other callbacks are called concurrently with it. */ static wsrep_cb_status_t sst_request_cb (void* app_ctx __attribute__((unused)), void** sst_req, size_t* sst_req_len) { /* For simplicity we're skipping state transfer by using magic string * as a state transfer request. * This node will not be considered JOINED (having full state) * by other cluster members. */ *sst_req = strdup(WSREP_STATE_TRANSFER_NONE); if (*sst_req) *sst_req_len = strlen(*sst_req) + 1; else *sst_req_len = 0; return WSREP_CB_SUCCESS; } /*! This is called to "apply" writeset. * If writesets don't conflict on keys, it may be called concurrently to * utilize several CPU cores. */ static wsrep_cb_status_t apply_cb (void* recv_ctx, const wsrep_ws_handle_t* ws_handle __attribute__((unused)), uint32_t flags __attribute__((unused)), const wsrep_buf_t* ws __attribute__((unused)), const wsrep_trx_meta_t* meta, wsrep_bool_t* exit_loop __attribute__((unused))) { struct receiver_context* ctx = (struct receiver_context*)recv_ctx; snprintf (ctx->msg, sizeof(ctx->msg), "Got writeset %lld, size %zu", (long long)meta->gtid.seqno, ws->len); bool const commit = flags & (WSREP_FLAG_TRX_END | WSREP_FLAG_ROLLBACK); wsrep->commit_order_enter(wsrep, ws_handle, meta); if (commit) puts(ctx->msg); wsrep->commit_order_leave(wsrep, ws_handle, meta, NULL); return WSREP_CB_SUCCESS; } /* The following callbacks are stubs and not used in this example. */ static wsrep_cb_status_t unordered_cb(void* recv_ctx __attribute__((unused)), const wsrep_buf_t* data __attribute__((unused))) { return WSREP_CB_SUCCESS; } static wsrep_cb_status_t sst_donate_cb (void* app_ctx __attribute__((unused)), void* recv_ctx __attribute__((unused)), const wsrep_buf_t* msg __attribute__((unused)), const wsrep_gtid_t* state_id __attribute__((unused)), const wsrep_buf_t* state __attribute__((unused)), wsrep_bool_t bypass __attribute__((unused))) { return WSREP_CB_SUCCESS; } static wsrep_cb_status_t synced_cb (void* app_ctx __attribute__((unused))) { return WSREP_CB_SUCCESS; } /* This is the listening thread. It blocks in wsrep::recv() call until * disconnect from cluster. It will apply and commit writesets through the * callbacks defined avbove. */ static void* recv_thread (void* arg) { struct receiver_context* ctx = (struct receiver_context*)arg; wsrep_status_t rc = wsrep->recv(wsrep, ctx); fprintf (stderr, "Receiver exited with code %d", rc); return NULL; } /* This is a signal handler to demonstrate graceful cluster leave. */ static void graceful_leave (int signum) { printf ("Got signal %d, exiting...\n", signum); wsrep->disconnect(wsrep); } int main (int const argc, char* argv[]) { if (argc < 4 || argc > 5) { fprintf (stderr, "Usage: %s " " [own address]\n", argv[0]); exit (EXIT_FAILURE); } const char* const wsrep_provider = argv[1]; const char* const wsrep_uri = argv[2]; const char* const cluster_name = argv[3]; const char* const own_address = argc == 5 ? argv[4] : "localhost"; /* Now let's load and initialize provider */ wsrep_status_t rc = wsrep_load (wsrep_provider, &wsrep, logger_cb); if (WSREP_OK != rc) { fprintf (stderr, "Failed to load wsrep provider '%s'\n",wsrep_provider); exit (EXIT_FAILURE); } wsrep_gtid_t state_id = { WSREP_UUID_UNDEFINED, WSREP_SEQNO_UNDEFINED }; /* wsrep provider initialization arguments */ struct wsrep_init_args wsrep_args = { .app_ctx = &global_ctx, .node_name = "example listener", .node_address = own_address, .node_incoming = "", .data_dir = ".", // working directory .options = "", .proto_ver = 127, // maximum supported application event protocol .state_id = &state_id, .state = NULL, .logger_cb = logger_cb, .view_cb = view_cb, .sst_request_cb = sst_request_cb, .encrypt_cb = NULL, .apply_cb = apply_cb, .unordered_cb = unordered_cb, .sst_donate_cb = sst_donate_cb, .synced_cb = synced_cb }; rc = wsrep->init(wsrep, &wsrep_args); if (WSREP_OK != rc) { fprintf (stderr, "wsrep::init() failed: %d\n", rc); exit (EXIT_FAILURE); } /* Connect to cluster */ rc = wsrep->connect(wsrep, cluster_name, wsrep_uri, "", 0); if (0 != rc) { if (rc < 0) fprintf (stderr, "wsrep::connect(%s, %s) failed: %d (%s)\n", cluster_name, wsrep_uri, rc, strerror(-(int)rc)); else fprintf (stderr, "wsrep::connect() failed: %d\n", rc); exit (EXIT_FAILURE); } /* Now let's start several listening threads*/ int const num_threads = 4; struct receiver_context thread_ctx[num_threads]; pthread_t threads[num_threads]; int i; for (i = 0; i < num_threads; i++) { int err = pthread_create ( &threads[i], NULL, recv_thread, &thread_ctx[i]); if (err) { fprintf (stderr, "Failed to start thread %d: %d (%s)", i, err, strerror(err)); exit (EXIT_FAILURE); } } signal (SIGTERM, graceful_leave); signal (SIGINT, graceful_leave); /* Listening threads are now running and receiving writesets. Wait for them * to join. Threads will join after signal handler closes wsrep connection*/ for (i = 0; i < num_threads; i++) { pthread_join (threads[i], NULL); } /* Unload provider after nobody uses it any more. */ wsrep_unload (wsrep); return 0; }