diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:47:29 +0000 |
commit | 4f5791ebd03eaec1c7da0865a383175b05102712 (patch) | |
tree | 8ce7b00f7a76baa386372422adebbe64510812d4 /ctdb/server/ctdb_recovery_helper.c | |
parent | Initial commit. (diff) | |
download | samba-upstream.tar.xz samba-upstream.zip |
Adding upstream version 2:4.17.12+dfsg.upstream/2%4.17.12+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ctdb/server/ctdb_recovery_helper.c')
-rw-r--r-- | ctdb/server/ctdb_recovery_helper.c | 3200 |
1 files changed, 3200 insertions, 0 deletions
diff --git a/ctdb/server/ctdb_recovery_helper.c b/ctdb/server/ctdb_recovery_helper.c new file mode 100644 index 0000000..e0d3219 --- /dev/null +++ b/ctdb/server/ctdb_recovery_helper.c @@ -0,0 +1,3200 @@ +/* + ctdb parallel database recovery + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> +#include <libgen.h> + +#include "lib/tdb_wrap/tdb_wrap.h" +#include "lib/util/dlinklist.h" +#include "lib/util/sys_rw.h" +#include "lib/util/time.h" +#include "lib/util/tevent_unix.h" +#include "lib/util/util.h" +#include "lib/util/smb_strtox.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" +#include "client/client.h" + +#include "common/logging.h" + +static int recover_timeout = 30; + +#define NUM_RETRIES 3 + +#define TIMEOUT() timeval_current_ofs(recover_timeout, 0) + +/* + * Utility functions + */ + +static bool generic_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +static uint64_t rec_srvid = CTDB_SRVID_RECOVERY; + +static uint64_t srvid_next(void) +{ + rec_srvid += 1; + return rec_srvid; +} + +/* + * Node related functions + */ + +struct node_list { + uint32_t *pnn_list; + uint32_t *caps; + uint32_t *ban_credits; + unsigned int size; + unsigned int count; +}; + +static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size) +{ + struct node_list *nlist; + unsigned int i; + + nlist = talloc_zero(mem_ctx, struct node_list); + if (nlist == NULL) { + return NULL; + } + + nlist->pnn_list = talloc_array(nlist, uint32_t, size); + nlist->caps = talloc_zero_array(nlist, uint32_t, size); + nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size); + + if (nlist->pnn_list == NULL || + nlist->caps == NULL || + nlist->ban_credits == NULL) { + talloc_free(nlist); + return NULL; + } + nlist->size = size; + + for (i=0; i<nlist->size; i++) { + nlist->pnn_list[i] = CTDB_UNKNOWN_PNN; + } + + return nlist; +} + +static bool node_list_add(struct node_list *nlist, uint32_t pnn) +{ + unsigned int i; + + if (nlist->count == nlist->size) { + return false; + } + + for (i=0; i<nlist->count; i++) { + if (nlist->pnn_list[i] == pnn) { + return false; + } + } + + nlist->pnn_list[nlist->count] = pnn; + nlist->count += 1; + + return true; +} + +static uint32_t *node_list_lmaster(struct node_list *nlist, + TALLOC_CTX *mem_ctx, + unsigned int *pnn_count) +{ + uint32_t *pnn_list; + unsigned int count, i; + + pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count); + if (pnn_list == NULL) { + return NULL; + } + + count = 0; + for (i=0; i<nlist->count; i++) { + if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) { + continue; + } + + pnn_list[count] = nlist->pnn_list[i]; + count += 1; + } + + *pnn_count = count; + return pnn_list; +} + +static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn) +{ + unsigned int i; + + for (i=0; i<nlist->count; i++) { + if (nlist->pnn_list[i] == pnn) { + nlist->ban_credits[i] += 1; + break; + } + } +} + +/* + * Database list functions + * + * Simple, naive implementation that could be updated to a db_hash or similar + */ + +struct db { + struct db *prev, *next; + + uint32_t db_id; + uint32_t db_flags; + uint32_t *pnn_list; + unsigned int num_nodes; +}; + +struct db_list { + unsigned int num_dbs; + struct db *db; + unsigned int num_nodes; +}; + +static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes) +{ + struct db_list *l; + + l = talloc_zero(mem_ctx, struct db_list); + l->num_nodes = num_nodes; + + return l; +} + +static struct db *db_list_find(struct db_list *dblist, uint32_t db_id) +{ + struct db *db; + + if (dblist == NULL) { + return NULL; + } + + db = dblist->db; + while (db != NULL && db->db_id != db_id) { + db = db->next; + } + + return db; +} + +static int db_list_add(struct db_list *dblist, + uint32_t db_id, + uint32_t db_flags, + uint32_t node) +{ + struct db *db = NULL; + + if (dblist == NULL) { + return EINVAL; + } + + db = talloc_zero(dblist, struct db); + if (db == NULL) { + return ENOMEM; + } + + db->db_id = db_id; + db->db_flags = db_flags; + db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes); + if (db->pnn_list == NULL) { + talloc_free(db); + return ENOMEM; + } + db->pnn_list[0] = node; + db->num_nodes = 1; + + DLIST_ADD_END(dblist->db, db); + dblist->num_dbs++; + + return 0; +} + +static int db_list_check_and_add(struct db_list *dblist, + uint32_t db_id, + uint32_t db_flags, + uint32_t node) +{ + struct db *db = NULL; + int ret; + + /* + * These flags are masked out because they are only set on a + * node when a client attaches to that node, so they might not + * be set yet. They can't be passed as part of the attch, so + * they're no use here. + */ + db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY); + + if (dblist == NULL) { + return EINVAL; + } + + db = db_list_find(dblist, db_id); + if (db == NULL) { + ret = db_list_add(dblist, db_id, db_flags, node); + return ret; + } + + if (db->db_flags != db_flags) { + D_ERR("Incompatible database flags for 0x%"PRIx32" " + "(0x%"PRIx32" != 0x%"PRIx32")\n", + db_id, + db_flags, + db->db_flags); + return EINVAL; + } + + if (db->num_nodes >= dblist->num_nodes) { + return EINVAL; + } + + db->pnn_list[db->num_nodes] = node; + db->num_nodes++; + + return 0; +} + +/* + * Create database on nodes where it is missing + */ + +struct db_create_missing_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + + struct node_list *nlist; + + const char *db_name; + uint32_t *missing_pnn_list; + int missing_num_nodes; +}; + +static void db_create_missing_done(struct tevent_req *subreq); + +static struct tevent_req *db_create_missing_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct node_list *nlist, + const char *db_name, + struct db *db) +{ + struct tevent_req *req, *subreq; + struct db_create_missing_state *state; + struct ctdb_req_control request; + unsigned int i, j; + + req = tevent_req_create(mem_ctx, + &state, + struct db_create_missing_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->nlist = nlist; + state->db_name = db_name; + + if (nlist->count == db->num_nodes) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count); + if (tevent_req_nomem(state->missing_pnn_list, req)) { + return tevent_req_post(req, ev); + } + + for (i = 0; i < nlist->count; i++) { + uint32_t pnn = nlist->pnn_list[i] ; + + for (j = 0; j < db->num_nodes; j++) { + if (pnn == db->pnn_list[j]) { + break; + } + } + + if (j < db->num_nodes) { + continue; + } + + DBG_INFO("Create database %s on node %u\n", + state->db_name, + pnn); + state->missing_pnn_list[state->missing_num_nodes] = pnn; + state->missing_num_nodes++; + } + + if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) { + ctdb_req_control_db_attach_persistent(&request, db_name); + } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) { + ctdb_req_control_db_attach_replicated(&request, db_name); + } else { + ctdb_req_control_db_attach(&request, db_name); + } + request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY; + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->missing_pnn_list, + state->missing_num_nodes, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, db_create_missing_done, req); + + return req; +} + +static void db_create_missing_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct db_create_missing_state *state = tevent_req_data( + req, struct db_create_missing_state); + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, + &ret, + NULL, + &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error( + state->missing_pnn_list, + state->missing_num_nodes, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control DB_ATTACH failed for db %s" + " on node %u, ret=%d\n", + state->db_name, + pnn, + ret2); + node_list_ban_credits(state->nlist, pnn); + } else { + D_ERR("control DB_ATTACH failed for db %s, ret=%d\n", + state->db_name, + ret); + } + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +static bool db_create_missing_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + +/* + * Recovery database functions + */ + +struct recdb_context { + uint32_t db_id; + const char *db_name; + const char *db_path; + struct tdb_wrap *db; + bool persistent; +}; + +static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id, + const char *db_name, + const char *db_path, + uint32_t hash_size, bool persistent) +{ + static char *db_dir_state = NULL; + struct recdb_context *recdb; + unsigned int tdb_flags; + + recdb = talloc(mem_ctx, struct recdb_context); + if (recdb == NULL) { + return NULL; + } + + if (db_dir_state == NULL) { + db_dir_state = getenv("CTDB_DBDIR_STATE"); + } + + recdb->db_name = db_name; + recdb->db_id = db_id; + recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s", + db_dir_state != NULL ? + db_dir_state : + dirname(discard_const(db_path)), + db_name); + if (recdb->db_path == NULL) { + talloc_free(recdb); + return NULL; + } + unlink(recdb->db_path); + + tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING; + recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size, + tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600); + if (recdb->db == NULL) { + talloc_free(recdb); + D_ERR("failed to create recovery db %s\n", recdb->db_path); + return NULL; + } + + recdb->persistent = persistent; + + return recdb; +} + +static uint32_t recdb_id(struct recdb_context *recdb) +{ + return recdb->db_id; +} + +static const char *recdb_name(struct recdb_context *recdb) +{ + return recdb->db_name; +} + +static const char *recdb_path(struct recdb_context *recdb) +{ + return recdb->db_path; +} + +static struct tdb_context *recdb_tdb(struct recdb_context *recdb) +{ + return recdb->db->tdb; +} + +static bool recdb_persistent(struct recdb_context *recdb) +{ + return recdb->persistent; +} + +struct recdb_add_traverse_state { + struct recdb_context *recdb; + uint32_t mypnn; +}; + +static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header, + TDB_DATA key, TDB_DATA data, + void *private_data) +{ + struct recdb_add_traverse_state *state = + (struct recdb_add_traverse_state *)private_data; + struct ctdb_ltdb_header *hdr; + TDB_DATA prev_data; + int ret; + + /* header is not marshalled separately in the pulldb control */ + if (data.dsize < sizeof(struct ctdb_ltdb_header)) { + return -1; + } + + hdr = (struct ctdb_ltdb_header *)data.dptr; + + /* fetch the existing record, if any */ + prev_data = tdb_fetch(recdb_tdb(state->recdb), key); + + if (prev_data.dptr != NULL) { + struct ctdb_ltdb_header prev_hdr; + + prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr; + free(prev_data.dptr); + if (hdr->rsn < prev_hdr.rsn || + (hdr->rsn == prev_hdr.rsn && + prev_hdr.dmaster != state->mypnn)) { + return 0; + } + } + + ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE); + if (ret != 0) { + return -1; + } + return 0; +} + +static bool recdb_add(struct recdb_context *recdb, int mypnn, + struct ctdb_rec_buffer *recbuf) +{ + struct recdb_add_traverse_state state; + int ret; + + state.recdb = recdb; + state.mypnn = mypnn; + + ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state); + if (ret != 0) { + return false; + } + + return true; +} + +/* This function decides which records from recdb are retained */ +static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent, + uint32_t reqid, uint32_t dmaster, + TDB_DATA key, TDB_DATA data) +{ + struct ctdb_ltdb_header *header; + int ret; + + /* Skip empty records */ + if (data.dsize <= sizeof(struct ctdb_ltdb_header)) { + return 0; + } + + /* update the dmaster field to point to us */ + header = (struct ctdb_ltdb_header *)data.dptr; + if (!persistent) { + header->dmaster = dmaster; + header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA; + } + + ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data); + if (ret != 0) { + return ret; + } + + return 0; +} + +struct recdb_file_traverse_state { + struct ctdb_rec_buffer *recbuf; + struct recdb_context *recdb; + TALLOC_CTX *mem_ctx; + uint32_t dmaster; + uint32_t reqid; + bool persistent; + bool failed; + int fd; + size_t max_size; + unsigned int num_buffers; +}; + +static int recdb_file_traverse(struct tdb_context *tdb, + TDB_DATA key, TDB_DATA data, + void *private_data) +{ + struct recdb_file_traverse_state *state = + (struct recdb_file_traverse_state *)private_data; + int ret; + + ret = recbuf_filter_add(state->recbuf, state->persistent, + state->reqid, state->dmaster, key, data); + if (ret != 0) { + state->failed = true; + return ret; + } + + if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) { + ret = ctdb_rec_buffer_write(state->recbuf, state->fd); + if (ret != 0) { + D_ERR("Failed to collect recovery records for %s\n", + recdb_name(state->recdb)); + state->failed = true; + return ret; + } + + state->num_buffers += 1; + + TALLOC_FREE(state->recbuf); + state->recbuf = ctdb_rec_buffer_init(state->mem_ctx, + recdb_id(state->recdb)); + if (state->recbuf == NULL) { + state->failed = true; + return ENOMEM; + } + } + + return 0; +} + +static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx, + uint32_t dmaster, int fd, int max_size) +{ + struct recdb_file_traverse_state state; + int ret; + + state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb)); + if (state.recbuf == NULL) { + return -1; + } + state.recdb = recdb; + state.mem_ctx = mem_ctx; + state.dmaster = dmaster; + state.reqid = 0; + state.persistent = recdb_persistent(recdb); + state.failed = false; + state.fd = fd; + state.max_size = max_size; + state.num_buffers = 0; + + ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state); + if (ret == -1 || state.failed) { + TALLOC_FREE(state.recbuf); + return -1; + } + + ret = ctdb_rec_buffer_write(state.recbuf, fd); + if (ret != 0) { + D_ERR("Failed to collect recovery records for %s\n", + recdb_name(recdb)); + TALLOC_FREE(state.recbuf); + return -1; + } + state.num_buffers += 1; + + D_DEBUG("Wrote %d buffers of recovery records for %s\n", + state.num_buffers, recdb_name(recdb)); + + return state.num_buffers; +} + +/* + * Pull database from a single node + */ + +struct pull_database_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct recdb_context *recdb; + uint32_t pnn; + uint64_t srvid; + unsigned int num_records; + int result; +}; + +static void pull_database_handler(uint64_t srvid, TDB_DATA data, + void *private_data); +static void pull_database_register_done(struct tevent_req *subreq); +static void pull_database_unregister_done(struct tevent_req *subreq); +static void pull_database_done(struct tevent_req *subreq); + +static struct tevent_req *pull_database_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t pnn, + struct recdb_context *recdb) +{ + struct tevent_req *req, *subreq; + struct pull_database_state *state; + + req = tevent_req_create(mem_ctx, &state, struct pull_database_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->recdb = recdb; + state->pnn = pnn; + state->srvid = srvid_next(); + + subreq = ctdb_client_set_message_handler_send( + state, state->ev, state->client, + state->srvid, pull_database_handler, + req); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + + tevent_req_set_callback(subreq, pull_database_register_done, req); + + return req; +} + +static void pull_database_handler(uint64_t srvid, TDB_DATA data, + void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + struct ctdb_rec_buffer *recbuf; + size_t np; + int ret; + bool status; + + if (srvid != state->srvid) { + return; + } + + ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np); + if (ret != 0) { + D_ERR("Invalid data received for DB_PULL messages\n"); + return; + } + + if (recbuf->db_id != recdb_id(state->recdb)) { + talloc_free(recbuf); + D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n", + recbuf->db_id, recdb_name(state->recdb)); + return; + } + + status = recdb_add(state->recdb, ctdb_client_pnn(state->client), + recbuf); + if (! status) { + talloc_free(recbuf); + D_ERR("Failed to add records to recdb for %s\n", + recdb_name(state->recdb)); + return; + } + + state->num_records += recbuf->count; + talloc_free(recbuf); +} + +static void pull_database_register_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + struct ctdb_req_control request; + struct ctdb_pulldb_ext pulldb_ext; + int ret; + bool status; + + status = ctdb_client_set_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("Failed to set message handler for DB_PULL for %s\n", + recdb_name(state->recdb)); + tevent_req_error(req, ret); + return; + } + + pulldb_ext.db_id = recdb_id(state->recdb); + pulldb_ext.lmaster = CTDB_LMASTER_ANY; + pulldb_ext.srvid = state->srvid; + + ctdb_req_control_db_pull(&request, &pulldb_ext); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->pnn, TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, pull_database_done, req); +} + +static void pull_database_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + struct ctdb_reply_control *reply; + uint32_t num_records; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n", + recdb_name(state->recdb), state->pnn, ret); + state->result = ret; + goto unregister; + } + + ret = ctdb_reply_control_db_pull(reply, &num_records); + talloc_free(reply); + if (num_records != state->num_records) { + D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n", + num_records, state->num_records, + recdb_name(state->recdb)); + state->result = EIO; + goto unregister; + } + + D_INFO("Pulled %d records for db %s from node %d\n", + state->num_records, recdb_name(state->recdb), state->pnn); + +unregister: + + subreq = ctdb_client_remove_message_handler_send( + state, state->ev, state->client, + state->srvid, req); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, pull_database_unregister_done, req); +} + +static void pull_database_unregister_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + int ret; + bool status; + + status = ctdb_client_remove_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("failed to remove message handler for DB_PULL for db %s\n", + recdb_name(state->recdb)); + tevent_req_error(req, ret); + return; + } + + if (state->result != 0) { + tevent_req_error(req, state->result); + return; + } + + tevent_req_done(req); +} + +static bool pull_database_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + +/* + * Push database to specified nodes (new style) + */ + +struct push_database_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct recdb_context *recdb; + uint32_t *pnn_list; + unsigned int count; + uint64_t srvid; + uint32_t dmaster; + int fd; + int num_buffers; + int num_buffers_sent; + unsigned int num_records; +}; + +static void push_database_started(struct tevent_req *subreq); +static void push_database_send_msg(struct tevent_req *req); +static void push_database_send_done(struct tevent_req *subreq); +static void push_database_confirmed(struct tevent_req *subreq); + +static struct tevent_req *push_database_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, + unsigned int count, + struct recdb_context *recdb, + int max_size) +{ + struct tevent_req *req, *subreq; + struct push_database_state *state; + struct ctdb_req_control request; + struct ctdb_pulldb_ext pulldb_ext; + char *filename; + off_t offset; + + req = tevent_req_create(mem_ctx, &state, + struct push_database_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->recdb = recdb; + state->pnn_list = pnn_list; + state->count = count; + + state->srvid = srvid_next(); + state->dmaster = ctdb_client_pnn(client); + state->num_buffers_sent = 0; + state->num_records = 0; + + filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb)); + if (tevent_req_nomem(filename, req)) { + return tevent_req_post(req, ev); + } + + state->fd = open(filename, O_RDWR|O_CREAT, 0644); + if (state->fd == -1) { + tevent_req_error(req, errno); + return tevent_req_post(req, ev); + } + unlink(filename); + talloc_free(filename); + + state->num_buffers = recdb_file(recdb, state, state->dmaster, + state->fd, max_size); + if (state->num_buffers == -1) { + tevent_req_error(req, ENOMEM); + return tevent_req_post(req, ev); + } + + offset = lseek(state->fd, 0, SEEK_SET); + if (offset != 0) { + tevent_req_error(req, EIO); + return tevent_req_post(req, ev); + } + + pulldb_ext.db_id = recdb_id(recdb); + pulldb_ext.srvid = state->srvid; + + ctdb_req_control_db_push_start(&request, &pulldb_ext); + subreq = ctdb_client_control_multi_send(state, ev, client, + pnn_list, count, + TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, push_database_started, req); + + return req; +} + +static void push_database_started(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_state *state = tevent_req_data( + req, struct push_database_state); + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, + &err_list, NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->pnn_list, + state->count, + err_list, &pnn); + if (ret2 != 0) { + D_ERR("control DB_PUSH_START failed for db %s" + " on node %u, ret=%d\n", + recdb_name(state->recdb), pnn, ret2); + } else { + D_ERR("control DB_PUSH_START failed for db %s," + " ret=%d\n", + recdb_name(state->recdb), ret); + } + talloc_free(err_list); + + tevent_req_error(req, ret); + return; + } + + push_database_send_msg(req); +} + +static void push_database_send_msg(struct tevent_req *req) +{ + struct push_database_state *state = tevent_req_data( + req, struct push_database_state); + struct tevent_req *subreq; + struct ctdb_rec_buffer *recbuf; + struct ctdb_req_message message; + TDB_DATA data; + size_t np; + int ret; + + if (state->num_buffers_sent == state->num_buffers) { + struct ctdb_req_control request; + + ctdb_req_control_db_push_confirm(&request, + recdb_id(state->recdb)); + subreq = ctdb_client_control_multi_send(state, state->ev, + state->client, + state->pnn_list, + state->count, + TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, push_database_confirmed, req); + return; + } + + ret = ctdb_rec_buffer_read(state->fd, state, &recbuf); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + data.dsize = ctdb_rec_buffer_len(recbuf); + data.dptr = talloc_size(state, data.dsize); + if (tevent_req_nomem(data.dptr, req)) { + return; + } + + ctdb_rec_buffer_push(recbuf, data.dptr, &np); + + message.srvid = state->srvid; + message.data.data = data; + + D_DEBUG("Pushing buffer %d with %d records for db %s\n", + state->num_buffers_sent, recbuf->count, + recdb_name(state->recdb)); + + subreq = ctdb_client_message_multi_send(state, state->ev, + state->client, + state->pnn_list, state->count, + &message); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, push_database_send_done, req); + + state->num_records += recbuf->count; + + talloc_free(data.dptr); + talloc_free(recbuf); +} + +static void push_database_send_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_state *state = tevent_req_data( + req, struct push_database_state); + bool status; + int ret; + + status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("Sending recovery records failed for %s\n", + recdb_name(state->recdb)); + tevent_req_error(req, ret); + return; + } + + state->num_buffers_sent += 1; + + push_database_send_msg(req); +} + +static void push_database_confirmed(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_state *state = tevent_req_data( + req, struct push_database_state); + struct ctdb_reply_control **reply; + int *err_list; + bool status; + unsigned int i; + int ret; + uint32_t num_records; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, + &err_list, &reply); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->pnn_list, + state->count, err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control DB_PUSH_CONFIRM failed for db %s" + " on node %u, ret=%d\n", + recdb_name(state->recdb), pnn, ret2); + } else { + D_ERR("control DB_PUSH_CONFIRM failed for db %s," + " ret=%d\n", + recdb_name(state->recdb), ret); + } + tevent_req_error(req, ret); + return; + } + + for (i=0; i<state->count; i++) { + ret = ctdb_reply_control_db_push_confirm(reply[i], + &num_records); + if (ret != 0) { + tevent_req_error(req, EPROTO); + return; + } + + if (num_records != state->num_records) { + D_ERR("Node %u received %d of %d records for %s\n", + state->pnn_list[i], num_records, + state->num_records, recdb_name(state->recdb)); + tevent_req_error(req, EPROTO); + return; + } + } + + talloc_free(reply); + + D_INFO("Pushed %d records for db %s\n", + state->num_records, recdb_name(state->recdb)); + + tevent_req_done(req); +} + +static bool push_database_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + +/* + * Collect databases using highest sequence number + */ + +struct collect_highseqnum_db_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct node_list *nlist; + uint32_t db_id; + struct recdb_context *recdb; + + uint32_t max_pnn; +}; + +static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq); +static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq); + +static struct tevent_req *collect_highseqnum_db_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct node_list *nlist, + uint32_t db_id, + struct recdb_context *recdb) +{ + struct tevent_req *req, *subreq; + struct collect_highseqnum_db_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct collect_highseqnum_db_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->nlist = nlist; + state->db_id = db_id; + state->recdb = recdb; + + ctdb_req_control_get_db_seqnum(&request, db_id); + subreq = ctdb_client_control_multi_send(mem_ctx, + ev, + client, + nlist->pnn_list, + nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done, + req); + + return req; +} + +static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct collect_highseqnum_db_state *state = tevent_req_data( + req, struct collect_highseqnum_db_state); + struct ctdb_reply_control **reply; + int *err_list; + bool status; + unsigned int i; + int ret; + uint64_t seqnum, max_seqnum; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, + &err_list, &reply); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control GET_DB_SEQNUM failed for db %s" + " on node %u, ret=%d\n", + recdb_name(state->recdb), pnn, ret2); + } else { + D_ERR("control GET_DB_SEQNUM failed for db %s," + " ret=%d\n", + recdb_name(state->recdb), ret); + } + tevent_req_error(req, ret); + return; + } + + max_seqnum = 0; + state->max_pnn = state->nlist->pnn_list[0]; + for (i=0; i<state->nlist->count; i++) { + ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum); + if (ret != 0) { + tevent_req_error(req, EPROTO); + return; + } + + if (max_seqnum < seqnum) { + max_seqnum = seqnum; + state->max_pnn = state->nlist->pnn_list[i]; + } + } + + talloc_free(reply); + + D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n", + recdb_name(state->recdb), state->max_pnn, max_seqnum); + + subreq = pull_database_send(state, + state->ev, + state->client, + state->max_pnn, + state->recdb); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done, + req); +} + +static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct collect_highseqnum_db_state *state = tevent_req_data( + req, struct collect_highseqnum_db_state); + int ret; + bool status; + + status = pull_database_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + node_list_ban_credits(state->nlist, state->max_pnn); + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + +/* + * Collect all databases + */ + +struct collect_all_db_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct node_list *nlist; + uint32_t db_id; + struct recdb_context *recdb; + + struct ctdb_pulldb pulldb; + unsigned int index; +}; + +static void collect_all_db_pulldb_done(struct tevent_req *subreq); + +static struct tevent_req *collect_all_db_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct node_list *nlist, + uint32_t db_id, + struct recdb_context *recdb) +{ + struct tevent_req *req, *subreq; + struct collect_all_db_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct collect_all_db_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->nlist = nlist; + state->db_id = db_id; + state->recdb = recdb; + state->index = 0; + + subreq = pull_database_send(state, + ev, + client, + nlist->pnn_list[state->index], + recdb); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req); + + return req; +} + +static void collect_all_db_pulldb_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct collect_all_db_state *state = tevent_req_data( + req, struct collect_all_db_state); + int ret; + bool status; + + status = pull_database_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + node_list_ban_credits(state->nlist, + state->nlist->pnn_list[state->index]); + tevent_req_error(req, ret); + return; + } + + state->index += 1; + if (state->index == state->nlist->count) { + tevent_req_done(req); + return; + } + + subreq = pull_database_send(state, + state->ev, + state->client, + state->nlist->pnn_list[state->index], + state->recdb); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req); +} + +static bool collect_all_db_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + + +/** + * For each database do the following: + * - Get DB name from all nodes + * - Attach database on missing nodes + * - Get DB path + * - Freeze database on all nodes + * - Start transaction on all nodes + * - Collect database from all nodes + * - Wipe database on all nodes + * - Push database to all nodes + * - Commit transaction on all nodes + * - Thaw database on all nodes + */ + +struct recover_db_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_tunable_list *tun_list; + struct node_list *nlist; + struct db *db; + + uint32_t destnode; + struct ctdb_transdb transdb; + + const char *db_name, *db_path; + struct recdb_context *recdb; +}; + +static void recover_db_name_done(struct tevent_req *subreq); +static void recover_db_create_missing_done(struct tevent_req *subreq); +static void recover_db_path_done(struct tevent_req *subreq); +static void recover_db_freeze_done(struct tevent_req *subreq); +static void recover_db_transaction_started(struct tevent_req *subreq); +static void recover_db_collect_done(struct tevent_req *subreq); +static void recover_db_wipedb_done(struct tevent_req *subreq); +static void recover_db_pushdb_done(struct tevent_req *subreq); +static void recover_db_transaction_committed(struct tevent_req *subreq); +static void recover_db_thaw_done(struct tevent_req *subreq); + +static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_tunable_list *tun_list, + struct node_list *nlist, + uint32_t generation, + struct db *db) +{ + struct tevent_req *req, *subreq; + struct recover_db_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, struct recover_db_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->tun_list = tun_list; + state->nlist = nlist; + state->db = db; + + state->destnode = ctdb_client_pnn(client); + state->transdb.db_id = db->db_id; + state->transdb.tid = generation; + + ctdb_req_control_get_dbname(&request, db->db_id); + subreq = ctdb_client_control_multi_send(state, + ev, + client, + state->db->pnn_list, + state->db->num_nodes, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, recover_db_name_done, req); + + return req; +} + +static void recover_db_name_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + struct ctdb_reply_control **reply; + int *err_list; + unsigned int i; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, + &ret, + state, + &err_list, + &reply); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->db->pnn_list, + state->db->num_nodes, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control GET_DBNAME failed on node %u," + " ret=%d\n", + pnn, + ret2); + } else { + D_ERR("control GET_DBNAME failed, ret=%d\n", + ret); + } + tevent_req_error(req, ret); + return; + } + + for (i = 0; i < state->db->num_nodes; i++) { + const char *db_name; + uint32_t pnn; + + pnn = state->nlist->pnn_list[i]; + + ret = ctdb_reply_control_get_dbname(reply[i], + state, + &db_name); + if (ret != 0) { + D_ERR("control GET_DBNAME failed on node %u " + "for db=0x%x, ret=%d\n", + pnn, + state->db->db_id, + ret); + tevent_req_error(req, EPROTO); + return; + } + + if (state->db_name == NULL) { + state->db_name = db_name; + continue; + } + + if (strcmp(state->db_name, db_name) != 0) { + D_ERR("Incompatible database name for 0x%"PRIx32" " + "(%s != %s) on node %"PRIu32"\n", + state->db->db_id, + db_name, + state->db_name, + pnn); + node_list_ban_credits(state->nlist, pnn); + tevent_req_error(req, ret); + return; + } + } + + talloc_free(reply); + + subreq = db_create_missing_send(state, + state->ev, + state->client, + state->nlist, + state->db_name, + state->db); + + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_create_missing_done, req); +} + +static void recover_db_create_missing_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + struct ctdb_req_control request; + int ret; + bool status; + + /* Could sanity check the db_id here */ + status = db_create_missing_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_getdbpath(&request, state->db->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_path_done, req); +} + +static void recover_db_path_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + struct ctdb_reply_control *reply; + struct ctdb_req_control request; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("control GETDBPATH failed for db %s, ret=%d\n", + state->db_name, ret); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path); + if (ret != 0) { + D_ERR("control GETDBPATH failed for db %s, ret=%d\n", + state->db_name, ret); + tevent_req_error(req, EPROTO); + return; + } + + talloc_free(reply); + + ctdb_req_control_db_freeze(&request, state->db->db_id); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_freeze_done, req); +} + +static void recover_db_freeze_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + struct ctdb_req_control request; + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control FREEZE_DB failed for db %s" + " on node %u, ret=%d\n", + state->db_name, pnn, ret2); + + node_list_ban_credits(state->nlist, pnn); + } else { + D_ERR("control FREEZE_DB failed for db %s, ret=%d\n", + state->db_name, ret); + } + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_transaction_start(&request, &state->transdb); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_transaction_started, req); +} + +static void recover_db_transaction_started(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + int *err_list; + uint32_t flags; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control TRANSACTION_DB failed for db=%s" + " on node %u, ret=%d\n", + state->db_name, pnn, ret2); + } else { + D_ERR("control TRANSACTION_DB failed for db=%s," + " ret=%d\n", state->db_name, ret); + } + tevent_req_error(req, ret); + return; + } + + flags = state->db->db_flags; + state->recdb = recdb_create(state, + state->db->db_id, + state->db_name, + state->db_path, + state->tun_list->database_hash_size, + flags & CTDB_DB_FLAGS_PERSISTENT); + if (tevent_req_nomem(state->recdb, req)) { + return; + } + + if ((flags & CTDB_DB_FLAGS_PERSISTENT) || + (flags & CTDB_DB_FLAGS_REPLICATED)) { + subreq = collect_highseqnum_db_send(state, + state->ev, + state->client, + state->nlist, + state->db->db_id, + state->recdb); + } else { + subreq = collect_all_db_send(state, + state->ev, + state->client, + state->nlist, + state->db->db_id, + state->recdb); + } + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_collect_done, req); +} + +static void recover_db_collect_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + struct ctdb_req_control request; + int ret; + bool status; + + if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) || + (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) { + status = collect_highseqnum_db_recv(subreq, &ret); + } else { + status = collect_all_db_recv(subreq, &ret); + } + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_wipe_database(&request, &state->transdb); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_wipedb_done, req); +} + +static void recover_db_wipedb_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control WIPEDB failed for db %s on node %u," + " ret=%d\n", state->db_name, pnn, ret2); + } else { + D_ERR("control WIPEDB failed for db %s, ret=%d\n", + state->db_name, ret); + } + tevent_req_error(req, ret); + return; + } + + subreq = push_database_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + state->recdb, + state->tun_list->rec_buffer_size_limit); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_pushdb_done, req); +} + +static void recover_db_pushdb_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + struct ctdb_req_control request; + int ret; + bool status; + + status = push_database_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + TALLOC_FREE(state->recdb); + + ctdb_req_control_db_transaction_commit(&request, &state->transdb); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_transaction_committed, req); +} + +static void recover_db_transaction_committed(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + struct ctdb_req_control request; + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control DB_TRANSACTION_COMMIT failed for db %s" + " on node %u, ret=%d\n", + state->db_name, pnn, ret2); + } else { + D_ERR("control DB_TRANSACTION_COMMIT failed for db %s," + " ret=%d\n", state->db_name, ret); + } + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_thaw(&request, state->db->db_id); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recover_db_thaw_done, req); +} + +static void recover_db_thaw_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recover_db_state *state = tevent_req_data( + req, struct recover_db_state); + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control DB_THAW failed for db %s on node %u," + " ret=%d\n", state->db_name, pnn, ret2); + } else { + D_ERR("control DB_THAW failed for db %s, ret=%d\n", + state->db_name, ret); + } + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +static bool recover_db_recv(struct tevent_req *req) +{ + return generic_recv(req, NULL); +} + + +/* + * Start database recovery for each database + * + * Try to recover each database 5 times before failing recovery. + */ + +struct db_recovery_state { + struct tevent_context *ev; + struct db_list *dblist; + unsigned int num_replies; + unsigned int num_failed; +}; + +struct db_recovery_one_state { + struct tevent_req *req; + struct ctdb_client_context *client; + struct db_list *dblist; + struct ctdb_tunable_list *tun_list; + struct node_list *nlist; + uint32_t generation; + struct db *db; + int num_fails; +}; + +static void db_recovery_one_done(struct tevent_req *subreq); + +static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct db_list *dblist, + struct ctdb_tunable_list *tun_list, + struct node_list *nlist, + uint32_t generation) +{ + struct tevent_req *req, *subreq; + struct db_recovery_state *state; + struct db *db; + + req = tevent_req_create(mem_ctx, &state, struct db_recovery_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->dblist = dblist; + state->num_replies = 0; + state->num_failed = 0; + + if (dblist->num_dbs == 0) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + for (db = dblist->db; db != NULL; db = db->next) { + struct db_recovery_one_state *substate; + + substate = talloc_zero(state, struct db_recovery_one_state); + if (tevent_req_nomem(substate, req)) { + return tevent_req_post(req, ev); + } + + substate->req = req; + substate->client = client; + substate->dblist = dblist; + substate->tun_list = tun_list; + substate->nlist = nlist; + substate->generation = generation; + substate->db = db; + + subreq = recover_db_send(state, + ev, + client, + tun_list, + nlist, + generation, + substate->db); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, db_recovery_one_done, + substate); + D_NOTICE("recover database 0x%08x\n", substate->db->db_id); + } + + return req; +} + +static void db_recovery_one_done(struct tevent_req *subreq) +{ + struct db_recovery_one_state *substate = tevent_req_callback_data( + subreq, struct db_recovery_one_state); + struct tevent_req *req = substate->req; + struct db_recovery_state *state = tevent_req_data( + req, struct db_recovery_state); + bool status; + + status = recover_db_recv(subreq); + TALLOC_FREE(subreq); + + if (status) { + talloc_free(substate); + goto done; + } + + substate->num_fails += 1; + if (substate->num_fails < NUM_RETRIES) { + subreq = recover_db_send(state, + state->ev, + substate->client, + substate->tun_list, + substate->nlist, + substate->generation, + substate->db); + if (tevent_req_nomem(subreq, req)) { + goto failed; + } + tevent_req_set_callback(subreq, db_recovery_one_done, substate); + D_NOTICE("recover database 0x%08x, attempt %d\n", + substate->db->db_id, substate->num_fails+1); + return; + } + +failed: + state->num_failed += 1; + +done: + state->num_replies += 1; + + if (state->num_replies == state->dblist->num_dbs) { + tevent_req_done(req); + } +} + +static bool db_recovery_recv(struct tevent_req *req, unsigned int *count) +{ + struct db_recovery_state *state = tevent_req_data( + req, struct db_recovery_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + *count = 0; + return false; + } + + *count = state->num_replies - state->num_failed; + + if (state->num_failed > 0) { + return false; + } + + return true; +} + +struct ban_node_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_tunable_list *tun_list; + struct node_list *nlist; + uint32_t destnode; + + uint32_t max_pnn; +}; + +static bool ban_node_check(struct tevent_req *req); +static void ban_node_check_done(struct tevent_req *subreq); +static void ban_node_done(struct tevent_req *subreq); + +static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_tunable_list *tun_list, + struct node_list *nlist) +{ + struct tevent_req *req; + struct ban_node_state *state; + bool ok; + + req = tevent_req_create(mem_ctx, &state, struct ban_node_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->tun_list = tun_list; + state->nlist = nlist; + state->destnode = ctdb_client_pnn(client); + + /* Bans are not enabled */ + if (state->tun_list->enable_bans == 0) { + D_ERR("Bans are not enabled\n"); + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + ok = ban_node_check(req); + if (!ok) { + return tevent_req_post(req, ev); + } + + return req; +} + +static bool ban_node_check(struct tevent_req *req) +{ + struct tevent_req *subreq; + struct ban_node_state *state = tevent_req_data( + req, struct ban_node_state); + struct ctdb_req_control request; + unsigned max_credits = 0, i; + + for (i=0; i<state->nlist->count; i++) { + if (state->nlist->ban_credits[i] > max_credits) { + state->max_pnn = state->nlist->pnn_list[i]; + max_credits = state->nlist->ban_credits[i]; + } + } + + if (max_credits < NUM_RETRIES) { + tevent_req_done(req); + return false; + } + + ctdb_req_control_get_nodemap(&request); + subreq = ctdb_client_control_send(state, + state->ev, + state->client, + state->max_pnn, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return false; + } + tevent_req_set_callback(subreq, ban_node_check_done, req); + + return true; +} + +static void ban_node_check_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ban_node_state *state = tevent_req_data( + req, struct ban_node_state); + struct ctdb_reply_control *reply; + struct ctdb_node_map *nodemap; + struct ctdb_req_control request; + struct ctdb_ban_state ban; + unsigned int i; + int ret; + bool ok; + + ok = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (!ok) { + D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n", + state->max_pnn, ret); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap); + if (ret != 0) { + D_ERR("control GET_NODEMAP failed, ret=%d\n", ret); + tevent_req_error(req, ret); + return; + } + + for (i=0; i<nodemap->num; i++) { + if (nodemap->node[i].pnn != state->max_pnn) { + continue; + } + + /* If the node became inactive, reset ban_credits */ + if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) { + unsigned int j; + + for (j=0; j<state->nlist->count; j++) { + if (state->nlist->pnn_list[j] == + state->max_pnn) { + state->nlist->ban_credits[j] = 0; + break; + } + } + state->max_pnn = CTDB_UNKNOWN_PNN; + } + } + + talloc_free(nodemap); + talloc_free(reply); + + /* If node becames inactive during recovery, pick next */ + if (state->max_pnn == CTDB_UNKNOWN_PNN) { + (void) ban_node_check(req); + return; + } + + ban = (struct ctdb_ban_state) { + .pnn = state->max_pnn, + .time = state->tun_list->recovery_ban_period, + }; + + D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time); + + ctdb_req_control_set_ban_state(&request, &ban); + subreq = ctdb_client_control_send(state, + state->ev, + state->client, + ban.pnn, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ban_node_done, req); +} + +static void ban_node_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct node_ban_state *state = tevent_req_data( + req, struct node_ban_state); + struct ctdb_reply_control *reply; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_set_ban_state(reply); + if (ret != 0) { + D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret); + tevent_req_error(req, ret); + return; + } + + talloc_free(reply); + tevent_req_done(req); +} + +static bool ban_node_recv(struct tevent_req *req, int *perr) +{ + if (tevent_req_is_unix_error(req, perr)) { + return false; + } + + return true; +} + +/* + * Run the parallel database recovery + * + * - Get tunables + * - Get nodemap from all nodes + * - Get capabilities from all nodes + * - Get dbmap + * - Set RECOVERY_ACTIVE + * - Send START_RECOVERY + * - Update vnnmap on all nodes + * - Run database recovery + * - Set RECOVERY_NORMAL + * - Send END_RECOVERY + */ + +struct recovery_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + uint32_t generation; + uint32_t destnode; + struct node_list *nlist; + struct ctdb_tunable_list *tun_list; + struct ctdb_vnn_map *vnnmap; + struct db_list *dblist; +}; + +static void recovery_tunables_done(struct tevent_req *subreq); +static void recovery_nodemap_done(struct tevent_req *subreq); +static void recovery_nodemap_verify(struct tevent_req *subreq); +static void recovery_capabilities_done(struct tevent_req *subreq); +static void recovery_dbmap_done(struct tevent_req *subreq); +static void recovery_active_done(struct tevent_req *subreq); +static void recovery_start_recovery_done(struct tevent_req *subreq); +static void recovery_vnnmap_update_done(struct tevent_req *subreq); +static void recovery_db_recovery_done(struct tevent_req *subreq); +static void recovery_failed_done(struct tevent_req *subreq); +static void recovery_normal_done(struct tevent_req *subreq); +static void recovery_end_recovery_done(struct tevent_req *subreq); + +static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t generation) +{ + struct tevent_req *req, *subreq; + struct recovery_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, struct recovery_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->generation = generation; + state->destnode = ctdb_client_pnn(client); + + ctdb_req_control_get_all_tunables(&request); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, recovery_tunables_done, req); + + return req; +} + +static void recovery_tunables_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_reply_control *reply; + struct ctdb_req_control request; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_all_tunables(reply, state, + &state->tun_list); + if (ret != 0) { + D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret); + tevent_req_error(req, EPROTO); + return; + } + + talloc_free(reply); + + recover_timeout = state->tun_list->recover_timeout; + + ctdb_req_control_get_nodemap(&request); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_nodemap_done, req); +} + +static void recovery_nodemap_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_reply_control *reply; + struct ctdb_req_control request; + struct ctdb_node_map *nodemap; + unsigned int i; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n", + state->destnode, ret); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap); + if (ret != 0) { + D_ERR("control GET_NODEMAP failed, ret=%d\n", ret); + tevent_req_error(req, ret); + return; + } + + state->nlist = node_list_init(state, nodemap->num); + if (tevent_req_nomem(state->nlist, req)) { + return; + } + + for (i=0; i<nodemap->num; i++) { + bool ok; + + if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) { + continue; + } + + ok = node_list_add(state->nlist, nodemap->node[i].pnn); + if (!ok) { + tevent_req_error(req, EINVAL); + return; + } + } + + talloc_free(nodemap); + talloc_free(reply); + + /* Verify flags by getting local node information from each node */ + ctdb_req_control_get_nodemap(&request); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_nodemap_verify, req); +} + +static void recovery_nodemap_verify(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_req_control request; + struct ctdb_reply_control **reply; + struct node_list *nlist; + unsigned int i; + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, + &ret, + state, + &err_list, + &reply); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control GET_NODEMAP failed on node %u," + " ret=%d\n", pnn, ret2); + } else { + D_ERR("control GET_NODEMAP failed, ret=%d\n", ret); + } + tevent_req_error(req, ret); + return; + } + + nlist = node_list_init(state, state->nlist->size); + if (tevent_req_nomem(nlist, req)) { + return; + } + + for (i=0; i<state->nlist->count; i++) { + struct ctdb_node_map *nodemap = NULL; + uint32_t pnn, flags; + unsigned int j; + bool ok; + + pnn = state->nlist->pnn_list[i]; + ret = ctdb_reply_control_get_nodemap(reply[i], + state, + &nodemap); + if (ret != 0) { + D_ERR("control GET_NODEMAP failed on node %u\n", pnn); + tevent_req_error(req, EPROTO); + return; + } + + flags = NODE_FLAGS_DISCONNECTED; + for (j=0; j<nodemap->num; j++) { + if (nodemap->node[j].pnn == pnn) { + flags = nodemap->node[j].flags; + break; + } + } + + TALLOC_FREE(nodemap); + + if (flags & NODE_FLAGS_INACTIVE) { + continue; + } + + ok = node_list_add(nlist, pnn); + if (!ok) { + tevent_req_error(req, EINVAL); + return; + } + } + + talloc_free(reply); + + talloc_free(state->nlist); + state->nlist = nlist; + + ctdb_req_control_get_capabilities(&request); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_capabilities_done, req); +} + +static void recovery_capabilities_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_reply_control **reply; + struct ctdb_req_control request; + int *err_list; + unsigned int i; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, + &reply); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control GET_CAPABILITIES failed on node %u," + " ret=%d\n", pnn, ret2); + } else { + D_ERR("control GET_CAPABILITIES failed, ret=%d\n", + ret); + } + tevent_req_error(req, ret); + return; + } + + for (i=0; i<state->nlist->count; i++) { + uint32_t caps; + + ret = ctdb_reply_control_get_capabilities(reply[i], &caps); + if (ret != 0) { + D_ERR("control GET_CAPABILITIES failed on node %u\n", + state->nlist->pnn_list[i]); + tevent_req_error(req, EPROTO); + return; + } + + state->nlist->caps[i] = caps; + } + + talloc_free(reply); + + ctdb_req_control_get_dbmap(&request); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_dbmap_done, req); +} + +static void recovery_dbmap_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_reply_control **reply; + struct ctdb_req_control request; + int *err_list; + unsigned int i, j; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, + &ret, + state, + &err_list, + &reply); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("control GET_DBMAP failed on node %u," + " ret=%d\n", pnn, ret2); + } else { + D_ERR("control GET_DBMAP failed, ret=%d\n", + ret); + } + tevent_req_error(req, ret); + return; + } + + state->dblist = db_list_init(state, state->nlist->count); + if (tevent_req_nomem(state->dblist, req)) { + D_ERR("memory allocation error\n"); + return; + } + + for (i = 0; i < state->nlist->count; i++) { + struct ctdb_dbid_map *dbmap = NULL; + uint32_t pnn; + + pnn = state->nlist->pnn_list[i]; + + ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap); + if (ret != 0) { + D_ERR("control GET_DBMAP failed on node %u\n", + pnn); + tevent_req_error(req, EPROTO); + return; + } + + for (j = 0; j < dbmap->num; j++) { + ret = db_list_check_and_add(state->dblist, + dbmap->dbs[j].db_id, + dbmap->dbs[j].flags, + pnn); + if (ret != 0) { + D_ERR("failed to add database list entry, " + "ret=%d\n", + ret); + tevent_req_error(req, ret); + return; + } + } + + TALLOC_FREE(dbmap); + } + + ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_active_done, req); +} + +static void recovery_active_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_req_control request; + struct ctdb_vnn_map *vnnmap; + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("failed to set recovery mode ACTIVE on node %u," + " ret=%d\n", pnn, ret2); + } else { + D_ERR("failed to set recovery mode ACTIVE, ret=%d\n", + ret); + } + tevent_req_error(req, ret); + return; + } + + D_ERR("Set recovery mode to ACTIVE\n"); + + /* Calculate new VNNMAP */ + vnnmap = talloc_zero(state, struct ctdb_vnn_map); + if (tevent_req_nomem(vnnmap, req)) { + return; + } + + vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size); + if (tevent_req_nomem(vnnmap->map, req)) { + return; + } + + if (vnnmap->size == 0) { + D_WARNING("No active lmasters found. Adding recmaster anyway\n"); + vnnmap->map[0] = state->destnode; + vnnmap->size = 1; + } + + vnnmap->generation = state->generation; + + state->vnnmap = vnnmap; + + ctdb_req_control_start_recovery(&request); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_start_recovery_done, req); +} + +static void recovery_start_recovery_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_req_control request; + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("failed to run start_recovery event on node %u," + " ret=%d\n", pnn, ret2); + } else { + D_ERR("failed to run start_recovery event, ret=%d\n", + ret); + } + tevent_req_error(req, ret); + return; + } + + D_ERR("start_recovery event finished\n"); + + ctdb_req_control_setvnnmap(&request, state->vnnmap); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req); +} + +static void recovery_vnnmap_update_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("failed to update VNNMAP on node %u, ret=%d\n", + pnn, ret2); + } else { + D_ERR("failed to update VNNMAP, ret=%d\n", ret); + } + tevent_req_error(req, ret); + return; + } + + D_NOTICE("updated VNNMAP\n"); + + subreq = db_recovery_send(state, + state->ev, + state->client, + state->dblist, + state->tun_list, + state->nlist, + state->vnnmap->generation); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_db_recovery_done, req); +} + +static void recovery_db_recovery_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_req_control request; + bool status; + unsigned int count; + + status = db_recovery_recv(subreq, &count); + TALLOC_FREE(subreq); + + D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs); + + if (! status) { + subreq = ban_node_send(state, + state->ev, + state->client, + state->tun_list, + state->nlist); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_failed_done, req); + return; + } + + ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_normal_done, req); +} + +static void recovery_failed_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + bool status; + + status = ban_node_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("failed to ban node, ret=%d\n", ret); + } + + tevent_req_error(req, EIO); +} + +static void recovery_normal_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + struct ctdb_req_control request; + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("failed to set recovery mode NORMAL on node %u," + " ret=%d\n", pnn, ret2); + } else { + D_ERR("failed to set recovery mode NORMAL, ret=%d\n", + ret); + } + tevent_req_error(req, ret); + return; + } + + D_ERR("Set recovery mode to NORMAL\n"); + + ctdb_req_control_end_recovery(&request); + subreq = ctdb_client_control_multi_send(state, + state->ev, + state->client, + state->nlist->pnn_list, + state->nlist->count, + TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, recovery_end_recovery_done, req); +} + +static void recovery_end_recovery_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct recovery_state *state = tevent_req_data( + req, struct recovery_state); + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, + NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, + state->nlist->count, + err_list, + &pnn); + if (ret2 != 0) { + D_ERR("failed to run recovered event on node %u," + " ret=%d\n", pnn, ret2); + } else { + D_ERR("failed to run recovered event, ret=%d\n", ret); + } + tevent_req_error(req, ret); + return; + } + + D_ERR("recovered event finished\n"); + + tevent_req_done(req); +} + +static void recovery_recv(struct tevent_req *req, int *perr) +{ + generic_recv(req, perr); +} + +static void usage(const char *progname) +{ + fprintf(stderr, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n", + progname); +} + + +/* + * Arguments - log fd, write fd, socket path, generation + */ +int main(int argc, char *argv[]) +{ + int write_fd; + const char *sockpath; + TALLOC_CTX *mem_ctx = NULL; + struct tevent_context *ev; + struct ctdb_client_context *client; + bool status; + int ret = 0; + struct tevent_req *req; + uint32_t generation; + + if (argc != 4) { + usage(argv[0]); + exit(1); + } + + write_fd = atoi(argv[1]); + sockpath = argv[2]; + generation = (uint32_t)smb_strtoul(argv[3], + NULL, + 0, + &ret, + SMB_STR_STANDARD); + if (ret != 0) { + fprintf(stderr, "recovery: unable to initialize generation\n"); + goto failed; + } + + mem_ctx = talloc_new(NULL); + if (mem_ctx == NULL) { + fprintf(stderr, "recovery: talloc_new() failed\n"); + goto failed; + } + + ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery"); + if (ret != 0) { + fprintf(stderr, "recovery: Unable to initialize logging\n"); + goto failed; + } + + ev = tevent_context_init(mem_ctx); + if (ev == NULL) { + D_ERR("tevent_context_init() failed\n"); + goto failed; + } + + status = logging_setup_sighup_handler(ev, mem_ctx, NULL, NULL); + if (!status) { + D_ERR("logging_setup_sighup_handler() failed\n"); + goto failed; + } + + ret = ctdb_client_init(mem_ctx, ev, sockpath, &client); + if (ret != 0) { + D_ERR("ctdb_client_init() failed, ret=%d\n", ret); + goto failed; + } + + req = recovery_send(mem_ctx, ev, client, generation); + if (req == NULL) { + D_ERR("database_recover_send() failed\n"); + goto failed; + } + + if (! tevent_req_poll(req, ev)) { + D_ERR("tevent_req_poll() failed\n"); + goto failed; + } + + recovery_recv(req, &ret); + TALLOC_FREE(req); + if (ret != 0) { + D_ERR("database recovery failed, ret=%d\n", ret); + goto failed; + } + + sys_write(write_fd, &ret, sizeof(ret)); + return 0; + +failed: + TALLOC_FREE(mem_ctx); + return 1; +} |