summaryrefslogtreecommitdiffstats
path: root/ctdb/server/ctdb_traverse.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:47:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:47:29 +0000
commit4f5791ebd03eaec1c7da0865a383175b05102712 (patch)
tree8ce7b00f7a76baa386372422adebbe64510812d4 /ctdb/server/ctdb_traverse.c
parentInitial commit. (diff)
downloadsamba-upstream.tar.xz
samba-upstream.zip
Adding upstream version 2:4.17.12+dfsg.upstream/2%4.17.12+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ctdb/server/ctdb_traverse.c')
-rw-r--r--ctdb/server/ctdb_traverse.c781
1 files changed, 781 insertions, 0 deletions
diff --git a/ctdb/server/ctdb_traverse.c b/ctdb/server/ctdb_traverse.c
new file mode 100644
index 0000000..4865dcc
--- /dev/null
+++ b/ctdb/server/ctdb_traverse.c
@@ -0,0 +1,781 @@
+/*
+ efficient async ctdb traverse
+
+ Copyright (C) Andrew Tridgell 2007
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "system/network.h"
+#include "system/wait.h"
+#include "system/time.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/tdb_wrap/tdb_wrap.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/util_process.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
+#include "common/logging.h"
+
+typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
+
+/*
+ handle returned to caller - freeing this handler will kill the child and
+ terminate the traverse
+ */
+struct ctdb_traverse_local_handle {
+ struct ctdb_traverse_local_handle *next, *prev;
+ struct ctdb_db_context *ctdb_db;
+ int fd[2];
+ pid_t child;
+ uint64_t srvid;
+ uint32_t client_reqid;
+ uint32_t reqid;
+ int srcnode;
+ void *private_data;
+ ctdb_traverse_fn_t callback;
+ bool withemptyrecords;
+ struct tevent_fd *fde;
+ int records_failed;
+ int records_sent;
+};
+
+/*
+ * called when traverse is completed by child or on error
+ */
+static void ctdb_traverse_child_handler(struct tevent_context *ev, struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
+{
+ struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
+ struct ctdb_traverse_local_handle);
+ ctdb_traverse_fn_t callback = h->callback;
+ void *p = h->private_data;
+ int res;
+ ssize_t n;
+
+ /* Read the number of records sent by traverse child */
+ n = sys_read(h->fd[0], &res, sizeof(res));
+ if (n < 0 || n != sizeof(res)) {
+ /* Traverse child failed */
+ DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d\n",
+ h->ctdb_db->db_name, h->reqid));
+ } else if (res < 0) {
+ /* Traverse failed */
+ res = -res;
+ DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d records:%d\n",
+ h->ctdb_db->db_name, h->reqid, res));
+ } else {
+ DEBUG(DEBUG_INFO, ("Local traverse end db:%s reqid:%d records:%d\n",
+ h->ctdb_db->db_name, h->reqid, res));
+ }
+
+ callback(p, tdb_null, tdb_null);
+}
+
+/*
+ destroy a in-flight traverse operation
+ */
+static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
+{
+ DLIST_REMOVE(h->ctdb_db->traverse, h);
+ ctdb_kill(h->ctdb_db->ctdb, h->child, SIGKILL);
+ return 0;
+}
+
+/*
+ callback from tdb_traverse_read()
+ */
+static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+ struct ctdb_traverse_local_handle *h = talloc_get_type(p,
+ struct ctdb_traverse_local_handle);
+ struct ctdb_rec_data_old *d;
+ struct ctdb_ltdb_header *hdr;
+ int res, status;
+ TDB_DATA outdata;
+
+ hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+ if (ctdb_db_volatile(h->ctdb_db)) {
+ /* filter out zero-length records */
+ if (!h->withemptyrecords &&
+ data.dsize <= sizeof(struct ctdb_ltdb_header))
+ {
+ return 0;
+ }
+
+ /* filter out non-authoritative records */
+ if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
+ return 0;
+ }
+ }
+
+ d = ctdb_marshall_record(h, h->reqid, key, NULL, data);
+ if (d == NULL) {
+ /* error handling is tricky in this child code .... */
+ h->records_failed++;
+ return -1;
+ }
+
+ outdata.dptr = (uint8_t *)d;
+ outdata.dsize = d->length;
+
+ res = ctdb_control(h->ctdb_db->ctdb, h->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
+ CTDB_CTRL_FLAG_NOREPLY, outdata, NULL, NULL, &status, NULL, NULL);
+ if (res != 0 || status != 0) {
+ h->records_failed++;
+ return -1;
+ }
+
+ h->records_sent++;
+ return 0;
+}
+
+struct traverse_all_state {
+ struct ctdb_context *ctdb;
+ struct ctdb_traverse_local_handle *h;
+ uint32_t reqid;
+ uint32_t srcnode;
+ uint32_t client_reqid;
+ uint64_t srvid;
+ bool withemptyrecords;
+};
+
+/*
+ setup a non-blocking traverse of a local ltdb. The callback function
+ will be called on every record in the local ltdb. To stop the
+ traverse, talloc_free() the traverse_handle.
+
+ The traverse is finished when the callback is called with tdb_null for key and data
+ */
+static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
+ ctdb_traverse_fn_t callback,
+ struct traverse_all_state *all_state)
+{
+ struct ctdb_traverse_local_handle *h;
+ int ret;
+
+ h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
+ if (h == NULL) {
+ return NULL;
+ }
+
+ ret = pipe(h->fd);
+
+ if (ret != 0) {
+ talloc_free(h);
+ return NULL;
+ }
+
+ h->child = ctdb_fork(ctdb_db->ctdb);
+
+ if (h->child == (pid_t)-1) {
+ close(h->fd[0]);
+ close(h->fd[1]);
+ talloc_free(h);
+ return NULL;
+ }
+
+ h->callback = callback;
+ h->private_data = all_state;
+ h->ctdb_db = ctdb_db;
+ h->client_reqid = all_state->client_reqid;
+ h->reqid = all_state->reqid;
+ h->srvid = all_state->srvid;
+ h->srcnode = all_state->srcnode;
+ h->withemptyrecords = all_state->withemptyrecords;
+
+ if (h->child == 0) {
+ /* start the traverse in the child */
+ int res, status;
+ pid_t parent = getpid();
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ struct ctdb_rec_data_old *d;
+ TDB_DATA outdata;
+
+ close(h->fd[0]);
+
+ prctl_set_comment("ctdb_traverse");
+ if (switch_from_server_to_client(ctdb) != 0) {
+ DEBUG(DEBUG_CRIT, ("Failed to switch traverse child into client mode\n"));
+ _exit(0);
+ }
+
+ d = ctdb_marshall_record(h, h->reqid, tdb_null, NULL, tdb_null);
+ if (d == NULL) {
+ res = 0;
+ sys_write(h->fd[1], &res, sizeof(int));
+ _exit(0);
+ }
+
+ res = tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
+ if (res == -1 || h->records_failed > 0) {
+ /* traverse failed */
+ res = -(h->records_sent);
+ } else {
+ res = h->records_sent;
+ }
+
+ /* Wait till all the data is flushed from output queue */
+ while (ctdb_queue_length(ctdb->daemon.queue) > 0) {
+ tevent_loop_once(ctdb->ev);
+ }
+
+ /* End traverse by sending empty record */
+ outdata.dptr = (uint8_t *)d;
+ outdata.dsize = d->length;
+ ret = ctdb_control(ctdb, h->srcnode, 0,
+ CTDB_CONTROL_TRAVERSE_DATA,
+ CTDB_CTRL_FLAG_NOREPLY, outdata,
+ NULL, NULL, &status, NULL, NULL);
+ if (ret == -1 || status == -1) {
+ if (res > 0) {
+ res = -res;
+ }
+ }
+
+ sys_write(h->fd[1], &res, sizeof(res));
+
+ ctdb_wait_for_process_to_exit(parent);
+ _exit(0);
+ }
+
+ close(h->fd[1]);
+ set_close_on_exec(h->fd[0]);
+
+ talloc_set_destructor(h, traverse_local_destructor);
+
+ DLIST_ADD(ctdb_db->traverse, h);
+
+ h->fde = tevent_add_fd(ctdb_db->ctdb->ev, h, h->fd[0], TEVENT_FD_READ,
+ ctdb_traverse_child_handler, h);
+ if (h->fde == NULL) {
+ close(h->fd[0]);
+ talloc_free(h);
+ return NULL;
+ }
+ tevent_fd_set_auto_close(h->fde);
+
+ return h;
+}
+
+
+struct ctdb_traverse_all_handle {
+ struct ctdb_context *ctdb;
+ struct ctdb_db_context *ctdb_db;
+ uint32_t reqid;
+ ctdb_traverse_fn_t callback;
+ void *private_data;
+ uint32_t null_count;
+ bool timedout;
+};
+
+/*
+ destroy a traverse_all op
+ */
+static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
+{
+ reqid_remove(state->ctdb->idr, state->reqid);
+ return 0;
+}
+
+/* called when a traverse times out */
+static void ctdb_traverse_all_timeout(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval t, void *private_data)
+{
+ struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
+
+ DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
+ CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
+
+ state->timedout = true;
+ state->callback(state->private_data, tdb_null, tdb_null);
+}
+
+
+struct traverse_start_state {
+ struct ctdb_context *ctdb;
+ struct ctdb_traverse_all_handle *h;
+ uint32_t srcnode;
+ uint32_t reqid;
+ uint32_t db_id;
+ uint64_t srvid;
+ bool withemptyrecords;
+ int num_records;
+};
+
+
+/*
+ setup a cluster-wide non-blocking traverse of a ctdb. The
+ callback function will be called on every record in the local
+ ltdb. To stop the traverse, talloc_free() the traverse_handle.
+
+ The traverse is finished when the callback is called with tdb_null
+ for key and data
+ */
+static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
+ ctdb_traverse_fn_t callback,
+ struct traverse_start_state *start_state)
+{
+ struct ctdb_traverse_all_handle *state;
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ int ret;
+ TDB_DATA data;
+ struct ctdb_traverse_all r;
+ struct ctdb_traverse_all_ext r_ext;
+ uint32_t destination;
+
+ state = talloc(start_state, struct ctdb_traverse_all_handle);
+ if (state == NULL) {
+ return NULL;
+ }
+
+ state->ctdb = ctdb;
+ state->ctdb_db = ctdb_db;
+ state->reqid = reqid_new(ctdb_db->ctdb->idr, state);
+ state->callback = callback;
+ state->private_data = start_state;
+ state->null_count = 0;
+ state->timedout = false;
+
+ talloc_set_destructor(state, ctdb_traverse_all_destructor);
+
+ if (start_state->withemptyrecords) {
+ r_ext.db_id = ctdb_db->db_id;
+ r_ext.reqid = state->reqid;
+ r_ext.pnn = ctdb->pnn;
+ r_ext.client_reqid = start_state->reqid;
+ r_ext.srvid = start_state->srvid;
+ r_ext.withemptyrecords = start_state->withemptyrecords;
+
+ data.dptr = (uint8_t *)&r_ext;
+ data.dsize = sizeof(r_ext);
+ } else {
+ r.db_id = ctdb_db->db_id;
+ r.reqid = state->reqid;
+ r.pnn = ctdb->pnn;
+ r.client_reqid = start_state->reqid;
+ r.srvid = start_state->srvid;
+
+ data.dptr = (uint8_t *)&r;
+ data.dsize = sizeof(r);
+ }
+
+ if (ctdb_db_volatile(ctdb_db)) {
+ /* volatile database, traverse all active nodes */
+ destination = CTDB_BROADCAST_ACTIVE;
+ } else {
+ unsigned int i;
+ /* persistent database, traverse one node, preferably
+ * the local one
+ */
+ destination = ctdb->pnn;
+ /* check we are in the vnnmap */
+ for (i=0; i < ctdb->vnn_map->size; i++) {
+ if (ctdb->vnn_map->map[i] == ctdb->pnn) {
+ break;
+ }
+ }
+ /* if we are not in the vnn map we just pick the first
+ * node instead
+ */
+ if (i == ctdb->vnn_map->size) {
+ destination = ctdb->vnn_map->map[0];
+ }
+ }
+
+ /* tell all the nodes in the cluster to start sending records to this
+ * node, or if it is a persistent database, just tell the local
+ * node
+ */
+
+ if (start_state->withemptyrecords) {
+ ret = ctdb_daemon_send_control(ctdb, destination, 0,
+ CTDB_CONTROL_TRAVERSE_ALL_EXT,
+ 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+ } else {
+ ret = ctdb_daemon_send_control(ctdb, destination, 0,
+ CTDB_CONTROL_TRAVERSE_ALL,
+ 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+ }
+
+ if (ret != 0) {
+ talloc_free(state);
+ return NULL;
+ }
+
+ DEBUG(DEBUG_NOTICE,("Starting traverse on DB %s (id %d)\n",
+ ctdb_db->db_name, state->reqid));
+
+ /* timeout the traverse */
+ tevent_add_timer(ctdb->ev, state,
+ timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
+ ctdb_traverse_all_timeout, state);
+
+ return state;
+}
+
+/*
+ called when local traverse ends
+ */
+static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
+{
+ struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
+
+ /* we're done */
+ talloc_free(state);
+}
+
+/*
+ * extended version to take the "withemptyrecords" parameter"
+ */
+int32_t ctdb_control_traverse_all_ext(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+ struct ctdb_traverse_all_ext *c = (struct ctdb_traverse_all_ext *)data.dptr;
+ struct traverse_all_state *state;
+ struct ctdb_db_context *ctdb_db;
+
+ if (data.dsize != sizeof(struct ctdb_traverse_all_ext)) {
+ DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all_ext\n"));
+ return -1;
+ }
+
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
+ if (ctdb_db == NULL) {
+ return -1;
+ }
+
+ if (ctdb_db->unhealthy_reason) {
+ if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+ DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ }
+
+ state = talloc(ctdb_db, struct traverse_all_state);
+ if (state == NULL) {
+ return -1;
+ }
+
+ state->reqid = c->reqid;
+ state->srcnode = c->pnn;
+ state->ctdb = ctdb;
+ state->client_reqid = c->client_reqid;
+ state->srvid = c->srvid;
+ state->withemptyrecords = c->withemptyrecords;
+
+ state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
+ if (state->h == NULL) {
+ talloc_free(state);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
+ setup a traverse of our local ltdb, sending the records as
+ CTDB_CONTROL_TRAVERSE_DATA records back to the originator
+ */
+int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+ struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
+ struct traverse_all_state *state;
+ struct ctdb_db_context *ctdb_db;
+
+ if (data.dsize != sizeof(struct ctdb_traverse_all)) {
+ DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
+ return -1;
+ }
+
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
+ if (ctdb_db == NULL) {
+ return -1;
+ }
+
+ if (ctdb_db->unhealthy_reason) {
+ if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+ DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ }
+
+ state = talloc(ctdb_db, struct traverse_all_state);
+ if (state == NULL) {
+ return -1;
+ }
+
+ state->reqid = c->reqid;
+ state->srcnode = c->pnn;
+ state->ctdb = ctdb;
+ state->client_reqid = c->client_reqid;
+ state->srvid = c->srvid;
+ state->withemptyrecords = false;
+
+ state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
+ if (state->h == NULL) {
+ talloc_free(state);
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/*
+ called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
+ call the traverse_all callback with the record
+ */
+int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+ struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
+ struct ctdb_traverse_all_handle *state;
+ TDB_DATA key;
+ ctdb_traverse_fn_t callback;
+ void *private_data;
+
+ if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
+ DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
+ return -1;
+ }
+
+ state = reqid_find(ctdb->idr, d->reqid, struct ctdb_traverse_all_handle);
+ if (state == NULL || d->reqid != state->reqid) {
+ /* traverse might have been terminated already */
+ return -1;
+ }
+
+ key.dsize = d->keylen;
+ key.dptr = &d->data[0];
+ data.dsize = d->datalen;
+ data.dptr = &d->data[d->keylen];
+
+ if (key.dsize == 0 && data.dsize == 0) {
+ state->null_count++;
+ /* Persistent databases are only scanned on one node (the local
+ * node)
+ */
+ if (ctdb_db_volatile(state->ctdb_db)) {
+ if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
+ return 0;
+ }
+ }
+ }
+
+ callback = state->callback;
+ private_data = state->private_data;
+
+ callback(private_data, key, data);
+ return 0;
+}
+
+/*
+ kill a in-progress traverse, used when a client disconnects
+ */
+int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
+ TDB_DATA *outdata, uint32_t srcnode)
+{
+ struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_traverse_local_handle *t;
+
+ ctdb_db = find_ctdb_db(ctdb, d->db_id);
+ if (ctdb_db == NULL) {
+ return -1;
+ }
+
+ for (t=ctdb_db->traverse; t; t=t->next) {
+ if (t->client_reqid == d->reqid &&
+ t->srvid == d->srvid) {
+ talloc_free(t);
+ break;
+ }
+ }
+
+ return 0;
+}
+
+
+/*
+ this is called when a client disconnects during a traverse
+ we need to notify all the nodes taking part in the search that they
+ should kill their traverse children
+ */
+static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
+{
+ struct ctdb_traverse_start r;
+ TDB_DATA data;
+
+ DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
+ r.db_id = state->db_id;
+ r.reqid = state->reqid;
+ r.srvid = state->srvid;
+
+ data.dptr = (uint8_t *)&r;
+ data.dsize = sizeof(r);
+
+ ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0,
+ CTDB_CONTROL_TRAVERSE_KILL,
+ 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+ return 0;
+}
+
+/*
+ callback which sends records as messages to the client
+ */
+static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
+{
+ struct traverse_start_state *state;
+ struct ctdb_rec_data_old *d;
+ TDB_DATA cdata;
+
+ state = talloc_get_type(p, struct traverse_start_state);
+
+ d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
+ if (d == NULL) {
+ return;
+ }
+
+ cdata.dptr = (uint8_t *)d;
+ cdata.dsize = d->length;
+
+ srvid_dispatch(state->ctdb->srv, state->srvid, 0, cdata);
+ if (key.dsize == 0 && data.dsize == 0) {
+ DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
+ state->h->ctdb_db->db_name, state->h->reqid,
+ state->num_records));
+
+ if (state->h->timedout) {
+ /* timed out, send TRAVERSE_KILL control */
+ talloc_free(state);
+ } else {
+ /* end of traverse */
+ talloc_set_destructor(state, NULL);
+ talloc_free(state);
+ }
+ } else {
+ state->num_records++;
+ }
+}
+
+
+/**
+ * start a traverse_all - called as a control from a client.
+ * extended version to take the "withemptyrecords" parameter.
+ */
+int32_t ctdb_control_traverse_start_ext(struct ctdb_context *ctdb,
+ TDB_DATA data,
+ TDB_DATA *outdata,
+ uint32_t srcnode,
+ uint32_t client_id)
+{
+ struct ctdb_traverse_start_ext *d = (struct ctdb_traverse_start_ext *)data.dptr;
+ struct traverse_start_state *state;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
+
+ if (client == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
+ return -1;
+ }
+
+ if (data.dsize != sizeof(*d)) {
+ DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
+ return -1;
+ }
+
+ ctdb_db = find_ctdb_db(ctdb, d->db_id);
+ if (ctdb_db == NULL) {
+ return -1;
+ }
+
+ if (ctdb_db->unhealthy_reason) {
+ if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+ DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ return -1;
+ }
+ DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
+ ctdb_db->db_name, ctdb_db->unhealthy_reason));
+ }
+
+ state = talloc(client, struct traverse_start_state);
+ if (state == NULL) {
+ return -1;
+ }
+
+ state->srcnode = srcnode;
+ state->reqid = d->reqid;
+ state->srvid = d->srvid;
+ state->db_id = d->db_id;
+ state->ctdb = ctdb;
+ state->withemptyrecords = d->withemptyrecords;
+ state->num_records = 0;
+
+ state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
+ if (state->h == NULL) {
+ talloc_free(state);
+ return -1;
+ }
+
+ talloc_set_destructor(state, ctdb_traverse_start_destructor);
+
+ return 0;
+}
+
+/**
+ * start a traverse_all - called as a control from a client.
+ */
+int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb,
+ TDB_DATA data,
+ TDB_DATA *outdata,
+ uint32_t srcnode,
+ uint32_t client_id)
+{
+ struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
+ struct ctdb_traverse_start_ext d2;
+ TDB_DATA data2;
+
+ ZERO_STRUCT(d2);
+ d2.db_id = d->db_id;
+ d2.reqid = d->reqid;
+ d2.srvid = d->srvid;
+ d2.withemptyrecords = false;
+
+ data2.dsize = sizeof(d2);
+ data2.dptr = (uint8_t *)&d2;
+
+ return ctdb_control_traverse_start_ext(ctdb, data2, outdata, srcnode, client_id);
+}