diff options
Diffstat (limited to 'ctdb/common/ctdb_ltdb.c')
-rw-r--r-- | ctdb/common/ctdb_ltdb.c | 430 |
1 files changed, 430 insertions, 0 deletions
diff --git a/ctdb/common/ctdb_ltdb.c b/ctdb/common/ctdb_ltdb.c new file mode 100644 index 0000000..6634416 --- /dev/null +++ b/ctdb/common/ctdb_ltdb.c @@ -0,0 +1,430 @@ +/* + ctdb ltdb code + + Copyright (C) Andrew Tridgell 2006 + Copyright (C) Ronnie sahlberg 2011 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <tdb.h> + +#include "lib/tdb_wrap/tdb_wrap.h" +#include "lib/util/dlinklist.h" +#include "lib/util/debug.h" + +#include "ctdb_private.h" + +#include "common/common.h" +#include "common/logging.h" + + +/* + * Calculate tdb flags based on database type + */ +int ctdb_db_tdb_flags(uint8_t db_flags, bool with_valgrind, bool with_mutex) +{ + int tdb_flags = 0; + + if (db_flags & CTDB_DB_FLAGS_PERSISTENT) { + tdb_flags = TDB_DEFAULT; + + } else if (db_flags & CTDB_DB_FLAGS_REPLICATED) { + tdb_flags = TDB_NOSYNC | + TDB_CLEAR_IF_FIRST | + TDB_INCOMPATIBLE_HASH; + + } else { + tdb_flags = TDB_NOSYNC | + TDB_CLEAR_IF_FIRST | + TDB_INCOMPATIBLE_HASH; + +#ifdef TDB_MUTEX_LOCKING + if (with_mutex && tdb_runtime_check_for_robust_mutexes()) { + tdb_flags |= TDB_MUTEX_LOCKING; + } +#endif + + } + + tdb_flags |= TDB_DISALLOW_NESTING; + if (with_valgrind) { + tdb_flags |= TDB_NOMMAP; + } + + return tdb_flags; +} + +/* + find an attached ctdb_db handle given a name + */ +struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name) +{ + struct ctdb_db_context *tmp_db; + for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) { + if (strcmp(name, tmp_db->db_name) == 0) { + return tmp_db; + } + } + return NULL; +} + +bool ctdb_db_persistent(struct ctdb_db_context *ctdb_db) +{ + if (ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) { + return true; + } + return false; +} + +bool ctdb_db_replicated(struct ctdb_db_context *ctdb_db) +{ + if (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED) { + return true; + } + return false; +} + +bool ctdb_db_volatile(struct ctdb_db_context *ctdb_db) +{ + if ((ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) || + (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED)) { + return false; + } + return true; +} + +bool ctdb_db_readonly(struct ctdb_db_context *ctdb_db) +{ + if (ctdb_db->db_flags & CTDB_DB_FLAGS_READONLY) { + return true; + } + return false; +} + +void ctdb_db_set_readonly(struct ctdb_db_context *ctdb_db) +{ + ctdb_db->db_flags |= CTDB_DB_FLAGS_READONLY; +} + +void ctdb_db_reset_readonly(struct ctdb_db_context *ctdb_db) +{ + ctdb_db->db_flags &= ~CTDB_DB_FLAGS_READONLY; +} + +bool ctdb_db_sticky(struct ctdb_db_context *ctdb_db) +{ + if (ctdb_db->db_flags & CTDB_DB_FLAGS_STICKY) { + return true; + } + return false; +} + +void ctdb_db_set_sticky(struct ctdb_db_context *ctdb_db) +{ + ctdb_db->db_flags |= CTDB_DB_FLAGS_STICKY; +} + +/* + return the lmaster given a key +*/ +uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key) +{ + uint32_t idx, lmaster; + + idx = ctdb_hash(key) % ctdb->vnn_map->size; + lmaster = ctdb->vnn_map->map[idx]; + + return lmaster; +} + + +/* + construct an initial header for a record with no ltdb header yet +*/ +static void ltdb_initial_header(struct ctdb_db_context *ctdb_db, + TDB_DATA key, + struct ctdb_ltdb_header *header) +{ + ZERO_STRUCTP(header); + /* initial dmaster is the lmaster */ + header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key); + header->flags = CTDB_REC_FLAG_AUTOMATIC; +} + +struct ctdb_ltdb_fetch_state { + struct ctdb_ltdb_header *header; + TALLOC_CTX *mem_ctx; + TDB_DATA *data; + int ret; + bool found; +}; + +static int ctdb_ltdb_fetch_fn(TDB_DATA key, TDB_DATA data, void *private_data) +{ + struct ctdb_ltdb_fetch_state *state = private_data; + struct ctdb_ltdb_header *header = state->header; + TDB_DATA *dstdata = state->data; + + if (data.dsize < sizeof(*header)) { + return 0; + } + + state->found = true; + memcpy(header, data.dptr, sizeof(*header)); + + if (dstdata != NULL) { + dstdata->dsize = data.dsize - sizeof(struct ctdb_ltdb_header); + dstdata->dptr = talloc_memdup( + state->mem_ctx, + data.dptr + sizeof(struct ctdb_ltdb_header), + dstdata->dsize); + if (dstdata->dptr == NULL) { + state->ret = -1; + } + } + + return 0; +} + +/* + fetch a record from the ltdb, separating out the header information + and returning the body of the record. A valid (initial) header is + returned if the record is not present +*/ +int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, TDB_DATA *data) +{ + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_fetch_state state = { + .header = header, + .mem_ctx = mem_ctx, + .data = data, + .found = false, + }; + int ret; + + ret = tdb_parse_record( + ctdb_db->ltdb->tdb, key, ctdb_ltdb_fetch_fn, &state); + + if (ret == -1) { + enum TDB_ERROR err = tdb_error(ctdb_db->ltdb->tdb); + if (err != TDB_ERR_NOEXIST) { + return -1; + } + } + + if (state.ret != 0) { + DBG_DEBUG("ctdb_ltdb_fetch_fn failed\n"); + return state.ret; + } + + if (state.found) { + return 0; + } + + if (data != NULL) { + *data = tdb_null; + } + + if (ctdb->vnn_map == NULL) { + /* called from the client */ + header->dmaster = (uint32_t)-1; + return -1; + } + + ltdb_initial_header(ctdb_db, key, header); + if (ctdb_db_persistent(ctdb_db) || + header->dmaster == ctdb_db->ctdb->pnn) { + + ret = ctdb_ltdb_store(ctdb_db, key, header, tdb_null); + if (ret != 0) { + DBG_NOTICE("failed to store initial header\n"); + } + } + + return 0; +} + +/* + write a record to a normal database +*/ +int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA data) +{ + struct ctdb_context *ctdb = ctdb_db->ctdb; + TDB_DATA rec[2]; + uint32_t hsize = sizeof(struct ctdb_ltdb_header); + int ret; + + if (ctdb_db->ctdb_ltdb_store_fn) { + return ctdb_db->ctdb_ltdb_store_fn(ctdb_db, key, header, data); + } + + if (ctdb->flags & CTDB_FLAG_TORTURE) { + TDB_DATA old; + struct ctdb_ltdb_header *h2; + + old = tdb_fetch(ctdb_db->ltdb->tdb, key); + h2 = (struct ctdb_ltdb_header *)old.dptr; + if (old.dptr != NULL && old.dsize >= hsize && + h2->rsn > header->rsn) { + DEBUG(DEBUG_ERR, + ("RSN regression! %"PRIu64" %"PRIu64"\n", + h2->rsn, header->rsn)); + } + if (old.dptr != NULL) { + free(old.dptr); + } + } + + rec[0].dsize = hsize; + rec[0].dptr = (uint8_t *)header; + + rec[1].dsize = data.dsize; + rec[1].dptr = data.dptr; + + ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n")); + } + + return ret; +} + +/* + lock a record in the ltdb, given a key + */ +int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + return tdb_chainlock(ctdb_db->ltdb->tdb, key); +} + +/* + unlock a record in the ltdb, given a key + */ +int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key); + if (ret != 0) { + DEBUG(DEBUG_ERR,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db->db_name, tdb_errorstr(ctdb_db->ltdb->tdb))); + } + return ret; +} + + +/* + delete a record from a normal database +*/ +int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + if (! ctdb_db_volatile(ctdb_db)) { + DEBUG(DEBUG_WARNING, + ("Ignored deletion of empty record from " + "non-volatile database\n")); + return 0; + } + if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) { + DEBUG(DEBUG_ERR,("Failed to delete empty record.\n")); + return -1; + } + return 0; +} + +int ctdb_trackingdb_add_pnn(struct ctdb_context *ctdb, TDB_DATA *data, uint32_t pnn) +{ + unsigned int byte_pos = pnn / 8; + unsigned char bit_mask = 1 << (pnn % 8); + + if (byte_pos + 1 > data->dsize) { + char *buf; + + buf = malloc(byte_pos + 1); + memset(buf, 0, byte_pos + 1); + if (buf == NULL) { + DEBUG(DEBUG_ERR, ("Out of memory when allocating buffer of %d bytes for trackingdb\n", byte_pos + 1)); + return -1; + } + if (data->dptr != NULL) { + memcpy(buf, data->dptr, data->dsize); + free(data->dptr); + } + data->dptr = (uint8_t *)buf; + data->dsize = byte_pos + 1; + } + + data->dptr[byte_pos] |= bit_mask; + return 0; +} + +void ctdb_trackingdb_traverse(struct ctdb_context *ctdb, TDB_DATA data, ctdb_trackingdb_cb cb, void *private_data) +{ + unsigned int i; + + for(i = 0; i < data.dsize; i++) { + unsigned int j; + + for (j=0; j<8; j++) { + int mask = 1<<j; + + if (data.dptr[i] & mask) { + cb(ctdb, i * 8 + j, private_data); + } + } + } +} + +/* + this is the dummy null procedure that all databases support +*/ +int ctdb_null_func(struct ctdb_call_info *call) +{ + return 0; +} + +/* + this is a plain fetch procedure that all databases support +*/ +int ctdb_fetch_func(struct ctdb_call_info *call) +{ + call->reply_data = &call->record_data; + return 0; +} + +/* + this is a plain fetch procedure that all databases support + this returns the full record including the ltdb header +*/ +int ctdb_fetch_with_header_func(struct ctdb_call_info *call) +{ + call->reply_data = talloc(call, TDB_DATA); + if (call->reply_data == NULL) { + return -1; + } + call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize; + call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize); + if (call->reply_data->dptr == NULL) { + return -1; + } + memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header)); + memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize); + + return 0; +} + |