1
0
Fork 0
knot-resolver/lib/cache/cdb_lmdb.c
Daniel Baumann fbc604e215
Adding upstream version 5.7.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-21 13:56:17 +02:00

868 lines
22 KiB
C

/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
* SPDX-License-Identifier: GPL-3.0-or-later
*/
#include <fcntl.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <lmdb.h>
#include "contrib/cleanup.h"
#include "contrib/ucw/lib.h"
#include "lib/cache/cdb_lmdb.h"
#include "lib/cache/cdb_api.h"
#include "lib/utils.h"
/* Defines */
#define LMDB_DIR_MODE 0770
#define LMDB_FILE_MODE 0660
/* TODO: we rely on mirrors of these two structs not changing layout
* in libknot and knot resolver! */
struct lmdb_env
{
size_t mapsize;
MDB_dbi dbi;
MDB_env *env;
/** Cached transactions
*
* - only one of (ro,rw) may be active at once
* - non-NULL .ro may be active or reset
* - non-NULL .rw is always active
*/
struct {
bool ro_active, ro_curs_active;
MDB_txn *ro, *rw;
MDB_cursor *ro_curs;
} txn;
/* Cached part of struct stat for data.mdb. */
dev_t st_dev;
ino_t st_ino;
off_t st_size;
const char *mdb_data_path; /**< path to data.mdb, for convenience */
};
struct libknot_lmdb_env {
bool shared;
unsigned dbi;
void *env;
knot_mm_t *pool;
};
/** Type-safe conversion helper.
*
* We keep lmdb_env as a separate type from kr_db_pt, as different implementation of API
* would need to define the contents differently.
*/
static inline struct lmdb_env * db2env(kr_cdb_pt db)
{
return (struct lmdb_env *)db;
}
static inline kr_cdb_pt env2db(struct lmdb_env *env)
{
return (kr_cdb_pt)env;
}
static int cdb_commit(kr_cdb_pt db, struct kr_cdb_stats *stats);
/** @brief Convert LMDB error code. */
static int lmdb_error(int error)
{
switch (error) {
case MDB_SUCCESS:
return kr_ok();
case MDB_NOTFOUND:
return kr_error(ENOENT);
case ENOSPC:
case MDB_MAP_FULL:
case MDB_TXN_FULL:
return kr_error(ENOSPC);
default:
kr_log_error(CACHE, "LMDB error: %s\n", mdb_strerror(error));
return kr_error(error);
}
}
/** Conversion between knot and lmdb structs for values. */
static inline knot_db_val_t val_mdb2knot(MDB_val v)
{
return (knot_db_val_t){ .len = v.mv_size, .data = v.mv_data };
}
static inline MDB_val val_knot2mdb(knot_db_val_t v)
{
return (MDB_val){ .mv_size = v.len, .mv_data = v.data };
}
/** Refresh mapsize value from file, including env->mapsize.
* It's much lighter than reopen_env(). */
static int refresh_mapsize(struct lmdb_env *env)
{
int ret = cdb_commit(env2db(env), NULL);
if (!ret) ret = lmdb_error(mdb_env_set_mapsize(env->env, 0));
if (ret) return ret;
MDB_envinfo info;
ret = lmdb_error(mdb_env_info(env->env, &info));
if (ret) return ret;
env->mapsize = info.me_mapsize;
if (env->mapsize != env->st_size) {
kr_log_info(CACHE, "suspicious size of cache file '%s'"
": file size %zu != LMDB map size %zu\n",
env->mdb_data_path, (size_t)env->st_size, env->mapsize);
}
return kr_ok();
}
static void clear_stale_readers(struct lmdb_env *env)
{
int cleared;
int ret = mdb_reader_check(env->env, &cleared);
if (ret != MDB_SUCCESS) {
kr_log_error(CACHE, "failed to clear stale reader locks: "
"LMDB error %d %s\n", ret, mdb_strerror(ret));
} else if (cleared != 0) {
kr_log_info(CACHE, "cleared %d stale reader locks\n", cleared);
}
}
#define FLAG_RENEW (2*MDB_RDONLY)
/** mdb_txn_begin or _renew + handle retries in some situations
*
* The retrying logic is so ugly that it has its own function.
* \note this assumes no transactions are active
* \return MDB_ errcode, not usual kr_error(...)
*/
static int txn_get_noresize(struct lmdb_env *env, unsigned int flag, MDB_txn **txn)
{
if (kr_fails_assert(!env->txn.rw && (!env->txn.ro || !env->txn.ro_active)))
return kr_error(1);
int attempts = 0;
int ret;
retry:
/* Do a few attempts in case we encounter multiple issues at once. */
if (++attempts > 2)
return kr_error(1);
if (flag == FLAG_RENEW) {
ret = mdb_txn_renew(*txn);
} else {
ret = mdb_txn_begin(env->env, NULL, flag, txn);
}
if (unlikely(ret == MDB_MAP_RESIZED)) {
kr_log_info(CACHE, "detected size increased by another process\n");
ret = refresh_mapsize(env);
if (ret == 0)
goto retry;
} else if (unlikely(ret == MDB_READERS_FULL)) {
clear_stale_readers(env);
goto retry;
}
return ret;
}
/** Obtain a transaction. (they're cached in env->txn) */
static int txn_get(struct lmdb_env *env, MDB_txn **txn, bool rdonly)
{
if (kr_fails_assert(env && txn))
return kr_error(EINVAL);
if (env->txn.rw) {
/* Reuse the *open* RW txn even if only reading is requested.
* We leave the management of this to the cdb_commit command.
* The user may e.g. want to do some reads between the writes. */
*txn = env->txn.rw;
return kr_ok();
}
if (!rdonly) {
/* avoid two active transactions */
if (env->txn.ro && env->txn.ro_active) {
mdb_txn_reset(env->txn.ro);
env->txn.ro_active = false;
env->txn.ro_curs_active = false;
}
int ret = txn_get_noresize(env, 0/*RW*/, &env->txn.rw);
if (ret == MDB_SUCCESS) {
*txn = env->txn.rw;
kr_assert(*txn);
}
return lmdb_error(ret);
}
/* Get an active RO txn and return it. */
int ret = MDB_SUCCESS;
if (!env->txn.ro) { //:unlikely
ret = txn_get_noresize(env, MDB_RDONLY, &env->txn.ro);
} else if (!env->txn.ro_active) {
ret = txn_get_noresize(env, FLAG_RENEW, &env->txn.ro);
}
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
env->txn.ro_active = true;
*txn = env->txn.ro;
kr_assert(*txn);
return kr_ok();
}
static int cdb_commit(kr_cdb_pt db, struct kr_cdb_stats *stats)
{
struct lmdb_env *env = db2env(db);
int ret = kr_ok();
if (env->txn.rw) {
if (stats) stats->commit++;
ret = lmdb_error(mdb_txn_commit(env->txn.rw));
env->txn.rw = NULL; /* the transaction got freed even in case of errors */
} else if (env->txn.ro && env->txn.ro_active) {
mdb_txn_reset(env->txn.ro);
env->txn.ro_active = false;
env->txn.ro_curs_active = false;
}
return ret;
}
/** Obtain a read-only cursor (and a read-only transaction). */
static int txn_curs_get(struct lmdb_env *env, MDB_cursor **curs, struct kr_cdb_stats *stats)
{
if (kr_fails_assert(env && curs))
return kr_error(EINVAL);
if (env->txn.ro_curs_active)
goto success;
/* Only in a read-only txn; TODO: it's a bit messy/coupled */
if (env->txn.rw) {
int ret = cdb_commit(env2db(env), stats);
if (ret) return ret;
}
MDB_txn *txn = NULL;
int ret = txn_get(env, &txn, true);
if (ret) return ret;
if (env->txn.ro_curs) {
ret = mdb_cursor_renew(txn, env->txn.ro_curs);
} else {
ret = mdb_cursor_open(txn, env->dbi, &env->txn.ro_curs);
}
if (ret) return lmdb_error(ret);
env->txn.ro_curs_active = true;
success:
kr_assert(env->txn.ro_curs_active && env->txn.ro && env->txn.ro_active
&& !env->txn.rw);
*curs = env->txn.ro_curs;
kr_assert(*curs);
return kr_ok();
}
static void txn_free_ro(struct lmdb_env *env)
{
if (env->txn.ro_curs) {
mdb_cursor_close(env->txn.ro_curs);
env->txn.ro_curs = NULL;
}
if (env->txn.ro) {
mdb_txn_abort(env->txn.ro);
env->txn.ro = NULL;
}
}
/** Abort all transactions.
*
* This is useful after an error happens, as those (always?) require abortion.
* It's possible that _reset() would suffice and marking cursor inactive,
* but these errors should be rare so let's close them completely. */
static void txn_abort(struct lmdb_env *env)
{
txn_free_ro(env);
if (env->txn.rw) {
mdb_txn_abort(env->txn.rw);
env->txn.rw = NULL; /* the transaction got freed even in case of errors */
}
}
/*! \brief Close the database. */
static void cdb_close_env(struct lmdb_env *env, struct kr_cdb_stats *stats)
{
if (kr_fails_assert(env && env->env))
return;
/* Get rid of any transactions. */
txn_free_ro(env);
cdb_commit(env2db(env), stats);
mdb_env_sync(env->env, 1);
stats->close++;
mdb_dbi_close(env->env, env->dbi);
mdb_env_close(env->env);
free_const(env->mdb_data_path);
memset(env, 0, sizeof(*env));
}
/** We assume that *env is zeroed and we return it zeroed on errors. */
static int cdb_open_env(struct lmdb_env *env, const char *path, const size_t mapsize,
struct kr_cdb_stats *stats)
{
int ret = mkdir(path, LMDB_DIR_MODE);
if (ret && errno != EEXIST) return kr_error(errno);
stats->open++;
ret = mdb_env_create(&env->env);
if (ret != MDB_SUCCESS) return lmdb_error(ret);
env->mdb_data_path = kr_absolutize_path(path, "data.mdb");
if (!env->mdb_data_path) {
ret = ENOMEM;
goto error_sys;
}
/* Set map size, rounded to page size. */
errno = 0;
const long pagesize = sysconf(_SC_PAGESIZE);
if (errno) {
ret = errno;
goto error_sys;
}
const bool size_requested = mapsize;
if (size_requested) {
env->mapsize = (mapsize / pagesize) * pagesize;
ret = mdb_env_set_mapsize(env->env, env->mapsize);
if (ret != MDB_SUCCESS) goto error_mdb;
}
/* Cache doesn't require durability, we can be
* loose with the requirements as a tradeoff for speed. */
const unsigned flags = MDB_WRITEMAP | MDB_MAPASYNC | MDB_NOTLS;
ret = mdb_env_open(env->env, path, flags, LMDB_FILE_MODE);
if (ret != MDB_SUCCESS) goto error_mdb;
mdb_filehandle_t fd = -1;
ret = mdb_env_get_fd(env->env, &fd);
if (ret != MDB_SUCCESS) goto error_mdb;
struct stat st;
if (fstat(fd, &st)) {
ret = errno;
goto error_sys;
}
env->st_dev = st.st_dev;
env->st_ino = st.st_ino;
env->st_size = st.st_size;
/* Get the real mapsize. Shrinking can be restricted, etc.
* Unfortunately this is only reliable when not setting the size explicitly. */
if (!size_requested) {
ret = refresh_mapsize(env);
if (ret) goto error_sys;
}
/* Open the database. */
MDB_txn *txn = NULL;
ret = mdb_txn_begin(env->env, NULL, 0, &txn);
if (ret != MDB_SUCCESS) goto error_mdb;
ret = mdb_dbi_open(txn, NULL, 0, &env->dbi);
if (ret != MDB_SUCCESS) {
mdb_txn_abort(txn);
goto error_mdb;
}
#if !defined(__MACOSX__) && !(defined(__APPLE__) && defined(__MACH__))
if (size_requested) {
ret = posix_fallocate(fd, 0, MAX(env->mapsize, env->st_size));
} else {
ret = 0;
}
if (ret == EINVAL || ret == EOPNOTSUPP) {
/* POSIX says this can happen when the feature isn't supported by the FS.
* We haven't seen this happen on Linux+glibc but it was reported on
* Linux+musl and FreeBSD. */
kr_log_info(CACHE, "space pre-allocation failed and ignored; "
"your (file)system probably doesn't support it.\n");
} else if (ret != 0) {
mdb_txn_abort(txn);
goto error_sys;
}
#endif
stats->commit++;
ret = mdb_txn_commit(txn);
if (ret != MDB_SUCCESS) goto error_mdb;
/* Stale RO transactions could have been left behind by a cashing process
* (e.g. one whose termination lead to spawning the current one).
* According to docs they might hold onto some space until we clear them. */
clear_stale_readers(env);
return kr_ok();
error_mdb:
ret = lmdb_error(ret);
error_sys:
free_const(env->mdb_data_path);
stats->close++;
mdb_env_close(env->env);
memset(env, 0, sizeof(*env));
return kr_error(ret);
}
static int cdb_init(kr_cdb_pt *db, struct kr_cdb_stats *stats,
struct kr_cdb_opts *opts, knot_mm_t *pool)
{
if (!db || !stats || !opts) {
return kr_error(EINVAL);
}
/* Open the database. */
struct lmdb_env *env = calloc(1, sizeof(*env));
if (!env) {
return kr_error(ENOMEM);
}
int ret = cdb_open_env(env, opts->path, opts->maxsize, stats);
if (ret != 0) {
free(env);
return ret;
}
*db = env2db(env);
return 0;
}
static void cdb_deinit(kr_cdb_pt db, struct kr_cdb_stats *stats)
{
cdb_close_env(db2env(db), stats);
free(db);
}
static int cdb_count(kr_cdb_pt db, struct kr_cdb_stats *stats)
{
struct lmdb_env *env = db2env(db);
MDB_txn *txn = NULL;
int ret = txn_get(env, &txn, true);
if (ret != 0) {
return ret;
}
MDB_stat stat;
stats->count++;
ret = mdb_stat(txn, env->dbi, &stat);
if (ret == MDB_SUCCESS) {
return stat.ms_entries;
} else {
txn_abort(env);
return lmdb_error(ret);
}
}
static int reopen_env(struct lmdb_env *env, struct kr_cdb_stats *stats, const size_t mapsize)
{
/* Keep copy as it points to current handle internals. */
const char *path;
int ret = mdb_env_get_path(env->env, &path);
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
auto_free char *path_copy = strdup(path);
cdb_close_env(env, stats);
return cdb_open_env(env, path_copy, mapsize, stats);
}
static int cdb_check_health(kr_cdb_pt db, struct kr_cdb_stats *stats)
{
struct lmdb_env *env = db2env(db);
struct stat st;
if (stat(env->mdb_data_path, &st)) {
int ret = errno;
return kr_error(ret);
}
if (st.st_dev != env->st_dev || st.st_ino != env->st_ino) {
kr_log_debug(CACHE, "cache file has been replaced, reopening\n");
int ret = reopen_env(env, stats, 0); // we accept mapsize from the new file
return ret == 0 ? 1 : ret;
}
/* Cache check through file size works OK without reopening,
* contrary to methods based on mdb_env_info(). */
if (st.st_size == env->st_size)
return kr_ok();
kr_log_info(CACHE, "detected size change (by another instance?) of file '%s'"
": file size %zu -> file size %zu\n",
env->mdb_data_path, (size_t)env->st_size, (size_t)st.st_size);
env->st_size = st.st_size; // avoid retrying in cycle even if we fail
return refresh_mapsize(env);
}
/** Obtain exclusive (advisory) lock by creating a file, returning FD or negative kr_error().
* The lock is auto-released by OS in case the process finishes in any way (file remains). */
static int lockfile_get(const char *path)
{
if (kr_fails_assert(path))
return kr_error(EINVAL);
const int fd = open(path, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0)
return kr_error(errno);
struct flock lock_info;
memset(&lock_info, 0, sizeof(lock_info));
lock_info.l_type = F_WRLCK;
lock_info.l_whence = SEEK_SET;
lock_info.l_start = 0;
lock_info.l_len = 1; // it's OK for locks to extend beyond the end of the file
int err;
do {
err = fcntl(fd, F_SETLK, &lock_info);
} while (err == -1 && errno == EINTR);
if (err) {
close(fd);
return kr_error(errno);
}
return fd;
}
/** Release and remove lockfile created by lockfile_get(). Return kr_error(). */
static int lockfile_release(int fd)
{
if (kr_fails_assert(fd > 0)) // fd == 0 is surely a mistake, in our case at least
return kr_error(EINVAL);
if (close(fd)) {
return kr_error(errno);
} else {
return kr_ok();
}
}
static int cdb_clear(kr_cdb_pt db, struct kr_cdb_stats *stats)
{
struct lmdb_env *env = db2env(db);
stats->clear++;
/* First try mdb_drop() to clear the DB; this may fail with ENOSPC. */
{
MDB_txn *txn = NULL;
int ret = txn_get(env, &txn, false);
if (ret == kr_ok()) {
ret = lmdb_error(mdb_drop(txn, env->dbi, 0));
if (ret == kr_ok()) {
ret = cdb_commit(db, stats);
}
if (ret == kr_ok()) {
return ret;
}
}
kr_log_info(CACHE, "clearing error, falling back\n");
}
/* Fallback: we'll remove the database files and reopen.
* Other instances can continue to use the removed lmdb,
* though it's best for them to reopen soon. */
/* We are about to switch to a different file, so end all txns, to be sure. */
txn_free_ro(env);
(void) cdb_commit(db, stats);
const char *path = NULL;
int ret = mdb_env_get_path(env->env, &path);
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
auto_free char *mdb_lockfile = kr_strcatdup(2, path, "/lock.mdb");
auto_free char *lockfile = kr_strcatdup(2, path, "/krcachelock");
if (!mdb_lockfile || !lockfile) {
return kr_error(ENOMEM);
}
/* Find if we get a lock on lockfile. */
const int lockfile_fd = lockfile_get(lockfile);
if (lockfile_fd < 0) {
kr_log_error(CACHE, "clearing failed to get ./krcachelock (%s); retry later\n",
kr_strerror(lockfile_fd));
/* As we're out of space (almost certainly - mdb_drop didn't work),
* we will retry on the next failing write operation. */
return kr_error(EAGAIN);
}
/* We acquired lockfile. Now find whether *.mdb are what we have open now.
* If they are not we don't want to remove them; most likely they have been
* cleaned by another instance. */
ret = cdb_check_health(db, stats);
if (ret != 0) {
if (ret == 1) // file changed and reopened successfully
ret = kr_ok();
// else pass some other error
} else {
kr_log_debug(CACHE, "clear: identical files, unlinking\n");
// coverity[toctou]
unlink(env->mdb_data_path);
unlink(mdb_lockfile);
ret = reopen_env(env, stats, env->mapsize);
}
/* Environment updated, release lockfile. */
int lrerr = lockfile_release(lockfile_fd);
if (lrerr) {
kr_log_error(CACHE, "failed to release ./krcachelock: %s\n",
kr_strerror(lrerr));
}
return ret;
}
static int cdb_readv(kr_cdb_pt db, struct kr_cdb_stats *stats,
const knot_db_val_t *key, knot_db_val_t *val, int maxcount)
{
struct lmdb_env *env = db2env(db);
MDB_txn *txn = NULL;
int ret = txn_get(env, &txn, true);
if (ret) {
return ret;
}
for (int i = 0; i < maxcount; ++i) {
/* Convert key structs */
MDB_val _key = val_knot2mdb(key[i]);
MDB_val _val = val_knot2mdb(val[i]);
stats->read++;
ret = mdb_get(txn, env->dbi, &_key, &_val);
if (ret != MDB_SUCCESS) {
if (ret == MDB_NOTFOUND) {
stats->read_miss++;
} else {
txn_abort(env);
}
ret = lmdb_error(ret);
if (ret == kr_error(ENOSPC)) {
/* we're likely to be forced to cache clear anyway */
ret = kr_error(ENOENT);
}
return ret;
}
/* Update the result. */
val[i] = val_mdb2knot(_val);
}
return kr_ok();
}
static int cdb_write(struct lmdb_env *env, MDB_txn **txn, const knot_db_val_t *key,
knot_db_val_t *val, unsigned flags,
struct kr_cdb_stats *stats)
{
/* Convert key structs and write */
MDB_val _key = val_knot2mdb(*key);
MDB_val _val = val_knot2mdb(*val);
stats->write++;
int ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
/* We don't try to recover from MDB_TXN_FULL. */
if (ret != MDB_SUCCESS) {
txn_abort(env);
return lmdb_error(ret);
}
/* Update the result. */
val->data = _val.mv_data;
val->len = _val.mv_size;
return kr_ok();
}
static int cdb_writev(kr_cdb_pt db, struct kr_cdb_stats *stats,
const knot_db_val_t *key, knot_db_val_t *val, int maxcount)
{
struct lmdb_env *env = db2env(db);
MDB_txn *txn = NULL;
int ret = txn_get(env, &txn, false);
for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
/* This is LMDB specific optimisation,
* if caller specifies value with NULL data and non-zero length,
* LMDB will preallocate the entry for caller and leave write
* transaction open, caller is responsible for syncing thus committing transaction.
*/
unsigned mdb_flags = 0;
if (val[i].len > 0 && val[i].data == NULL) {
mdb_flags |= MDB_RESERVE;
}
ret = cdb_write(env, &txn, &key[i], &val[i], mdb_flags, stats);
}
return ret;
}
static int cdb_remove(kr_cdb_pt db, struct kr_cdb_stats *stats,
knot_db_val_t keys[], int maxcount)
{
struct lmdb_env *env = db2env(db);
MDB_txn *txn = NULL;
int ret = txn_get(env, &txn, false);
int deleted = 0;
for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
MDB_val _key = val_knot2mdb(keys[i]);
MDB_val val = { 0, NULL };
stats->remove++;
ret = lmdb_error(mdb_del(txn, env->dbi, &_key, &val));
if (ret == kr_ok())
deleted++;
else if (ret == KNOT_ENOENT) {
stats->remove_miss++;
ret = kr_ok(); /* skip over non-existing entries */
} else {
txn_abort(env);
break;
}
}
return ret < 0 ? ret : deleted;
}
static int cdb_match(kr_cdb_pt db, struct kr_cdb_stats *stats,
knot_db_val_t *key, knot_db_val_t keyval[][2], int maxcount)
{
struct lmdb_env *env = db2env(db);
MDB_txn *txn = NULL;
int ret = txn_get(env, &txn, true);
if (ret != 0) {
return ret;
}
/* LATER(optim.): use txn_curs_get() instead, to save resources. */
MDB_cursor *cur = NULL;
ret = mdb_cursor_open(txn, env->dbi, &cur);
if (ret != 0) {
txn_abort(env);
return lmdb_error(ret);
}
MDB_val cur_key = val_knot2mdb(*key);
MDB_val cur_val = { 0, NULL };
stats->match++;
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE);
if (ret != MDB_SUCCESS) {
mdb_cursor_close(cur);
if (ret != MDB_NOTFOUND) {
txn_abort(env);
}
return lmdb_error(ret);
}
int results = 0;
while (ret == MDB_SUCCESS) {
/* Retrieve current key and compare with prefix */
if (cur_key.mv_size < key->len || memcmp(cur_key.mv_data, key->data, key->len) != 0) {
break;
}
/* Add to result set */
if (results < maxcount) {
keyval[results][0] = val_mdb2knot(cur_key);
keyval[results][1] = val_mdb2knot(cur_val);
++results;
} else {
break;
}
stats->match++;
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
}
mdb_cursor_close(cur);
if (ret != MDB_SUCCESS && ret != MDB_NOTFOUND) {
txn_abort(env);
return lmdb_error(ret);
} else if (results == 0) {
stats->match_miss++;
}
return results;
}
static int cdb_read_leq(kr_cdb_pt db, struct kr_cdb_stats *stats,
knot_db_val_t *key, knot_db_val_t *val)
{
if (kr_fails_assert(db && key && key->data && val))
return kr_error(EINVAL);
struct lmdb_env *env = db2env(db);
MDB_cursor *curs = NULL;
int ret = txn_curs_get(env, &curs, stats);
if (ret) return ret;
MDB_val key2_m = val_knot2mdb(*key);
MDB_val val2_m = { 0, NULL };
stats->read_leq++;
ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_SET_RANGE);
if (ret) goto failure;
/* test for equality //:unlikely */
if (key2_m.mv_size == key->len
&& memcmp(key2_m.mv_data, key->data, key->len) == 0) {
ret = 0; /* equality */
goto success;
}
stats->read_leq_miss++;
/* we must be greater than key; do one step to smaller */
stats->read_leq++;
ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_PREV);
if (ret) goto failure;
ret = 1;
success:
/* finalize the output */
*key = val_mdb2knot(key2_m);
*val = val_mdb2knot(val2_m);
return ret;
failure:
if (ret == MDB_NOTFOUND) {
stats->read_leq_miss++;
} else {
txn_abort(env);
}
return lmdb_error(ret);
}
static double cdb_usage_percent(kr_cdb_pt db)
{
knot_db_t *kdb = kr_cdb_pt2knot_db_t(db);
const size_t db_size = knot_db_lmdb_get_mapsize(kdb);
const size_t db_usage_abs = knot_db_lmdb_get_usage(kdb);
const double db_usage = (double)db_usage_abs / db_size * 100.0;
free(kdb);
return db_usage;
}
static size_t cdb_get_maxsize(kr_cdb_pt db)
{
return db2env(db)->mapsize;
}
/** Conversion between knot and lmdb structs. */
knot_db_t *kr_cdb_pt2knot_db_t(kr_cdb_pt db)
{
/* this is struct lmdb_env as in resolver/cdb_lmdb.c */
const struct lmdb_env *kres_db = db2env(db);
struct libknot_lmdb_env *libknot_db = malloc(sizeof(*libknot_db));
if (libknot_db != NULL) {
libknot_db->shared = false;
libknot_db->pool = NULL;
libknot_db->env = kres_db->env;
libknot_db->dbi = kres_db->dbi;
}
return libknot_db;
}
const struct kr_cdb_api *kr_cdb_lmdb(void)
{
static const struct kr_cdb_api api = {
"lmdb",
cdb_init, cdb_deinit, cdb_count, cdb_clear, cdb_commit,
cdb_readv, cdb_writev, cdb_remove,
cdb_match,
cdb_read_leq,
cdb_usage_percent,
cdb_get_maxsize,
cdb_check_health,
};
return &api;
}