diff options
Diffstat (limited to 'lib/ldb/ldb_mdb/ldb_mdb.c')
-rw-r--r-- | lib/ldb/ldb_mdb/ldb_mdb.c | 1143 |
1 files changed, 1143 insertions, 0 deletions
diff --git a/lib/ldb/ldb_mdb/ldb_mdb.c b/lib/ldb/ldb_mdb/ldb_mdb.c new file mode 100644 index 0000000..c163321 --- /dev/null +++ b/lib/ldb/ldb_mdb/ldb_mdb.c @@ -0,0 +1,1143 @@ +/* + ldb database library using mdb back end + + Copyright (C) Jakub Hrozek 2014 + Copyright (C) Catalyst.Net Ltd 2017 + + ** NOTE! The following LGPL license applies to the ldb + ** library. This does NOT imply that all of Samba is released + ** under the LGPL + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 3 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "ldb_mdb.h" +#include "../ldb_key_value/ldb_kv.h" +#include "include/dlinklist.h" + +#define MDB_URL_PREFIX "mdb://" +#define MDB_URL_PREFIX_SIZE (sizeof(MDB_URL_PREFIX)-1) + +#define LDB_MDB_MAX_KEY_LENGTH 511 + +#define GIGABYTE (1024*1024*1024) + +int ldb_mdb_err_map(int lmdb_err) +{ + switch (lmdb_err) { + case MDB_SUCCESS: + return LDB_SUCCESS; + case EIO: + return LDB_ERR_OPERATIONS_ERROR; +#ifdef EBADE + case EBADE: +#endif + case MDB_INCOMPATIBLE: + case MDB_CORRUPTED: + case MDB_INVALID: + return LDB_ERR_UNAVAILABLE; + case MDB_BAD_TXN: + case MDB_BAD_VALSIZE: +#ifdef MDB_BAD_DBI + case MDB_BAD_DBI: +#endif + case MDB_PANIC: + case EINVAL: + return LDB_ERR_PROTOCOL_ERROR; + case MDB_MAP_FULL: + case MDB_DBS_FULL: + case MDB_READERS_FULL: + case MDB_TLS_FULL: + case MDB_TXN_FULL: + case EAGAIN: + return LDB_ERR_BUSY; + case MDB_KEYEXIST: + return LDB_ERR_ENTRY_ALREADY_EXISTS; + case MDB_NOTFOUND: + case ENOENT: + return LDB_ERR_NO_SUCH_OBJECT; + case EACCES: + return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS; + default: + break; + } + return LDB_ERR_OTHER; +} + +#define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__) +static int lmdb_error_at(struct ldb_context *ldb, + int ecode, + const char *file, + int line) +{ + int ldb_err = ldb_mdb_err_map(ecode); + char *reason = mdb_strerror(ecode); + ldb_asprintf_errstring(ldb, + "(%d) - %s at %s:%d", + ecode, + reason, + file, + line); + return ldb_err; +} + +static bool lmdb_transaction_active(struct ldb_kv_private *ldb_kv) +{ + return ldb_kv->lmdb_private->txlist != NULL; +} + +static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx) +{ + if (ltx == NULL) { + return NULL; + } + + return ltx->tx; +} + +static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx) +{ + if (lmdb->txlist) { + talloc_steal(lmdb->txlist, ltx); + } + + DLIST_ADD(lmdb->txlist, ltx); +} + +static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx) +{ + DLIST_REMOVE(lmdb->txlist, ltx); + talloc_free(ltx); +} + + +static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb) +{ + struct lmdb_trans *ltx; + + ltx = lmdb->txlist; + return ltx; +} + + +static MDB_txn *get_current_txn(struct lmdb_private *lmdb) +{ + MDB_txn *txn = NULL; + + txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb)); + if (txn != NULL) { + return txn; + } + if (lmdb->read_txn != NULL) { + return lmdb->read_txn; + } + lmdb->error = MDB_BAD_TXN; + ldb_set_errstring(lmdb->ldb, __location__":No active transaction\n"); + return NULL; +} + +static int lmdb_store(struct ldb_kv_private *ldb_kv, + struct ldb_val key, + struct ldb_val data, + int flags) +{ + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + MDB_val mdb_key; + MDB_val mdb_data; + int mdb_flags; + MDB_txn *txn = NULL; + MDB_dbi dbi = 0; + + if (ldb_kv->read_only) { + return LDB_ERR_UNWILLING_TO_PERFORM; + } + + txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb)); + if (txn == NULL) { + ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); + lmdb->error = MDB_PANIC; + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + mdb_key.mv_size = key.length; + mdb_key.mv_data = key.data; + + mdb_data.mv_size = data.length; + mdb_data.mv_data = data.data; + + if (flags == TDB_INSERT) { + mdb_flags = MDB_NOOVERWRITE; + } else if (flags == TDB_MODIFY) { + /* + * Modifying a record, ensure that it exists. + * This mimics the TDB semantics + */ + MDB_val value; + lmdb->error = mdb_get(txn, dbi, &mdb_key, &value); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + mdb_flags = 0; + } else { + mdb_flags = 0; + } + + lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + return ldb_mdb_err_map(lmdb->error); +} + +static int lmdb_delete(struct ldb_kv_private *ldb_kv, struct ldb_val key) +{ + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + MDB_val mdb_key; + MDB_txn *txn = NULL; + MDB_dbi dbi = 0; + + if (ldb_kv->read_only) { + return LDB_ERR_UNWILLING_TO_PERFORM; + } + + txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb)); + if (txn == NULL) { + ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); + lmdb->error = MDB_PANIC; + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + mdb_key.mv_size = key.length; + mdb_key.mv_data = key.data; + + lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + return ldb_mdb_err_map(lmdb->error); +} + +static int lmdb_traverse_fn(struct ldb_kv_private *ldb_kv, + ldb_kv_traverse_fn fn, + void *ctx) +{ + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + MDB_val mdb_key; + MDB_val mdb_data; + MDB_txn *txn = NULL; + MDB_dbi dbi = 0; + MDB_cursor *cursor = NULL; + int ret; + + txn = get_current_txn(lmdb); + if (txn == NULL) { + ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); + lmdb->error = MDB_PANIC; + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + lmdb->error = mdb_cursor_open(txn, dbi, &cursor); + if (lmdb->error != MDB_SUCCESS) { + goto done; + } + + while ((lmdb->error = mdb_cursor_get( + cursor, &mdb_key, + &mdb_data, MDB_NEXT)) == MDB_SUCCESS) { + + struct ldb_val key = { + .length = mdb_key.mv_size, + .data = mdb_key.mv_data, + }; + struct ldb_val data = { + .length = mdb_data.mv_size, + .data = mdb_data.mv_data, + }; + + ret = fn(ldb_kv, key, data, ctx); + if (ret != 0) { + /* + * NOTE: This DOES NOT set lmdb->error! + * + * This means that the caller will get success. + * This matches TDB traverse behaviour, where callbacks + * may terminate the traverse, but do not change the + * return code from success. + * + * Callers SHOULD store their own error codes. + */ + goto done; + } + } + if (lmdb->error == MDB_NOTFOUND) { + lmdb->error = MDB_SUCCESS; + } +done: + if (cursor != NULL) { + mdb_cursor_close(cursor); + } + + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + return ldb_mdb_err_map(lmdb->error); +} + +static int lmdb_update_in_iterate(struct ldb_kv_private *ldb_kv, + struct ldb_val key, + struct ldb_val key2, + struct ldb_val data, + void *state) +{ + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + struct ldb_val copy; + int ret = LDB_SUCCESS; + + /* + * Need to take a copy of the data as the delete operation alters the + * data, as it is in private lmdb memory. + */ + copy.length = data.length; + copy.data = talloc_memdup(ldb_kv, data.data, data.length); + if (copy.data == NULL) { + lmdb->error = MDB_PANIC; + return ldb_oom(lmdb->ldb); + } + + lmdb->error = lmdb_delete(ldb_kv, key); + if (lmdb->error != MDB_SUCCESS) { + ldb_debug( + lmdb->ldb, + LDB_DEBUG_ERROR, + "Failed to delete %*.*s " + "for rekey as %*.*s: %s", + (int)key.length, (int)key.length, + (const char *)key.data, + (int)key2.length, (int)key2.length, + (const char *)key.data, + mdb_strerror(lmdb->error)); + ret = ldb_mdb_error(lmdb->ldb, lmdb->error); + goto done; + } + + lmdb->error = lmdb_store(ldb_kv, key2, copy, 0); + if (lmdb->error != MDB_SUCCESS) { + ldb_debug( + lmdb->ldb, + LDB_DEBUG_ERROR, + "Failed to rekey %*.*s as %*.*s: %s", + (int)key.length, (int)key.length, + (const char *)key.data, + (int)key2.length, (int)key2.length, + (const char *)key.data, + mdb_strerror(lmdb->error)); + ret = ldb_mdb_error(lmdb->ldb, lmdb->error); + goto done; + } + +done: + if (copy.data != NULL) { + TALLOC_FREE(copy.data); + copy.length = 0; + } + + /* + * Explicity invalidate the data, as the delete has done this + */ + data.length = 0; + data.data = NULL; + + return ret; +} + +/* Handles only a single record */ +static int lmdb_parse_record(struct ldb_kv_private *ldb_kv, + struct ldb_val key, + int (*parser)(struct ldb_val key, + struct ldb_val data, + void *private_data), + void *ctx) +{ + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + MDB_val mdb_key; + MDB_val mdb_data; + MDB_txn *txn = NULL; + MDB_dbi dbi; + struct ldb_val data; + + txn = get_current_txn(lmdb); + if (txn == NULL) { + ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active"); + lmdb->error = MDB_PANIC; + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + mdb_key.mv_size = key.length; + mdb_key.mv_data = key.data; + + lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data); + if (lmdb->error != MDB_SUCCESS) { + /* TODO closing a handle should not even be necessary */ + mdb_dbi_close(lmdb->env, dbi); + if (lmdb->error == MDB_NOTFOUND) { + return LDB_ERR_NO_SUCH_OBJECT; + } + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + data.data = mdb_data.mv_data; + data.length = mdb_data.mv_size; + + /* TODO closing a handle should not even be necessary */ + mdb_dbi_close(lmdb->env, dbi); + + return parser(key, data, ctx); +} + +/* + * Exactly the same as iterate, except we have a start key and an end key + * (which are both included in the results if present). + * + * If start > end, return MDB_PANIC. + */ +static int lmdb_iterate_range(struct ldb_kv_private *ldb_kv, + struct ldb_val start_key, + struct ldb_val end_key, + ldb_kv_traverse_fn fn, + void *ctx) +{ + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + MDB_val mdb_key; + MDB_val mdb_data; + MDB_txn *txn = NULL; + MDB_dbi dbi = 0; + MDB_cursor *cursor = NULL; + int ret; + + MDB_val mdb_s_key; + MDB_val mdb_e_key; + + txn = get_current_txn(lmdb); + if (txn == NULL) { + ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); + lmdb->error = MDB_PANIC; + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + mdb_s_key.mv_size = start_key.length; + mdb_s_key.mv_data = start_key.data; + + mdb_e_key.mv_size = end_key.length; + mdb_e_key.mv_data = end_key.data; + + if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) { + lmdb->error = MDB_PANIC; + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + lmdb->error = mdb_cursor_open(txn, dbi, &cursor); + if (lmdb->error != MDB_SUCCESS) { + goto done; + } + + lmdb->error = mdb_cursor_get(cursor, &mdb_s_key, &mdb_data, MDB_SET_RANGE); + + if (lmdb->error != MDB_SUCCESS) { + if (lmdb->error == MDB_NOTFOUND) { + lmdb->error = MDB_SUCCESS; + } + goto done; + } else { + struct ldb_val key = { + .length = mdb_s_key.mv_size, + .data = mdb_s_key.mv_data, + }; + struct ldb_val data = { + .length = mdb_data.mv_size, + .data = mdb_data.mv_data, + }; + + if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) { + goto done; + } + + ret = fn(ldb_kv, key, data, ctx); + if (ret != 0) { + /* + * NOTE: This DOES NOT set lmdb->error! + * + * This means that the caller will get success. + * This matches TDB traverse behaviour, where callbacks + * may terminate the traverse, but do not change the + * return code from success. + * + * Callers SHOULD store their own error codes. + */ + goto done; + } + } + + while ((lmdb->error = mdb_cursor_get( + cursor, &mdb_key, + &mdb_data, MDB_NEXT)) == MDB_SUCCESS) { + + struct ldb_val key = { + .length = mdb_key.mv_size, + .data = mdb_key.mv_data, + }; + struct ldb_val data = { + .length = mdb_data.mv_size, + .data = mdb_data.mv_data, + }; + + if (mdb_cmp(txn, dbi, &mdb_key, &mdb_e_key) > 0) { + goto done; + } + + ret = fn(ldb_kv, key, data, ctx); + if (ret != 0) { + /* + * NOTE: This DOES NOT set lmdb->error! + * + * This means that the caller will get success. + * This matches TDB traverse behaviour, where callbacks + * may terminate the traverse, but do not change the + * return code from success. + * + * Callers SHOULD store their own error codes. + */ + goto done; + } + } + if (lmdb->error == MDB_NOTFOUND) { + lmdb->error = MDB_SUCCESS; + } +done: + if (cursor != NULL) { + mdb_cursor_close(cursor); + } + + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + return ldb_mdb_err_map(lmdb->error); +} + +static int lmdb_lock_read(struct ldb_module *module) +{ + void *data = ldb_module_get_private(module); + struct ldb_kv_private *ldb_kv = + talloc_get_type(data, struct ldb_kv_private); + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + pid_t pid = getpid(); + + if (pid != lmdb->pid) { + ldb_asprintf_errstring( + lmdb->ldb, + __location__": Reusing ldb opened by pid %d in " + "process %d\n", + lmdb->pid, + pid); + lmdb->error = MDB_BAD_TXN; + return LDB_ERR_PROTOCOL_ERROR; + } + + lmdb->error = MDB_SUCCESS; + if (lmdb_transaction_active(ldb_kv) == false && + ldb_kv->read_lock_count == 0) { + lmdb->error = mdb_txn_begin(lmdb->env, + NULL, + MDB_RDONLY, + &lmdb->read_txn); + } + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + ldb_kv->read_lock_count++; + return ldb_mdb_err_map(lmdb->error); +} + +static int lmdb_unlock_read(struct ldb_module *module) +{ + void *data = ldb_module_get_private(module); + struct ldb_kv_private *ldb_kv = + talloc_get_type(data, struct ldb_kv_private); + + if (lmdb_transaction_active(ldb_kv) == false && + ldb_kv->read_lock_count == 1) { + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + mdb_txn_commit(lmdb->read_txn); + lmdb->read_txn = NULL; + ldb_kv->read_lock_count--; + return LDB_SUCCESS; + } + ldb_kv->read_lock_count--; + return LDB_SUCCESS; +} + +static int lmdb_transaction_start(struct ldb_kv_private *ldb_kv) +{ + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + struct lmdb_trans *ltx; + struct lmdb_trans *ltx_head; + MDB_txn *tx_parent; + pid_t pid = getpid(); + + /* Do not take out the transaction lock on a read-only DB */ + if (ldb_kv->read_only) { + return LDB_ERR_UNWILLING_TO_PERFORM; + } + + ltx = talloc_zero(lmdb, struct lmdb_trans); + if (ltx == NULL) { + return ldb_oom(lmdb->ldb); + } + + if (pid != lmdb->pid) { + ldb_asprintf_errstring( + lmdb->ldb, + __location__": Reusing ldb opened by pid %d in " + "process %d\n", + lmdb->pid, + pid); + lmdb->error = MDB_BAD_TXN; + return LDB_ERR_PROTOCOL_ERROR; + } + + /* + * Clear out any stale readers + */ + { + int stale = 0; + mdb_reader_check(lmdb->env, &stale); + if (stale > 0) { + ldb_debug( + lmdb->ldb, + LDB_DEBUG_ERROR, + "LMDB Stale readers, deleted (%d)", + stale); + } + } + + + + ltx_head = lmdb_private_trans_head(lmdb); + + tx_parent = lmdb_trans_get_tx(ltx_head); + + lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, <x->tx); + if (lmdb->error != MDB_SUCCESS) { + return ldb_mdb_error(lmdb->ldb, lmdb->error); + } + + trans_push(lmdb, ltx); + + return ldb_mdb_err_map(lmdb->error); +} + +static int lmdb_transaction_cancel(struct ldb_kv_private *ldb_kv) +{ + struct lmdb_trans *ltx; + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + + ltx = lmdb_private_trans_head(lmdb); + if (ltx == NULL) { + return LDB_ERR_OPERATIONS_ERROR; + } + + mdb_txn_abort(ltx->tx); + trans_finished(lmdb, ltx); + return LDB_SUCCESS; +} + +static int lmdb_transaction_prepare_commit(struct ldb_kv_private *ldb_kv) +{ + /* No need to prepare a commit */ + return LDB_SUCCESS; +} + +static int lmdb_transaction_commit(struct ldb_kv_private *ldb_kv) +{ + struct lmdb_trans *ltx; + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + + ltx = lmdb_private_trans_head(lmdb); + if (ltx == NULL) { + return LDB_ERR_OPERATIONS_ERROR; + } + + lmdb->error = mdb_txn_commit(ltx->tx); + trans_finished(lmdb, ltx); + + return lmdb->error; +} + +static int lmdb_error(struct ldb_kv_private *ldb_kv) +{ + return ldb_mdb_err_map(ldb_kv->lmdb_private->error); +} + +static const char *lmdb_errorstr(struct ldb_kv_private *ldb_kv) +{ + return mdb_strerror(ldb_kv->lmdb_private->error); +} + +static const char *lmdb_name(struct ldb_kv_private *ldb_kv) +{ + return "lmdb"; +} + +static bool lmdb_changed(struct ldb_kv_private *ldb_kv) +{ + /* + * lmdb does no provide a quick way to determine if the database + * has changed. This function always returns true. + * + * Note that tdb uses a sequence number that allows this function + * to be implemented efficiently. + */ + return true; +} + +/* + * Get the number of records in the database. + * + * The mdb_env_stat call returns an accurate count, so we return the actual + * number of records in the database rather than an estimate. + */ +static size_t lmdb_get_size(struct ldb_kv_private *ldb_kv) +{ + + struct MDB_stat stats = {0}; + struct lmdb_private *lmdb = ldb_kv->lmdb_private; + int ret = 0; + + ret = mdb_env_stat(lmdb->env, &stats); + if (ret != 0) { + return 0; + } + return stats.ms_entries; +} + +/* + * Start a sub transaction + * As lmdb supports nested transactions we can start a new transaction + */ +static int lmdb_nested_transaction_start(struct ldb_kv_private *ldb_kv) +{ + int ret = lmdb_transaction_start(ldb_kv); + return ret; +} + +/* + * Commit a sub transaction + * As lmdb supports nested transactions we can commit the nested transaction + */ +static int lmdb_nested_transaction_commit(struct ldb_kv_private *ldb_kv) +{ + int ret = lmdb_transaction_commit(ldb_kv); + return ret; +} + +/* + * Cancel a sub transaction + * As lmdb supports nested transactions we can cancel the nested transaction + */ +static int lmdb_nested_transaction_cancel(struct ldb_kv_private *ldb_kv) +{ + int ret = lmdb_transaction_cancel(ldb_kv); + return ret; +} + +static struct kv_db_ops lmdb_key_value_ops = { + .options = LDB_KV_OPTION_STABLE_READ_LOCK, + + .store = lmdb_store, + .delete = lmdb_delete, + .iterate = lmdb_traverse_fn, + .update_in_iterate = lmdb_update_in_iterate, + .fetch_and_parse = lmdb_parse_record, + .iterate_range = lmdb_iterate_range, + .lock_read = lmdb_lock_read, + .unlock_read = lmdb_unlock_read, + .begin_write = lmdb_transaction_start, + .prepare_write = lmdb_transaction_prepare_commit, + .finish_write = lmdb_transaction_commit, + .abort_write = lmdb_transaction_cancel, + .error = lmdb_error, + .errorstr = lmdb_errorstr, + .name = lmdb_name, + .has_changed = lmdb_changed, + .transaction_active = lmdb_transaction_active, + .get_size = lmdb_get_size, + .begin_nested_write = lmdb_nested_transaction_start, + .finish_nested_write = lmdb_nested_transaction_commit, + .abort_nested_write = lmdb_nested_transaction_cancel, +}; + +static const char *lmdb_get_path(const char *url) +{ + const char *path; + + /* parse the url */ + if (strchr(url, ':')) { + if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) { + return NULL; + } + path = url + MDB_URL_PREFIX_SIZE; + } else { + path = url; + } + + return path; +} + +static int lmdb_pvt_destructor(struct lmdb_private *lmdb) +{ + struct lmdb_trans *ltx = NULL; + + /* Check if this is a forked child */ + if (getpid() != lmdb->pid) { + int fd = 0; + /* + * We cannot call mdb_env_close or commit any transactions, + * otherwise they might appear finished in the parent. + * + */ + + if (mdb_env_get_fd(lmdb->env, &fd) == 0) { + close(fd); + } + + /* Remove the pointer, so that no access should occur */ + lmdb->env = NULL; + + return 0; + } + + /* + * Close the read transaction if it's open + */ + if (lmdb->read_txn != NULL) { + mdb_txn_abort(lmdb->read_txn); + } + + if (lmdb->env == NULL) { + return 0; + } + + /* + * Abort any currently active transactions + */ + ltx = lmdb_private_trans_head(lmdb); + while (ltx != NULL) { + mdb_txn_abort(ltx->tx); + trans_finished(lmdb, ltx); + ltx = lmdb_private_trans_head(lmdb); + } + lmdb->env = NULL; + + return 0; +} + +struct mdb_env_wrap { + struct mdb_env_wrap *next, *prev; + dev_t device; + ino_t inode; + MDB_env *env; + pid_t pid; +}; + +static struct mdb_env_wrap *mdb_list; + +/* destroy the last connection to an mdb */ +static int mdb_env_wrap_destructor(struct mdb_env_wrap *w) +{ + mdb_env_close(w->env); + DLIST_REMOVE(mdb_list, w); + return 0; +} + +static int lmdb_open_env(TALLOC_CTX *mem_ctx, + MDB_env **env, + struct ldb_context *ldb, + const char *path, + const size_t env_map_size, + unsigned int flags) +{ + int ret; + unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS; + /* + * MDB_NOSUBDIR implies there is a separate file called path and a + * separate lockfile called path-lock + */ + + struct mdb_env_wrap *w; + struct stat st; + pid_t pid = getpid(); + int fd = 0; + unsigned v; + + if (stat(path, &st) == 0) { + for (w=mdb_list;w;w=w->next) { + if (st.st_dev == w->device && + st.st_ino == w->inode && + pid == w->pid) { + /* + * We must have only one MDB_env per process + */ + if (!talloc_reference(mem_ctx, w)) { + return ldb_oom(ldb); + } + *env = w->env; + return LDB_SUCCESS; + } + } + } + + w = talloc(mem_ctx, struct mdb_env_wrap); + if (w == NULL) { + return ldb_oom(ldb); + } + + ret = mdb_env_create(env); + if (ret != 0) { + ldb_asprintf_errstring( + ldb, + "Could not create MDB environment %s: %s\n", + path, + mdb_strerror(ret)); + return ldb_mdb_err_map(ret); + } + + if (env_map_size > 0) { + ret = mdb_env_set_mapsize(*env, env_map_size); + if (ret != 0) { + ldb_asprintf_errstring( + ldb, + "Could not set MDB mmap() size to %llu " + "on %s: %s\n", + (unsigned long long)(env_map_size), + path, + mdb_strerror(ret)); + TALLOC_FREE(w); + return ldb_mdb_err_map(ret); + } + } + + mdb_env_set_maxreaders(*env, 100000); + /* + * As we ensure that there is only one MDB_env open per database per + * process. We can not use the MDB_RDONLY flag, as another ldb may be + * opened in read write mode + */ + if (flags & LDB_FLG_NOSYNC) { + mdb_flags |= MDB_NOSYNC; + } + ret = mdb_env_open(*env, path, mdb_flags, 0644); + if (ret != 0) { + ldb_asprintf_errstring(ldb, + "Could not open DB %s: %s\n", + path, mdb_strerror(ret)); + TALLOC_FREE(w); + return ldb_mdb_err_map(ret); + } + + { + MDB_envinfo stat = {0}; + ret = mdb_env_info (*env, &stat); + if (ret != 0) { + ldb_asprintf_errstring( + ldb, + "Could not get MDB environment stats %s: %s\n", + path, + mdb_strerror(ret)); + return ldb_mdb_err_map(ret); + } + } + + ret = mdb_env_get_fd(*env, &fd); + if (ret != 0) { + ldb_asprintf_errstring(ldb, + "Could not obtain DB FD %s: %s\n", + path, mdb_strerror(ret)); + TALLOC_FREE(w); + return ldb_mdb_err_map(ret); + } + + /* Just as for TDB: on exec, don't inherit the fd */ + v = fcntl(fd, F_GETFD, 0); + if (v == -1) { + TALLOC_FREE(w); + return LDB_ERR_OPERATIONS_ERROR; + } + + ret = fcntl(fd, F_SETFD, v | FD_CLOEXEC); + if (ret == -1) { + TALLOC_FREE(w); + return LDB_ERR_OPERATIONS_ERROR; + } + + if (fstat(fd, &st) != 0) { + ldb_asprintf_errstring( + ldb, + "Could not stat %s:\n", + path); + TALLOC_FREE(w); + return LDB_ERR_OPERATIONS_ERROR; + } + w->env = *env; + w->device = st.st_dev; + w->inode = st.st_ino; + w->pid = pid; + + talloc_set_destructor(w, mdb_env_wrap_destructor); + + DLIST_ADD(mdb_list, w); + + return LDB_SUCCESS; + +} + +static int lmdb_pvt_open(struct lmdb_private *lmdb, + struct ldb_context *ldb, + const char *path, + const size_t env_map_size, + unsigned int flags) +{ + int ret; + int lmdb_max_key_length; + + if (flags & LDB_FLG_DONT_CREATE_DB) { + struct stat st; + if (stat(path, &st) != 0) { + return LDB_ERR_UNAVAILABLE; + } + } + + ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, env_map_size, flags); + if (ret != 0) { + return ret; + } + + /* Close when lmdb is released */ + talloc_set_destructor(lmdb, lmdb_pvt_destructor); + + /* Store the original pid during the LMDB open */ + lmdb->pid = getpid(); + + lmdb_max_key_length = mdb_env_get_maxkeysize(lmdb->env); + + /* This will never happen, but if it does make sure to freak out */ + if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) { + return ldb_operr(ldb); + } + + return LDB_SUCCESS; +} + +int lmdb_connect(struct ldb_context *ldb, + const char *url, + unsigned int flags, + const char *options[], + struct ldb_module **_module) +{ + const char *path = NULL; + struct lmdb_private *lmdb = NULL; + struct ldb_kv_private *ldb_kv = NULL; + int ret; + size_t env_map_size = 0; + + /* + * We hold locks, so we must use a private event context + * on each returned handle + */ + ldb_set_require_private_event_context(ldb); + + path = lmdb_get_path(url); + if (path == NULL) { + ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url); + return LDB_ERR_OPERATIONS_ERROR; + } + + ldb_kv = talloc_zero(ldb, struct ldb_kv_private); + if (!ldb_kv) { + ldb_oom(ldb); + return LDB_ERR_OPERATIONS_ERROR; + } + + lmdb = talloc_zero(ldb_kv, struct lmdb_private); + if (lmdb == NULL) { + TALLOC_FREE(ldb_kv); + return ldb_oom(ldb); + } + lmdb->ldb = ldb; + ldb_kv->kv_ops = &lmdb_key_value_ops; + + { + const char *size = ldb_options_find( + ldb, ldb->options, "lmdb_env_size"); + if (size != NULL) { + env_map_size = strtoull(size, NULL, 0); + } + } + + ret = lmdb_pvt_open(lmdb, ldb, path, env_map_size, flags); + if (ret != LDB_SUCCESS) { + TALLOC_FREE(ldb_kv); + return ret; + } + + ldb_kv->lmdb_private = lmdb; + if (flags & LDB_FLG_RDONLY) { + ldb_kv->read_only = true; + } + + /* + * This maximum length becomes encoded in the index values so + * must never change even if LMDB starts to allow longer keys. + * The override option is max_key_len_for_self_test, and is + * used for testing only. + */ + ldb_kv->max_key_length = LDB_MDB_MAX_KEY_LENGTH; + + return ldb_kv_init_store( + ldb_kv, "ldb_mdb backend", ldb, options, _module); +} |