summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/wsrep-API/v26/examples/node/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'wsrep-lib/wsrep-API/v26/examples/node/worker.c')
-rw-r--r--wsrep-lib/wsrep-API/v26/examples/node/worker.c197
1 files changed, 197 insertions, 0 deletions
diff --git a/wsrep-lib/wsrep-API/v26/examples/node/worker.c b/wsrep-lib/wsrep-API/v26/examples/node/worker.c
new file mode 100644
index 00000000..e9901ad8
--- /dev/null
+++ b/wsrep-lib/wsrep-API/v26/examples/node/worker.c
@@ -0,0 +1,197 @@
+/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "worker.h"
+
+#include "log.h"
+#include "options.h"
+#include "trx.h"
+#include "wsrep.h"
+
+#include <assert.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <string.h> // strerror()
+
+struct node_worker
+{
+ struct node_ctx* node;
+ pthread_t thread_id;
+ size_t id;
+ bool exit;
+};
+
+enum wsrep_cb_status
+node_worker_apply_cb(void* const recv_ctx,
+ const wsrep_ws_handle_t* const ws_handle,
+ uint32_t const ws_flags,
+ const wsrep_buf_t* const ws,
+ const wsrep_trx_meta_t* const ws_meta,
+ wsrep_bool_t* const exit_loop)
+{
+ assert(recv_ctx);
+
+ struct node_worker* const worker = recv_ctx;
+
+ wsrep_status_t const ret = node_trx_apply(
+ worker->node->store,
+ node_wsrep_provider(worker->node->wsrep),
+ ws_handle,
+ ws_meta,
+ ws_flags & WSREP_FLAG_ROLLBACK ? NULL : ws);
+
+ *exit_loop = worker->exit;
+
+ return WSREP_OK == ret ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE;
+}
+
+
+static void*
+worker_slave(void* recv_ctx)
+{
+ struct node_worker* const worker = recv_ctx;
+ wsrep_t* const wsrep = node_wsrep_provider(worker->node->wsrep);
+
+ wsrep_status_t const ret = wsrep->recv(wsrep, worker);
+
+ if (WSREP_OK != ret)
+ {
+ NODE_ERROR("slave worker [%zu] exited with error %d.", worker->id, ret);
+ }
+
+ return NULL;
+}
+
+static void*
+worker_master(void* send_ctx)
+{
+ struct node_worker* const worker = send_ctx;
+ struct node_ctx* const node = worker->node;
+ wsrep_t* const wsrep = node_wsrep_provider(node->wsrep);
+
+ assert(node->opts->ws_size > 0);
+
+ wsrep_status_t ret;
+
+ do
+ {
+ /* REPLICATION: we should not perform any local writes until the node
+ * is synced with the cluster. */
+ if (!node_wsrep_wait_synced(node->wsrep))
+ {
+ NODE_ERROR("master worker [%zu] failed waiting for SYNCED state.",
+ worker->id);
+ break;
+ }
+
+ /* REPLICATION: the node is now synced */
+
+ do
+ {
+ ret = node_trx_execute(node->store,
+ wsrep,
+ worker->id,
+ (int)node->opts->operations);
+ }
+ while(WSREP_OK == ret // success
+ || (WSREP_TRX_FAIL == ret // certification failed, trx rolled back
+ && (usleep(10000),true)) // retry after short sleep
+ );
+ }
+ while (WSREP_CONN_FAIL == ret); // provider in bad state (e.g. non-Primary)
+
+ return NULL;
+}
+
+struct node_worker_pool
+{
+ size_t size; // size of the pool (nu,ber of nodes)
+ struct node_worker worker[1]; // worker context array;
+};
+
+struct node_worker_pool*
+node_worker_start(struct node_ctx* const ctx,
+ node_worker_type_t const type,
+ size_t const size)
+{
+ assert(ctx);
+
+ if (0 == size) return NULL;
+
+ const char* const type_str = type == NODE_WORKER_SLAVE ? "slave" : "master";
+
+ size_t const alloc_size =
+ sizeof(struct node_worker_pool) +
+ sizeof(struct node_worker) * (size - 1);
+
+ struct node_worker_pool* const ret = malloc(alloc_size);
+
+ if (ret)
+ {
+ void* (* const routine) (void*) =
+ type == NODE_WORKER_SLAVE ? worker_slave : worker_master;
+
+ size_t i;
+ for (i = 0; i < size; i++)
+ {
+ struct node_worker* const worker = &ret->worker[i];
+ worker->node = ctx;
+ worker->id = i;
+ worker->exit = false;
+
+ int const err = pthread_create(&worker->thread_id,
+ NULL,
+ routine,
+ worker);
+ if (err)
+ {
+ NODE_ERROR("Failed to start %s worker[%zu]: %d (%s)",
+ type_str, i, err, strerror(err));
+ if (0 == i)
+ {
+ free(ret);
+ return NULL;
+ }
+ else
+ {
+ break; // some threads have started,
+ // need to return to close them first
+ }
+ }
+ }
+
+ ret->size = i;
+ }
+ else
+ {
+ NODE_ERROR("Failed to allocate %zu bytes for the %s worker pool",
+ alloc_size, type_str);
+ }
+
+ return ret;
+}
+
+void
+node_worker_stop(struct node_worker_pool* pool)
+{
+ size_t i;
+ for (i = 0; pool && i < pool->size; i++)
+ {
+ pthread_join(pool->worker[i].thread_id, NULL);
+ }
+
+ free(pool);
+}