/* Fake CTDB server for testing Copyright (C) Amitay Isaacs 2016 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, see . */ #include "replace.h" #include "system/network.h" #include "system/time.h" #include "system/filesys.h" #include #include #include #include #include "lib/util/dlinklist.h" #include "lib/util/tevent_unix.h" #include "lib/util/debug.h" #include "lib/util/samba_util.h" #include "lib/async_req/async_sock.h" #include "protocol/protocol.h" #include "protocol/protocol_api.h" #include "protocol/protocol_util.h" #include "protocol/protocol_private.h" #include "common/comm.h" #include "common/logging.h" #include "common/tunable.h" #include "common/srvid.h" #include "common/system.h" #include "ipalloc_read_known_ips.h" #define CTDB_PORT 4379 /* A fake flag that is only supported by some functions */ #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000 struct node { ctdb_sock_addr addr; uint32_t pnn; uint32_t flags; uint32_t capabilities; bool recovery_disabled; void *recovery_substate; }; struct node_map { uint32_t num_nodes; struct node *node; uint32_t pnn; uint32_t recmaster; }; struct interface { const char *name; bool link_up; uint32_t references; }; struct interface_map { int num; struct interface *iface; }; struct vnn_map { uint32_t recmode; uint32_t generation; uint32_t size; uint32_t *map; }; struct database { struct database *prev, *next; const char *name; const char *path; struct tdb_context *tdb; uint32_t id; uint8_t flags; uint64_t seq_num; }; struct database_map { struct database *db; const char *dbdir; }; struct fake_control_failure { struct fake_control_failure *prev, *next; enum ctdb_controls opcode; uint32_t pnn; const char *error; const char *comment; }; struct ctdb_client { struct ctdb_client *prev, *next; struct ctdbd_context *ctdb; pid_t pid; void *state; }; struct ctdbd_context { struct node_map *node_map; struct interface_map *iface_map; struct vnn_map *vnn_map; struct database_map *db_map; struct srvid_context *srv; int num_clients; struct timeval start_time; struct timeval recovery_start_time; struct timeval recovery_end_time; bool takeover_disabled; int log_level; enum ctdb_runstate runstate; struct ctdb_tunable_list tun_list; char *reclock; struct ctdb_public_ip_list *known_ips; struct fake_control_failure *control_failures; struct ctdb_client *client_list; }; /* * Parse routines */ static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx) { struct node_map *node_map; node_map = talloc_zero(mem_ctx, struct node_map); if (node_map == NULL) { return NULL; } node_map->pnn = CTDB_UNKNOWN_PNN; node_map->recmaster = CTDB_UNKNOWN_PNN; return node_map; } /* Read a nodemap from stdin. Each line looks like: * [RECMASTER] [CURRENT] [CAPABILITIES] * EOF or a blank line terminates input. * * By default, capabilities for each node are * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER. These 2 * capabilities can be faked off by adding, for example, * -CTDB_CAP_RECMASTER. */ static bool nodemap_parse(struct node_map *node_map) { char line[1024]; while ((fgets(line, sizeof(line), stdin) != NULL)) { uint32_t pnn, flags, capabilities; char *tok, *t; char *ip; ctdb_sock_addr saddr; struct node *node; int ret; if (line[0] == '\n') { break; } /* Get rid of pesky newline */ if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } /* Get PNN */ tok = strtok(line, " \t"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing PNN\n", line); continue; } pnn = (uint32_t)strtoul(tok, NULL, 0); /* Get IP */ tok = strtok(NULL, " \t"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing IP\n", line); continue; } ret = ctdb_sock_addr_from_string(tok, &saddr, false); if (ret != 0) { fprintf(stderr, "bad line (%s) - invalid IP\n", line); continue; } ctdb_sock_addr_set_port(&saddr, CTDB_PORT); ip = talloc_strdup(node_map, tok); if (ip == NULL) { goto fail; } /* Get flags */ tok = strtok(NULL, " \t"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing flags\n", line); continue; } flags = (uint32_t)strtoul(tok, NULL, 0); /* Handle deleted nodes */ if (flags & NODE_FLAGS_DELETED) { talloc_free(ip); ip = talloc_strdup(node_map, "0.0.0.0"); if (ip == NULL) { goto fail; } } capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER; tok = strtok(NULL, " \t"); while (tok != NULL) { if (strcmp(tok, "CURRENT") == 0) { node_map->pnn = pnn; } else if (strcmp(tok, "RECMASTER") == 0) { node_map->recmaster = pnn; } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) { capabilities &= ~CTDB_CAP_RECMASTER; } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) { capabilities &= ~CTDB_CAP_LMASTER; } else if (strcmp(tok, "TIMEOUT") == 0) { /* This can be done with just a flag * value but it is probably clearer * and less error-prone to fake this * with an explicit token */ flags |= NODE_FLAGS_FAKE_TIMEOUT; } tok = strtok(NULL, " \t"); } node_map->node = talloc_realloc(node_map, node_map->node, struct node, node_map->num_nodes + 1); if (node_map->node == NULL) { goto fail; } node = &node_map->node[node_map->num_nodes]; ret = ctdb_sock_addr_from_string(ip, &node->addr, false); if (ret != 0) { fprintf(stderr, "bad line (%s) - invalid IP\n", line); continue; } ctdb_sock_addr_set_port(&node->addr, CTDB_PORT); node->pnn = pnn; node->flags = flags; node->capabilities = capabilities; node->recovery_disabled = false; node->recovery_substate = NULL; node_map->num_nodes += 1; } if (node_map->num_nodes == 0) { goto fail; } DEBUG(DEBUG_INFO, ("Parsing nodemap done\n")); return true; fail: DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n")); return false; } /* Append a node to a node map with given address and flags */ static bool node_map_add(struct ctdb_node_map *nodemap, const char *nstr, uint32_t flags) { ctdb_sock_addr addr; uint32_t num; struct ctdb_node_and_flags *n; int ret; ret = ctdb_sock_addr_from_string(nstr, &addr, false); if (ret != 0) { fprintf(stderr, "Invalid IP address %s\n", nstr); return false; } ctdb_sock_addr_set_port(&addr, CTDB_PORT); num = nodemap->num; nodemap->node = talloc_realloc(nodemap, nodemap->node, struct ctdb_node_and_flags, num+1); if (nodemap->node == NULL) { return false; } n = &nodemap->node[num]; n->addr = addr; n->pnn = num; n->flags = flags; nodemap->num = num+1; return true; } /* Read a nodes file into a node map */ static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx, const char *nlist) { char **lines; int nlines; int i; struct ctdb_node_map *nodemap; nodemap = talloc_zero(mem_ctx, struct ctdb_node_map); if (nodemap == NULL) { return NULL; } lines = file_lines_load(nlist, &nlines, 0, mem_ctx); if (lines == NULL) { return NULL; } while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) { nlines--; } for (i=0; i 1) && ((node[len-1] == ' ') || (node[len-1] == '\t'))) { node[len-1] = '\0'; len--; } if (len == 0) { continue; } if (*node == '#') { /* A "deleted" node is a node that is commented out in the nodes file. This is used instead of removing a line, which would cause subsequent nodes to change their PNN. */ flags = NODE_FLAGS_DELETED; node = discard_const("0.0.0.0"); } else { flags = 0; } if (! node_map_add(nodemap, node, flags)) { talloc_free(lines); TALLOC_FREE(nodemap); return NULL; } } talloc_free(lines); return nodemap; } static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn) { struct ctdb_node_map *nodemap; char nodes_list[PATH_MAX]; const char *ctdb_base; int num; ctdb_base = getenv("CTDB_BASE"); if (ctdb_base == NULL) { D_ERR("CTDB_BASE is not set\n"); return NULL; } /* read optional node-specific nodes file */ num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes.%d", ctdb_base, pnn); if (num == sizeof(nodes_list)) { D_ERR("nodes file path too long\n"); return NULL; } nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list); if (nodemap != NULL) { /* Fake a load failure for an empty nodemap */ if (nodemap->num == 0) { talloc_free(nodemap); D_ERR("Failed to read nodes file \"%s\"\n", nodes_list); return NULL; } return nodemap; } /* read normal nodes file */ num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base); if (num == sizeof(nodes_list)) { D_ERR("nodes file path too long\n"); return NULL; } nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list); if (nodemap != NULL) { return nodemap; } DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list); return NULL; } static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx) { struct interface_map *iface_map; iface_map = talloc_zero(mem_ctx, struct interface_map); if (iface_map == NULL) { return NULL; } return iface_map; } /* Read interfaces information. Same format as "ctdb ifaces -Y" * output: * :Name:LinkStatus:References: * :eth2:1:4294967294 * :eth1:1:4294967292 */ static bool interfaces_parse(struct interface_map *iface_map) { char line[1024]; while ((fgets(line, sizeof(line), stdin) != NULL)) { uint16_t link_state; uint32_t references; char *tok, *t, *name; struct interface *iface; if (line[0] == '\n') { break; } /* Get rid of pesky newline */ if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } if (strcmp(line, ":Name:LinkStatus:References:") == 0) { continue; } /* Leading colon... */ // tok = strtok(line, ":"); /* name */ tok = strtok(line, ":"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing name\n", line); continue; } name = tok; /* link_state */ tok = strtok(NULL, ":"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing link state\n", line); continue; } link_state = (uint16_t)strtoul(tok, NULL, 0); /* references... */ tok = strtok(NULL, ":"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing references\n", line); continue; } references = (uint32_t)strtoul(tok, NULL, 0); iface_map->iface = talloc_realloc(iface_map, iface_map->iface, struct interface, iface_map->num + 1); if (iface_map->iface == NULL) { goto fail; } iface = &iface_map->iface[iface_map->num]; iface->name = talloc_strdup(iface_map, name); if (iface->name == NULL) { goto fail; } iface->link_up = link_state; iface->references = references; iface_map->num += 1; } if (iface_map->num == 0) { goto fail; } DEBUG(DEBUG_INFO, ("Parsing interfaces done\n")); return true; fail: fprintf(stderr, "Parsing interfaces failed\n"); return false; } static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx) { struct vnn_map *vnn_map; vnn_map = talloc_zero(mem_ctx, struct vnn_map); if (vnn_map == NULL) { fprintf(stderr, "Memory error\n"); return NULL; } vnn_map->recmode = CTDB_RECOVERY_ACTIVE; vnn_map->generation = INVALID_GENERATION; return vnn_map; } /* Read vnn map. * output: * * * * ... */ static bool vnnmap_parse(struct vnn_map *vnn_map) { char line[1024]; while (fgets(line, sizeof(line), stdin) != NULL) { uint32_t n; char *t; if (line[0] == '\n') { break; } /* Get rid of pesky newline */ if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } n = (uint32_t) strtol(line, NULL, 0); /* generation */ if (vnn_map->generation == INVALID_GENERATION) { vnn_map->generation = n; continue; } vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t, vnn_map->size + 1); if (vnn_map->map == NULL) { fprintf(stderr, "Memory error\n"); goto fail; } vnn_map->map[vnn_map->size] = n; vnn_map->size += 1; } if (vnn_map->size == 0) { goto fail; } DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n")); return true; fail: fprintf(stderr, "Parsing vnnmap failed\n"); return false; } static bool reclock_parse(struct ctdbd_context *ctdb) { char line[1024]; char *t; if (fgets(line, sizeof(line), stdin) == NULL) { goto fail; } if (line[0] == '\n') { goto fail; } /* Get rid of pesky newline */ if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } ctdb->reclock = talloc_strdup(ctdb, line); if (ctdb->reclock == NULL) { goto fail; } /* Swallow possible blank line following section. Picky * compiler settings don't allow the return value to be * ignored, so make the compiler happy. */ if (fgets(line, sizeof(line), stdin) == NULL) { ; } DEBUG(DEBUG_INFO, ("Parsing reclock done\n")); return true; fail: fprintf(stderr, "Parsing reclock failed\n"); return false; } static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx, const char *dbdir) { struct database_map *db_map; db_map = talloc_zero(mem_ctx, struct database_map); if (db_map == NULL) { return NULL; } db_map->dbdir = talloc_strdup(db_map, dbdir); if (db_map->dbdir == NULL) { talloc_free(db_map); return NULL; } return db_map; } /* Read a database map from stdin. Each line looks like: * [FLAGS] [SEQ_NUM] * EOF or a blank line terminates input. * * By default, flags and seq_num are 0 */ static bool dbmap_parse(struct database_map *db_map) { char line[1024]; while ((fgets(line, sizeof(line), stdin) != NULL)) { uint32_t id; uint8_t flags = 0; uint32_t seq_num = 0; char *tok, *t; char *name; struct database *db; if (line[0] == '\n') { break; } /* Get rid of pesky newline */ if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } /* Get ID */ tok = strtok(line, " \t"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing ID\n", line); continue; } id = (uint32_t)strtoul(tok, NULL, 0); /* Get NAME */ tok = strtok(NULL, " \t"); if (tok == NULL) { fprintf(stderr, "bad line (%s) - missing NAME\n", line); continue; } name = talloc_strdup(db_map, tok); if (name == NULL) { goto fail; } /* Get flags */ tok = strtok(NULL, " \t"); while (tok != NULL) { if (strcmp(tok, "PERSISTENT") == 0) { flags |= CTDB_DB_FLAGS_PERSISTENT; } else if (strcmp(tok, "STICKY") == 0) { flags |= CTDB_DB_FLAGS_STICKY; } else if (strcmp(tok, "READONLY") == 0) { flags |= CTDB_DB_FLAGS_READONLY; } else if (strcmp(tok, "REPLICATED") == 0) { flags |= CTDB_DB_FLAGS_REPLICATED; } else if (tok[0] >= '0'&& tok[0] <= '9') { uint8_t nv = CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED; if ((flags & nv) == 0) { fprintf(stderr, "seq_num for volatile db\n"); goto fail; } seq_num = (uint64_t)strtoull(tok, NULL, 0); } tok = strtok(NULL, " \t"); } db = talloc_zero(db_map, struct database); if (db == NULL) { goto fail; } db->id = id; db->name = talloc_steal(db, name); db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name); if (db->path == NULL) { talloc_free(db); goto fail; } db->flags = flags; db->seq_num = seq_num; DLIST_ADD_END(db_map->db, db); } if (db_map->db == NULL) { goto fail; } DEBUG(DEBUG_INFO, ("Parsing dbmap done\n")); return true; fail: DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n")); return false; } static struct database *database_find(struct database_map *db_map, uint32_t db_id) { struct database *db; for (db = db_map->db; db != NULL; db = db->next) { if (db->id == db_id) { return db; } } return NULL; } static int database_count(struct database_map *db_map) { struct database *db; int count = 0; for (db = db_map->db; db != NULL; db = db->next) { count += 1; } return count; } static int database_flags(uint8_t db_flags) { int tdb_flags = 0; if (db_flags & CTDB_DB_FLAGS_PERSISTENT) { tdb_flags = TDB_DEFAULT; } else { /* volatile and replicated use the same flags */ tdb_flags = TDB_NOSYNC | TDB_CLEAR_IF_FIRST | TDB_INCOMPATIBLE_HASH; } tdb_flags |= TDB_DISALLOW_NESTING; return tdb_flags; } static struct database *database_new(struct database_map *db_map, const char *name, uint8_t flags) { struct database *db; TDB_DATA key; int tdb_flags; db = talloc_zero(db_map, struct database); if (db == NULL) { return NULL; } db->name = talloc_strdup(db, name); if (db->name == NULL) { goto fail; } db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name); if (db->path == NULL) { goto fail; } key.dsize = strlen(db->name) + 1; key.dptr = discard_const(db->name); db->id = tdb_jenkins_hash(&key); db->flags = flags; tdb_flags = database_flags(flags); db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644); if (db->tdb == NULL) { DBG_ERR("tdb_open\n"); goto fail; } DLIST_ADD_END(db_map->db, db); return db; fail: DBG_ERR("Memory error\n"); talloc_free(db); return NULL; } static int ltdb_store(struct database *db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data) { int ret; bool db_volatile = true; bool keep = false; if (db->tdb == NULL) { return EINVAL; } if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) || (db->flags & CTDB_DB_FLAGS_REPLICATED)) { db_volatile = false; } if (data.dsize > 0) { keep = true; } else { if (db_volatile && header->rsn == 0) { keep = true; } } if (keep) { TDB_DATA rec[2]; rec[0].dsize = ctdb_ltdb_header_len(header); rec[0].dptr = (uint8_t *)header; rec[1].dsize = data.dsize; rec[1].dptr = data.dptr; ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE); } else { if (header->rsn > 0) { ret = tdb_delete(db->tdb, key); } else { ret = 0; } } return ret; } static int ltdb_fetch(struct database *db, TDB_DATA key, struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx, TDB_DATA *data) { TDB_DATA rec; size_t np; int ret; if (db->tdb == NULL) { return EINVAL; } rec = tdb_fetch(db->tdb, key); ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np); if (ret != 0) { if (rec.dptr != NULL) { free(rec.dptr); } *header = (struct ctdb_ltdb_header) { .rsn = 0, .dmaster = 0, .flags = 0, }; ret = ltdb_store(db, key, header, tdb_null); if (ret != 0) { return ret; } *data = tdb_null; return 0; } data->dsize = rec.dsize - ctdb_ltdb_header_len(header); data->dptr = talloc_memdup(mem_ctx, rec.dptr + ctdb_ltdb_header_len(header), data->dsize); free(rec.dptr); if (data->dptr == NULL) { return ENOMEM; } return 0; } static int database_seqnum(struct database *db, uint64_t *seqnum) { const char *keyname = CTDB_DB_SEQNUM_KEY; TDB_DATA key, data; struct ctdb_ltdb_header header; size_t np; int ret; if (db->tdb == NULL) { *seqnum = db->seq_num; return 0; } key.dptr = discard_const(keyname); key.dsize = strlen(keyname) + 1; ret = ltdb_fetch(db, key, &header, db, &data); if (ret != 0) { return ret; } if (data.dsize == 0) { *seqnum = 0; return 0; } ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np); talloc_free(data.dptr); if (ret != 0) { *seqnum = 0; } return ret; } static int ltdb_transaction_update(uint32_t reqid, struct ctdb_ltdb_header *no_header, TDB_DATA key, TDB_DATA data, void *private_data) { struct database *db = (struct database *)private_data; TALLOC_CTX *tmp_ctx = talloc_new(db); struct ctdb_ltdb_header header = { 0 }, oldheader; TDB_DATA olddata; int ret; if (db->tdb == NULL) { return EINVAL; } ret = ctdb_ltdb_header_extract(&data, &header); if (ret != 0) { return ret; } ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata); if (ret != 0) { return ret; } if (olddata.dsize > 0) { if (oldheader.rsn > header.rsn || (oldheader.rsn == header.rsn && olddata.dsize != data.dsize)) { return -1; } } talloc_free(tmp_ctx); ret = ltdb_store(db, key, &header, data); return ret; } static int ltdb_transaction(struct database *db, struct ctdb_rec_buffer *recbuf) { int ret; if (db->tdb == NULL) { return EINVAL; } ret = tdb_transaction_start(db->tdb); if (ret == -1) { return ret; } ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db); if (ret != 0) { tdb_transaction_cancel(db->tdb); } ret = tdb_transaction_commit(db->tdb); return ret; } static bool public_ips_parse(struct ctdbd_context *ctdb, uint32_t numnodes) { bool status; if (numnodes == 0) { D_ERR("Must initialise nodemap before public IPs\n"); return false; } ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false); status = (ctdb->known_ips != NULL && ctdb->known_ips->num != 0); if (status) { D_INFO("Parsing public IPs done\n"); } else { D_INFO("Parsing public IPs failed\n"); } return status; } /* Read information about controls to fail. Format is: * {ERROR|TIMEOUT} */ static bool control_failures_parse(struct ctdbd_context *ctdb) { char line[1024]; while ((fgets(line, sizeof(line), stdin) != NULL)) { char *tok, *t; enum ctdb_controls opcode; uint32_t pnn; const char *error; const char *comment; struct fake_control_failure *failure = NULL; if (line[0] == '\n') { break; } /* Get rid of pesky newline */ if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } /* Get opcode */ tok = strtok(line, " \t"); if (tok == NULL) { D_ERR("bad line (%s) - missing opcode\n", line); continue; } opcode = (enum ctdb_controls)strtoul(tok, NULL, 0); /* Get PNN */ tok = strtok(NULL, " \t"); if (tok == NULL) { D_ERR("bad line (%s) - missing PNN\n", line); continue; } pnn = (uint32_t)strtoul(tok, NULL, 0); /* Get error */ tok = strtok(NULL, " \t"); if (tok == NULL) { D_ERR("bad line (%s) - missing errno\n", line); continue; } error = talloc_strdup(ctdb, tok); if (error == NULL) { goto fail; } if (strcmp(error, "ERROR") != 0 && strcmp(error, "TIMEOUT") != 0) { D_ERR("bad line (%s) " "- error must be \"ERROR\" or \"TIMEOUT\"\n", line); goto fail; } /* Get comment */ tok = strtok(NULL, "\n"); /* rest of line */ if (tok == NULL) { D_ERR("bad line (%s) - missing comment\n", line); continue; } comment = talloc_strdup(ctdb, tok); if (comment == NULL) { goto fail; } failure = talloc_zero(ctdb, struct fake_control_failure); if (failure == NULL) { goto fail; } failure->opcode = opcode; failure->pnn = pnn; failure->error = error; failure->comment = comment; DLIST_ADD(ctdb->control_failures, failure); } if (ctdb->control_failures == NULL) { goto fail; } D_INFO("Parsing fake control failures done\n"); return true; fail: D_INFO("Parsing fake control failures failed\n"); return false; } static bool runstate_parse(struct ctdbd_context *ctdb) { char line[1024]; char *t; if (fgets(line, sizeof(line), stdin) == NULL) { goto fail; } if (line[0] == '\n') { goto fail; } /* Get rid of pesky newline */ if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } ctdb->runstate = ctdb_runstate_from_string(line); if (ctdb->runstate == CTDB_RUNSTATE_UNKNOWN) { goto fail; } /* Swallow possible blank line following section. Picky * compiler settings don't allow the return value to be * ignored, so make the compiler happy. */ if (fgets(line, sizeof(line), stdin) == NULL) { ; } D_INFO("Parsing runstate done\n"); return true; fail: D_ERR("Parsing runstate failed\n"); return false; } /* * Manage clients */ static int ctdb_client_destructor(struct ctdb_client *client) { DLIST_REMOVE(client->ctdb->client_list, client); return 0; } static int client_add(struct ctdbd_context *ctdb, pid_t client_pid, void *client_state) { struct ctdb_client *client; client = talloc_zero(client_state, struct ctdb_client); if (client == NULL) { return ENOMEM; } client->ctdb = ctdb; client->pid = client_pid; client->state = client_state; DLIST_ADD(ctdb->client_list, client); talloc_set_destructor(client, ctdb_client_destructor); return 0; } static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid) { struct ctdb_client *client; for (client=ctdb->client_list; client != NULL; client=client->next) { if (client->pid == client_pid) { return client->state; } } return NULL; } /* * CTDB context setup */ static uint32_t new_generation(uint32_t old_generation) { uint32_t generation; while (1) { generation = random(); if (generation != INVALID_GENERATION && generation != old_generation) { break; } } return generation; } static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx, const char *dbdir) { struct ctdbd_context *ctdb; char line[1024]; bool status; int ret; ctdb = talloc_zero(mem_ctx, struct ctdbd_context); if (ctdb == NULL) { return NULL; } ctdb->node_map = nodemap_init(ctdb); if (ctdb->node_map == NULL) { goto fail; } ctdb->iface_map = interfaces_init(ctdb); if (ctdb->iface_map == NULL) { goto fail; } ctdb->vnn_map = vnnmap_init(ctdb); if (ctdb->vnn_map == NULL) { goto fail; } ctdb->db_map = dbmap_init(ctdb, dbdir); if (ctdb->db_map == NULL) { goto fail; } ret = srvid_init(ctdb, &ctdb->srv); if (ret != 0) { goto fail; } ctdb->runstate = CTDB_RUNSTATE_RUNNING; while (fgets(line, sizeof(line), stdin) != NULL) { char *t; if ((t = strchr(line, '\n')) != NULL) { *t = '\0'; } if (strcmp(line, "NODEMAP") == 0) { status = nodemap_parse(ctdb->node_map); } else if (strcmp(line, "IFACES") == 0) { status = interfaces_parse(ctdb->iface_map); } else if (strcmp(line, "VNNMAP") == 0) { status = vnnmap_parse(ctdb->vnn_map); } else if (strcmp(line, "DBMAP") == 0) { status = dbmap_parse(ctdb->db_map); } else if (strcmp(line, "PUBLICIPS") == 0) { status = public_ips_parse(ctdb, ctdb->node_map->num_nodes); } else if (strcmp(line, "RECLOCK") == 0) { status = reclock_parse(ctdb); } else if (strcmp(line, "CONTROLFAILS") == 0) { status = control_failures_parse(ctdb); } else if (strcmp(line, "RUNSTATE") == 0) { status = runstate_parse(ctdb); } else { fprintf(stderr, "Unknown line %s\n", line); status = false; } if (! status) { goto fail; } } ctdb->start_time = tevent_timeval_current(); ctdb->recovery_start_time = tevent_timeval_current(); ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL; if (ctdb->vnn_map->generation == INVALID_GENERATION) { ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation); } ctdb->recovery_end_time = tevent_timeval_current(); ctdb->log_level = DEBUG_ERR; ctdb_tunable_set_defaults(&ctdb->tun_list); return ctdb; fail: TALLOC_FREE(ctdb); return NULL; } static bool ctdbd_verify(struct ctdbd_context *ctdb) { struct node *node; unsigned int i; if (ctdb->node_map->num_nodes == 0) { return true; } /* Make sure all the nodes are in order */ for (i=0; inode_map->num_nodes; i++) { node = &ctdb->node_map->node[i]; if (node->pnn != i) { fprintf(stderr, "Expected node %u, found %u\n", i, node->pnn); return false; } } node = &ctdb->node_map->node[ctdb->node_map->pnn]; if (node->flags & NODE_FLAGS_DISCONNECTED) { DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n")); exit(0); } return true; } /* * Doing a recovery */ struct recover_state { struct tevent_context *ev; struct ctdbd_context *ctdb; }; static int recover_check(struct tevent_req *req); static void recover_wait_done(struct tevent_req *subreq); static void recover_done(struct tevent_req *subreq); static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdbd_context *ctdb) { struct tevent_req *req; struct recover_state *state; int ret; req = tevent_req_create(mem_ctx, &state, struct recover_state); if (req == NULL) { return NULL; } state->ev = ev; state->ctdb = ctdb; ret = recover_check(req); if (ret != 0) { tevent_req_error(req, ret); return tevent_req_post(req, ev); } return req; } static int recover_check(struct tevent_req *req) { struct recover_state *state = tevent_req_data( req, struct recover_state); struct ctdbd_context *ctdb = state->ctdb; struct tevent_req *subreq; bool recovery_disabled; unsigned int i; recovery_disabled = false; for (i=0; inode_map->num_nodes; i++) { if (ctdb->node_map->node[i].recovery_disabled) { recovery_disabled = true; break; } } subreq = tevent_wakeup_send(state, state->ev, tevent_timeval_current_ofs(1, 0)); if (subreq == NULL) { return ENOMEM; } if (recovery_disabled) { tevent_req_set_callback(subreq, recover_wait_done, req); } else { ctdb->recovery_start_time = tevent_timeval_current(); tevent_req_set_callback(subreq, recover_done, req); } return 0; } static void recover_wait_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); int ret; bool status; status = tevent_wakeup_recv(subreq); TALLOC_FREE(subreq); if (! status) { tevent_req_error(req, EIO); return; } ret = recover_check(req); if (ret != 0) { tevent_req_error(req, ret); } } static void recover_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_state *state = tevent_req_data( req, struct recover_state); struct ctdbd_context *ctdb = state->ctdb; bool status; status = tevent_wakeup_recv(subreq); TALLOC_FREE(subreq); if (! status) { tevent_req_error(req, EIO); return; } ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL; ctdb->recovery_end_time = tevent_timeval_current(); ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation); tevent_req_done(req); } static bool recover_recv(struct tevent_req *req, int *perr) { int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } /* * Routines for ctdb_req_header */ static void header_fix_pnn(struct ctdb_req_header *header, struct ctdbd_context *ctdb) { if (header->srcnode == CTDB_CURRENT_NODE) { header->srcnode = ctdb->node_map->pnn; } if (header->destnode == CTDB_CURRENT_NODE) { header->destnode = ctdb->node_map->pnn; } } static struct ctdb_req_header header_reply_call( struct ctdb_req_header *header, struct ctdbd_context *ctdb) { struct ctdb_req_header reply_header; reply_header = (struct ctdb_req_header) { .ctdb_magic = CTDB_MAGIC, .ctdb_version = CTDB_PROTOCOL, .generation = ctdb->vnn_map->generation, .operation = CTDB_REPLY_CALL, .destnode = header->srcnode, .srcnode = header->destnode, .reqid = header->reqid, }; return reply_header; } static struct ctdb_req_header header_reply_control( struct ctdb_req_header *header, struct ctdbd_context *ctdb) { struct ctdb_req_header reply_header; reply_header = (struct ctdb_req_header) { .ctdb_magic = CTDB_MAGIC, .ctdb_version = CTDB_PROTOCOL, .generation = ctdb->vnn_map->generation, .operation = CTDB_REPLY_CONTROL, .destnode = header->srcnode, .srcnode = header->destnode, .reqid = header->reqid, }; return reply_header; } static struct ctdb_req_header header_reply_message( struct ctdb_req_header *header, struct ctdbd_context *ctdb) { struct ctdb_req_header reply_header; reply_header = (struct ctdb_req_header) { .ctdb_magic = CTDB_MAGIC, .ctdb_version = CTDB_PROTOCOL, .generation = ctdb->vnn_map->generation, .operation = CTDB_REQ_MESSAGE, .destnode = header->srcnode, .srcnode = header->destnode, .reqid = 0, }; return reply_header; } /* * Client state */ struct client_state { struct tevent_context *ev; int fd; struct ctdbd_context *ctdb; int pnn; pid_t pid; struct comm_context *comm; struct srvid_register_state *rstate; int status; }; /* * Send replies to call, controls and messages */ static void client_reply_done(struct tevent_req *subreq); static void client_send_call(struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_reply_call *reply) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct tevent_req *subreq; struct ctdb_req_header reply_header; uint8_t *buf; size_t datalen, buflen; int ret; reply_header = header_reply_call(header, ctdb); datalen = ctdb_reply_call_len(&reply_header, reply); ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen); if (ret != 0) { tevent_req_error(req, ret); return; } ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen); if (ret != 0) { tevent_req_error(req, ret); return; } subreq = comm_write_send(state, state->ev, state->comm, buf, buflen); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, client_reply_done, req); talloc_steal(subreq, buf); } static void client_send_message(struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_message_data *message) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct tevent_req *subreq; struct ctdb_req_header reply_header; uint8_t *buf; size_t datalen, buflen; int ret; reply_header = header_reply_message(header, ctdb); datalen = ctdb_req_message_data_len(&reply_header, message); ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen); if (ret != 0) { tevent_req_error(req, ret); return; } ret = ctdb_req_message_data_push(&reply_header, message, buf, &buflen); if (ret != 0) { tevent_req_error(req, ret); return; } DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid)); subreq = comm_write_send(state, state->ev, state->comm, buf, buflen); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, client_reply_done, req); talloc_steal(subreq, buf); } static void client_send_control(struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_reply_control *reply) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct tevent_req *subreq; struct ctdb_req_header reply_header; uint8_t *buf; size_t datalen, buflen; int ret; reply_header = header_reply_control(header, ctdb); datalen = ctdb_reply_control_len(&reply_header, reply); ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen); if (ret != 0) { tevent_req_error(req, ret); return; } ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen); if (ret != 0) { tevent_req_error(req, ret); return; } DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode)); subreq = comm_write_send(state, state->ev, state->comm, buf, buflen); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, client_reply_done, req); talloc_steal(subreq, buf); } static void client_reply_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); int ret; bool status; status = comm_write_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { tevent_req_error(req, ret); } } /* * Handling protocol - controls */ static void control_process_exists(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct client_state *cstate; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; cstate = client_find(ctdb, request->rdata.data.pid); if (cstate == NULL) { reply.status = -1; reply.errmsg = "No client for PID"; } else { reply.status = kill(request->rdata.data.pid, 0); reply.errmsg = NULL; } client_send_control(req, header, &reply); } static void control_ping(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; reply.status = ctdb->num_clients; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_getdbpath(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.db_id); if (db == NULL) { reply.status = ENOENT; reply.errmsg = "Database not found"; } else { reply.rdata.data.db_path = talloc_strdup(mem_ctx, db->path); if (reply.rdata.data.db_path == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; } else { reply.status = 0; reply.errmsg = NULL; } } client_send_control(req, header, &reply); } static void control_getvnnmap(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_vnn_map *vnnmap; reply.rdata.opcode = request->opcode; vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map); if (vnnmap == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; } else { vnnmap->generation = ctdb->vnn_map->generation; vnnmap->size = ctdb->vnn_map->size; vnnmap->map = ctdb->vnn_map->map; reply.rdata.data.vnnmap = vnnmap; reply.status = 0; reply.errmsg = NULL; } client_send_control(req, header, &reply); } static void control_get_debug(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; reply.rdata.data.loglevel = (uint32_t)ctdb->log_level; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_set_debug(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; ctdb->log_level = (int)request->rdata.data.loglevel; reply.rdata.opcode = request->opcode; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_get_dbmap(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_dbid_map *dbmap; struct database *db; unsigned int i; reply.rdata.opcode = request->opcode; dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map); if (dbmap == NULL) { goto fail; } dbmap->num = database_count(ctdb->db_map); dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num); if (dbmap->dbs == NULL) { goto fail; } db = ctdb->db_map->db; for (i = 0; i < dbmap->num; i++) { dbmap->dbs[i] = (struct ctdb_dbid) { .db_id = db->id, .flags = db->flags, }; db = db->next; } reply.rdata.data.dbmap = dbmap; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.status = -1; reply.errmsg = "Memory error"; client_send_control(req, header, &reply); } static void control_get_recmode(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; reply.status = ctdb->vnn_map->recmode; reply.errmsg = NULL; client_send_control(req, header, &reply); } struct set_recmode_state { struct tevent_req *req; struct ctdbd_context *ctdb; struct ctdb_req_header header; struct ctdb_reply_control reply; }; static void set_recmode_callback(struct tevent_req *subreq) { struct set_recmode_state *substate = tevent_req_callback_data( subreq, struct set_recmode_state); bool status; int ret; status = recover_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { substate->reply.status = ret; substate->reply.errmsg = "recovery failed"; } else { substate->reply.status = 0; substate->reply.errmsg = NULL; } client_send_control(substate->req, &substate->header, &substate->reply); talloc_free(substate); } static void control_set_recmode(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct tevent_req *subreq; struct ctdbd_context *ctdb = state->ctdb; struct set_recmode_state *substate; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) { reply.status = -1; reply.errmsg = "Client cannot set recmode to NORMAL"; goto fail; } substate = talloc_zero(ctdb, struct set_recmode_state); if (substate == NULL) { reply.status = -1; reply.errmsg = "Memory error"; goto fail; } substate->req = req; substate->ctdb = ctdb; substate->header = *header; substate->reply.rdata.opcode = request->opcode; subreq = recover_send(substate, state->ev, state->ctdb); if (subreq == NULL) { talloc_free(substate); goto fail; } tevent_req_set_callback(subreq, set_recmode_callback, substate); ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE; return; fail: client_send_control(req, header, &reply); } static void control_db_attach(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; for (db = ctdb->db_map->db; db != NULL; db = db->next) { if (strcmp(db->name, request->rdata.data.db_name) == 0) { goto done; } } db = database_new(ctdb->db_map, request->rdata.data.db_name, 0); if (db == NULL) { reply.status = -1; reply.errmsg = "Failed to attach database"; client_send_control(req, header, &reply); return; } done: reply.rdata.data.db_id = db->id; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void srvid_handler_done(struct tevent_req *subreq); static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data) { struct client_state *state = talloc_get_type_abort( private_data, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct tevent_req *subreq; struct ctdb_req_header request_header; struct ctdb_req_message_data message; uint8_t *buf; size_t datalen, buflen; int ret; request_header = (struct ctdb_req_header) { .ctdb_magic = CTDB_MAGIC, .ctdb_version = CTDB_PROTOCOL, .generation = ctdb->vnn_map->generation, .operation = CTDB_REQ_MESSAGE, .destnode = state->pnn, .srcnode = ctdb->node_map->recmaster, .reqid = 0, }; message = (struct ctdb_req_message_data) { .srvid = srvid, .data = data, }; datalen = ctdb_req_message_data_len(&request_header, &message); ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen); if (ret != 0) { return; } ret = ctdb_req_message_data_push(&request_header, &message, buf, &buflen); if (ret != 0) { talloc_free(buf); return; } subreq = comm_write_send(state, state->ev, state->comm, buf, buflen); if (subreq == NULL) { talloc_free(buf); return; } tevent_req_set_callback(subreq, srvid_handler_done, state); talloc_steal(subreq, buf); } static void srvid_handler_done(struct tevent_req *subreq) { struct client_state *state = tevent_req_callback_data( subreq, struct client_state); int ret; bool ok; ok = comm_write_recv(subreq, &ret); TALLOC_FREE(subreq); if (!ok) { DEBUG(DEBUG_ERR, ("Failed to dispatch message to client pid=%u, ret=%d\n", state->pid, ret)); } } static void control_register_srvid(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; int ret; reply.rdata.opcode = request->opcode; ret = srvid_register(ctdb->srv, state, request->srvid, srvid_handler, state); if (ret != 0) { reply.status = -1; reply.errmsg = "Memory error"; goto fail; } DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid)); reply.status = 0; reply.errmsg = NULL; fail: client_send_control(req, header, &reply); } static void control_deregister_srvid(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; int ret; reply.rdata.opcode = request->opcode; ret = srvid_deregister(ctdb->srv, request->srvid, state); if (ret != 0) { reply.status = -1; reply.errmsg = "srvid not registered"; goto fail; } DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid)); reply.status = 0; reply.errmsg = NULL; fail: client_send_control(req, header, &reply); } static void control_get_dbname(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.db_id); if (db == NULL) { reply.status = ENOENT; reply.errmsg = "Database not found"; } else { reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name); if (reply.rdata.data.db_name == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; } else { reply.status = 0; reply.errmsg = NULL; } } client_send_control(req, header, &reply); } static void control_get_pid(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; reply.status = getpid(); reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_get_pnn(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; reply.status = header->destnode; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_shutdown(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *hdr, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); state->status = 99; } static void control_set_tunable(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; bool ret, obsolete; reply.rdata.opcode = request->opcode; reply.errmsg = NULL; ret = ctdb_tunable_set_value(&ctdb->tun_list, request->rdata.data.tunable->name, request->rdata.data.tunable->value, &obsolete); if (! ret) { reply.status = -1; } else if (obsolete) { reply.status = 1; } else { reply.status = 0; } client_send_control(req, header, &reply); } static void control_get_tunable(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; uint32_t value; bool ret; reply.rdata.opcode = request->opcode; reply.errmsg = NULL; ret = ctdb_tunable_get_value(&ctdb->tun_list, request->rdata.data.tun_var, &value); if (! ret) { reply.status = -1; } else { reply.rdata.data.tun_value = value; reply.status = 0; } client_send_control(req, header, &reply); } static void control_list_tunables(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct ctdb_reply_control reply; struct ctdb_var_list *var_list; reply.rdata.opcode = request->opcode; reply.errmsg = NULL; var_list = ctdb_tunable_names(mem_ctx); if (var_list == NULL) { reply.status = -1; } else { reply.rdata.data.tun_var_list = var_list; reply.status = 0; } client_send_control(req, header, &reply); } static void control_modify_flags(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_node_flag_change *change = request->rdata.data.flag_change; struct ctdb_reply_control reply; struct node *node; reply.rdata.opcode = request->opcode; if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) || (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) { DEBUG(DEBUG_INFO, ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n")); reply.status = EINVAL; reply.errmsg = "Failed to MODIFY_FLAGS"; client_send_control(req, header, &reply); return; } /* There's all sorts of broadcast weirdness here. Only change * the specified node, not the destination node of the * control. */ node = &ctdb->node_map->node[change->pnn]; if ((node->flags & change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 && (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) { DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode)); node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED; goto done; } if ((node->flags & change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 && (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) { DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode)); node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED; goto done; } DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode)); done: reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_get_all_tunables(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; reply.rdata.data.tun_list = &ctdb->tun_list; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_db_attach_persistent(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; for (db = ctdb->db_map->db; db != NULL; db = db->next) { if (strcmp(db->name, request->rdata.data.db_name) == 0) { goto done; } } db = database_new(ctdb->db_map, request->rdata.data.db_name, CTDB_DB_FLAGS_PERSISTENT); if (db == NULL) { reply.status = -1; reply.errmsg = "Failed to attach database"; client_send_control(req, header, &reply); return; } done: reply.rdata.data.db_id = db->id; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_uptime(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_uptime *uptime;; reply.rdata.opcode = request->opcode; uptime = talloc_zero(mem_ctx, struct ctdb_uptime); if (uptime == NULL) { goto fail; } uptime->current_time = tevent_timeval_current(); uptime->ctdbd_start_time = ctdb->start_time; uptime->last_recovery_started = ctdb->recovery_start_time; uptime->last_recovery_finished = ctdb->recovery_end_time; reply.rdata.data.uptime = uptime; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.status = -1; reply.errmsg = "Memory error"; client_send_control(req, header, &reply); } static void control_reload_nodes_file(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_node_map *nodemap; struct node_map *node_map = ctdb->node_map; unsigned int i; reply.rdata.opcode = request->opcode; nodemap = read_nodes_file(mem_ctx, header->destnode); if (nodemap == NULL) { goto fail; } for (i=0; inum; i++) { struct node *node; if (i < node_map->num_nodes && ctdb_sock_addr_same(&nodemap->node[i].addr, &node_map->node[i].addr)) { continue; } if (nodemap->node[i].flags & NODE_FLAGS_DELETED) { int ret; node = &node_map->node[i]; node->flags |= NODE_FLAGS_DELETED; ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr, false); if (ret != 0) { /* Can't happen, but Coverity... */ goto fail; } continue; } if (i < node_map->num_nodes && node_map->node[i].flags & NODE_FLAGS_DELETED) { node = &node_map->node[i]; node->flags &= ~NODE_FLAGS_DELETED; node->addr = nodemap->node[i].addr; continue; } node_map->node = talloc_realloc(node_map, node_map->node, struct node, node_map->num_nodes+1); if (node_map->node == NULL) { goto fail; } node = &node_map->node[node_map->num_nodes]; node->addr = nodemap->node[i].addr; node->pnn = nodemap->node[i].pnn; node->flags = 0; node->capabilities = CTDB_CAP_DEFAULT; node->recovery_disabled = false; node->recovery_substate = NULL; node_map->num_nodes += 1; } talloc_free(nodemap); reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.status = -1; reply.errmsg = "Memory error"; client_send_control(req, header, &reply); } static void control_get_capabilities(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct node *node; uint32_t caps = 0; reply.rdata.opcode = request->opcode; node = &ctdb->node_map->node[header->destnode]; caps = node->capabilities; if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) { /* Don't send reply */ return; } reply.rdata.data.caps = caps; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_release_ip(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_public_ip *ip = request->rdata.data.pubip; struct ctdb_reply_control reply; struct ctdb_public_ip_list *ips = NULL; struct ctdb_public_ip *t = NULL; unsigned int i; reply.rdata.opcode = request->opcode; if (ctdb->known_ips == NULL) { D_INFO("RELEASE_IP %s - not a public IP\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false)); goto done; } ips = &ctdb->known_ips[header->destnode]; t = NULL; for (i = 0; i < ips->num; i++) { if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) { t = &ips->ip[i]; break; } } if (t == NULL) { D_INFO("RELEASE_IP %s - not a public IP\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false)); goto done; } if (t->pnn != header->destnode) { if (header->destnode == ip->pnn) { D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false), ip->pnn); reply.status = -1; reply.errmsg = "RELEASE_IP to TAKE_IP node"; client_send_control(req, header, &reply); return; } D_INFO("RELEASE_IP %s - to node %d - redundant\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false), ip->pnn); t->pnn = ip->pnn; } else { D_NOTICE("RELEASE_IP %s - to node %d\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false), ip->pnn); t->pnn = ip->pnn; } done: reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_takeover_ip(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_public_ip *ip = request->rdata.data.pubip; struct ctdb_reply_control reply; struct ctdb_public_ip_list *ips = NULL; struct ctdb_public_ip *t = NULL; unsigned int i; reply.rdata.opcode = request->opcode; if (ctdb->known_ips == NULL) { D_INFO("TAKEOVER_IP %s - not a public IP\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false)); goto done; } ips = &ctdb->known_ips[header->destnode]; t = NULL; for (i = 0; i < ips->num; i++) { if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) { t = &ips->ip[i]; break; } } if (t == NULL) { D_INFO("TAKEOVER_IP %s - not a public IP\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false)); goto done; } if (t->pnn == header->destnode) { D_INFO("TAKEOVER_IP %s - redundant\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false)); } else { D_NOTICE("TAKEOVER_IP %s\n", ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false)); t->pnn = ip->pnn; } done: reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_get_public_ips(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_public_ip_list *ips = NULL; reply.rdata.opcode = request->opcode; if (ctdb->known_ips == NULL) { /* No IPs defined so create a dummy empty struct and ship it */ ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);; if (ips == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; goto done; } goto ok; } ips = &ctdb->known_ips[header->destnode]; if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) { /* If runstate is not RUNNING or a node is then return * no available IPs. Don't worry about interface * states here - we're not faking down to that level. */ uint32_t flags = ctdb->node_map->node[header->destnode].flags; if (ctdb->runstate != CTDB_RUNSTATE_RUNNING || ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) { /* No available IPs: return dummy empty struct */ ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);; if (ips == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; goto done; } } } ok: reply.rdata.data.pubip_list = ips; reply.status = 0; reply.errmsg = NULL; done: client_send_control(req, header, &reply); } static void control_get_nodemap(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_node_map *nodemap; struct node *node; unsigned int i; reply.rdata.opcode = request->opcode; nodemap = talloc_zero(mem_ctx, struct ctdb_node_map); if (nodemap == NULL) { goto fail; } nodemap->num = ctdb->node_map->num_nodes; nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags, nodemap->num); if (nodemap->node == NULL) { goto fail; } for (i=0; inum; i++) { node = &ctdb->node_map->node[i]; nodemap->node[i] = (struct ctdb_node_and_flags) { .pnn = node->pnn, .flags = node->flags, .addr = node->addr, }; } reply.rdata.data.nodemap = nodemap; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.status = -1; reply.errmsg = "Memory error"; client_send_control(req, header, &reply); } static void control_get_reclock_file(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; if (ctdb->reclock != NULL) { reply.rdata.data.reclock_file = talloc_strdup(mem_ctx, ctdb->reclock); if (reply.rdata.data.reclock_file == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; goto done; } } else { reply.rdata.data.reclock_file = NULL; } reply.status = 0; reply.errmsg = NULL; done: client_send_control(req, header, &reply); } static void control_stop_node(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; DEBUG(DEBUG_INFO, ("Stopping node\n")); ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; } static void control_continue_node(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; DEBUG(DEBUG_INFO, ("Continue node\n")); ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; } static void set_ban_state_callback(struct tevent_req *subreq) { struct node *node = tevent_req_callback_data( subreq, struct node); bool status; status = tevent_wakeup_recv(subreq); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n")); } node->flags &= ~NODE_FLAGS_BANNED; } static void control_set_ban_state(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct tevent_req *subreq; struct ctdbd_context *ctdb = state->ctdb; struct ctdb_ban_state *ban = request->rdata.data.ban_state; struct ctdb_reply_control reply; struct node *node; reply.rdata.opcode = request->opcode; if (ban->pnn != header->destnode) { DEBUG(DEBUG_INFO, ("SET_BAN_STATE control for PNN %d rejected\n", ban->pnn)); reply.status = EINVAL; goto fail; } node = &ctdb->node_map->node[header->destnode]; if (ban->time == 0) { DEBUG(DEBUG_INFO,("Unbanning this node\n")); node->flags &= ~NODE_FLAGS_BANNED; goto done; } subreq = tevent_wakeup_send(ctdb->node_map, state->ev, tevent_timeval_current_ofs( ban->time, 0)); if (subreq == NULL) { reply.status = ENOMEM; goto fail; } tevent_req_set_callback(subreq, set_ban_state_callback, node); DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time)); node->flags |= NODE_FLAGS_BANNED; ctdb->vnn_map->generation = INVALID_GENERATION; done: reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.errmsg = "Failed to ban node"; } static void control_trans3_commit(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; int ret; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id); if (db == NULL) { reply.status = -1; reply.errmsg = "Unknown database"; client_send_control(req, header, &reply); return; } if (! (db->flags & (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) { reply.status = -1; reply.errmsg = "Transactions on volatile database"; client_send_control(req, header, &reply); return; } ret = ltdb_transaction(db, request->rdata.data.recbuf); if (ret != 0) { reply.status = -1; reply.errmsg = "Transaction failed"; client_send_control(req, header, &reply); return; } reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_get_db_seqnum(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; int ret; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.db_id); if (db == NULL) { reply.status = ENOENT; reply.errmsg = "Database not found"; } else { uint64_t seqnum; ret = database_seqnum(db, &seqnum); if (ret == 0) { reply.rdata.data.seqnum = seqnum; reply.status = 0; reply.errmsg = NULL; } else { reply.status = ret; reply.errmsg = "Failed to get seqnum"; } } client_send_control(req, header, &reply); } static void control_db_get_health(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.db_id); if (db == NULL) { reply.status = ENOENT; reply.errmsg = "Database not found"; } else { reply.rdata.data.reason = NULL; reply.status = 0; reply.errmsg = NULL; } client_send_control(req, header, &reply); } static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx, struct ctdbd_context *ctdb) { struct ctdb_iface_list *iface_list; struct interface *iface; unsigned int i; iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list); if (iface_list == NULL) { goto done; } iface_list->num = ctdb->iface_map->num; iface_list->iface = talloc_array(iface_list, struct ctdb_iface, iface_list->num); if (iface_list->iface == NULL) { TALLOC_FREE(iface_list); goto done; } for (i=0; inum; i++) { iface = &ctdb->iface_map->iface[i]; iface_list->iface[i] = (struct ctdb_iface) { .link_state = iface->link_up, .references = iface->references, }; strlcpy(iface_list->iface[i].name, iface->name, sizeof(iface_list->iface[i].name)); } done: return iface_list; } static void control_get_public_ip_info(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; ctdb_sock_addr *addr = request->rdata.data.addr; struct ctdb_public_ip_list *known = NULL; struct ctdb_public_ip_info *info = NULL; unsigned i; reply.rdata.opcode = request->opcode; info = talloc_zero(mem_ctx, struct ctdb_public_ip_info); if (info == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; goto done; } reply.rdata.data.ipinfo = info; if (ctdb->known_ips != NULL) { known = &ctdb->known_ips[header->destnode]; } else { /* No IPs defined so create a dummy empty struct and * fall through. The given IP won't be matched * below... */ known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);; if (known == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; goto done; } } for (i = 0; i < known->num; i++) { if (ctdb_sock_addr_same_ip(&known->ip[i].addr, addr)) { break; } } if (i == known->num) { D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n", ctdb_sock_addr_to_string(mem_ctx, addr, false)); reply.status = -1; reply.errmsg = "Unknown address"; goto done; } info->ip = known->ip[i]; /* The fake PUBLICIPS stanza and resulting known_ips data * don't know anything about interfaces, so completely fake * this. */ info->active_idx = 0; info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb); if (info->ifaces == NULL) { reply.status = ENOMEM; reply.errmsg = "Memory error"; goto done; } reply.status = 0; reply.errmsg = NULL; done: client_send_control(req, header, &reply); } static void control_get_ifaces(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_iface_list *iface_list; reply.rdata.opcode = request->opcode; iface_list = get_ctdb_iface_list(mem_ctx, ctdb); if (iface_list == NULL) { goto fail; } reply.rdata.data.iface_list = iface_list; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.status = -1; reply.errmsg = "Memory error"; client_send_control(req, header, &reply); } static void control_set_iface_link_state(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct ctdb_iface *in_iface; struct interface *iface = NULL; bool link_up = false; int i; reply.rdata.opcode = request->opcode; in_iface = request->rdata.data.iface; if (in_iface->name[CTDB_IFACE_SIZE] != '\0') { reply.errmsg = "interface name not terminated"; goto fail; } switch (in_iface->link_state) { case 0: link_up = false; break; case 1: link_up = true; break; default: reply.errmsg = "invalid link state"; goto fail; } if (in_iface->references != 0) { reply.errmsg = "references should be 0"; goto fail; } for (i=0; iiface_map->num; i++) { if (strcmp(ctdb->iface_map->iface[i].name, in_iface->name) == 0) { iface = &ctdb->iface_map->iface[i]; break; } } if (iface == NULL) { reply.errmsg = "interface not found"; goto fail; } iface->link_up = link_up; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.status = -1; client_send_control(req, header, &reply); } static void control_set_db_readonly(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.db_id); if (db == NULL) { reply.status = ENOENT; reply.errmsg = "Database not found"; goto done; } if (db->flags & CTDB_DB_FLAGS_PERSISTENT) { reply.status = EINVAL; reply.errmsg = "Can not set READONLY on persistent db"; goto done; } db->flags |= CTDB_DB_FLAGS_READONLY; reply.status = 0; reply.errmsg = NULL; done: client_send_control(req, header, &reply); } struct traverse_start_ext_state { struct tevent_req *req; struct ctdb_req_header *header; uint32_t reqid; uint64_t srvid; bool withemptyrecords; int status; }; static int traverse_start_ext_handler(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data) { struct traverse_start_ext_state *state = (struct traverse_start_ext_state *)private_data; struct ctdb_rec_data rec; struct ctdb_req_message_data message; size_t np; if (data.dsize < sizeof(struct ctdb_ltdb_header)) { return 0; } if ((data.dsize == sizeof(struct ctdb_ltdb_header)) && (!state->withemptyrecords)) { return 0; } rec = (struct ctdb_rec_data) { .reqid = state->reqid, .header = NULL, .key = key, .data = data, }; message.srvid = state->srvid; message.data.dsize = ctdb_rec_data_len(&rec); message.data.dptr = talloc_size(state->req, message.data.dsize); if (message.data.dptr == NULL) { state->status = ENOMEM; return 1; } ctdb_rec_data_push(&rec, message.data.dptr, &np); client_send_message(state->req, state->header, &message); talloc_free(message.data.dptr); return 0; } static void control_traverse_start_ext(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; struct ctdb_traverse_start_ext *ext; struct traverse_start_ext_state t_state; struct ctdb_rec_data rec; struct ctdb_req_message_data message; uint8_t buffer[32]; size_t np; int ret; reply.rdata.opcode = request->opcode; ext = request->rdata.data.traverse_start_ext; db = database_find(ctdb->db_map, ext->db_id); if (db == NULL) { reply.status = -1; reply.errmsg = "Unknown database"; client_send_control(req, header, &reply); return; } t_state = (struct traverse_start_ext_state) { .req = req, .header = header, .reqid = ext->reqid, .srvid = ext->srvid, .withemptyrecords = ext->withemptyrecords, }; ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state); DEBUG(DEBUG_INFO, ("traversed %d records\n", ret)); if (t_state.status != 0) { reply.status = -1; reply.errmsg = "Memory error"; client_send_control(req, header, &reply); } reply.status = 0; client_send_control(req, header, &reply); rec = (struct ctdb_rec_data) { .reqid = ext->reqid, .header = NULL, .key = tdb_null, .data = tdb_null, }; message.srvid = ext->srvid; message.data.dsize = ctdb_rec_data_len(&rec); ctdb_rec_data_push(&rec, buffer, &np); message.data.dptr = buffer; client_send_message(req, header, &message); } static void control_set_db_sticky(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.db_id); if (db == NULL) { reply.status = ENOENT; reply.errmsg = "Database not found"; goto done; } if (db->flags & CTDB_DB_FLAGS_PERSISTENT) { reply.status = EINVAL; reply.errmsg = "Can not set STICKY on persistent db"; goto done; } db->flags |= CTDB_DB_FLAGS_STICKY; reply.status = 0; reply.errmsg = NULL; done: client_send_control(req, header, &reply); } static void control_ipreallocated(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct ctdb_reply_control reply; /* Always succeed */ reply.rdata.opcode = request->opcode; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_get_runstate(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; reply.rdata.data.runstate = ctdb->runstate; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_get_nodes_file(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct ctdb_reply_control reply; struct ctdb_node_map *nodemap; reply.rdata.opcode = request->opcode; nodemap = read_nodes_file(mem_ctx, header->destnode); if (nodemap == NULL) { goto fail; } reply.rdata.data.nodemap = nodemap; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; fail: reply.status = -1; reply.errmsg = "Failed to read nodes file"; client_send_control(req, header, &reply); } static void control_db_open_flags(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; db = database_find(ctdb->db_map, request->rdata.data.db_id); if (db == NULL) { reply.status = ENOENT; reply.errmsg = "Database not found"; } else { reply.rdata.data.tdb_flags = database_flags(db->flags); reply.status = 0; reply.errmsg = NULL; } client_send_control(req, header, &reply); } static void control_db_attach_replicated(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct database *db; reply.rdata.opcode = request->opcode; for (db = ctdb->db_map->db; db != NULL; db = db->next) { if (strcmp(db->name, request->rdata.data.db_name) == 0) { goto done; } } db = database_new(ctdb->db_map, request->rdata.data.db_name, CTDB_DB_FLAGS_REPLICATED); if (db == NULL) { reply.status = -1; reply.errmsg = "Failed to attach database"; client_send_control(req, header, &reply); return; } done: reply.rdata.data.db_id = db->id; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); } static void control_check_pid_srvid(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_client *client; struct client_state *cstate; struct ctdb_reply_control reply; bool pid_found, srvid_found; int ret; reply.rdata.opcode = request->opcode; pid_found = false; srvid_found = false; for (client=ctdb->client_list; client != NULL; client=client->next) { if (client->pid == request->rdata.data.pid_srvid->pid) { pid_found = true; cstate = (struct client_state *)client->state; ret = srvid_exists(ctdb->srv, request->rdata.data.pid_srvid->srvid, cstate); if (ret == 0) { srvid_found = true; ret = kill(cstate->pid, 0); if (ret != 0) { reply.status = ret; reply.errmsg = strerror(errno); } else { reply.status = 0; reply.errmsg = NULL; } } } } if (! pid_found) { reply.status = -1; reply.errmsg = "No client for PID"; } else if (! srvid_found) { reply.status = -1; reply.errmsg = "No client for PID and SRVID"; } client_send_control(req, header, &reply); } static void control_disable_node(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; DEBUG(DEBUG_INFO, ("Disabling node\n")); ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_PERMANENTLY_DISABLED; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; } static void control_enable_node(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; reply.rdata.opcode = request->opcode; DEBUG(DEBUG_INFO, ("Enable node\n")); ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED; reply.status = 0; reply.errmsg = NULL; client_send_control(req, header, &reply); return; } static bool fake_control_failure(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_reply_control reply; struct fake_control_failure *f = NULL; D_DEBUG("Checking fake control failure for control %u on node %u\n", request->opcode, header->destnode); for (f = ctdb->control_failures; f != NULL; f = f->next) { if (f->opcode == request->opcode && (f->pnn == header->destnode || f->pnn == CTDB_UNKNOWN_PNN)) { reply.rdata.opcode = request->opcode; if (strcmp(f->error, "TIMEOUT") == 0) { /* Causes no reply */ D_ERR("Control %u fake timeout on node %u\n", request->opcode, header->destnode); return true; } else if (strcmp(f->error, "ERROR") == 0) { D_ERR("Control %u fake error on node %u\n", request->opcode, header->destnode); reply.status = -1; reply.errmsg = f->comment; client_send_control(req, header, &reply); return true; } } } return false; } static void control_error(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_control *request) { struct ctdb_reply_control reply; D_DEBUG("Control %u not implemented\n", request->opcode); reply.rdata.opcode = request->opcode; reply.status = -1; reply.errmsg = "Not implemented"; client_send_control(req, header, &reply); } /* * Handling protocol - messages */ struct disable_recoveries_state { struct node *node; }; static void disable_recoveries_callback(struct tevent_req *subreq) { struct disable_recoveries_state *substate = tevent_req_callback_data( subreq, struct disable_recoveries_state); bool status; status = tevent_wakeup_recv(subreq); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n")); } substate->node->recovery_disabled = false; TALLOC_FREE(substate->node->recovery_substate); } static void message_disable_recoveries(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_message *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct tevent_req *subreq; struct ctdbd_context *ctdb = state->ctdb; struct disable_recoveries_state *substate; struct ctdb_disable_message *disable = request->data.disable; struct ctdb_req_message_data reply; struct node *node; int ret = -1; TDB_DATA data; node = &ctdb->node_map->node[header->destnode]; if (disable->timeout == 0) { TALLOC_FREE(node->recovery_substate); node->recovery_disabled = false; DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n", header->destnode)); goto done; } substate = talloc_zero(ctdb->node_map, struct disable_recoveries_state); if (substate == NULL) { goto fail; } substate->node = node; subreq = tevent_wakeup_send(substate, state->ev, tevent_timeval_current_ofs( disable->timeout, 0)); if (subreq == NULL) { talloc_free(substate); goto fail; } tevent_req_set_callback(subreq, disable_recoveries_callback, substate); DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n", disable->timeout, header->destnode)); node->recovery_substate = substate; node->recovery_disabled = true; done: ret = header->destnode; fail: reply.srvid = disable->srvid; data.dptr = (uint8_t *)&ret; data.dsize = sizeof(int); reply.data = data; client_send_message(req, header, &reply); } static void message_takeover_run(TALLOC_CTX *mem_ctx, struct tevent_req *req, struct ctdb_req_header *header, struct ctdb_req_message *request) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_srvid_message *srvid = request->data.msg; struct ctdb_req_message_data reply; int ret = -1; TDB_DATA data; if (header->destnode != ctdb->node_map->recmaster) { /* No reply! Only recmaster replies... */ return; } DEBUG(DEBUG_INFO, ("IP takover run on node %u\n", header->destnode)); ret = header->destnode; reply.srvid = srvid->srvid; data.dptr = (uint8_t *)&ret; data.dsize = sizeof(int); reply.data = data; client_send_message(req, header, &reply); } /* * Handle a single client */ static void client_read_handler(uint8_t *buf, size_t buflen, void *private_data); static void client_dead_handler(void *private_data); static void client_process_packet(struct tevent_req *req, uint8_t *buf, size_t buflen); static void client_process_call(struct tevent_req *req, uint8_t *buf, size_t buflen); static void client_process_message(struct tevent_req *req, uint8_t *buf, size_t buflen); static void client_process_control(struct tevent_req *req, uint8_t *buf, size_t buflen); static void client_reply_done(struct tevent_req *subreq); static struct tevent_req *client_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd, struct ctdbd_context *ctdb, int pnn) { struct tevent_req *req; struct client_state *state; int ret; req = tevent_req_create(mem_ctx, &state, struct client_state); if (req == NULL) { return NULL; } state->ev = ev; state->fd = fd; state->ctdb = ctdb; state->pnn = pnn; (void) ctdb_get_peer_pid(fd, &state->pid); ret = comm_setup(state, ev, fd, client_read_handler, req, client_dead_handler, req, &state->comm); if (ret != 0) { tevent_req_error(req, ret); return tevent_req_post(req, ev); } ret = client_add(ctdb, state->pid, state); if (ret != 0) { tevent_req_error(req, ret); return tevent_req_post(req, ev); } DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd)); return req; } static void client_read_handler(uint8_t *buf, size_t buflen, void *private_data) { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; struct ctdb_req_header header; size_t np; unsigned int i; int ret; ret = ctdb_req_header_pull(buf, buflen, &header, &np); if (ret != 0) { return; } if (buflen != header.length) { return; } ret = ctdb_req_header_verify(&header, 0); if (ret != 0) { return; } header_fix_pnn(&header, ctdb); if (header.destnode == CTDB_BROADCAST_ALL) { for (i=0; inode_map->num_nodes; i++) { header.destnode = i; ctdb_req_header_push(&header, buf, &np); client_process_packet(req, buf, buflen); } return; } if (header.destnode == CTDB_BROADCAST_CONNECTED) { for (i=0; inode_map->num_nodes; i++) { if (ctdb->node_map->node[i].flags & NODE_FLAGS_DISCONNECTED) { continue; } header.destnode = i; ctdb_req_header_push(&header, buf, &np); client_process_packet(req, buf, buflen); } return; } if (header.destnode > ctdb->node_map->num_nodes) { fprintf(stderr, "Invalid destination pnn 0x%x\n", header.destnode); return; } if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) { fprintf(stderr, "Packet for disconnected node pnn %u\n", header.destnode); return; } ctdb_req_header_push(&header, buf, &np); client_process_packet(req, buf, buflen); } static void client_dead_handler(void *private_data) { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); tevent_req_done(req); } static void client_process_packet(struct tevent_req *req, uint8_t *buf, size_t buflen) { struct ctdb_req_header header; size_t np; int ret; ret = ctdb_req_header_pull(buf, buflen, &header, &np); if (ret != 0) { return; } switch (header.operation) { case CTDB_REQ_CALL: client_process_call(req, buf, buflen); break; case CTDB_REQ_MESSAGE: client_process_message(req, buf, buflen); break; case CTDB_REQ_CONTROL: client_process_control(req, buf, buflen); break; default: break; } } static void client_process_call(struct tevent_req *req, uint8_t *buf, size_t buflen) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; TALLOC_CTX *mem_ctx; struct ctdb_req_header header; struct ctdb_req_call request; struct ctdb_reply_call reply; struct database *db; struct ctdb_ltdb_header hdr; TDB_DATA data; int ret; mem_ctx = talloc_new(state); if (tevent_req_nomem(mem_ctx, req)) { return; } ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request); if (ret != 0) { talloc_free(mem_ctx); tevent_req_error(req, ret); return; } header_fix_pnn(&header, ctdb); if (header.destnode >= ctdb->node_map->num_nodes) { goto fail; } DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id)); db = database_find(ctdb->db_map, request.db_id); if (db == NULL) { goto fail; } ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data); if (ret != 0) { goto fail; } /* Fake migration */ if (hdr.dmaster != ctdb->node_map->pnn) { hdr.dmaster = ctdb->node_map->pnn; ret = ltdb_store(db, request.key, &hdr, data); if (ret != 0) { goto fail; } } talloc_free(mem_ctx); reply.status = 0; reply.data = tdb_null; client_send_call(req, &header, &reply); return; fail: talloc_free(mem_ctx); reply.status = -1; reply.data = tdb_null; client_send_call(req, &header, &reply); } static void client_process_message(struct tevent_req *req, uint8_t *buf, size_t buflen) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; TALLOC_CTX *mem_ctx; struct ctdb_req_header header; struct ctdb_req_message request; uint64_t srvid; int ret; mem_ctx = talloc_new(state); if (tevent_req_nomem(mem_ctx, req)) { return; } ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request); if (ret != 0) { talloc_free(mem_ctx); tevent_req_error(req, ret); return; } header_fix_pnn(&header, ctdb); if (header.destnode >= ctdb->node_map->num_nodes) { /* Many messages are not replied to, so just behave as * though this message was not received */ fprintf(stderr, "Invalid node %d\n", header.destnode); talloc_free(mem_ctx); return; } srvid = request.srvid; DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid)); if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) { message_disable_recoveries(mem_ctx, req, &header, &request); } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) { message_takeover_run(mem_ctx, req, &header, &request); } else { D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid); } /* check srvid */ talloc_free(mem_ctx); } static void client_process_control(struct tevent_req *req, uint8_t *buf, size_t buflen) { struct client_state *state = tevent_req_data( req, struct client_state); struct ctdbd_context *ctdb = state->ctdb; TALLOC_CTX *mem_ctx; struct ctdb_req_header header; struct ctdb_req_control request; int ret; mem_ctx = talloc_new(state); if (tevent_req_nomem(mem_ctx, req)) { return; } ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request); if (ret != 0) { talloc_free(mem_ctx); tevent_req_error(req, ret); return; } header_fix_pnn(&header, ctdb); if (header.destnode >= ctdb->node_map->num_nodes) { struct ctdb_reply_control reply; reply.rdata.opcode = request.opcode; reply.errmsg = "Invalid node"; reply.status = -1; client_send_control(req, &header, &reply); return; } DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n", request.opcode, header.reqid)); if (fake_control_failure(mem_ctx, req, &header, &request)) { goto done; } switch (request.opcode) { case CTDB_CONTROL_PROCESS_EXISTS: control_process_exists(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_PING: control_ping(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GETDBPATH: control_getdbpath(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GETVNNMAP: control_getvnnmap(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_DEBUG: control_get_debug(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SET_DEBUG: control_set_debug(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_DBMAP: control_get_dbmap(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_RECMODE: control_get_recmode(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SET_RECMODE: control_set_recmode(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_DB_ATTACH: control_db_attach(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_REGISTER_SRVID: control_register_srvid(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_DEREGISTER_SRVID: control_deregister_srvid(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_DBNAME: control_get_dbname(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_PID: control_get_pid(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_PNN: control_get_pnn(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SHUTDOWN: control_shutdown(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SET_TUNABLE: control_set_tunable(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_TUNABLE: control_get_tunable(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_LIST_TUNABLES: control_list_tunables(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_MODIFY_FLAGS: control_modify_flags(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_ALL_TUNABLES: control_get_all_tunables(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_DB_ATTACH_PERSISTENT: control_db_attach_persistent(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_UPTIME: control_uptime(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_RELOAD_NODES_FILE: control_reload_nodes_file(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_CAPABILITIES: control_get_capabilities(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_RELEASE_IP: control_release_ip(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_TAKEOVER_IP: control_takeover_ip(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_PUBLIC_IPS: control_get_public_ips(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_NODEMAP: control_get_nodemap(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_RECLOCK_FILE: control_get_reclock_file(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_STOP_NODE: control_stop_node(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_CONTINUE_NODE: control_continue_node(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SET_BAN_STATE: control_set_ban_state(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_TRANS3_COMMIT: control_trans3_commit(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_DB_SEQNUM: control_get_db_seqnum(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_DB_GET_HEALTH: control_db_get_health(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_PUBLIC_IP_INFO: control_get_public_ip_info(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_IFACES: control_get_ifaces(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SET_IFACE_LINK_STATE: control_set_iface_link_state(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SET_DB_READONLY: control_set_db_readonly(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_TRAVERSE_START_EXT: control_traverse_start_ext(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_SET_DB_STICKY: control_set_db_sticky(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_IPREALLOCATED: control_ipreallocated(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_RUNSTATE: control_get_runstate(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_GET_NODES_FILE: control_get_nodes_file(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_DB_OPEN_FLAGS: control_db_open_flags(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_DB_ATTACH_REPLICATED: control_db_attach_replicated(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_CHECK_PID_SRVID: control_check_pid_srvid(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_DISABLE_NODE: control_disable_node(mem_ctx, req, &header, &request); break; case CTDB_CONTROL_ENABLE_NODE: control_enable_node(mem_ctx, req, &header, &request); break; default: if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) { control_error(mem_ctx, req, &header, &request); } break; } done: talloc_free(mem_ctx); } static int client_recv(struct tevent_req *req, int *perr) { struct client_state *state = tevent_req_data( req, struct client_state); int err; DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd)); close(state->fd); if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return -1; } return state->status; } /* * Fake CTDB server */ struct server_state { struct tevent_context *ev; struct ctdbd_context *ctdb; struct tevent_timer *leader_broadcast_te; int fd; }; static void server_leader_broadcast(struct tevent_context *ev, struct tevent_timer *te, struct timeval current_time, void *private_data); static void server_new_client(struct tevent_req *subreq); static void server_client_done(struct tevent_req *subreq); static struct tevent_req *server_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdbd_context *ctdb, int fd) { struct tevent_req *req, *subreq; struct server_state *state; req = tevent_req_create(mem_ctx, &state, struct server_state); if (req == NULL) { return NULL; } state->ev = ev; state->ctdb = ctdb; state->fd = fd; state->leader_broadcast_te = tevent_add_timer(state->ev, state, timeval_current_ofs(0, 0), server_leader_broadcast, state); if (state->leader_broadcast_te == NULL) { DBG_WARNING("Failed to set up leader broadcast\n"); } subreq = accept_send(state, ev, fd); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, server_new_client, req); return req; } static void server_leader_broadcast(struct tevent_context *ev, struct tevent_timer *te, struct timeval current_time, void *private_data) { struct server_state *state = talloc_get_type_abort( private_data, struct server_state); struct ctdbd_context *ctdb = state->ctdb; uint32_t leader = ctdb->node_map->recmaster; TDB_DATA data; int ret; if (leader == CTDB_UNKNOWN_PNN) { goto done; } data.dptr = (uint8_t *)&leader; data.dsize = sizeof(leader); ret = srvid_dispatch(ctdb->srv, CTDB_SRVID_LEADER, 0, data); if (ret != 0) { DBG_WARNING("Failed to send leader broadcast, ret=%d\n", ret); } done: state->leader_broadcast_te = tevent_add_timer(state->ev, state, timeval_current_ofs(1, 0), server_leader_broadcast, state); if (state->leader_broadcast_te == NULL) { DBG_WARNING("Failed to set up leader broadcast\n"); } } static void server_new_client(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct server_state *state = tevent_req_data( req, struct server_state); struct ctdbd_context *ctdb = state->ctdb; int client_fd; int ret = 0; client_fd = accept_recv(subreq, NULL, NULL, &ret); TALLOC_FREE(subreq); if (client_fd == -1) { tevent_req_error(req, ret); return; } subreq = client_send(state, state->ev, client_fd, ctdb, ctdb->node_map->pnn); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, server_client_done, req); ctdb->num_clients += 1; subreq = accept_send(state, state->ev, state->fd); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, server_new_client, req); } static void server_client_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct server_state *state = tevent_req_data( req, struct server_state); struct ctdbd_context *ctdb = state->ctdb; int ret = 0; int status; status = client_recv(subreq, &ret); TALLOC_FREE(subreq); if (status < 0) { tevent_req_error(req, ret); return; } ctdb->num_clients -= 1; if (status == 99) { /* Special status, to shutdown server */ DEBUG(DEBUG_INFO, ("Shutting down server\n")); tevent_req_done(req); } } static bool server_recv(struct tevent_req *req, int *perr) { int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } /* * Main functions */ static int socket_init(const char *sockpath) { struct sockaddr_un addr; size_t len; int ret, fd; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path)); if (len >= sizeof(addr.sun_path)) { fprintf(stderr, "path too long: %s\n", sockpath); return -1; } fd = socket(AF_UNIX, SOCK_STREAM, 0); if (fd == -1) { fprintf(stderr, "socket failed - %s\n", sockpath); return -1; } ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); if (ret != 0) { fprintf(stderr, "bind failed - %s\n", sockpath); goto fail; } ret = listen(fd, 10); if (ret != 0) { fprintf(stderr, "listen failed\n"); goto fail; } DEBUG(DEBUG_INFO, ("Socket init done\n")); return fd; fail: if (fd != -1) { close(fd); } return -1; } static struct options { const char *dbdir; const char *sockpath; const char *pidfile; const char *debuglevel; } options; static struct poptOption cmdline_options[] = { POPT_AUTOHELP { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0, "Database directory", "directory" }, { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0, "Unix domain socket path", "filename" }, { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0, "pid file", "filename" } , { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0, "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } , POPT_TABLEEND }; static void cleanup(void) { unlink(options.sockpath); unlink(options.pidfile); } static void signal_handler(int sig) { cleanup(); exit(0); } static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdbd_context *ctdb, int fd, int pfd) { struct tevent_req *req; int ret = 0; ssize_t len; atexit(cleanup); signal(SIGTERM, signal_handler); req = server_send(mem_ctx, ev, ctdb, fd); if (req == NULL) { fprintf(stderr, "Memory error\n"); exit(1); } len = write(pfd, &ret, sizeof(ret)); if (len != sizeof(ret)) { fprintf(stderr, "Failed to send message to parent\n"); exit(1); } close(pfd); tevent_req_poll(req, ev); server_recv(req, &ret); if (ret != 0) { exit(1); } } int main(int argc, const char *argv[]) { TALLOC_CTX *mem_ctx; struct ctdbd_context *ctdb; struct tevent_context *ev; poptContext pc; int opt, fd, ret, pfd[2]; ssize_t len; pid_t pid; FILE *fp; pc = poptGetContext(argv[0], argc, argv, cmdline_options, POPT_CONTEXT_KEEP_FIRST); while ((opt = poptGetNextOpt(pc)) != -1) { fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0)); exit(1); } if (options.dbdir == NULL) { fprintf(stderr, "Please specify database directory\n"); poptPrintHelp(pc, stdout, 0); exit(1); } if (options.sockpath == NULL) { fprintf(stderr, "Please specify socket path\n"); poptPrintHelp(pc, stdout, 0); exit(1); } if (options.pidfile == NULL) { fprintf(stderr, "Please specify pid file\n"); poptPrintHelp(pc, stdout, 0); exit(1); } mem_ctx = talloc_new(NULL); if (mem_ctx == NULL) { fprintf(stderr, "Memory error\n"); exit(1); } ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd"); if (ret != 0) { fprintf(stderr, "Invalid debug level\n"); poptPrintHelp(pc, stdout, 0); exit(1); } ctdb = ctdbd_setup(mem_ctx, options.dbdir); if (ctdb == NULL) { exit(1); } if (! ctdbd_verify(ctdb)) { exit(1); } ev = tevent_context_init(mem_ctx); if (ev == NULL) { fprintf(stderr, "Memory error\n"); exit(1); } fd = socket_init(options.sockpath); if (fd == -1) { exit(1); } ret = pipe(pfd); if (ret != 0) { fprintf(stderr, "Failed to create pipe\n"); cleanup(); exit(1); } pid = fork(); if (pid == -1) { fprintf(stderr, "Failed to fork\n"); cleanup(); exit(1); } if (pid == 0) { /* Child */ close(pfd[0]); start_server(mem_ctx, ev, ctdb, fd, pfd[1]); exit(1); } /* Parent */ close(pfd[1]); len = read(pfd[0], &ret, sizeof(ret)); close(pfd[0]); if (len != sizeof(ret)) { fprintf(stderr, "len = %zi\n", len); fprintf(stderr, "Failed to get message from child\n"); kill(pid, SIGTERM); exit(1); } fp = fopen(options.pidfile, "w"); if (fp == NULL) { fprintf(stderr, "Failed to open pid file %s\n", options.pidfile); kill(pid, SIGTERM); exit(1); } fprintf(fp, "%d\n", pid); fclose(fp); return 0; }