summaryrefslogtreecommitdiffstats
path: root/lib/cache/cdb_lmdb.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cache/cdb_lmdb.c')
-rw-r--r--lib/cache/cdb_lmdb.c868
1 files changed, 868 insertions, 0 deletions
diff --git a/lib/cache/cdb_lmdb.c b/lib/cache/cdb_lmdb.c
new file mode 100644
index 0000000..80c7372
--- /dev/null
+++ b/lib/cache/cdb_lmdb.c
@@ -0,0 +1,868 @@
+/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
+ * SPDX-License-Identifier: GPL-3.0-or-later
+*/
+
+#include <fcntl.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <lmdb.h>
+
+#include "contrib/cleanup.h"
+#include "contrib/ucw/lib.h"
+#include "lib/cache/cdb_lmdb.h"
+#include "lib/cache/cdb_api.h"
+#include "lib/utils.h"
+
+
+/* Defines */
+#define LMDB_DIR_MODE 0770
+#define LMDB_FILE_MODE 0660
+
+/* TODO: we rely on mirrors of these two structs not changing layout
+ * in libknot and knot resolver! */
+struct lmdb_env
+{
+ size_t mapsize;
+ MDB_dbi dbi;
+ MDB_env *env;
+
+ /** Cached transactions
+ *
+ * - only one of (ro,rw) may be active at once
+ * - non-NULL .ro may be active or reset
+ * - non-NULL .rw is always active
+ */
+ struct {
+ bool ro_active, ro_curs_active;
+ MDB_txn *ro, *rw;
+ MDB_cursor *ro_curs;
+ } txn;
+
+ /* Cached part of struct stat for data.mdb. */
+ dev_t st_dev;
+ ino_t st_ino;
+ off_t st_size;
+ const char *mdb_data_path; /**< path to data.mdb, for convenience */
+};
+
+struct libknot_lmdb_env {
+ bool shared;
+ unsigned dbi;
+ void *env;
+ knot_mm_t *pool;
+};
+
+/** Type-safe conversion helper.
+ *
+ * We keep lmdb_env as a separate type from kr_db_pt, as different implementation of API
+ * would need to define the contents differently.
+ */
+static inline struct lmdb_env * db2env(kr_cdb_pt db)
+{
+ return (struct lmdb_env *)db;
+}
+static inline kr_cdb_pt env2db(struct lmdb_env *env)
+{
+ return (kr_cdb_pt)env;
+}
+
+static int cdb_commit(kr_cdb_pt db, struct kr_cdb_stats *stats);
+
+/** @brief Convert LMDB error code. */
+static int lmdb_error(int error)
+{
+ switch (error) {
+ case MDB_SUCCESS:
+ return kr_ok();
+ case MDB_NOTFOUND:
+ return kr_error(ENOENT);
+ case ENOSPC:
+ case MDB_MAP_FULL:
+ case MDB_TXN_FULL:
+ return kr_error(ENOSPC);
+ default:
+ kr_log_error(CACHE, "LMDB error: %s\n", mdb_strerror(error));
+ return kr_error(error);
+ }
+}
+
+/** Conversion between knot and lmdb structs for values. */
+static inline knot_db_val_t val_mdb2knot(MDB_val v)
+{
+ return (knot_db_val_t){ .len = v.mv_size, .data = v.mv_data };
+}
+static inline MDB_val val_knot2mdb(knot_db_val_t v)
+{
+ return (MDB_val){ .mv_size = v.len, .mv_data = v.data };
+}
+
+/** Refresh mapsize value from file, including env->mapsize.
+ * It's much lighter than reopen_env(). */
+static int refresh_mapsize(struct lmdb_env *env)
+{
+ int ret = cdb_commit(env2db(env), NULL);
+ if (!ret) ret = lmdb_error(mdb_env_set_mapsize(env->env, 0));
+ if (ret) return ret;
+
+ MDB_envinfo info;
+ ret = lmdb_error(mdb_env_info(env->env, &info));
+ if (ret) return ret;
+
+ env->mapsize = info.me_mapsize;
+ if (env->mapsize != env->st_size) {
+ kr_log_info(CACHE, "suspicious size of cache file '%s'"
+ ": file size %zu != LMDB map size %zu\n",
+ env->mdb_data_path, (size_t)env->st_size, env->mapsize);
+ }
+ return kr_ok();
+}
+
+static void clear_stale_readers(struct lmdb_env *env)
+{
+ int cleared;
+ int ret = mdb_reader_check(env->env, &cleared);
+ if (ret != MDB_SUCCESS) {
+ kr_log_error(CACHE, "failed to clear stale reader locks: "
+ "LMDB error %d %s\n", ret, mdb_strerror(ret));
+ } else if (cleared != 0) {
+ kr_log_info(CACHE, "cleared %d stale reader locks\n", cleared);
+ }
+}
+
+#define FLAG_RENEW (2*MDB_RDONLY)
+/** mdb_txn_begin or _renew + handle retries in some situations
+ *
+ * The retrying logic is so ugly that it has its own function.
+ * \note this assumes no transactions are active
+ * \return MDB_ errcode, not usual kr_error(...)
+ */
+static int txn_get_noresize(struct lmdb_env *env, unsigned int flag, MDB_txn **txn)
+{
+ if (kr_fails_assert(!env->txn.rw && (!env->txn.ro || !env->txn.ro_active)))
+ return kr_error(1);
+ int attempts = 0;
+ int ret;
+retry:
+ /* Do a few attempts in case we encounter multiple issues at once. */
+ if (++attempts > 2)
+ return kr_error(1);
+
+ if (flag == FLAG_RENEW) {
+ ret = mdb_txn_renew(*txn);
+ } else {
+ ret = mdb_txn_begin(env->env, NULL, flag, txn);
+ }
+
+ if (unlikely(ret == MDB_MAP_RESIZED)) {
+ kr_log_info(CACHE, "detected size increased by another process\n");
+ ret = refresh_mapsize(env);
+ if (ret == 0)
+ goto retry;
+ } else if (unlikely(ret == MDB_READERS_FULL)) {
+ clear_stale_readers(env);
+ goto retry;
+ }
+ return ret;
+}
+
+/** Obtain a transaction. (they're cached in env->txn) */
+static int txn_get(struct lmdb_env *env, MDB_txn **txn, bool rdonly)
+{
+ if (kr_fails_assert(env && txn))
+ return kr_error(EINVAL);
+ if (env->txn.rw) {
+ /* Reuse the *open* RW txn even if only reading is requested.
+ * We leave the management of this to the cdb_commit command.
+ * The user may e.g. want to do some reads between the writes. */
+ *txn = env->txn.rw;
+ return kr_ok();
+ }
+
+ if (!rdonly) {
+ /* avoid two active transactions */
+ if (env->txn.ro && env->txn.ro_active) {
+ mdb_txn_reset(env->txn.ro);
+ env->txn.ro_active = false;
+ env->txn.ro_curs_active = false;
+ }
+ int ret = txn_get_noresize(env, 0/*RW*/, &env->txn.rw);
+ if (ret == MDB_SUCCESS) {
+ *txn = env->txn.rw;
+ kr_assert(*txn);
+ }
+ return lmdb_error(ret);
+ }
+
+ /* Get an active RO txn and return it. */
+ int ret = MDB_SUCCESS;
+ if (!env->txn.ro) { //:unlikely
+ ret = txn_get_noresize(env, MDB_RDONLY, &env->txn.ro);
+ } else if (!env->txn.ro_active) {
+ ret = txn_get_noresize(env, FLAG_RENEW, &env->txn.ro);
+ }
+ if (ret != MDB_SUCCESS) {
+ return lmdb_error(ret);
+ }
+ env->txn.ro_active = true;
+ *txn = env->txn.ro;
+ kr_assert(*txn);
+ return kr_ok();
+}
+
+static int cdb_commit(kr_cdb_pt db, struct kr_cdb_stats *stats)
+{
+ struct lmdb_env *env = db2env(db);
+ int ret = kr_ok();
+ if (env->txn.rw) {
+ if (stats) stats->commit++;
+ ret = lmdb_error(mdb_txn_commit(env->txn.rw));
+ env->txn.rw = NULL; /* the transaction got freed even in case of errors */
+ } else if (env->txn.ro && env->txn.ro_active) {
+ mdb_txn_reset(env->txn.ro);
+ env->txn.ro_active = false;
+ env->txn.ro_curs_active = false;
+ }
+ return ret;
+}
+
+/** Obtain a read-only cursor (and a read-only transaction). */
+static int txn_curs_get(struct lmdb_env *env, MDB_cursor **curs, struct kr_cdb_stats *stats)
+{
+ if (kr_fails_assert(env && curs))
+ return kr_error(EINVAL);
+ if (env->txn.ro_curs_active)
+ goto success;
+ /* Only in a read-only txn; TODO: it's a bit messy/coupled */
+ if (env->txn.rw) {
+ int ret = cdb_commit(env2db(env), stats);
+ if (ret) return ret;
+ }
+ MDB_txn *txn = NULL;
+ int ret = txn_get(env, &txn, true);
+ if (ret) return ret;
+
+ if (env->txn.ro_curs) {
+ ret = mdb_cursor_renew(txn, env->txn.ro_curs);
+ } else {
+ ret = mdb_cursor_open(txn, env->dbi, &env->txn.ro_curs);
+ }
+ if (ret) return lmdb_error(ret);
+ env->txn.ro_curs_active = true;
+success:
+ kr_assert(env->txn.ro_curs_active && env->txn.ro && env->txn.ro_active
+ && !env->txn.rw);
+ *curs = env->txn.ro_curs;
+ kr_assert(*curs);
+ return kr_ok();
+}
+
+static void txn_free_ro(struct lmdb_env *env)
+{
+ if (env->txn.ro_curs) {
+ mdb_cursor_close(env->txn.ro_curs);
+ env->txn.ro_curs = NULL;
+ }
+ if (env->txn.ro) {
+ mdb_txn_abort(env->txn.ro);
+ env->txn.ro = NULL;
+ }
+}
+
+/** Abort all transactions.
+ *
+ * This is useful after an error happens, as those (always?) require abortion.
+ * It's possible that _reset() would suffice and marking cursor inactive,
+ * but these errors should be rare so let's close them completely. */
+static void txn_abort(struct lmdb_env *env)
+{
+ txn_free_ro(env);
+ if (env->txn.rw) {
+ mdb_txn_abort(env->txn.rw);
+ env->txn.rw = NULL; /* the transaction got freed even in case of errors */
+ }
+}
+
+/*! \brief Close the database. */
+static void cdb_close_env(struct lmdb_env *env, struct kr_cdb_stats *stats)
+{
+ if (kr_fails_assert(env && env->env))
+ return;
+
+ /* Get rid of any transactions. */
+ txn_free_ro(env);
+ cdb_commit(env2db(env), stats);
+
+ mdb_env_sync(env->env, 1);
+ stats->close++;
+ mdb_dbi_close(env->env, env->dbi);
+ mdb_env_close(env->env);
+ free_const(env->mdb_data_path);
+ memset(env, 0, sizeof(*env));
+}
+
+/** We assume that *env is zeroed and we return it zeroed on errors. */
+static int cdb_open_env(struct lmdb_env *env, const char *path, const size_t mapsize,
+ struct kr_cdb_stats *stats)
+{
+ int ret = mkdir(path, LMDB_DIR_MODE);
+ if (ret && errno != EEXIST) return kr_error(errno);
+
+ stats->open++;
+ ret = mdb_env_create(&env->env);
+ if (ret != MDB_SUCCESS) return lmdb_error(ret);
+
+ env->mdb_data_path = kr_absolutize_path(path, "data.mdb");
+ if (!env->mdb_data_path) {
+ ret = ENOMEM;
+ goto error_sys;
+ }
+
+ /* Set map size, rounded to page size. */
+ errno = 0;
+ const long pagesize = sysconf(_SC_PAGESIZE);
+ if (errno) {
+ ret = errno;
+ goto error_sys;
+ }
+
+ const bool size_requested = mapsize;
+ if (size_requested) {
+ env->mapsize = (mapsize / pagesize) * pagesize;
+ ret = mdb_env_set_mapsize(env->env, env->mapsize);
+ if (ret != MDB_SUCCESS) goto error_mdb;
+ }
+
+ /* Cache doesn't require durability, we can be
+ * loose with the requirements as a tradeoff for speed. */
+ const unsigned flags = MDB_WRITEMAP | MDB_MAPASYNC | MDB_NOTLS;
+ ret = mdb_env_open(env->env, path, flags, LMDB_FILE_MODE);
+ if (ret != MDB_SUCCESS) goto error_mdb;
+
+ mdb_filehandle_t fd = -1;
+ ret = mdb_env_get_fd(env->env, &fd);
+ if (ret != MDB_SUCCESS) goto error_mdb;
+
+ struct stat st;
+ if (fstat(fd, &st)) {
+ ret = errno;
+ goto error_sys;
+ }
+ env->st_dev = st.st_dev;
+ env->st_ino = st.st_ino;
+ env->st_size = st.st_size;
+
+ /* Get the real mapsize. Shrinking can be restricted, etc.
+ * Unfortunately this is only reliable when not setting the size explicitly. */
+ if (!size_requested) {
+ ret = refresh_mapsize(env);
+ if (ret) goto error_sys;
+ }
+
+ /* Open the database. */
+ MDB_txn *txn = NULL;
+ ret = mdb_txn_begin(env->env, NULL, 0, &txn);
+ if (ret != MDB_SUCCESS) goto error_mdb;
+
+ ret = mdb_dbi_open(txn, NULL, 0, &env->dbi);
+ if (ret != MDB_SUCCESS) {
+ mdb_txn_abort(txn);
+ goto error_mdb;
+ }
+
+#if !defined(__MACOSX__) && !(defined(__APPLE__) && defined(__MACH__))
+ if (size_requested) {
+ ret = posix_fallocate(fd, 0, MAX(env->mapsize, env->st_size));
+ } else {
+ ret = 0;
+ }
+ if (ret == EINVAL || ret == EOPNOTSUPP) {
+ /* POSIX says this can happen when the feature isn't supported by the FS.
+ * We haven't seen this happen on Linux+glibc but it was reported on
+ * Linux+musl and FreeBSD. */
+ kr_log_info(CACHE, "space pre-allocation failed and ignored; "
+ "your (file)system probably doesn't support it.\n");
+ } else if (ret != 0) {
+ mdb_txn_abort(txn);
+ goto error_sys;
+ }
+#endif
+
+ stats->commit++;
+ ret = mdb_txn_commit(txn);
+ if (ret != MDB_SUCCESS) goto error_mdb;
+
+ /* Stale RO transactions could have been left behind by a cashing process
+ * (e.g. one whose termination lead to spawning the current one).
+ * According to docs they might hold onto some space until we clear them. */
+ clear_stale_readers(env);
+
+ return kr_ok();
+
+error_mdb:
+ ret = lmdb_error(ret);
+error_sys:
+ free_const(env->mdb_data_path);
+ stats->close++;
+ mdb_env_close(env->env);
+ memset(env, 0, sizeof(*env));
+ return kr_error(ret);
+}
+
+static int cdb_init(kr_cdb_pt *db, struct kr_cdb_stats *stats,
+ struct kr_cdb_opts *opts, knot_mm_t *pool)
+{
+ if (!db || !stats || !opts) {
+ return kr_error(EINVAL);
+ }
+
+ /* Open the database. */
+ struct lmdb_env *env = calloc(1, sizeof(*env));
+ if (!env) {
+ return kr_error(ENOMEM);
+ }
+ int ret = cdb_open_env(env, opts->path, opts->maxsize, stats);
+ if (ret != 0) {
+ free(env);
+ return ret;
+ }
+
+ *db = env2db(env);
+ return 0;
+}
+
+static void cdb_deinit(kr_cdb_pt db, struct kr_cdb_stats *stats)
+{
+ cdb_close_env(db2env(db), stats);
+ free(db);
+}
+
+static int cdb_count(kr_cdb_pt db, struct kr_cdb_stats *stats)
+{
+ struct lmdb_env *env = db2env(db);
+ MDB_txn *txn = NULL;
+ int ret = txn_get(env, &txn, true);
+ if (ret != 0) {
+ return ret;
+ }
+
+ MDB_stat stat;
+ stats->count++;
+ ret = mdb_stat(txn, env->dbi, &stat);
+
+ if (ret == MDB_SUCCESS) {
+ return stat.ms_entries;
+ } else {
+ txn_abort(env);
+ return lmdb_error(ret);
+ }
+}
+
+static int reopen_env(struct lmdb_env *env, struct kr_cdb_stats *stats, const size_t mapsize)
+{
+ /* Keep copy as it points to current handle internals. */
+ const char *path;
+ int ret = mdb_env_get_path(env->env, &path);
+ if (ret != MDB_SUCCESS) {
+ return lmdb_error(ret);
+ }
+ auto_free char *path_copy = strdup(path);
+ cdb_close_env(env, stats);
+ return cdb_open_env(env, path_copy, mapsize, stats);
+}
+
+static int cdb_check_health(kr_cdb_pt db, struct kr_cdb_stats *stats)
+{
+ struct lmdb_env *env = db2env(db);
+
+ struct stat st;
+ if (stat(env->mdb_data_path, &st)) {
+ int ret = errno;
+ return kr_error(ret);
+ }
+
+ if (st.st_dev != env->st_dev || st.st_ino != env->st_ino) {
+ kr_log_debug(CACHE, "cache file has been replaced, reopening\n");
+ int ret = reopen_env(env, stats, 0); // we accept mapsize from the new file
+ return ret == 0 ? 1 : ret;
+ }
+
+ /* Cache check through file size works OK without reopening,
+ * contrary to methods based on mdb_env_info(). */
+ if (st.st_size == env->st_size)
+ return kr_ok();
+ kr_log_info(CACHE, "detected size change (by another instance?) of file '%s'"
+ ": file size %zu -> file size %zu\n",
+ env->mdb_data_path, (size_t)env->st_size, (size_t)st.st_size);
+ env->st_size = st.st_size; // avoid retrying in cycle even if we fail
+ return refresh_mapsize(env);
+}
+
+/** Obtain exclusive (advisory) lock by creating a file, returning FD or negative kr_error().
+ * The lock is auto-released by OS in case the process finishes in any way (file remains). */
+static int lockfile_get(const char *path)
+{
+ if (kr_fails_assert(path))
+ return kr_error(EINVAL);
+ const int fd = open(path, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ if (fd < 0)
+ return kr_error(errno);
+
+ struct flock lock_info;
+ memset(&lock_info, 0, sizeof(lock_info));
+ lock_info.l_type = F_WRLCK;
+ lock_info.l_whence = SEEK_SET;
+ lock_info.l_start = 0;
+ lock_info.l_len = 1; // it's OK for locks to extend beyond the end of the file
+ int err;
+ do {
+ err = fcntl(fd, F_SETLK, &lock_info);
+ } while (err == -1 && errno == EINTR);
+ if (err) {
+ close(fd);
+ return kr_error(errno);
+ }
+ return fd;
+}
+
+/** Release and remove lockfile created by lockfile_get(). Return kr_error(). */
+static int lockfile_release(int fd)
+{
+ if (kr_fails_assert(fd > 0)) // fd == 0 is surely a mistake, in our case at least
+ return kr_error(EINVAL);
+ if (close(fd)) {
+ return kr_error(errno);
+ } else {
+ return kr_ok();
+ }
+}
+
+static int cdb_clear(kr_cdb_pt db, struct kr_cdb_stats *stats)
+{
+ struct lmdb_env *env = db2env(db);
+ stats->clear++;
+ /* First try mdb_drop() to clear the DB; this may fail with ENOSPC. */
+ {
+ MDB_txn *txn = NULL;
+ int ret = txn_get(env, &txn, false);
+ if (ret == kr_ok()) {
+ ret = lmdb_error(mdb_drop(txn, env->dbi, 0));
+ if (ret == kr_ok()) {
+ ret = cdb_commit(db, stats);
+ }
+ if (ret == kr_ok()) {
+ return ret;
+ }
+ }
+ kr_log_info(CACHE, "clearing error, falling back\n");
+ }
+ /* Fallback: we'll remove the database files and reopen.
+ * Other instances can continue to use the removed lmdb,
+ * though it's best for them to reopen soon. */
+
+ /* We are about to switch to a different file, so end all txns, to be sure. */
+ txn_free_ro(env);
+ (void) cdb_commit(db, stats);
+
+ const char *path = NULL;
+ int ret = mdb_env_get_path(env->env, &path);
+ if (ret != MDB_SUCCESS) {
+ return lmdb_error(ret);
+ }
+ auto_free char *mdb_lockfile = kr_strcatdup(2, path, "/lock.mdb");
+ auto_free char *lockfile = kr_strcatdup(2, path, "/krcachelock");
+ if (!mdb_lockfile || !lockfile) {
+ return kr_error(ENOMEM);
+ }
+
+ /* Find if we get a lock on lockfile. */
+ const int lockfile_fd = lockfile_get(lockfile);
+ if (lockfile_fd < 0) {
+ kr_log_error(CACHE, "clearing failed to get ./krcachelock (%s); retry later\n",
+ kr_strerror(lockfile_fd));
+ /* As we're out of space (almost certainly - mdb_drop didn't work),
+ * we will retry on the next failing write operation. */
+ return kr_error(EAGAIN);
+ }
+
+ /* We acquired lockfile. Now find whether *.mdb are what we have open now.
+ * If they are not we don't want to remove them; most likely they have been
+ * cleaned by another instance. */
+ ret = cdb_check_health(db, stats);
+ if (ret != 0) {
+ if (ret == 1) // file changed and reopened successfully
+ ret = kr_ok();
+ // else pass some other error
+ } else {
+ kr_log_debug(CACHE, "clear: identical files, unlinking\n");
+ // coverity[toctou]
+ unlink(env->mdb_data_path);
+ unlink(mdb_lockfile);
+ ret = reopen_env(env, stats, env->mapsize);
+ }
+
+ /* Environment updated, release lockfile. */
+ int lrerr = lockfile_release(lockfile_fd);
+ if (lrerr) {
+ kr_log_error(CACHE, "failed to release ./krcachelock: %s\n",
+ kr_strerror(lrerr));
+ }
+ return ret;
+}
+
+static int cdb_readv(kr_cdb_pt db, struct kr_cdb_stats *stats,
+ const knot_db_val_t *key, knot_db_val_t *val, int maxcount)
+{
+ struct lmdb_env *env = db2env(db);
+ MDB_txn *txn = NULL;
+ int ret = txn_get(env, &txn, true);
+ if (ret) {
+ return ret;
+ }
+
+ for (int i = 0; i < maxcount; ++i) {
+ /* Convert key structs */
+ MDB_val _key = val_knot2mdb(key[i]);
+ MDB_val _val = val_knot2mdb(val[i]);
+ stats->read++;
+ ret = mdb_get(txn, env->dbi, &_key, &_val);
+ if (ret != MDB_SUCCESS) {
+ if (ret == MDB_NOTFOUND) {
+ stats->read_miss++;
+ } else {
+ txn_abort(env);
+ }
+ ret = lmdb_error(ret);
+ if (ret == kr_error(ENOSPC)) {
+ /* we're likely to be forced to cache clear anyway */
+ ret = kr_error(ENOENT);
+ }
+ return ret;
+ }
+ /* Update the result. */
+ val[i] = val_mdb2knot(_val);
+ }
+ return kr_ok();
+}
+
+static int cdb_write(struct lmdb_env *env, MDB_txn **txn, const knot_db_val_t *key,
+ knot_db_val_t *val, unsigned flags,
+ struct kr_cdb_stats *stats)
+{
+ /* Convert key structs and write */
+ MDB_val _key = val_knot2mdb(*key);
+ MDB_val _val = val_knot2mdb(*val);
+ stats->write++;
+ int ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
+
+ /* We don't try to recover from MDB_TXN_FULL. */
+ if (ret != MDB_SUCCESS) {
+ txn_abort(env);
+ return lmdb_error(ret);
+ }
+
+ /* Update the result. */
+ val->data = _val.mv_data;
+ val->len = _val.mv_size;
+ return kr_ok();
+}
+
+static int cdb_writev(kr_cdb_pt db, struct kr_cdb_stats *stats,
+ const knot_db_val_t *key, knot_db_val_t *val, int maxcount)
+{
+ struct lmdb_env *env = db2env(db);
+ MDB_txn *txn = NULL;
+ int ret = txn_get(env, &txn, false);
+
+ for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
+ /* This is LMDB specific optimisation,
+ * if caller specifies value with NULL data and non-zero length,
+ * LMDB will preallocate the entry for caller and leave write
+ * transaction open, caller is responsible for syncing thus committing transaction.
+ */
+ unsigned mdb_flags = 0;
+ if (val[i].len > 0 && val[i].data == NULL) {
+ mdb_flags |= MDB_RESERVE;
+ }
+ ret = cdb_write(env, &txn, &key[i], &val[i], mdb_flags, stats);
+ }
+
+ return ret;
+}
+
+static int cdb_remove(kr_cdb_pt db, struct kr_cdb_stats *stats,
+ knot_db_val_t keys[], int maxcount)
+{
+ struct lmdb_env *env = db2env(db);
+ MDB_txn *txn = NULL;
+ int ret = txn_get(env, &txn, false);
+ int deleted = 0;
+
+ for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
+ MDB_val _key = val_knot2mdb(keys[i]);
+ MDB_val val = { 0, NULL };
+ stats->remove++;
+ ret = lmdb_error(mdb_del(txn, env->dbi, &_key, &val));
+ if (ret == kr_ok())
+ deleted++;
+ else if (ret == KNOT_ENOENT) {
+ stats->remove_miss++;
+ ret = kr_ok(); /* skip over non-existing entries */
+ } else {
+ txn_abort(env);
+ break;
+ }
+ }
+
+ return ret < 0 ? ret : deleted;
+}
+
+static int cdb_match(kr_cdb_pt db, struct kr_cdb_stats *stats,
+ knot_db_val_t *key, knot_db_val_t keyval[][2], int maxcount)
+{
+ struct lmdb_env *env = db2env(db);
+ MDB_txn *txn = NULL;
+ int ret = txn_get(env, &txn, true);
+ if (ret != 0) {
+ return ret;
+ }
+
+ /* LATER(optim.): use txn_curs_get() instead, to save resources. */
+ MDB_cursor *cur = NULL;
+ ret = mdb_cursor_open(txn, env->dbi, &cur);
+ if (ret != 0) {
+ txn_abort(env);
+ return lmdb_error(ret);
+ }
+
+ MDB_val cur_key = val_knot2mdb(*key);
+ MDB_val cur_val = { 0, NULL };
+ stats->match++;
+ ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE);
+ if (ret != MDB_SUCCESS) {
+ mdb_cursor_close(cur);
+ if (ret != MDB_NOTFOUND) {
+ txn_abort(env);
+ }
+ return lmdb_error(ret);
+ }
+
+ int results = 0;
+ while (ret == MDB_SUCCESS) {
+ /* Retrieve current key and compare with prefix */
+ if (cur_key.mv_size < key->len || memcmp(cur_key.mv_data, key->data, key->len) != 0) {
+ break;
+ }
+ /* Add to result set */
+ if (results < maxcount) {
+ keyval[results][0] = val_mdb2knot(cur_key);
+ keyval[results][1] = val_mdb2knot(cur_val);
+ ++results;
+ } else {
+ break;
+ }
+ stats->match++;
+ ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
+ }
+
+ mdb_cursor_close(cur);
+ if (ret != MDB_SUCCESS && ret != MDB_NOTFOUND) {
+ txn_abort(env);
+ return lmdb_error(ret);
+ } else if (results == 0) {
+ stats->match_miss++;
+ }
+ return results;
+}
+
+
+static int cdb_read_leq(kr_cdb_pt db, struct kr_cdb_stats *stats,
+ knot_db_val_t *key, knot_db_val_t *val)
+{
+ if (kr_fails_assert(db && key && key->data && val))
+ return kr_error(EINVAL);
+ struct lmdb_env *env = db2env(db);
+ MDB_cursor *curs = NULL;
+ int ret = txn_curs_get(env, &curs, stats);
+ if (ret) return ret;
+
+ MDB_val key2_m = val_knot2mdb(*key);
+ MDB_val val2_m = { 0, NULL };
+ stats->read_leq++;
+ ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_SET_RANGE);
+ if (ret) goto failure;
+ /* test for equality //:unlikely */
+ if (key2_m.mv_size == key->len
+ && memcmp(key2_m.mv_data, key->data, key->len) == 0) {
+ ret = 0; /* equality */
+ goto success;
+ }
+ stats->read_leq_miss++;
+
+ /* we must be greater than key; do one step to smaller */
+ stats->read_leq++;
+ ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_PREV);
+ if (ret) goto failure;
+ ret = 1;
+success:
+ /* finalize the output */
+ *key = val_mdb2knot(key2_m);
+ *val = val_mdb2knot(val2_m);
+ return ret;
+failure:
+ if (ret == MDB_NOTFOUND) {
+ stats->read_leq_miss++;
+ } else {
+ txn_abort(env);
+ }
+ return lmdb_error(ret);
+}
+
+static double cdb_usage_percent(kr_cdb_pt db)
+{
+ knot_db_t *kdb = kr_cdb_pt2knot_db_t(db);
+ const size_t db_size = knot_db_lmdb_get_mapsize(kdb);
+ const size_t db_usage_abs = knot_db_lmdb_get_usage(kdb);
+ const double db_usage = (double)db_usage_abs / db_size * 100.0;
+ free(kdb);
+ return db_usage;
+}
+
+static size_t cdb_get_maxsize(kr_cdb_pt db)
+{
+ return db2env(db)->mapsize;
+}
+
+/** Conversion between knot and lmdb structs. */
+knot_db_t *kr_cdb_pt2knot_db_t(kr_cdb_pt db)
+{
+ /* this is struct lmdb_env as in resolver/cdb_lmdb.c */
+ const struct lmdb_env *kres_db = db2env(db);
+ struct libknot_lmdb_env *libknot_db = malloc(sizeof(*libknot_db));
+ if (libknot_db != NULL) {
+ libknot_db->shared = false;
+ libknot_db->pool = NULL;
+ libknot_db->env = kres_db->env;
+ libknot_db->dbi = kres_db->dbi;
+ }
+ return libknot_db;
+}
+
+const struct kr_cdb_api *kr_cdb_lmdb(void)
+{
+ static const struct kr_cdb_api api = {
+ "lmdb",
+ cdb_init, cdb_deinit, cdb_count, cdb_clear, cdb_commit,
+ cdb_readv, cdb_writev, cdb_remove,
+ cdb_match,
+ cdb_read_leq,
+ cdb_usage_percent,
+ cdb_get_maxsize,
+ cdb_check_health,
+ };
+
+ return &api;
+}