summaryrefslogtreecommitdiffstats
path: root/ctdb/server/ctdb_recovery_helper.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:47:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:47:29 +0000
commit4f5791ebd03eaec1c7da0865a383175b05102712 (patch)
tree8ce7b00f7a76baa386372422adebbe64510812d4 /ctdb/server/ctdb_recovery_helper.c
parentInitial commit. (diff)
downloadsamba-upstream.tar.xz
samba-upstream.zip
Adding upstream version 2:4.17.12+dfsg.upstream/2%4.17.12+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--ctdb/server/ctdb_recovery_helper.c3200
1 files changed, 3200 insertions, 0 deletions
diff --git a/ctdb/server/ctdb_recovery_helper.c b/ctdb/server/ctdb_recovery_helper.c
new file mode 100644
index 0000000..e0d3219
--- /dev/null
+++ b/ctdb/server/ctdb_recovery_helper.c
@@ -0,0 +1,3200 @@
+/*
+ ctdb parallel database recovery
+
+ Copyright (C) Amitay Isaacs 2015
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/network.h"
+#include "system/filesys.h"
+
+#include <talloc.h>
+#include <tevent.h>
+#include <tdb.h>
+#include <libgen.h>
+
+#include "lib/tdb_wrap/tdb_wrap.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/time.h"
+#include "lib/util/tevent_unix.h"
+#include "lib/util/util.h"
+#include "lib/util/smb_strtox.h"
+
+#include "protocol/protocol.h"
+#include "protocol/protocol_api.h"
+#include "client/client.h"
+
+#include "common/logging.h"
+
+static int recover_timeout = 30;
+
+#define NUM_RETRIES 3
+
+#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
+
+/*
+ * Utility functions
+ */
+
+static bool generic_recv(struct tevent_req *req, int *perr)
+{
+ int err;
+
+ if (tevent_req_is_unix_error(req, &err)) {
+ if (perr != NULL) {
+ *perr = err;
+ }
+ return false;
+ }
+
+ return true;
+}
+
+static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
+
+static uint64_t srvid_next(void)
+{
+ rec_srvid += 1;
+ return rec_srvid;
+}
+
+/*
+ * Node related functions
+ */
+
+struct node_list {
+ uint32_t *pnn_list;
+ uint32_t *caps;
+ uint32_t *ban_credits;
+ unsigned int size;
+ unsigned int count;
+};
+
+static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
+{
+ struct node_list *nlist;
+ unsigned int i;
+
+ nlist = talloc_zero(mem_ctx, struct node_list);
+ if (nlist == NULL) {
+ return NULL;
+ }
+
+ nlist->pnn_list = talloc_array(nlist, uint32_t, size);
+ nlist->caps = talloc_zero_array(nlist, uint32_t, size);
+ nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size);
+
+ if (nlist->pnn_list == NULL ||
+ nlist->caps == NULL ||
+ nlist->ban_credits == NULL) {
+ talloc_free(nlist);
+ return NULL;
+ }
+ nlist->size = size;
+
+ for (i=0; i<nlist->size; i++) {
+ nlist->pnn_list[i] = CTDB_UNKNOWN_PNN;
+ }
+
+ return nlist;
+}
+
+static bool node_list_add(struct node_list *nlist, uint32_t pnn)
+{
+ unsigned int i;
+
+ if (nlist->count == nlist->size) {
+ return false;
+ }
+
+ for (i=0; i<nlist->count; i++) {
+ if (nlist->pnn_list[i] == pnn) {
+ return false;
+ }
+ }
+
+ nlist->pnn_list[nlist->count] = pnn;
+ nlist->count += 1;
+
+ return true;
+}
+
+static uint32_t *node_list_lmaster(struct node_list *nlist,
+ TALLOC_CTX *mem_ctx,
+ unsigned int *pnn_count)
+{
+ uint32_t *pnn_list;
+ unsigned int count, i;
+
+ pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
+ if (pnn_list == NULL) {
+ return NULL;
+ }
+
+ count = 0;
+ for (i=0; i<nlist->count; i++) {
+ if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) {
+ continue;
+ }
+
+ pnn_list[count] = nlist->pnn_list[i];
+ count += 1;
+ }
+
+ *pnn_count = count;
+ return pnn_list;
+}
+
+static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
+{
+ unsigned int i;
+
+ for (i=0; i<nlist->count; i++) {
+ if (nlist->pnn_list[i] == pnn) {
+ nlist->ban_credits[i] += 1;
+ break;
+ }
+ }
+}
+
+/*
+ * Database list functions
+ *
+ * Simple, naive implementation that could be updated to a db_hash or similar
+ */
+
+struct db {
+ struct db *prev, *next;
+
+ uint32_t db_id;
+ uint32_t db_flags;
+ uint32_t *pnn_list;
+ unsigned int num_nodes;
+};
+
+struct db_list {
+ unsigned int num_dbs;
+ struct db *db;
+ unsigned int num_nodes;
+};
+
+static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
+{
+ struct db_list *l;
+
+ l = talloc_zero(mem_ctx, struct db_list);
+ l->num_nodes = num_nodes;
+
+ return l;
+}
+
+static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
+{
+ struct db *db;
+
+ if (dblist == NULL) {
+ return NULL;
+ }
+
+ db = dblist->db;
+ while (db != NULL && db->db_id != db_id) {
+ db = db->next;
+ }
+
+ return db;
+}
+
+static int db_list_add(struct db_list *dblist,
+ uint32_t db_id,
+ uint32_t db_flags,
+ uint32_t node)
+{
+ struct db *db = NULL;
+
+ if (dblist == NULL) {
+ return EINVAL;
+ }
+
+ db = talloc_zero(dblist, struct db);
+ if (db == NULL) {
+ return ENOMEM;
+ }
+
+ db->db_id = db_id;
+ db->db_flags = db_flags;
+ db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes);
+ if (db->pnn_list == NULL) {
+ talloc_free(db);
+ return ENOMEM;
+ }
+ db->pnn_list[0] = node;
+ db->num_nodes = 1;
+
+ DLIST_ADD_END(dblist->db, db);
+ dblist->num_dbs++;
+
+ return 0;
+}
+
+static int db_list_check_and_add(struct db_list *dblist,
+ uint32_t db_id,
+ uint32_t db_flags,
+ uint32_t node)
+{
+ struct db *db = NULL;
+ int ret;
+
+ /*
+ * These flags are masked out because they are only set on a
+ * node when a client attaches to that node, so they might not
+ * be set yet. They can't be passed as part of the attch, so
+ * they're no use here.
+ */
+ db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY);
+
+ if (dblist == NULL) {
+ return EINVAL;
+ }
+
+ db = db_list_find(dblist, db_id);
+ if (db == NULL) {
+ ret = db_list_add(dblist, db_id, db_flags, node);
+ return ret;
+ }
+
+ if (db->db_flags != db_flags) {
+ D_ERR("Incompatible database flags for 0x%"PRIx32" "
+ "(0x%"PRIx32" != 0x%"PRIx32")\n",
+ db_id,
+ db_flags,
+ db->db_flags);
+ return EINVAL;
+ }
+
+ if (db->num_nodes >= dblist->num_nodes) {
+ return EINVAL;
+ }
+
+ db->pnn_list[db->num_nodes] = node;
+ db->num_nodes++;
+
+ return 0;
+}
+
+/*
+ * Create database on nodes where it is missing
+ */
+
+struct db_create_missing_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+
+ struct node_list *nlist;
+
+ const char *db_name;
+ uint32_t *missing_pnn_list;
+ int missing_num_nodes;
+};
+
+static void db_create_missing_done(struct tevent_req *subreq);
+
+static struct tevent_req *db_create_missing_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct node_list *nlist,
+ const char *db_name,
+ struct db *db)
+{
+ struct tevent_req *req, *subreq;
+ struct db_create_missing_state *state;
+ struct ctdb_req_control request;
+ unsigned int i, j;
+
+ req = tevent_req_create(mem_ctx,
+ &state,
+ struct db_create_missing_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->nlist = nlist;
+ state->db_name = db_name;
+
+ if (nlist->count == db->num_nodes) {
+ tevent_req_done(req);
+ return tevent_req_post(req, ev);
+ }
+
+ state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count);
+ if (tevent_req_nomem(state->missing_pnn_list, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ for (i = 0; i < nlist->count; i++) {
+ uint32_t pnn = nlist->pnn_list[i] ;
+
+ for (j = 0; j < db->num_nodes; j++) {
+ if (pnn == db->pnn_list[j]) {
+ break;
+ }
+ }
+
+ if (j < db->num_nodes) {
+ continue;
+ }
+
+ DBG_INFO("Create database %s on node %u\n",
+ state->db_name,
+ pnn);
+ state->missing_pnn_list[state->missing_num_nodes] = pnn;
+ state->missing_num_nodes++;
+ }
+
+ if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
+ ctdb_req_control_db_attach_persistent(&request, db_name);
+ } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
+ ctdb_req_control_db_attach_replicated(&request, db_name);
+ } else {
+ ctdb_req_control_db_attach(&request, db_name);
+ }
+ request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->missing_pnn_list,
+ state->missing_num_nodes,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, db_create_missing_done, req);
+
+ return req;
+}
+
+static void db_create_missing_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct db_create_missing_state *state = tevent_req_data(
+ req, struct db_create_missing_state);
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq,
+ &ret,
+ NULL,
+ &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(
+ state->missing_pnn_list,
+ state->missing_num_nodes,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control DB_ATTACH failed for db %s"
+ " on node %u, ret=%d\n",
+ state->db_name,
+ pnn,
+ ret2);
+ node_list_ban_credits(state->nlist, pnn);
+ } else {
+ D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
+ state->db_name,
+ ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static bool db_create_missing_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
+/*
+ * Recovery database functions
+ */
+
+struct recdb_context {
+ uint32_t db_id;
+ const char *db_name;
+ const char *db_path;
+ struct tdb_wrap *db;
+ bool persistent;
+};
+
+static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
+ const char *db_name,
+ const char *db_path,
+ uint32_t hash_size, bool persistent)
+{
+ static char *db_dir_state = NULL;
+ struct recdb_context *recdb;
+ unsigned int tdb_flags;
+
+ recdb = talloc(mem_ctx, struct recdb_context);
+ if (recdb == NULL) {
+ return NULL;
+ }
+
+ if (db_dir_state == NULL) {
+ db_dir_state = getenv("CTDB_DBDIR_STATE");
+ }
+
+ recdb->db_name = db_name;
+ recdb->db_id = db_id;
+ recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
+ db_dir_state != NULL ?
+ db_dir_state :
+ dirname(discard_const(db_path)),
+ db_name);
+ if (recdb->db_path == NULL) {
+ talloc_free(recdb);
+ return NULL;
+ }
+ unlink(recdb->db_path);
+
+ tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
+ recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
+ tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
+ if (recdb->db == NULL) {
+ talloc_free(recdb);
+ D_ERR("failed to create recovery db %s\n", recdb->db_path);
+ return NULL;
+ }
+
+ recdb->persistent = persistent;
+
+ return recdb;
+}
+
+static uint32_t recdb_id(struct recdb_context *recdb)
+{
+ return recdb->db_id;
+}
+
+static const char *recdb_name(struct recdb_context *recdb)
+{
+ return recdb->db_name;
+}
+
+static const char *recdb_path(struct recdb_context *recdb)
+{
+ return recdb->db_path;
+}
+
+static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
+{
+ return recdb->db->tdb;
+}
+
+static bool recdb_persistent(struct recdb_context *recdb)
+{
+ return recdb->persistent;
+}
+
+struct recdb_add_traverse_state {
+ struct recdb_context *recdb;
+ uint32_t mypnn;
+};
+
+static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
+ TDB_DATA key, TDB_DATA data,
+ void *private_data)
+{
+ struct recdb_add_traverse_state *state =
+ (struct recdb_add_traverse_state *)private_data;
+ struct ctdb_ltdb_header *hdr;
+ TDB_DATA prev_data;
+ int ret;
+
+ /* header is not marshalled separately in the pulldb control */
+ if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ return -1;
+ }
+
+ hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+ /* fetch the existing record, if any */
+ prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
+
+ if (prev_data.dptr != NULL) {
+ struct ctdb_ltdb_header prev_hdr;
+
+ prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
+ free(prev_data.dptr);
+ if (hdr->rsn < prev_hdr.rsn ||
+ (hdr->rsn == prev_hdr.rsn &&
+ prev_hdr.dmaster != state->mypnn)) {
+ return 0;
+ }
+ }
+
+ ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
+ if (ret != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static bool recdb_add(struct recdb_context *recdb, int mypnn,
+ struct ctdb_rec_buffer *recbuf)
+{
+ struct recdb_add_traverse_state state;
+ int ret;
+
+ state.recdb = recdb;
+ state.mypnn = mypnn;
+
+ ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
+ if (ret != 0) {
+ return false;
+ }
+
+ return true;
+}
+
+/* This function decides which records from recdb are retained */
+static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
+ uint32_t reqid, uint32_t dmaster,
+ TDB_DATA key, TDB_DATA data)
+{
+ struct ctdb_ltdb_header *header;
+ int ret;
+
+ /* Skip empty records */
+ if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+ return 0;
+ }
+
+ /* update the dmaster field to point to us */
+ header = (struct ctdb_ltdb_header *)data.dptr;
+ if (!persistent) {
+ header->dmaster = dmaster;
+ header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
+ }
+
+ ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
+ if (ret != 0) {
+ return ret;
+ }
+
+ return 0;
+}
+
+struct recdb_file_traverse_state {
+ struct ctdb_rec_buffer *recbuf;
+ struct recdb_context *recdb;
+ TALLOC_CTX *mem_ctx;
+ uint32_t dmaster;
+ uint32_t reqid;
+ bool persistent;
+ bool failed;
+ int fd;
+ size_t max_size;
+ unsigned int num_buffers;
+};
+
+static int recdb_file_traverse(struct tdb_context *tdb,
+ TDB_DATA key, TDB_DATA data,
+ void *private_data)
+{
+ struct recdb_file_traverse_state *state =
+ (struct recdb_file_traverse_state *)private_data;
+ int ret;
+
+ ret = recbuf_filter_add(state->recbuf, state->persistent,
+ state->reqid, state->dmaster, key, data);
+ if (ret != 0) {
+ state->failed = true;
+ return ret;
+ }
+
+ if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
+ ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
+ if (ret != 0) {
+ D_ERR("Failed to collect recovery records for %s\n",
+ recdb_name(state->recdb));
+ state->failed = true;
+ return ret;
+ }
+
+ state->num_buffers += 1;
+
+ TALLOC_FREE(state->recbuf);
+ state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
+ recdb_id(state->recdb));
+ if (state->recbuf == NULL) {
+ state->failed = true;
+ return ENOMEM;
+ }
+ }
+
+ return 0;
+}
+
+static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
+ uint32_t dmaster, int fd, int max_size)
+{
+ struct recdb_file_traverse_state state;
+ int ret;
+
+ state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
+ if (state.recbuf == NULL) {
+ return -1;
+ }
+ state.recdb = recdb;
+ state.mem_ctx = mem_ctx;
+ state.dmaster = dmaster;
+ state.reqid = 0;
+ state.persistent = recdb_persistent(recdb);
+ state.failed = false;
+ state.fd = fd;
+ state.max_size = max_size;
+ state.num_buffers = 0;
+
+ ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
+ if (ret == -1 || state.failed) {
+ TALLOC_FREE(state.recbuf);
+ return -1;
+ }
+
+ ret = ctdb_rec_buffer_write(state.recbuf, fd);
+ if (ret != 0) {
+ D_ERR("Failed to collect recovery records for %s\n",
+ recdb_name(recdb));
+ TALLOC_FREE(state.recbuf);
+ return -1;
+ }
+ state.num_buffers += 1;
+
+ D_DEBUG("Wrote %d buffers of recovery records for %s\n",
+ state.num_buffers, recdb_name(recdb));
+
+ return state.num_buffers;
+}
+
+/*
+ * Pull database from a single node
+ */
+
+struct pull_database_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct recdb_context *recdb;
+ uint32_t pnn;
+ uint64_t srvid;
+ unsigned int num_records;
+ int result;
+};
+
+static void pull_database_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data);
+static void pull_database_register_done(struct tevent_req *subreq);
+static void pull_database_unregister_done(struct tevent_req *subreq);
+static void pull_database_done(struct tevent_req *subreq);
+
+static struct tevent_req *pull_database_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t pnn,
+ struct recdb_context *recdb)
+{
+ struct tevent_req *req, *subreq;
+ struct pull_database_state *state;
+
+ req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->recdb = recdb;
+ state->pnn = pnn;
+ state->srvid = srvid_next();
+
+ subreq = ctdb_client_set_message_handler_send(
+ state, state->ev, state->client,
+ state->srvid, pull_database_handler,
+ req);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ tevent_req_set_callback(subreq, pull_database_register_done, req);
+
+ return req;
+}
+
+static void pull_database_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ struct ctdb_rec_buffer *recbuf;
+ size_t np;
+ int ret;
+ bool status;
+
+ if (srvid != state->srvid) {
+ return;
+ }
+
+ ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
+ if (ret != 0) {
+ D_ERR("Invalid data received for DB_PULL messages\n");
+ return;
+ }
+
+ if (recbuf->db_id != recdb_id(state->recdb)) {
+ talloc_free(recbuf);
+ D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
+ recbuf->db_id, recdb_name(state->recdb));
+ return;
+ }
+
+ status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
+ recbuf);
+ if (! status) {
+ talloc_free(recbuf);
+ D_ERR("Failed to add records to recdb for %s\n",
+ recdb_name(state->recdb));
+ return;
+ }
+
+ state->num_records += recbuf->count;
+ talloc_free(recbuf);
+}
+
+static void pull_database_register_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ struct ctdb_req_control request;
+ struct ctdb_pulldb_ext pulldb_ext;
+ int ret;
+ bool status;
+
+ status = ctdb_client_set_message_handler_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("Failed to set message handler for DB_PULL for %s\n",
+ recdb_name(state->recdb));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ pulldb_ext.db_id = recdb_id(state->recdb);
+ pulldb_ext.lmaster = CTDB_LMASTER_ANY;
+ pulldb_ext.srvid = state->srvid;
+
+ ctdb_req_control_db_pull(&request, &pulldb_ext);
+ subreq = ctdb_client_control_send(state, state->ev, state->client,
+ state->pnn, TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, pull_database_done, req);
+}
+
+static void pull_database_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ struct ctdb_reply_control *reply;
+ uint32_t num_records;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
+ recdb_name(state->recdb), state->pnn, ret);
+ state->result = ret;
+ goto unregister;
+ }
+
+ ret = ctdb_reply_control_db_pull(reply, &num_records);
+ talloc_free(reply);
+ if (num_records != state->num_records) {
+ D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
+ num_records, state->num_records,
+ recdb_name(state->recdb));
+ state->result = EIO;
+ goto unregister;
+ }
+
+ D_INFO("Pulled %d records for db %s from node %d\n",
+ state->num_records, recdb_name(state->recdb), state->pnn);
+
+unregister:
+
+ subreq = ctdb_client_remove_message_handler_send(
+ state, state->ev, state->client,
+ state->srvid, req);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, pull_database_unregister_done, req);
+}
+
+static void pull_database_unregister_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ int ret;
+ bool status;
+
+ status = ctdb_client_remove_message_handler_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("failed to remove message handler for DB_PULL for db %s\n",
+ recdb_name(state->recdb));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ if (state->result != 0) {
+ tevent_req_error(req, state->result);
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static bool pull_database_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
+/*
+ * Push database to specified nodes (new style)
+ */
+
+struct push_database_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct recdb_context *recdb;
+ uint32_t *pnn_list;
+ unsigned int count;
+ uint64_t srvid;
+ uint32_t dmaster;
+ int fd;
+ int num_buffers;
+ int num_buffers_sent;
+ unsigned int num_records;
+};
+
+static void push_database_started(struct tevent_req *subreq);
+static void push_database_send_msg(struct tevent_req *req);
+static void push_database_send_done(struct tevent_req *subreq);
+static void push_database_confirmed(struct tevent_req *subreq);
+
+static struct tevent_req *push_database_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t *pnn_list,
+ unsigned int count,
+ struct recdb_context *recdb,
+ int max_size)
+{
+ struct tevent_req *req, *subreq;
+ struct push_database_state *state;
+ struct ctdb_req_control request;
+ struct ctdb_pulldb_ext pulldb_ext;
+ char *filename;
+ off_t offset;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct push_database_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->recdb = recdb;
+ state->pnn_list = pnn_list;
+ state->count = count;
+
+ state->srvid = srvid_next();
+ state->dmaster = ctdb_client_pnn(client);
+ state->num_buffers_sent = 0;
+ state->num_records = 0;
+
+ filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
+ if (tevent_req_nomem(filename, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ state->fd = open(filename, O_RDWR|O_CREAT, 0644);
+ if (state->fd == -1) {
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, ev);
+ }
+ unlink(filename);
+ talloc_free(filename);
+
+ state->num_buffers = recdb_file(recdb, state, state->dmaster,
+ state->fd, max_size);
+ if (state->num_buffers == -1) {
+ tevent_req_error(req, ENOMEM);
+ return tevent_req_post(req, ev);
+ }
+
+ offset = lseek(state->fd, 0, SEEK_SET);
+ if (offset != 0) {
+ tevent_req_error(req, EIO);
+ return tevent_req_post(req, ev);
+ }
+
+ pulldb_ext.db_id = recdb_id(recdb);
+ pulldb_ext.srvid = state->srvid;
+
+ ctdb_req_control_db_push_start(&request, &pulldb_ext);
+ subreq = ctdb_client_control_multi_send(state, ev, client,
+ pnn_list, count,
+ TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, push_database_started, req);
+
+ return req;
+}
+
+static void push_database_started(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_state *state = tevent_req_data(
+ req, struct push_database_state);
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state,
+ &err_list, NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->pnn_list,
+ state->count,
+ err_list, &pnn);
+ if (ret2 != 0) {
+ D_ERR("control DB_PUSH_START failed for db %s"
+ " on node %u, ret=%d\n",
+ recdb_name(state->recdb), pnn, ret2);
+ } else {
+ D_ERR("control DB_PUSH_START failed for db %s,"
+ " ret=%d\n",
+ recdb_name(state->recdb), ret);
+ }
+ talloc_free(err_list);
+
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ push_database_send_msg(req);
+}
+
+static void push_database_send_msg(struct tevent_req *req)
+{
+ struct push_database_state *state = tevent_req_data(
+ req, struct push_database_state);
+ struct tevent_req *subreq;
+ struct ctdb_rec_buffer *recbuf;
+ struct ctdb_req_message message;
+ TDB_DATA data;
+ size_t np;
+ int ret;
+
+ if (state->num_buffers_sent == state->num_buffers) {
+ struct ctdb_req_control request;
+
+ ctdb_req_control_db_push_confirm(&request,
+ recdb_id(state->recdb));
+ subreq = ctdb_client_control_multi_send(state, state->ev,
+ state->client,
+ state->pnn_list,
+ state->count,
+ TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, push_database_confirmed, req);
+ return;
+ }
+
+ ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ data.dsize = ctdb_rec_buffer_len(recbuf);
+ data.dptr = talloc_size(state, data.dsize);
+ if (tevent_req_nomem(data.dptr, req)) {
+ return;
+ }
+
+ ctdb_rec_buffer_push(recbuf, data.dptr, &np);
+
+ message.srvid = state->srvid;
+ message.data.data = data;
+
+ D_DEBUG("Pushing buffer %d with %d records for db %s\n",
+ state->num_buffers_sent, recbuf->count,
+ recdb_name(state->recdb));
+
+ subreq = ctdb_client_message_multi_send(state, state->ev,
+ state->client,
+ state->pnn_list, state->count,
+ &message);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, push_database_send_done, req);
+
+ state->num_records += recbuf->count;
+
+ talloc_free(data.dptr);
+ talloc_free(recbuf);
+}
+
+static void push_database_send_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_state *state = tevent_req_data(
+ req, struct push_database_state);
+ bool status;
+ int ret;
+
+ status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("Sending recovery records failed for %s\n",
+ recdb_name(state->recdb));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->num_buffers_sent += 1;
+
+ push_database_send_msg(req);
+}
+
+static void push_database_confirmed(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct push_database_state *state = tevent_req_data(
+ req, struct push_database_state);
+ struct ctdb_reply_control **reply;
+ int *err_list;
+ bool status;
+ unsigned int i;
+ int ret;
+ uint32_t num_records;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state,
+ &err_list, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->pnn_list,
+ state->count, err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control DB_PUSH_CONFIRM failed for db %s"
+ " on node %u, ret=%d\n",
+ recdb_name(state->recdb), pnn, ret2);
+ } else {
+ D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
+ " ret=%d\n",
+ recdb_name(state->recdb), ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ for (i=0; i<state->count; i++) {
+ ret = ctdb_reply_control_db_push_confirm(reply[i],
+ &num_records);
+ if (ret != 0) {
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ if (num_records != state->num_records) {
+ D_ERR("Node %u received %d of %d records for %s\n",
+ state->pnn_list[i], num_records,
+ state->num_records, recdb_name(state->recdb));
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+ }
+
+ talloc_free(reply);
+
+ D_INFO("Pushed %d records for db %s\n",
+ state->num_records, recdb_name(state->recdb));
+
+ tevent_req_done(req);
+}
+
+static bool push_database_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
+/*
+ * Collect databases using highest sequence number
+ */
+
+struct collect_highseqnum_db_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct node_list *nlist;
+ uint32_t db_id;
+ struct recdb_context *recdb;
+
+ uint32_t max_pnn;
+};
+
+static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
+static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
+
+static struct tevent_req *collect_highseqnum_db_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct node_list *nlist,
+ uint32_t db_id,
+ struct recdb_context *recdb)
+{
+ struct tevent_req *req, *subreq;
+ struct collect_highseqnum_db_state *state;
+ struct ctdb_req_control request;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct collect_highseqnum_db_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->nlist = nlist;
+ state->db_id = db_id;
+ state->recdb = recdb;
+
+ ctdb_req_control_get_db_seqnum(&request, db_id);
+ subreq = ctdb_client_control_multi_send(mem_ctx,
+ ev,
+ client,
+ nlist->pnn_list,
+ nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
+ req);
+
+ return req;
+}
+
+static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct collect_highseqnum_db_state *state = tevent_req_data(
+ req, struct collect_highseqnum_db_state);
+ struct ctdb_reply_control **reply;
+ int *err_list;
+ bool status;
+ unsigned int i;
+ int ret;
+ uint64_t seqnum, max_seqnum;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state,
+ &err_list, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control GET_DB_SEQNUM failed for db %s"
+ " on node %u, ret=%d\n",
+ recdb_name(state->recdb), pnn, ret2);
+ } else {
+ D_ERR("control GET_DB_SEQNUM failed for db %s,"
+ " ret=%d\n",
+ recdb_name(state->recdb), ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ max_seqnum = 0;
+ state->max_pnn = state->nlist->pnn_list[0];
+ for (i=0; i<state->nlist->count; i++) {
+ ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
+ if (ret != 0) {
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ if (max_seqnum < seqnum) {
+ max_seqnum = seqnum;
+ state->max_pnn = state->nlist->pnn_list[i];
+ }
+ }
+
+ talloc_free(reply);
+
+ D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
+ recdb_name(state->recdb), state->max_pnn, max_seqnum);
+
+ subreq = pull_database_send(state,
+ state->ev,
+ state->client,
+ state->max_pnn,
+ state->recdb);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
+ req);
+}
+
+static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct collect_highseqnum_db_state *state = tevent_req_data(
+ req, struct collect_highseqnum_db_state);
+ int ret;
+ bool status;
+
+ status = pull_database_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ node_list_ban_credits(state->nlist, state->max_pnn);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
+/*
+ * Collect all databases
+ */
+
+struct collect_all_db_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct node_list *nlist;
+ uint32_t db_id;
+ struct recdb_context *recdb;
+
+ struct ctdb_pulldb pulldb;
+ unsigned int index;
+};
+
+static void collect_all_db_pulldb_done(struct tevent_req *subreq);
+
+static struct tevent_req *collect_all_db_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct node_list *nlist,
+ uint32_t db_id,
+ struct recdb_context *recdb)
+{
+ struct tevent_req *req, *subreq;
+ struct collect_all_db_state *state;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct collect_all_db_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->nlist = nlist;
+ state->db_id = db_id;
+ state->recdb = recdb;
+ state->index = 0;
+
+ subreq = pull_database_send(state,
+ ev,
+ client,
+ nlist->pnn_list[state->index],
+ recdb);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
+
+ return req;
+}
+
+static void collect_all_db_pulldb_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct collect_all_db_state *state = tevent_req_data(
+ req, struct collect_all_db_state);
+ int ret;
+ bool status;
+
+ status = pull_database_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ node_list_ban_credits(state->nlist,
+ state->nlist->pnn_list[state->index]);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->index += 1;
+ if (state->index == state->nlist->count) {
+ tevent_req_done(req);
+ return;
+ }
+
+ subreq = pull_database_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list[state->index],
+ state->recdb);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
+}
+
+static bool collect_all_db_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
+
+/**
+ * For each database do the following:
+ * - Get DB name from all nodes
+ * - Attach database on missing nodes
+ * - Get DB path
+ * - Freeze database on all nodes
+ * - Start transaction on all nodes
+ * - Collect database from all nodes
+ * - Wipe database on all nodes
+ * - Push database to all nodes
+ * - Commit transaction on all nodes
+ * - Thaw database on all nodes
+ */
+
+struct recover_db_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct ctdb_tunable_list *tun_list;
+ struct node_list *nlist;
+ struct db *db;
+
+ uint32_t destnode;
+ struct ctdb_transdb transdb;
+
+ const char *db_name, *db_path;
+ struct recdb_context *recdb;
+};
+
+static void recover_db_name_done(struct tevent_req *subreq);
+static void recover_db_create_missing_done(struct tevent_req *subreq);
+static void recover_db_path_done(struct tevent_req *subreq);
+static void recover_db_freeze_done(struct tevent_req *subreq);
+static void recover_db_transaction_started(struct tevent_req *subreq);
+static void recover_db_collect_done(struct tevent_req *subreq);
+static void recover_db_wipedb_done(struct tevent_req *subreq);
+static void recover_db_pushdb_done(struct tevent_req *subreq);
+static void recover_db_transaction_committed(struct tevent_req *subreq);
+static void recover_db_thaw_done(struct tevent_req *subreq);
+
+static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct ctdb_tunable_list *tun_list,
+ struct node_list *nlist,
+ uint32_t generation,
+ struct db *db)
+{
+ struct tevent_req *req, *subreq;
+ struct recover_db_state *state;
+ struct ctdb_req_control request;
+
+ req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->tun_list = tun_list;
+ state->nlist = nlist;
+ state->db = db;
+
+ state->destnode = ctdb_client_pnn(client);
+ state->transdb.db_id = db->db_id;
+ state->transdb.tid = generation;
+
+ ctdb_req_control_get_dbname(&request, db->db_id);
+ subreq = ctdb_client_control_multi_send(state,
+ ev,
+ client,
+ state->db->pnn_list,
+ state->db->num_nodes,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, recover_db_name_done, req);
+
+ return req;
+}
+
+static void recover_db_name_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ struct ctdb_reply_control **reply;
+ int *err_list;
+ unsigned int i;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq,
+ &ret,
+ state,
+ &err_list,
+ &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
+ state->db->num_nodes,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control GET_DBNAME failed on node %u,"
+ " ret=%d\n",
+ pnn,
+ ret2);
+ } else {
+ D_ERR("control GET_DBNAME failed, ret=%d\n",
+ ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ for (i = 0; i < state->db->num_nodes; i++) {
+ const char *db_name;
+ uint32_t pnn;
+
+ pnn = state->nlist->pnn_list[i];
+
+ ret = ctdb_reply_control_get_dbname(reply[i],
+ state,
+ &db_name);
+ if (ret != 0) {
+ D_ERR("control GET_DBNAME failed on node %u "
+ "for db=0x%x, ret=%d\n",
+ pnn,
+ state->db->db_id,
+ ret);
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ if (state->db_name == NULL) {
+ state->db_name = db_name;
+ continue;
+ }
+
+ if (strcmp(state->db_name, db_name) != 0) {
+ D_ERR("Incompatible database name for 0x%"PRIx32" "
+ "(%s != %s) on node %"PRIu32"\n",
+ state->db->db_id,
+ db_name,
+ state->db_name,
+ pnn);
+ node_list_ban_credits(state->nlist, pnn);
+ tevent_req_error(req, ret);
+ return;
+ }
+ }
+
+ talloc_free(reply);
+
+ subreq = db_create_missing_send(state,
+ state->ev,
+ state->client,
+ state->nlist,
+ state->db_name,
+ state->db);
+
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
+}
+
+static void recover_db_create_missing_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ struct ctdb_req_control request;
+ int ret;
+ bool status;
+
+ /* Could sanity check the db_id here */
+ status = db_create_missing_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ctdb_req_control_getdbpath(&request, state->db->db_id);
+ subreq = ctdb_client_control_send(state, state->ev, state->client,
+ state->destnode, TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_path_done, req);
+}
+
+static void recover_db_path_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ struct ctdb_reply_control *reply;
+ struct ctdb_req_control request;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
+ state->db_name, ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
+ if (ret != 0) {
+ D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
+ state->db_name, ret);
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ talloc_free(reply);
+
+ ctdb_req_control_db_freeze(&request, state->db->db_id);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_freeze_done, req);
+}
+
+static void recover_db_freeze_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ struct ctdb_req_control request;
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control FREEZE_DB failed for db %s"
+ " on node %u, ret=%d\n",
+ state->db_name, pnn, ret2);
+
+ node_list_ban_credits(state->nlist, pnn);
+ } else {
+ D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
+ state->db_name, ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ctdb_req_control_db_transaction_start(&request, &state->transdb);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_transaction_started, req);
+}
+
+static void recover_db_transaction_started(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ int *err_list;
+ uint32_t flags;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control TRANSACTION_DB failed for db=%s"
+ " on node %u, ret=%d\n",
+ state->db_name, pnn, ret2);
+ } else {
+ D_ERR("control TRANSACTION_DB failed for db=%s,"
+ " ret=%d\n", state->db_name, ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ flags = state->db->db_flags;
+ state->recdb = recdb_create(state,
+ state->db->db_id,
+ state->db_name,
+ state->db_path,
+ state->tun_list->database_hash_size,
+ flags & CTDB_DB_FLAGS_PERSISTENT);
+ if (tevent_req_nomem(state->recdb, req)) {
+ return;
+ }
+
+ if ((flags & CTDB_DB_FLAGS_PERSISTENT) ||
+ (flags & CTDB_DB_FLAGS_REPLICATED)) {
+ subreq = collect_highseqnum_db_send(state,
+ state->ev,
+ state->client,
+ state->nlist,
+ state->db->db_id,
+ state->recdb);
+ } else {
+ subreq = collect_all_db_send(state,
+ state->ev,
+ state->client,
+ state->nlist,
+ state->db->db_id,
+ state->recdb);
+ }
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_collect_done, req);
+}
+
+static void recover_db_collect_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ struct ctdb_req_control request;
+ int ret;
+ bool status;
+
+ if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
+ (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
+ status = collect_highseqnum_db_recv(subreq, &ret);
+ } else {
+ status = collect_all_db_recv(subreq, &ret);
+ }
+ TALLOC_FREE(subreq);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ctdb_req_control_wipe_database(&request, &state->transdb);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
+}
+
+static void recover_db_wipedb_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control WIPEDB failed for db %s on node %u,"
+ " ret=%d\n", state->db_name, pnn, ret2);
+ } else {
+ D_ERR("control WIPEDB failed for db %s, ret=%d\n",
+ state->db_name, ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ subreq = push_database_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ state->recdb,
+ state->tun_list->rec_buffer_size_limit);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
+}
+
+static void recover_db_pushdb_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ struct ctdb_req_control request;
+ int ret;
+ bool status;
+
+ status = push_database_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ TALLOC_FREE(state->recdb);
+
+ ctdb_req_control_db_transaction_commit(&request, &state->transdb);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
+}
+
+static void recover_db_transaction_committed(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ struct ctdb_req_control request;
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
+ " on node %u, ret=%d\n",
+ state->db_name, pnn, ret2);
+ } else {
+ D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
+ " ret=%d\n", state->db_name, ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ctdb_req_control_db_thaw(&request, state->db->db_id);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recover_db_thaw_done, req);
+}
+
+static void recover_db_thaw_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recover_db_state *state = tevent_req_data(
+ req, struct recover_db_state);
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control DB_THAW failed for db %s on node %u,"
+ " ret=%d\n", state->db_name, pnn, ret2);
+ } else {
+ D_ERR("control DB_THAW failed for db %s, ret=%d\n",
+ state->db_name, ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static bool recover_db_recv(struct tevent_req *req)
+{
+ return generic_recv(req, NULL);
+}
+
+
+/*
+ * Start database recovery for each database
+ *
+ * Try to recover each database 5 times before failing recovery.
+ */
+
+struct db_recovery_state {
+ struct tevent_context *ev;
+ struct db_list *dblist;
+ unsigned int num_replies;
+ unsigned int num_failed;
+};
+
+struct db_recovery_one_state {
+ struct tevent_req *req;
+ struct ctdb_client_context *client;
+ struct db_list *dblist;
+ struct ctdb_tunable_list *tun_list;
+ struct node_list *nlist;
+ uint32_t generation;
+ struct db *db;
+ int num_fails;
+};
+
+static void db_recovery_one_done(struct tevent_req *subreq);
+
+static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct db_list *dblist,
+ struct ctdb_tunable_list *tun_list,
+ struct node_list *nlist,
+ uint32_t generation)
+{
+ struct tevent_req *req, *subreq;
+ struct db_recovery_state *state;
+ struct db *db;
+
+ req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->dblist = dblist;
+ state->num_replies = 0;
+ state->num_failed = 0;
+
+ if (dblist->num_dbs == 0) {
+ tevent_req_done(req);
+ return tevent_req_post(req, ev);
+ }
+
+ for (db = dblist->db; db != NULL; db = db->next) {
+ struct db_recovery_one_state *substate;
+
+ substate = talloc_zero(state, struct db_recovery_one_state);
+ if (tevent_req_nomem(substate, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ substate->req = req;
+ substate->client = client;
+ substate->dblist = dblist;
+ substate->tun_list = tun_list;
+ substate->nlist = nlist;
+ substate->generation = generation;
+ substate->db = db;
+
+ subreq = recover_db_send(state,
+ ev,
+ client,
+ tun_list,
+ nlist,
+ generation,
+ substate->db);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, db_recovery_one_done,
+ substate);
+ D_NOTICE("recover database 0x%08x\n", substate->db->db_id);
+ }
+
+ return req;
+}
+
+static void db_recovery_one_done(struct tevent_req *subreq)
+{
+ struct db_recovery_one_state *substate = tevent_req_callback_data(
+ subreq, struct db_recovery_one_state);
+ struct tevent_req *req = substate->req;
+ struct db_recovery_state *state = tevent_req_data(
+ req, struct db_recovery_state);
+ bool status;
+
+ status = recover_db_recv(subreq);
+ TALLOC_FREE(subreq);
+
+ if (status) {
+ talloc_free(substate);
+ goto done;
+ }
+
+ substate->num_fails += 1;
+ if (substate->num_fails < NUM_RETRIES) {
+ subreq = recover_db_send(state,
+ state->ev,
+ substate->client,
+ substate->tun_list,
+ substate->nlist,
+ substate->generation,
+ substate->db);
+ if (tevent_req_nomem(subreq, req)) {
+ goto failed;
+ }
+ tevent_req_set_callback(subreq, db_recovery_one_done, substate);
+ D_NOTICE("recover database 0x%08x, attempt %d\n",
+ substate->db->db_id, substate->num_fails+1);
+ return;
+ }
+
+failed:
+ state->num_failed += 1;
+
+done:
+ state->num_replies += 1;
+
+ if (state->num_replies == state->dblist->num_dbs) {
+ tevent_req_done(req);
+ }
+}
+
+static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
+{
+ struct db_recovery_state *state = tevent_req_data(
+ req, struct db_recovery_state);
+ int err;
+
+ if (tevent_req_is_unix_error(req, &err)) {
+ *count = 0;
+ return false;
+ }
+
+ *count = state->num_replies - state->num_failed;
+
+ if (state->num_failed > 0) {
+ return false;
+ }
+
+ return true;
+}
+
+struct ban_node_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct ctdb_tunable_list *tun_list;
+ struct node_list *nlist;
+ uint32_t destnode;
+
+ uint32_t max_pnn;
+};
+
+static bool ban_node_check(struct tevent_req *req);
+static void ban_node_check_done(struct tevent_req *subreq);
+static void ban_node_done(struct tevent_req *subreq);
+
+static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct ctdb_tunable_list *tun_list,
+ struct node_list *nlist)
+{
+ struct tevent_req *req;
+ struct ban_node_state *state;
+ bool ok;
+
+ req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->tun_list = tun_list;
+ state->nlist = nlist;
+ state->destnode = ctdb_client_pnn(client);
+
+ /* Bans are not enabled */
+ if (state->tun_list->enable_bans == 0) {
+ D_ERR("Bans are not enabled\n");
+ tevent_req_done(req);
+ return tevent_req_post(req, ev);
+ }
+
+ ok = ban_node_check(req);
+ if (!ok) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
+
+static bool ban_node_check(struct tevent_req *req)
+{
+ struct tevent_req *subreq;
+ struct ban_node_state *state = tevent_req_data(
+ req, struct ban_node_state);
+ struct ctdb_req_control request;
+ unsigned max_credits = 0, i;
+
+ for (i=0; i<state->nlist->count; i++) {
+ if (state->nlist->ban_credits[i] > max_credits) {
+ state->max_pnn = state->nlist->pnn_list[i];
+ max_credits = state->nlist->ban_credits[i];
+ }
+ }
+
+ if (max_credits < NUM_RETRIES) {
+ tevent_req_done(req);
+ return false;
+ }
+
+ ctdb_req_control_get_nodemap(&request);
+ subreq = ctdb_client_control_send(state,
+ state->ev,
+ state->client,
+ state->max_pnn,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return false;
+ }
+ tevent_req_set_callback(subreq, ban_node_check_done, req);
+
+ return true;
+}
+
+static void ban_node_check_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ban_node_state *state = tevent_req_data(
+ req, struct ban_node_state);
+ struct ctdb_reply_control *reply;
+ struct ctdb_node_map *nodemap;
+ struct ctdb_req_control request;
+ struct ctdb_ban_state ban;
+ unsigned int i;
+ int ret;
+ bool ok;
+
+ ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (!ok) {
+ D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
+ state->max_pnn, ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
+ if (ret != 0) {
+ D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ for (i=0; i<nodemap->num; i++) {
+ if (nodemap->node[i].pnn != state->max_pnn) {
+ continue;
+ }
+
+ /* If the node became inactive, reset ban_credits */
+ if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
+ unsigned int j;
+
+ for (j=0; j<state->nlist->count; j++) {
+ if (state->nlist->pnn_list[j] ==
+ state->max_pnn) {
+ state->nlist->ban_credits[j] = 0;
+ break;
+ }
+ }
+ state->max_pnn = CTDB_UNKNOWN_PNN;
+ }
+ }
+
+ talloc_free(nodemap);
+ talloc_free(reply);
+
+ /* If node becames inactive during recovery, pick next */
+ if (state->max_pnn == CTDB_UNKNOWN_PNN) {
+ (void) ban_node_check(req);
+ return;
+ }
+
+ ban = (struct ctdb_ban_state) {
+ .pnn = state->max_pnn,
+ .time = state->tun_list->recovery_ban_period,
+ };
+
+ D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);
+
+ ctdb_req_control_set_ban_state(&request, &ban);
+ subreq = ctdb_client_control_send(state,
+ state->ev,
+ state->client,
+ ban.pnn,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, ban_node_done, req);
+}
+
+static void ban_node_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct node_ban_state *state = tevent_req_data(
+ req, struct node_ban_state);
+ struct ctdb_reply_control *reply;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_set_ban_state(reply);
+ if (ret != 0) {
+ D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ talloc_free(reply);
+ tevent_req_done(req);
+}
+
+static bool ban_node_recv(struct tevent_req *req, int *perr)
+{
+ if (tevent_req_is_unix_error(req, perr)) {
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Run the parallel database recovery
+ *
+ * - Get tunables
+ * - Get nodemap from all nodes
+ * - Get capabilities from all nodes
+ * - Get dbmap
+ * - Set RECOVERY_ACTIVE
+ * - Send START_RECOVERY
+ * - Update vnnmap on all nodes
+ * - Run database recovery
+ * - Set RECOVERY_NORMAL
+ * - Send END_RECOVERY
+ */
+
+struct recovery_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ uint32_t generation;
+ uint32_t destnode;
+ struct node_list *nlist;
+ struct ctdb_tunable_list *tun_list;
+ struct ctdb_vnn_map *vnnmap;
+ struct db_list *dblist;
+};
+
+static void recovery_tunables_done(struct tevent_req *subreq);
+static void recovery_nodemap_done(struct tevent_req *subreq);
+static void recovery_nodemap_verify(struct tevent_req *subreq);
+static void recovery_capabilities_done(struct tevent_req *subreq);
+static void recovery_dbmap_done(struct tevent_req *subreq);
+static void recovery_active_done(struct tevent_req *subreq);
+static void recovery_start_recovery_done(struct tevent_req *subreq);
+static void recovery_vnnmap_update_done(struct tevent_req *subreq);
+static void recovery_db_recovery_done(struct tevent_req *subreq);
+static void recovery_failed_done(struct tevent_req *subreq);
+static void recovery_normal_done(struct tevent_req *subreq);
+static void recovery_end_recovery_done(struct tevent_req *subreq);
+
+static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t generation)
+{
+ struct tevent_req *req, *subreq;
+ struct recovery_state *state;
+ struct ctdb_req_control request;
+
+ req = tevent_req_create(mem_ctx, &state, struct recovery_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->generation = generation;
+ state->destnode = ctdb_client_pnn(client);
+
+ ctdb_req_control_get_all_tunables(&request);
+ subreq = ctdb_client_control_send(state, state->ev, state->client,
+ state->destnode, TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, recovery_tunables_done, req);
+
+ return req;
+}
+
+static void recovery_tunables_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_reply_control *reply;
+ struct ctdb_req_control request;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_get_all_tunables(reply, state,
+ &state->tun_list);
+ if (ret != 0) {
+ D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ talloc_free(reply);
+
+ recover_timeout = state->tun_list->recover_timeout;
+
+ ctdb_req_control_get_nodemap(&request);
+ subreq = ctdb_client_control_send(state, state->ev, state->client,
+ state->destnode, TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_nodemap_done, req);
+}
+
+static void recovery_nodemap_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_reply_control *reply;
+ struct ctdb_req_control request;
+ struct ctdb_node_map *nodemap;
+ unsigned int i;
+ bool status;
+ int ret;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
+ state->destnode, ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
+ if (ret != 0) {
+ D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->nlist = node_list_init(state, nodemap->num);
+ if (tevent_req_nomem(state->nlist, req)) {
+ return;
+ }
+
+ for (i=0; i<nodemap->num; i++) {
+ bool ok;
+
+ if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
+ continue;
+ }
+
+ ok = node_list_add(state->nlist, nodemap->node[i].pnn);
+ if (!ok) {
+ tevent_req_error(req, EINVAL);
+ return;
+ }
+ }
+
+ talloc_free(nodemap);
+ talloc_free(reply);
+
+ /* Verify flags by getting local node information from each node */
+ ctdb_req_control_get_nodemap(&request);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
+}
+
+static void recovery_nodemap_verify(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_req_control request;
+ struct ctdb_reply_control **reply;
+ struct node_list *nlist;
+ unsigned int i;
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq,
+ &ret,
+ state,
+ &err_list,
+ &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control GET_NODEMAP failed on node %u,"
+ " ret=%d\n", pnn, ret2);
+ } else {
+ D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ nlist = node_list_init(state, state->nlist->size);
+ if (tevent_req_nomem(nlist, req)) {
+ return;
+ }
+
+ for (i=0; i<state->nlist->count; i++) {
+ struct ctdb_node_map *nodemap = NULL;
+ uint32_t pnn, flags;
+ unsigned int j;
+ bool ok;
+
+ pnn = state->nlist->pnn_list[i];
+ ret = ctdb_reply_control_get_nodemap(reply[i],
+ state,
+ &nodemap);
+ if (ret != 0) {
+ D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ flags = NODE_FLAGS_DISCONNECTED;
+ for (j=0; j<nodemap->num; j++) {
+ if (nodemap->node[j].pnn == pnn) {
+ flags = nodemap->node[j].flags;
+ break;
+ }
+ }
+
+ TALLOC_FREE(nodemap);
+
+ if (flags & NODE_FLAGS_INACTIVE) {
+ continue;
+ }
+
+ ok = node_list_add(nlist, pnn);
+ if (!ok) {
+ tevent_req_error(req, EINVAL);
+ return;
+ }
+ }
+
+ talloc_free(reply);
+
+ talloc_free(state->nlist);
+ state->nlist = nlist;
+
+ ctdb_req_control_get_capabilities(&request);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_capabilities_done, req);
+}
+
+static void recovery_capabilities_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_reply_control **reply;
+ struct ctdb_req_control request;
+ int *err_list;
+ unsigned int i;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
+ &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control GET_CAPABILITIES failed on node %u,"
+ " ret=%d\n", pnn, ret2);
+ } else {
+ D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
+ ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ for (i=0; i<state->nlist->count; i++) {
+ uint32_t caps;
+
+ ret = ctdb_reply_control_get_capabilities(reply[i], &caps);
+ if (ret != 0) {
+ D_ERR("control GET_CAPABILITIES failed on node %u\n",
+ state->nlist->pnn_list[i]);
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ state->nlist->caps[i] = caps;
+ }
+
+ talloc_free(reply);
+
+ ctdb_req_control_get_dbmap(&request);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_dbmap_done, req);
+}
+
+static void recovery_dbmap_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_reply_control **reply;
+ struct ctdb_req_control request;
+ int *err_list;
+ unsigned int i, j;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq,
+ &ret,
+ state,
+ &err_list,
+ &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("control GET_DBMAP failed on node %u,"
+ " ret=%d\n", pnn, ret2);
+ } else {
+ D_ERR("control GET_DBMAP failed, ret=%d\n",
+ ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ state->dblist = db_list_init(state, state->nlist->count);
+ if (tevent_req_nomem(state->dblist, req)) {
+ D_ERR("memory allocation error\n");
+ return;
+ }
+
+ for (i = 0; i < state->nlist->count; i++) {
+ struct ctdb_dbid_map *dbmap = NULL;
+ uint32_t pnn;
+
+ pnn = state->nlist->pnn_list[i];
+
+ ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap);
+ if (ret != 0) {
+ D_ERR("control GET_DBMAP failed on node %u\n",
+ pnn);
+ tevent_req_error(req, EPROTO);
+ return;
+ }
+
+ for (j = 0; j < dbmap->num; j++) {
+ ret = db_list_check_and_add(state->dblist,
+ dbmap->dbs[j].db_id,
+ dbmap->dbs[j].flags,
+ pnn);
+ if (ret != 0) {
+ D_ERR("failed to add database list entry, "
+ "ret=%d\n",
+ ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+ }
+
+ TALLOC_FREE(dbmap);
+ }
+
+ ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_active_done, req);
+}
+
+static void recovery_active_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_req_control request;
+ struct ctdb_vnn_map *vnnmap;
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("failed to set recovery mode ACTIVE on node %u,"
+ " ret=%d\n", pnn, ret2);
+ } else {
+ D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
+ ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ D_ERR("Set recovery mode to ACTIVE\n");
+
+ /* Calculate new VNNMAP */
+ vnnmap = talloc_zero(state, struct ctdb_vnn_map);
+ if (tevent_req_nomem(vnnmap, req)) {
+ return;
+ }
+
+ vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
+ if (tevent_req_nomem(vnnmap->map, req)) {
+ return;
+ }
+
+ if (vnnmap->size == 0) {
+ D_WARNING("No active lmasters found. Adding recmaster anyway\n");
+ vnnmap->map[0] = state->destnode;
+ vnnmap->size = 1;
+ }
+
+ vnnmap->generation = state->generation;
+
+ state->vnnmap = vnnmap;
+
+ ctdb_req_control_start_recovery(&request);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
+}
+
+static void recovery_start_recovery_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_req_control request;
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("failed to run start_recovery event on node %u,"
+ " ret=%d\n", pnn, ret2);
+ } else {
+ D_ERR("failed to run start_recovery event, ret=%d\n",
+ ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ D_ERR("start_recovery event finished\n");
+
+ ctdb_req_control_setvnnmap(&request, state->vnnmap);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
+}
+
+static void recovery_vnnmap_update_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
+ pnn, ret2);
+ } else {
+ D_ERR("failed to update VNNMAP, ret=%d\n", ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ D_NOTICE("updated VNNMAP\n");
+
+ subreq = db_recovery_send(state,
+ state->ev,
+ state->client,
+ state->dblist,
+ state->tun_list,
+ state->nlist,
+ state->vnnmap->generation);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
+}
+
+static void recovery_db_recovery_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_req_control request;
+ bool status;
+ unsigned int count;
+
+ status = db_recovery_recv(subreq, &count);
+ TALLOC_FREE(subreq);
+
+ D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
+
+ if (! status) {
+ subreq = ban_node_send(state,
+ state->ev,
+ state->client,
+ state->tun_list,
+ state->nlist);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_failed_done, req);
+ return;
+ }
+
+ ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_normal_done, req);
+}
+
+static void recovery_failed_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ int ret;
+ bool status;
+
+ status = ban_node_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ D_ERR("failed to ban node, ret=%d\n", ret);
+ }
+
+ tevent_req_error(req, EIO);
+}
+
+static void recovery_normal_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ struct ctdb_req_control request;
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("failed to set recovery mode NORMAL on node %u,"
+ " ret=%d\n", pnn, ret2);
+ } else {
+ D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
+ ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ D_ERR("Set recovery mode to NORMAL\n");
+
+ ctdb_req_control_end_recovery(&request);
+ subreq = ctdb_client_control_multi_send(state,
+ state->ev,
+ state->client,
+ state->nlist->pnn_list,
+ state->nlist->count,
+ TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
+}
+
+static void recovery_end_recovery_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct recovery_state *state = tevent_req_data(
+ req, struct recovery_state);
+ int *err_list;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
+ NULL);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ int ret2;
+ uint32_t pnn;
+
+ ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
+ state->nlist->count,
+ err_list,
+ &pnn);
+ if (ret2 != 0) {
+ D_ERR("failed to run recovered event on node %u,"
+ " ret=%d\n", pnn, ret2);
+ } else {
+ D_ERR("failed to run recovered event, ret=%d\n", ret);
+ }
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ D_ERR("recovered event finished\n");
+
+ tevent_req_done(req);
+}
+
+static void recovery_recv(struct tevent_req *req, int *perr)
+{
+ generic_recv(req, perr);
+}
+
+static void usage(const char *progname)
+{
+ fprintf(stderr, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
+ progname);
+}
+
+
+/*
+ * Arguments - log fd, write fd, socket path, generation
+ */
+int main(int argc, char *argv[])
+{
+ int write_fd;
+ const char *sockpath;
+ TALLOC_CTX *mem_ctx = NULL;
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ bool status;
+ int ret = 0;
+ struct tevent_req *req;
+ uint32_t generation;
+
+ if (argc != 4) {
+ usage(argv[0]);
+ exit(1);
+ }
+
+ write_fd = atoi(argv[1]);
+ sockpath = argv[2];
+ generation = (uint32_t)smb_strtoul(argv[3],
+ NULL,
+ 0,
+ &ret,
+ SMB_STR_STANDARD);
+ if (ret != 0) {
+ fprintf(stderr, "recovery: unable to initialize generation\n");
+ goto failed;
+ }
+
+ mem_ctx = talloc_new(NULL);
+ if (mem_ctx == NULL) {
+ fprintf(stderr, "recovery: talloc_new() failed\n");
+ goto failed;
+ }
+
+ ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
+ if (ret != 0) {
+ fprintf(stderr, "recovery: Unable to initialize logging\n");
+ goto failed;
+ }
+
+ ev = tevent_context_init(mem_ctx);
+ if (ev == NULL) {
+ D_ERR("tevent_context_init() failed\n");
+ goto failed;
+ }
+
+ status = logging_setup_sighup_handler(ev, mem_ctx, NULL, NULL);
+ if (!status) {
+ D_ERR("logging_setup_sighup_handler() failed\n");
+ goto failed;
+ }
+
+ ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
+ if (ret != 0) {
+ D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
+ goto failed;
+ }
+
+ req = recovery_send(mem_ctx, ev, client, generation);
+ if (req == NULL) {
+ D_ERR("database_recover_send() failed\n");
+ goto failed;
+ }
+
+ if (! tevent_req_poll(req, ev)) {
+ D_ERR("tevent_req_poll() failed\n");
+ goto failed;
+ }
+
+ recovery_recv(req, &ret);
+ TALLOC_FREE(req);
+ if (ret != 0) {
+ D_ERR("database recovery failed, ret=%d\n", ret);
+ goto failed;
+ }
+
+ sys_write(write_fd, &ret, sizeof(ret));
+ return 0;
+
+failed:
+ TALLOC_FREE(mem_ctx);
+ return 1;
+}