summaryrefslogtreecommitdiffstats
path: root/ctdb/client/client_db.c
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb/client/client_db.c')
-rw-r--r--ctdb/client/client_db.c2791
1 files changed, 2791 insertions, 0 deletions
diff --git a/ctdb/client/client_db.c b/ctdb/client/client_db.c
new file mode 100644
index 0000000..0b06d6e
--- /dev/null
+++ b/ctdb/client/client_db.c
@@ -0,0 +1,2791 @@
+/*
+ CTDB client code
+
+ Copyright (C) Amitay Isaacs 2015
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/network.h"
+#include "system/filesys.h"
+
+#include <talloc.h>
+#include <tevent.h>
+#include <tdb.h>
+
+#include "common/logging.h"
+
+#include "lib/tdb_wrap/tdb_wrap.h"
+#include "lib/util/tevent_unix.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+
+#include "protocol/protocol.h"
+#include "protocol/protocol_api.h"
+#include "client/client_private.h"
+#include "client/client.h"
+
+/*
+ * Return the raw tdb context of the local database copy held by a
+ * client database handle.
+ */
+struct tdb_context *client_db_tdb(struct ctdb_db_context *db)
+{
+	struct tdb_context *tdb = db->ltdb->tdb;
+
+	return tdb;
+}
+
+/*
+ * Look up an already attached database by name.
+ *
+ * Walks the list of database handles hanging off the client and returns
+ * the first one whose name matches db_name exactly, or NULL if there is
+ * no such handle.
+ */
+static struct ctdb_db_context *client_db_handle(
+					struct ctdb_client_context *client,
+					const char *db_name)
+{
+	struct ctdb_db_context *cur = client->db;
+
+	while (cur != NULL && strcmp(db_name, cur->db_name) != 0) {
+		cur = cur->next;
+	}
+
+	return cur;
+}
+
+/* True if the database handle carries the PERSISTENT flag. */
+static bool ctdb_db_persistent(struct ctdb_db_context *db)
+{
+	return (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) != 0;
+}
+
+/* True if the database handle carries the REPLICATED flag. */
+static bool ctdb_db_replicated(struct ctdb_db_context *db)
+{
+	return (db->db_flags & CTDB_DB_FLAGS_REPLICATED) != 0;
+}
+
+/*
+ * True if the database is volatile, i.e. neither the PERSISTENT nor the
+ * REPLICATED flag is set on the handle.
+ */
+static bool ctdb_db_volatile(struct ctdb_db_context *db)
+{
+	return (db->db_flags &
+		(CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) == 0;
+}
+
+/*
+ * State of the internal "set db flags" request.
+ *
+ * The request fetches the node map, builds the list of connected nodes
+ * and then sends SET_DB_READONLY and/or SET_DB_STICKY to all of them,
+ * depending on db_flags.
+ */
+struct ctdb_set_db_flags_state {
+	struct tevent_context *ev;
+	struct ctdb_client_context *client;
+	struct timeval timeout;
+	uint32_t db_id;
+	uint8_t db_flags;
+	/* Set once the respective control finished or was not needed */
+	bool readonly_done, sticky_done;
+	/* Connected nodes as returned by list_of_connected_nodes() */
+	uint32_t *pnn_list;
+	int count;
+};
+
+/* Completion callbacks, in the order in which they are chained */
+static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
+static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
+static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
+
+/*
+ * Start setting the READONLY and/or STICKY property of a database on
+ * all connected nodes.
+ *
+ * If db_flags contains neither CTDB_DB_FLAGS_READONLY nor
+ * CTDB_DB_FLAGS_STICKY there is nothing to do and the request completes
+ * immediately.  Otherwise the node map is requested from destnode first;
+ * the remaining work happens in ctdb_set_db_flags_nodemap_done().
+ *
+ * Returns NULL only if the request itself could not be allocated.
+ */
+static struct tevent_req *ctdb_set_db_flags_send(
+				TALLOC_CTX *mem_ctx,
+				struct tevent_context *ev,
+				struct ctdb_client_context *client,
+				uint32_t destnode, struct timeval timeout,
+				uint32_t db_id, uint8_t db_flags)
+{
+	struct ctdb_set_db_flags_state *state;
+	struct ctdb_req_control nodemap_request;
+	struct tevent_req *req;
+	struct tevent_req *subreq;
+	bool controls_needed;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_set_db_flags_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	controls_needed = (db_flags & CTDB_DB_FLAGS_READONLY) ||
+			  (db_flags & CTDB_DB_FLAGS_STICKY);
+	if (! controls_needed) {
+		/* Neither flag requested, finish right away */
+		tevent_req_done(req);
+		return tevent_req_post(req, ev);
+	}
+
+	state->db_id = db_id;
+	state->db_flags = db_flags;
+	state->timeout = timeout;
+	state->client = client;
+	state->ev = ev;
+
+	ctdb_req_control_get_nodemap(&nodemap_request);
+	subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
+					  &nodemap_request);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
+
+	return req;
+}
+
+/*
+ * GET_NODEMAP completed.
+ *
+ * Builds the list of connected nodes and fans out SET_DB_READONLY and
+ * SET_DB_STICKY (whichever is requested in db_flags) to all of them.
+ * Both multi-controls are started from here and run concurrently; a
+ * control that is not requested is marked as done straight away.
+ */
+static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_set_db_flags_state *state = tevent_req_data(
+		req, struct ctdb_set_db_flags_state);
+	struct ctdb_reply_control *reply;
+	struct ctdb_node_map *nodemap;
+	struct ctdb_req_control request;
+	bool ok;
+	int ret;
+
+	ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR,
+		      ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
+		       state->db_id, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
+	talloc_free(reply);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
+		       state->db_id, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
+					       state, &state->pnn_list);
+	talloc_free(nodemap);
+	if (state->count <= 0) {
+		DEBUG(DEBUG_ERR,
+		      ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
+		       state->db_id, state->count));
+		tevent_req_error(req, ENOMEM);
+		return;
+	}
+
+	if (! (state->db_flags & CTDB_DB_FLAGS_READONLY)) {
+		/* Not requested, nothing to wait for */
+		state->readonly_done = true;
+	} else {
+		ctdb_req_control_set_db_readonly(&request, state->db_id);
+		subreq = ctdb_client_control_multi_send(
+				state, state->ev, state->client,
+				state->pnn_list, state->count,
+				state->timeout, &request);
+		if (tevent_req_nomem(subreq, req)) {
+			return;
+		}
+		tevent_req_set_callback(subreq,
+					ctdb_set_db_flags_readonly_done, req);
+	}
+
+	if (! (state->db_flags & CTDB_DB_FLAGS_STICKY)) {
+		/* Not requested, nothing to wait for */
+		state->sticky_done = true;
+	} else {
+		ctdb_req_control_set_db_sticky(&request, state->db_id);
+		subreq = ctdb_client_control_multi_send(
+				state, state->ev, state->client,
+				state->pnn_list, state->count,
+				state->timeout, &request);
+		if (tevent_req_nomem(subreq, req)) {
+			return;
+		}
+		tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
+					req);
+	}
+}
+
+/*
+ * SET_DB_READONLY finished on all nodes.
+ *
+ * Fails the request on error; otherwise records completion and finishes
+ * the request if the sticky part is complete as well.
+ */
+static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_set_db_flags_state *state = tevent_req_data(
+		req, struct ctdb_set_db_flags_state);
+	bool ok;
+	int ret;
+
+	ok = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, NULL);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR,
+		      ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
+		       state->db_id, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	state->readonly_done = true;
+
+	/* Still waiting for the sticky control? */
+	if (! state->sticky_done) {
+		return;
+	}
+
+	tevent_req_done(req);
+}
+
+/*
+ * SET_DB_STICKY finished on all nodes.
+ *
+ * Fails the request on error; otherwise records completion and finishes
+ * the request if the readonly part is complete as well.
+ */
+static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_set_db_flags_state *state = tevent_req_data(
+		req, struct ctdb_set_db_flags_state);
+	bool ok;
+	int ret;
+
+	ok = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, NULL);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR,
+		      ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
+		       state->db_id, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	state->sticky_done = true;
+
+	/* Still waiting for the readonly control? */
+	if (! state->readonly_done) {
+		return;
+	}
+
+	tevent_req_done(req);
+}
+
+/*
+ * Collect the result of a "set db flags" request.
+ *
+ * Returns true on success.  On failure returns false and, if perr is not
+ * NULL, stores the unix error code there.
+ */
+static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
+{
+	int err;
+
+	if (! tevent_req_is_unix_error(req, &err)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = err;
+	}
+	return false;
+}
+
+/*
+ * State of an attach request.
+ *
+ * The request runs a chain of controls against the local node
+ * (destnode): attach, GETDBPATH, DB_GET_HEALTH, set db flags and
+ * DB_OPEN_FLAGS, and finally opens the local tdb.
+ */
+struct ctdb_attach_state {
+	struct tevent_context *ev;
+	struct ctdb_client_context *client;
+	struct timeval timeout;
+	/* PNN the controls are sent to, taken from ctdb_client_pnn() */
+	uint32_t destnode;
+	uint8_t db_flags;
+	/* Handle being built; allocated on the client, not on the request */
+	struct ctdb_db_context *db;
+};
+
+/* Completion callbacks, in the order in which they are chained */
+static void ctdb_attach_dbid_done(struct tevent_req *subreq);
+static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
+static void ctdb_attach_health_done(struct tevent_req *subreq);
+static void ctdb_attach_flags_done(struct tevent_req *subreq);
+static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
+
+/*
+ * Start attaching to a database.
+ *
+ * If the client already holds a handle for db_name the request completes
+ * immediately with that handle.  Otherwise a new handle is allocated on
+ * the client and the attach control matching db_flags (persistent,
+ * replicated or plain) is sent to the local node; the rest of the
+ * sequence is driven by the completion callbacks.
+ *
+ * Returns NULL only if the request itself could not be allocated; all
+ * other failures are reported through ctdb_attach_recv().
+ */
+struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
+				    struct tevent_context *ev,
+				    struct ctdb_client_context *client,
+				    struct timeval timeout,
+				    const char *db_name, uint8_t db_flags)
+{
+	struct tevent_req *req, *subreq;
+	struct ctdb_attach_state *state;
+	struct ctdb_req_control request;
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	/* Already attached?  Hand out the existing handle. */
+	state->db = client_db_handle(client, db_name);
+	if (state->db != NULL) {
+		tevent_req_done(req);
+		return tevent_req_post(req, ev);
+	}
+
+	state->ev = ev;
+	state->client = client;
+	state->timeout = timeout;
+	state->destnode = ctdb_client_pnn(client);
+	state->db_flags = db_flags;
+
+	state->db = talloc_zero(client, struct ctdb_db_context);
+	if (tevent_req_nomem(state->db, req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	state->db->db_name = talloc_strdup(state->db, db_name);
+	/* Check the duplicated name, not the (already checked) handle */
+	if (tevent_req_nomem(state->db->db_name, req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	state->db->db_flags = db_flags;
+
+	if (ctdb_db_persistent(state->db)) {
+		ctdb_req_control_db_attach_persistent(&request,
+						      state->db->db_name);
+	} else if (ctdb_db_replicated(state->db)) {
+		ctdb_req_control_db_attach_replicated(&request,
+						      state->db->db_name);
+	} else {
+		ctdb_req_control_db_attach(&request, state->db->db_name);
+	}
+
+	subreq = ctdb_client_control_send(state, state->ev, state->client,
+					  state->destnode, state->timeout,
+					  &request);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
+
+	return req;
+}
+
+/*
+ * The attach control completed.
+ *
+ * Extracts the database id from the reply (using the parser matching the
+ * kind of attach that was sent) and asks the local node for the database
+ * path.
+ */
+static void ctdb_attach_dbid_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_attach_state *state = tevent_req_data(
+		req, struct ctdb_attach_state);
+	struct ctdb_reply_control *reply;
+	struct ctdb_req_control request;
+	int ret;
+	bool ok;
+
+	ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		/* Name of the control that was sent, for the log only */
+		const char *control = "DB_ATTACH";
+
+		if (ctdb_db_persistent(state->db)) {
+			control = "DB_ATTACH_PERSISTENT";
+		} else if (ctdb_db_replicated(state->db)) {
+			control = "DB_ATTACH_REPLICATED";
+		}
+
+		DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
+				  state->db->db_name, control, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	if (ctdb_db_persistent(state->db)) {
+		ret = ctdb_reply_control_db_attach_persistent(
+				reply, &state->db->db_id);
+	} else if (ctdb_db_replicated(state->db)) {
+		ret = ctdb_reply_control_db_attach_replicated(
+				reply, &state->db->db_id);
+	} else {
+		ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
+	}
+	talloc_free(reply);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
+				  state->db->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	/* Next step: where does the database live on disk? */
+	ctdb_req_control_getdbpath(&request, state->db->db_id);
+	subreq = ctdb_client_control_send(state, state->ev, state->client,
+					  state->destnode, state->timeout,
+					  &request);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
+}
+
+/*
+ * GETDBPATH completed.
+ *
+ * Stores the database path in the handle and queries the health of the
+ * database next.
+ */
+static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_attach_state *state = tevent_req_data(
+		req, struct ctdb_attach_state);
+	struct ctdb_db_context *db = state->db;
+	struct ctdb_req_control request;
+	struct ctdb_reply_control *reply;
+	int ret;
+	bool ok;
+
+	ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
+				  db->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	/* The path is allocated on the handle so that it outlives the request */
+	ret = ctdb_reply_control_getdbpath(reply, db, &db->db_path);
+	talloc_free(reply);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
+				  db->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ctdb_req_control_db_get_health(&request, db->db_id);
+	subreq = ctdb_client_control_send(state, state->ev, state->client,
+					  state->destnode, state->timeout,
+					  &request);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
+}
+
+/*
+ * DB_GET_HEALTH completed.
+ *
+ * A non-NULL reason in the reply means the database is unhealthy; the
+ * attach is then refused with EIO.  Otherwise the readonly/sticky flags
+ * are applied next.
+ *
+ * The reply is released on every path once it is no longer needed, like
+ * the other attach callbacks do, instead of lingering until the request
+ * is freed.
+ */
+static void ctdb_attach_health_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_attach_state *state = tevent_req_data(
+		req, struct ctdb_attach_state);
+	struct ctdb_reply_control *reply;
+	/* Stays NULL unless the reply parser reports a reason */
+	const char *reason = NULL;
+	bool status;
+	int ret;
+
+	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! status) {
+		DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
+				  state->db->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ret = ctdb_reply_control_db_get_health(reply, state, &reason);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
+		       state->db->db_name, ret));
+		talloc_free(reply);
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	if (reason != NULL) {
+		/* Database unhealthy, avoid attach */
+		DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
+				  state->db->db_name, reason));
+		/*
+		 * Free only after logging, so this stays correct no
+		 * matter which context owns the reason string.
+		 */
+		talloc_free(reply);
+		tevent_req_error(req, EIO);
+		return;
+	}
+	talloc_free(reply);
+
+	subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
+					state->destnode, state->timeout,
+					state->db->db_id, state->db_flags);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
+}
+
+/*
+ * Readonly/sticky flags have been applied (or were not requested).
+ *
+ * Asks the local node for the tdb open flags of the database, which are
+ * needed to open the local tdb file.
+ */
+static void ctdb_attach_flags_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_attach_state *state = tevent_req_data(
+		req, struct ctdb_attach_state);
+	struct ctdb_req_control open_flags_request;
+	int ret;
+	bool ok;
+
+	ok = ctdb_set_db_flags_recv(subreq, &ret);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
+				  state->db->db_name, state->db_flags));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ctdb_req_control_db_open_flags(&open_flags_request, state->db->db_id);
+	subreq = ctdb_client_control_send(state, state->ev, state->client,
+					  state->destnode, state->timeout,
+					  &open_flags_request);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
+}
+
+/*
+ * DB_OPEN_FLAGS completed.
+ *
+ * Opens the local tdb file with the flags reported by the daemon, links
+ * the finished handle into the client's database list and completes the
+ * attach request.
+ */
+static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_attach_state *state = tevent_req_data(
+		req, struct ctdb_attach_state);
+	struct ctdb_reply_control *reply;
+	bool status;
+	int ret, tdb_flags;
+
+	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! status) {
+		DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
+				  state->db->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
+	talloc_free(reply);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
+				  " ret=%d\n", state->db->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
+					tdb_flags, O_RDWR, 0);
+	if (state->db->ltdb == NULL) {
+		/*
+		 * Log before signalling the failure: tevent_req_nomem()
+		 * runs the caller's callback, which may free the request
+		 * and with it 'state'.
+		 */
+		DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
+				  state->db->db_name));
+	}
+	if (tevent_req_nomem(state->db->ltdb, req)) {
+		return;
+	}
+	DLIST_ADD(state->client->db, state->db);
+
+	tevent_req_done(req);
+}
+
+/*
+ * Collect the result of an attach request.
+ *
+ * On success returns true and, if out is not NULL, stores the database
+ * handle there.  On failure returns false and, if perr is not NULL,
+ * stores the unix error code there.
+ */
+bool ctdb_attach_recv(struct tevent_req *req, int *perr,
+		      struct ctdb_db_context **out)
+{
+	struct ctdb_attach_state *state = tevent_req_data(
+		req, struct ctdb_attach_state);
+	int err;
+
+	if (! tevent_req_is_unix_error(req, &err)) {
+		if (out != NULL) {
+			*out = state->db;
+		}
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = err;
+	}
+	return false;
+}
+
+/*
+ * Synchronous wrapper around ctdb_attach_send()/ctdb_attach_recv().
+ *
+ * Runs the event loop until the attach request has finished.  Returns 0
+ * on success (with the handle stored in *out if out is not NULL) or a
+ * unix error code.
+ */
+int ctdb_attach(struct tevent_context *ev,
+		struct ctdb_client_context *client,
+		struct timeval timeout,
+		const char *db_name, uint8_t db_flags,
+		struct ctdb_db_context **out)
+{
+	TALLOC_CTX *tmp_ctx;
+	struct tevent_req *req;
+	int err;
+	int result;
+
+	/* Scratch context owning the request for the duration of the call */
+	tmp_ctx = talloc_new(client);
+	if (tmp_ctx == NULL) {
+		return ENOMEM;
+	}
+
+	req = ctdb_attach_send(tmp_ctx, ev, client, timeout,
+			       db_name, db_flags);
+	if (req == NULL) {
+		result = ENOMEM;
+		goto done;
+	}
+
+	tevent_req_poll(req, ev);
+
+	if (ctdb_attach_recv(req, &err, out)) {
+		result = 0;
+	} else {
+		result = err;
+	}
+
+done:
+	talloc_free(tmp_ctx);
+	return result;
+}
+
+/*
+ * State of a detach request: GET_DBNAME followed by DB_DETACH, both
+ * sent to the local node.
+ */
+struct ctdb_detach_state {
+	struct ctdb_client_context *client;
+	struct tevent_context *ev;
+	struct timeval timeout;
+	uint32_t db_id;
+	/* Filled in from the GET_DBNAME reply */
+	const char *db_name;
+};
+
+/* Completion callbacks, in the order in which they are chained */
+static void ctdb_detach_dbname_done(struct tevent_req *subreq);
+static void ctdb_detach_done(struct tevent_req *subreq);
+
+/*
+ * Start detaching from the database identified by db_id.
+ *
+ * The database name is looked up first (GET_DBNAME on the local node);
+ * the actual DB_DETACH is sent from the completion callback.
+ *
+ * Returns NULL only if the request itself could not be allocated.
+ */
+struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
+				    struct tevent_context *ev,
+				    struct ctdb_client_context *client,
+				    struct timeval timeout, uint32_t db_id)
+{
+	struct ctdb_detach_state *state;
+	struct ctdb_req_control dbname_request;
+	struct tevent_req *req;
+	struct tevent_req *subreq;
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->ev = ev;
+	state->client = client;
+	state->db_id = db_id;
+	state->timeout = timeout;
+
+	ctdb_req_control_get_dbname(&dbname_request, db_id);
+	subreq = ctdb_client_control_send(state, ev, client,
+					  ctdb_client_pnn(client), timeout,
+					  &dbname_request);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
+
+	return req;
+}
+
+/*
+ * GET_DBNAME completed.
+ *
+ * Remembers the database name in the request state and sends DB_DETACH
+ * to the local node.
+ */
+static void ctdb_detach_dbname_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_detach_state *state = tevent_req_data(
+		req, struct ctdb_detach_state);
+	struct ctdb_req_control detach_request;
+	struct ctdb_reply_control *reply;
+	bool ok;
+	int ret;
+
+	ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
+				  state->db_id, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
+				  state->db_id, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ctdb_req_control_db_detach(&detach_request, state->db_id);
+	subreq = ctdb_client_control_send(state, state->ev, state->client,
+					  ctdb_client_pnn(state->client),
+					  state->timeout, &detach_request);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, ctdb_detach_done, req);
+}
+
+/*
+ * DB_DETACH completed.
+ *
+ * On success, drops the client's handle for the database (if it has
+ * one) and completes the request.
+ */
+static void ctdb_detach_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_detach_state *state = tevent_req_data(
+		req, struct ctdb_detach_state);
+	struct ctdb_db_context *handle;
+	struct ctdb_reply_control *reply;
+	bool ok;
+	int ret;
+
+	ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
+				  state->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ret = ctdb_reply_control_db_detach(reply);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
+				  state->db_name, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	/* Forget the local handle, if this client was attached at all */
+	handle = client_db_handle(state->client, state->db_name);
+	if (handle != NULL) {
+		DLIST_REMOVE(state->client->db, handle);
+		TALLOC_FREE(handle);
+	}
+
+	tevent_req_done(req);
+}
+
+/*
+ * Collect the result of a detach request.
+ *
+ * Returns true on success.  On failure returns false and, if perr is not
+ * NULL, stores the unix error code there.
+ */
+bool ctdb_detach_recv(struct tevent_req *req, int *perr)
+{
+	int err;
+
+	if (! tevent_req_is_unix_error(req, &err)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = err;
+	}
+	return false;
+}
+
+/*
+ * Synchronous wrapper around ctdb_detach_send()/ctdb_detach_recv().
+ *
+ * Runs the event loop until the detach request has finished.  Returns 0
+ * on success or a unix error code.
+ */
+int ctdb_detach(struct tevent_context *ev,
+		struct ctdb_client_context *client,
+		struct timeval timeout, uint32_t db_id)
+{
+	TALLOC_CTX *tmp_ctx;
+	struct tevent_req *req;
+	int err;
+	int result;
+
+	/* Scratch context owning the request for the duration of the call */
+	tmp_ctx = talloc_new(client);
+	if (tmp_ctx == NULL) {
+		return ENOMEM;
+	}
+
+	req = ctdb_detach_send(tmp_ctx, ev, client, timeout, db_id);
+	if (req == NULL) {
+		result = ENOMEM;
+		goto done;
+	}
+
+	tevent_req_poll(req, ev);
+
+	if (ctdb_detach_recv(req, &err)) {
+		result = 0;
+	} else {
+		result = err;
+	}
+
+done:
+	talloc_free(tmp_ctx);
+	return result;
+}
+
+/* Return the database id stored in a database handle. */
+uint32_t ctdb_db_id(struct ctdb_db_context *db)
+{
+	uint32_t id = db->db_id;
+
+	return id;
+}
+
+/*
+ * Context handed to ctdb_db_traverse_local_handler() for each record of
+ * a local tdb traverse.
+ */
+struct ctdb_db_traverse_local_state {
+	/* Caller's record callback and its opaque argument */
+	ctdb_rec_parser_func_t parser;
+	void *private_data;
+	/* If set, the ltdb header is extracted before calling the parser */
+	bool extract_header;
+	/* First non-zero result that stopped the traverse, 0 otherwise */
+	int error;
+};
+
+/*
+ * Per-record callback for the local tdb traverse.
+ *
+ * Optionally extracts the ltdb header from the record data and hands the
+ * record to the caller's parser (with a NULL header if extraction was
+ * not requested).  Any failure is stored in state->error and the
+ * traverse is stopped by returning 1; returns 0 to continue.
+ */
+static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
+					  TDB_DATA key, TDB_DATA data,
+					  void *private_data)
+{
+	struct ctdb_db_traverse_local_state *state = private_data;
+	struct ctdb_ltdb_header header;
+	struct ctdb_ltdb_header *hdr = NULL;
+	int ret;
+
+	if (state->extract_header) {
+		ret = ctdb_ltdb_header_extract(&data, &header);
+		if (ret != 0) {
+			state->error = ret;
+			return 1;
+		}
+		hdr = &header;
+	}
+
+	ret = state->parser(0, hdr, key, data, state->private_data);
+	if (ret != 0) {
+		state->error = ret;
+		return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Traverse the local tdb of a database and call parser for each record.
+ *
+ * readonly selects tdb_traverse_read() over tdb_traverse().  With
+ * extract_header the ltdb header is split off each record before the
+ * parser sees it.
+ *
+ * Returns EIO if the tdb traverse itself failed, otherwise the error
+ * recorded by the per-record handler (0 if every record was accepted).
+ */
+int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
+			   bool extract_header,
+			   ctdb_rec_parser_func_t parser, void *private_data)
+{
+	struct ctdb_db_traverse_local_state state = {
+		.parser = parser,
+		.private_data = private_data,
+		.extract_header = extract_header,
+		.error = 0,
+	};
+	struct tdb_context *tdb = client_db_tdb(db);
+	int ret;
+
+	if (! readonly) {
+		ret = tdb_traverse(tdb, ctdb_db_traverse_local_handler,
+				   &state);
+	} else {
+		ret = tdb_traverse_read(tdb, ctdb_db_traverse_local_handler,
+					&state);
+	}
+
+	return (ret == -1) ? EIO : state.error;
+}
+
+/*
+ * State of a cluster traverse request.
+ *
+ * A message handler is registered on srvid, the traverse is started with
+ * a TRAVERSE_START_EXT control and records are then delivered through
+ * the message handler until the traverse ends or the parser fails.
+ */
+struct ctdb_db_traverse_state {
+	struct tevent_context *ev;
+	struct ctdb_client_context *client;
+	struct ctdb_db_context *db;
+	uint32_t destnode;
+	/* Service id the traverse records are delivered on */
+	uint64_t srvid;
+	struct timeval timeout;
+	/* Caller's record callback and its opaque argument */
+	ctdb_rec_parser_func_t parser;
+	void *private_data;
+	/* Error to report once the message handler has been removed */
+	int result;
+};
+
+static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
+static void ctdb_db_traverse_started(struct tevent_req *subreq);
+static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
+				     void *private_data);
+static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
+static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
+
+/*
+ * Start a traverse of a database via the node destnode.
+ *
+ * parser is called with private_data for every record received.  The
+ * records arrive as messages on a service id derived from
+ * CTDB_SRVID_CLIENT_RANGE and the pid of this process; registering the
+ * handler for that id is the first step of the request.
+ *
+ * Returns NULL only if the request itself could not be allocated.
+ */
+struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
+					 struct tevent_context *ev,
+					 struct ctdb_client_context *client,
+					 struct ctdb_db_context *db,
+					 uint32_t destnode,
+					 struct timeval timeout,
+					 ctdb_rec_parser_func_t parser,
+					 void *private_data)
+{
+	struct ctdb_db_traverse_state *state;
+	struct tevent_req *req;
+	struct tevent_req *subreq;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_db_traverse_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->parser = parser;
+	state->private_data = private_data;
+	state->timeout = timeout;
+	state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
+	state->destnode = destnode;
+	state->db = db;
+	state->client = client;
+	state->ev = ev;
+
+	/* The request itself is the private data of the message handler */
+	subreq = ctdb_client_set_message_handler_send(
+				state, ev, client, state->srvid,
+				ctdb_db_traverse_handler, req);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
+
+	return req;
+}
+
+/*
+ * The message handler has been registered.
+ *
+ * Sends TRAVERSE_START_EXT to the destination node.  If the control
+ * cannot be sent, ENOMEM is recorded and the message handler is removed
+ * again before the request fails.
+ */
+static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_db_traverse_state *state = tevent_req_data(
+		req, struct ctdb_db_traverse_state);
+	struct ctdb_req_control start_request;
+	struct ctdb_traverse_start_ext start;
+	bool ok;
+	int ret = 0;
+
+	ok = ctdb_client_set_message_handler_recv(subreq, &ret);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	start = (struct ctdb_traverse_start_ext) {
+		.db_id = ctdb_db_id(state->db),
+		.reqid = 0,
+		.srvid = state->srvid,
+		.withemptyrecords = false,
+	};
+
+	ctdb_req_control_traverse_start_ext(&start_request, &start);
+	subreq = ctdb_client_control_send(state, state->ev, state->client,
+					  state->destnode, state->timeout,
+					  &start_request);
+	if (subreq != NULL) {
+		tevent_req_set_callback(subreq, ctdb_db_traverse_started,
+					req);
+		return;
+	}
+
+	/* Could not send the control: unregister the handler, then fail */
+	state->result = ENOMEM;
+	ctdb_db_traverse_remove_handler(req);
+}
+
+/*
+ * TRAVERSE_START_EXT completed.
+ *
+ * On success nothing more happens here; the request stays pending while
+ * records are delivered to ctdb_db_traverse_handler().  On failure the
+ * error is recorded and the message handler is removed, which ends the
+ * request.
+ */
+static void ctdb_db_traverse_started(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_db_traverse_state *state = tevent_req_data(
+		req, struct ctdb_db_traverse_state);
+	struct ctdb_reply_control *reply;
+	bool ok;
+	int ret = 0;
+
+	ok = ctdb_client_control_recv(subreq, &ret, state, &reply);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
+		state->result = ret;
+		ctdb_db_traverse_remove_handler(req);
+		return;
+	}
+
+	ret = ctdb_reply_control_traverse_start_ext(reply);
+	talloc_free(reply);
+	if (ret == 0) {
+		return;
+	}
+
+	DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
+			  ret));
+	state->result = ret;
+	ctdb_db_traverse_remove_handler(req);
+}
+
+/*
+ * Message handler receiving the records of a running traverse.
+ *
+ * private_data is the traverse request registered in
+ * ctdb_db_traverse_send().  Each message is decoded into a record and
+ * passed to the caller's parser; a parser failure stops the traverse.
+ */
+static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
+				     void *private_data)
+{
+	struct tevent_req *req = talloc_get_type_abort(
+		private_data, struct tevent_req);
+	struct ctdb_db_traverse_state *state = tevent_req_data(
+		req, struct ctdb_db_traverse_state);
+	struct ctdb_rec_data *rec;
+	struct ctdb_ltdb_header header;
+	size_t np;
+	int ret;
+
+	/* Messages that cannot be decoded are silently ignored */
+	ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np);
+	if (ret != 0) {
+		return;
+	}
+
+	/*
+	 * A record with empty key and empty data ends the traverse
+	 * (presumably the daemon's end-of-traverse marker - confirm
+	 * against the daemon side).
+	 */
+	if (rec->key.dsize == 0 && rec->data.dsize == 0) {
+		talloc_free(rec);
+		ctdb_db_traverse_remove_handler(req);
+		return;
+	}
+
+	/* Records without a valid ltdb header are skipped */
+	ret = ctdb_ltdb_header_extract(&rec->data, &header);
+	if (ret != 0) {
+		talloc_free(rec);
+		return;
+	}
+
+	/* Nothing left once the header is accounted for: skip the record */
+	if (rec->data.dsize == 0) {
+		talloc_free(rec);
+		return;
+	}
+
+	ret = state->parser(rec->reqid, &header, rec->key, rec->data,
+			    state->private_data);
+	talloc_free(rec);
+	if (ret != 0) {
+		/* Report the parser's error once the handler is removed */
+		state->result = ret;
+		ctdb_db_traverse_remove_handler(req);
+	}
+}
+
+/*
+ * Begin tearing down a traverse: unregister the message handler for the
+ * traverse service id.  The request is completed from
+ * ctdb_db_traverse_handler_removed().
+ */
+static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
+{
+	struct ctdb_db_traverse_state *state = tevent_req_data(
+		req, struct ctdb_db_traverse_state);
+	struct tevent_req *remove_req;
+
+	remove_req = ctdb_client_remove_message_handler_send(
+				state, state->ev, state->client,
+				state->srvid, req);
+	if (tevent_req_nomem(remove_req, req)) {
+		return;
+	}
+	tevent_req_set_callback(remove_req, ctdb_db_traverse_handler_removed,
+				req);
+}
+
+/*
+ * The message handler has been unregistered.
+ *
+ * Completes the traverse request: with the removal error if removing
+ * the handler failed, else with the error recorded in state->result, or
+ * successfully if there is none.
+ */
+static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_db_traverse_state *state = tevent_req_data(
+		req, struct ctdb_db_traverse_state);
+	bool ok;
+	int ret;
+
+	ok = ctdb_client_remove_message_handler_recv(subreq, &ret);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	if (state->result == 0) {
+		tevent_req_done(req);
+		return;
+	}
+
+	tevent_req_error(req, state->result);
+}
+
+/*
+ * Collect the result of a traverse request.
+ *
+ * Returns true on success.  On failure returns false and, if perr is not
+ * NULL, stores the unix error code there.
+ */
+bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
+{
+	int err;
+
+	if (! tevent_req_is_unix_error(req, &err)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = err;
+	}
+	return false;
+}
+
+/*
+ * Synchronous wrapper around ctdb_db_traverse_send()/_recv().
+ *
+ * Runs the event loop until the traverse has finished.  Returns 0 on
+ * success or a unix error code.  The request is allocated on mem_ctx
+ * and is not freed here.
+ */
+int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+		     struct ctdb_client_context *client,
+		     struct ctdb_db_context *db,
+		     uint32_t destnode, struct timeval timeout,
+		     ctdb_rec_parser_func_t parser, void *private_data)
+{
+	struct tevent_req *req;
+	int err = 0;
+
+	req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
+				    timeout, parser, private_data);
+	if (req == NULL) {
+		return ENOMEM;
+	}
+
+	tevent_req_poll(req, ev);
+
+	if (ctdb_db_traverse_recv(req, &err)) {
+		return 0;
+	}
+
+	return err;
+}
+
+/*
+ * Fetch a record from the local tdb and split it into ltdb header and
+ * data.
+ *
+ * If the record does not exist, *header is initialised with
+ * dmaster = CTDB_UNKNOWN_PNN, *data (if requested) is set to tdb_null
+ * and 0 is returned.  If the record exists, the header is decoded into
+ * *header and, if data is not NULL, the payload following the header is
+ * copied onto mem_ctx.
+ *
+ * Returns 0 on success, EIO on a tdb error (including a record shorter
+ * than an ltdb header), ENOMEM if the payload could not be copied, or
+ * the error returned by ctdb_ltdb_header_pull().
+ */
+int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
+		    struct ctdb_ltdb_header *header,
+		    TALLOC_CTX *mem_ctx, TDB_DATA *data)
+{
+	TDB_DATA rec;
+	size_t np;
+	int ret;
+
+	rec = tdb_fetch(client_db_tdb(db), key);
+	if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
+		/* No record present */
+		free(rec.dptr);
+
+		if (tdb_error(client_db_tdb(db)) != TDB_ERR_NOEXIST) {
+			return EIO;
+		}
+
+		*header = (struct ctdb_ltdb_header) {
+			.dmaster = CTDB_UNKNOWN_PNN,
+		};
+
+		if (data != NULL) {
+			*data = tdb_null;
+		}
+		return 0;
+	}
+
+	ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
+	if (ret != 0) {
+		/* tdb_fetch() hands out malloc'ed memory; do not leak it */
+		free(rec.dptr);
+		return ret;
+	}
+
+	if (data != NULL) {
+		data->dsize = rec.dsize - np;
+		data->dptr = talloc_memdup(mem_ctx, rec.dptr + np,
+					   data->dsize);
+		if (data->dptr == NULL) {
+			ret = ENOMEM;
+		}
+	}
+
+	free(rec.dptr);
+	return ret;
+}
+
+/*
+ * Fetch a record from volatile database
+ *
+ * Steps:
+ * 1. Get a lock on the hash chain
+ * 2. If the record does not exist, migrate the record
+ * 3. If readonly=true and delegations do not exist, migrate the record.
+ * 4. If readonly=false and delegations exist, migrate the record.
+ * 5. If the local node is not dmaster, migrate the record.
+ * 6. Return record
+ */
+
+/* State of a fetch-lock request on a volatile database */
+struct ctdb_fetch_lock_state {
+	struct tevent_context *ev;
+	struct ctdb_client_context *client;
+	/* Record handle being built; allocated on the database handle */
+	struct ctdb_record_handle *h;
+	/* Caller asked for readonly access */
+	bool readonly;
+	/* PNN of the local node, from ctdb_client_pnn() */
+	uint32_t pnn;
+};
+
+static int ctdb_fetch_lock_check(struct tevent_req *req);
+static void ctdb_fetch_lock_migrate(struct tevent_req *req);
+static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
+
+/*
+ * Start fetching and locking a record of a volatile database.
+ *
+ * A record handle holding a private copy of the key is allocated on the
+ * database handle.  If the record can be used locally right away the
+ * request completes immediately; otherwise a migration of the record to
+ * this node is started and the request stays pending.
+ *
+ * Fails with EINVAL if the database is not volatile.  Returns NULL only
+ * if the request itself could not be allocated.
+ */
+struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
+					struct tevent_context *ev,
+					struct ctdb_client_context *client,
+					struct ctdb_db_context *db,
+					TDB_DATA key, bool readonly)
+{
+	struct ctdb_fetch_lock_state *state;
+	struct tevent_req *req;
+	int ret;
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->ev = ev;
+	state->client = client;
+
+	state->h = talloc_zero(db, struct ctdb_record_handle);
+	if (tevent_req_nomem(state->h, req)) {
+		return tevent_req_post(req, ev);
+	}
+	state->h->ev = ev;
+	state->h->client = client;
+	state->h->db = db;
+	state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
+	if (tevent_req_nomem(state->h->key.dptr, req)) {
+		return tevent_req_post(req, ev);
+	}
+	state->h->key.dsize = key.dsize;
+	state->h->readonly = false;
+
+	state->readonly = readonly;
+	state->pnn = ctdb_client_pnn(client);
+
+	/* Fetch-lock is only supported on volatile databases */
+	if (! ctdb_db_volatile(db)) {
+		DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
+				  db->db_name));
+		tevent_req_error(req, EINVAL);
+		return tevent_req_post(req, ev);
+	}
+
+	ret = ctdb_fetch_lock_check(req);
+	if (ret == 0) {
+		tevent_req_done(req);
+		return tevent_req_post(req, ev);
+	}
+	if (ret != EAGAIN) {
+		tevent_req_error(req, ret);
+		return tevent_req_post(req, ev);
+	}
+
+	/*
+	 * EAGAIN means a migration was requested.  If starting it failed
+	 * (out of memory), the request has already been finished before
+	 * the caller had a chance to set a callback; post it so that the
+	 * caller still gets notified.
+	 */
+	if (! tevent_req_is_in_progress(req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	return req;
+}
+
+/*
+ * Check whether the record can be used from the local tdb.
+ *
+ * Takes the chain lock for the key and inspects the local copy of the
+ * record.
+ *
+ * Returns:
+ *   0       the record is usable; the handle holds header and data (the
+ *           data buffer as returned by tdb_fetch()) and the chain lock
+ *           is still held
+ *   EAGAIN  the record has to be migrated; the chain lock was dropped
+ *           and ctdb_fetch_lock_migrate() has been called
+ *   other   error; the chain lock is not held
+ */
+static int ctdb_fetch_lock_check(struct tevent_req *req)
+{
+	struct ctdb_fetch_lock_state *state = tevent_req_data(
+		req, struct ctdb_fetch_lock_state);
+	struct ctdb_record_handle *h = state->h;
+	struct ctdb_ltdb_header header;
+	TDB_DATA data = tdb_null;
+	size_t np;
+	int ret, err = 0;
+	bool do_migrate = false;
+
+	ret = tdb_chainlock(client_db_tdb(h->db), h->key);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("fetch_lock: %s tdb_chainlock failed, %s\n",
+		       h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
+		/*
+		 * The lock was never taken, so there is nothing to
+		 * unlock and no data to free: bail out directly.
+		 */
+		return EIO;
+	}
+
+	data = tdb_fetch(client_db_tdb(h->db), h->key);
+	if (data.dptr == NULL) {
+		if (tdb_error(client_db_tdb(h->db)) == TDB_ERR_NOEXIST) {
+			/* No local copy of the record at all */
+			goto migrate;
+		} else {
+			err = EIO;
+			goto failed;
+		}
+	}
+
+	/* Got the record */
+	ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np);
+	if (ret != 0) {
+		err = ret;
+		goto failed;
+	}
+
+	if (! state->readonly) {
+		/* Read/write access */
+		if (header.dmaster == state->pnn &&
+		    header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
+			goto migrate;
+		}
+
+		if (header.dmaster != state->pnn) {
+			goto migrate;
+		}
+	} else {
+		/* Readonly access */
+		if (header.dmaster != state->pnn &&
+		    ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
+				       CTDB_REC_RO_HAVE_DELEGATIONS))) {
+			goto migrate;
+		}
+	}
+
+	/* We are the dmaster or readonly delegation */
+	h->header = header;
+	h->data = data;
+	if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
+			    CTDB_REC_RO_HAVE_DELEGATIONS)) {
+		h->readonly = true;
+	}
+	/* Success: the chain lock stays held and the handle owns data */
+	return 0;
+
+migrate:
+	do_migrate = true;
+	err = EAGAIN;
+
+failed:
+	free(data.dptr);
+	ret = tdb_chainunlock(client_db_tdb(h->db), h->key);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("fetch_lock: %s tdb_chainunlock failed, %s\n",
+		       h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
+		return EIO;
+	}
+
+	if (do_migrate) {
+		ctdb_fetch_lock_migrate(req);
+	}
+	return err;
+}
+
+/*
+ * Send a NULL-function call asking for immediate migration of the record
+ * (optionally as a readonly copy).  On allocation failure the request is
+ * marked as failed.
+ */
+static void ctdb_fetch_lock_migrate(struct tevent_req *req)
+{
+	struct ctdb_fetch_lock_state *state = tevent_req_data(
+		req, struct ctdb_fetch_lock_state);
+	struct ctdb_record_handle *h = state->h;
+	struct tevent_req *subreq;
+	struct ctdb_req_call call;
+
+	ZERO_STRUCT(call);
+	call.db_id = h->db->db_id;
+	call.callid = CTDB_NULL_FUNC;
+	call.key = h->key;
+	call.calldata = tdb_null;
+	if (state->readonly) {
+		call.flags = CTDB_IMMEDIATE_MIGRATION | CTDB_WANT_READONLY;
+	} else {
+		call.flags = CTDB_IMMEDIATE_MIGRATION;
+	}
+
+	subreq = ctdb_client_call_send(state, state->ev, state->client,
+				       &call);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+
+	tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
+}
+
+/*
+ * Migration call finished: re-check the local record.  The request is
+ * completed on success, failed on error, and left pending when the check
+ * asked for yet another migration (EAGAIN).
+ */
+static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_fetch_lock_state *state = tevent_req_data(
+		req, struct ctdb_fetch_lock_state);
+	struct ctdb_reply_call *reply;
+	bool ok;
+	int rc;
+
+	ok = ctdb_client_call_recv(subreq, state, &reply, &rc);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
+				  state->h->db->db_name, rc));
+		tevent_req_error(req, rc);
+		return;
+	}
+
+	if (reply->status != 0) {
+		tevent_req_error(req, EIO);
+		return;
+	}
+	talloc_free(reply);
+
+	rc = ctdb_fetch_lock_check(req);
+	switch (rc) {
+	case 0:
+		tevent_req_done(req);
+		break;
+	case EAGAIN:
+		/* another migration is in flight, keep waiting */
+		break;
+	default:
+		tevent_req_error(req, rc);
+		break;
+	}
+}
+
+/*
+ * talloc destructor for a record handle: drop the chain lock on the key
+ * and release the record buffer obtained from tdb_fetch().
+ */
+static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
+{
+	struct tdb_context *tdb = client_db_tdb(h->db);
+
+	if (tdb_chainunlock(tdb, h->key) != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("fetch_lock: %s tdb_chainunlock failed, %s\n",
+		       h->db->db_name, tdb_errorstr(tdb)));
+	}
+
+	free(h->data.dptr);
+	return 0;
+}
+
+/*
+ * Collect the result of ctdb_fetch_lock_send().
+ *
+ * On success the record handle is returned; freeing it releases the chain
+ * lock.  If header is not NULL it receives the record header.  If data is
+ * not NULL it receives a copy of the record payload (without header)
+ * allocated on mem_ctx, or NULL/0 for an empty payload.
+ *
+ * On failure NULL is returned, the handle is released and, if perr is not
+ * NULL, *perr is set to the error.
+ */
+struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
+						struct ctdb_ltdb_header *header,
+						TALLOC_CTX *mem_ctx,
+						TDB_DATA *data, int *perr)
+{
+	struct ctdb_fetch_lock_state *state = tevent_req_data(
+		req, struct ctdb_fetch_lock_state);
+	struct ctdb_record_handle *h = state->h;
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		/*
+		 * The request failed, so no chain lock is held.  Release
+		 * the handle regardless of whether the caller wants the
+		 * error code.
+		 */
+		TALLOC_FREE(state->h);
+		if (perr != NULL) {
+			*perr = err;
+		}
+		return NULL;
+	}
+
+	/*
+	 * The request succeeded, so the chain lock is held and the handle
+	 * owns the fetched record.  Install the destructor before anything
+	 * can fail so that every exit path releases both.
+	 */
+	talloc_set_destructor(h, ctdb_record_handle_destructor);
+
+	if (header != NULL) {
+		*header = h->header;
+	}
+	if (data != NULL) {
+		size_t offset;
+
+		/* h->data is the raw record, skip the header */
+		offset = ctdb_ltdb_header_len(&h->header);
+
+		data->dsize = h->data.dsize - offset;
+		if (data->dsize == 0) {
+			data->dptr = NULL;
+		} else {
+			data->dptr = talloc_memdup(mem_ctx,
+						   h->data.dptr + offset,
+						   data->dsize);
+			if (data->dptr == NULL) {
+				TALLOC_FREE(state->h);
+				if (perr != NULL) {
+					*perr = ENOMEM;
+				}
+				return NULL;
+			}
+		}
+	}
+
+	return h;
+}
+
+/*
+ * Synchronous wrapper around ctdb_fetch_lock_send()/_recv().
+ *
+ * Returns 0 and stores the record handle in *out on success, an
+ * errno-style value otherwise.  header and data are passed through to
+ * ctdb_fetch_lock_recv().
+ */
+int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+		    struct ctdb_client_context *client,
+		    struct ctdb_db_context *db, TDB_DATA key, bool readonly,
+		    struct ctdb_record_handle **out,
+		    struct ctdb_ltdb_header *header, TDB_DATA *data)
+{
+	struct tevent_req *req;
+	struct ctdb_record_handle *h;
+	int ret = 0;
+
+	req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
+	if (req == NULL) {
+		return ENOMEM;
+	}
+
+	tevent_req_poll(req, ev);
+
+	h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
+
+	/*
+	 * The handle is allocated on db and the data copy on mem_ctx, so
+	 * the request is no longer needed and must not be left behind on
+	 * the caller's context.
+	 */
+	talloc_free(req);
+
+	if (h == NULL) {
+		return ret;
+	}
+
+	*out = h;
+	return 0;
+}
+
+/*
+ * Store new data for a record held via a fetch-lock handle.
+ *
+ * The record is written as the handle's header followed by data.  Nothing
+ * is written if data is identical to the payload cached in the handle.
+ *
+ * Returns 0 on success, EINVAL for a readonly handle, EIO if the tdb
+ * store fails.
+ */
+int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
+{
+	uint8_t header[sizeof(struct ctdb_ltdb_header)];
+	TDB_DATA rec[2];
+	uint8_t *cache;
+	size_t np, offset;
+	int ret;
+
+	/* Cannot modify the record if it was obtained as a readonly copy */
+	if (h->readonly) {
+		return EINVAL;
+	}
+
+	/*
+	 * Check if the new data is same.  h->data holds the raw record
+	 * (header followed by payload), so the comparison has to skip the
+	 * header.
+	 */
+	offset = ctdb_ltdb_header_len(&h->header);
+	if (h->data.dptr != NULL &&
+	    h->data.dsize >= offset &&
+	    h->data.dsize - offset == data.dsize &&
+	    (data.dsize == 0 ||
+	     memcmp(h->data.dptr + offset, data.dptr, data.dsize) == 0)) {
+		/* No need to do anything */
+		return 0;
+	}
+
+	ctdb_ltdb_header_push(&h->header, header, &np);
+
+	rec[0].dsize = np;
+	rec[0].dptr = header;
+
+	rec[1].dsize = data.dsize;
+	rec[1].dptr = data.dptr;
+
+	ret = tdb_storev(client_db_tdb(h->db), h->key, rec, 2, TDB_REPLACE);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("store_record: %s tdb_storev failed, %s\n",
+		       h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
+		return EIO;
+	}
+
+	/*
+	 * Keep the cached record in sync with what was just written, so a
+	 * later store is compared against the current contents.  If the
+	 * cache cannot be refreshed it is dropped, which only disables the
+	 * shortcut above.
+	 */
+	cache = malloc(np + data.dsize);
+	if (cache != NULL) {
+		memcpy(cache, header, np);
+		if (data.dsize > 0) {
+			memcpy(cache + np, data.dptr, data.dsize);
+		}
+	}
+	free(h->data.dptr);
+	if (cache == NULL) {
+		h->data = tdb_null;
+	} else {
+		h->data.dptr = cache;
+		h->data.dsize = np + data.dsize;
+	}
+
+	return 0;
+}
+
+/* Request state for ctdb_delete_record_send() */
+struct ctdb_delete_record_state {
+ /* handle of the record being deleted */
+ struct ctdb_record_handle *h;
+};
+
+/* Completion callback for the control sent by ctdb_delete_record_send() */
+static void ctdb_delete_record_done(struct tevent_req *subreq);
+
+/*
+ * Start deleting a record held via a fetch-lock handle.
+ *
+ * The record is overwritten with just its header and a
+ * schedule-for-deletion control is sent to the node returned by
+ * ctdb_client_pnn().  Readonly handles are rejected with EINVAL.
+ */
+struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
+					   struct tevent_context *ev,
+					   struct ctdb_record_handle *h)
+{
+	struct ctdb_delete_record_state *state;
+	struct tevent_req *req, *subreq;
+	struct ctdb_req_control request;
+	struct ctdb_key_data key;
+	uint8_t hdr_buf[sizeof(struct ctdb_ltdb_header)];
+	TDB_DATA rec;
+	size_t hdr_len;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_delete_record_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->h = h;
+
+	/* A readonly copy must not be deleted */
+	if (h->readonly) {
+		DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
+				  h->db->db_name));
+		tevent_req_error(req, EINVAL);
+		return tevent_req_post(req, ev);
+	}
+
+	/* Replace the record with a header-only (empty) record */
+	ctdb_ltdb_header_push(&h->header, hdr_buf, &hdr_len);
+	rec.dptr = hdr_buf;
+	rec.dsize = hdr_len;
+
+	if (tdb_store(client_db_tdb(h->db), h->key, rec, TDB_REPLACE) != 0) {
+		D_ERR("fetch_lock delete: %s tdb_store failed, %s\n",
+		      h->db->db_name,
+		      tdb_errorstr(client_db_tdb(h->db)));
+		tevent_req_error(req, EIO);
+		return tevent_req_post(req, ev);
+	}
+
+	key.key = h->key;
+	key.header = h->header;
+	key.db_id = h->db->db_id;
+
+	ctdb_req_control_schedule_for_deletion(&request, &key);
+	subreq = ctdb_client_control_send(state, ev, h->client,
+					  ctdb_client_pnn(h->client),
+					  tevent_timeval_zero(),
+					  &request);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
+
+	return req;
+}
+
+/*
+ * The schedule-for-deletion control has completed: finish the delete
+ * request with the control's result.
+ */
+static void ctdb_delete_record_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_delete_record_state *state = tevent_req_data(
+		req, struct ctdb_delete_record_state);
+	bool ok;
+	int rc;
+
+	ok = ctdb_client_control_recv(subreq, &rc, NULL, NULL);
+	TALLOC_FREE(subreq);
+	if (ok) {
+		tevent_req_done(req);
+		return;
+	}
+
+	D_ERR("delete_record: %s SCHEDULE_FOR_DELETION failed, ret=%d\n",
+	      state->h->db->db_name,
+	      rc);
+	tevent_req_error(req, rc);
+}
+
+/*
+ * Collect the result of ctdb_delete_record_send().  Returns true on
+ * success; on failure returns false and stores the error in *perr if perr
+ * is not NULL.
+ */
+bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
+{
+	int error;
+
+	if (! tevent_req_is_unix_error(req, &error)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = error;
+	}
+	return false;
+}
+
+
+/*
+ * Synchronous wrapper around ctdb_delete_record_send()/_recv(), driven by
+ * the event context stored in the handle.  Returns 0 on success or an
+ * errno-style value.
+ */
+int ctdb_delete_record(struct ctdb_record_handle *h)
+{
+	struct tevent_context *ev = h->ev;
+	struct tevent_req *req;
+	TALLOC_CTX *tmp_ctx;
+	bool ok;
+	int err;
+
+	tmp_ctx = talloc_new(NULL);
+	if (tmp_ctx == NULL) {
+		return ENOMEM;
+	}
+
+	req = ctdb_delete_record_send(tmp_ctx, ev, h);
+	if (req == NULL) {
+		talloc_free(tmp_ctx);
+		return ENOMEM;
+	}
+
+	tevent_req_poll(req, ev);
+
+	ok = ctdb_delete_record_recv(req, &err);
+	talloc_free(tmp_ctx);
+
+	return ok ? 0 : err;
+}
+
+/*
+ * Global lock functions
+ */
+
+/* Request state for ctdb_g_lock_lock_send() */
+struct ctdb_g_lock_lock_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ /* database holding the lock records */
+ struct ctdb_db_context *db;
+ /* lock name including the terminating NUL; points at the caller's string */
+ TDB_DATA key;
+ /* server id the lock is requested for */
+ struct ctdb_server_id my_sid;
+ enum ctdb_g_lock_type lock_type;
+ /* fetch-lock handle on the lock record while it is being processed */
+ struct ctdb_record_handle *h;
+ /* state for verification of active locks */
+ struct ctdb_g_lock_list *lock_list;
+ /* index into lock_list of the entry currently being examined */
+ unsigned int current;
+};
+
+static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
+static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
+static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
+static int ctdb_g_lock_lock_update(struct tevent_req *req);
+static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
+
+/* Two locks conflict unless both of them are read locks */
+static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
+				  enum ctdb_g_lock_type l2)
+{
+	bool both_read = (l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ);
+
+	return !both_read;
+}
+
+/*
+ * Start acquiring the global lock keyname in db on behalf of sid.
+ *
+ * readonly selects a read lock, otherwise a write lock is requested.  The
+ * lock record itself is always fetched with write access.  keyname is not
+ * copied.
+ */
+struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
+					 struct tevent_context *ev,
+					 struct ctdb_client_context *client,
+					 struct ctdb_db_context *db,
+					 const char *keyname,
+					 struct ctdb_server_id *sid,
+					 bool readonly)
+{
+	struct ctdb_g_lock_lock_state *state;
+	struct tevent_req *req, *subreq;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_g_lock_lock_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->ev = ev;
+	state->client = client;
+	state->db = db;
+	state->my_sid = *sid;
+	if (readonly) {
+		state->lock_type = CTDB_G_LOCK_READ;
+	} else {
+		state->lock_type = CTDB_G_LOCK_WRITE;
+	}
+
+	/* The key is the lock name including its terminating NUL */
+	state->key.dsize = strlen(keyname) + 1;
+	state->key.dptr = discard_const(keyname);
+
+	subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
+				      false);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
+
+	return req;
+}
+
+/*
+ * The lock record has been fetch-locked: parse the list of current lock
+ * holders and start checking it for conflicts.
+ */
+static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_g_lock_lock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_lock_state);
+	TDB_DATA rec;
+	size_t npull;
+	int rc = 0;
+
+	state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &rec, &rc);
+	TALLOC_FREE(subreq);
+	if (state->h == NULL) {
+		DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
+				  (char *)state->key.dptr));
+		tevent_req_error(req, rc);
+		return;
+	}
+
+	/* Discard the list left over from a previous attempt */
+	if (state->lock_list != NULL) {
+		TALLOC_FREE(state->lock_list);
+		state->current = 0;
+	}
+
+	rc = ctdb_g_lock_list_pull(rec.dptr, rec.dsize, state,
+				   &state->lock_list, &npull);
+	talloc_free(rec.dptr);
+	if (rc == 0) {
+		ctdb_g_lock_lock_process_locks(req);
+		return;
+	}
+
+	DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
+			  (char *)state->key.dptr));
+	tevent_req_error(req, rc);
+}
+
+/*
+ * Walk the list of lock holders starting at state->current.
+ *
+ * If an entry carries our own server id the request fails with EDEADLK.
+ * On the first conflicting entry a PROCESS_EXISTS control is sent to the
+ * holder's node and processing continues in ctdb_g_lock_lock_checked().
+ * If no entry conflicts, our own entry is appended, the record is stored
+ * and the request completes.
+ */
+static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
+{
+	struct ctdb_g_lock_lock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_lock_state);
+	struct tevent_req *subreq;
+	struct ctdb_g_lock *lock = NULL;
+	struct ctdb_g_lock *locks;
+	bool check_server = false;
+	int ret;
+
+	while (state->current < state->lock_list->num) {
+		lock = &state->lock_list->lock[state->current];
+
+		/* We should not ask for the same lock more than once */
+		if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
+			DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
+					  (char *)state->key.dptr));
+			tevent_req_error(req, EDEADLK);
+			return;
+		}
+
+		if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
+			check_server = true;
+			break;
+		}
+
+		state->current += 1;
+	}
+
+	if (check_server) {
+		struct ctdb_req_control request;
+
+		/* Find out whether the conflicting holder is still alive */
+		ctdb_req_control_process_exists(&request, lock->sid.pid);
+		subreq = ctdb_client_control_send(state, state->ev,
+						  state->client,
+						  lock->sid.vnn,
+						  tevent_timeval_zero(),
+						  &request);
+		if (tevent_req_nomem(subreq, req)) {
+			return;
+		}
+		tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
+		return;
+	}
+
+	/*
+	 * There is no conflict, add ourself to the lock_list.  Grow the
+	 * array via a temporary so that a failed reallocation does not
+	 * leave the list with a NULL array and a non-zero count.
+	 */
+	locks = talloc_realloc(state->lock_list,
+			       state->lock_list->lock,
+			       struct ctdb_g_lock,
+			       state->lock_list->num + 1);
+	if (locks == NULL) {
+		tevent_req_error(req, ENOMEM);
+		return;
+	}
+	state->lock_list->lock = locks;
+
+	lock = &state->lock_list->lock[state->lock_list->num];
+	lock->type = state->lock_type;
+	lock->sid = state->my_sid;
+	state->lock_list->num += 1;
+
+	ret = ctdb_g_lock_lock_update(req);
+	if (ret != 0) {
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	/* Freeing the handle releases the record */
+	TALLOC_FREE(state->h);
+	tevent_req_done(req);
+}
+
+/*
+ * Completion of the PROCESS_EXISTS control for the conflicting lock holder
+ * at index state->current.
+ *
+ * If the holder still exists, the record handle is released and the whole
+ * fetch-lock cycle is retried after a short wakeup timer.  Otherwise the
+ * stale entry is removed from the list, the record is stored and
+ * processing of the list continues.
+ */
+static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ctdb_g_lock_lock_state *state = tevent_req_data(
+ req, struct ctdb_g_lock_lock_state);
+ struct ctdb_reply_control *reply;
+ int ret, value;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ DEBUG(DEBUG_ERR,
+ ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
+ (char *)state->key.dptr, ret));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_process_exists(reply, &value);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+ talloc_free(reply);
+
+ if (value == 0) {
+ /* server process exists, need to retry */
+ /* release the record handle while waiting */
+ TALLOC_FREE(state->h);
+ subreq = tevent_wakeup_send(state, state->ev,
+ tevent_timeval_current_ofs(0,1000));
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
+ return;
+ }
+
+ /* server process does not exist, remove conflicting entry */
+ /*
+ * The last entry is moved into the freed slot; state->current is left
+ * unchanged so the moved entry is examined next.
+ */
+ state->lock_list->lock[state->current] =
+ state->lock_list->lock[state->lock_list->num-1];
+ state->lock_list->num -= 1;
+
+ ret = ctdb_g_lock_lock_update(req);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ctdb_g_lock_lock_process_locks(req);
+}
+
+/*
+ * Serialise the current lock list and store it in the lock record.
+ * Returns 0 on success or an errno-style value.
+ */
+static int ctdb_g_lock_lock_update(struct tevent_req *req)
+{
+	struct ctdb_g_lock_lock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_lock_state);
+	TDB_DATA buf;
+	size_t npush;
+	int rc;
+
+	buf.dsize = ctdb_g_lock_list_len(state->lock_list);
+	buf.dptr = talloc_size(state, buf.dsize);
+	if (buf.dptr == NULL) {
+		return ENOMEM;
+	}
+	ctdb_g_lock_list_push(state->lock_list, buf.dptr, &npush);
+
+	rc = ctdb_store_record(state->h, buf);
+
+	/* the marshalled copy is only needed for the store */
+	talloc_free(buf.dptr);
+	return rc;
+}
+
+/*
+ * The retry timer has fired: fetch-lock the lock record again and restart
+ * processing in ctdb_g_lock_lock_fetched().
+ */
+static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_g_lock_lock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_lock_state);
+	bool woke_up = tevent_wakeup_recv(subreq);
+
+	TALLOC_FREE(subreq);
+	if (! woke_up) {
+		tevent_req_error(req, ENOMEM);
+		return;
+	}
+
+	subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
+				      state->db, state->key, false);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
+}
+
+/*
+ * Collect the result of ctdb_g_lock_lock_send().  Any record handle still
+ * held by the request is released.  Returns true on success; on failure
+ * returns false and stores the error in *perr if perr is not NULL.
+ */
+bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
+{
+	struct ctdb_g_lock_lock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_lock_state);
+	int error;
+
+	TALLOC_FREE(state->h);
+
+	if (! tevent_req_is_unix_error(req, &error)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = error;
+	}
+	return false;
+}
+
+/* Request state for ctdb_g_lock_unlock_send() */
+struct ctdb_g_lock_unlock_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ /* database holding the lock records */
+ struct ctdb_db_context *db;
+ /* lock name including the terminating NUL; points at the caller's string */
+ TDB_DATA key;
+ /* server id whose lock entry is to be removed */
+ struct ctdb_server_id my_sid;
+ /* fetch-lock handle on the lock record */
+ struct ctdb_record_handle *h;
+ /* lock holders parsed from the record */
+ struct ctdb_g_lock_list *lock_list;
+};
+
+static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
+static int ctdb_g_lock_unlock_update(struct tevent_req *req);
+static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
+
+/*
+ * Start releasing the global lock keyname in db held by sid.  The lock
+ * record is fetched with write access.  keyname is not copied.
+ */
+struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
+					   struct tevent_context *ev,
+					   struct ctdb_client_context *client,
+					   struct ctdb_db_context *db,
+					   const char *keyname,
+					   struct ctdb_server_id sid)
+{
+	struct ctdb_g_lock_unlock_state *state;
+	struct tevent_req *req, *subreq;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_g_lock_unlock_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->ev = ev;
+	state->client = client;
+	state->db = db;
+	state->my_sid = sid;
+
+	/* The key is the lock name including its terminating NUL */
+	state->key.dsize = strlen(keyname) + 1;
+	state->key.dptr = discard_const(keyname);
+
+	subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
+				      false);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
+
+	return req;
+}
+
+/*
+ * The lock record has been fetch-locked: parse the list of lock holders,
+ * remove our entry and write the result back.  If the list becomes empty
+ * the record is deleted instead and the request completes in
+ * ctdb_g_lock_unlock_deleted().
+ */
+static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_unlock_state);
+	TDB_DATA data;
+	size_t np;
+	int ret = 0;
+
+	state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
+	TALLOC_FREE(subreq);
+	if (state->h == NULL) {
+		DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
+				  (char *)state->key.dptr));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
+				    &state->lock_list, &np);
+	/*
+	 * The raw record data is not needed once it has been parsed;
+	 * release it as ctdb_g_lock_lock_fetched() does.
+	 */
+	talloc_free(data.dptr);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
+				  (char *)state->key.dptr));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	ret = ctdb_g_lock_unlock_update(req);
+	if (ret != 0) {
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	if (state->lock_list->num == 0) {
+		/* No holders left, remove the lock record altogether */
+		subreq = ctdb_delete_record_send(state, state->ev, state->h);
+		if (tevent_req_nomem(subreq, req)) {
+			return;
+		}
+		tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
+					req);
+		return;
+	}
+
+	TALLOC_FREE(state->h);
+	tevent_req_done(req);
+}
+
+/*
+ * Remove the entry matching our server id from the lock list (if there is
+ * one) and, unless the list is now empty, store the updated list in the
+ * lock record.  Returns 0 on success or an errno-style value.
+ */
+static int ctdb_g_lock_unlock_update(struct tevent_req *req)
+{
+	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_unlock_state);
+	struct ctdb_g_lock_list *list = state->lock_list;
+	unsigned int idx = 0;
+	TDB_DATA buf;
+	size_t npush;
+	int rc;
+
+	/* Locate our own entry */
+	while (idx < list->num &&
+	       ! ctdb_server_id_equal(&list->lock[idx].sid, &state->my_sid)) {
+		idx++;
+	}
+
+	/* Found: overwrite it with the last entry and shrink the list */
+	if (idx < list->num) {
+		list->lock[idx] = list->lock[list->num-1];
+		list->num -= 1;
+	}
+
+	/* An empty list is not stored; the caller deletes the record */
+	if (list->num == 0) {
+		return 0;
+	}
+
+	buf.dsize = ctdb_g_lock_list_len(list);
+	buf.dptr = talloc_size(state, buf.dsize);
+	if (buf.dptr == NULL) {
+		return ENOMEM;
+	}
+	ctdb_g_lock_list_push(list, buf.dptr, &npush);
+
+	rc = ctdb_store_record(state->h, buf);
+	talloc_free(buf.dptr);
+
+	return rc;
+}
+
+/*
+ * The now empty lock record has been deleted: release the record handle
+ * and complete the unlock request.
+ */
+static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_unlock_state);
+	int ret;
+	bool status;
+
+	status = ctdb_delete_record_recv(subreq, &ret);
+	/* Release the sub-request like every other callback in this file */
+	TALLOC_FREE(subreq);
+	if (! status) {
+		DEBUG(DEBUG_ERR,
+		      ("g_lock_unlock %s delete record failed, ret=%d\n",
+		       (char *)state->key.dptr, ret));
+		tevent_req_error(req, ret);
+		return;
+	}
+
+	TALLOC_FREE(state->h);
+	tevent_req_done(req);
+}
+
+/*
+ * Collect the result of ctdb_g_lock_unlock_send().  Any record handle
+ * still held by the request is released.  Returns true on success; on
+ * failure returns false and stores the error in *perr if perr is not NULL.
+ */
+bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
+{
+	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
+		req, struct ctdb_g_lock_unlock_state);
+	int error;
+
+	TALLOC_FREE(state->h);
+
+	if (! tevent_req_is_unix_error(req, &error)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = error;
+	}
+	return false;
+}
+
+/*
+ * Persistent database functions
+ */
+/* Request state for ctdb_transaction_start_send() */
+struct ctdb_transaction_start_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct timeval timeout;
+ /* transaction handle being set up; returned by the recv function */
+ struct ctdb_transaction_handle *h;
+ /* set from ctdb_client_pnn() when the request is created */
+ uint32_t destnode;
+};
+
+/* Callback once g_lock.tdb has been attached */
+static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
+/* Callback once the transaction's global lock request has finished */
+static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
+
+/*
+ * Start a transaction on a non-volatile database.
+ *
+ * A transaction handle is allocated on db, then g_lock.tdb is attached
+ * and a global lock named after the database id is taken (a read lock if
+ * readonly is set).  Volatile databases are rejected with EINVAL.
+ *
+ * Returns NULL only if the request itself cannot be allocated.
+ */
+struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
+					       struct tevent_context *ev,
+					       struct ctdb_client_context *client,
+					       struct timeval timeout,
+					       struct ctdb_db_context *db,
+					       bool readonly)
+{
+	struct ctdb_transaction_start_state *state;
+	struct tevent_req *req, *subreq;
+	struct ctdb_transaction_handle *h;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_transaction_start_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	if (ctdb_db_volatile(db)) {
+		tevent_req_error(req, EINVAL);
+		return tevent_req_post(req, ev);
+	}
+
+	state->ev = ev;
+	state->client = client;
+	state->timeout = timeout;
+	state->destnode = ctdb_client_pnn(client);
+
+	h = talloc_zero(db, struct ctdb_transaction_handle);
+	if (tevent_req_nomem(h, req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	h->ev = ev;
+	h->client = client;
+	h->db = db;
+	h->readonly = readonly;
+	h->updated = false;
+
+	/* SRVID is unique for databases, so client can have transactions
+	 * active for multiple databases */
+	h->sid = ctdb_client_get_server_id(client, db->db_id);
+
+	/*
+	 * The handle lives on db, not on the request, so it has to be
+	 * released explicitly whenever setting it up fails.
+	 */
+	h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
+	if (tevent_req_nomem(h->recbuf, req)) {
+		talloc_free(h);
+		return tevent_req_post(req, ev);
+	}
+
+	h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
+	if (tevent_req_nomem(h->lock_name, req)) {
+		talloc_free(h);
+		return tevent_req_post(req, ev);
+	}
+
+	state->h = h;
+
+	subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
+	if (tevent_req_nomem(subreq, req)) {
+		TALLOC_FREE(state->h);
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
+
+	return req;
+}
+
+/*
+ * g_lock.tdb has been attached: request the transaction's global lock in
+ * it.
+ */
+static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_transaction_start_state *state = tevent_req_data(
+		req, struct ctdb_transaction_start_state);
+	struct ctdb_transaction_handle *h = state->h;
+	bool ok;
+	int rc;
+
+	ok = ctdb_attach_recv(subreq, &rc, &h->db_g_lock);
+	TALLOC_FREE(subreq);
+	if (! ok) {
+		DEBUG(DEBUG_ERR,
+		      ("transaction_start: %s attach g_lock.tdb failed\n",
+		       h->db->db_name));
+		tevent_req_error(req, rc);
+		return;
+	}
+
+	subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
+				       h->db_g_lock, h->lock_name,
+				       &h->sid, h->readonly);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
+}
+
+/*
+ * The global lock request has finished: complete the transaction start
+ * request with its result.
+ */
+static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_transaction_start_state *state = tevent_req_data(
+		req, struct ctdb_transaction_start_state);
+	bool ok;
+	int rc;
+
+	ok = ctdb_g_lock_lock_recv(subreq, &rc);
+	TALLOC_FREE(subreq);
+	if (ok) {
+		tevent_req_done(req);
+		return;
+	}
+
+	DEBUG(DEBUG_ERR,
+	      ("transaction_start: %s g_lock lock failed, ret=%d\n",
+	       state->h->db->db_name, rc));
+	tevent_req_error(req, rc);
+}
+
+/*
+ * Collect the result of ctdb_transaction_start_send().
+ *
+ * Returns the transaction handle on success.  On failure NULL is
+ * returned, the partially set up handle is released and, if perr is not
+ * NULL, *perr is set to the error.
+ */
+struct ctdb_transaction_handle *ctdb_transaction_start_recv(
+					struct tevent_req *req,
+					int *perr)
+{
+	struct ctdb_transaction_start_state *state = tevent_req_data(
+		req, struct ctdb_transaction_start_state);
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		/*
+		 * The handle is allocated on the database context, so it
+		 * would outlive the failed request unless freed here.
+		 */
+		TALLOC_FREE(state->h);
+		if (perr != NULL) {
+			*perr = err;
+		}
+		return NULL;
+	}
+
+	return state->h;
+}
+
+/*
+ * Synchronous wrapper around ctdb_transaction_start_send()/_recv().
+ *
+ * Returns 0 and stores the transaction handle in *out on success, an
+ * errno-style value otherwise.
+ */
+int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+			   struct ctdb_client_context *client,
+			   struct timeval timeout,
+			   struct ctdb_db_context *db, bool readonly,
+			   struct ctdb_transaction_handle **out)
+{
+	struct tevent_req *req;
+	struct ctdb_transaction_handle *h;
+	int ret = 0;
+
+	req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
+					  readonly);
+	if (req == NULL) {
+		return ENOMEM;
+	}
+
+	tevent_req_poll(req, ev);
+
+	h = ctdb_transaction_start_recv(req, &ret);
+
+	/*
+	 * The handle is allocated on db, so the request can be released
+	 * instead of being left behind on the caller's context.
+	 */
+	talloc_free(req);
+
+	if (h == NULL) {
+		return ret;
+	}
+
+	*out = h;
+	return 0;
+}
+
+/* Search state for looking up a key in a transaction's record buffer */
+struct ctdb_transaction_record_fetch_state {
+ /* key: the key searched for; data: data of the matching record */
+ TDB_DATA key, data;
+ /* header extracted from the matching record */
+ struct ctdb_ltdb_header header;
+ /* set once a record with a matching key has been seen */
+ bool found;
+};
+
+/*
+ * Record buffer traverse callback: when the record's key matches the one
+ * searched for, extract its header and remember its data.  The traverse
+ * is not stopped on a match, so a later record with the same key replaces
+ * an earlier one.  Returns 1 if the header cannot be extracted, 0
+ * otherwise.
+ */
+static int ctdb_transaction_record_fetch_traverse(
+				uint32_t reqid,
+				struct ctdb_ltdb_header *nullheader,
+				TDB_DATA key, TDB_DATA data,
+				void *private_data)
+{
+	struct ctdb_transaction_record_fetch_state *state =
+		(struct ctdb_transaction_record_fetch_state *)private_data;
+	int rc;
+
+	/* Not the key we are looking for */
+	if (state->key.dsize != key.dsize ||
+	    memcmp(state->key.dptr, key.dptr, key.dsize) != 0) {
+		return 0;
+	}
+
+	rc = ctdb_ltdb_header_extract(&data, &state->header);
+	if (rc != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("record_fetch: Failed to extract header, "
+		       "ret=%d\n", rc));
+		return 1;
+	}
+
+	state->found = true;
+	state->data = data;
+
+	return 0;
+}
+
+/*
+ * Look up key in the transaction's record buffer.
+ *
+ * Returns 0 and fills *header / *data (when not NULL) if the key is
+ * present, ENOENT if it is not, or the traverse error.  The returned data
+ * is not copied.
+ */
+static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
+					 TDB_DATA key,
+					 struct ctdb_ltdb_header *header,
+					 TDB_DATA *data)
+{
+	struct ctdb_transaction_record_fetch_state lookup;
+	int rc;
+
+	lookup.key = key;
+	lookup.found = false;
+
+	rc = ctdb_rec_buffer_traverse(h->recbuf,
+				      ctdb_transaction_record_fetch_traverse,
+				      &lookup);
+	if (rc != 0) {
+		return rc;
+	}
+
+	if (! lookup.found) {
+		return ENOENT;
+	}
+
+	if (header != NULL) {
+		*header = lookup.header;
+	}
+	if (data != NULL) {
+		*data = lookup.data;
+	}
+	return 0;
+}
+
+/*
+ * Fetch a record within a transaction.
+ *
+ * If the key is already in the transaction's record buffer, a copy of the
+ * buffered data is returned.  Otherwise the record is read with
+ * ctdb_ltdb_fetch() and added to the record buffer.  The returned data is
+ * allocated on mem_ctx.  Returns 0 on success or an errno-style value.
+ */
+int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
+				  TDB_DATA key,
+				  TALLOC_CTX *mem_ctx, TDB_DATA *data)
+{
+	struct ctdb_ltdb_header header;
+	TDB_DATA buffered;
+	int rc;
+
+	rc = ctdb_transaction_record_fetch(h, key, NULL, &buffered);
+	if (rc != 0) {
+		/* Not buffered yet: read it and remember it */
+		rc = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
+		if (rc != 0) {
+			return rc;
+		}
+
+		return ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key,
+					   *data);
+	}
+
+	data->dptr = talloc_memdup(mem_ctx, buffered.dptr, buffered.dsize);
+	if (data->dptr == NULL) {
+		return ENOMEM;
+	}
+	data->dsize = buffered.dsize;
+
+	return 0;
+}
+
+/*
+ * Store a record within a transaction.
+ *
+ * The current version of the record is looked up in the transaction's
+ * record buffer, falling back to ctdb_ltdb_fetch().  If the data is
+ * unchanged nothing happens.  Otherwise the record is added to the record
+ * buffer with dmaster set to this node and an incremented RSN, and the
+ * transaction is marked as updated.
+ *
+ * Returns 0 on success, EINVAL for a readonly transaction, or another
+ * errno-style value.
+ */
+int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
+				  TDB_DATA key, TDB_DATA data)
+{
+	TALLOC_CTX *tmp_ctx;
+	struct ctdb_ltdb_header header;
+	TDB_DATA old_data;
+	int ret;
+
+	if (h->readonly) {
+		return EINVAL;
+	}
+
+	tmp_ctx = talloc_new(h);
+	if (tmp_ctx == NULL) {
+		return ENOMEM;
+	}
+
+	ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
+	if (ret != 0) {
+		ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
+		if (ret != 0) {
+			/* tmp_ctx hangs off h and would otherwise pile up */
+			talloc_free(tmp_ctx);
+			return ret;
+		}
+	}
+
+	/* Avoid memcmp() on possibly NULL pointers for empty data */
+	if (old_data.dsize == data.dsize &&
+	    (data.dsize == 0 ||
+	     memcmp(old_data.dptr, data.dptr, data.dsize) == 0)) {
+		talloc_free(tmp_ctx);
+		return 0;
+	}
+
+	header.dmaster = ctdb_client_pnn(h->client);
+	header.rsn += 1;
+
+	ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
+	talloc_free(tmp_ctx);
+	if (ret != 0) {
+		return ret;
+	}
+	h->updated = true;
+
+	return 0;
+}
+
+/*
+ * Delete a record within a transaction.  This is implemented as storing
+ * empty data (tdb_null) for the key; the return value is that of
+ * ctdb_transaction_store_record().
+ */
+int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
+ TDB_DATA key)
+{
+ return ctdb_transaction_store_record(h, key, tdb_null);
+}
+
+/*
+ * Read the database sequence number stored under CTDB_DB_SEQNUM_KEY.
+ *
+ * An empty record yields a sequence number of 0.  Returns 0 on success,
+ * EINVAL if the record has an unexpected size, or the error from
+ * ctdb_ltdb_fetch().
+ */
+static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
+					    uint64_t *seqnum)
+{
+	const char *keyname = CTDB_DB_SEQNUM_KEY;
+	TDB_DATA key, data;
+	struct ctdb_ltdb_header header;
+	int ret;
+
+	key.dptr = discard_const(keyname);
+	key.dsize = strlen(keyname) + 1;
+
+	ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR,
+		      ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
+		       h->db->db_name, ret));
+		return ret;
+	}
+
+	if (data.dsize == 0) {
+		/*
+		 * initial data; the buffer may still be a zero-length
+		 * allocation on h, so release it (harmless for NULL)
+		 */
+		talloc_free(data.dptr);
+		*seqnum = 0;
+		return 0;
+	}
+
+	if (data.dsize != sizeof(uint64_t)) {
+		talloc_free(data.dptr);
+		return EINVAL;
+	}
+
+	/* Copy instead of casting: avoids alignment and aliasing issues */
+	memcpy(seqnum, data.dptr, sizeof(uint64_t));
+
+	talloc_free(data.dptr);
+	return 0;
+}
+
+/*
+ * Store seqnum under CTDB_DB_SEQNUM_KEY as part of the transaction.
+ * Returns the result of ctdb_transaction_store_record().
+ */
+static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
+					    uint64_t seqnum)
+{
+	const char *keyname = CTDB_DB_SEQNUM_KEY;
+	TDB_DATA seq_key, seq_data;
+
+	/* The key includes the terminating NUL */
+	seq_key.dsize = strlen(keyname) + 1;
+	seq_key.dptr = discard_const(keyname);
+
+	seq_data.dsize = sizeof(seqnum);
+	seq_data.dptr = (uint8_t *)&seqnum;
+
+	return ctdb_transaction_store_record(h, seq_key, seq_data);
+}
+
+/* Request state for ctdb_transaction_commit_send() */
+struct ctdb_transaction_commit_state {
+ struct tevent_context *ev;
+ /* timeout used for the TRANS3_COMMIT control */
+ struct timeval timeout;
+ /* transaction being committed */
+ struct ctdb_transaction_handle *h;
+ /* database sequence number read before the commit was attempted */
+ uint64_t seqnum;
+};
+
+/* Callback for the TRANS3_COMMIT control */
+static void ctdb_transaction_commit_done(struct tevent_req *subreq);
+/* Callback once the transaction's global lock has been released */
+static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
+
+/*
+ * Start committing a transaction.
+ *
+ * The current database sequence number is read and remembered, the
+ * incremented value is stored as part of the transaction, and the record
+ * buffer is sent with a TRANS3_COMMIT control to the node returned by
+ * ctdb_client_pnn().
+ */
+struct tevent_req *ctdb_transaction_commit_send(
+					TALLOC_CTX *mem_ctx,
+					struct tevent_context *ev,
+					struct timeval timeout,
+					struct ctdb_transaction_handle *h)
+{
+	struct ctdb_transaction_commit_state *state;
+	struct tevent_req *req, *subreq;
+	struct ctdb_req_control commit_req;
+	int rc;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_transaction_commit_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	state->h = h;
+	state->ev = ev;
+	state->timeout = timeout;
+
+	rc = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
+	if (rc == 0) {
+		/* Bump the sequence number as part of this transaction */
+		rc = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
+	}
+	if (rc != 0) {
+		tevent_req_error(req, rc);
+		return tevent_req_post(req, ev);
+	}
+
+	ctdb_req_control_trans3_commit(&commit_req, h->recbuf);
+	subreq = ctdb_client_control_send(state, ev, h->client,
+					  ctdb_client_pnn(h->client),
+					  timeout, &commit_req);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
+
+	return req;
+}
+
+/*
+ * Completion of the TRANS3_COMMIT control.
+ *
+ * If the control reports failure, the database sequence number is read
+ * again: if it is unchanged the commit is re-sent; if it is not exactly
+ * one more than the value remembered at send time the request fails with
+ * EIO; otherwise the commit is treated as successful.  After a successful
+ * commit the transaction's global lock is released.
+ */
+static void ctdb_transaction_commit_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ctdb_transaction_commit_state *state = tevent_req_data(
+ req, struct ctdb_transaction_commit_state);
+ struct ctdb_transaction_handle *h = state->h;
+ struct ctdb_reply_control *reply;
+ uint64_t seqnum;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ DEBUG(DEBUG_ERR,
+ ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
+ h->db->db_name, ret));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_trans3_commit(reply);
+ talloc_free(reply);
+
+ if (ret != 0) {
+ /* Control failed due to recovery */
+
+ ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ /* sequence number unchanged: the commit was not applied */
+ if (seqnum == state->seqnum) {
+ struct ctdb_req_control request;
+
+ /* try again */
+ ctdb_req_control_trans3_commit(&request,
+ state->h->recbuf);
+ subreq = ctdb_client_control_send(
+ state, state->ev, state->h->client,
+ ctdb_client_pnn(state->h->client),
+ state->timeout, &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq,
+ ctdb_transaction_commit_done,
+ req);
+ return;
+ }
+
+ /* anything other than "our" increment is unexpected */
+ if (seqnum != state->seqnum + 1) {
+ DEBUG(DEBUG_ERR,
+ ("transaction_commit: %s seqnum mismatch "
+ "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
+ state->h->db->db_name, seqnum, state->seqnum));
+ tevent_req_error(req, EIO);
+ return;
+ }
+ }
+
+ /* trans3_commit successful */
+ subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
+ h->db_g_lock, h->lock_name, h->sid);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
+ req);
+}
+
+/*
+ * The transaction's global lock has been released: free the transaction
+ * handle and complete the commit request.  On failure the handle is kept.
+ */
+static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_transaction_commit_state *state = tevent_req_data(
+		req, struct ctdb_transaction_commit_state);
+	bool ok;
+	int rc;
+
+	ok = ctdb_g_lock_unlock_recv(subreq, &rc);
+	TALLOC_FREE(subreq);
+	if (ok) {
+		talloc_free(state->h);
+		tevent_req_done(req);
+		return;
+	}
+
+	DEBUG(DEBUG_ERR,
+	      ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
+	       state->h->db->db_name, rc));
+	tevent_req_error(req, rc);
+}
+
+/*
+ * Collect the result of an async transaction commit.
+ *
+ * Returns true when the request carries no error.  Otherwise returns
+ * false and, if perr is not NULL, stores the unix error code in *perr.
+ */
+bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
+{
+	int unix_err = 0;
+
+	if (! tevent_req_is_unix_error(req, &unix_err)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = unix_err;
+	}
+	return false;
+}
+
+/*
+ * Synchronous wrapper around ctdb_transaction_commit_send()/_recv().
+ *
+ * A read-only transaction, or one with no updates, is handed to
+ * ctdb_transaction_cancel() instead.  Otherwise the commit request is
+ * created on a temporary talloc context with a zero timeval as its
+ * timeout, and driven with tevent_req_poll() on the handle's event
+ * context.
+ *
+ * Returns 0 on success, ENOMEM if the temporary context or the request
+ * cannot be allocated, or the error reported by
+ * ctdb_transaction_commit_recv().
+ */
+int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+{
+	struct tevent_context *ev = h->ev;
+	struct tevent_req *req;
+	TALLOC_CTX *tmp_ctx;
+	int result = 0;
+	int err;
+
+	/* Nothing to write back: cancel the transaction instead */
+	if (h->readonly || ! h->updated) {
+		return ctdb_transaction_cancel(h);
+	}
+
+	tmp_ctx = talloc_new(NULL);
+	if (tmp_ctx == NULL) {
+		return ENOMEM;
+	}
+
+	req = ctdb_transaction_commit_send(tmp_ctx, ev,
+					   tevent_timeval_zero(), h);
+	if (req == NULL) {
+		talloc_free(tmp_ctx);
+		return ENOMEM;
+	}
+
+	tevent_req_poll(req, ev);
+
+	if (! ctdb_transaction_commit_recv(req, &err)) {
+		result = err;
+	}
+
+	talloc_free(tmp_ctx);
+	return result;
+}
+
+/*
+ * State carried by the async transaction-cancel request.
+ */
+struct ctdb_transaction_cancel_state {
+ /* event context passed to ctdb_transaction_cancel_send() */
+ struct tevent_context *ev;
+ /* transaction handle; freed by ctdb_transaction_cancel_done() */
+ struct ctdb_transaction_handle *h;
+ /* timeout passed to ctdb_transaction_cancel_send(); only stored there */
+ struct timeval timeout;
+};
+
+/* Callback for the g_lock unlock started in ctdb_transaction_cancel_send() */
+static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
+
+/*
+ * Start an async cancel of the transaction described by h.
+ *
+ * Records ev, h and timeout in the request state and sends the g_lock
+ * unlock for the handle's lock; ctdb_transaction_cancel_done() completes
+ * the request.
+ *
+ * Returns NULL if the request cannot be created.  If the unlock
+ * sub-request cannot be allocated, the request is failed via
+ * tevent_req_nomem() and returned through tevent_req_post().
+ */
+struct tevent_req *ctdb_transaction_cancel_send(
+					TALLOC_CTX *mem_ctx,
+					struct tevent_context *ev,
+					struct timeval timeout,
+					struct ctdb_transaction_handle *h)
+{
+	struct ctdb_transaction_cancel_state *state;
+	struct tevent_req *req;
+	struct tevent_req *unlock_req;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct ctdb_transaction_cancel_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	*state = (struct ctdb_transaction_cancel_state) {
+		.ev = ev,
+		.h = h,
+		.timeout = timeout,
+	};
+
+	unlock_req = ctdb_g_lock_unlock_send(state, ev, h->client,
+					     h->db_g_lock,
+					     h->lock_name, h->sid);
+	if (tevent_req_nomem(unlock_req, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(unlock_req, ctdb_transaction_cancel_done,
+				req);
+
+	return req;
+}
+
+/*
+ * Completion handler for the g_lock unlock issued by a cancel.
+ *
+ * The transaction handle is freed whether or not the unlock succeeded.
+ * A failed unlock is logged and its error propagated to the request;
+ * otherwise the request is marked done.
+ */
+static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdb_transaction_cancel_state *state = tevent_req_data(
+		req, struct ctdb_transaction_cancel_state);
+	int err;
+	bool unlocked;
+
+	unlocked = ctdb_g_lock_unlock_recv(subreq, &err);
+	TALLOC_FREE(subreq);
+
+	if (! unlocked) {
+		/* Log before the handle (and its db name) goes away */
+		DEBUG(DEBUG_ERR,
+		      ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
+		       state->h->db->db_name, err));
+	}
+
+	talloc_free(state->h);
+
+	if (! unlocked) {
+		tevent_req_error(req, err);
+		return;
+	}
+
+	tevent_req_done(req);
+}
+
+/*
+ * Collect the result of an async transaction cancel.
+ *
+ * Returns true when the request carries no error.  Otherwise returns
+ * false and, if perr is not NULL, stores the unix error code in *perr.
+ */
+bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
+{
+	int unix_err = 0;
+
+	if (! tevent_req_is_unix_error(req, &unix_err)) {
+		return true;
+	}
+
+	if (perr != NULL) {
+		*perr = unix_err;
+	}
+	return false;
+}
+
+/*
+ * Synchronous wrapper around ctdb_transaction_cancel_send()/_recv().
+ *
+ * The cancel request is created on a temporary talloc context with a
+ * zero timeval as its timeout and driven with tevent_req_poll() on the
+ * handle's event context.  If the temporary context or the request
+ * cannot be allocated, the handle is freed here and ENOMEM is returned.
+ *
+ * Returns 0 on success, otherwise the error reported by
+ * ctdb_transaction_cancel_recv().
+ */
+int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
+{
+	struct tevent_context *ev = h->ev;
+	struct tevent_req *req;
+	TALLOC_CTX *tmp_ctx;
+	int result = 0;
+	int err;
+
+	tmp_ctx = talloc_new(NULL);
+	if (tmp_ctx == NULL) {
+		talloc_free(h);
+		return ENOMEM;
+	}
+
+	req = ctdb_transaction_cancel_send(tmp_ctx, ev,
+					   tevent_timeval_zero(), h);
+	if (req == NULL) {
+		talloc_free(tmp_ctx);
+		talloc_free(h);
+		return ENOMEM;
+	}
+
+	tevent_req_poll(req, ev);
+
+	if (! ctdb_transaction_cancel_recv(req, &err)) {
+		result = err;
+	}
+
+	talloc_free(tmp_ctx);
+	return result;
+}
+
+/*
+ * TODO:
+ *
+ * In the future, Samba should register SERVER_ID.
+ * Make that structure the same as struct srvid {}.
+ */