Diffstat (limited to 'lib/cache/cdb_lmdb.c')
-rw-r--r-- | lib/cache/cdb_lmdb.c | 710
1 file changed, 710 insertions, 0 deletions
diff --git a/lib/cache/cdb_lmdb.c b/lib/cache/cdb_lmdb.c
new file mode 100644
index 0000000..621d2e8
--- /dev/null
+++ b/lib/cache/cdb_lmdb.c
@@ -0,0 +1,710 @@
+/* Copyright (C) 2016-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <fcntl.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <lmdb.h>
+
+#include "contrib/cleanup.h"
+#include "lib/cache/cdb_lmdb.h"
+#include "lib/cache/api.h"
+#include "lib/utils.h"
+
+
+/* Defines */
+#define LMDB_DIR_MODE 0770
+#define LMDB_FILE_MODE 0660
+
+struct lmdb_env
+{
+	size_t mapsize;
+	MDB_dbi dbi;
+	MDB_env *env;
+
+	/** Cached transactions
+	 *
+	 * - only one of (ro,rw) may be active at once
+	 * - non-NULL .ro may be active or reset
+	 * - non-NULL .rw is always active
+	 */
+	struct {
+		bool ro_active, ro_curs_active;
+		MDB_txn *ro, *rw;
+		MDB_cursor *ro_curs;
+	} txn;
+};
+
+/** @brief Convert LMDB error code. */
+static int lmdb_error(int error)
+{
+	/* _BAD_TXN may happen with overfull DB,
+	 * even during mdb_get with a single fork :-/ */
+	if (error == MDB_BAD_TXN) {
+		kr_log_info("[cache] MDB_BAD_TXN, probably overfull\n");
+		error = ENOSPC;
+	}
+	switch (error) {
+	case MDB_SUCCESS:
+		return kr_ok();
+	case MDB_NOTFOUND:
+		return kr_error(ENOENT);
+	case ENOSPC:
+	case MDB_MAP_FULL:
+	case MDB_TXN_FULL:
+		return kr_error(ENOSPC);
+	default:
+		kr_log_error("[cache] LMDB error: %s\n", mdb_strerror(error));
+		return kr_error(error);
+	}
+}
+
+/** Conversion between knot and lmdb structs for values. */
+static inline knot_db_val_t val_mdb2knot(MDB_val v)
+{
+	return (knot_db_val_t){ .len = v.mv_size, .data = v.mv_data };
+}
+static inline MDB_val val_knot2mdb(knot_db_val_t v)
+{
+	return (MDB_val){ .mv_size = v.len, .mv_data = v.data };
+}
+
+
+/*! \brief Set the environment map size.
+ * \note This also sets the maximum database size, see \fn mdb_env_set_mapsize
+ */
+static int set_mapsize(MDB_env *env, size_t map_size)
+{
+	long page_size = sysconf(_SC_PAGESIZE);
+	if (page_size <= 0) {
+		return KNOT_ERROR;
+	}
+
+	/* Round to page size. */
+	map_size = (map_size / page_size) * page_size;
+	int ret = mdb_env_set_mapsize(env, map_size);
+	if (ret != MDB_SUCCESS) {
+		return lmdb_error(ret);
+	}
+
+	return 0;
+}
+
+#define FLAG_RENEW (2*MDB_RDONLY)
+/** mdb_txn_begin or _renew + handle MDB_MAP_RESIZED.
+ *
+ * The retrying logic for MDB_MAP_RESIZED is so ugly that it has its own function.
+ * \note this assumes no transactions are active
+ * \return MDB_ errcode, not usual kr_error(...)
+ */
+static int txn_get_noresize(struct lmdb_env *env, unsigned int flag, MDB_txn **txn)
+{
+	assert(!env->txn.rw && (!env->txn.ro || !env->txn.ro_active));
+	int ret;
+	if (flag == FLAG_RENEW) {
+		ret = mdb_txn_renew(*txn);
+	} else {
+		ret = mdb_txn_begin(env->env, NULL, flag, txn);
+	}
+	if (ret != MDB_MAP_RESIZED) {
+		return ret;
+	}
+	//:unlikely
+	/* Another process increased the size; let's try to recover. */
+	kr_log_info("[cache] detected size increased by another process\n");
+	ret = mdb_env_set_mapsize(env->env, 0);
+	if (ret != MDB_SUCCESS) {
+		return ret;
+	}
+	if (flag == FLAG_RENEW) {
+		ret = mdb_txn_renew(*txn);
+	} else {
+		ret = mdb_txn_begin(env->env, NULL, flag, txn);
+	}
+	return ret;
+}
+
+/** Obtain a transaction. (they're cached in env->txn) */
+static int txn_get(struct lmdb_env *env, MDB_txn **txn, bool rdonly)
+{
+	assert(env && txn);
+	if (env->txn.rw) {
+		/* Reuse the *open* RW txn even if only reading is requested.
+		 * We leave the management of this to the cdb_sync command.
+		 * The user may e.g. want to do some reads between the writes. */
+		*txn = env->txn.rw;
+		return kr_ok();
+	}
+
+	if (!rdonly) {
+		/* avoid two active transactions */
+		if (env->txn.ro && env->txn.ro_active) {
+			mdb_txn_reset(env->txn.ro);
+			env->txn.ro_active = false;
+			env->txn.ro_curs_active = false;
+		}
+		int ret = txn_get_noresize(env, 0/*RW*/, &env->txn.rw);
+		if (ret == MDB_SUCCESS) {
+			*txn = env->txn.rw;
+			assert(*txn);
+		}
+		return lmdb_error(ret);
+	}
+
+	/* Get an active RO txn and return it. */
+	int ret = MDB_SUCCESS;
+	if (!env->txn.ro) { //:unlikely
+		ret = txn_get_noresize(env, MDB_RDONLY, &env->txn.ro);
+	} else if (!env->txn.ro_active) {
+		ret = txn_get_noresize(env, FLAG_RENEW, &env->txn.ro);
+	}
+	if (ret != MDB_SUCCESS) {
+		return lmdb_error(ret);
+	}
+	env->txn.ro_active = true;
+	*txn = env->txn.ro;
+	assert(*txn);
+	return kr_ok();
+}
+
+static int cdb_sync(knot_db_t *db)
+{
+	struct lmdb_env *env = db;
+	int ret = kr_ok();
+	if (env->txn.rw) {
+		ret = lmdb_error(mdb_txn_commit(env->txn.rw));
+		env->txn.rw = NULL; /* the transaction got freed even in case of errors */
+	} else if (env->txn.ro && env->txn.ro_active) {
+		mdb_txn_reset(env->txn.ro);
+		env->txn.ro_active = false;
+		env->txn.ro_curs_active = false;
+	}
+	return ret;
+}
+
+/** Obtain a read-only cursor (and a read-only transaction). */
+static int txn_curs_get(struct lmdb_env *env, MDB_cursor **curs)
+{
+	assert(env && curs);
+	if (env->txn.ro_curs_active) {
+		goto success;
+	}
+	/* Only in a read-only txn; TODO: it's a bit messy/coupled */
+	if (env->txn.rw) {
+		int ret = cdb_sync(env);
+		if (ret) return ret;
+	}
+	MDB_txn *txn = NULL;
+	int ret = txn_get(env, &txn, true);
+	if (ret) return ret;
+
+	if (env->txn.ro_curs) {
+		ret = mdb_cursor_renew(txn, env->txn.ro_curs);
+	} else {
+		ret = mdb_cursor_open(txn, env->dbi, &env->txn.ro_curs);
+	}
+	if (ret) return ret;
+	env->txn.ro_curs_active = true;
+success:
+	assert(env->txn.ro_curs_active && env->txn.ro && env->txn.ro_active
+		&& !env->txn.rw);
+	*curs = env->txn.ro_curs;
+	assert(*curs);
+	return kr_ok();
+}
+
+static void free_txn_ro(struct lmdb_env *env)
+{
+	if (env->txn.ro) {
+		mdb_txn_abort(env->txn.ro);
+		env->txn.ro = NULL;
+	}
+	if (env->txn.ro_curs) {
+		mdb_cursor_close(env->txn.ro_curs);
+		env->txn.ro_curs = NULL;
+	}
+}
+
+/*! \brief Close the database. */
+static void cdb_close_env(struct lmdb_env *env)
+{
+	assert(env && env->env);
+
+	/* Get rid of any transactions. */
+	cdb_sync(env);
+	free_txn_ro(env);
+
+	mdb_env_sync(env->env, 1);
+	mdb_dbi_close(env->env, env->dbi);
+	mdb_env_close(env->env);
+	memset(env, 0, sizeof(*env));
+}
+
+/*! \brief Open database environment. */
+static int cdb_open_env(struct lmdb_env *env, unsigned flags, const char *path, size_t mapsize)
+{
+	int ret = mkdir(path, LMDB_DIR_MODE);
+	if (ret == -1 && errno != EEXIST) {
+		return kr_error(errno);
+	}
+
+	MDB_env *mdb_env = NULL;
+	ret = mdb_env_create(&mdb_env);
+	if (ret != MDB_SUCCESS) {
+		return lmdb_error(ret);
+	}
+
+	ret = set_mapsize(mdb_env, mapsize);
+	if (ret != 0) {
+		mdb_env_close(mdb_env);
+		return ret;
+	}
+
+	ret = mdb_env_open(mdb_env, path, flags, LMDB_FILE_MODE);
+	if (ret != MDB_SUCCESS) {
+		mdb_env_close(mdb_env);
+		return lmdb_error(ret);
+	}
+
+	/* Keep the environment pointer. */
+	env->env = mdb_env;
+	env->mapsize = mapsize;
+	return 0;
+}
+
+static int cdb_open(struct lmdb_env *env, const char *path, size_t mapsize)
+{
+	/* Cache doesn't require durability, we can be
+	 * loose with the requirements as a tradeoff for speed. */
+	const unsigned flags = MDB_WRITEMAP | MDB_MAPASYNC | MDB_NOTLS;
+	int ret = cdb_open_env(env, flags, path, mapsize);
+	if (ret != 0) {
+		return ret;
+	}
+
+	/* Open the database. */
+	MDB_txn *txn = NULL;
+	ret = mdb_txn_begin(env->env, NULL, 0, &txn);
+	if (ret != MDB_SUCCESS) {
+		mdb_env_close(env->env);
+		return lmdb_error(ret);
+	}
+
+	ret = mdb_dbi_open(txn, NULL, 0, &env->dbi);
+	if (ret != MDB_SUCCESS) {
+		mdb_txn_abort(txn);
+		mdb_env_close(env->env);
+		return lmdb_error(ret);
+	}
+
+	ret = mdb_txn_commit(txn);
+	if (ret != MDB_SUCCESS) {
+		mdb_env_close(env->env);
+		return lmdb_error(ret);
+	}
+
+	return 0;
+}
+
+static int cdb_init(knot_db_t **db, struct kr_cdb_opts *opts, knot_mm_t *pool)
+{
+	if (!db || !opts) {
+		return kr_error(EINVAL);
+	}
+
+	struct lmdb_env *env = malloc(sizeof(*env));
+	if (!env) {
+		return kr_error(ENOMEM);
+	}
+	memset(env, 0, sizeof(struct lmdb_env));
+
+	/* Clear stale lockfiles. */
+	auto_free char *lockfile = kr_strcatdup(2, opts->path, "/.cachelock");
+	if (lockfile) {
+		if (unlink(lockfile) == 0) {
+			kr_log_info("[cache] cleared stale lockfile '%s'\n", lockfile);
+		} else if (errno != ENOENT) {
+			kr_log_info("[cache] failed to clear stale lockfile '%s': %s\n", lockfile,
+					strerror(errno));
+		}
+	}
+
+	/* Open the database. */
+	int ret = cdb_open(env, opts->path, opts->maxsize);
+	if (ret != 0) {
+		free(env);
+		return ret;
+	}
+
+	*db = env;
+	return 0;
+}
+
+static void cdb_deinit(knot_db_t *db)
+{
+	struct lmdb_env *env = db;
+	cdb_close_env(env);
+	free(env);
+}
+
+static int cdb_count(knot_db_t *db)
+{
+	struct lmdb_env *env = db;
+	MDB_txn *txn = NULL;
+	int ret = txn_get(env, &txn, true);
+	if (ret != 0) {
+		return ret;
+	}
+
+	MDB_stat stat;
+	ret = mdb_stat(txn, env->dbi, &stat);
+
+	return (ret == MDB_SUCCESS) ? stat.ms_entries : lmdb_error(ret);
+}
+
+static int cdb_clear(knot_db_t *db)
+{
+	struct lmdb_env *env = db;
+	/* First try mdb_drop() to clear the DB; this may fail with ENOSPC. */
+	/* If we didn't do this, explicit cache.clear() ran on an instance
+	 * would lead to the instance detaching from the cache of others,
+	 * until they reopened cache explicitly or cleared it for some reason.
+	 */
+	{
+		MDB_txn *txn = NULL;
+		int ret = txn_get(env, &txn, false);
+		if (ret == kr_ok()) {
+			ret = lmdb_error(mdb_drop(txn, env->dbi, 0));
+			if (ret == kr_ok()) {
+				ret = cdb_sync(db);
+			}
+			if (ret == kr_ok()) {
+				return ret;
+			}
+		}
+		kr_log_info("[cache] clearing error, falling back\n");
+	}
+
+	/* We are about to switch to a different file, so end all txns, to be sure. */
+	(void) cdb_sync(db);
+	free_txn_ro(db);
+
+	/* Since there is no guarantee that there will be free
+	 * pages to hold whole dirtied db for transaction-safe clear,
+	 * we simply remove the database files and reopen.
+	 * We can afford this since other readers will continue to read
+	 * from removed file, but will reopen when encountering next
+	 * error. */
+	mdb_filehandle_t fd = -1;
+	int ret = mdb_env_get_fd(env->env, &fd);
+	if (ret != MDB_SUCCESS) {
+		return lmdb_error(ret);
+	}
+	const char *path = NULL;
+	ret = mdb_env_get_path(env->env, &path);
+	if (ret != MDB_SUCCESS) {
+		return lmdb_error(ret);
+	}
+
+	auto_free char *mdb_datafile = kr_strcatdup(2, path, "/data.mdb");
+	auto_free char *mdb_lockfile = kr_strcatdup(2, path, "/lock.mdb");
+	auto_free char *lockfile = kr_strcatdup(2, path, "/.cachelock");
+	if (!mdb_datafile || !mdb_lockfile || !lockfile) {
+		return kr_error(ENOMEM);
+	}
+	/* Find if we get a lock on lockfile. */
+	ret = open(lockfile, O_CREAT|O_EXCL|O_RDONLY, S_IRUSR);
+	if (ret == -1) {
+		kr_log_error("[cache] clearing failed to get ./.cachelock; retry later\n");
+		/* As we're out of space (almost certainly - mdb_drop didn't work),
+		 * we will retry on the next failing write operation. */
+		return kr_error(errno);
+	}
+	close(ret);
+	/* We acquired lockfile. Now find whether *.mdb are what we have open now. */
+	struct stat old_stat, new_stat;
+	if (fstat(fd, &new_stat) || stat(mdb_datafile, &old_stat)) {
+		ret = errno;
+		unlink(lockfile);
+		return kr_error(ret);
+	}
+	/* Remove underlying files only if current open environment
+	 * points to file on the disk. Otherwise just reopen as someone
+	 * else has already removed the files.
+	 */
+	if (old_stat.st_dev == new_stat.st_dev && old_stat.st_ino == new_stat.st_ino) {
+		kr_log_verbose("[cache] clear: identical files, unlinking\n");
+		// coverity[toctou]
+		unlink(mdb_datafile);
+		unlink(mdb_lockfile);
+	} else
+		kr_log_verbose("[cache] clear: not identical files, reopening\n");
+	/* Keep copy as it points to current handle internals. */
+	auto_free char *path_copy = strdup(path);
+	size_t mapsize = env->mapsize;
+	cdb_close_env(env);
+	ret = cdb_open(env, path_copy, mapsize);
+	/* Environment updated, release lockfile. */
+	unlink(lockfile);
+	return ret;
+}
+
+static int cdb_readv(knot_db_t *db, const knot_db_val_t *key, knot_db_val_t *val,
+		     int maxcount)
+{
+	struct lmdb_env *env = db;
+	MDB_txn *txn = NULL;
+	int ret = txn_get(env, &txn, true);
+	if (ret) {
+		return ret;
+	}
+
+	for (int i = 0; i < maxcount; ++i) {
+		/* Convert key structs */
+		MDB_val _key = val_knot2mdb(key[i]);
+		MDB_val _val = val_knot2mdb(val[i]);
+		ret = mdb_get(txn, env->dbi, &_key, &_val);
+		if (ret != MDB_SUCCESS) {
+			ret = lmdb_error(ret);
+			if (ret == kr_error(ENOSPC)) {
+				/* we're likely to be forced to cache clear anyway */
+				ret = kr_error(ENOENT);
+			}
+			return ret;
+		}
+		/* Update the result. */
+		val[i] = val_mdb2knot(_val);
+	}
+	return kr_ok();
+}
+
+static int cdb_write(struct lmdb_env *env, MDB_txn **txn, const knot_db_val_t *key,
+		     knot_db_val_t *val, unsigned flags)
+{
+	/* Convert key structs and write */
+	MDB_val _key = val_knot2mdb(*key);
+	MDB_val _val = val_knot2mdb(*val);
+	int ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
+
+	/* Try to recover from doing too much writing in a single transaction. */
+	if (ret == MDB_TXN_FULL) {
+		ret = cdb_sync(env);
+		if (ret) {
+			ret = txn_get(env, txn, false);
+		}
+		if (ret) {
+			ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
+		}
+	}
+	if (ret != MDB_SUCCESS) {
+		return lmdb_error(ret);
+	}
+
+	/* Update the result. */
+	val->data = _val.mv_data;
+	val->len = _val.mv_size;
+	return kr_ok();
+}
+
+static int cdb_writev(knot_db_t *db, const knot_db_val_t *key, knot_db_val_t *val,
+		      int maxcount)
+{
+	struct lmdb_env *env = db;
+	MDB_txn *txn = NULL;
+	int ret = txn_get(env, &txn, false);
+
+	for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
+		/* This is LMDB specific optimisation,
+		 * if caller specifies value with NULL data and non-zero length,
+		 * LMDB will preallocate the entry for caller and leave write
+		 * transaction open, caller is responsible for syncing thus committing transaction.
+		 */
+		unsigned mdb_flags = 0;
+		if (val[i].len > 0 && val[i].data == NULL) {
+			mdb_flags |= MDB_RESERVE;
+		}
+		ret = cdb_write(env, &txn, &key[i], &val[i], mdb_flags);
+	}
+
+	return ret;
+}
+
+static int cdb_remove(knot_db_t *db, knot_db_val_t keys[], int maxcount)
+{
+	struct lmdb_env *env = db;
+	MDB_txn *txn = NULL;
+	int ret = txn_get(env, &txn, false);
+	int deleted = 0;
+
+	for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
+		MDB_val _key = val_knot2mdb(keys[i]);
+		MDB_val val = { 0, NULL };
+		ret = lmdb_error(mdb_del(txn, env->dbi, &_key, &val));
+		if (ret == kr_ok())
+			deleted++;
+		else if (ret == KNOT_ENOENT)
+			ret = kr_ok(); /* skip over non-existing entries */
+	}
+
+	return ret < 0 ? ret : deleted;
+}
+
+static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t keyval[][2], int maxcount)
+{
+	struct lmdb_env *env = db;
+	MDB_txn *txn = NULL;
+	int ret = txn_get(env, &txn, true);
+	if (ret != 0) {
+		return ret;
+	}
+
+	/* LATER(optim.): use txn_curs_get() instead, to save resources. */
+	MDB_cursor *cur = NULL;
+	ret = mdb_cursor_open(txn, env->dbi, &cur);
+	if (ret != 0) {
+		return lmdb_error(ret);
+	}
+
+	MDB_val cur_key = val_knot2mdb(*key);
+	MDB_val cur_val = { 0, NULL };
+	ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE);
+	if (ret != MDB_SUCCESS) {
+		mdb_cursor_close(cur);
+		return lmdb_error(ret);
+	}
+
+	int results = 0;
+	while (ret == MDB_SUCCESS) {
+		/* Retrieve current key and compare with prefix */
+		if (cur_key.mv_size < key->len || memcmp(cur_key.mv_data, key->data, key->len) != 0) {
+			break;
+		}
+		/* Add to result set */
+		if (results < maxcount) {
+			keyval[results][0] = val_mdb2knot(cur_key);
+			keyval[results][1] = val_mdb2knot(cur_val);
+			++results;
+		} else {
+			break;
+		}
+		ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
+	}
+
+	mdb_cursor_close(cur);
+	return results;
+}
+
+
+static int cdb_prune(knot_db_t *db, int limit)
+{
+	return -1;
+#if 0
+	/* Sync in-flight transactions */
+	cdb_sync(db);
+
+	/* Prune old records */
+	struct lmdb_env *env = db;
+	MDB_txn *txn = NULL;
+	int ret = txn_get(env, &txn, false);
+	if (ret != 0) {
+		return ret;
+	}
+
+	MDB_cursor *cur = NULL;
+	ret = mdb_cursor_open(txn, env->dbi, &cur);
+	if (ret != 0) {
+		return lmdb_error(ret);
+	}
+
+	MDB_val cur_key, cur_val;
+	ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_FIRST);
+	if (ret != 0) {
+		mdb_cursor_close(cur);
+		return lmdb_error(ret);
+	}
+
+	int results = 0;
+	struct timeval now;
+	gettimeofday(&now, NULL);
+	while (ret == 0 && results < limit) {
+		/* Ignore special namespaces. */
+		if (cur_key.mv_size < 2 || ((const char *)cur_key.mv_data)[0] == 'V') {
+			ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
+			continue;
+		}
+		/* Check entry age. */
+		struct kr_cache_entry *entry = cur_val.mv_data;
+		if (entry->timestamp > now.tv_sec ||
+		    (now.tv_sec - entry->timestamp) < entry->ttl) {
+			ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
+			continue;
+		}
+		/* Remove entry */
+		mdb_cursor_del(cur, 0);
+		++results;
+		ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
+	}
+	mdb_cursor_close(cur);
+	return ret < 0 ? ret : results;
+#endif
+}
+
+static int cdb_read_leq(knot_db_t *env, knot_db_val_t *key, knot_db_val_t *val)
+{
+	assert(env && key && key->data && val);
+	MDB_cursor *curs = NULL;
+	int ret = txn_curs_get(env, &curs);
+	if (ret) return ret;
+
+	MDB_val key2_m = val_knot2mdb(*key);
+	MDB_val val2_m = { 0, NULL };
+	ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_SET_RANGE);
+	if (ret) return lmdb_error(ret);
+	/* test for equality //:unlikely */
+	if (key2_m.mv_size == key->len
+	    && memcmp(key2_m.mv_data, key->data, key->len) == 0) {
+		ret = 0; /* equality */
+		goto success;
+	}
+	/* we must be greater than key; do one step to smaller */
+	ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_PREV);
+	if (ret) return lmdb_error(ret);
+	ret = 1;
+success:
+	/* finalize the output */
+	*key = val_mdb2knot(key2_m);
+	*val = val_mdb2knot(val2_m);
+	return ret;
+}
+
+
+const struct kr_cdb_api *kr_cdb_lmdb(void)
+{
+	static const struct kr_cdb_api api = {
+		"lmdb",
+		cdb_init, cdb_deinit, cdb_count, cdb_clear, cdb_sync,
+		cdb_readv, cdb_writev, cdb_remove,
+		cdb_match, cdb_prune,
+		cdb_read_leq
+	};
+
+	return &api;
+}
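For context, a minimal caller of the backend exposed by kr_cdb_lmdb() could look like the sketch below. The kr_cdb_api member names (init, write, read, sync, deinit) are assumed to correspond to the cdb_* functions in the initializer order above, and the kr_cdb_opts fields (path, maxsize) are inferred from how cdb_init() uses them; neither header is part of this diff, and the cache path, sizes and helper name are made up for illustration.

/* Hypothetical usage sketch -- not part of this commit. */
#include "lib/cache/cdb_lmdb.h"

int cache_smoke_test(void)
{
	const struct kr_cdb_api *api = kr_cdb_lmdb();
	struct kr_cdb_opts opts = {
		.path = "/tmp/kres-cache",       /* example directory */
		.maxsize = 16 * 1024 * 1024,     /* LMDB map size limit */
	};

	knot_db_t *db = NULL;
	int ret = api->init(&db, &opts, NULL);
	if (ret != 0)
		return ret;

	/* Write one entry; cdb_writev() leaves the cached RW txn open. */
	knot_db_val_t key = { .data = "example-key", .len = 11 };
	knot_db_val_t val = { .data = "example-val", .len = 11 };
	ret = api->write(db, &key, &val, 1);
	if (ret == 0)
		ret = api->sync(db);  /* commits the cached RW transaction */

	/* Read it back; on success val.data points into the LMDB map. */
	if (ret == 0)
		ret = api->read(db, &key, &val, 1);

	api->deinit(db);
	return ret;
}

Note the design visible in the diff: writes are not committed until cdb_sync() runs, and txn_get() transparently reuses the still-open RW transaction for intervening reads, so a caller mixing reads and writes pays for only one transaction per sync cycle.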