diff options
Diffstat (limited to 'ctdb/client/client_db.c')
-rw-r--r-- | ctdb/client/client_db.c | 2791 |
1 files changed, 2791 insertions, 0 deletions
diff --git a/ctdb/client/client_db.c b/ctdb/client/client_db.c new file mode 100644 index 0000000..0b06d6e --- /dev/null +++ b/ctdb/client/client_db.c @@ -0,0 +1,2791 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "common/logging.h" + +#include "lib/tdb_wrap/tdb_wrap.h" +#include "lib/util/tevent_unix.h" +#include "lib/util/dlinklist.h" +#include "lib/util/debug.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" +#include "client/client_private.h" +#include "client/client.h" + +struct tdb_context *client_db_tdb(struct ctdb_db_context *db) +{ + return db->ltdb->tdb; +} + +static struct ctdb_db_context *client_db_handle( + struct ctdb_client_context *client, + const char *db_name) +{ + struct ctdb_db_context *db; + + for (db = client->db; db != NULL; db = db->next) { + if (strcmp(db_name, db->db_name) == 0) { + return db; + } + } + + return NULL; +} + +static bool ctdb_db_persistent(struct ctdb_db_context *db) +{ + if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) { + return true; + } + return false; +} + +static bool ctdb_db_replicated(struct ctdb_db_context *db) +{ + if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) { + return true; + } + return false; +} + +static bool ctdb_db_volatile(struct ctdb_db_context *db) +{ + if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT || + db->db_flags & CTDB_DB_FLAGS_REPLICATED) { + return false; + } + return true; +} + +struct ctdb_set_db_flags_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct timeval timeout; + uint32_t db_id; + uint8_t db_flags; + bool readonly_done, sticky_done; + uint32_t *pnn_list; + int count; +}; + +static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq); +static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq); +static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq); + +static struct tevent_req *ctdb_set_db_flags_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t destnode, struct timeval timeout, + uint32_t db_id, uint8_t db_flags) +{ + struct tevent_req *req, *subreq; + struct ctdb_set_db_flags_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_set_db_flags_state); + if (req == NULL) { + return NULL; + } + + if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + state->ev = ev; + state->client = client; + state->timeout = timeout; + state->db_id = db_id; + state->db_flags = db_flags; + + ctdb_req_control_get_nodemap(&request); + subreq = ctdb_client_control_send(state, ev, client, destnode, timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req); + + return req; +} + +static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_set_db_flags_state *state = tevent_req_data( + req, struct ctdb_set_db_flags_state); + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + struct ctdb_node_map *nodemap; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN, + state, &state->pnn_list); + talloc_free(nodemap); + if (state->count <= 0) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x no connected nodes, count=%d\n", + state->db_id, state->count)); + tevent_req_error(req, ENOMEM); + return; + } + + if (state->db_flags & CTDB_DB_FLAGS_READONLY) { + ctdb_req_control_set_db_readonly(&request, state->db_id); + subreq = ctdb_client_control_multi_send( + state, state->ev, state->client, + state->pnn_list, state->count, + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, + ctdb_set_db_flags_readonly_done, req); + } else { + state->readonly_done = true; + } + + if (state->db_flags & CTDB_DB_FLAGS_STICKY) { + ctdb_req_control_set_db_sticky(&request, state->db_id); + subreq = ctdb_client_control_multi_send( + state, state->ev, state->client, + state->pnn_list, state->count, + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done, + req); + } else { + state->sticky_done = true; + } +} + +static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_set_db_flags_state *state = tevent_req_data( + req, struct ctdb_set_db_flags_state); + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, + NULL); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + state->readonly_done = true; + + if (state->readonly_done && state->sticky_done) { + tevent_req_done(req); + } +} + +static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_set_db_flags_state *state = tevent_req_data( + req, struct ctdb_set_db_flags_state); + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, + NULL); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + state->sticky_done = true; + + if (state->readonly_done && state->sticky_done) { + tevent_req_done(req); + } +} + +static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + return true; +} + +struct ctdb_attach_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct timeval timeout; + uint32_t destnode; + uint8_t db_flags; + struct ctdb_db_context *db; +}; + +static void ctdb_attach_dbid_done(struct tevent_req *subreq); +static void ctdb_attach_dbpath_done(struct tevent_req *subreq); +static void ctdb_attach_health_done(struct tevent_req *subreq); +static void ctdb_attach_flags_done(struct tevent_req *subreq); +static void ctdb_attach_open_flags_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + const char *db_name, uint8_t db_flags) +{ + struct tevent_req *req, *subreq; + struct ctdb_attach_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state); + if (req == NULL) { + return NULL; + } + + state->db = client_db_handle(client, db_name); + if (state->db != NULL) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + + state->ev = ev; + state->client = client; + state->timeout = timeout; + state->destnode = ctdb_client_pnn(client); + state->db_flags = db_flags; + + state->db = talloc_zero(client, struct ctdb_db_context); + if (tevent_req_nomem(state->db, req)) { + return tevent_req_post(req, ev); + } + + state->db->db_name = talloc_strdup(state->db, db_name); + if (tevent_req_nomem(state->db, req)) { + return tevent_req_post(req, ev); + } + + state->db->db_flags = db_flags; + + if (ctdb_db_persistent(state->db)) { + ctdb_req_control_db_attach_persistent(&request, + state->db->db_name); + } else if (ctdb_db_replicated(state->db)) { + ctdb_req_control_db_attach_replicated(&request, + state->db->db_name); + } else { + ctdb_req_control_db_attach(&request, state->db->db_name); + } + + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req); + + return req; +} + +static void ctdb_attach_dbid_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_req_control request; + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n", + state->db->db_name, + (ctdb_db_persistent(state->db) + ? "DB_ATTACH_PERSISTENT" + : (ctdb_db_replicated(state->db) + ? "DB_ATTACH_REPLICATED" + : "DB_ATTACH")), + ret)); + tevent_req_error(req, ret); + return; + } + + if (ctdb_db_persistent(state->db)) { + ret = ctdb_reply_control_db_attach_persistent( + reply, &state->db->db_id); + } else if (ctdb_db_replicated(state->db)) { + ret = ctdb_reply_control_db_attach_replicated( + reply, &state->db->db_id); + } else { + ret = ctdb_reply_control_db_attach(reply, &state->db->db_id); + } + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_getdbpath(&request, state->db->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req); +} + +static void ctdb_attach_dbpath_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_reply_control *reply; + struct ctdb_req_control request; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_getdbpath(reply, state->db, + &state->db->db_path); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_get_health(&request, state->db->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_health_done, req); +} + +static void ctdb_attach_health_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_reply_control *reply; + const char *reason; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_db_get_health(reply, state, &reason); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + if (reason != NULL) { + /* Database unhealthy, avoid attach */ + DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n", + state->db->db_name, reason)); + tevent_req_error(req, EIO); + return; + } + + subreq = ctdb_set_db_flags_send(state, state->ev, state->client, + state->destnode, state->timeout, + state->db->db_id, state->db_flags); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_flags_done, req); +} + +static void ctdb_attach_flags_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_req_control request; + bool status; + int ret; + + status = ctdb_set_db_flags_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n", + state->db->db_name, state->db_flags)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_open_flags(&request, state->db->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req); +} + +static void ctdb_attach_open_flags_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + struct ctdb_reply_control *reply; + bool status; + int ret, tdb_flags; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n", + state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed," + " ret=%d\n", state->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0, + tdb_flags, O_RDWR, 0); + if (tevent_req_nomem(state->db->ltdb, req)) { + DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n", + state->db->db_name)); + return; + } + DLIST_ADD(state->client->db, state->db); + + tevent_req_done(req); +} + +bool ctdb_attach_recv(struct tevent_req *req, int *perr, + struct ctdb_db_context **out) +{ + struct ctdb_attach_state *state = tevent_req_data( + req, struct ctdb_attach_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + if (out != NULL) { + *out = state->db; + } + return true; +} + +int ctdb_attach(struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + const char *db_name, uint8_t db_flags, + struct ctdb_db_context **out) +{ + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + bool status; + int ret; + + mem_ctx = talloc_new(client); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_attach_send(mem_ctx, ev, client, timeout, + db_name, db_flags); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_attach_recv(req, &ret, out); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + /* + ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func); + ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func); + ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func); + */ + + talloc_free(mem_ctx); + return 0; +} + +struct ctdb_detach_state { + struct ctdb_client_context *client; + struct tevent_context *ev; + struct timeval timeout; + uint32_t db_id; + const char *db_name; +}; + +static void ctdb_detach_dbname_done(struct tevent_req *subreq); +static void ctdb_detach_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, uint32_t db_id) +{ + struct tevent_req *req, *subreq; + struct ctdb_detach_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state); + if (req == NULL) { + return NULL; + } + + state->client = client; + state->ev = ev; + state->timeout = timeout; + state->db_id = db_id; + + ctdb_req_control_get_dbname(&request, db_id); + subreq = ctdb_client_control_send(state, ev, client, + ctdb_client_pnn(client), timeout, + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req); + + return req; +} + +static void ctdb_detach_dbname_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_detach_state *state = tevent_req_data( + req, struct ctdb_detach_state); + struct ctdb_reply_control *reply; + struct ctdb_req_control request; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n", + state->db_id, ret)); + tevent_req_error(req, ret); + return; + } + + ctdb_req_control_db_detach(&request, state->db_id); + subreq = ctdb_client_control_send(state, state->ev, state->client, + ctdb_client_pnn(state->client), + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_detach_done, req); + +} + +static void ctdb_detach_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_detach_state *state = tevent_req_data( + req, struct ctdb_detach_state); + struct ctdb_reply_control *reply; + struct ctdb_db_context *db; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n", + state->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_db_detach(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n", + state->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + db = client_db_handle(state->client, state->db_name); + if (db != NULL) { + DLIST_REMOVE(state->client->db, db); + TALLOC_FREE(db); + } + + tevent_req_done(req); +} + +bool ctdb_detach_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +int ctdb_detach(struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, uint32_t db_id) +{ + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + mem_ctx = talloc_new(client); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_detach_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} + +uint32_t ctdb_db_id(struct ctdb_db_context *db) +{ + return db->db_id; +} + +struct ctdb_db_traverse_local_state { + ctdb_rec_parser_func_t parser; + void *private_data; + bool extract_header; + int error; +}; + +static int ctdb_db_traverse_local_handler(struct tdb_context *tdb, + TDB_DATA key, TDB_DATA data, + void *private_data) +{ + struct ctdb_db_traverse_local_state *state = + (struct ctdb_db_traverse_local_state *)private_data; + int ret; + + if (state->extract_header) { + struct ctdb_ltdb_header header; + + ret = ctdb_ltdb_header_extract(&data, &header); + if (ret != 0) { + state->error = ret; + return 1; + } + + ret = state->parser(0, &header, key, data, state->private_data); + } else { + ret = state->parser(0, NULL, key, data, state->private_data); + } + + if (ret != 0) { + state->error = ret; + return 1; + } + + return 0; +} + +int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly, + bool extract_header, + ctdb_rec_parser_func_t parser, void *private_data) +{ + struct ctdb_db_traverse_local_state state; + int ret; + + state.parser = parser; + state.private_data = private_data; + state.extract_header = extract_header; + state.error = 0; + + if (readonly) { + ret = tdb_traverse_read(client_db_tdb(db), + ctdb_db_traverse_local_handler, + &state); + } else { + ret = tdb_traverse(client_db_tdb(db), + ctdb_db_traverse_local_handler, &state); + } + + if (ret == -1) { + return EIO; + } + + return state.error; +} + +struct ctdb_db_traverse_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + uint32_t destnode; + uint64_t srvid; + struct timeval timeout; + ctdb_rec_parser_func_t parser; + void *private_data; + int result; +}; + +static void ctdb_db_traverse_handler_set(struct tevent_req *subreq); +static void ctdb_db_traverse_started(struct tevent_req *subreq); +static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, + void *private_data); +static void ctdb_db_traverse_remove_handler(struct tevent_req *req); +static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq); + +struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, + struct timeval timeout, + ctdb_rec_parser_func_t parser, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct ctdb_db_traverse_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_db_traverse_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->db = db; + state->destnode = destnode; + state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid(); + state->timeout = timeout; + state->parser = parser; + state->private_data = private_data; + + subreq = ctdb_client_set_message_handler_send(state, ev, client, + state->srvid, + ctdb_db_traverse_handler, + req); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req); + + return req; +} + +static void ctdb_db_traverse_handler_set(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_traverse_start_ext traverse; + struct ctdb_req_control request; + int ret = 0; + bool status; + + status = ctdb_client_set_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + traverse = (struct ctdb_traverse_start_ext) { + .db_id = ctdb_db_id(state->db), + .reqid = 0, + .srvid = state->srvid, + .withemptyrecords = false, + }; + + ctdb_req_control_traverse_start_ext(&request, &traverse); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (subreq == NULL) { + state->result = ENOMEM; + ctdb_db_traverse_remove_handler(req); + return; + } + tevent_req_set_callback(subreq, ctdb_db_traverse_started, req); +} + +static void ctdb_db_traverse_started(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_reply_control *reply; + int ret = 0; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret)); + state->result = ret; + ctdb_db_traverse_remove_handler(req); + return; + } + + ret = ctdb_reply_control_traverse_start_ext(reply); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n", + ret)); + state->result = ret; + ctdb_db_traverse_remove_handler(req); + return; + } +} + +static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, + void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_rec_data *rec; + struct ctdb_ltdb_header header; + size_t np; + int ret; + + ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np); + if (ret != 0) { + return; + } + + if (rec->key.dsize == 0 && rec->data.dsize == 0) { + talloc_free(rec); + ctdb_db_traverse_remove_handler(req); + return; + } + + ret = ctdb_ltdb_header_extract(&rec->data, &header); + if (ret != 0) { + talloc_free(rec); + return; + } + + if (rec->data.dsize == 0) { + talloc_free(rec); + return; + } + + ret = state->parser(rec->reqid, &header, rec->key, rec->data, + state->private_data); + talloc_free(rec); + if (ret != 0) { + state->result = ret; + ctdb_db_traverse_remove_handler(req); + } +} + +static void ctdb_db_traverse_remove_handler(struct tevent_req *req) +{ + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct tevent_req *subreq; + + subreq = ctdb_client_remove_message_handler_send(state, state->ev, + state->client, + state->srvid, req); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req); +} + +static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + int ret; + bool status; + + status = ctdb_client_remove_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + if (state->result != 0) { + tevent_req_error(req, state->result); + return; + } + + tevent_req_done(req); +} + +bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, struct timeval timeout, + ctdb_rec_parser_func_t parser, void *private_data) +{ + struct tevent_req *req; + int ret = 0; + bool status; + + req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode, + timeout, parser, private_data); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_db_traverse_recv(req, &ret); + if (! status) { + return ret; + } + + return 0; +} + +int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key, + struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, TDB_DATA *data) +{ + TDB_DATA rec; + size_t np; + int ret; + + rec = tdb_fetch(client_db_tdb(db), key); + if (rec.dsize < sizeof(struct ctdb_ltdb_header)) { + /* No record present */ + if (rec.dptr != NULL) { + free(rec.dptr); + } + + if (tdb_error(client_db_tdb(db)) != TDB_ERR_NOEXIST) { + return EIO; + } + + *header = (struct ctdb_ltdb_header) { + .dmaster = CTDB_UNKNOWN_PNN, + }; + + if (data != NULL) { + *data = tdb_null; + } + return 0; + } + + ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np); + if (ret != 0) { + return ret; + } + + ret = 0; + if (data != NULL) { + data->dsize = rec.dsize - np; + data->dptr = talloc_memdup(mem_ctx, rec.dptr + np, + data->dsize); + if (data->dptr == NULL) { + ret = ENOMEM; + } + } + + free(rec.dptr); + return ret; +} + +/* + * Fetch a record from volatile database + * + * Steps: + * 1. Get a lock on the hash chain + * 2. If the record does not exist, migrate the record + * 3. If readonly=true and delegations do not exist, migrate the record. + * 4. If readonly=false and delegations exist, migrate the record. + * 5. If the local node is not dmaster, migrate the record. + * 6. Return record + */ + +struct ctdb_fetch_lock_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_record_handle *h; + bool readonly; + uint32_t pnn; +}; + +static int ctdb_fetch_lock_check(struct tevent_req *req); +static void ctdb_fetch_lock_migrate(struct tevent_req *req); +static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + TDB_DATA key, bool readonly) +{ + struct ctdb_fetch_lock_state *state; + struct tevent_req *req; + int ret; + + req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + + state->h = talloc_zero(db, struct ctdb_record_handle); + if (tevent_req_nomem(state->h, req)) { + return tevent_req_post(req, ev); + } + state->h->ev = ev; + state->h->client = client; + state->h->db = db; + state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize); + if (tevent_req_nomem(state->h->key.dptr, req)) { + return tevent_req_post(req, ev); + } + state->h->key.dsize = key.dsize; + state->h->readonly = false; + + state->readonly = readonly; + state->pnn = ctdb_client_pnn(client); + + /* Check that database is not persistent */ + if (! ctdb_db_volatile(db)) { + DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n", + db->db_name)); + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + ret = ctdb_fetch_lock_check(req); + if (ret == 0) { + tevent_req_done(req); + return tevent_req_post(req, ev); + } + if (ret != EAGAIN) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + return req; +} + +static int ctdb_fetch_lock_check(struct tevent_req *req) +{ + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_record_handle *h = state->h; + struct ctdb_ltdb_header header; + TDB_DATA data = tdb_null; + size_t np; + int ret, err = 0; + bool do_migrate = false; + + ret = tdb_chainlock(client_db_tdb(h->db), h->key); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("fetch_lock: %s tdb_chainlock failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + err = EIO; + goto failed; + } + + data = tdb_fetch(client_db_tdb(h->db), h->key); + if (data.dptr == NULL) { + if (tdb_error(client_db_tdb(h->db)) == TDB_ERR_NOEXIST) { + goto migrate; + } else { + err = EIO; + goto failed; + } + } + + /* Got the record */ + ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np); + if (ret != 0) { + err = ret; + goto failed; + } + + if (! state->readonly) { + /* Read/write access */ + if (header.dmaster == state->pnn && + header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) { + goto migrate; + } + + if (header.dmaster != state->pnn) { + goto migrate; + } + } else { + /* Readonly access */ + if (header.dmaster != state->pnn && + ! (header.flags & (CTDB_REC_RO_HAVE_READONLY | + CTDB_REC_RO_HAVE_DELEGATIONS))) { + goto migrate; + } + } + + /* We are the dmaster or readonly delegation */ + h->header = header; + h->data = data; + if (header.flags & (CTDB_REC_RO_HAVE_READONLY | + CTDB_REC_RO_HAVE_DELEGATIONS)) { + h->readonly = true; + } + return 0; + +migrate: + do_migrate = true; + err = EAGAIN; + +failed: + if (data.dptr != NULL) { + free(data.dptr); + } + ret = tdb_chainunlock(client_db_tdb(h->db), h->key); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("fetch_lock: %s tdb_chainunlock failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + return EIO; + } + + if (do_migrate) { + ctdb_fetch_lock_migrate(req); + } + return err; +} + +static void ctdb_fetch_lock_migrate(struct tevent_req *req) +{ + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_req_call request; + struct tevent_req *subreq; + + ZERO_STRUCT(request); + request.flags = CTDB_IMMEDIATE_MIGRATION; + if (state->readonly) { + request.flags |= CTDB_WANT_READONLY; + } + request.db_id = state->h->db->db_id; + request.callid = CTDB_NULL_FUNC; + request.key = state->h->key; + request.calldata = tdb_null; + + subreq = ctdb_client_call_send(state, state->ev, state->client, + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + + tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req); +} + +static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_reply_call *reply; + int ret; + bool status; + + status = ctdb_client_call_recv(subreq, state, &reply, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n", + state->h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + if (reply->status != 0) { + tevent_req_error(req, EIO); + return; + } + talloc_free(reply); + + ret = ctdb_fetch_lock_check(req); + if (ret != 0) { + if (ret != EAGAIN) { + tevent_req_error(req, ret); + } + return; + } + + tevent_req_done(req); +} + +static int ctdb_record_handle_destructor(struct ctdb_record_handle *h) +{ + int ret; + + ret = tdb_chainunlock(client_db_tdb(h->db), h->key); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("fetch_lock: %s tdb_chainunlock failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + } + free(h->data.dptr); + return 0; +} + +struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req, + struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, + TDB_DATA *data, int *perr) +{ + struct ctdb_fetch_lock_state *state = tevent_req_data( + req, struct ctdb_fetch_lock_state); + struct ctdb_record_handle *h = state->h; + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + TALLOC_FREE(state->h); + *perr = err; + } + return NULL; + } + + if (header != NULL) { + *header = h->header; + } + if (data != NULL) { + size_t offset; + + offset = ctdb_ltdb_header_len(&h->header); + + data->dsize = h->data.dsize - offset; + if (data->dsize == 0) { + data->dptr = NULL; + } else { + data->dptr = talloc_memdup(mem_ctx, + h->data.dptr + offset, + data->dsize); + if (data->dptr == NULL) { + TALLOC_FREE(state->h); + if (perr != NULL) { + *perr = ENOMEM; + } + return NULL; + } + } + } + + talloc_set_destructor(h, ctdb_record_handle_destructor); + return h; +} + +int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, TDB_DATA key, bool readonly, + struct ctdb_record_handle **out, + struct ctdb_ltdb_header *header, TDB_DATA *data) +{ + struct tevent_req *req; + struct ctdb_record_handle *h; + int ret = 0; + + req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret); + if (h == NULL) { + return ret; + } + + *out = h; + return 0; +} + +int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data) +{ + uint8_t header[sizeof(struct ctdb_ltdb_header)]; + TDB_DATA rec[2]; + size_t np; + int ret; + + /* Cannot modify the record if it was obtained as a readonly copy */ + if (h->readonly) { + return EINVAL; + } + + /* Check if the new data is same */ + if (h->data.dsize == data.dsize && + memcmp(h->data.dptr, data.dptr, data.dsize) == 0) { + /* No need to do anything */ + return 0; + } + + ctdb_ltdb_header_push(&h->header, header, &np); + + rec[0].dsize = np; + rec[0].dptr = header; + + rec[1].dsize = data.dsize; + rec[1].dptr = data.dptr; + + ret = tdb_storev(client_db_tdb(h->db), h->key, rec, 2, TDB_REPLACE); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("store_record: %s tdb_storev failed, %s\n", + h->db->db_name, tdb_errorstr(client_db_tdb(h->db)))); + return EIO; + } + + return 0; +} + +struct ctdb_delete_record_state { + struct ctdb_record_handle *h; +}; + +static void ctdb_delete_record_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_record_handle *h) +{ + struct tevent_req *req, *subreq; + struct ctdb_delete_record_state *state; + struct ctdb_key_data key; + struct ctdb_req_control request; + uint8_t header[sizeof(struct ctdb_ltdb_header)]; + TDB_DATA rec; + size_t np; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_delete_record_state); + if (req == NULL) { + return NULL; + } + + state->h = h; + + /* Cannot delete the record if it was obtained as a readonly copy */ + if (h->readonly) { + DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n", + h->db->db_name)); + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + ctdb_ltdb_header_push(&h->header, header, &np); + + rec.dsize = np; + rec.dptr = header; + + ret = tdb_store(client_db_tdb(h->db), h->key, rec, TDB_REPLACE); + if (ret != 0) { + D_ERR("fetch_lock delete: %s tdb_store failed, %s\n", + h->db->db_name, + tdb_errorstr(client_db_tdb(h->db))); + tevent_req_error(req, EIO); + return tevent_req_post(req, ev); + } + + key.db_id = h->db->db_id; + key.header = h->header; + key.key = h->key; + + ctdb_req_control_schedule_for_deletion(&request, &key); + subreq = ctdb_client_control_send(state, ev, h->client, + ctdb_client_pnn(h->client), + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_delete_record_done, req); + + return req; +} + +static void ctdb_delete_record_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_delete_record_state *state = tevent_req_data( + req, struct ctdb_delete_record_state); + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, NULL, NULL); + TALLOC_FREE(subreq); + if (! status) { + D_ERR("delete_record: %s SCHEDULE_FOR_DELETION failed, ret=%d\n", + state->h->db->db_name, + ret); + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_delete_record_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + + +int ctdb_delete_record(struct ctdb_record_handle *h) +{ + struct tevent_context *ev = h->ev; + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + mem_ctx = talloc_new(NULL); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_delete_record_send(mem_ctx, ev, h); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_delete_record_recv(req, &ret); + talloc_free(mem_ctx); + if (! status) { + return ret; + } + + return 0; +} + +/* + * Global lock functions + */ + +struct ctdb_g_lock_lock_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + TDB_DATA key; + struct ctdb_server_id my_sid; + enum ctdb_g_lock_type lock_type; + struct ctdb_record_handle *h; + /* state for verification of active locks */ + struct ctdb_g_lock_list *lock_list; + unsigned int current; +}; + +static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq); +static void ctdb_g_lock_lock_process_locks(struct tevent_req *req); +static void ctdb_g_lock_lock_checked(struct tevent_req *subreq); +static int ctdb_g_lock_lock_update(struct tevent_req *req); +static void ctdb_g_lock_lock_retry(struct tevent_req *subreq); + +static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1, + enum ctdb_g_lock_type l2) +{ + if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) { + return false; + } + return true; +} + +struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + const char *keyname, + struct ctdb_server_id *sid, + bool readonly) +{ + struct tevent_req *req, *subreq; + struct ctdb_g_lock_lock_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_g_lock_lock_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->db = db; + state->key.dptr = discard_const(keyname); + state->key.dsize = strlen(keyname) + 1; + state->my_sid = *sid; + state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE); + + subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key, + false); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req); + + return req; +} + +static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + TDB_DATA data; + size_t np; + int ret = 0; + + state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret); + TALLOC_FREE(subreq); + if (state->h == NULL) { + DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + if (state->lock_list != NULL) { + TALLOC_FREE(state->lock_list); + state->current = 0; + } + + ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state, + &state->lock_list, &np); + talloc_free(data.dptr); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + ctdb_g_lock_lock_process_locks(req); +} + +static void ctdb_g_lock_lock_process_locks(struct tevent_req *req) +{ + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + struct tevent_req *subreq; + struct ctdb_g_lock *lock; + bool check_server = false; + int ret; + + while (state->current < state->lock_list->num) { + lock = &state->lock_list->lock[state->current]; + + /* We should not ask for the same lock more than once */ + if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) { + DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n", + (char *)state->key.dptr)); + tevent_req_error(req, EDEADLK); + return; + } + + if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) { + check_server = true; + break; + } + + state->current += 1; + } + + if (check_server) { + struct ctdb_req_control request; + + ctdb_req_control_process_exists(&request, lock->sid.pid); + subreq = ctdb_client_control_send(state, state->ev, + state->client, + lock->sid.vnn, + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req); + return; + } + + /* There is no conflict, add ourself to the lock_list */ + state->lock_list->lock = talloc_realloc(state->lock_list, + state->lock_list->lock, + struct ctdb_g_lock, + state->lock_list->num + 1); + if (state->lock_list->lock == NULL) { + tevent_req_error(req, ENOMEM); + return; + } + + lock = &state->lock_list->lock[state->lock_list->num]; + lock->type = state->lock_type; + lock->sid = state->my_sid; + state->lock_list->num += 1; + + ret = ctdb_g_lock_lock_update(req); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + TALLOC_FREE(state->h); + tevent_req_done(req); +} + +static void ctdb_g_lock_lock_checked(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + struct ctdb_reply_control *reply; + int ret, value; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n", + (char *)state->key.dptr, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_process_exists(reply, &value); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + talloc_free(reply); + + if (value == 0) { + /* server process exists, need to retry */ + TALLOC_FREE(state->h); + subreq = tevent_wakeup_send(state, state->ev, + tevent_timeval_current_ofs(0,1000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req); + return; + } + + /* server process does not exist, remove conflicting entry */ + state->lock_list->lock[state->current] = + state->lock_list->lock[state->lock_list->num-1]; + state->lock_list->num -= 1; + + ret = ctdb_g_lock_lock_update(req); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ctdb_g_lock_lock_process_locks(req); +} + +static int ctdb_g_lock_lock_update(struct tevent_req *req) +{ + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + TDB_DATA data; + size_t np; + int ret; + + data.dsize = ctdb_g_lock_list_len(state->lock_list); + data.dptr = talloc_size(state, data.dsize); + if (data.dptr == NULL) { + return ENOMEM; + } + + ctdb_g_lock_list_push(state->lock_list, data.dptr, &np); + ret = ctdb_store_record(state->h, data); + talloc_free(data.dptr); + return ret; +} + +static void ctdb_g_lock_lock_retry(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + bool success; + + success = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (! success) { + tevent_req_error(req, ENOMEM); + return; + } + + subreq = ctdb_fetch_lock_send(state, state->ev, state->client, + state->db, state->key, false); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req); +} + +bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr) +{ + struct ctdb_g_lock_lock_state *state = tevent_req_data( + req, struct ctdb_g_lock_lock_state); + int err; + + TALLOC_FREE(state->h); + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +struct ctdb_g_lock_unlock_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + TDB_DATA key; + struct ctdb_server_id my_sid; + struct ctdb_record_handle *h; + struct ctdb_g_lock_list *lock_list; +}; + +static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq); +static int ctdb_g_lock_unlock_update(struct tevent_req *req); +static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq); + +struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + const char *keyname, + struct ctdb_server_id sid) +{ + struct tevent_req *req, *subreq; + struct ctdb_g_lock_unlock_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_g_lock_unlock_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->db = db; + state->key.dptr = discard_const(keyname); + state->key.dsize = strlen(keyname) + 1; + state->my_sid = sid; + + subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key, + false); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req); + + return req; +} + +static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + TDB_DATA data; + size_t np; + int ret = 0; + + state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret); + TALLOC_FREE(subreq); + if (state->h == NULL) { + DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state, + &state->lock_list, &np); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n", + (char *)state->key.dptr)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_g_lock_unlock_update(req); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + if (state->lock_list->num == 0) { + subreq = ctdb_delete_record_send(state, state->ev, state->h); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted, + req); + return; + } + + TALLOC_FREE(state->h); + tevent_req_done(req); +} + +static int ctdb_g_lock_unlock_update(struct tevent_req *req) +{ + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + struct ctdb_g_lock *lock; + unsigned int i; + int ret; + + for (i=0; i<state->lock_list->num; i++) { + lock = &state->lock_list->lock[i]; + + if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) { + break; + } + } + + if (i < state->lock_list->num) { + state->lock_list->lock[i] = + state->lock_list->lock[state->lock_list->num-1]; + state->lock_list->num -= 1; + } + + if (state->lock_list->num != 0) { + TDB_DATA data; + size_t np; + + data.dsize = ctdb_g_lock_list_len(state->lock_list); + data.dptr = talloc_size(state, data.dsize); + if (data.dptr == NULL) { + return ENOMEM; + } + + ctdb_g_lock_list_push(state->lock_list, data.dptr, &np); + ret = ctdb_store_record(state->h, data); + talloc_free(data.dptr); + if (ret != 0) { + return ret; + } + } + + return 0; +} + +static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + int ret; + bool status; + + status = ctdb_delete_record_recv(subreq, &ret); + if (! status) { + DEBUG(DEBUG_ERR, + ("g_lock_unlock %s delete record failed, ret=%d\n", + (char *)state->key.dptr, ret)); + tevent_req_error(req, ret); + return; + } + + TALLOC_FREE(state->h); + tevent_req_done(req); +} + +bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr) +{ + struct ctdb_g_lock_unlock_state *state = tevent_req_data( + req, struct ctdb_g_lock_unlock_state); + int err; + + TALLOC_FREE(state->h); + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +/* + * Persistent database functions + */ +struct ctdb_transaction_start_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct timeval timeout; + struct ctdb_transaction_handle *h; + uint32_t destnode; +}; + +static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq); +static void ctdb_transaction_g_lock_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + struct ctdb_db_context *db, + bool readonly) +{ + struct ctdb_transaction_start_state *state; + struct tevent_req *req, *subreq; + struct ctdb_transaction_handle *h; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_transaction_start_state); + if (req == NULL) { + return NULL; + } + + if (ctdb_db_volatile(db)) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + state->ev = ev; + state->client = client; + state->destnode = ctdb_client_pnn(client); + + h = talloc_zero(db, struct ctdb_transaction_handle); + if (tevent_req_nomem(h, req)) { + return tevent_req_post(req, ev); + } + + h->ev = ev; + h->client = client; + h->db = db; + h->readonly = readonly; + h->updated = false; + + /* SRVID is unique for databases, so client can have transactions + * active for multiple databases */ + h->sid = ctdb_client_get_server_id(client, db->db_id); + + h->recbuf = ctdb_rec_buffer_init(h, db->db_id); + if (tevent_req_nomem(h->recbuf, req)) { + return tevent_req_post(req, ev); + } + + h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id); + if (tevent_req_nomem(h->lock_name, req)) { + return tevent_req_post(req, ev); + } + + state->h = h; + + subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req); + + return req; +} + +static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_start_state *state = tevent_req_data( + req, struct ctdb_transaction_start_state); + bool status; + int ret; + + status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_start: %s attach g_lock.tdb failed\n", + state->h->db->db_name)); + tevent_req_error(req, ret); + return; + } + + subreq = ctdb_g_lock_lock_send(state, state->ev, state->client, + state->h->db_g_lock, + state->h->lock_name, + &state->h->sid, state->h->readonly); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req); +} + +static void ctdb_transaction_g_lock_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_start_state *state = tevent_req_data( + req, struct ctdb_transaction_start_state); + int ret; + bool status; + + status = ctdb_g_lock_lock_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_start: %s g_lock lock failed, ret=%d\n", + state->h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +struct ctdb_transaction_handle *ctdb_transaction_start_recv( + struct tevent_req *req, + int *perr) +{ + struct ctdb_transaction_start_state *state = tevent_req_data( + req, struct ctdb_transaction_start_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return NULL; + } + + return state->h; +} + +int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct timeval timeout, + struct ctdb_db_context *db, bool readonly, + struct ctdb_transaction_handle **out) +{ + struct tevent_req *req; + struct ctdb_transaction_handle *h; + int ret = 0; + + req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db, + readonly); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + h = ctdb_transaction_start_recv(req, &ret); + if (h == NULL) { + return ret; + } + + *out = h; + return 0; +} + +struct ctdb_transaction_record_fetch_state { + TDB_DATA key, data; + struct ctdb_ltdb_header header; + bool found; +}; + +static int ctdb_transaction_record_fetch_traverse( + uint32_t reqid, + struct ctdb_ltdb_header *nullheader, + TDB_DATA key, TDB_DATA data, + void *private_data) +{ + struct ctdb_transaction_record_fetch_state *state = + (struct ctdb_transaction_record_fetch_state *)private_data; + + if (state->key.dsize == key.dsize && + memcmp(state->key.dptr, key.dptr, key.dsize) == 0) { + int ret; + + ret = ctdb_ltdb_header_extract(&data, &state->header); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("record_fetch: Failed to extract header, " + "ret=%d\n", ret)); + return 1; + } + + state->data = data; + state->found = true; + } + + return 0; +} + +static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h, + TDB_DATA key, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_transaction_record_fetch_state state; + int ret; + + state.key = key; + state.found = false; + + ret = ctdb_rec_buffer_traverse(h->recbuf, + ctdb_transaction_record_fetch_traverse, + &state); + if (ret != 0) { + return ret; + } + + if (state.found) { + if (header != NULL) { + *header = state.header; + } + if (data != NULL) { + *data = state.data; + } + return 0; + } + + return ENOENT; +} + +int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h, + TDB_DATA key, + TALLOC_CTX *mem_ctx, TDB_DATA *data) +{ + TDB_DATA tmp_data; + struct ctdb_ltdb_header header; + int ret; + + ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data); + if (ret == 0) { + data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr, + tmp_data.dsize); + if (data->dptr == NULL) { + return ENOMEM; + } + data->dsize = tmp_data.dsize; + return 0; + } + + ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data); + if (ret != 0) { + return ret; + } + + ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data); + if (ret != 0) { + return ret; + } + + return 0; +} + +int ctdb_transaction_store_record(struct ctdb_transaction_handle *h, + TDB_DATA key, TDB_DATA data) +{ + TALLOC_CTX *tmp_ctx; + struct ctdb_ltdb_header header; + TDB_DATA old_data; + int ret; + + if (h->readonly) { + return EINVAL; + } + + tmp_ctx = talloc_new(h); + if (tmp_ctx == NULL) { + return ENOMEM; + } + + ret = ctdb_transaction_record_fetch(h, key, &header, &old_data); + if (ret != 0) { + ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data); + if (ret != 0) { + return ret; + } + } + + if (old_data.dsize == data.dsize && + memcmp(old_data.dptr, data.dptr, data.dsize) == 0) { + talloc_free(tmp_ctx); + return 0; + } + + header.dmaster = ctdb_client_pnn(h->client); + header.rsn += 1; + + ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data); + talloc_free(tmp_ctx); + if (ret != 0) { + return ret; + } + h->updated = true; + + return 0; +} + +int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h, + TDB_DATA key) +{ + return ctdb_transaction_store_record(h, key, tdb_null); +} + +static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h, + uint64_t *seqnum) +{ + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; + struct ctdb_ltdb_header header; + int ret; + + key.dptr = discard_const(keyname); + key.dsize = strlen(keyname) + 1; + + ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s seqnum fetch failed, ret=%d\n", + h->db->db_name, ret)); + return ret; + } + + if (data.dsize == 0) { + /* initial data */ + *seqnum = 0; + return 0; + } + + if (data.dsize != sizeof(uint64_t)) { + talloc_free(data.dptr); + return EINVAL; + } + + *seqnum = *(uint64_t *)data.dptr; + + talloc_free(data.dptr); + return 0; +} + +static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h, + uint64_t seqnum) +{ + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; + + key.dptr = discard_const(keyname); + key.dsize = strlen(keyname) + 1; + + data.dptr = (uint8_t *)&seqnum; + data.dsize = sizeof(seqnum); + + return ctdb_transaction_store_record(h, key, data); +} + +struct ctdb_transaction_commit_state { + struct tevent_context *ev; + struct timeval timeout; + struct ctdb_transaction_handle *h; + uint64_t seqnum; +}; + +static void ctdb_transaction_commit_done(struct tevent_req *subreq); +static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_transaction_commit_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct timeval timeout, + struct ctdb_transaction_handle *h) +{ + struct tevent_req *req, *subreq; + struct ctdb_transaction_commit_state *state; + struct ctdb_req_control request; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_transaction_commit_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->timeout = timeout; + state->h = h; + + ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ctdb_req_control_trans3_commit(&request, h->recbuf); + subreq = ctdb_client_control_send(state, ev, h->client, + ctdb_client_pnn(h->client), + timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req); + + return req; +} + +static void ctdb_transaction_commit_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_commit_state *state = tevent_req_data( + req, struct ctdb_transaction_commit_state); + struct ctdb_transaction_handle *h = state->h; + struct ctdb_reply_control *reply; + uint64_t seqnum; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n", + h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_trans3_commit(reply); + talloc_free(reply); + + if (ret != 0) { + /* Control failed due to recovery */ + + ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + if (seqnum == state->seqnum) { + struct ctdb_req_control request; + + /* try again */ + ctdb_req_control_trans3_commit(&request, + state->h->recbuf); + subreq = ctdb_client_control_send( + state, state->ev, state->h->client, + ctdb_client_pnn(state->h->client), + state->timeout, &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, + ctdb_transaction_commit_done, + req); + return; + } + + if (seqnum != state->seqnum + 1) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s seqnum mismatch " + "0x%"PRIx64" != 0x%"PRIx64" + 1\n", + state->h->db->db_name, seqnum, state->seqnum)); + tevent_req_error(req, EIO); + return; + } + } + + /* trans3_commit successful */ + subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client, + h->db_g_lock, h->lock_name, h->sid); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done, + req); +} + +static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_commit_state *state = tevent_req_data( + req, struct ctdb_transaction_commit_state); + int ret; + bool status; + + status = ctdb_g_lock_unlock_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_commit: %s g_lock unlock failed, ret=%d\n", + state->h->db->db_name, ret)); + tevent_req_error(req, ret); + return; + } + + talloc_free(state->h); + tevent_req_done(req); +} + +bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +int ctdb_transaction_commit(struct ctdb_transaction_handle *h) +{ + struct tevent_context *ev = h->ev; + TALLOC_CTX *mem_ctx; + struct tevent_req *req; + int ret; + bool status; + + if (h->readonly || ! h->updated) { + return ctdb_transaction_cancel(h); + } + + mem_ctx = talloc_new(NULL); + if (mem_ctx == NULL) { + return ENOMEM; + } + + req = ctdb_transaction_commit_send(mem_ctx, ev, + tevent_timeval_zero(), h); + if (req == NULL) { + talloc_free(mem_ctx); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_transaction_commit_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} + +struct ctdb_transaction_cancel_state { + struct tevent_context *ev; + struct ctdb_transaction_handle *h; + struct timeval timeout; +}; + +static void ctdb_transaction_cancel_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_transaction_cancel_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct timeval timeout, + struct ctdb_transaction_handle *h) +{ + struct tevent_req *req, *subreq; + struct ctdb_transaction_cancel_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_transaction_cancel_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->h = h; + state->timeout = timeout; + + subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client, + state->h->db_g_lock, + state->h->lock_name, state->h->sid); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_transaction_cancel_done, + req); + + return req; +} + +static void ctdb_transaction_cancel_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_transaction_cancel_state *state = tevent_req_data( + req, struct ctdb_transaction_cancel_state); + int ret; + bool status; + + status = ctdb_g_lock_unlock_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, + ("transaction_cancel: %s g_lock unlock failed, ret=%d\n", + state->h->db->db_name, ret)); + talloc_free(state->h); + tevent_req_error(req, ret); + return; + } + + talloc_free(state->h); + tevent_req_done(req); +} + +bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + return true; +} + +int ctdb_transaction_cancel(struct ctdb_transaction_handle *h) +{ + struct tevent_context *ev = h->ev; + struct tevent_req *req; + TALLOC_CTX *mem_ctx; + int ret; + bool status; + + mem_ctx = talloc_new(NULL); + if (mem_ctx == NULL) { + talloc_free(h); + return ENOMEM; + } + + req = ctdb_transaction_cancel_send(mem_ctx, ev, + tevent_timeval_zero(), h); + if (req == NULL) { + talloc_free(mem_ctx); + talloc_free(h); + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_transaction_cancel_recv(req, &ret); + if (! status) { + talloc_free(mem_ctx); + return ret; + } + + talloc_free(mem_ctx); + return 0; +} + +/* + * TODO: + * + * In future Samba should register SERVER_ID. + * Make that structure same as struct srvid {}. + */ |