summaryrefslogtreecommitdiffstats
path: root/src/knot/journal/journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/knot/journal/journal.c')
-rw-r--r--src/knot/journal/journal.c2197
1 files changed, 2197 insertions, 0 deletions
diff --git a/src/knot/journal/journal.c b/src/knot/journal/journal.c
new file mode 100644
index 0000000..c3ab541
--- /dev/null
+++ b/src/knot/journal/journal.c
@@ -0,0 +1,2197 @@
+/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <inttypes.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <stdarg.h>
+
+#include "knot/journal/journal.h"
+#include "knot/common/log.h"
+#include "contrib/files.h"
+#include "contrib/ctype.h"
+#include "libknot/endian.h"
+#include "contrib/dynarray.h"
+
+/*! \brief Journal version. Only its leading integer is compared when an existing DB is opened. */
+#define JOURNAL_VERSION "1.0"
+/*! \brief Changeset chunk size. */
+#define CHUNK_MAX (70 * 1024)
+/*! \brief Max number of concurrent DB readers. */
+#define JOURNAL_MAX_READERS 630
+
+/*! \brief Various metadata DB key strings.
+ *
+ * The per-zone keys read and written by txn_begin()/txn_commit() are not listed here:
+ * they are produced by stringifying the metadata_t field names in txn_begin_md() and
+ * txn_commit_md(), so MDKEY_PERZONE_FLAGS has to stay equal to the field name "flags". */
+#define MDKEY_GLOBAL_VERSION "version"
+#define MDKEY_GLOBAL_JOURNAL_COUNT "journal_count"
+#define MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED "last_total_occupied"
+#define MDKEY_GLOBAL_LAST_INSERTER_ZONE "last_inserter_zone"
+#define MDKEY_PERZONE_OCCUPIED "occupied"
+#define MDKEY_PERZONE_FLAGS "flags"
+#define KEY_BOOTSTRAP_CHANGESET "bootstrap"
+
+/*! \brief The number of unused (zeroed) bytes placed in each DB key after the zone name. */
+#define DB_KEY_UNUSED_ZERO (4)
+
+/*! \brief Size of the metadata inserted at the beginning of each chunk:
+ * uint32_t serial_to + uint32_t chunk_count + 24B unused */
+#define JOURNAL_HEADER_SIZE (32)
+
+// eventually move to contrib and reuse as needed
+//
+// local_array(type, name, size) declares `name` as an array of `size` zeroed elements:
+// a fixed on-stack buffer is used for up to local_array_max_static_size elements,
+// a calloc()ed buffer otherwise (in which case `name` is NULL if the allocation failed).
+// The macro expands to three declarations and must be paired with local_array_free(name)
+// on every exit path; that is a no-op free(NULL) when the static buffer was used.
+// Note that `size` is evaluated more than once.
+#define local_array_max_static_size (100)
+#define local_array(type, name, size) \
+ type name ## _static__[local_array_max_static_size] = { 0 }; \
+ type *name ## _dynamic__ = ((size) > local_array_max_static_size ? calloc((size), sizeof(type)) : NULL); \
+ type *name = ((size) > local_array_max_static_size ? name ## _dynamic__ : name ## _static__);
+#define local_array_free(name) { free(name ## _dynamic__); }
+
+/*! \brief Bit flags stored in metadata_t.flags. */
+enum {
+ LAST_FLUSHED_VALID = 1 << 0, /* "last_flushed" is valid. */
+ SERIAL_TO_VALID = 1 << 1, /* "last_serial_to" is valid. */
+ MERGED_SERIAL_VALID = 1 << 2, /* "merged_serial" (serial_from of the merged changeset) is valid. */
+ DIRTY_SERIAL_VALID = 1 << 3, /* "dirty_serial" is present in the DB. */
+ FIRST_SERIAL_INVALID = 1 << 4, /* "first_serial" is not valid. */
+};
+
+/*! \brief Flushing is allowed when the zone's C_ZONEFILE_SYNC option is non-negative. */
+static bool journal_flush_allowed(journal_t *j)
+{
+	conf_val_t sync = conf_zone_get(conf(), C_ZONEFILE_SYNC, j->zone);
+	if (conf_int(&sync) < 0) {
+		return false;
+	}
+	return true;
+}
+
+/*! \brief Merging is allowed exactly when flushing is not. */
+static bool journal_merge_allowed(journal_t *j)
+{
+	// TODO think of other behaviour, e.g. setting
+	const bool flush_ok = journal_flush_allowed(j);
+	return !flush_ok;
+}
+
+/*! \brief Value of the zone's C_MAX_JOURNAL_USAGE option, converted to size_t. */
+static size_t journal_max_usage(journal_t *j)
+{
+	conf_val_t usage = conf_zone_get(conf(), C_MAX_JOURNAL_USAGE, j->zone);
+	size_t limit = conf_int(&usage);
+	return limit;
+}
+
+/*! \brief Value of the zone's C_MAX_JOURNAL_DEPTH option, converted to size_t. */
+static size_t journal_max_changesets(journal_t *j)
+{
+	conf_val_t depth = conf_zone_get(conf(), C_MAX_JOURNAL_DEPTH, j->zone);
+	size_t limit = conf_int(&depth);
+	return limit;
+}
+
+/*! \brief Constant factor 2.0; the journal argument is currently not consulted. */
+static float journal_tofree_factor(journal_t *j)
+{
+	(void)j;
+	return 2.0f;
+}
+
+/*! \brief Constant factor 0.33; the journal argument is currently not consulted. */
+static float journal_minfree_factor(journal_t *j)
+{
+	(void)j;
+	return 0.33f;
+}
+
+/*
+ * ***************************** PART I *******************************
+ *
+ * Transaction manipulation functions
+ *
+ * ********************************************************************
+ */
+
+/*! \brief Per-zone journal metadata. Loaded from the DB by txn_begin() into txn_t.shadow_md
+ * and written back by txn_commit() for read-write transactions. */
+typedef struct {
+ uint32_t first_serial; // Serial_from of the first changeset.
+ uint32_t last_serial; // Serial_from of the last changeset.
+ uint32_t last_serial_to; // Serial_to of the last changeset.
+ uint32_t last_flushed; // Serial_from of the last flushed (or merged) changeset.
+ uint32_t merged_serial; // "serial_from" of merged changeset.
+ uint32_t dirty_serial; // Serial_from of an incompletely inserted changeset which shall be deleted (see DB_MAX_INSERT_TXN).
+ uint32_t changeset_count; // Number of changesets in this journal.
+ uint32_t flags; // LAST_FLUSHED_VALID, SERIAL_TO_VALID, MERGED_SERIAL_VALID, DIRTY_SERIAL_VALID, FIRST_SERIAL_INVALID.
+} metadata_t;
+
+/*! \brief Journal transaction: a DB transaction plus a sticky error code, scratch
+ * key/value and a working copy of the zone metadata. */
+typedef struct journal_txn {
+ journal_t *j; // Journal this transaction operates on.
+ knot_db_txn_t *txn; // Underlying DB transaction (storage owned by the caller of txn_init()).
+ int ret; // Sticky result code; most txn_* helpers do nothing once it is not KNOT_EOK.
+ bool opened; // Set by txn_begin(), cleared by txn_abort() and by a successful txn_commit().
+
+ bool is_rw; // Whether the transaction was begun with write access.
+
+ knot_db_iter_t *iter; // Current DB iterator, NULL when none is active.
+
+ knot_db_val_t key; // Current key; key.data points to key_raw (set in txn_init()).
+ knot_db_val_t val; // Current value: result of find/iter_val, or input to insert.
+ uint8_t key_raw[512]; // Backing storage for key.
+
+ metadata_t shadow_md; // Metadata loaded in txn_begin(); written back in txn_commit() if is_rw.
+} txn_t;
+
+// Forward declarations: the metadata accessors are used by txn_begin()/txn_commit()
+// (through the txn_begin_md/txn_commit_md macros) before they are defined in PART II.
+static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t *res);
+static void md_get32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t *res);
+static void md_set(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t val);
+static void md_set32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t val);
+
+/*! \brief Zero the transaction structure and bind it to a journal and DB txn storage.
+ *
+ * Does not begin the DB transaction; see txn_begin(). */
+static void txn_init(txn_t *txn, knot_db_txn_t *db_txn, journal_t *j)
+{
+	memset(txn, 0, sizeof(*txn));
+
+	// The key always lives in the embedded buffer.
+	txn->key.data = &txn->key_raw;
+	txn->txn = db_txn;
+	txn->j = j;
+}
+
+// Declares an on-stack DB transaction and journal transaction, a pointer `txn_name`
+// to the latter, and initializes it with txn_init(). Expands to several declarations,
+// so it has to be used where declarations are allowed and terminated with ';'.
+#define local_txn_t(txn_name, journal) \
+ knot_db_txn_t __db_txn_ ## txn_name; \
+ txn_t __local_txn_ ## txn_name; \
+ txn_t *txn_name = &__local_txn_ ## txn_name; \
+ txn_init(txn_name, &__db_txn_ ## txn_name, (journal))
+
+/*
+ * Structure of the DB key:
+ * Metadata:
+ * | [ zone_name | \0 ] | unused zero 4B | metadata_key | \0 |
+ *
+ * Changeset:
+ * | zone_name | \0 | unused zero 4B | (be32)serial_from | (be32)chunk_index |
+ * or
+ * | zone_name | \0 | unused zero 4B | metadata_key | \0 | (be32)serial_from |
+ *
+ * Structure of the changeset:
+ * | (be32)serial_to | (be32)#of_chunks | unused zero 24B | serialized_changeset...
+ *
+ */
+
+/*! \brief Sanity check of a DB key layout (used in assertions only).
+ *
+ * Skips the zone name (if zone_related) and the unused zero bytes, then accepts either
+ * a zone-related key with exactly 8 bytes left (two be32 numbers), or a key whose next
+ * byte is a lowercase letter preceded by a non-lowercase byte (a metadata key string). */
+static bool key_is_ok(const knot_db_val_t *key, bool zone_related)
+{
+	const uint8_t *pos = key->data;
+	ssize_t remaining = key->len;
+
+	if (zone_related) {
+		size_t zone_len = knot_dname_size(pos);
+		pos += zone_len;
+		remaining -= zone_len;
+	}
+	pos += 4;
+	remaining -= 4;
+
+	if (zone_related && remaining == 8) {
+		return true; // normal changeset
+	}
+	return (is_lower(*pos) && !is_lower(*(pos - 1))); // metadata
+}
+
+/*! \brief Build the key | [zone] | 4 zero bytes | key string | \0 | into txn->key.
+ *
+ * Sets txn->ret to KNOT_ERROR (leaving the key buffer untouched) if the key would not
+ * fit into the 512-byte key buffer. */
+static void txn_key_str(txn_t *txn, const knot_dname_t *zone, const char *key)
+{
+	const size_t zone_size = knot_dname_size(zone);
+	const size_t str_size = strlen(key) + 1;
+
+	txn->key.len = zone_size + DB_KEY_UNUSED_ZERO + str_size;
+	if (txn->key.len > sizeof(txn->key_raw)) {
+		txn->ret = KNOT_ERROR;
+		return;
+	}
+
+	uint8_t *out = txn->key.data;
+	if (zone != NULL) {
+		memcpy(out, zone, zone_size);
+	}
+	out += zone_size;
+	memset(out, 0, DB_KEY_UNUSED_ZERO);
+	out += DB_KEY_UNUSED_ZERO;
+	memcpy(out, key, str_size); // includes the terminating zero
+
+	assert(key_is_ok(&txn->key, zone != NULL));
+}
+
+/*! \brief Build the key | [zone] | 4 zero bytes | (be32)key1 | (be32)key2 | into txn->key.
+ *
+ * Sets txn->ret to KNOT_ERROR (leaving the key buffer untouched) if the key would not
+ * fit into the 512-byte key buffer. */
+static void txn_key_2u32(txn_t *txn, const knot_dname_t *zone, uint32_t key1, uint32_t key2)
+{
+	const size_t zone_size = knot_dname_size(zone);
+	const uint32_t be_pair[2] = { htobe32(key1), htobe32(key2) };
+
+	txn->key.len = zone_size + DB_KEY_UNUSED_ZERO + sizeof(be_pair);
+	if (txn->key.len > sizeof(txn->key_raw)) {
+		txn->ret = KNOT_ERROR;
+		return;
+	}
+
+	uint8_t *out = txn->key.data;
+	if (zone != NULL) {
+		memcpy(out, zone, zone_size);
+	}
+	out += zone_size;
+	memset(out, 0, DB_KEY_UNUSED_ZERO);
+	out += DB_KEY_UNUSED_ZERO;
+	memcpy(out, be_pair, sizeof(be_pair));
+
+	assert(key_is_ok(&txn->key, zone != NULL));
+}
+
+/*! \brief Build the key | [zone] | 4 zero bytes | key1 string | \0 | (be32)key2 | into txn->key.
+ *
+ * Sets txn->ret to KNOT_ERROR (leaving the key buffer untouched) if the key would not
+ * fit into the 512-byte key buffer. */
+static void txn_key_str_u32(txn_t *txn, const knot_dname_t *zone, const char *key1, uint32_t key2)
+{
+	const size_t zone_size = knot_dname_size(zone);
+	const size_t str_size = strlen(key1) + 1;
+	const uint32_t be_num = htobe32(key2);
+
+	txn->key.len = zone_size + DB_KEY_UNUSED_ZERO + str_size + sizeof(be_num);
+	if (txn->key.len > sizeof(txn->key_raw)) {
+		txn->ret = KNOT_ERROR;
+		return;
+	}
+
+	uint8_t *out = txn->key.data;
+	if (zone != NULL) {
+		memcpy(out, zone, zone_size);
+	}
+	out += zone_size;
+	memset(out, 0, DB_KEY_UNUSED_ZERO);
+	out += DB_KEY_UNUSED_ZERO;
+	memcpy(out, key1, str_size); // includes the terminating zero
+	out += str_size;
+	memcpy(out, &be_num, sizeof(be_num));
+
+	assert(key_is_ok(&txn->key, zone != NULL));
+}
+
+/*! \brief Compare txn->key with key2: shorter key first, equal lengths by memcmp(). */
+static int txn_cmpkey(txn_t *txn, knot_db_val_t *key2)
+{
+	if (txn->key.len < key2->len) {
+		return -1;
+	}
+	if (txn->key.len > key2->len) {
+		return 1;
+	}
+	// Same length; two empty keys are equal without touching the data pointers.
+	return (key2->len == 0 ? 0 : memcmp(txn->key.data, key2->data, key2->len));
+}
+
+/*! \brief Decode txn->val as a big-endian 32-bit or 64-bit number into *res.
+ *
+ * Does nothing if txn->ret is already an error; sets txn->ret to KNOT_EMALF
+ * (leaving *res untouched) for any other value length. */
+static void txn_val_u64(txn_t *txn, uint64_t *res)
+{
+	if (txn->ret != KNOT_EOK) {
+		return;
+	}
+
+	if (txn->val.len == sizeof(uint32_t)) {
+		uint32_t be_narrow;
+		memcpy(&be_narrow, txn->val.data, sizeof(be_narrow));
+		*res = (uint64_t)be32toh(be_narrow);
+	} else if (txn->val.len == sizeof(uint64_t)) {
+		uint64_t be_wide;
+		memcpy(&be_wide, txn->val.data, sizeof(be_wide));
+		*res = be64toh(be_wide);
+	} else {
+		txn->ret = KNOT_EMALF;
+	}
+}
+
+// Load / store one metadata_t field of the shadow metadata; the DB key string is the
+// stringified field name. Both expect a variable named `txn` in scope.
+#define txn_begin_md(md) md_get32(txn, txn->j->zone, #md, &txn->shadow_md.md)
+#define txn_commit_md(md) md_set32(txn, txn->j->zone, #md, txn->shadow_md.md)
+
+// Guard macros: turn "not opened" into KNOT_EINVAL and return from the enclosing
+// function if txn->ret is not KNOT_EOK. txn_check_open is for void functions,
+// txn_check_ret returns txn->ret. Note that they expand to a bare `if ... return`.
+#define txn_check_open(txn) if (((txn)->ret = ((txn)->opened ? (txn)->ret : KNOT_EINVAL)) != KNOT_EOK) return
+#define txn_check_ret(txn) if (((txn)->ret = ((txn)->opened ? (txn)->ret : KNOT_EINVAL)) != KNOT_EOK) return ((txn)->ret)
+
+/*! \brief Begin the DB transaction and load the zone metadata into txn->shadow_md.
+ *
+ * No-op if txn->ret already holds an error; beginning an already opened transaction
+ * sets txn->ret to KNOT_EINVAL. */
+static void txn_begin(txn_t *txn, bool write_allowed)
+{
+	if (txn->ret != KNOT_EOK) {
+		return;
+	}
+	if (txn->opened) {
+		txn->ret = KNOT_EINVAL;
+		return;
+	}
+
+	txn->ret = txn->j->db->db_api->txn_begin(txn->j->db->db, txn->txn,
+	                                         (write_allowed ? 0 : KNOT_DB_RDONLY));
+
+	// NOTE(review): `opened` is set even if the backend txn_begin failed, so a later
+	// txn_abort() calls the backend abort on that transaction -- confirm this is safe.
+	txn->opened = true;
+	txn->is_rw = write_allowed;
+
+	// Each of these is skipped by the txn_check_open guard in md_get() once txn->ret is an error.
+	txn_begin_md(first_serial);
+	txn_begin_md(last_serial);
+	txn_begin_md(last_serial_to);
+	txn_begin_md(last_flushed);
+	txn_begin_md(merged_serial);
+	txn_begin_md(dirty_serial);
+	txn_begin_md(changeset_count);
+	txn_begin_md(flags);
+}
+
+/*! \brief Look up txn->key, storing the result in txn->val and the raw DB return code
+ * (including KNOT_ENOENT) in txn->ret. */
+static void txn_find_force(txn_t *txn)
+{
+	txn_check_open(txn);
+
+	int rc = txn->j->db->db_api->find(txn->txn, &txn->key, &txn->val, 0);
+	txn->ret = rc;
+}
+
+static bool txn_find(txn_t *txn)
+{
+ if (txn->ret != KNOT_EOK || !txn->opened) {
+ return false;
+ }
+ txn_find_force(txn);
+ if (txn->ret == KNOT_ENOENT) {
+ txn->ret = KNOT_EOK;
+ return false;
+ }
+ return (txn->ret == KNOT_EOK);
+}
+
+/*! \brief Insert txn->val under txn->key; the DB return code goes to txn->ret. */
+static void txn_insert(txn_t *txn)
+{
+	txn_check_open(txn);
+
+	int rc = txn->j->db->db_api->insert(txn->txn, &txn->key, &txn->val, 0);
+	txn->ret = rc;
+}
+
+/*! \brief Delete the record stored under txn->key; the DB return code goes to txn->ret. */
+static void txn_del(txn_t *txn)
+{
+	txn_check_open(txn);
+
+	int rc = txn->j->db->db_api->del(txn->txn, &txn->key);
+	txn->ret = rc;
+}
+
+/*! \brief Create a DB iterator positioned by KNOT_DB_FIRST; KNOT_ENOMEM if none is returned. */
+static void txn_iter_begin(txn_t *txn)
+{
+	txn_check_open(txn);
+
+	txn->iter = txn->j->db->db_api->iter_begin(txn->txn, KNOT_DB_FIRST);
+	if (txn->iter != NULL) {
+		return;
+	}
+	txn->ret = KNOT_ENOMEM;
+}
+
+// Guard for the txn_iter_* helpers: a missing iterator becomes KNOT_EINVAL (unless an
+// error is already recorded), and the enclosing void function returns on any error.
+// Expects a variable named `txn`; expands to two complete statements, so it is used
+// without a trailing semicolon.
+#define txn_check_iter if (txn->iter == NULL && txn->ret == KNOT_EOK) txn->ret = KNOT_EINVAL; \
+ if (txn->ret != KNOT_EOK) return;
+
+/*! \brief Move the iterator to txn->key; KNOT_ENOENT if the DB returns no iterator. */
+static void txn_iter_seek(txn_t *txn)
+{
+	txn_check_iter
+
+	txn->iter = txn->j->db->db_api->iter_seek(txn->iter, &txn->key, 0);
+	if (txn->iter != NULL) {
+		return;
+	}
+	txn->ret = KNOT_ENOENT;
+}
+
+/*! \brief Read the key at the current iterator position into *at_key. */
+static void txn_iter_key(txn_t *txn, knot_db_val_t *at_key)
+{
+	txn_check_iter
+
+	int rc = txn->j->db->db_api->iter_key(txn->iter, at_key);
+	txn->ret = rc;
+}
+
+/*! \brief Read the value at the current iterator position into txn->val. */
+static void txn_iter_val(txn_t *txn)
+{
+	txn_check_iter
+
+	int rc = txn->j->db->db_api->iter_val(txn->iter, &txn->val);
+	txn->ret = rc;
+}
+
+/*! \brief Advance the iterator by one record; KNOT_ENOENT if the DB returns no iterator. */
+static void txn_iter_next(txn_t *txn)
+{
+	txn_check_iter
+
+	txn->iter = txn->j->db->db_api->iter_next(txn->iter);
+	if (txn->iter != NULL) {
+		return;
+	}
+	txn->ret = KNOT_ENOENT;
+}
+
+/*! \brief Release the current iterator, if any, and forget it. */
+static void txn_iter_finish(txn_t *txn)
+{
+	knot_db_iter_t *active = txn->iter;
+	txn->iter = NULL;
+	if (active != NULL) {
+		txn->j->db->db_api->iter_finish(active);
+	}
+}
+
+/*! \brief Abort an opened transaction (releasing its iterator first); no-op otherwise.
+ *
+ * txn->ret is left untouched. */
+static void txn_abort(txn_t *txn)
+{
+	if (!txn->opened) {
+		return;
+	}
+	txn_iter_finish(txn);
+	txn->j->db->db_api->txn_abort(txn->txn);
+	txn->opened = false;
+}
+
+/*! \brief Finish the transaction: write back metadata (read-write only) and commit.
+ *
+ * If an error is recorded in txn->ret (before or while writing metadata), or the DB
+ * commit itself fails, the transaction is aborted instead and txn->ret keeps the error. */
+static void txn_commit(txn_t *txn)
+{
+	if (txn->is_rw) {
+		txn_commit_md(first_serial);
+		txn_commit_md(last_serial);
+		txn_commit_md(last_serial_to);
+		txn_commit_md(last_flushed);
+		txn_commit_md(merged_serial);
+		txn_commit_md(dirty_serial);
+		txn_commit_md(changeset_count);
+		txn_commit_md(flags);
+	}
+
+	if (txn->ret == KNOT_EOK) {
+		txn_iter_finish(txn);
+		txn->ret = txn->j->db->db_api->txn_commit(txn->txn);
+		if (txn->ret == KNOT_EOK) {
+			txn->opened = false;
+			return;
+		}
+	}
+
+	// Either an earlier error or a failed commit.
+	txn_abort(txn);
+}
+
+/*! \brief Public wrapper around txn_commit() that tolerates a NULL transaction. */
+void journal_txn_commit(struct journal_txn *txn)
+{
+	if (txn == NULL) {
+		return;
+	}
+	txn_commit(txn);
+}
+
+/*! \brief Commit the transaction and, if that succeeded, begin a new one with the
+ * same access mode. */
+static void txn_restart(txn_t *txn)
+{
+	txn_commit(txn);
+	assert(!txn->opened);
+	if (txn->ret != KNOT_EOK) {
+		return;
+	}
+	txn_begin(txn, txn->is_rw);
+}
+
+/*! \brief Use the caller-supplied transaction if there is one, otherwise begin *txn.
+ *
+ * \param write_allowed Only used when a new transaction is begun. */
+static void txn_reuse(txn_t **txn, txn_t *to_reuse, bool write_allowed)
+{
+	if (to_reuse != NULL) {
+		*txn = to_reuse;
+		return;
+	}
+	txn_begin(*txn, write_allowed);
+}
+
+/*! \brief Counterpart of txn_reuse(): commit *txn only if it was not borrowed
+ * from the caller (reused == NULL). */
+static void txn_unreuse(txn_t **txn, txn_t *reused)
+{
+	if (reused != NULL) {
+		return; // the owner of the reused transaction finishes it
+	}
+	txn_commit(*txn);
+}
+
+// reuse_txn declares a local transaction `name` and either begins it (to_reuse == NULL)
+// or redirects `name` to the caller's transaction; unreuse_txn commits it only in the
+// former case. Every path between the two that returns early skips the commit.
+#define reuse_txn(name, journal, to_reuse, wa) local_txn_t(name, journal); txn_reuse(&name, to_reuse, wa)
+#define unreuse_txn(name, reused) txn_unreuse(&name, reused)
+
+/*
+ * ***************************** PART II ******************************
+ *
+ * DB metadata manip. and Chunk metadata headers
+ *
+ * ********************************************************************
+ */
+
+/*! \brief Read a numeric metadata value; *res is 0 if the key is missing.
+ *
+ * *res is not written at all if the transaction is not usable (see txn_check_open). */
+static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t *res)
+{
+	txn_check_open(txn);
+
+	uint64_t value = 0;
+	txn_key_str(txn, zone, mdkey);
+	if (txn_find(txn)) {
+		txn_val_u64(txn, &value);
+	}
+	*res = value;
+}
+
+/*! \brief 32-bit variant of md_get(); a stored value above UINT32_MAX yields
+ * KNOT_EMALF in txn->ret and leaves *res untouched. */
+static void md_get32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t *res)
+{
+	uint64_t wide = 0;
+	md_get(txn, zone, mdkey, &wide);
+
+	if (wide <= UINT32_MAX) {
+		*res = (uint32_t)wide;
+		return;
+	}
+	txn->ret = KNOT_EMALF;
+}
+
+/*! \brief Read the global "last inserter zone" metadata.
+ *
+ * On success *res is a copy made by knot_dname_copy() (owned by the caller),
+ * or NULL if the key is not present. */
+static void md_get_common_last_inserter_zone(txn_t *txn, knot_dname_t **res)
+{
+	txn_check_open(txn);
+
+	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
+	if (!txn_find(txn)) {
+		*res = NULL;
+		return;
+	}
+	*res = knot_dname_copy(txn->val.data, NULL);
+}
+
+/*! \brief Store the zone name as the global "last inserter zone" metadata.
+ *
+ * \return txn->ret after the insertion. */
+static int md_set_common_last_inserter_zone(txn_t *txn, knot_dname_t *zone)
+{
+	txn_check_ret(txn);
+
+	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
+	txn->val.data = zone;
+	txn->val.len = knot_dname_size(zone);
+	txn_insert(txn);
+
+	return txn->ret;
+}
+
+/*! \brief Delete the global "last inserter zone" metadata.
+ *
+ * \param if_equals If not NULL, delete only when the stored zone name equals this one. */
+static void md_del_last_inserter_zone(txn_t *txn, knot_dname_t *if_equals)
+{
+	txn_check_open(txn);
+
+	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
+	if (!txn_find(txn)) {
+		return;
+	}
+	if (if_equals == NULL || knot_dname_is_equal(txn->val.data, if_equals)) {
+		txn_del(txn);
+	}
+}
+
+/*! \brief Read the global "last total occupied" metadata as size_t (0 if missing). */
+static void md_get_common_last_occupied(txn_t *txn, size_t *res)
+{
+	uint64_t occupied = 0;
+	md_get(txn, NULL, MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED, &occupied);
+	*res = (size_t)occupied;
+}
+
+/*! \brief Store a metadata value as a big-endian 64-bit number.
+ *
+ * After return txn->val.data points to a dead local; it must not be read. */
+static void md_set(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t val)
+{
+	uint64_t be_val = htobe64(val);
+
+	txn_key_str(txn, zone, mdkey);
+	txn->val.data = &be_val;
+	txn->val.len = sizeof(be_val);
+	txn_insert(txn);
+}
+
+/*! \brief Store a metadata value as a big-endian 32-bit number.
+ *
+ * After return txn->val.data points to a dead local; it must not be read. */
+static void md_set32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t val)
+{
+	uint32_t be_val = htobe32(val);
+
+	txn_key_str(txn, zone, mdkey);
+	txn->val.data = &be_val;
+	txn->val.len = sizeof(be_val);
+	txn_insert(txn);
+}
+
+/*! \brief Test whether any bit of `flag` is set in the shadow metadata flags. */
+static bool md_flag(txn_t *txn, int flag)
+{
+	return ((txn->shadow_md.flags & flag) != 0);
+}
+
+/*! \brief Mark the journal as flushed up to the last changeset.
+ *
+ * Only done when last_serial_to is valid and first_serial is not marked invalid. */
+static void md_flush(txn_t *txn)
+{
+	if (!md_flag(txn, SERIAL_TO_VALID) || md_flag(txn, FIRST_SERIAL_INVALID)) {
+		return;
+	}
+	txn->shadow_md.last_flushed = txn->shadow_md.last_serial;
+	txn->shadow_md.flags |= LAST_FLUSHED_VALID;
+}
+
+/*! \brief Tell whether everything in the journal is flushed.
+ *
+ * \return 1 if SERIAL_TO_VALID is not set, or if last_flushed is valid and equals
+ *         last_serial; 0 otherwise. */
+static int md_flushed(txn_t *txn)
+{
+	if (!md_flag(txn, SERIAL_TO_VALID)) {
+		return 1;
+	}
+	if (!md_flag(txn, LAST_FLUSHED_VALID)) {
+		return 0;
+	}
+	return (serial_equal(txn->shadow_md.last_flushed, txn->shadow_md.last_serial) ? 1 : 0);
+}
+
+/*! \brief Write the chunk header to the beginning of `to`:
+ * (be32)serial_to, (be32)chunk_count, remaining header bytes zeroed. */
+static void make_header(knot_db_val_t *to, uint32_t serial_to, int chunk_count)
+{
+	assert(to->len >= JOURNAL_HEADER_SIZE);
+	assert(chunk_count > 0);
+
+	const uint32_t be_fields[2] = { htobe32(serial_to), htobe32((uint32_t)chunk_count) };
+
+	uint8_t *header = to->data;
+	memcpy(header, be_fields, sizeof(be_fields));
+	memset(header + sizeof(be_fields), 0, JOURNAL_HEADER_SIZE - sizeof(be_fields));
+}
+
+/*! \brief Read properties from the chunk header "from". All the output params are optional.
+ *
+ * \param serial_to   Decoded from the first be32 field.
+ * \param chunk_count Decoded from the second be32 field.
+ * \param header_size Always JOURNAL_HEADER_SIZE. */
+static void unmake_header(const knot_db_val_t *from, uint32_t *serial_to,
+                          int *chunk_count, size_t *header_size)
+{
+	assert(from->len >= JOURNAL_HEADER_SIZE);
+
+	const uint8_t *header = from->data;
+
+	if (serial_to != NULL) {
+		uint32_t be_serial;
+		memcpy(&be_serial, header, sizeof(be_serial));
+		*serial_to = be32toh(be_serial);
+	}
+	if (chunk_count != NULL) {
+		uint32_t be_count;
+		memcpy(&be_count, header + sizeof(uint32_t), sizeof(be_count));
+		assert(be32toh(be_count) <= INT_MAX);
+		*chunk_count = (int)be32toh(be_count);
+	}
+	if (header_size != NULL) {
+		*header_size = JOURNAL_HEADER_SIZE;
+	}
+}
+
+/*! \brief Parse the leading decimal integer of a version string (e.g. 1 for "1.0").
+ *
+ * Despite the name, all leading digits are parsed, not just the first one.
+ * The string may come from the DB, so strtol() is used instead of atoi(), whose
+ * behaviour is undefined for out-of-range values; the result is clamped to int.
+ *
+ * \return The parsed number, 0 if the string does not start with a number. */
+static int first_digit(char * of)
+{
+	long major = strtol(of, NULL, 10);
+	if (major > INT_MAX) {
+		return INT_MAX;
+	}
+	if (major < INT_MIN) {
+		return INT_MIN;
+	}
+	return (int)major;
+}
+
+/*! \brief Add change_amount (possibly negative) to the global journal counter. */
+static void md_update_journal_count(txn_t * txn, int change_amount)
+{
+	uint64_t count = 0;
+	md_get(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, &count);
+
+	const uint64_t updated = count + change_amount;
+	md_set(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, updated);
+}
+
+/*! \brief Check (and if needed initialize) global and per-zone metadata on journal open.
+ *
+ * Stores JOURNAL_VERSION if no version is recorded yet, otherwise requires the stored
+ * major version to match. A zone without the per-zone flags key increases the global
+ * journal counter.
+ *
+ * \param dirty_present Set to whether DIRTY_SERIAL_VALID is set in the zone metadata.
+ *
+ * \return KNOT_ENOTSUP on version mismatch, otherwise the transaction result. */
+static int initial_md_check(journal_t *j, bool *dirty_present)
+{
+	*dirty_present = 0;
+
+	bool modified = false;
+
+	local_txn_t(txn, j);
+	txn_begin(txn, true);
+
+	txn_key_str(txn, NULL, MDKEY_GLOBAL_VERSION);
+	if (txn_find(txn)) {
+		char * stored_version = txn->val.data;
+		if (first_digit(stored_version) != first_digit(JOURNAL_VERSION)) {
+			txn_abort(txn);
+			return KNOT_ENOTSUP;
+		}
+	} else {
+		txn->val.len = strlen(JOURNAL_VERSION) + 1;
+		txn->val.data = JOURNAL_VERSION;
+		txn_insert(txn);
+		modified = true;
+	}
+
+	txn_key_str(txn, j->zone, MDKEY_PERZONE_FLAGS);
+	if (!txn_find(txn)) {
+		md_update_journal_count(txn, +1);
+		modified = true;
+	}
+	*dirty_present = md_flag(txn, DIRTY_SERIAL_VALID);
+
+	if (!modified) {
+		// abort to gain up speed when opening a lot of zones
+		txn_abort(txn);
+	} else {
+		txn_commit(txn);
+	}
+
+	return txn->ret;
+}
+
+/*
+ * **************************** PART III ******************************
+ *
+ * DB iteration
+ *
+ * ********************************************************************
+ */
+
+/*! \brief Iteration method of iterate(): decides when the callback is invoked. */
+enum {
+ JOURNAL_ITERATION_CHUNKS, // call the iteration callback for each chunk read, with just the chunk in ctx->val
+ JOURNAL_ITERATION_CHANGESETS // call the iteration callback after the last chunk of a changeset is read, with all its chunks in ctx->val
+};
+
+/*! \brief State of one iterate() run, handed to the iteration and key callbacks. */
+typedef struct {
+ txn_t *txn; // DB txn not to be touched by callback, just contains journal pointer
+ uint32_t serial; // serial-from of current changeset
+ uint32_t serial_to; // serial-to of current changeset
+ const int method; // JOURNAL_ITERATION_CHUNKS or JOURNAL_ITERATION_CHANGESETS, to be set by the caller of iterate()
+ int chunk_index; // index of current chunk
+ int chunk_count; // # of chunks of current changeset
+ knot_db_val_t *val; // one val if JOURNAL_ITERATION_CHUNKS; chunk_count vals if JOURNAL_ITERATION_CHANGESETS
+ knot_db_iter_t *iter; // DB iteration context, not to be touched by callback
+ void *iter_context; // anything to send to the callback by the caller of iterate(), untouched by iterate()
+} iteration_ctx_t;
+
+/*! \brief Callback type used by iterate() both for processing data (its return value
+ * becomes txn->ret) and for building the DB key of the current chunk. */
+typedef int (*iteration_cb_t)(iteration_ctx_t *ctx);
+
+/*!
+ * \brief Move iter to next changeset chunk.
+ *
+ * Optimistically try a fast move to the next DB item. But the changesets can be out of
+ * order, so if that does not hit the expected key (different key or end of DB), restart
+ * the iterator and seek to the expected key.
+ *
+ * \param key_cb Callback that builds the expected key into ctx->txn->key.
+ */
+static void get_iter_next(iteration_ctx_t *ctx, iteration_cb_t key_cb)
+{
+	txn_t *txn = ctx->txn;
+	knot_db_val_t found_key = { 0 };
+
+	txn_check_open(txn);
+	txn_iter_next(txn);
+	txn_iter_key(txn, &found_key);
+	key_cb(ctx);
+
+	const bool at_end = (txn->ret == KNOT_ENOENT);
+	const bool mismatch = (txn->ret == KNOT_EOK && txn_cmpkey(txn, &found_key) != 0);
+	if (!at_end && !mismatch) {
+		return;
+	}
+
+	// Slow path: look the expected key up from scratch.
+	txn->ret = KNOT_EOK;
+	txn_iter_finish(txn);
+	txn_iter_begin(txn);
+	txn_iter_seek(txn);
+}
+
+/*!
+ * \brief Walk the chunks of consecutive changesets and call a callback on them.
+ *
+ * Starts at the changeset with serial-from "first", follows the serial_to stored in
+ * the chunk headers and stops after the changeset with serial-from "last".
+ *
+ * \param j            Journal.
+ * \param _txn         Transaction to reuse, or NULL to open (and commit) a local RW one.
+ * \param cb           Iteration callback; its return value is stored to txn->ret.
+ * \param method       JOURNAL_ITERATION_CHUNKS (cb per chunk) or
+ *                     JOURNAL_ITERATION_CHANGESETS (cb once per changeset, with all
+ *                     its chunks in ctx->val).
+ * \param iter_context Opaque pointer passed to cb in ctx->iter_context.
+ * \param first        Serial-from of the first changeset.
+ * \param last         Serial-from of the last changeset.
+ * \param key_cb       Callback building the DB key for the current serial and chunk index.
+ *
+ * \return txn->ret at the end; KNOT_EMALF if a chunk header is inconsistent.
+ */
+static int iterate(journal_t *j, txn_t *_txn, iteration_cb_t cb, int method,
+                   void *iter_context, uint32_t first, uint32_t last, iteration_cb_t key_cb)
+{
+	reuse_txn(txn, j, _txn, true);
+
+	iteration_ctx_t ctx = {
+		.method = method,
+		.iter_context = iter_context,
+		.txn = txn,
+		.serial = first,
+		.chunk_index = 0
+	};
+
+	knot_db_val_t *vals = NULL;
+	int vals_count = 0; // number of elements allocated in vals
+
+	txn_iter_begin(txn);
+
+	key_cb(&ctx);
+	txn_iter_seek(txn);
+
+	ctx.val = &txn->val;
+
+	while (true) {
+		txn_iter_val(txn);
+		if (txn->ret != KNOT_EOK) {
+			break;
+		}
+
+		// The header comes from the DB: do not rely on assertions only.
+		if (txn->val.len < JOURNAL_HEADER_SIZE) {
+			txn->ret = KNOT_EMALF;
+			break;
+		}
+		unmake_header(&txn->val, &ctx.serial_to, &ctx.chunk_count, NULL);
+		if (ctx.chunk_count < 1) {
+			txn->ret = KNOT_EMALF;
+			break;
+		}
+
+		if (method == JOURNAL_ITERATION_CHANGESETS) {
+			if (ctx.chunk_index == 0) {
+				free(vals);
+				vals_count = ctx.chunk_count;
+				vals = malloc((size_t)vals_count * sizeof(*vals));
+				if (vals == NULL) {
+					txn->ret = KNOT_ENOMEM;
+					break;
+				}
+				ctx.val = vals;
+			} else if (ctx.chunk_index >= vals_count) {
+				// A later chunk claims more chunks than the first one did.
+				txn->ret = KNOT_EMALF;
+				break;
+			}
+			memcpy(vals + ctx.chunk_index, &txn->val, sizeof(*vals));
+		}
+
+		if (method == JOURNAL_ITERATION_CHUNKS) {
+			txn->ret = cb(&ctx);
+		}
+
+		if (ctx.chunk_index == ctx.chunk_count - 1) { // hit last chunk of current changeset
+			if (method == JOURNAL_ITERATION_CHANGESETS) {
+				txn->ret = cb(&ctx);
+			}
+
+			if (ctx.serial == last) {
+				break; // standard loop exit here
+			}
+
+			ctx.serial = ctx.serial_to;
+			ctx.chunk_index = 0;
+		} else {
+			ctx.chunk_index++;
+		}
+
+		// No-op if an error has been recorded; the loop then ends at the next txn_iter_val().
+		get_iter_next(&ctx, key_cb);
+	}
+
+	free(vals);
+	txn_iter_finish(txn);
+
+	unreuse_txn(txn, _txn);
+
+	return txn->ret;
+}
+
+/*! \brief Key callback for regular changesets: | zone | zeros | serial | chunk_index |.
+ *
+ * \return Always KNOT_EOK; a key-building failure is recorded in the txn only. */
+static int normal_iterkeycb(iteration_ctx_t *ctx)
+{
+	txn_t *txn = ctx->txn;
+	txn_key_2u32(txn, txn->j->zone, ctx->serial, ctx->chunk_index);
+	return KNOT_EOK;
+}
+
+/*
+ * ***************************** PART IV ******************************
+ *
+ * Reading changesets
+ *
+ * ********************************************************************
+ */
+
+/*! \brief Deserialize changeset from chunks (in vals)
+ *
+ * \param vals      Chunks of one changeset, each starting with the chunk header.
+ * \param nvals     Number of chunks; must be at least 1.
+ * \param zone_name Zone name for the new changeset.
+ * \param ch        Output: newly allocated changeset; written only on success.
+ *
+ * \return KNOT_EOK, KNOT_EINVAL (no chunks), KNOT_EMALF (chunk shorter than its header),
+ *         KNOT_ENOMEM, or the error from changeset_deserialize().
+ */
+static int vals_to_changeset(knot_db_val_t *vals, int nvals,
+                             const knot_dname_t *zone_name, changeset_t **ch)
+{
+	// Same guard as in vals_to_chgset_ctx(); also keeps the loop below from
+	// running wild on a negative count.
+	if (nvals < 1) {
+		return KNOT_EINVAL;
+	}
+
+	local_array(uint8_t *, valps, nvals)
+	local_array(size_t, vallens, nvals)
+	if (valps == NULL || vallens == NULL) {
+		local_array_free(valps)
+		local_array_free(vallens)
+		return KNOT_ENOMEM;
+	}
+
+	for (int i = 0; i < nvals; i++) {
+		if (vals[i].len < JOURNAL_HEADER_SIZE) {
+			// The length subtraction below would wrap around.
+			local_array_free(valps)
+			local_array_free(vallens)
+			return KNOT_EMALF;
+		}
+		valps[i] = vals[i].data + JOURNAL_HEADER_SIZE;
+		vallens[i] = vals[i].len - JOURNAL_HEADER_SIZE;
+	}
+
+	changeset_t *t_ch = changeset_new(zone_name);
+	if (t_ch == NULL) {
+		local_array_free(valps)
+		local_array_free(vallens)
+		return KNOT_ENOMEM;
+	}
+
+	int ret = changeset_deserialize(t_ch, valps, vallens, nvals);
+
+	local_array_free(valps)
+	local_array_free(vallens)
+	if (ret != KNOT_EOK) {
+		changeset_free(t_ch);
+		return ret;
+	}
+	*ch = t_ch;
+	return KNOT_EOK;
+}
+
+/*! \brief Create a changeset context referencing the chunk payloads in vals.
+ *
+ * The payloads are not copied: the context stores pointers just past each chunk header.
+ *
+ * \return KNOT_EOK, KNOT_EINVAL if nvals < 1, KNOT_ENOMEM if the context cannot be created. */
+static int vals_to_chgset_ctx(knot_db_val_t *vals, int nvals, uint32_t serial_from,
+                              uint32_t serial_to, chgset_ctx_t **ch)
+{
+	if (nvals < 1) {
+		return KNOT_EINVAL;
+	}
+
+	chgset_ctx_t *created = chgset_ctx_create(nvals);
+	if (created == NULL) {
+		return KNOT_ENOMEM;
+	}
+
+	created->serial_from = serial_from;
+	created->serial_to = serial_to;
+
+	const size_t count = nvals;
+	for (size_t idx = 0; idx < count; idx++) {
+		const knot_db_val_t *chunk = &vals[idx];
+		created->src_chunks[idx] = chunk->data + JOURNAL_HEADER_SIZE;
+		created->chunk_sizes[idx] = chunk->len - JOURNAL_HEADER_SIZE;
+	}
+
+	*ch = created;
+	return KNOT_EOK;
+}
+
+/*! \brief Iteration callback: deserialize one changeset into *(changeset_t **)iter_context.
+ *
+ * \return KNOT_EINVAL if the target already holds a changeset, otherwise the
+ *         deserialization result. */
+static int load_one_itercb(iteration_ctx_t *ctx)
+{
+	changeset_t **target = ctx->iter_context;
+	if (*target != NULL) {
+		return KNOT_EINVAL;
+	}
+
+	changeset_t *loaded = NULL;
+	int ret = vals_to_changeset(ctx->val, ctx->chunk_count, ctx->txn->j->zone, &loaded);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+	*target = loaded;
+	return ret;
+}
+
+/*! \brief Iteration callback: deserialize one changeset and append it to the list
+ * referenced by *(list_t **)iter_context. */
+static int load_list_itercb(iteration_ctx_t *ctx)
+{
+	list_t *target_list = *(list_t **) ctx->iter_context;
+	changeset_t *loaded = NULL;
+
+	int ret = vals_to_changeset(ctx->val, ctx->chunk_count, ctx->txn->j->zone, &loaded);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	add_tail(target_list, &loaded->n);
+	return ret;
+}
+
+/*! \brief Iteration callback: create a changeset context for one changeset and append
+ * it to the list referenced by *(list_t **)iter_context. */
+static int load_list_ctx_itercb(iteration_ctx_t *ctx)
+{
+	list_t *target_list = *(list_t **) ctx->iter_context;
+	chgset_ctx_t *created = NULL;
+
+	int ret = vals_to_chgset_ctx(ctx->val, ctx->chunk_count, ctx->serial, ctx->serial_to, &created);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	add_tail(target_list, &created->n);
+	return ret;
+}
+
+/*! \brief Load one changeset (with serial-from "serial") from DB.
+ *
+ * \param _txn Transaction to reuse, or NULL to use a local read-only one.
+ * \param ch   Output: the loaded changeset; written only on success.
+ *
+ * \return KNOT_EOK, KNOT_ENOENT if nothing was loaded, or another error code. */
+static int load_one(journal_t *j, txn_t *_txn, uint32_t serial, changeset_t **ch)
+{
+	reuse_txn(txn, j, _txn, false);
+
+	changeset_t *loaded = NULL;
+	iterate(j, txn, load_one_itercb, JOURNAL_ITERATION_CHANGESETS, &loaded, serial, serial, normal_iterkeycb);
+	unreuse_txn(txn, _txn);
+
+	if (txn->ret != KNOT_EOK) {
+		return txn->ret;
+	}
+	if (loaded != NULL) {
+		*ch = loaded;
+	} else {
+		txn->ret = KNOT_ENOENT;
+	}
+	return txn->ret;
+}
+
+/*! \brief Load the merged changeset, if the journal has one.
+ *
+ * \param mch            Output; must point to NULL on entry and stays NULL if nothing is loaded.
+ * \param only_if_serial If not NULL, load only when the merged serial equals *only_if_serial.
+ *
+ * \return txn->ret. */
+static int load_merged_changeset(journal_t *j, txn_t *_txn, changeset_t **mch,
+                                 const uint32_t *only_if_serial)
+{
+	assert(*mch == NULL);
+
+	reuse_txn(txn, j, _txn, false);
+	txn_check_ret(txn);
+
+	const uint32_t merged = txn->shadow_md.merged_serial;
+	const bool has_merged = ((txn->shadow_md.flags & MERGED_SERIAL_VALID) != 0);
+
+	if (has_merged && (only_if_serial == NULL || serial_equal(merged, *only_if_serial))) {
+		load_one(j, txn, merged, mch);
+	}
+	unreuse_txn(txn, _txn);
+
+	return txn->ret;
+}
+
+/*! \brief Load changesets from serial-from "from" up to the last one into the list dst.
+ *
+ * \return KNOT_EINVAL on invalid arguments, otherwise the transaction result. */
+int journal_load_changesets(journal_t *j, list_t *dst, uint32_t from)
+{
+	if (j == NULL || j->db == NULL || dst == NULL) {
+		return KNOT_EINVAL;
+	}
+
+	local_txn_t(txn, j);
+	txn_begin(txn, false);
+
+	const uint32_t last = txn->shadow_md.last_serial;
+	iterate(j, txn, load_list_itercb, JOURNAL_ITERATION_CHANGESETS, &dst,
+	        from, last, normal_iterkeycb);
+
+	txn_commit(txn);
+	return txn->ret;
+}
+
+/*! \brief Load changeset contexts from serial-from "from" up to the last one into dst.
+ *
+ * The contexts reference data inside the DB, so on success the read-only transaction
+ * is kept open and handed over in dst->txn (a heap allocation owned by dst).
+ * On failure the transaction is closed and freed here and dst->txn is reset to NULL,
+ * so that it never points to freed memory.
+ *
+ * NOTE(review): contexts already appended to dst->l before a failure are left in the
+ * list; confirm that callers dispose of them.
+ *
+ * \return KNOT_EOK, KNOT_EINVAL on invalid arguments, KNOT_ENOMEM, or the iteration error. */
+int journal_load_chgset_ctx(journal_t *j, chgset_ctx_list_t *dst, uint32_t from)
+{
+	if (j == NULL || j->db == NULL || dst == NULL) return KNOT_EINVAL;
+
+	// One allocation holds the journal txn immediately followed by the DB txn.
+	txn_t *txn = calloc(1, sizeof(*txn) + sizeof(*txn->txn));
+	if (txn == NULL) {
+		return KNOT_ENOMEM;
+	}
+	txn_init(txn, (knot_db_txn_t *)(txn + 1), j);
+	txn_begin(txn, false);
+
+	init_list(&dst->l);
+	dst->txn = txn;
+	list_t *dstl = &dst->l;
+
+	uint32_t ls = txn->shadow_md.last_serial;
+	iterate(j, txn, load_list_ctx_itercb, JOURNAL_ITERATION_CHANGESETS, &dstl, from,
+	        ls, normal_iterkeycb);
+
+	if (txn->ret != KNOT_EOK) {
+		int ret = txn->ret;
+		txn_commit(txn); // aborts, as an error is recorded
+		dst->txn = NULL;
+		free(txn);
+		return ret;
+	}
+
+	return KNOT_EOK;
+}
+
+/*! \brief Key callback for the bootstrap changeset: | zone | zeros | "bootstrap" | \0 | chunk_index |.
+ *
+ * \return Always KNOT_EOK; a key-building failure is recorded in the txn only. */
+int load_bootstrap_iterkeycb(iteration_ctx_t *ctx)
+{
+	txn_t *txn = ctx->txn;
+	txn_key_str_u32(txn, txn->j->zone, KEY_BOOTSTRAP_CHANGESET, ctx->chunk_index);
+	return KNOT_EOK;
+}
+
+/*! \brief Load the bootstrap changeset of the zone.
+ *
+ * \param _txn Transaction to reuse, or NULL to use a local read-only one.
+ * \param ch   Output: the loaded changeset; written only on success.
+ *
+ * \return KNOT_EOK, KNOT_ENOENT if nothing was loaded, or another error code. */
+static int load_bootstrap_changeset(journal_t *j, txn_t *_txn, changeset_t **ch)
+{
+	reuse_txn(txn, j, _txn, false);
+
+	changeset_t *loaded = NULL;
+	iterate(j, txn, load_one_itercb, JOURNAL_ITERATION_CHANGESETS, &loaded,
+	        0, 0, load_bootstrap_iterkeycb);
+	unreuse_txn(txn, _txn);
+
+	if (txn->ret != KNOT_EOK) {
+		return txn->ret;
+	}
+	if (loaded != NULL) {
+		*ch = loaded;
+	} else {
+		txn->ret = KNOT_ENOENT;
+	}
+	return txn->ret;
+}
+
+/*! \brief Tell whether chunk 0 of the bootstrap changeset exists in the DB. */
+static bool has_bootstrap_changeset(journal_t *j, txn_t *_txn)
+{
+	reuse_txn(txn, j, _txn, false);
+
+	txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, 0);
+	const bool present = txn_find(txn);
+
+	unreuse_txn(txn, _txn);
+	return present;
+}
+
+/*! \brief Load the bootstrap changeset followed by all changesets continuing from its
+ * SOA serial into the list dst.
+ *
+ * Having no further changesets after the bootstrap one is not an error.
+ *
+ * \return KNOT_EINVAL on invalid arguments, KNOT_ENOENT if there is no bootstrap
+ *         changeset, otherwise the transaction result. */
+int journal_load_bootstrap(journal_t *j, list_t *dst)
+{
+	if (j == NULL || j->db == NULL || dst == NULL) {
+		return KNOT_EINVAL;
+	}
+
+	local_txn_t(txn, j);
+	txn_begin(txn, false);
+
+	changeset_t *bootstrap = NULL;
+	load_bootstrap_changeset(j, txn, &bootstrap);
+
+	if (bootstrap == NULL) {
+		txn->ret = KNOT_ENOENT;
+	} else {
+		add_tail(dst, &bootstrap->n);
+
+		const uint32_t from = knot_soa_serial(bootstrap->soa_to->rrs.rdata);
+		const uint32_t last = txn->shadow_md.last_serial;
+		iterate(j, txn, load_list_itercb, JOURNAL_ITERATION_CHANGESETS, &dst,
+		        from, last, normal_iterkeycb);
+		if (txn->ret == KNOT_ENOENT) {
+			txn->ret = KNOT_EOK;
+		}
+	}
+
+	txn_commit(txn);
+	return txn->ret;
+}
+
+/*
+ * ***************************** PART V *******************************
+ *
+ * Deleting changesets
+ *
+ * ********************************************************************
+ */
+
+/*! \brief Progress of a deletion run, passed to the deleting callbacks via iter_context.
+ *
+ * del_tofree_itercb counts bytes here, del_count_itercb counts changesets. */
+typedef struct {
+ size_t freed_approx; // Amount deleted so far.
+ size_t to_be_freed; // Target amount; the callbacks stop deleting once it is reached.
+} delete_status_t;
+
+/*! \brief Chunk callback: delete the current chunk and, once a whole changeset is gone,
+ * update the shadow metadata accordingly. */
+static int del_upto_itercb(iteration_ctx_t *ctx)
+{
+	txn_t *txn = ctx->txn;
+
+	txn_key_2u32(txn, txn->j->zone, ctx->serial, ctx->chunk_index);
+	txn_del(txn);
+	txn_check_ret(txn);
+
+	if (ctx->chunk_index != ctx->chunk_count - 1) {
+		return KNOT_EOK; // more chunks of this changeset to come
+	}
+
+	// One whole changeset has been deleted => update metadata.
+	// We are sure that the deleted changeset is first at this time.
+	metadata_t *md = &txn->shadow_md;
+	const bool was_merged = md_flag(txn, MERGED_SERIAL_VALID) &&
+	                        serial_equal(md->merged_serial, ctx->serial);
+
+	// If it's not the merged changeset, point first_serial to the next one.
+	if (!was_merged) {
+		md->first_serial = ctx->serial_to;
+		md->changeset_count--;
+	}
+	if (serial_equal(md->last_flushed, ctx->serial)) {
+		md->flags &= ~LAST_FLUSHED_VALID;
+	}
+	if (serial_equal(md->last_serial, ctx->serial)) {
+		md->flags &= ~SERIAL_TO_VALID;
+	}
+	if (serial_equal(md->merged_serial, ctx->serial)) {
+		md->flags &= ~MERGED_SERIAL_VALID;
+	}
+	return KNOT_EOK;
+}
+
+/*! \brief Delete changesets from "dbfirst" up to and including "last".
+ * Please ensure (dbfirst == j->metadata.first_serial) */
+static int delete_upto(journal_t *j, txn_t *txn, uint32_t dbfirst, uint32_t last)
+{
+	int ret = iterate(j, txn, del_upto_itercb, JOURNAL_ITERATION_CHUNKS, NULL,
+	                  dbfirst, last, normal_iterkeycb);
+	return ret;
+}
+
+/*! \brief Delete the merged changeset.
+ *
+ * \return KNOT_ENOENT if the journal has no merged changeset, otherwise txn->ret. */
+static int delete_merged_changeset(journal_t *j, txn_t *t)
+{
+	reuse_txn(txn, j, t, true);
+	txn_check_ret(txn);
+
+	if (md_flag(txn, MERGED_SERIAL_VALID)) {
+		const uint32_t merged = txn->shadow_md.merged_serial;
+		delete_upto(j, txn, merged, merged);
+	} else {
+		txn->ret = KNOT_ENOENT;
+	}
+
+	unreuse_txn(txn, t);
+	return txn->ret;
+}
+
+// Defined later in this file; needed by drop_journal().
+static int delete_bootstrap_changeset(journal_t *j, txn_t *_txn);
+
+/*! \brief Remove all changesets of the zone (merged, regular and bootstrap) and reset
+ * its "occupied" and "last inserter zone" bookkeeping.
+ *
+ * \return txn->ret. */
+static int drop_journal(journal_t *j, txn_t *_txn)
+{
+	reuse_txn(txn, j, _txn, true);
+	txn_check_ret(txn);
+
+	if (md_flag(txn, MERGED_SERIAL_VALID)) {
+		delete_merged_changeset(j, txn);
+	}
+
+	const bool has_changesets = md_flag(txn, SERIAL_TO_VALID) &&
+	                            !md_flag(txn, FIRST_SERIAL_INVALID);
+	if (has_changesets) {
+		delete_upto(j, txn, txn->shadow_md.first_serial, txn->shadow_md.last_serial);
+	}
+
+	delete_bootstrap_changeset(j, txn);
+	md_del_last_inserter_zone(txn, j->zone);
+	md_set(txn, j->zone, MDKEY_PERZONE_OCCUPIED, 0);
+
+	unreuse_txn(txn, _txn);
+	return txn->ret;
+}
+
+/*! \brief Chunk callback for delete_tofree(): delete chunks until the byte target in
+ * the delete_status_t context is reached, never going past the last flushed changeset. */
+static int del_tofree_itercb(iteration_ctx_t *ctx)
+{
+	delete_status_t *status = ctx->iter_context;
+	txn_t *txn = ctx->txn;
+
+	if (status->to_be_freed == 0) {
+		return KNOT_EOK; // all done, just running through the rest of records w/o change
+	}
+
+	txn_key_2u32(txn, txn->j->zone, ctx->serial, ctx->chunk_index);
+	txn_del(txn);
+	txn_check_ret(txn);
+
+	status->freed_approx += /*4096 + */ctx->val->len;
+
+	if (ctx->chunk_index != ctx->chunk_count - 1) {
+		return KNOT_EOK;
+	}
+
+	// Whole changeset deleted: check target and update metadata.
+	metadata_t *md = &txn->shadow_md;
+	md->first_serial = ctx->serial_to;
+	md->changeset_count--;
+	if (serial_equal(md->last_flushed, ctx->serial)) {
+		md->flags &= ~LAST_FLUSHED_VALID;
+		status->to_be_freed = 0; // prevents deleting unflushed changesets
+	}
+	if (serial_equal(md->last_serial, ctx->serial)) {
+		md->flags &= ~SERIAL_TO_VALID;
+	}
+	if (status->freed_approx >= status->to_be_freed) {
+		status->to_be_freed = 0;
+	}
+
+	return KNOT_EOK;
+}
+
+/*!
+ * \brief Deletes from j->db oldest changesets to free up space
+ *
+ * It tries deleting only flushed changesets, preserves all unflushed ones.
+ *
+ * \param j            Journal.
+ * \param _txn         Transaction to reuse, or NULL to use a local read-write one.
+ * \param to_be_freed  Approximate number of bytes to free.
+ * \param really_freed Output: approximate number of bytes freed; written only on success.
+ *
+ * \retval KNOT_EOK if no error, even if too little or nothing deleted (check really_freed for result); KNOT_E* if error
+ */
+static int delete_tofree(journal_t *j, txn_t *_txn, size_t to_be_freed, size_t *really_freed)
+{
+	reuse_txn(txn, j, _txn, true);
+	txn_check_ret(txn);
+
+	if (!md_flag(txn, LAST_FLUSHED_VALID)) {
+		*really_freed = 0;
+		if (_txn == NULL) {
+			// Nothing was modified: just close the locally opened transaction
+			// instead of leaving it open.
+			txn_abort(txn);
+		}
+		return KNOT_EOK;
+	}
+	delete_status_t ds = { .freed_approx = 0, .to_be_freed = to_be_freed };
+	iterate(j, txn, del_tofree_itercb, JOURNAL_ITERATION_CHUNKS, &ds,
+	        txn->shadow_md.first_serial, txn->shadow_md.last_serial, normal_iterkeycb);
+	unreuse_txn(txn, _txn);
+
+	if (txn->ret == KNOT_EOK) {
+		*really_freed = ds.freed_approx;
+	}
+	return txn->ret;
+}
+
+ static int del_count_itercb(iteration_ctx_t *ctx) /*
+  * Iteration callback used by delete_count(): deletes the visited chunk and,
+  * once the last chunk of a changeset is gone, increments ds->freed_approx,
+  * which here counts deleted changesets. Invocations become no-ops as soon as
+  * freed_approx reaches to_be_freed.
+  */
+ {
+ delete_status_t *ds = ctx->iter_context;
+ if (ds->freed_approx >= ds->to_be_freed) {
+ return KNOT_EOK; // requested count reached, skip the remaining records
+ }
+ txn_key_2u32(ctx->txn, ctx->txn->j->zone, ctx->serial, ctx->chunk_index); // key of the chunk being visited
+ txn_del(ctx->txn);
+ txn_check_ret(ctx->txn);
+
+ // when whole changeset deleted, check target and update metadata
+ if (ctx->chunk_index == ctx->chunk_count - 1) { // last chunk of this changeset
+ ctx->txn->shadow_md.first_serial = ctx->serial_to; // the following changeset becomes the first one
+ ctx->txn->shadow_md.changeset_count--;
+ if (serial_equal(ctx->txn->shadow_md.last_flushed, ctx->serial)) {
+ ctx->txn->shadow_md.flags &= ~LAST_FLUSHED_VALID;
+ ds->to_be_freed = ds->freed_approx; // prevents deleting unflushed changesets
+ }
+ if (serial_equal(ctx->txn->shadow_md.last_serial, ctx->serial)) {
+ ctx->txn->shadow_md.flags &= ~SERIAL_TO_VALID; // the last stored changeset has just been deleted
+ }
+ ds->freed_approx++; // incremented after the clamp above, so the stop condition holds from now on
+ }
+ return KNOT_EOK;
+ }
+
+/*!
+ * \brief Deletes specified number of changesets
+ *
+ * It tries deleting olny flushed changesets, preserves all unflushed ones.
+ *
+ * \retval KNOT_EOK if no error, even if too little or nothing deleted (check really_deleted for result)
+ * \return KNOT_E* if error
+ */
+static int delete_count(journal_t *j, txn_t *_txn, size_t to_be_deleted, size_t *really_deleted)
+{
+ reuse_txn(txn, j, _txn, true);
+ txn_check_ret(txn);
+
+ if (!md_flag(txn, LAST_FLUSHED_VALID)) {
+ *really_deleted = 0;
+ return KNOT_EOK;
+ }
+ delete_status_t ds = { .freed_approx = 0, .to_be_freed = to_be_deleted };
+ iterate(j, txn, del_count_itercb, JOURNAL_ITERATION_CHUNKS, &ds,
+ txn->shadow_md.first_serial, txn->shadow_md.last_serial, normal_iterkeycb);
+ unreuse_txn(txn, _txn);
+
+ if (txn->ret == KNOT_EOK) {
+ *really_deleted = ds.freed_approx;
+ }
+ return txn->ret;
+}
+
+ static int delete_dirty_serial(journal_t *j, txn_t *_txn) /*
+  * Deletes the chunks stored under the "dirty" serial recorded in the metadata
+  * (chunk indexes 0, 1, 2, ... until one is not found) and clears the
+  * DIRTY_SERIAL_VALID flag when the transaction result is KNOT_EOK.
+  * Returns KNOT_EOK right away when the flag is not set, otherwise txn->ret.
+  */
+ {
+ reuse_txn(txn, j, _txn, true);
+ txn_check_ret(txn);
+
+ if (!md_flag(txn, DIRTY_SERIAL_VALID)) return KNOT_EOK; // NOTE(review): exits without unreuse_txn() - confirm this is fine when _txn is NULL
+
+ uint32_t ds = txn->shadow_md.dirty_serial, chunk = 0;
+
+ txn_key_2u32(txn, j->zone, ds, chunk);
+ while (txn_find(txn)) { // walk consecutive chunk indexes until the first missing one
+ txn_del(txn);
+ txn_key_2u32(txn, j->zone, ds, ++chunk);
+ }
+ unreuse_txn(txn, _txn);
+ if (txn->ret == KNOT_EOK) {
+ txn->shadow_md.flags &= ~DIRTY_SERIAL_VALID; // NOTE(review): flag is cleared after unreuse_txn() - verify it is still persisted when _txn is NULL
+ }
+ return txn->ret;
+ }
+
+ static int delete_bootstrap_changeset(journal_t *j, txn_t *_txn) /*
+  * Deletes all chunks stored under KEY_BOOTSTRAP_CHANGESET for j->zone, walking
+  * chunk indexes 0, 1, 2, ... until one is not found. Returns txn->ret.
+  */
+ {
+ reuse_txn(txn, j, _txn, false);
+ uint32_t chunk = 0;
+ txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, chunk);
+ while (txn_find(txn)) { // stops at the first missing chunk index
+ txn_del(txn);
+ txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, ++chunk);
+ }
+ unreuse_txn(txn, _txn);
+ return txn->ret;
+ }
+
+/*
+ * ***************************** PART VI ******************************
+ *
+ * Writing changesets
+ *
+ * ********************************************************************
+ */
+
+ /*!
+  * \brief Iteration callback: deserializes the visited changeset and merges it
+  *        into the accumulated one.
+  *
+  * ctx->iter_context points to the pointer of the changeset being accumulated.
+  *
+  * \return KNOT_EOK, or the error reported by vals_to_changeset() / changeset_merge().
+  */
+ static int merge_itercb(iteration_ctx_t *ctx)
+ {
+ 	changeset_t *accumulated = *(changeset_t **)ctx->iter_context;
+ 	changeset_t *loaded = NULL;
+
+ 	int rc = vals_to_changeset(ctx->val, ctx->chunk_count, ctx->txn->j->zone, &loaded);
+ 	if (rc != KNOT_EOK) {
+ 		return rc;
+ 	}
+
+ 	rc = changeset_merge(accumulated, loaded, 0);
+ 	changeset_free(loaded); // the temporary is released whether or not the merge succeeded
+ 	return rc;
+ }
+
+ static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **mch, bool *merged_bootstrap) /*
+  * Builds one changeset in *mch out of the changesets that are not flushed yet.
+  * The starting changeset is the bootstrap changeset if one can be loaded;
+  * otherwise the merged changeset, the one following last_flushed, or the
+  * first stored changeset (in this order of preference). All later changesets
+  * up to last_serial are then merged into it.
+  *
+  * *mch is set to NULL when md_flushed() reports true (nothing to merge) and is
+  * freed and reset to NULL on error. *merged_bootstrap is written only when
+  * the journal is not flushed; it tells whether the bootstrap changeset was
+  * loaded. Returns txn->ret.
+  */
+ {
+ reuse_txn(txn, j, _txn, false);
+ txn_check_ret(txn);
+ *mch = NULL;
+ if (md_flushed(txn)) {
+ goto m_u_ch_end;
+ }
+ uint32_t from;
+ txn->ret = load_bootstrap_changeset(j, txn, mch);
+ *merged_bootstrap = (txn->ret == KNOT_EOK);
+ if (txn->ret == KNOT_ENOENT) { // no bootstrap changeset (normal operation)
+ bool was_merged = md_flag(txn, MERGED_SERIAL_VALID);
+ bool was_flushed = md_flag(txn, LAST_FLUSHED_VALID);
+ txn->ret = KNOT_EOK;
+ from = was_merged ? txn->shadow_md.merged_serial :
+ (was_flushed ? txn->shadow_md.last_flushed :
+ txn->shadow_md.first_serial);
+ txn->ret = load_one(j, txn, from, mch);
+ if (!was_merged && was_flushed && txn->ret == KNOT_EOK) {
+ // we have to jump to ONE AFTER last_flushed
+ from = knot_soa_serial((*mch)->soa_to->rrs.rdata);
+ changeset_free(*mch); // the flushed changeset itself is not part of the result
+ *mch = NULL;
+ txn->ret = load_one(j, txn, from, mch);
+ }
+ }
+ if (txn->ret != KNOT_EOK) {
+ goto m_u_ch_end;
+ }
+ from = knot_soa_serial((*mch)->soa_to->rrs.rdata); // continue right after the starting changeset
+
+ if (!serial_equal(from, txn->shadow_md.last_serial_to)) { // something is stored after the starting changeset
+ txn->ret = iterate(j, txn, merge_itercb, JOURNAL_ITERATION_CHANGESETS,
+ mch, from, txn->shadow_md.last_serial, normal_iterkeycb);
+ }
+
+ m_u_ch_end:
+ unreuse_txn(txn, _txn);
+ if (txn->ret != KNOT_EOK && *mch != NULL) { // do not hand a partial result to the caller
+ changeset_free(*mch);
+ *mch = NULL;
+ }
+ return txn->ret;
+ }
+
+ dynarray_declare(chunk, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC, 32) // array of knot_db_val_t; used by store_changesets() as chunk_dynarray_t
+ dynarray_define(chunk, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC) // provides chunk_dynarray_add() / chunk_dynarray_free() used by store_changesets()
+
+ // uses local context: j, txn, changesets, nchs, serialized_size_merged, merged_into_bootstrap, inserting_merged and the label store_changeset_cleanup; if the journal is not flushed, it either merges the unflushed changesets and appends the result to 'changesets' (merge allowed) or fails with KNOT_EBUSY
+ #define try_flush \
+ if (!md_flushed(txn)) { \
+ if (journal_merge_allowed(j)) { \
+ changeset_t *merged; \
+ merge_unflushed_changesets(j, txn, &merged, &merged_into_bootstrap); \
+ if (txn->ret != KNOT_EOK) { \
+ goto store_changeset_cleanup; \
+ } \
+ add_tail(changesets, &merged->n); \
+ nchs++; \
+ serialized_size_merged += changeset_serialized_size(merged); \
+ md_flush(txn); \
+ inserting_merged = true; \
+ } \
+ else { \
+ txn->ret = KNOT_EBUSY; \
+ goto store_changeset_cleanup; \
+ } \
+ }
+
+ static int store_changesets(journal_t *j, list_t *changesets) /*
+  * Stores all changesets from the list into the journal within one write
+  * transaction. Steps, as labelled in the body:
+  *  1  - sum up serialized sizes, begin the transaction;
+  *  2  - account the DB usage change since the previous insert to the zone
+  *       recorded as last inserter, then record j->zone as last inserter;
+  *  3a - drop the journal when a bootstrap changeset (soa_from == NULL) is inserted;
+  *  3b - free space when the per-zone usage limit would be exceeded;
+  *  3c - delete old changesets when the changeset count limit would be exceeded;
+  *  4  - check continuity with the stored history and for duplicate serials;
+  *  5  - serialize every changeset into chunks of at most CHUNK_MAX bytes;
+  *  7  - update the shadow metadata per inserted changeset;
+  *  X  - commit, clean up a dirty serial on failure, free the merged changeset.
+  *
+  * The try_flush macro may append a merged changeset to the tail of
+  * 'changesets'; it is removed from the list and freed before returning.
+  * Returns txn->ret.
+  */
+ {
+ // PART 1 : initializers, compute serialized_sizes, transaction start
+ changeset_t *ch;
+
+ size_t nchs = 0, inserted_size = 0;
+ size_t serialized_size_changes = 0, serialized_size_merged = 0;
+
+ size_t chunks = 0;
+
+ bool inserting_merged = false;
+ bool merged_into_bootstrap = false;
+ bool inserting_bootstrap = false;
+
+ size_t occupied_last, occupied_now = knot_db_lmdb_get_usage(j->db->db);
+
+ WALK_LIST(ch, *changesets) {
+ nchs++;
+ serialized_size_changes += changeset_serialized_size(ch);
+ if (ch->soa_from == NULL) { // a changeset without soa_from is a bootstrap changeset
+ inserting_bootstrap = true;
+ }
+ }
+
+ local_txn_t(txn, j);
+ txn_begin(txn, true);
+
+ bool zone_in_journal = has_bootstrap_changeset(j, txn);
+ bool merge_allowed = journal_merge_allowed(j);
+
+ // if you're tempted to add dirty_serial deletion somewhere here, you're wrong. Don't do it.
+
+ // PART 2 : recalculating the previous insert's occupy change
+ md_get_common_last_occupied(txn, &occupied_last);
+ md_set(txn, NULL, MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED, occupied_now);
+ if (occupied_now != occupied_last) {
+ knot_dname_t *last_zone = NULL;
+ uint64_t lz_occupied;
+ md_get_common_last_inserter_zone(txn, &last_zone);
+ if (last_zone != NULL) {
+ md_get(txn, last_zone, MDKEY_PERZONE_OCCUPIED, &lz_occupied);
+ lz_occupied = (lz_occupied + occupied_now > occupied_last ?
+ lz_occupied + occupied_now - occupied_last : 0); // clamp at 0 instead of wrapping around
+ md_set(txn, last_zone, MDKEY_PERZONE_OCCUPIED, lz_occupied);
+ free(last_zone);
+ }
+ }
+ md_set_common_last_inserter_zone(txn, j->zone);
+
+ // PART 3a : delete all if inserting bootstrap changeset
+ if (inserting_bootstrap) {
+ drop_journal(j, txn);
+ txn_restart(txn);
+ }
+
+ // PART 3b : check if we exceeded designed occupation and delete some
+ uint64_t occupied = 0, occupied_max;
+ md_get(txn, j->zone, MDKEY_PERZONE_OCCUPIED, &occupied);
+ occupied_max = journal_max_usage(j);
+ occupied += serialized_size_changes;
+ if (occupied > occupied_max) {
+ size_t freed;
+ size_t tofree = (occupied - occupied_max) * journal_tofree_factor(j);
+ size_t free_min = tofree * journal_minfree_factor(j);
+ delete_tofree(j, txn, tofree, &freed);
+ if (freed < free_min) { // first attempt freed too little: flush/merge and try once more
+ tofree -= freed;
+ free_min -= freed;
+ try_flush
+ tofree += serialized_size_merged; // the merged changeset is going to be inserted as well
+ delete_tofree(j, txn, tofree, &freed);
+ if (freed < free_min) {
+ txn->ret = KNOT_ESPACE;
+ log_zone_warning(j->zone, "journal, unable to make free space for insert, "
+ "required: %"PRIu64", max: %"PRIu64,
+ occupied, occupied_max);
+ goto store_changeset_cleanup;
+ }
+ }
+ }
+
+ // PART 3c : check if we exceeded history depth
+ long over_limit = (long)txn->shadow_md.changeset_count - journal_max_changesets(j) +
+ list_size(changesets) - (inserting_merged ? 1 : 0);
+ if (zone_in_journal && over_limit > 0 && !merge_allowed) {
+ txn->ret = KNOT_ESPACE;
+ log_zone_warning(j->zone, "journal, unable to make free slot for insert");
+ goto store_changeset_cleanup;
+ } else if (over_limit > 0) {
+ size_t deled;
+ delete_count(j, txn, over_limit, &deled);
+ over_limit -= deled;
+ if (over_limit > 0) {
+ try_flush
+ delete_count(j, txn, over_limit, &deled);
+ // ignore further errors here, the limit is not so important
+ }
+ }
+
+ // PART 4: continuity and duplicity check
+ changeset_t * chs_head = (HEAD(*changesets));
+ bool is_first_bootstrap = (chs_head->soa_from == NULL);
+ uint32_t serial = is_first_bootstrap ? 0 : knot_soa_serial(chs_head->soa_from->rrs.rdata);
+ if (md_flag(txn, SERIAL_TO_VALID) && (is_first_bootstrap ||
+ !serial_equal(txn->shadow_md.last_serial_to, serial)) &&
+ !inserting_bootstrap /* if inserting bootstrap, drop_journal() was called, so no discontinuity */) {
+ log_zone_warning(j->zone, "journal, discontinuity in changes history (%u -> %u), dropping older changesets",
+ txn->shadow_md.last_serial_to, serial);
+ if (zone_in_journal) {
+ txn->ret = KNOT_ERANGE; // we can't drop history if zone-in-journal, so this is forbidden
+ goto store_changeset_cleanup;
+ } else if (merge_allowed) {
+ // flush would only merge and drop would delete the merge, so skip it
+ } else {
+ try_flush
+ }
+ drop_journal(j, txn);
+ txn_restart(txn);
+ }
+ WALK_LIST(ch, *changesets) {
+ uint32_t serial_to = knot_soa_serial(ch->soa_to->rrs.rdata);
+ bool is_this_bootstrap = (ch->soa_from == NULL);
+ bool is_this_merged = (inserting_merged && ch == TAIL(*changesets));
+ if (is_this_bootstrap || is_this_merged) {
+ continue;
+ }
+ txn_key_2u32(txn, j->zone, serial_to, 0);
+ if (txn_find(txn)) { // a changeset starting at this changeset's target serial is already stored
+ log_zone_warning(j->zone, "journal, duplicate changeset serial (%u), dropping older changesets",
+ serial_to);
+ if (zone_in_journal) {
+ if (merge_allowed) {
+ try_flush // merge will get rid of the duplicity => OK
+ } else {
+ txn->ret = KNOT_EEXIST; // we can't fix it in this case, refuse to do it
+ goto store_changeset_cleanup;
+ }
+ } else {
+ try_flush
+ }
+ delete_upto(j, txn, txn->shadow_md.first_serial, serial_to);
+ txn_restart(txn);
+ }
+ }
+
+ // PART 5: serializing into lmdb
+ WALK_LIST(ch, *changesets) {
+ if (txn->ret != KNOT_EOK) {
+ break;
+ }
+
+ chunk_dynarray_t dchunks = { 0 }; // values inserted for this changeset; headers are filled in afterwards
+ chunks = 0;
+
+ serialize_ctx_t *sctx = serialize_init(ch);
+ if (sctx == NULL) {
+ txn->ret = KNOT_ENOMEM;
+ break;
+ }
+
+ bool is_this_merged = (inserting_merged && ch == TAIL(*changesets));
+ bool is_this_bootstrap = (ch->soa_from == NULL);
+ uint32_t serial = is_this_bootstrap ? 0 : knot_soa_serial(ch->soa_from->rrs.rdata);
+ uint32_t serial_to = knot_soa_serial(ch->soa_to->rrs.rdata);
+
+ while (serialize_unfinished(sctx)) {
+ size_t chunk_size;
+ serialize_prepare(sctx, CHUNK_MAX - JOURNAL_HEADER_SIZE, &chunk_size);
+ if (chunk_size == 0) {
+ break;
+ }
+
+ inserted_size += chunk_size;
+
+ if (is_this_bootstrap) {
+ txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, chunks);
+ } else {
+ txn_key_2u32(txn, j->zone, serial, chunks);
+ }
+
+ txn->val.data = NULL;
+ txn->val.len = chunk_size + JOURNAL_HEADER_SIZE;
+
+ txn_insert(txn);
+ if (txn->ret != KNOT_EOK) break;
+
+ chunk_dynarray_add(&dchunks, &txn->val);
+
+ chunks++;
+
+ serialize_chunk(sctx, txn->val.data + JOURNAL_HEADER_SIZE, chunk_size); // payload goes right behind the header area
+ }
+
+ serialize_deinit(sctx);
+
+ dynarray_foreach(chunk, knot_db_val_t, val, dchunks) {
+ make_header(val, serial_to, chunks); // the total chunk count is known only now
+ }
+ chunk_dynarray_free(&dchunks);
+
+ // PART 7: metadata update
+ if (txn->ret != KNOT_EOK) {
+ break;
+ }
+ if (is_this_merged && !merged_into_bootstrap) {
+ txn->shadow_md.flags |= MERGED_SERIAL_VALID;
+ txn->shadow_md.merged_serial = serial;
+ }
+ else if (is_this_bootstrap) {
+ if (!md_flag(txn, SERIAL_TO_VALID) || !is_this_merged) {
+ txn->shadow_md.flags |= FIRST_SERIAL_INVALID;
+ txn->shadow_md.last_serial_to = serial_to;
+ }
+ txn->shadow_md.flags |= SERIAL_TO_VALID;
+ } else {
+ if (!md_flag(txn, SERIAL_TO_VALID) || md_flag(txn, FIRST_SERIAL_INVALID)) {
+ txn->shadow_md.first_serial = serial; // first regular changeset in the journal
+ }
+ txn->shadow_md.flags &= ~FIRST_SERIAL_INVALID;
+ txn->shadow_md.flags |= SERIAL_TO_VALID;
+ txn->shadow_md.last_serial = serial;
+ txn->shadow_md.last_serial_to = serial_to;
+ txn->shadow_md.changeset_count++;
+ }
+ }
+
+ // PART X : finalization and cleanup
+
+ store_changeset_cleanup:
+
+ txn_commit(txn);
+
+ if (txn->ret != KNOT_EOK) {
+ local_txn_t(ddtxn, j); // separate transaction for cleaning up after the failed insert
+ txn_begin(ddtxn, true);
+ if (md_flag(ddtxn, DIRTY_SERIAL_VALID)) {
+ delete_dirty_serial(j, ddtxn);
+ }
+ txn_commit(ddtxn);
+ }
+
+ changeset_t *dbgchst = TAIL(*changesets);
+
+ if (inserting_merged) {
+ // free the merged changeset
+ rem_node(&dbgchst->n);
+ changeset_free(dbgchst);
+ }
+
+ return txn->ret;
+ }
+#undef try_flush
+
+ /*!
+  * \brief Stores a single changeset into the journal.
+  *
+  * The changeset structure is shallow-copied so that linking it into a
+  * temporary one-item list does not touch the caller's ch->n list node.
+  *
+  * \return KNOT_EINVAL on NULL arguments or a journal that is not open,
+  *         KNOT_ENOMEM if the copy can't be allocated, otherwise the result
+  *         of store_changesets().
+  */
+ int journal_store_changeset(journal_t *journal, changeset_t *ch)
+ {
+ 	if (journal == NULL || journal->db == NULL || ch == NULL) {
+ 		return KNOT_EINVAL;
+ 	}
+
+ 	changeset_t *copy = malloc(sizeof(*copy));
+ 	if (copy == NULL) {
+ 		return KNOT_ENOMEM;
+ 	}
+ 	*copy = *ch; // shallow copy: the embedded pointers still belong to the caller
+
+ 	list_t single;
+ 	init_list(&single);
+ 	add_tail(&single, &copy->n);
+
+ 	int result = store_changesets(journal, &single);
+ 	free(copy);
+ 	return result;
+ }
+
+ /*!
+  * \brief Stores a list of changesets into the journal.
+  *
+  * \return KNOT_EINVAL on NULL arguments or a journal that is not open,
+  *         otherwise the result of store_changesets().
+  */
+ int journal_store_changesets(journal_t *journal, list_t *src)
+ {
+ 	bool usable = (journal != NULL && journal->db != NULL && src != NULL);
+ 	if (!usable) {
+ 		return KNOT_EINVAL;
+ 	}
+ 	return store_changesets(journal, src);
+ }
+
+/*
+ * **************************** PART VII ******************************
+ *
+ * Journal initialization and global manipulation
+ *
+ * ********************************************************************
+ */
+
+ /*!
+  * \brief Allocates a zero-initialized journal structure.
+  *
+  * \return The new structure, or NULL if the allocation failed.
+  */
+ journal_t *journal_new()
+ {
+ 	journal_t *fresh = calloc(1, sizeof(*fresh));
+ 	return fresh;
+ }
+
+ /*!
+  * \brief Frees the journal structure together with its zone name and resets
+  *        the caller's pointer to NULL. Does nothing for a NULL argument.
+  */
+ void journal_free(journal_t **journal)
+ {
+ 	if (journal != NULL && *journal != NULL) {
+ 		journal_t *victim = *journal;
+ 		free((knot_dname_t *)victim->zone); // free(NULL) is a no-op
+ 		free(victim);
+ 		*journal = NULL;
+ 	}
+ }
+
+ /*!
+  * \brief Opens the LMDB environment of the shared journal DB if not open yet.
+  *
+  * No locking is done here; journal_open_db() calls this with db_mutex held.
+  * After a successful open, fslimit is replaced by the map size reported by
+  * the DB.
+  *
+  * \return KNOT_EOK, or the error returned by the DB API init().
+  */
+ static int open_journal_db_unsafe(journal_db_t **db)
+ {
+ 	journal_db_t *jdb = *db;
+ 	if (jdb->db != NULL) {
+ 		return KNOT_EOK; // already open
+ 	}
+
+ 	struct knot_db_lmdb_opts opts = KNOT_DB_LMDB_OPTS_INITIALIZER;
+ 	opts.path = jdb->path;
+ 	opts.mapsize = jdb->fslimit;
+ 	opts.maxdbs = 1;
+ 	opts.maxreaders = JOURNAL_MAX_READERS;
+ 	opts.flags.env = KNOT_DB_LMDB_NOTLS;
+ 	if (jdb->mode == JOURNAL_MODE_ASYNC) {
+ 		opts.flags.env |= KNOT_DB_LMDB_WRITEMAP | KNOT_DB_LMDB_MAPASYNC;
+ 	}
+
+ 	int ret = jdb->db_api->init(&jdb->db, NULL, &opts);
+ 	if (ret != KNOT_EOK) {
+ 		jdb->db = NULL;
+ 		return ret;
+ 	}
+
+ 	jdb->fslimit = knot_db_lmdb_get_mapsize(jdb->db);
+ 	return KNOT_EOK;
+ }
+
+ /*!
+  * \brief Opens the shared journal DB under its mutex.
+  *
+  * \param db  Shared journal DB structure (see journal_db_init()).
+  *
+  * \return KNOT_EINVAL if db or *db is NULL, otherwise the result of
+  *         open_journal_db_unsafe().
+  */
+ int journal_open_db(journal_db_t **db)
+ {
+ 	// db itself is checked too, like in journal_db_close() and journal_exists()
+ 	if (db == NULL || *db == NULL) return KNOT_EINVAL;
+ 	pthread_mutex_lock(&(*db)->db_mutex);
+ 	int ret = open_journal_db_unsafe(db);
+ 	pthread_mutex_unlock(&(*db)->db_mutex);
+ 	return ret;
+ }
+
+ /*!
+  * \brief Attaches a journal structure to the shared DB for the given zone.
+  *
+  * Opens the shared DB first if needed, stores a copy of the zone name in the
+  * journal, runs initial_md_check() and, if it reports a valid dirty serial,
+  * deletes the changeset stored under it.
+  *
+  * \param journal    Journal structure to open; a journal whose db is already
+  *                   set is left untouched.
+  * \param db         Shared journal DB.
+  * \param zone_name  Name of the zone; copied.
+  *
+  * \return KNOT_EOK, KNOT_EINVAL on NULL arguments, KNOT_ENOMEM if the zone
+  *         name can't be copied, or an error from journal_open_db() /
+  *         initial_md_check().
+  */
+ int journal_open(journal_t *journal, journal_db_t **db, const knot_dname_t *zone_name)
+ {
+ 	int ret = KNOT_EOK;
+
+ 	if (journal == NULL || db == NULL || (*db) == NULL) return KNOT_EINVAL;
+ 	if (journal->db != NULL) {
+ 		return KNOT_EOK;
+ 	}
+
+ 	// open shared journal DB if not already
+ 	if ((*db)->db == NULL) {
+ 		ret = journal_open_db(db);
+ 	}
+ 	if (ret != KNOT_EOK) {
+ 		return ret;
+ 	}
+
+ 	journal->zone = knot_dname_copy(zone_name, NULL);
+ 	if (journal->zone == NULL) {
+ 		// journal->db is deliberately not set yet: with db set and no zone,
+ 		// the journal would look open and the next journal_open() would
+ 		// return KNOT_EOK.
+ 		return KNOT_ENOMEM;
+ 	}
+ 	journal->db = *db;
+
+ 	bool dirty_serial_valid;
+ 	ret = initial_md_check(journal, &dirty_serial_valid);
+
+ 	if (ret == KNOT_EOK && dirty_serial_valid) {
+ 		delete_dirty_serial(journal, NULL);
+ 	}
+
+ 	return ret;
+ }
+
+ /*!
+  * \brief Detaches the journal from the shared DB and frees its zone name.
+  *
+  * The shared DB itself stays open and the journal structure can be opened
+  * again. Does nothing for a NULL journal.
+  */
+ void journal_close(journal_t *journal)
+ {
+ 	if (journal == NULL) {
+ 		return; // nothing to close; same tolerance as journal_free()
+ 	}
+ 	journal->db = NULL;
+ 	free(journal->zone);
+ 	journal->zone = NULL;
+ }
+
+ /*!
+  * \brief Allocates and initializes the shared journal DB structure.
+  *
+  * The DB itself is not opened here (see journal_open_db()).
+  *
+  * \param db             Output; an already initialized *db is left untouched.
+  * \param lmdb_dir_path  Path of the DB directory; copied.
+  * \param lmdb_fslimit   Requested size limit; raised to JOURNAL_MIN_FSLIMIT if lower.
+  * \param mode           Journal mode stored in the structure.
+  *
+  * \return KNOT_EOK, KNOT_EINVAL on NULL arguments, KNOT_ENOMEM if an
+  *         allocation failed, KNOT_ERROR if the mutex can't be initialized.
+  */
+ int journal_db_init(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fslimit,
+                     journal_mode_t mode)
+ {
+ 	if (db == NULL) {
+ 		return KNOT_EINVAL;
+ 	}
+ 	if (*db != NULL) {
+ 		return KNOT_EOK;
+ 	}
+ 	if (lmdb_dir_path == NULL) {
+ 		return KNOT_EINVAL; // strdup(NULL) is undefined
+ 	}
+
+ 	char *path_copy = strdup(lmdb_dir_path);
+ 	if (path_copy == NULL) {
+ 		return KNOT_ENOMEM;
+ 	}
+ 	journal_db_t *fresh = malloc(sizeof(journal_db_t));
+ 	if (fresh == NULL) {
+ 		free(path_copy);
+ 		return KNOT_ENOMEM;
+ 	}
+
+ 	journal_db_t dbinit = {
+ 		.db = NULL,
+ 		.db_api = knot_db_lmdb_api(),
+ 		.path = path_copy,
+ 		.fslimit = ((lmdb_fslimit < JOURNAL_MIN_FSLIMIT) ? JOURNAL_MIN_FSLIMIT : lmdb_fslimit),
+ 		.mode = mode,
+ 	};
+ 	memcpy(fresh, &dbinit, sizeof(journal_db_t));
+
+ 	if (pthread_mutex_init(&fresh->db_mutex, NULL) != 0) {
+ 		free(path_copy);
+ 		free(fresh);
+ 		return KNOT_ERROR;
+ 	}
+
+ 	// publish the structure only once it is completely initialized
+ 	*db = fresh;
+ 	return KNOT_EOK;
+ }
+
+ /*!
+  * \brief Releases the shared journal DB structure and resets the caller's
+  *        pointer to NULL. The DB must already be closed.
+  */
+ static void destroy_journal_db(journal_db_t **db)
+ {
+ 	journal_db_t *doomed = *db;
+ 	assert(doomed->db == NULL);
+
+ 	pthread_mutex_destroy(&doomed->db_mutex);
+ 	free(doomed->path);
+ 	free(doomed);
+ 	*db = NULL;
+ }
+
+ /*!
+  * \brief Closes the shared journal DB (if open) and destroys its structure.
+  *
+  * *db is NULL afterwards. Does nothing when db or *db is NULL.
+  */
+ void journal_db_close(journal_db_t **db)
+ {
+ 	bool nothing_to_close = (db == NULL || *db == NULL);
+ 	if (nothing_to_close) {
+ 		return;
+ 	}
+
+ 	journal_db_t *shared = *db;
+
+ 	pthread_mutex_lock(&shared->db_mutex);
+ 	if (shared->db != NULL) {
+ 		shared->db_api->deinit(shared->db);
+ 		shared->db = NULL;
+ 	}
+ 	pthread_mutex_unlock(&shared->db_mutex);
+
+ 	destroy_journal_db(db);
+ }
+
+ int journal_flush(journal_t *journal) /*
+  * Runs md_flush() inside its own write transaction and commits it.
+  * Returns KNOT_EINVAL when the journal is NULL or not open, otherwise the
+  * transaction result (txn->ret).
+  */
+ {
+ if (journal == NULL || journal->db == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ local_txn_t(txn, journal);
+ txn_begin(txn, true); // write transaction
+ md_flush(txn);
+ txn_commit(txn);
+ return txn->ret;
+ }
+
+ bool journal_exists(journal_db_t **db, knot_dname_t *zone_name) /*
+  * Tells whether the journal DB holds the per-zone "flags" metadata record for
+  * zone_name. If the shared DB is not open yet, it is opened first, but only
+  * when its path exists and stat() reports a non-zero size.
+  * Returns false on NULL arguments and when the DB can't be opened.
+  */
+ {
+ if (db == NULL || *db == NULL || zone_name == NULL) {
+ return false;
+ }
+
+ if ((*db)->db == NULL) {
+ struct stat st;
+ if (stat((*db)->path, &st) != 0 || st.st_size == 0) { // do not open the DB when there is nothing on disk
+ return false;
+ }
+ int ret = journal_open_db(db);
+ if (ret != KNOT_EOK) {
+ return false;
+ }
+ }
+
+ journal_t fake_journal = { .db = *db, .zone = zone_name }; // stack-only journal, just enough to run a transaction
+ local_txn_t(txn, &fake_journal);
+ txn_begin(txn, false); // read-only transaction
+ txn_key_str(txn, zone_name, MDKEY_PERZONE_FLAGS);
+ bool res = txn_find(txn);
+ txn_abort(txn);
+
+ return res;
+ }
+
+ /*!
+  * \brief Duplicates a DB value into a single heap block.
+  *
+  * The copied data are stored right behind the returned structure, so one
+  * free() releases both.
+  *
+  * \return The copy, or NULL if the allocation failed.
+  */
+ static knot_db_val_t *dbval_copy(const knot_db_val_t *from)
+ {
+ 	knot_db_val_t *dup = malloc(sizeof(*dup) + from->len);
+ 	if (dup == NULL) {
+ 		return NULL;
+ 	}
+
+ 	*dup = *from;
+ 	dup->data = dup + 1; // == ((uint8_t *)dup) + sizeof(knot_db_val_t)
+ 	memcpy(dup->data, from->data, from->len);
+ 	return dup;
+ } // TODO think of moving this fun into different place/lib
+
+ /*!
+  * \brief Removes all records of the journal's zone from the DB.
+  *
+  * The matching keys are collected first and deleted afterwards, all within
+  * one write transaction. If any key was deleted, the journal count metadata
+  * is decremented. The transaction is committed only if collecting and
+  * deleting succeeded; otherwise it is aborted and nothing is changed.
+  *
+  * \return KNOT_EOK, KNOT_EINVAL if the journal is NULL or not open,
+  *         KNOT_ENOMEM, or an error reported by the DB.
+  */
+ int journal_scrape(journal_t *j)
+ {
+ 	if (j == NULL || j->db == NULL) return KNOT_EINVAL;
+ 	local_txn_t(txn, j);
+ 	txn_begin(txn, true);
+ 	txn_check_ret(txn);
+
+ 	knot_db_val_t key = { .len = 0, .data = "" };
+
+ 	list_t to_del;
+ 	init_list(&to_del);
+
+ 	// Pass 1: collect copies of this zone's keys.
+ 	txn_iter_begin(txn);
+ 	while (txn->ret == KNOT_EOK && txn->iter != NULL) {
+ 		txn_iter_key(txn, &key);
+ 		if (knot_dname_is_equal((const knot_dname_t *) key.data, j->zone)
+ 		    && key_is_ok(&key, true)) {
+ 			knot_db_val_t *inskey = dbval_copy(&key);
+ 			if (inskey == NULL) {
+ 				txn->ret = KNOT_ENOMEM;
+ 				break; // falls through to the common iterator/transaction cleanup
+ 			}
+ 			ptrlist_add(&to_del, inskey, NULL);
+ 		}
+ 		txn_iter_next(txn);
+ 	}
+ 	if (txn->ret == KNOT_ENOENT) {
+ 		txn->ret = KNOT_EOK;
+ 	}
+ 	txn_iter_finish(txn);
+
+ 	// Pass 2: delete the collected keys, stopping at the first failure so the
+ 	// error is neither overwritten by later deletes nor by the commit.
+ 	if (txn->ret == KNOT_EOK) {
+ 		ptrnode_t *del_one;
+ 		WALK_LIST(del_one, to_del) {
+ 			txn->ret = j->db->db_api->del(txn->txn, (knot_db_val_t *)del_one->d);
+ 			if (txn->ret != KNOT_EOK) {
+ 				break;
+ 			}
+ 		}
+ 	}
+
+ 	if (txn->ret != KNOT_EOK) {
+ 		// Do not leave the write transaction open on failure.
+ 		int ret = txn->ret;
+ 		txn_abort(txn);
+ 		ptrlist_deep_free(&to_del, NULL);
+ 		return ret;
+ 	}
+
+ 	if (!EMPTY_LIST(to_del)) {
+ 		md_update_journal_count(txn, -1);
+ 	}
+
+ 	md_del_last_inserter_zone(txn, j->zone);
+
+ 	// Committed through the DB API directly, as the original code did.
+ 	txn->ret = j->db->db_api->txn_commit(txn->txn);
+
+ 	ptrlist_deep_free(&to_del, NULL);
+
+ 	return txn->ret;
+ }
+
+ /*!
+  * \brief Reads selected journal metadata.
+  *
+  * Every output parameter is optional (may be NULL). If the journal is NULL or
+  * not open, all provided outputs are reset: booleans and 'valid' members to
+  * false, sizes to 0.
+  *
+  * \param j                   Journal.
+  * \param has_bootstrap       Output: a bootstrap changeset is present.
+  * \param merged_serial       Output: serial of the merged changeset and its validity.
+  * \param first_serial        Output: first serial and its validity.
+  * \param last_flushed        Output: last flushed serial and its validity.
+  * \param serial_to           Output: last "serial to" and its validity.
+  * \param occupied            Output: stored per-zone usage; if this zone was the last
+  *                            inserter, the DB usage change since the last recorded
+  *                            total is added.
+  * \param occupied_all_zones  Output: usage reported by knot_db_lmdb_get_usage().
+  */
+ void journal_metadata_info(journal_t *j, bool *has_bootstrap, kserial_t *merged_serial,
+                            kserial_t *first_serial, kserial_t *last_flushed, kserial_t *serial_to,
+                            uint64_t *occupied, uint64_t *occupied_all_zones)
+ {
+ 	// NOTE: there is NEVER the situation that only merged changeset would be present and no common changeset in db.
+
+ 	if (j == NULL || j->db == NULL) {
+ 		if (has_bootstrap != NULL) {
+ 			*has_bootstrap = false;
+ 		}
+ 		if (merged_serial != NULL) {
+ 			merged_serial->valid = false;
+ 		}
+ 		if (first_serial != NULL) {
+ 			first_serial->valid = false;
+ 		}
+ 		if (last_flushed != NULL) {
+ 			last_flushed->valid = false;
+ 		}
+ 		if (serial_to != NULL) {
+ 			serial_to->valid = false;
+ 		}
+ 		if (occupied != NULL) {
+ 			*occupied = 0;
+ 		}
+ 		if (occupied_all_zones != NULL) {
+ 			// reset like every other output, so the caller never reads an unwritten value
+ 			*occupied_all_zones = 0;
+ 		}
+ 		return;
+ 	}
+
+ 	uint64_t occupied_total = knot_db_lmdb_get_usage(j->db->db);
+
+ 	local_txn_t(txn, j);
+ 	txn_begin(txn, false);
+ 	txn_check_open(txn);
+
+ 	if (has_bootstrap != NULL) {
+ 		*has_bootstrap = has_bootstrap_changeset(j, txn);
+ 	}
+ 	if (merged_serial != NULL) {
+ 		merged_serial->valid = md_flag(txn, MERGED_SERIAL_VALID);
+ 		merged_serial->serial = txn->shadow_md.merged_serial;
+ 	}
+ 	if (first_serial != NULL) {
+ 		first_serial->valid = !md_flag(txn, FIRST_SERIAL_INVALID);
+ 		first_serial->serial = txn->shadow_md.first_serial;
+ 	}
+ 	if (last_flushed != NULL) {
+ 		last_flushed->valid = md_flag(txn, LAST_FLUSHED_VALID);
+ 		last_flushed->serial = txn->shadow_md.last_flushed;
+ 	}
+ 	if (serial_to != NULL) {
+ 		serial_to->valid = md_flag(txn, SERIAL_TO_VALID);
+ 		serial_to->serial = txn->shadow_md.last_serial_to;
+ 	}
+ 	if (occupied != NULL) {
+ 		md_get(txn, j->zone, MDKEY_PERZONE_OCCUPIED, occupied);
+ 		knot_dname_t *last_inserter = NULL;
+ 		md_get_common_last_inserter_zone(txn, &last_inserter);
+ 		if (last_inserter != NULL && knot_dname_is_equal(last_inserter, j->zone)) {
+ 			// The usage change since the last recorded total has not been
+ 			// accounted to the last inserter yet (store_changesets() does that
+ 			// on the next insert), so add it here.
+ 			size_t lz_occupied;
+ 			md_get_common_last_occupied(txn, &lz_occupied);
+ 			*occupied += occupied_total - lz_occupied;
+ 		}
+ 		free(last_inserter);
+ 	}
+ 	if (occupied_all_zones != NULL) {
+ 		*occupied_all_zones = occupied_total;
+ 	}
+
+ 	txn_abort(txn);
+ }
+
+ /*!
+  * \brief Deletes all changesets of the journal's zone.
+  *
+  * \return The result of drop_journal() run without a caller-supplied transaction.
+  */
+ int journal_drop_changesets(journal_t *journal)
+ {
+ 	int result = drop_journal(journal, NULL);
+ 	return result;
+ }
+
+ /*!
+  * \brief Lists the zones that have a journal in the DB.
+  *
+  * A zone is recognized by its per-zone "flags" metadata record: a key that
+  * ends with MDKEY_PERZONE_FLAGS and its terminating zero byte. A copy of the
+  * zone name taken from the beginning of such a key is appended to 'zones'.
+  *
+  * \param db     Shared journal DB; opened if not open yet.
+  * \param zones  Output list; must be empty. The collected names are
+  *               heap-allocated. NOTE(review): on error, names collected so
+  *               far stay in the list - presumably the caller frees them.
+  *
+  * \return KNOT_EOK, KNOT_EINVAL on invalid arguments, KNOT_ENOENT if no zone
+  *         was found, KNOT_ENOMEM, or an error reported by the DB.
+  */
+ int journal_db_list_zones(journal_db_t **db, list_t *zones)
+ {
+ 	uint64_t expected_count;
+
+ 	if (db == NULL || *db == NULL || zones == NULL) {
+ 		return KNOT_EINVAL;
+ 	}
+ 	if (list_size(zones) > 0) {
+ 		return KNOT_EINVAL;
+ 	}
+
+ 	if ((*db)->db == NULL) {
+ 		int ret = journal_open_db(db);
+ 		if (ret != KNOT_EOK) {
+ 			return ret;
+ 		}
+ 	}
+
+ 	journal_t fake_journal = { .db = *db, .zone = (knot_dname_t *)"" };
+ 	local_txn_t(txn, &fake_journal);
+ 	txn_begin(txn, false);
+ 	// The value itself is not used; the read only feeds the check below.
+ 	md_get(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, &expected_count);
+ 	txn_check_ret(txn);
+
+ 	const size_t metaflag_len = strlen(MDKEY_PERZONE_FLAGS);
+
+ 	knot_db_val_t key;
+ 	txn_iter_begin(txn);
+ 	while (txn->ret == KNOT_EOK && txn->iter != NULL) {
+ 		txn_iter_key(txn, &key);
+
+ 		// The key must hold the suffix plus its terminator; shorter keys would
+ 		// make the suffix pointer point before the key buffer.
+ 		if (txn->ret == KNOT_EOK && key.len > metaflag_len) {
+ 			const char *last_byte = (const char *)key.data + key.len - 1;
+ 			if (*last_byte == '\0' &&
+ 			    strcmp(last_byte - metaflag_len, MDKEY_PERZONE_FLAGS) == 0) {
+ 				knot_dname_t *found_zone = knot_dname_copy((const knot_dname_t *)key.data, NULL);
+ 				if (found_zone == NULL) {
+ 					txn->ret = KNOT_ENOMEM;
+ 					break;
+ 				}
+ 				ptrlist_add(zones, found_zone, NULL);
+ 			}
+ 		}
+ 		txn_iter_next(txn);
+ 	}
+ 	if (txn->ret == KNOT_ENOENT) {
+ 		txn->ret = KNOT_EOK;
+ 	}
+ 	txn_iter_finish(txn);
+ 	txn_abort(txn);
+ 	// Report "nothing found" only if no real error happened.
+ 	if (txn->ret == KNOT_EOK && list_size(zones) < 1) {
+ 		txn->ret = KNOT_ENOENT;
+ 	}
+ 	return txn->ret;
+ }
+
+/*
+ * *************************** PART VIII ******************************
+ *
+ * Journal check
+ *
+ * ********************************************************************
+ */
+
+ /*!
+  * \brief Formats a journal check message and sends it to the selected output.
+  *
+  * The message is prefixed with "journal check: " and truncated to fit the
+  * local buffer.
+  *
+  * \param zname       Zone name used to tag the message.
+  * \param warn_level  Output selector: JOURNAL_CHECK_STDERR prints to stderr,
+  *                    JOURNAL_CHECK_INFO logs as zone info, JOURNAL_CHECK_WARN
+  *                    logs as zone error; any other value prints nothing.
+  * \param format      printf-style format string followed by its arguments.
+  */
+ static void _jch_print(const knot_dname_t *zname, int warn_level, const char *format, ...)
+ {
+ 	char buf[512] = "journal check: ";
+ 	const size_t prefix_len = strlen(buf);
+ 	char *zname_ch = NULL;
+
+ 	va_list args;
+ 	va_start(args, format);
+ 	// Bounded formatting: an over-long message is truncated instead of
+ 	// overflowing the stack buffer.
+ 	(void)vsnprintf(buf + prefix_len, sizeof(buf) - prefix_len, format, args);
+ 	va_end(args);
+
+ 	switch (warn_level) {
+ 	case JOURNAL_CHECK_STDERR:
+ 		zname_ch = knot_dname_to_str_alloc(zname);
+ 		// the name conversion may fail; never pass NULL to %s
+ 		fprintf(stderr, "[%s] %s\n", (zname_ch != NULL ? zname_ch : "(null)"), buf);
+ 		free(zname_ch);
+ 		break;
+ 	case JOURNAL_CHECK_INFO:
+ 		log_zone_info(zname, "%s", buf);
+ 		break;
+ 	case JOURNAL_CHECK_WARN:
+ 		log_zone_error(zname, "%s", buf);
+ 		break;
+ 	default:
+ 		break;
+ 	}
+ }
+
+ #define jch_print(wl, fmt_args...) if ((wl) <= warn_level) _jch_print(j->zone, warn_level, fmt_args) // uses locals j and warn_level; wl only filters, _jch_print() always gets warn_level; expands to a bare 'if', so beware of a following 'else'
+ #define jch_info(fmt_args...) jch_print(JOURNAL_CHECK_INFO, fmt_args) // informational message
+ #define jch_warn(fmt_args...) jch_print((allok = 0, JOURNAL_CHECK_WARN), fmt_args) // problem report; also clears the local 'allok', even when nothing is printed
+ #define jch_txn(comment, fatal) do { if (txn->ret != KNOT_EOK) { \
+ jch_warn("failed transaction: %s (%s)", (comment), knot_strerror(txn->ret)); \
+ if (fatal) return txn->ret; } } while (0)
+
+ int journal_check(journal_t *j, journal_check_level_t warn_level) /*
+  * Checks the consistency of the journal metadata against the stored
+  * changesets and reports the findings through the jch_info()/jch_warn()
+  * macros, filtered by warn_level.
+  *
+  * Checked: the metadata flags, readability of the bootstrap, first, last
+  * flushed and second changesets, that the serials chain, that the number of
+  * loadable changesets matches the metadata count, and finally the merged
+  * changeset.
+  *
+  * Returns KNOT_EOK if no jch_warn() was issued, KNOT_ERROR otherwise,
+  * KNOT_ESEMCHECK if the journal is not open, or the transaction error when
+  * a transaction can't be begun or committed.
+  */
+ {
+ int ret, allok = 1;
+ changeset_t *ch = NULL;
+ uint32_t sfrom, sto;
+ uint32_t first_unflushed;
+ uint32_t chcount;
+
+ jch_info("started");
+
+ if (j->db == NULL) {
+ jch_warn("is not open");
+ return KNOT_ESEMCHECK;
+ }
+
+ local_txn_t(txn, j);
+ txn_begin(txn, true);
+ jch_txn("begin", true);
+
+ jch_info("metadata: flags >> %d << fs %u ls %u lst %u lf %u ms %u ds %u cnt %u",
+ txn->shadow_md.flags, txn->shadow_md.first_serial, txn->shadow_md.last_serial,
+ txn->shadow_md.last_serial_to, txn->shadow_md.last_flushed, txn->shadow_md.merged_serial,
+ txn->shadow_md.dirty_serial, txn->shadow_md.changeset_count);
+
+ chcount = txn->shadow_md.changeset_count;
+ first_unflushed = txn->shadow_md.first_serial; // default, replaced below if a last flushed changeset can be read
+
+ if (md_flag(txn, DIRTY_SERIAL_VALID)) {
+ jch_warn("there is some post-crash mess in the DB");
+ }
+
+ if (!md_flag(txn, SERIAL_TO_VALID)) { // journal flagged as empty
+ if (md_flag(txn, LAST_FLUSHED_VALID)) {
+ jch_warn("journal flagged empty but last_flushed valid");
+ }
+ if (md_flag(txn, MERGED_SERIAL_VALID)) {
+ jch_warn("no other than merged changeset present, this should not happen");
+ }
+ goto check_merged;
+ }
+
+ if (md_flag(txn, FIRST_SERIAL_INVALID)) {
+ jch_info("there is just the bootstrap changeset in journal");
+ ret = load_bootstrap_changeset(j, txn, &ch);
+ if (ret != KNOT_EOK) {
+ jch_warn("can't read bootstrap changeset (%s)", knot_strerror(ret));
+ } else {
+ changeset_free(ch);
+ }
+ goto check_merged;
+ } else {
+ ret = load_bootstrap_changeset(j, txn, &ch);
+ switch (ret) {
+ case KNOT_EOK:
+ sto = knot_soa_serial(ch->soa_to->rrs.rdata);
+ jch_info("bootstrap changeset loaded, sto %u", sto);
+ changeset_free(ch);
+ break;
+ case KNOT_ENOENT: // no bootstrap changeset is fine here
+ txn->ret = KNOT_EOK;
+ break;
+ default:
+ jch_info("failed to read bootstrap changeset (%s)", knot_strerror(ret));
+ break;
+ }
+ }
+
+ ret = load_one(j, txn, txn->shadow_md.first_serial, &ch);
+ if (ret != KNOT_EOK) {
+ jch_warn("can't read first changeset %u (%s)",
+ txn->shadow_md.first_serial, knot_strerror(ret));
+ goto check_merged;
+ }
+
+ sfrom = knot_soa_serial(ch->soa_from->rrs.rdata), sto = knot_soa_serial(ch->soa_to->rrs.rdata);
+ if (!serial_equal(txn->shadow_md.first_serial, sfrom)) {
+ jch_warn("first changeset's serial 'from' %u is not ok", sfrom);
+ }
+
+ if (md_flag(txn, LAST_FLUSHED_VALID)) {
+ changeset_free(ch); // done with the first changeset, ch is reused
+ ret = load_one(j, txn, txn->shadow_md.last_flushed, &ch);
+ if (ret != KNOT_EOK) {
+ jch_warn("can't read last flushed changeset %u (%s)",
+ txn->shadow_md.last_flushed, knot_strerror(ret));
+ } else {
+ first_unflushed = knot_soa_serial(ch->soa_to->rrs.rdata);
+ }
+ }
+ if (ret == KNOT_EOK) { // frees whichever changeset was loaded last
+ changeset_free(ch);
+ }
+
+ if (serial_equal(txn->shadow_md.last_serial_to, sto)) {
+ jch_info("there is just one changeset in the journal");
+ goto check_merged;
+ }
+ ret = load_one(j, txn, sto, &ch);
+ if (ret != KNOT_EOK) {
+ jch_warn("can't read second changeset %u (%s)", sto, knot_strerror(ret));
+ } else {
+ sfrom = knot_soa_serial(ch->soa_from->rrs.rdata);
+ if (!serial_equal(sfrom, sto)) {
+ jch_warn("second changeset's serial 'from' %u is not ok", sfrom);
+ }
+ changeset_free(ch);
+ }
+
+ sfrom = txn->shadow_md.first_serial;
+ sto = txn->shadow_md.last_serial_to;
+ txn_commit(txn); // the transaction is finished before journal_load_changesets() is called below
+ jch_txn("commit", true);
+
+ list_t l;
+ init_list(&l);
+ ret = journal_load_changesets(j, &l, sfrom);
+ if (ret != KNOT_EOK) {
+ jch_warn("can't read all changesets %u -> %u (%s)", sfrom, sto, knot_strerror(ret));
+ goto check_merged;
+ }
+ jch_info("listed %zu changesets", list_size(&l));
+ if (list_size(&l) != chcount) {
+ jch_warn("expected %u changesets but found %zu", chcount, list_size(&l));
+ }
+
+ ch = HEAD(l);
+ if (!serial_equal(sfrom, knot_soa_serial(ch->soa_from->rrs.rdata))) {
+ jch_warn("first listed changeset's serial 'from' %u is not ok",
+ knot_soa_serial(ch->soa_from->rrs.rdata));
+ }
+ ch = TAIL(l);
+ if (!serial_equal(sto, knot_soa_serial(ch->soa_to->rrs.rdata))) {
+ jch_warn("last listed changeset's serial 'to' %u is not ok",
+ knot_soa_serial(ch->soa_to->rrs.rdata));
+ }
+ changesets_free(&l);
+
+ check_merged:
+ if (txn->opened) txn_abort(txn); // reached both with and without an open transaction
+ txn_begin(txn, false);
+ jch_txn("begin2", true);
+ if (md_flag(txn, MERGED_SERIAL_VALID)) {
+ ch = NULL;
+ ret = load_merged_changeset(j, txn, &ch, NULL);
+ if (ret != KNOT_EOK) {
+ jch_warn("can't read merged changeset (%s)", knot_strerror(ret));
+ } else {
+ sfrom = knot_soa_serial(ch->soa_from->rrs.rdata);
+ sto = knot_soa_serial(ch->soa_to->rrs.rdata);
+ jch_info("merged changeset %u -> %u (size %zu)", sfrom, sto,
+ changeset_serialized_size(ch));
+ if (!serial_equal(sfrom, txn->shadow_md.merged_serial)) {
+ jch_warn("merged changeset's serial 'from' is not ok");
+ }
+ if (!serial_equal(sto, first_unflushed)) { // the merged changeset has to end where the unflushed ones start
+ jch_warn("merged changeset's serial 'to' is not ok");
+ }
+ changeset_free(ch);
+ }
+ }
+ txn_commit(txn);
+ jch_txn("commit2", true);
+
+ if (allok) {
+ jch_info("passed without errors");
+ }
+
+ return (allok ? KNOT_EOK : KNOT_ERROR);
+ }