summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/wsrep-API/v26/examples/listener.c
blob: 9fc881fe94dd9c8d57022ee0c45c1de1d881841a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/* Copyright (C) 2012 Codership Oy <info@codership.com>

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*! @file Example of wsrep event listener. Outputs description of received
 *        events to stdout. To get a general picture you should start with
 *        main() function. */

#include <wsrep_api.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>

/*! This is global application context, it will be used by wsrep callbacks.
 *  The example keeps no application-wide state, so the structure holds only a
 *  placeholder: ISO C requires a struct to have at least one named member
 *  (an empty `{}` body is accepted only as a compiler extension). */
struct application_context
{
    char placeholder; /* not accessed by any code in this file */
};

/* The single instance; its address is what main() passes as app_ctx. */
static struct application_context global_ctx;

/*! Size in bytes of the message buffer kept in every receiver context. */
enum { RECEIVER_MSG_SIZE = 4096 };

/*! Per-thread receiver context. Each receiving thread passes a pointer to its
 *  own instance to wsrep::recv(); apply_cb() gets it back as recv_ctx. */
struct receiver_context
{
    char msg[RECEIVER_MSG_SIZE]; /* scratch buffer for the writeset message */
};

/* wsrep provider handle (global for simplicity). It is filled in by
 * wsrep_load() in main() and then used by apply_cb(), the receiver threads
 * and the signal handler. */
static wsrep_t* wsrep = NULL;

/*! Logger callback given to the provider. Every message is written to stderr
 *  on its own line with a "WSREP: " prefix.
 *
 *  @param level  message severity; ignored, all messages are printed alike
 *  @param msg    message text */
static void
logger_cb (wsrep_log_level_t level, const char* msg)
{
    (void)level;

    fprintf (stderr, "WSREP: %s\n", msg);
}

/*! View callback: invoked on every cluster view change (nodes joining,
 *  leaving, etc.). Each view change is the point where application may be
 *  pronounced out of sync with the current cluster view and need state
 *  transfer. It is guaranteed that no other callbacks are called concurrently
 *  with it.
 *
 *  This example only prints the number of members, this node's index and the
 *  global seqno of the new view to stdout; all other arguments are ignored.
 *
 *  @param view  description of the new view
 *  @return always WSREP_CB_SUCCESS */
static wsrep_cb_status_t
view_cb (void*                    app_ctx,
         void*                    recv_ctx,
         const wsrep_view_info_t* view,
         const char*              state,
         size_t                   state_len)
{
    /* Nothing but the view itself is needed here. */
    (void)app_ctx;
    (void)recv_ctx;
    (void)state;
    (void)state_len;

    long long const seqno = (long long)view->state_id.seqno;

    printf ("New cluster membership view: %d nodes, my index is %d, "
            "global seqno: %lld\n",
            view->memb_num, view->my_idx, seqno);

    return WSREP_CB_SUCCESS;
}

/*! State transfer (SST) request callback.
 *
 *  For simplicity this example skips state transfer by answering with the
 *  magic string WSREP_STATE_TRANSFER_NONE. This node will not be considered
 *  JOINED (having full state) by other cluster members.
 *
 *  @param sst_req      [out] heap-allocated copy of the magic string, or NULL
 *                      if the copy could not be allocated
 *  @param sst_req_len  [out] size of the request including the terminating
 *                      NUL, or 0 when *sst_req is NULL
 *  @return always WSREP_CB_SUCCESS, including when the allocation failed */
static wsrep_cb_status_t
sst_request_cb (void*   app_ctx,
                void**  sst_req,
                size_t* sst_req_len)
{
    (void)app_ctx;

    char* const request = strdup(WSREP_STATE_TRANSFER_NONE);

    *sst_req     = request;
    *sst_req_len = (NULL != request) ? strlen(request) + 1 : 0;

    return WSREP_CB_SUCCESS;
}

/*! Apply callback: this is called to "apply" a writeset.
 *  If writesets don't conflict on keys, it may be called concurrently to
 *  utilize several CPU cores.
 *
 *  The example formats a one-line description of the writeset into the
 *  receiver's message buffer and prints it to stdout from inside the
 *  commit-order section, but only when flags carries WSREP_FLAG_TRX_END or
 *  WSREP_FLAG_ROLLBACK.
 *
 *  NOTE(review): the results of commit_order_enter() and commit_order_leave()
 *  are not checked.
 *
 *  @param recv_ctx   the calling thread's struct receiver_context
 *  @param ws_handle  writeset handle, passed on to the commit-order calls
 *  @param flags      writeset flags
 *  @param ws         writeset buffer; only its length is reported
 *  @param meta       writeset metadata; supplies the reported seqno
 *  @param exit_loop  not used
 *  @return always WSREP_CB_SUCCESS */
static wsrep_cb_status_t
apply_cb (void*                    recv_ctx,
          const wsrep_ws_handle_t* ws_handle,
          uint32_t                 flags,
          const wsrep_buf_t*       ws,
          const wsrep_trx_meta_t*  meta,
          wsrep_bool_t*            exit_loop)
{
    (void)exit_loop;

    struct receiver_context* const receiver = recv_ctx;

    snprintf (receiver->msg, sizeof receiver->msg,
              "Got writeset %lld, size %zu", (long long)meta->gtid.seqno,
              ws->len);

    wsrep->commit_order_enter(wsrep, ws_handle, meta);

    /* Report only the fragment that ends the transaction. */
    if (0 != (flags & (WSREP_FLAG_TRX_END | WSREP_FLAG_ROLLBACK)))
    {
        puts (receiver->msg);
    }

    wsrep->commit_order_leave(wsrep, ws_handle, meta, NULL);

    return WSREP_CB_SUCCESS;
}

/* The following callbacks are stubs: they are registered with the provider
 * but do nothing in this example. */

/*! Unordered-event callback stub: ignores its arguments.
 *  @return always WSREP_CB_SUCCESS */
static wsrep_cb_status_t
unordered_cb (void*              recv_ctx,
              const wsrep_buf_t* data)
{
    (void)recv_ctx;
    (void)data;

    return WSREP_CB_SUCCESS;
}

/*! SST donation callback stub: ignores its arguments and transfers nothing.
 *  @return always WSREP_CB_SUCCESS */
static wsrep_cb_status_t
sst_donate_cb (void*               app_ctx,
               void*               recv_ctx,
               const wsrep_buf_t*  msg,
               const wsrep_gtid_t* state_id,
               const wsrep_buf_t*  state,
               wsrep_bool_t        bypass)
{
    (void)app_ctx;
    (void)recv_ctx;
    (void)msg;
    (void)state_id;
    (void)state;
    (void)bypass;

    return WSREP_CB_SUCCESS;
}

/*! "Synced" callback stub: ignores its argument.
 *  @return always WSREP_CB_SUCCESS */
static wsrep_cb_status_t
synced_cb (void* app_ctx)
{
    (void)app_ctx;

    return WSREP_CB_SUCCESS;
}

/* This is the listening thread. It blocks in wsrep::recv() call until
 * disconnect from cluster. It will apply and commit writesets through the
 * callbacks defined above.
 *
 * arg points to this thread's struct receiver_context, which is handed to
 * wsrep::recv() as the receiver context. When recv() returns, its status code
 * is reported on stderr. Always returns NULL. */
static void*
recv_thread (void* arg)
{
    struct receiver_context* ctx = (struct receiver_context*)arg;

    wsrep_status_t rc = wsrep->recv(wsrep, ctx);

    /* End the line: several receiver threads print this message, and without
     * the newline their reports run together on stderr. */
    fprintf (stderr, "Receiver exited with code %d\n", (int)rc);

    return NULL;
}

/* This is a signal handler to demonstrate graceful cluster leave: it reports
 * the received signal number on stdout and then asks the provider to
 * disconnect from the cluster.
 *
 * NOTE(review): printf() is not on the POSIX list of async-signal-safe
 * functions, and nothing in this file shows that wsrep::disconnect() may be
 * called from signal context. Confirm this is acceptable for the example, or
 * move the work out of the handler (e.g. a thread blocked in sigwait()). */
static void
graceful_leave (int signum)
{
    printf ("Got signal %d, exiting...\n", signum);
    wsrep->disconnect(wsrep);
}

/*! Program entry point.
 *
 *  Usage: listener </path/to/wsrep/provider> <wsrep URI> <cluster name>
 *                  [own address]
 *
 *  Loads and initializes the provider, connects to the cluster, starts the
 *  receiver threads, installs SIGTERM/SIGINT handlers that disconnect from
 *  the cluster, waits for the receivers to finish and unloads the provider.
 *
 *  @return 0 on normal shutdown; on a usage error or on any
 *          load/init/connect/thread-start failure the process exits with
 *          EXIT_FAILURE after printing a message to stderr */
int main (int const argc, char* argv[])
{
    if (argc < 4 || argc > 5)
    {
        fprintf (stderr, "Usage: %s </path/to/wsrep/provider> <wsrep URI> "
                 "<cluster name> [own address]\n", argv[0]);
        exit (EXIT_FAILURE);
    }

    const char* const wsrep_provider = argv[1];
    const char* const wsrep_uri      = argv[2];
    const char* const cluster_name   = argv[3];
    const char* const own_address    = argc == 5 ? argv[4] : "localhost";

    /* Now let's load and initialize provider */
    wsrep_status_t rc = wsrep_load (wsrep_provider, &wsrep, logger_cb);
    if (WSREP_OK != rc)
    {
        fprintf (stderr, "Failed to load wsrep provider '%s'\n",wsrep_provider);
        exit (EXIT_FAILURE);
    }

    /* This node starts without any state: undefined UUID and seqno. */
    wsrep_gtid_t state_id = { WSREP_UUID_UNDEFINED, WSREP_SEQNO_UNDEFINED };

    /* wsrep provider initialization arguments */
    struct wsrep_init_args wsrep_args =
    {
        .app_ctx       = &global_ctx,

        .node_name     = "example listener",
        .node_address  = own_address,
        .node_incoming = "",
        .data_dir      = ".", // working directory
        .options       = "",
        .proto_ver     = 127, // maximum supported application event protocol

        .state_id      = &state_id,
        .state         = NULL,

        .logger_cb      = logger_cb,
        .view_cb        = view_cb,
        .sst_request_cb = sst_request_cb,
        .encrypt_cb     = NULL,
        .apply_cb       = apply_cb,
        .unordered_cb   = unordered_cb,
        .sst_donate_cb  = sst_donate_cb,
        .synced_cb      = synced_cb
    };

    rc = wsrep->init(wsrep, &wsrep_args);
    if (WSREP_OK != rc)
    {
        fprintf (stderr, "wsrep::init() failed: %d\n", rc);
        exit (EXIT_FAILURE);
    }

    /* Connect to cluster */
    rc = wsrep->connect(wsrep, cluster_name, wsrep_uri, "", 0);
    if (0 != rc)
    {
        /* rc has an enumerated type, which the compiler may represent as an
         * unsigned integer; compare as int so that the negative (errno-style)
         * branch is not silently optimized away. */
        if ((int)rc < 0)
            fprintf (stderr, "wsrep::connect(%s, %s) failed: %d (%s)\n",
                     cluster_name, wsrep_uri, rc, strerror(-(int)rc));
        else
            fprintf (stderr, "wsrep::connect() failed: %d\n", rc);

        exit (EXIT_FAILURE);
    }

    /* Now let's start several listening threads. The count is an enumeration
     * constant so that the arrays below are ordinary fixed-size arrays: an
     * `int const` object is not a constant expression in C and would turn
     * them into variable-length arrays. */
    enum { NUM_THREADS = 4 };
    struct receiver_context thread_ctx[NUM_THREADS];
    pthread_t threads[NUM_THREADS];

    int i;
    for (i = 0; i < NUM_THREADS; i++)
    {
        int err = pthread_create (
            &threads[i], NULL, recv_thread, &thread_ctx[i]);

        if (err)
        {
            fprintf (stderr, "Failed to start thread %d: %d (%s)\n",
                     i, err, strerror(err));
            exit (EXIT_FAILURE);
        }
    }

    signal (SIGTERM, graceful_leave);
    signal (SIGINT,  graceful_leave);

    /* Listening threads are now running and receiving writesets. Wait for them
     * to join. Threads will join after signal handler closes wsrep connection*/
    for (i = 0; i < NUM_THREADS; i++)
    {
        pthread_join (threads[i], NULL);
    }

    /* Unload provider after nobody uses it any more. */
    wsrep_unload (wsrep);

    return 0;
}