summaryrefslogtreecommitdiffstats
path: root/src/knot/journal
diff options
context:
space:
mode:
Diffstat (limited to 'src/knot/journal')
-rw-r--r--src/knot/journal/journal_basic.c92
-rw-r--r--src/knot/journal/journal_basic.h118
-rw-r--r--src/knot/journal/journal_metadata.c422
-rw-r--r--src/knot/journal/journal_metadata.h187
-rw-r--r--src/knot/journal/journal_read.c436
-rw-r--r--src/knot/journal/journal_read.h158
-rw-r--r--src/knot/journal/journal_write.c333
-rw-r--r--src/knot/journal/journal_write.h121
-rw-r--r--src/knot/journal/knot_lmdb.c770
-rw-r--r--src/knot/journal/knot_lmdb.h446
-rw-r--r--src/knot/journal/serialization.c501
-rw-r--r--src/knot/journal/serialization.h169
12 files changed, 3753 insertions, 0 deletions
diff --git a/src/knot/journal/journal_basic.c b/src/knot/journal/journal_basic.c
new file mode 100644
index 0000000..825130a
--- /dev/null
+++ b/src/knot/journal/journal_basic.c
@@ -0,0 +1,92 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "knot/journal/journal_basic.h"
+#include "knot/journal/journal_metadata.h"
+#include "libknot/error.h"
+
+/*!
+ * Build the DB key prefix that locates one changeset of 'zone'.
+ *
+ * The zone-in-journal changeset is keyed by the fixed string "bootstrap",
+ * every other changeset by its serial-from.
+ */
+MDB_val journal_changeset_id_to_key(bool zone_in_journal, uint32_t serial, const knot_dname_t *zone)
+{
+	if (!zone_in_journal) {
+		return knot_lmdb_make_key("NII", zone, (uint32_t)0, serial);
+	}
+	return knot_lmdb_make_key("NIS", zone, (uint32_t)0, "bootstrap");
+}
+
+/*!
+ * Build the full DB key of a single changeset chunk.
+ *
+ * Same layout as the changeset key prefix, with 'chunk_id' appended.
+ */
+MDB_val journal_make_chunk_key(const knot_dname_t *apex, uint32_t ch_from, bool zij, uint32_t chunk_id)
+{
+	if (!zij) {
+		return knot_lmdb_make_key("NIII", apex, (uint32_t)0, ch_from, chunk_id);
+	}
+	return knot_lmdb_make_key("NISI", apex, (uint32_t)0, "bootstrap", chunk_id);
+}
+
+/*! Build the key prefix shared by every record of 'zone'; caller frees 'mv_data'. */
+MDB_val journal_zone_prefix(const knot_dname_t *zone)
+{
+	MDB_val zone_prefix = knot_lmdb_make_key("NI", zone, (uint32_t)0);
+	return zone_prefix;
+}
+
+/*! Remove every record stored under the zone's key prefix; 'txn' must be read-write. */
+void journal_del_zone(knot_lmdb_txn_t *txn, const knot_dname_t *zone)
+{
+	assert(txn->is_rw);
+
+	MDB_val zone_pfx = journal_zone_prefix(zone);
+	knot_lmdb_del_prefix(txn, &zone_pfx);
+	free(zone_pfx.mv_data);
+}
+
+/*!
+ * Write the JOURNAL_HEADER_SIZE-byte chunk header into 'chunk'.
+ *
+ * Only the leading serial-to field carries information; the remaining
+ * fields are written as zero.
+ */
+void journal_make_header(void *chunk, uint32_t ch_serial_to)
+{
+	const uint32_t chunk_count = 0; // we no longer care for # of chunks
+	const uint64_t zero64 = 0;
+
+	knot_lmdb_make_key_part(chunk, JOURNAL_HEADER_SIZE, "IILLL", ch_serial_to,
+	                        chunk_count, zero64, zero64, zero64);
+}
+
+/*! Read the serial-to stored as the first 32-bit field of a chunk. */
+uint32_t journal_next_serial(const MDB_val *chunk)
+{
+	const void *header = chunk->mv_data;
+	return knot_wire_read_u32(header);
+}
+
+/*!
+ * Look a changeset up by its serial-from (or as zone-in-journal) and
+ * optionally report its serial-to.
+ *
+ * \return True if the changeset was found; '*serial_to' is written only then
+ *         and only if 'serial_to' is not NULL.
+ */
+bool journal_serial_to(knot_lmdb_txn_t *txn, bool zij, uint32_t serial,
+                       const knot_dname_t *zone, uint32_t *serial_to)
+{
+	bool exists = false;
+	MDB_val ch_key = journal_changeset_id_to_key(zij, serial, zone);
+
+	if (knot_lmdb_find_prefix(txn, &ch_key)) {
+		exists = true;
+		if (serial_to != NULL) {
+			*serial_to = journal_next_serial(&txn->cur_val);
+		}
+	}
+
+	free(ch_key.mv_data);
+	return exists;
+}
+
+/*! Flushing is allowed unless the zone's zonefile-sync setting is negative. */
+bool journal_allow_flush(zone_journal_t j)
+{
+	conf_val_t sync = conf_zone_get(j.conf, C_ZONEFILE_SYNC, j.zone);
+	return !(conf_int(&sync) < 0);
+}
+
+/*! Read the zone's configured journal-max-usage value. */
+size_t journal_conf_max_usage(zone_journal_t j)
+{
+	conf_val_t limit = conf_zone_get(j.conf, C_JOURNAL_MAX_USAGE, j.zone);
+	size_t max_usage = conf_int(&limit);
+	return max_usage;
+}
+
+/*! Read the zone's configured journal-max-depth value. */
+size_t journal_conf_max_changesets(zone_journal_t j)
+{
+	conf_val_t limit = conf_zone_get(j.conf, C_JOURNAL_MAX_DEPTH, j.zone);
+	size_t max_depth = conf_int(&limit);
+	return max_depth;
+}
diff --git a/src/knot/journal/journal_basic.h b/src/knot/journal/journal_basic.h
new file mode 100644
index 0000000..8804d7b
--- /dev/null
+++ b/src/knot/journal/journal_basic.h
@@ -0,0 +1,118 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "knot/conf/conf.h"
+#include "knot/journal/knot_lmdb.h"
+#include "knot/updates/changesets.h"
+#include "libknot/dname.h"
+
+/*! \brief Handle bundling what journal operations need to address one zone's journal. */
+typedef struct {
+ knot_lmdb_db_t *db; // journal database the transactions are opened on
+ const knot_dname_t *zone; // name of the zone whose records are addressed
+ void *conf; // needed only for journal write operations
+} zone_journal_t;
+
+#define JOURNAL_CHUNK_MAX (70 * 1024) // must be at least 64k + 6B
+// NOTE(review): usage of this threshold is not visible in this part of the file -- confirm its meaning at the call sites.
+#define JOURNAL_CHUNK_THRESH (15 * 1024)
+// Size in bytes of the header journal_make_header() writes at the start of a chunk.
+#define JOURNAL_HEADER_SIZE (32)
+
+/*!
+ * \brief Convert journal_mode to LMDB environment flags.
+ *
+ * Asynchronous mode adds MDB_WRITEMAP and MDB_MAPASYNC; 'readonly' adds MDB_RDONLY.
+ */
+inline static unsigned journal_env_flags(int journal_mode, bool readonly)
+{
+	unsigned flags = 0;
+	if (journal_mode == JOURNAL_MODE_ASYNC) {
+		flags |= MDB_WRITEMAP | MDB_MAPASYNC;
+	}
+	if (readonly) {
+		flags |= MDB_RDONLY;
+	}
+	return flags;
+}
+
+/*!
+ * \brief Create a database key prefix to search for a changeset.
+ *
+ * \param zone_in_journal True if searching for zone-in-journal special changeset.
+ * \param serial Serial-from of the changeset to be searched for. Ignored if 'zone_in_journal'.
+ * \param zone Name of the zone.
+ *
+ * \return DB key. 'mv_data' shall be freed later. 'mv_data' is NULL on failure.
+ */
+MDB_val journal_changeset_id_to_key(bool zone_in_journal, uint32_t serial, const knot_dname_t *zone);
+
+/*!
+ * \brief Create a database key for changeset chunk.
+ *
+ * \param apex Zone apex owner name.
+ * \param ch_from Serial "from" of the stored changeset.
+ * \param zij Zone-in-journal is stored.
+ * \param chunk_id Ordinal number of this changeset's chunk.
+ *
+ * \return DB key. 'mv_data' shall be freed later. 'mv_data' is NULL on failure.
+ */
+MDB_val journal_make_chunk_key(const knot_dname_t *apex, uint32_t ch_from, bool zij, uint32_t chunk_id);
+
+/*!
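+ * \brief Return a key prefix to operate with all zone-related records.
+ *
+ * \param zone Name of the zone.
+ *
+ * \return DB key prefix. 'mv_data' shall be freed later. 'mv_data' is NULL on failure.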
+ */
+MDB_val journal_zone_prefix(const knot_dname_t *zone);
+
+/*!
+ * \brief Delete all zone-related records from journal with open read-write txn.
+ */
+void journal_del_zone(knot_lmdb_txn_t *txn, const knot_dname_t *zone);
+
+/*!
+ * \brief Initialise chunk header.
+ *
+ * \param chunk Pointer to the changeset chunk. It must be at least JOURNAL_HEADER_SIZE, perhaps more.
+ * \param ch_serial_to Serial-to of the changeset being serialized.
+ */
+void journal_make_header(void *chunk, uint32_t ch_serial_to);
+
+/*!
+ * \brief Obtain serial-to of the serialized changeset.
+ *
+ * \param chunk Any chunk of a serialized changeset.
+ *
+ * \return The changeset's serial-to.
+ */
+uint32_t journal_next_serial(const MDB_val *chunk);
+
+/*!
+ * \brief Obtain serial-to of a changeset stored in journal.
+ *
+ * \param txn Journal DB transaction.
+ * \param zij True if changeset in question is zone-in-journal.
+ * \param serial Serial-from of the changeset in question.
+ * \param zone Zone name.
+ * \param serial_to Output: serial-to of the changeset in question. May be NULL if not needed.
+ *
+ * \return True if the changeset exists in the journal.
+ */
+bool journal_serial_to(knot_lmdb_txn_t *txn, bool zij, uint32_t serial,
+ const knot_dname_t *zone, uint32_t *serial_to);
+
+/*!
+ * \brief Return true if the changeset in question exists in the journal.
+ *
+ * \note Here 'zone' is the zone-in-journal selector, not a zone name.
+ */
+inline static bool journal_contains(knot_lmdb_txn_t *txn, bool zone, uint32_t serial, const knot_dname_t *zone_name)
+{
+	uint32_t *serial_to_unwanted = NULL;
+	return journal_serial_to(txn, zone, serial, zone_name, serial_to_unwanted);
+}
+
+/*! \brief Return true if the journal may be flushed according to conf. */
+bool journal_allow_flush(zone_journal_t j);
+
+/*! \brief Return configured maximal per-zone usage of journal DB. */
+size_t journal_conf_max_usage(zone_journal_t j);
+
+/*! \brief Return configured maximal depth of journal. */
+size_t journal_conf_max_changesets(zone_journal_t j);
diff --git a/src/knot/journal/journal_metadata.c b/src/knot/journal/journal_metadata.c
new file mode 100644
index 0000000..b133534
--- /dev/null
+++ b/src/knot/journal/journal_metadata.c
@@ -0,0 +1,422 @@
+/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "knot/journal/journal_metadata.h"
+
+#include "libknot/endian.h"
+#include "libknot/error.h"
+
+/*!
+ * Convert a 1-, 2-, 4- or 8-byte integer in place between host and
+ * big-endian byte order.
+ *
+ * \param data       Buffer holding the integer; may be unaligned.
+ * \param data_size  Width of the integer in bytes.
+ * \param in         True to convert big-endian -> host, false for host -> big-endian.
+ *
+ * The width is checked before any byte is copied, so an unsupported
+ * 'data_size' leaves 'data' untouched (and trips the assert in debug builds)
+ * instead of overrunning a temporary or writing back indeterminate bytes.
+ */
+static void fix_endian(void *data, size_t data_size, bool in)
+{
+	switch (data_size) {
+	case sizeof(uint8_t):
+		// A single byte has no byte order.
+		break;
+	case sizeof(uint16_t): {
+		uint16_t v;
+		memcpy(&v, data, sizeof(v));
+		v = in ? be16toh(v) : htobe16(v);
+		memcpy(data, &v, sizeof(v));
+		break;
+	}
+	case sizeof(uint32_t): {
+		uint32_t v;
+		memcpy(&v, data, sizeof(v));
+		v = in ? be32toh(v) : htobe32(v);
+		memcpy(data, &v, sizeof(v));
+		break;
+	}
+	case sizeof(uint64_t): {
+		uint64_t v;
+		memcpy(&v, data, sizeof(v));
+		v = in ? be64toh(v) : htobe64(v);
+		memcpy(data, &v, sizeof(v));
+		break;
+	}
+	default:
+		assert(0);
+		break;
+	}
+}
+
+/*! Build the key of a metadata item: per-zone when 'zone' is given, DB-wide otherwise. */
+static MDB_val metadata_key(const knot_dname_t *zone, const char *metadata)
+{
+	if (zone != NULL) {
+		return knot_lmdb_make_key("NIS", zone, (uint32_t)0, metadata);
+	}
+	return knot_lmdb_make_key("IS", (uint32_t)0, metadata);
+}
+
+/*!
+ * Delete a metadata item.
+ *
+ * \return False only if the key could not be built; true otherwise.
+ *
+ * The outcome is decided before the key buffer is released, so the pointer
+ * value is never inspected after free().
+ */
+static bool del_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata)
+{
+	MDB_val key = metadata_key(zone, metadata);
+	if (key.mv_data == NULL) {
+		return false;
+	}
+	knot_lmdb_del_prefix(txn, &key);
+	free(key.mv_data);
+	return true;
+}
+
+/*!
+ * Position the transaction on a metadata item (exact key match).
+ *
+ * \return True if the item exists; its value is then in txn->cur_val.
+ */
+static bool get_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata)
+{
+	MDB_val md_key = metadata_key(zone, metadata);
+	const bool present = knot_lmdb_find(txn, &md_key, KNOT_LMDB_EXACT); // not FORCE
+	free(md_key.mv_data);
+	return present;
+}
+
+/*!
+ * Read a numeric metadata item of exactly 'result_size' bytes into 'result',
+ * converted to host byte order.
+ *
+ * \return True on success. A missing item returns false; a stored value of a
+ *         different size returns false and sets txn->ret to KNOT_EMALF.
+ */
+static bool get_metadata_numeric(knot_lmdb_txn_t *txn, const knot_dname_t *zone,
+                                 const char *metadata, void *result, size_t result_size)
+{
+	if (!get_metadata(txn, zone, metadata)) {
+		return false;
+	}
+	if (txn->cur_val.mv_size != result_size) {
+		txn->ret = KNOT_EMALF;
+		return false;
+	}
+	memcpy(result, txn->cur_val.mv_data, result_size);
+	fix_endian(result, result_size, true);
+	return true;
+}
+
+/*! Read a 32-bit numeric metadata item; see get_metadata_numeric(). */
+bool get_metadata32(knot_lmdb_txn_t *txn, const knot_dname_t *zone,
+                    const char *metadata, uint32_t *result)
+{
+	const size_t width = sizeof(uint32_t);
+	return get_metadata_numeric(txn, zone, metadata, result, width);
+}
+
+/*! Read a 64-bit numeric metadata item; see get_metadata_numeric(). */
+bool get_metadata64(knot_lmdb_txn_t *txn, const knot_dname_t *zone,
+                    const char *metadata, uint64_t *result)
+{
+	const size_t width = sizeof(uint64_t);
+	return get_metadata_numeric(txn, zone, metadata, result, width);
+}
+
+/*!
+ * Read a numeric metadata item that may be stored as either 64 or 32 bits.
+ *
+ * The 64-bit read is tried first; if the stored size does not match
+ * (KNOT_EMALF), the error is cleared and the item is re-read as 32 bits.
+ *
+ * \return True if a value was read. On any failure '*result' is left
+ *         untouched, matching get_metadata32()/get_metadata64().
+ */
+bool get_metadata64or32(knot_lmdb_txn_t *txn, const knot_dname_t *zone,
+                        const char *metadata, uint64_t *result)
+{
+	if (txn->ret != KNOT_EOK) {
+		return false;
+	}
+	bool ret = get_metadata64(txn, zone, metadata, result);
+	if (txn->ret == KNOT_EMALF) {
+		uint32_t res32 = 0;
+		txn->ret = KNOT_EOK;
+		ret = get_metadata32(txn, zone, metadata, &res32);
+		if (ret) {
+			// Only publish the narrow value when it was really read.
+			*result = res32;
+		}
+	}
+	return ret;
+}
+
+/*!
+ * Store a metadata item.
+ *
+ * \param valp      Value bytes to store.
+ * \param val_size  Number of value bytes.
+ * \param numeric   True if the value is an integer to be stored big-endian.
+ */
+void set_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata,
+                  const void *valp, size_t val_size, bool numeric)
+{
+	MDB_val md_key = metadata_key(zone, metadata);
+	MDB_val slot = { val_size, NULL };
+
+	const bool inserted = knot_lmdb_insert(txn, &md_key, &slot);
+	if (inserted) {
+		memcpy(slot.mv_data, valp, val_size);
+	}
+	if (inserted && numeric) {
+		fix_endian(slot.mv_data, val_size, false);
+	}
+
+	free(md_key.mv_data);
+}
+
+/*! Difference between the current DB usage and the stored "last_total_occupied". */
+static int64_t last_occupied_diff(knot_lmdb_txn_t *txn)
+{
+	uint64_t usage_now = knot_lmdb_usage(txn);
+	uint64_t usage_recorded = 0; // stays 0 when the item is missing
+	(void)get_metadata64(txn, NULL, "last_total_occupied", &usage_recorded);
+
+	int64_t delta = (int64_t)usage_now - (int64_t)usage_recorded;
+	return delta;
+}
+
+/*!
+ * Account the DB usage change since the last call to the previously
+ * inserting zone, then record 'new_inserter' and the current total usage.
+ */
+void update_last_inserter(knot_lmdb_txn_t *txn, const knot_dname_t *new_inserter)
+{
+	uint64_t total_now = knot_lmdb_usage(txn);
+	int64_t delta = last_occupied_diff(txn);
+
+	// Copy the name out: later lookups move the cursor away from it.
+	knot_dname_t *prev_inserter = NULL;
+	if (get_metadata(txn, NULL, "last_inserter_zone")) {
+		prev_inserter = knot_dname_copy(txn->cur_val.mv_data, NULL);
+	}
+
+	if (delta != 0 && prev_inserter != NULL) {
+		uint64_t prev_occupied = 0;
+		(void)get_metadata64(txn, prev_inserter, "occupied", &prev_occupied);
+		prev_occupied = MAX(0, (int64_t)prev_occupied + delta);
+		set_metadata(txn, prev_inserter, "occupied", &prev_occupied, sizeof(prev_occupied), true);
+	}
+
+	if (new_inserter == NULL) {
+		del_metadata(txn, NULL, "last_inserter_zone");
+	} else if (prev_inserter == NULL || !knot_dname_is_equal(prev_inserter, new_inserter)) {
+		set_metadata(txn, NULL, "last_inserter_zone", new_inserter, knot_dname_size(new_inserter), false);
+	}
+	free(prev_inserter);
+
+	set_metadata(txn, NULL, "last_total_occupied", &total_now, sizeof(total_now), true);
+}
+
+/*! Return the zone's stored "occupied" metadata, or 0 if it cannot be read. */
+uint64_t journal_get_occupied(knot_lmdb_txn_t *txn, const knot_dname_t *zone)
+{
+	uint64_t occupied = 0;
+	(void)get_metadata64(txn, zone, "occupied", &occupied);
+	return occupied;
+}
+
+/*!
+ * Parse a "MAJOR.MINOR" version string.
+ *
+ * \return The major number, or -1 if the string is not in that form.
+ */
+static int first_digit(char * of)
+{
+	unsigned major = 0, minor = 0;
+	if (sscanf(of, "%u.%u", &major, &minor) != 2) {
+		return -1;
+	}
+	return major;
+}
+
+/*!
+ * Load the zone's metadata from the DB into 'md' (zeroed first).
+ *
+ * If a DB-wide "version" item exists, its major number is checked:
+ * 1, 2 and 3 are accepted, 0 sets txn->ret to KNOT_ENOENT and anything else
+ * (including an unparsable string, for which first_digit() yields -1) sets
+ * KNOT_ENOTSUP. In both error cases the function returns with 'md' zeroed.
+ */
+void journal_load_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, journal_metadata_t *md)
+{
+	memset(md, 0, sizeof(*md));
+
+	if (get_metadata(txn, NULL, "version")) {
+		int major = first_digit(txn->cur_val.mv_data);
+		if (major == 0) {
+			txn->ret = KNOT_ENOENT;
+			return;
+		}
+		// 1: still supported, 2: normal operation, 3: TODO warning about downgrade
+		if (major < 1 || major > 3) {
+			txn->ret = KNOT_ENOTSUP;
+			return;
+		}
+	}
+
+	// A zone without stored "flags" has no metadata yet.
+	md->_new_zone = !get_metadata32(txn, zone, "flags", &md->flags);
+	(void)get_metadata32(txn, zone, "first_serial", &md->first_serial);
+	(void)get_metadata32(txn, zone, "last_serial_to", &md->serial_to);
+	(void)get_metadata32(txn, zone, "merged_serial", &md->merged_serial);
+	(void)get_metadata32(txn, zone, "changeset_count", &md->changeset_count);
+
+	if (get_metadata32(txn, zone, "flushed_upto", &md->flushed_upto)) {
+		return;
+	}
+
+	// importing from version 1.0
+	if (!(md->flags & JOURNAL_LAST_FLUSHED_VALID)) {
+		md->flushed_upto = md->first_serial;
+		return;
+	}
+
+	uint32_t last_flushed = 0;
+	if (get_metadata32(txn, zone, "last_flushed", &last_flushed) &&
+	    journal_serial_to(txn, false, last_flushed, zone, &md->flushed_upto)) {
+		md->flags &= ~JOURNAL_LAST_FLUSHED_VALID;
+	} else {
+		txn->ret = KNOT_EMALF;
+	}
+}
+
+/*!
+ * Write the zone's metadata from 'md' into the DB and stamp the DB version.
+ *
+ * For a zone that had no metadata before (md->_new_zone), the DB-wide
+ * "journal_count" is incremented as well.
+ */
+void journal_store_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const journal_metadata_t *md)
+{
+	const struct {
+		const char *name;
+		const uint32_t *value;
+	} items[] = {
+		{ "first_serial",    &md->first_serial },
+		{ "last_serial_to",  &md->serial_to },
+		{ "flushed_upto",    &md->flushed_upto },
+		{ "merged_serial",   &md->merged_serial },
+		{ "changeset_count", &md->changeset_count },
+		{ "flags",           &md->flags },
+	};
+	for (size_t i = 0; i < sizeof(items) / sizeof(items[0]); i++) {
+		set_metadata(txn, zone, items[i].name, items[i].value, sizeof(*items[i].value), true);
+	}
+
+	// The stored version string includes its terminating NUL (4 bytes).
+	set_metadata(txn, NULL, "version", "2.0", 4, false);
+
+	if (!md->_new_zone) {
+		return;
+	}
+	uint64_t zones_in_db = 0;
+	(void)get_metadata64or32(txn, NULL, "journal_count", &zones_in_db);
+	++zones_in_db;
+	set_metadata(txn, NULL, "journal_count", &zones_in_db, sizeof(zones_in_db), true);
+}
+
+/*!
+ * Adjust 'md' after 'deleted_count' changesets up to 'deleted_upto' were removed.
+ *
+ * Nothing happens for a zero count. If the deletion reached the journal's
+ * serial-to, the journal is empty and JOURNAL_SERIAL_TO_VALID is cleared.
+ */
+void journal_metadata_after_delete(journal_metadata_t *md, uint32_t deleted_upto,
+                                   size_t deleted_count)
+{
+	if (deleted_count == 0) {
+		return;
+	}
+	assert((md->flags & JOURNAL_SERIAL_TO_VALID));
+
+	const bool emptied = (deleted_upto == md->serial_to);
+	if (emptied) {
+		assert(md->flushed_upto == md->serial_to);
+		assert(md->changeset_count == deleted_count);
+		md->flags &= ~JOURNAL_SERIAL_TO_VALID;
+	}
+
+	md->changeset_count -= deleted_count;
+	md->first_serial = deleted_upto;
+}
+
+/*!
+ * Adjust 'md' after changesets were merged.
+ *
+ * 'flushed_upto' always moves to 'merged_serial_to'. The first merge into a
+ * regular (non zone-in-journal) merged changeset also records its serial and
+ * accounts for the first changeset it replaced.
+ */
+void journal_metadata_after_merge(journal_metadata_t *md, bool merged_zij, uint32_t merged_serial,
+                                  uint32_t merged_serial_to, uint32_t original_serial_to)
+{
+	md->flushed_upto = merged_serial_to;
+
+	if ((md->flags & JOURNAL_MERGED_SERIAL_VALID)) {
+		assert(!merged_zij);
+		assert(merged_serial == md->merged_serial);
+		return;
+	}
+	if (merged_zij) {
+		return;
+	}
+
+	md->merged_serial = merged_serial;
+	md->flags |= JOURNAL_MERGED_SERIAL_VALID;
+	assert(merged_serial == md->first_serial);
+	// the merged changeset writes itself instead of first one
+	journal_metadata_after_delete(md, original_serial_to, 1);
+}
+
+/*!
+ * Adjust 'md' after a changeset 'serial' -> 'serial_to' was inserted.
+ *
+ * When first_serial equals serial_to the journal holds no changesets yet,
+ * so the inserted one also becomes the first and the flushed position.
+ */
+void journal_metadata_after_insert(journal_metadata_t *md, uint32_t serial, uint32_t serial_to)
+{
+	const bool no_changesets_yet = (md->first_serial == md->serial_to);
+	if (no_changesets_yet) {
+		md->flushed_upto = serial;
+		md->first_serial = serial;
+	}
+
+	md->changeset_count++;
+	md->flags |= JOURNAL_SERIAL_TO_VALID;
+	md->serial_to = serial_to;
+}
+
+/*!
+ * Adjust 'md' after an extra changeset 'serial' -> 'serial_to' was inserted;
+ * no merged changeset may exist yet.
+ */
+void journal_metadata_after_extra(journal_metadata_t *md, uint32_t serial, uint32_t serial_to)
+{
+	assert(!(md->flags & JOURNAL_MERGED_SERIAL_VALID));
+
+	const uint32_t new_flags = JOURNAL_MERGED_SERIAL_VALID | JOURNAL_LAST_FLUSHED_VALID;
+	md->flags |= new_flags;
+	md->flushed_upto = serial_to;
+	md->merged_serial = serial;
+}
+
+/*!
+ * Delete all records of the zone but carry its "occupied" metadata over:
+ * the value is read before the purge and written back afterwards.
+ */
+void journal_del_zone_txn(knot_lmdb_txn_t *txn, const knot_dname_t *zone)
+{
+	uint64_t saved_occupied = 0;
+	(void)get_metadata64(txn, zone, "occupied", &saved_occupied);
+
+	journal_del_zone(txn, zone);
+
+	set_metadata(txn, zone, "occupied", &saved_occupied, sizeof(saved_occupied), true);
+}
+
+/*!
+ * Delete every record of the zone, metadata included, in one read-write txn.
+ *
+ * With 'check_existence' set, a journal that does not seem to exist is left
+ * alone and KNOT_EOK is returned.
+ */
+int journal_scrape_with_md(zone_journal_t j, bool check_existence)
+{
+	const bool nothing_to_do = check_existence && !journal_is_existing(j);
+	if (nothing_to_do) {
+		return KNOT_EOK;
+	}
+
+	knot_lmdb_txn_t rw_txn = { 0 };
+	knot_lmdb_begin(j.db, &rw_txn, true);
+	update_last_inserter(&rw_txn, NULL);
+	journal_del_zone(&rw_txn, j.zone);
+	knot_lmdb_commit(&rw_txn);
+
+	return rw_txn.ret;
+}
+
+/*!
+ * Copy all records under the zone's key prefix from one journal DB to another.
+ *
+ * Both DBs are opened first; the copy runs only if both opens succeeded.
+ * The source is opened read-write because update_last_inserter() writes to it.
+ *
+ * \return The source-side error if any, otherwise the destination-side result.
+ */
+int journal_copy_with_md(knot_lmdb_db_t *from, knot_lmdb_db_t *to, const knot_dname_t *zone)
+{
+	knot_lmdb_txn_t src = { 0 }, dst = { 0 };
+	src.ret = knot_lmdb_open(from);
+	dst.ret = knot_lmdb_open(to);
+
+	if (src.ret == KNOT_EOK && dst.ret == KNOT_EOK) {
+		knot_lmdb_begin(from, &src, true);
+		knot_lmdb_begin(to, &dst, true);
+		update_last_inserter(&src, NULL);
+
+		MDB_val zone_pfx = journal_zone_prefix(zone);
+		knot_lmdb_copy_prefix(&src, &dst, &zone_pfx);
+		free(zone_pfx.mv_data);
+
+		knot_lmdb_commit(&dst);
+		knot_lmdb_commit(&src);
+	}
+
+	if (src.ret != KNOT_EOK) {
+		return src.ret;
+	}
+	return dst.ret;
+}
+
+/*! Mark the whole journal as flushed: 'flushed_upto' is moved to 'serial_to'. */
+int journal_set_flushed(zone_journal_t j)
+{
+	journal_metadata_t meta = { 0 };
+	knot_lmdb_txn_t rw_txn = { 0 };
+
+	knot_lmdb_begin(j.db, &rw_txn, true);
+	journal_load_metadata(&rw_txn, j.zone, &meta);
+	meta.flushed_upto = meta.serial_to;
+	journal_store_metadata(&rw_txn, j.zone, &meta);
+	knot_lmdb_commit(&rw_txn);
+
+	return rw_txn.ret;
+}
+
+/*
+ * Collect information about the zone's journal in a read-only transaction.
+ *
+ * 'exists' is always written and must not be NULL; every other output is
+ * written only when its pointer is non-NULL. If the DB does not exist at all,
+ * only '*exists' is set (to false) and KNOT_EOK is returned.
+ */
+int journal_info(zone_journal_t j, bool *exists, uint32_t *first_serial, bool *has_zij,
+ uint32_t *serial_to, bool *has_merged, uint32_t *merged_serial,
+ uint64_t *occupied, uint64_t *occupied_total)
+{
+ if (knot_lmdb_exists(j.db) == KNOT_ENODB) {
+ *exists = false;
+ return KNOT_EOK;
+ }
+ int ret = knot_lmdb_open(j.db);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ knot_lmdb_txn_t txn = { 0 };
+ journal_metadata_t md = { 0 };
+ knot_lmdb_begin(j.db, &txn, false);
+ journal_load_metadata(&txn, j.zone, &md);
+ // The zone counts as existing when its serial-to is marked valid.
+ *exists = (md.flags & JOURNAL_SERIAL_TO_VALID);
+ if (first_serial != NULL) {
+ *first_serial = md.first_serial;
+ }
+ if (has_zij != NULL) {
+ *has_zij = journal_contains(&txn, true, 0, j.zone);
+ }
+ if (serial_to != NULL) {
+ *serial_to = md.serial_to;
+ }
+ if (has_merged != NULL) {
+ *has_merged = (md.flags & JOURNAL_MERGED_SERIAL_VALID);
+ }
+ if (merged_serial != NULL) {
+ *merged_serial = md.merged_serial;
+ }
+ if (occupied != NULL) {
+ *occupied = 0;
+ get_metadata64(&txn, j.zone, "occupied", occupied);
+
+ // If this zone was the last inserter, the usage change since then
+ // has not been added to its stored "occupied" yet; add it here.
+ // The name comparison reads txn.cur_val before last_occupied_diff()
+ // moves the cursor.
+ if (get_metadata(&txn, NULL, "last_inserter_zone") &&
+ knot_dname_is_equal(j.zone, txn.cur_val.mv_data)) {
+ *occupied = MAX(0, (int64_t)*occupied + last_occupied_diff(&txn));
+ }
+ }
+ if (occupied_total != NULL) {
+ *occupied_total = knot_lmdb_usage(&txn);
+ }
+ knot_lmdb_abort(&txn);
+ return txn.ret;
+}
+
+/*
+ * Call 'cb' for each zone found in the journal DB.
+ *
+ * The DB keys are scanned in order with a greater-or-equal search. A found key
+ * is treated as a zone name if a "flags" metadata item exists for it. The
+ * callback's return value is stored in txn.ret and is what the function
+ * finally returns.
+ */
+int journals_walk(knot_lmdb_db_t *db, journals_walk_cb_t cb, void *ctx)
+{
+ int ret = knot_lmdb_exists(db);
+ if (ret == KNOT_EOK) {
+ ret = knot_lmdb_open(db);
+ }
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ knot_lmdb_txn_t txn = { 0 };
+ knot_lmdb_begin(db, &txn, false);
+ // The search key lives in a local buffer; it starts as a single zero byte.
+ knot_dname_storage_t search_data = { 0 };
+ MDB_val search = { 1, search_data };
+ while (knot_lmdb_find(&txn, &search, KNOT_LMDB_GEQ)) {
+ // Keep the found key's address: the lookup below repositions txn.cur_key.
+ knot_dname_t *found = txn.cur_key.mv_data;
+ uint32_t unused_flags;
+ if (get_metadata32(&txn, found, "flags", &unused_flags)) {
+ // matched journal DB key appears to be a zone name
+ txn.ret = cb(found, ctx);
+ }
+
+ // update searched key to next after found zone
+ search.mv_size = knot_dname_size(found);
+ memcpy(search.mv_data, found, search.mv_size);
+ // Incrementing the last byte of the copied name moves the search past this name.
+ ((uint8_t *)search.mv_data)[search.mv_size - 1]++;
+ }
+ knot_lmdb_abort(&txn);
+ return txn.ret;
+}
diff --git a/src/knot/journal/journal_metadata.h b/src/knot/journal/journal_metadata.h
new file mode 100644
index 0000000..246d899
--- /dev/null
+++ b/src/knot/journal/journal_metadata.h
@@ -0,0 +1,187 @@
+/* Copyright (C) 2020 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "knot/journal/journal_basic.h"
+
+/*! \brief Per-zone journal metadata as loaded from / stored to the journal DB. */
+typedef struct {
+ uint32_t first_serial; // serial-from of the first changeset in journal
+ uint32_t serial_to; // serial-to of the last changeset; meaningful with JOURNAL_SERIAL_TO_VALID
+ uint32_t flushed_upto; // serial up to which the journal has been flushed
+ uint32_t merged_serial; // serial-from of the merged changeset; meaningful with JOURNAL_MERGED_SERIAL_VALID
+ uint32_t changeset_count; // number of changesets, maintained on insert and delete
+ uint32_t flags; // a bitmap of flags, see enum below
+ bool _new_zone; // private: if there were no metadata at all previously
+} journal_metadata_t;
+
+/*! \brief Bits of journal_metadata_t.flags. */
+enum journal_metadata_flags {
+ JOURNAL_LAST_FLUSHED_VALID = (1 << 0), // deprecated
+ JOURNAL_SERIAL_TO_VALID = (1 << 1), // set on insert, cleared when the last changeset is deleted
+ JOURNAL_MERGED_SERIAL_VALID = (1 << 2), // merged_serial holds a merged changeset's serial-from
+};
+
+/*! \brief Callback for journals_walk(): gets a zone name and the caller's context, returns an error code. */
+typedef int (*journals_walk_cb_t)(const knot_dname_t *zone, void *ctx);
+
+/*!
+ * \brief Update the computation of DB resources used by each zone.
+ *
+ * Because the amount of used space is bigger than sum of changesets' serialized_sizes,
+ * journal uses a complicated way to compute each zone's used space: there is a metadata
+ * showing always the previously-inserting zone. Before the next insert, it is computed
+ * how the total usage of the DB changed during the previous insert (or delete), and the
+ * usage increase (or decrease) is accounted on the bill of the previous inserter.
+ *
+ * \param txn Journal DB transaction.
+ * \param new_inserter Name of the zone that is going to insert now. Might be NULL if no insert nor delete will be done.
+ */
+void update_last_inserter(knot_lmdb_txn_t *txn, const knot_dname_t *new_inserter);
+
+/*! \brief Return the journal database usage by given zone. */
+uint64_t journal_get_occupied(knot_lmdb_txn_t *txn, const knot_dname_t *zone);
+
+/*!
+ * \brief Load the metadata from DB into structure.
+ *
+ * \param txn Journal DB transaction.
+ * \param zone Zone name.
+ * \param md Output: metadata structure.
+ */
+void journal_load_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, journal_metadata_t *md);
+
+/*!
+ * \brief Store the metadata from structure into DB.
+ *
+ * \param txn Journal DB transaction.
+ * \param zone Zone name.
+ * \param md Metadata structure.
+ */
+void journal_store_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const journal_metadata_t *md);
+
+/*!
+ * \brief Update metadata according to what was deleted.
+ *
+ * \param md Metadata structure to be updated.
+ * \param deleted_upto Serial-to of the last deleted changeset.
+ * \param deleted_count Number of deleted changesets.
+ */
+void journal_metadata_after_delete(journal_metadata_t *md, uint32_t deleted_upto,
+ size_t deleted_count);
+
+/*!
+ * \brief Update metadata according to what was merged.
+ *
+ * \param md Metadata structure to be updated.
+ * \param merged_zij True if it was a merge into zone-in-journal.
+ * \param merged_serial Serial-from of the merged changeset (ignored if 'merged_zij').
+ * \param merged_serial_to Serial-to of the merged changeset.
+ * \param original_serial_to Previous serial-to of the merged changeset before the merge.
+ */
+void journal_metadata_after_merge(journal_metadata_t *md, bool merged_zij, uint32_t merged_serial,
+ uint32_t merged_serial_to, uint32_t original_serial_to);
+
+/*!
+ * \brief Update metadata according to what was inserted.
+ *
+ * \param md Metadata structure to be updated.
+ * \param serial Serial-from of the inserted changeset.
+ * \param serial_to Serial-to of the inserted changeset.
+ */
+void journal_metadata_after_insert(journal_metadata_t *md, uint32_t serial, uint32_t serial_to);
+
+/*!
+ * \brief Update metadata according to inserted extra changeset.
+ *
+ * \param md Metadata structure to be updated.
+ * \param serial Serial-from of the inserted changeset.
+ * \param serial_to Serial-to of the inserted changeset.
+ */
+void journal_metadata_after_extra(journal_metadata_t *md, uint32_t serial, uint32_t serial_to);
+
+/*!
+ * \brief Delete all zone records in a txn that will later write to the same zone.
+ *
+ * \note Unlike journal_del_zone(), which purges even the metadata including "occupied",
+ *       this function preserves the "occupied" value, so the space occupied/freed
+ *       by this zone keeps being tracked.
+ */
+void journal_del_zone_txn(knot_lmdb_txn_t *txn, const knot_dname_t *zone);
+
+/*!
+ * \brief Completely delete all journal records belonging to this zone, including metadata.
+ *
+ * \param j Journal to be scraped.
+ * \param check_existence Don't operate if the journal seems not to exist.
+ *
+ * \return KNOT_E*
+ */
+int journal_scrape_with_md(zone_journal_t j, bool check_existence);
+
+/*!
+ * \brief Copy all records related to this zone from one journal DB to another.
+ *
+ * \param from DB to copy from.
+ * \param to DB to copy to.
+ * \param zone Journal zone.
+ *
+ * \return KNOT_E*
+ */
+int journal_copy_with_md(knot_lmdb_db_t *from, knot_lmdb_db_t *to, const knot_dname_t *zone);
+
+/*!
+ * \brief Update the metadata stored in journal DB after a zone flush.
+ *
+ * \param j Journal to be notified about flush.
+ *
+ * \return KNOT_E*
+ */
+int journal_set_flushed(zone_journal_t j);
+
+/*!
+ * \brief Obtain information about the zone's journal from the DB (mostly metadata).
+ *
+ * \param j Zone journal.
+ * \param exists Output: bool if the zone exists in the journal.
+ * \param first_serial Optional output: serial-from of the first changeset in journal.
+ * \param has_zij Optional output: bool if there is zone-in-journal.
+ * \param serial_to Optional output: serial-to of the last changeset in journal.
+ * \param has_merged Optional output: bool if there is a special (non zone-in-journal) merged changeset.
+ * \param merged_serial Optional output: serial-from of the merged changeset.
+ * \param occupied Optional output: DB space occupied by this zone.
+ * \param occupied_total Optional output: DB space occupied in total by all zones.
+ *
+ * \return KNOT_E*
+ */
+int journal_info(zone_journal_t j, bool *exists, uint32_t *first_serial, bool *has_zij,
+ uint32_t *serial_to, bool *has_merged, uint32_t *merged_serial,
+ uint64_t *occupied, uint64_t *occupied_total);
+
+/*!
+ * \brief Return true if this zone exists in journal DB.
+ *
+ * \note Any error from journal_info() is ignored and reported as "does not exist".
+ */
+inline static bool journal_is_existing(zone_journal_t j) {
+	bool found = false;
+	int ignored = journal_info(j, &found, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
+	(void)ignored;
+	return found;
+}
+
+/*!
+ * \brief Call a function for each zone being in the journal DB.
+ *
+ * \param db Journal database.
+ * \param cb Callback to be called for each zone-name found.
+ * \param ctx Arbitrary context to be passed to the callback.
+ *
+ * \return An error code from either journal operations or from the callback.
+ */
+int journals_walk(knot_lmdb_db_t *db, journals_walk_cb_t cb, void *ctx);
diff --git a/src/knot/journal/journal_read.c b/src/knot/journal/journal_read.c
new file mode 100644
index 0000000..6c4fc32
--- /dev/null
+++ b/src/knot/journal/journal_read.c
@@ -0,0 +1,436 @@
+/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "knot/journal/journal_read.h"
+
+#include "knot/journal/journal_metadata.h"
+#include "knot/journal/knot_lmdb.h"
+
+#include "contrib/macros.h"
+#include "contrib/ucw/lists.h"
+#include "contrib/wire_ctx.h"
+#include "libknot/error.h"
+
+#include <stdlib.h>
+
+struct journal_read {
+ knot_lmdb_txn_t txn;
+ MDB_val key_prefix;
+ const knot_dname_t *zone;
+ wire_ctx_t wire;
+ uint32_t next;
+};
+
+int journal_read_get_error(const journal_read_t *ctx, int another_error)
+{
+ return (ctx == NULL || ctx->txn.ret == KNOT_EOK ? another_error : ctx->txn.ret);
+}
+
+static void update_ctx_wire(journal_read_t *ctx)
+{
+ ctx->wire = wire_ctx_init_const(ctx->txn.cur_val.mv_data, ctx->txn.cur_val.mv_size);
+ wire_ctx_skip(&ctx->wire, JOURNAL_HEADER_SIZE);
+}
+
+// Move the reader to the first chunk of the zone-in-journal (if 'go_zone')
+// or of the changeset whose serial-from equals ctx->next.
+// Returns false if no such chunk is found, or if the changeset points
+// to itself (then KNOT_ELOOP is stored in ctx->txn.ret).
+static bool go_next_changeset(journal_read_t *ctx, bool go_zone, const knot_dname_t *zone)
+{
+	// the previous key prefix is owned by the context, replace it
+	free(ctx->key_prefix.mv_data);
+	ctx->key_prefix = journal_changeset_id_to_key(go_zone, ctx->next, zone);
+
+	if (!knot_lmdb_find_prefix(&ctx->txn, &ctx->key_prefix)) {
+		return false;
+	}
+
+	uint32_t following = journal_next_serial(&ctx->txn.cur_val);
+	if (!go_zone && following == ctx->next) {
+		ctx->txn.ret = KNOT_ELOOP;
+		return false;
+	}
+
+	ctx->next = following;
+	update_ctx_wire(ctx);
+	return true;
+}
+
+int journal_read_begin(zone_journal_t j, bool read_zone, uint32_t serial_from, journal_read_t **ctx)
+{
+	*ctx = NULL;
+
+	// besides the check itself, this also opens the LMDB if not opened yet
+	if (!journal_is_existing(j)) {
+		return KNOT_ENOENT;
+	}
+
+	journal_read_t *reader = calloc(1, sizeof(*reader));
+	if (reader == NULL) {
+		return KNOT_ENOMEM;
+	}
+	reader->zone = j.zone;
+	reader->next = serial_from;
+
+	// read-only transaction, aborted later by journal_read_end()
+	knot_lmdb_begin(j.db, &reader->txn, false);
+
+	if (!go_next_changeset(reader, read_zone, j.zone)) {
+		journal_read_end(reader);
+		return KNOT_ENOENT;
+	}
+
+	*ctx = reader;
+	return KNOT_EOK;
+}
+
+void journal_read_end(journal_read_t *ctx)
+{
+ if (ctx != NULL) {
+ free(ctx->key_prefix.mv_data);
+ knot_lmdb_abort(&ctx->txn);
+ free(ctx);
+ }
+}
+
+static bool make_data_available(journal_read_t *ctx)
+{
+ if (wire_ctx_available(&ctx->wire) == 0) {
+ if (!knot_lmdb_next(&ctx->txn)) {
+ return false;
+ }
+ if (!knot_lmdb_is_prefix_of(&ctx->key_prefix, &ctx->txn.cur_key)) {
+ return false;
+ }
+ if (ctx->next != journal_next_serial(&ctx->txn.cur_val)) {
+ // consistency check, see also MR !1270
+ ctx->txn.ret = KNOT_EMALF;
+ return false;
+ }
+ update_ctx_wire(ctx);
+ }
+ return true;
+}
+
+// thoughts for next design of journal serialization:
+// - one TTL per rrset
+// - endian
+// - optionally storing whole rdataset at once?
+
+// Deserialize one RRSet from the current reading position.
+//
+// The serialized form read here is: owner dname, type (u16), class (u16),
+// number of rdata (u16), and then for each rdata: TTL (u32), length (u16), data.
+// The rdata may continue in following chunks of the same changeset.
+//
+// On failure, the error is stored in ctx->txn.ret (KNOT_ERANGE from the wire
+// is reported as KNOT_EMALF), the RRSet is cleared and false is returned.
+// False with ctx->txn.ret == KNOT_EOK means no more RRSets to be read.
+bool journal_read_rrset(journal_read_t *ctx, knot_rrset_t *rrset, bool allow_next_changeset)
+{
+	if (!make_data_available(ctx)) {
+		if (!allow_next_changeset || !go_next_changeset(ctx, false, ctx->zone)) {
+			return false;
+		}
+	}
+	rrset->owner = knot_dname_copy(ctx->wire.position, NULL);
+	if (rrset->owner == NULL) {
+		// Allocation failed: mark the wire as failed so that no rdata is
+		// parsed below and the error gets propagated to ctx->txn.ret.
+		ctx->wire.error = KNOT_ENOMEM;
+	} else {
+		wire_ctx_skip(&ctx->wire, knot_dname_size(rrset->owner));
+	}
+	rrset->type = wire_ctx_read_u16(&ctx->wire);
+	rrset->rclass = wire_ctx_read_u16(&ctx->wire);
+	uint16_t rrs_count = wire_ctx_read_u16(&ctx->wire);
+	for (int i = 0; i < rrs_count && ctx->wire.error == KNOT_EOK; i++) {
+		if (!make_data_available(ctx)) {
+			// the changeset ended in the middle of an RRSet
+			ctx->wire.error = KNOT_EFEWDATA;
+		}
+		// TODO think of how to export serialized rr directly to knot_rdataset_add
+		// focus on: even address aligning
+		uint32_t ttl = wire_ctx_read_u32(&ctx->wire);
+		if (i == 0) {
+			// only the TTL of the first rdata is used for the whole RRSet
+			rrset->ttl = ttl;
+		}
+		uint16_t len = wire_ctx_read_u16(&ctx->wire);
+		if (ctx->wire.error == KNOT_EOK) {
+			ctx->wire.error = knot_rrset_add_rdata(rrset, ctx->wire.position, len, NULL);
+		}
+		wire_ctx_skip(&ctx->wire, len);
+	}
+	if (ctx->txn.ret == KNOT_EOK) {
+		ctx->txn.ret = ctx->wire.error == KNOT_ERANGE ? KNOT_EMALF : ctx->wire.error;
+	}
+	if (ctx->txn.ret == KNOT_EOK) {
+		return true;
+	} else {
+		journal_read_clear_rrset(rrset);
+		return false;
+	}
+}
+
+void journal_read_clear_rrset(knot_rrset_t *rr)
+{
+ knot_rrset_clear(rr, NULL);
+}
+
+int journal_read_rrsets(journal_read_t *read, journal_read_cb_t cb, void *ctx)
+{
+ knot_rrset_t rr = { 0 };
+ bool in_remove_section = false;
+ int ret = KNOT_EOK;
+ while (ret == KNOT_EOK && journal_read_rrset(read, &rr, true)) {
+ if (rr_is_apex_soa(&rr, read->zone)) {
+ in_remove_section = !in_remove_section;
+ }
+ ret = cb(in_remove_section, &rr, ctx);
+ journal_read_clear_rrset(&rr);
+ }
+ ret = journal_read_get_error(read, ret);
+ journal_read_end(read);
+ return ret;
+}
+
+static int add_rr_to_contents(zone_contents_t *z, const knot_rrset_t *rrset)
+{
+ zone_node_t *n = NULL;
+ return zone_contents_add_rr(z, rrset, &n);
+ // Shall we ignore ETTL ?
+}
+
+// Read one whole changeset (or zone-in-journal) from the journal into 'ch'.
+//
+// The first RRSet read is a SOA. If another apex SOA follows, the first one
+// becomes soa_from with the RRSets read so far forming the 'remove' part,
+// and the second one becomes soa_to with the rest forming the 'add' part.
+// If no second apex SOA appears, 'ch' has only soa_to and 'add' filled in.
+//
+// Returns false on failure or when nothing more can be read; in that case
+// 'ch' is left cleared and a possible error is stored in ctx->txn.ret.
+bool journal_read_changeset(journal_read_t *ctx, changeset_t *ch)
+{
+	// 'ch' must be zeroed before any 'goto fail', since the failure path
+	// calls changeset_clear() on it and the caller may pass it uninitialised.
+	memset(ch, 0, sizeof(*ch));
+
+	zone_contents_t *tree = zone_contents_new(ctx->zone, false);
+	knot_rrset_t *soa = calloc(1, sizeof(*soa)), rr = { 0 };
+	if (tree == NULL || soa == NULL) {
+		ctx->txn.ret = KNOT_ENOMEM;
+		goto fail;
+	}
+
+	if (!journal_read_rrset(ctx, soa, true)) {
+		goto fail;
+	}
+	while (journal_read_rrset(ctx, &rr, false)) {
+		if (rr_is_apex_soa(&rr, ctx->zone)) {
+			if (ch->soa_from != NULL) {
+				// a third apex SOA within one changeset
+				ctx->txn.ret = KNOT_EMALF;
+				goto fail;
+			}
+			ch->soa_from = soa;
+			ch->remove = tree;
+			// zero-initialised, so that the failure path may safely clear it
+			soa = calloc(1, sizeof(*soa));
+			tree = zone_contents_new(ctx->zone, false);
+			if (tree == NULL || soa == NULL) {
+				ctx->txn.ret = KNOT_ENOMEM;
+				goto fail;
+			}
+			*soa = rr; // note this tricky assignment: ownership of rr's data moves to soa
+			memset(&rr, 0, sizeof(rr));
+		} else {
+			ctx->txn.ret = add_rr_to_contents(tree, &rr);
+			journal_read_clear_rrset(&rr);
+		}
+	}
+
+	if (ctx->txn.ret == KNOT_EOK) {
+		ch->soa_to = soa;
+		ch->add = tree;
+		return true;
+	} else {
+fail:
+		journal_read_clear_rrset(&rr);
+		journal_read_clear_rrset(soa);
+		free(soa);
+		changeset_clear(ch);
+		zone_contents_deep_free(tree);
+		return false;
+	}
+}
+
+void journal_read_clear_changeset(changeset_t *ch)
+{
+ changeset_clear(ch);
+ memset(ch, 0, sizeof(*ch));
+}
+
+static int just_load_md(zone_journal_t j, journal_metadata_t *md, bool *has_zij)
+{
+ knot_lmdb_txn_t txn = { 0 };
+ knot_lmdb_begin(j.db, &txn, false);
+ journal_load_metadata(&txn, j.zone, md);
+ if (has_zij != NULL) {
+ *has_zij = journal_contains(&txn, true, 0, j.zone);
+ }
+ knot_lmdb_abort(&txn);
+ return txn.ret;
+}
+
+// Call 'cb' for every regular changeset starting at serial 'from',
+// or once with ch=NULL if there was none to be reported.
+int journal_walk_from(zone_journal_t j, uint32_t from,
+                      journal_walk_cb_t cb, void *ctx)
+{
+	journal_metadata_t md = { 0 };
+	int ret = just_load_md(j, &md, NULL);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	bool reported = false;
+	bool anything_to_read = (md.flags & JOURNAL_SERIAL_TO_VALID) && from != md.serial_to;
+	if (anything_to_read) {
+		journal_read_t *read = NULL;
+		changeset_t ch;
+
+		ret = journal_read_begin(j, false, from, &read);
+		while (ret == KNOT_EOK && journal_read_changeset(read, &ch)) {
+			ret = cb(false, &ch, ctx);
+			reported = true;
+			journal_read_clear_changeset(&ch);
+		}
+		ret = journal_read_get_error(read, ret);
+		journal_read_end(read);
+	}
+
+	// let the callback know that no changeset has been found
+	if (!reported && ret == KNOT_EOK) {
+		ret = cb(false, NULL, ctx);
+	}
+	return ret;
+}
+
+// Walk the whole journal of a zone: first report the "special" changeset
+// (zone-in-journal or merged changeset) with special=true, then all regular
+// changesets via journal_walk_from() starting at md.first_serial.
+// If a kind of changeset is missing, the callback is called with ch=NULL instead.
+//
+// beware, this function does not operate in single txn!
+// (just_load_md(), journal_read_begin() and journal_walk_from() each start their own)
+int journal_walk(zone_journal_t j, journal_walk_cb_t cb, void *ctx)
+{
+ int ret = knot_lmdb_exists(j.db);
+ if (ret == KNOT_ENODB) {
+ // no database at all: report "nothing" for both the special and regular part
+ ret = cb(true, NULL, ctx);
+ if (ret == KNOT_EOK) {
+ ret = cb(false, NULL, ctx);
+ }
+ return ret;
+ } else if (ret == KNOT_EOK) {
+ ret = knot_lmdb_open(j.db);
+ }
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ journal_metadata_t md = { 0 };
+ journal_read_t *read = NULL;
+ changeset_t ch;
+ bool zone_in_j = false;
+ ret = just_load_md(j, &md, &zone_in_j);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ if (zone_in_j) {
+ // zone-in-journal takes precedence over a merged changeset
+ ret = journal_read_begin(j, true, 0, &read);
+ // jump into the following branch to share the reading code
+ goto read_one_special;
+ } else if ((md.flags & JOURNAL_MERGED_SERIAL_VALID)) {
+ ret = journal_read_begin(j, false, md.merged_serial, &read);
+read_one_special:
+ // exactly one special changeset is read and reported
+ if (ret == KNOT_EOK && journal_read_changeset(read, &ch)) {
+ ret = cb(true, &ch, ctx);
+ journal_read_clear_changeset(&ch);
+ }
+ // an error from reading overrides the callback's return code
+ ret = journal_read_get_error(read, ret);
+ journal_read_end(read);
+ read = NULL;
+ } else {
+ // neither zone-in-journal nor merged changeset present
+ ret = cb(true, NULL, ctx);
+ }
+
+ if (ret == KNOT_EOK) {
+ ret = journal_walk_from(j, md.first_serial, cb, ctx);
+ }
+ return ret;
+}
+
+typedef struct {
+ size_t observed_count;
+ size_t observed_merged;
+ uint32_t merged_serial;
+ size_t observed_zij;
+ uint32_t first_serial;
+ bool first_serial_valid;
+ uint32_t last_serial;
+ bool last_serial_valid;
+} check_ctx_t;
+
+static int check_cb(bool special, const changeset_t *ch, void *vctx)
+{
+ check_ctx_t *ctx = vctx;
+ if (special && ch != NULL) {
+ if (ch->remove == NULL) {
+ ctx->observed_zij++;
+ ctx->last_serial = changeset_to(ch);
+ ctx->last_serial_valid = true;
+ } else {
+ ctx->merged_serial = changeset_from(ch);
+ ctx->observed_merged++;
+ }
+ } else if (ch != NULL) {
+ if (!ctx->first_serial_valid) {
+ ctx->first_serial = changeset_from(ch);
+ ctx->first_serial_valid = true;
+ }
+ ctx->last_serial = changeset_to(ch);
+ ctx->last_serial_valid = true;
+ ctx->observed_count++;
+ }
+ return KNOT_EOK;
+}
+
+static bool eq(bool a, bool b)
+{
+ return a ? b : !b;
+}
+
+int journal_sem_check(zone_journal_t j)
+{
+ check_ctx_t ctx = { 0 };
+ journal_metadata_t md = { 0 };
+ bool has_zij = false;
+
+ if (!journal_is_existing(j)) {
+ return KNOT_EOK;
+ }
+
+ int ret = just_load_md(j, &md, &has_zij);
+ if (ret == KNOT_EOK) {
+ ret = journal_walk(j, check_cb, &ctx);
+ }
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ if (!eq((md.flags & JOURNAL_SERIAL_TO_VALID), ctx.last_serial_valid)) {
+ return 101;
+ }
+ if (ctx.last_serial_valid && ctx.last_serial != md.serial_to) {
+ return 102;
+ }
+ if (!eq((md.flags & JOURNAL_MERGED_SERIAL_VALID), (ctx.observed_merged > 0))) {
+ return 103;
+ }
+ if (ctx.observed_merged > 1) {
+ return 104;
+ }
+ if (ctx.observed_merged == 1 && ctx.merged_serial != md.merged_serial) {
+ return 105;
+ }
+ if (!eq(has_zij, (ctx.observed_zij > 0))) {
+ return 106;
+ }
+ if (ctx.observed_zij > 1) {
+ return 107;
+ }
+ if (ctx.observed_zij + ctx.observed_merged > 1) {
+ return 108;
+ }
+ if (!eq(((md.flags & JOURNAL_SERIAL_TO_VALID) && md.first_serial != md.serial_to), ctx.first_serial_valid)) {
+ return 109;
+ }
+ if (!eq(ctx.first_serial_valid, (ctx.observed_count > 0))) {
+ return 110;
+ }
+ if (ctx.first_serial_valid && ctx.first_serial != md.first_serial) {
+ return 111;
+ }
+ if (ctx.observed_count != md.changeset_count) {
+ return 112;
+ }
+ if (ctx.observed_merged > 0 && ctx.observed_count == 0) {
+ return 113;
+ }
+ return KNOT_EOK;
+}
diff --git a/src/knot/journal/journal_read.h b/src/knot/journal/journal_read.h
new file mode 100644
index 0000000..92cad9f
--- /dev/null
+++ b/src/knot/journal/journal_read.h
@@ -0,0 +1,158 @@
+/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "knot/journal/journal_basic.h"
+
+typedef struct journal_read journal_read_t;
+
+typedef int (*journal_read_cb_t)(bool in_remove_section, const knot_rrset_t *rr, void *ctx);
+
+typedef int (*journal_walk_cb_t)(bool special, const changeset_t *ch, void *ctx);
+
+/*!
+ * \brief Start reading journal from specified changeset.
+ *
+ * \param j Journal to be read.
+ * \param read_zone True if reading shall start with zone-in-journal.
+ * \param serial_from Serial-from of the changeset to be started at (ignored if 'read_zone').
+ * \param ctx Output: journal reading context initialised.
+ *
+ * \return KNOT_E*
+ */
+int journal_read_begin(zone_journal_t j, bool read_zone, uint32_t serial_from, journal_read_t **ctx);
+
+/*!
+ * \brief Read a single RRSet from a journal changeset.
+ *
+ * \param ctx Journal reading context.
+ * \param rr Output: RRSet to be filled with serialized data.
+ * \param allow_next_changeset True to allow jumping to next changeset.
+ *
+ * \return False if no more RRSet in this changeset/journal, or failure.
+ */
+bool journal_read_rrset(journal_read_t *ctx, knot_rrset_t *rr, bool allow_next_changeset);
+
+/*!
+ * \brief Free up heap allocations by journal_read_rrset().
+ *
+ * \param rr RRSet initialised by journal_read_rrset().
+ */
+void journal_read_clear_rrset(knot_rrset_t *rr);
+
+// TODO move somewhere. Libknot?
+inline static bool rr_is_apex_soa(const knot_rrset_t *rr, const knot_dname_t *apex)
+{
+ return (rr->type == KNOT_RRTYPE_SOA && knot_dname_is_equal(rr->owner, apex));
+}
+
+/*!
+ * \brief Read all RRSets up to the end of journal, calling a function for each.
+ *
+ * \note Closes reading context at the end.
+ *
+ * \param read Journal reading context.
+ * \param cb Callback to be called on each read.
+ * \param ctx Arbitrary context to be passed to the callback.
+ *
+ * \return An error code from either journal operations or from the callback.
+ */
+int journal_read_rrsets(journal_read_t *read, journal_read_cb_t cb, void *ctx);
+
+/*!
+ * \brief Read a single changeset from journal.
+ *
+ * \param ctx Journal reading context.
+ * \param ch Output: changeset to be filled with serialized data.
+ *
+ * \return False if no more changesets in the journal, or failure.
+ */
+bool journal_read_changeset(journal_read_t *ctx, changeset_t *ch);
+
+/*!
+ * \brief Free up heap allocations by journal_read_changeset().
+ *
+ * \param ch Changeset initialised by journal_read_changeset().
+ */
+void journal_read_clear_changeset(changeset_t *ch);
+
+/*!
+ * \brief Obtain error code from the journal_read operations previously performed.
+ *
+ * \param ctx Journal reading context.
+ * \param another_error An error code from outside the reading operations to be combined.
+ *
+ * \return KNOT_EOK if every operation succeeded, KNOT_E* otherwise.
+ */
+int journal_read_get_error(const journal_read_t *ctx, int another_error);
+
+/*!
+ * \brief Finalise journal reading.
+ *
+ * \param ctx Journal reading context (will be freed).
+ */
+void journal_read_end(journal_read_t *ctx);
+
+/*!
+ * \brief Call a function for each changeset in journal.
+ *
+ * This is a variant of journal_walk() see below.
+ * The difference is that iteration starts at specified serial.
+ * Similarly to how IXFR works.
+ * The callback is called for each found changeset, or just once
+ * with ch=NULL if none is found.
+ *
+ * \param j Zone journal to be read.
+ * \param from SOA serial to start at.
+ * \param cb Callback to be called for each changeset (or its non-existence).
+ * \param ctx Arbitrary context to be passed to the callback.
+ *
+ * \return An error code from either journal operations or from the callback.
+ * \retval KNOT_ENOENT if the journal is not empty, but the requested serial is not present.
+ */
+int journal_walk_from(zone_journal_t j, uint32_t from,
+ journal_walk_cb_t cb, void *ctx);
+
+/*!
+ * \brief Call a function for each changeset stored in journal.
+ *
+ * First, the callback will be called for the special changeset -
+ * either zone-in-journal or merged changeset, with special=true.
+ * If there is no such, it will be called anyway with ch=NULL.
+ *
+ * Then, the callback will be called for each regular changeset
+ * with special=false. If there is none, it will be called once
+ * with ch=NULL.
+ *
+ * \param j Zone journal to be read.
+ * \param cb Callback to be called for each changeset (or its non-existence).
+ * \param ctx Arbitrary context to be passed to the callback.
+ *
+ * \return An error code from either journal operations or from the callback.
+ */
+int journal_walk(zone_journal_t j, journal_walk_cb_t cb, void *ctx);
+
+/*!
+ * \brief Perform semantic check of the zone journal (consistency, metadata...).
+ *
+ * \param j Zone journal to be checked.
+ *
+ * \retval KNOT_E* ( < 0 ) if an error during journal operation.
+ * \retval > 100 if some inconsistency found.
+ * \return KNOT_EOK if all ok.
+ */
+int journal_sem_check(zone_journal_t j);
diff --git a/src/knot/journal/journal_write.c b/src/knot/journal/journal_write.c
new file mode 100644
index 0000000..ad1247b
--- /dev/null
+++ b/src/knot/journal/journal_write.c
@@ -0,0 +1,333 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "knot/journal/journal_write.h"
+
+#include "contrib/macros.h"
+#include "knot/journal/journal_metadata.h"
+#include "knot/journal/journal_read.h"
+#include "knot/journal/serialization.h"
+#include "libknot/error.h"
+
+// Write the serialized data provided by 'ser' into the DB, split into chunks.
+//
+// Each chunk is stored under a key made from the zone apex, 'ch_from' / 'zij'
+// and the chunk index, and consists of a journal header (carrying 'ch_to')
+// followed by the serialized payload.
+//
+// The serialization context is always deinitialised here. The first error
+// encountered ends up in txn->ret.
+static void journal_write_serialize(knot_lmdb_txn_t *txn, serialize_ctx_t *ser,
+                                    const knot_dname_t *apex, bool zij, uint32_t ch_from, uint32_t ch_to)
+{
+	MDB_val chunk;
+	uint32_t i = 0;
+	while (serialize_unfinished(ser) && txn->ret == KNOT_EOK) {
+		serialize_prepare(ser, JOURNAL_CHUNK_THRESH - JOURNAL_HEADER_SIZE,
+		                  JOURNAL_CHUNK_MAX - JOURNAL_HEADER_SIZE, &chunk.mv_size);
+		if (chunk.mv_size == 0) {
+			break; // beware! If this is omitted, it creates empty chunk => EMALF when reading.
+		}
+		chunk.mv_size += JOURNAL_HEADER_SIZE;
+		// no data supplied: the insert is expected to set mv_data to the
+		// space inside the DB, which is filled in below
+		chunk.mv_data = NULL;
+		MDB_val key = journal_make_chunk_key(apex, ch_from, zij, i);
+		if (knot_lmdb_insert(txn, &key, &chunk)) {
+			journal_make_header(chunk.mv_data, ch_to);
+			// mv_data is a void pointer; arithmetic on it is not standard C
+			uint8_t *payload = (uint8_t *)chunk.mv_data + JOURNAL_HEADER_SIZE;
+			serialize_chunk(ser, payload, chunk.mv_size - JOURNAL_HEADER_SIZE);
+		}
+		free(key.mv_data);
+		i++;
+	}
+	int ret = serialize_deinit(ser);
+	if (txn->ret == KNOT_EOK) {
+		txn->ret = ret;
+	}
+}
+
+void journal_write_changeset(knot_lmdb_txn_t *txn, const changeset_t *ch)
+{
+ serialize_ctx_t *ser = serialize_init(ch);
+ if (ser == NULL) {
+ txn->ret = KNOT_ENOMEM;
+ return;
+ }
+ if (ch->remove == NULL) {
+ journal_write_serialize(txn, ser, ch->soa_to->owner, true, 0, changeset_to(ch));
+ } else {
+ journal_write_serialize(txn, ser, ch->soa_to->owner, false, changeset_from(ch), changeset_to(ch));
+ }
+}
+
+void journal_write_zone(knot_lmdb_txn_t *txn, const zone_contents_t *z)
+{
+ serialize_ctx_t *ser = serialize_zone_init(z);
+ if (ser == NULL) {
+ txn->ret = KNOT_ENOMEM;
+ return;
+ }
+ journal_write_serialize(txn, ser, z->apex->owner, true, 0, zone_contents_serial(z));
+}
+
+void journal_write_zone_diff(knot_lmdb_txn_t *txn, const zone_diff_t *z)
+{
+ serialize_ctx_t *ser = serialize_zone_diff_init(z);
+ if (ser == NULL) {
+ txn->ret = KNOT_ENOMEM;
+ return;
+ }
+ journal_write_serialize(txn, ser, z->apex->owner, false, zone_diff_from(z), zone_diff_to(z));
+}
+
+static bool delete_one(knot_lmdb_txn_t *txn, bool del_zij, uint32_t del_serial,
+ const knot_dname_t *zone, uint64_t *freed, uint32_t *next_serial)
+{
+ *freed = 0;
+ MDB_val prefix = journal_changeset_id_to_key(del_zij, del_serial, zone);
+ knot_lmdb_foreach(txn, &prefix) {
+ *freed += txn->cur_val.mv_size;
+ *next_serial = journal_next_serial(&txn->cur_val);
+ knot_lmdb_del_cur(txn);
+ }
+ free(prefix.mv_data);
+ return (*freed > 0);
+}
+
+// Callback for journal_read_rrsets(): apply one RRSet to the merged changeset.
+// Additions are always added; removals are added except for the apex SOA,
+// which is silently skipped.
+static int merge_cb(bool remove, const knot_rrset_t *rr, void *ctx)
+{
+	changeset_t *ch = ctx;
+
+	if (!remove) {
+		return changeset_add_addition(ch, rr, CHANGESET_CHECK);
+	}
+	if (rr_is_apex_soa(rr, ch->soa_to->owner)) {
+		return KNOT_EOK;
+	}
+	return changeset_add_removal(ch, rr, CHANGESET_CHECK);
+}
+
+// Merge all changesets following the one identified by 'merge_zij' /
+// 'merge_serial' into it, and store the result in its place.
+//
+// '*original_serial_to' is set to the serial-to the changeset had before
+// the merge, once it has been read successfully.
+// Any error ends up in txn->ret; in that case nothing is deleted or written.
+void journal_merge(zone_journal_t j, knot_lmdb_txn_t *txn, bool merge_zij,
+                   uint32_t merge_serial, uint32_t *original_serial_to)
+{
+	changeset_t merge;
+	memset(&merge, 0, sizeof(merge));
+	journal_read_t *read = NULL;
+	txn->ret = journal_read_begin(j, merge_zij, merge_serial, &read);
+	if (txn->ret != KNOT_EOK) {
+		return;
+	}
+	if (!journal_read_changeset(read, &merge)) {
+		// Nothing to merge into: 'merge' is cleared (soa_to is NULL),
+		// so neither the merge callback nor the write may be attempted.
+		txn->ret = journal_read_get_error(read, KNOT_ENOENT);
+		journal_read_end(read);
+		return;
+	}
+	*original_serial_to = changeset_to(&merge);
+
+	// this also closes the reading context
+	txn->ret = journal_read_rrsets(read, merge_cb, &merge);
+	if (txn->ret != KNOT_EOK) {
+		// don't replace the stored changeset by a partially merged one
+		journal_read_clear_changeset(&merge);
+		return;
+	}
+
+	// deleting seems redundant since the merge changeset will be overwritten
+	// but it would cause EMALF or invalid data if the new merged has less chunks than before
+	uint32_t del_next_serial;
+	uint64_t del_freed;
+	delete_one(txn, merge_zij, merge_serial, j.zone, &del_freed, &del_next_serial);
+	assert(del_freed > 0 && del_next_serial == *original_serial_to);
+
+	journal_write_changeset(txn, &merge);
+	journal_read_clear_changeset(&merge);
+}
+
+static void delete_merged(knot_lmdb_txn_t *txn, const knot_dname_t *zone,
+ journal_metadata_t *md, uint64_t *freed)
+{
+ if (!(md->flags & JOURNAL_MERGED_SERIAL_VALID)) {
+ return;
+ }
+ uint32_t unused = 0;
+ delete_one(txn, false, md->merged_serial, zone, freed, &unused);
+ md->merged_serial = 0;
+ md->flags &= ~JOURNAL_MERGED_SERIAL_VALID;
+}
+
+bool journal_delete(knot_lmdb_txn_t *txn, uint32_t from, const knot_dname_t *zone,
+ uint64_t tofree_size, size_t tofree_count, uint32_t stop_at_serial,
+ uint64_t *freed_size, size_t *freed_count, uint32_t *stopped_at)
+{
+ *freed_size = 0;
+ *freed_count = 0;
+ uint64_t freed_now;
+ while (from != stop_at_serial &&
+ (*freed_size < tofree_size || *freed_count < tofree_count) &&
+ delete_one(txn, false, from, zone, &freed_now, stopped_at)) {
+ *freed_size += freed_now;
+ ++(*freed_count);
+ from = *stopped_at;
+ }
+ return (*freed_count > 0);
+}
+
+void journal_try_flush(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md)
+{
+ bool flush = journal_allow_flush(j);
+ uint32_t merge_orig = 0;
+ if (journal_contains(txn, true, 0, j.zone)) {
+ journal_merge(j, txn, true, 0, &merge_orig);
+ if (!flush) {
+ journal_metadata_after_merge(md, true, 0, md->serial_to, merge_orig);
+ }
+ } else if (!flush) {
+ uint32_t merge_serial = ((md->flags & JOURNAL_MERGED_SERIAL_VALID) ? md->merged_serial : md->first_serial);
+ journal_merge(j, txn, false, merge_serial, &merge_orig);
+ journal_metadata_after_merge(md, false, merge_serial, md->serial_to, merge_orig);
+ }
+
+ if (flush) {
+ // delete merged serial if (very unlikely) exists
+ if ((md->flags & JOURNAL_MERGED_SERIAL_VALID)) {
+ uint64_t unused64;
+ uint32_t unused32;
+ (void)delete_one(txn, false, md->merged_serial, j.zone, &unused64, &unused32);
+ md->flags &= ~JOURNAL_MERGED_SERIAL_VALID;
+ }
+
+ // commit partial job and ask zone to flush itself
+ journal_store_metadata(txn, j.zone, md);
+ knot_lmdb_commit(txn);
+ if (txn->ret == KNOT_EOK) {
+ txn->ret = KNOT_EBUSY;
+ }
+ }
+}
+
+#define U_MINUS(minuend, subtrahend) ((minuend) - MIN((minuend), (subtrahend)))
+
+void journal_fix_occupation(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md,
+ int64_t max_usage, ssize_t max_count)
+{
+ uint64_t occupied = journal_get_occupied(txn, j.zone), freed;
+ uint64_t need_tofree = U_MINUS(occupied, max_usage);
+ size_t count = md->changeset_count, removed;
+ size_t need_todel = U_MINUS(count, max_count);
+
+ while ((need_tofree > 0 || need_todel > 0) && txn->ret == KNOT_EOK) {
+ uint32_t del_from = md->first_serial; // don't move this line outside of the loop
+ uint32_t del_upto = md->flushed_upto;
+ (void)journal_serial_to(txn, true, 0, j.zone, &del_upto); // in case zij present and wrong flushed_upto, avoid discontinuity
+ freed = 0;
+ removed = 0;
+ journal_delete(txn, del_from, j.zone, need_tofree, need_todel,
+ del_upto, &freed, &removed, &del_from);
+ if (freed == 0) {
+ if (del_upto != md->serial_to) {
+ journal_try_flush(j, txn, md);
+ } else {
+ txn->ret = KNOT_ESPACE;
+ break;
+ }
+ } else {
+ journal_metadata_after_delete(md, del_from, removed);
+ need_tofree = U_MINUS(need_tofree, freed);
+ need_todel = U_MINUS(need_todel, removed);
+ }
+ }
+}
+
+int journal_insert_zone(zone_journal_t j, const zone_contents_t *z)
+{
+ changeset_t fake_ch = { .add = (zone_contents_t *)z };
+ size_t ch_size = changeset_serialized_size(&fake_ch);
+ size_t max_usage = journal_conf_max_usage(j);
+ if (ch_size >= max_usage) {
+ return KNOT_ESPACE;
+ }
+ int ret = knot_lmdb_open(j.db);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ knot_lmdb_txn_t txn = { 0 };
+ knot_lmdb_begin(j.db, &txn, true);
+
+ update_last_inserter(&txn, j.zone);
+ journal_del_zone_txn(&txn, j.zone);
+
+ journal_write_zone(&txn, z);
+
+ journal_metadata_t md = { 0 };
+ md.flags = JOURNAL_SERIAL_TO_VALID;
+ md.serial_to = zone_contents_serial(z);
+ md.first_serial = md.serial_to;
+ journal_store_metadata(&txn, j.zone, &md);
+
+ knot_lmdb_commit(&txn);
+ return txn.ret;
+}
+
+int journal_insert(zone_journal_t j, const changeset_t *ch, const changeset_t *extra,
+ const zone_diff_t *zdiff)
+{
+ assert(zdiff == NULL || (ch == NULL && extra == NULL));
+
+ size_t ch_size = zdiff == NULL ? changeset_serialized_size(ch) :
+ zone_diff_serialized_size(*zdiff);
+ size_t max_usage = journal_conf_max_usage(j);
+ if (ch_size >= max_usage) {
+ return KNOT_ESPACE;
+ }
+
+ uint32_t ch_from = zdiff == NULL ? changeset_from(ch) : zone_diff_from(zdiff);
+ uint32_t ch_to = zdiff == NULL ? changeset_to(ch) : zone_diff_to(zdiff);
+ if (extra != NULL && (changeset_to(extra) != ch_to ||
+ changeset_from(extra) == ch_from)) {
+ return KNOT_EINVAL;
+ }
+ int ret = knot_lmdb_open(j.db);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ knot_lmdb_txn_t txn = { 0 };
+ journal_metadata_t md = { 0 };
+ knot_lmdb_begin(j.db, &txn, true);
+ journal_load_metadata(&txn, j.zone, &md);
+
+ update_last_inserter(&txn, j.zone);
+
+ if (extra != NULL) {
+ if (journal_contains(&txn, true, 0, j.zone)) {
+ txn.ret = KNOT_ESEMCHECK;
+ }
+ uint64_t merged_freed = 0;
+ delete_merged(&txn, j.zone, &md, &merged_freed);
+ ch_size += changeset_serialized_size(extra);
+ ch_size -= merged_freed;
+ md.flushed_upto = md.serial_to; // set temporarily
+ md.flags |= JOURNAL_LAST_FLUSHED_VALID;
+ }
+
+ size_t chs_limit = journal_conf_max_changesets(j);
+ journal_fix_occupation(j, &txn, &md, max_usage - ch_size, chs_limit - 1);
+
+ // avoid discontinuity
+ if ((md.flags & JOURNAL_SERIAL_TO_VALID) && md.serial_to != ch_from) {
+ if (journal_contains(&txn, true, 0, j.zone)) {
+ txn.ret = KNOT_ESEMCHECK;
+ } else {
+ journal_del_zone_txn(&txn, j.zone);
+ memset(&md, 0, sizeof(md));
+ }
+ }
+
+ // avoid cycle
+ if (journal_contains(&txn, false, ch_to, j.zone)) {
+ journal_fix_occupation(j, &txn, &md, INT64_MAX, 1);
+ }
+
+ if (zdiff == NULL) {
+ journal_write_changeset(&txn, ch);
+ } else {
+ journal_write_zone_diff(&txn, zdiff);
+ }
+ journal_metadata_after_insert(&md, ch_from, ch_to);
+
+ if (extra != NULL) {
+ journal_write_changeset(&txn, extra);
+ journal_metadata_after_extra(&md, changeset_from(extra), changeset_to(extra));
+ }
+
+ journal_store_metadata(&txn, j.zone, &md);
+ knot_lmdb_commit(&txn);
+ return txn.ret;
+}
diff --git a/src/knot/journal/journal_write.h b/src/knot/journal/journal_write.h
new file mode 100644
index 0000000..a55fd34
--- /dev/null
+++ b/src/knot/journal/journal_write.h
@@ -0,0 +1,121 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "knot/journal/journal_basic.h"
+#include "knot/journal/journal_metadata.h"
+#include "knot/journal/serialization.h"
+
+/*!
+ * \brief Serialize a changeset into chunks and write it into DB with no checks and metadata update.
+ *
+ * \param txn Journal DB transaction.
+ * \param ch Changeset to be written.
+ */
+void journal_write_changeset(knot_lmdb_txn_t *txn, const changeset_t *ch);
+
+/*!
+ * \brief Serialize zone contents aka "bootstrap" changeset into journal, no checks.
+ *
+ * \param txn Journal DB transaction.
+ * \param z Zone contents to be written.
+ */
+void journal_write_zone(knot_lmdb_txn_t *txn, const zone_contents_t *z);
+
+/*!
+ * \brief Merge all following changesets into one journal changeset.
+ *
+ * \param j Zone journal.
+ * \param txn Journal DB transaction.
+ * \param merge_zij True if we shall merge into zone-in-journal.
+ * \param merge_serial Serial-from of the changeset to be merged into (ignored if 'merge_zij').
+ * \param original_serial_to Output: previous serial-to of the merged changeset before merge.
+ *
+ * \note The error code will be in txn->ret.
+ */
+void journal_merge(zone_journal_t j, knot_lmdb_txn_t *txn, bool merge_zij,
+ uint32_t merge_serial, uint32_t *original_serial_to);
+
+/*!
+ * \brief Delete some journal changesets in attempt to fulfill usage quotas.
+ *
+ * \param txn Journal DB transaction.
+ * \param from Serial-from of the first changeset to be deleted.
+ * \param zone Zone name.
+ * \param tofree_size Amount of data (in bytes) to be at least deleted.
+ * \param tofree_count Number of changesets to be at least deleted.
+ * \param stop_at_serial Must not delete the changeset with this serial-from.
+ * \param freed_size Output: amount of data really deleted.
+ * \param freed_count Output: number of changesets really freed.
+ * \param stopped_at Output: serial-to of the last deleted changeset.
+ *
+ * \return True if something was deleted (not necessarily fulfilling tofree_*).
+ */
+bool journal_delete(knot_lmdb_txn_t *txn, uint32_t from, const knot_dname_t *zone,
+ uint64_t tofree_size, size_t tofree_count, uint32_t stop_at_serial,
+ uint64_t *freed_size, size_t *freed_count, uint32_t *stopped_at);
+
+/*!
+ * \brief Perform a merge or zone flush in order to enable deleting more changesets.
+ *
+ * \param j Zone journal.
+ * \param txn Journal DB transaction.
+ * \param md Journal metadata.
+ *
+ * \note It might set txn->ret to KNOT_EBUSY to fail out from this operation and let the zone flush itself.
+ */
+void journal_try_flush(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md);
+
+/*!
+ * \brief Perform delete/merge/flush operations to fulfill configured journal quotas.
+ *
+ * \param j Zone journal.
+ * \param txn Journal DB transaction.
+ * \param md Journal metadata.
+ * \param max_usage Configured maximum usage (in bytes) of journal DB by this zone.
+ * \param max_count Configured maximum number of changesets.
+ */
+void journal_fix_occupation(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md,
+ int64_t max_usage, ssize_t max_count);
+
+/*!
+ * \brief Store zone-in-journal into the journal, update metadata.
+ *
+ * \param j Zone journal.
+ * \param z Zone contents to be stored.
+ *
+ * \return KNOT_E*
+ */
+int journal_insert_zone(zone_journal_t j, const zone_contents_t *z);
+
+/*!
+ * \brief Store changeset into journal, fulfilling quotas and updating metadata.
+ *
+ * \param j Zone journal.
+ * \param ch Changeset to be stored.
+ * \param extra Extra changeset to be stored in the role of merged changeset.
+ * \param zdiff Zone diff to be stored instead of changeset.
+ *
+ * \note The extra changeset is being stored on zone load, it is basically the diff
+ * between zonefile and loaded zone contents. Afterwards, it will be treated
+ * the same like merged changeset. Inserting it requires no zone-in-journal
+ * present and leads to deleting any previous merged changeset.
+ *
+ * \return KNOT_E*
+ */
+int journal_insert(zone_journal_t j, const changeset_t *ch, const changeset_t *extra,
+ const zone_diff_t *zdiff);
diff --git a/src/knot/journal/knot_lmdb.c b/src/knot/journal/knot_lmdb.c
new file mode 100644
index 0000000..bc17462
--- /dev/null
+++ b/src/knot/journal/knot_lmdb.c
@@ -0,0 +1,770 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <stdarg.h>
+#include <stdio.h> // snprintf
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "knot/journal/knot_lmdb.h"
+
+#include "knot/conf/conf.h"
+#include "contrib/files.h"
+#include "contrib/wire_ctx.h"
+#include "libknot/dname.h"
+#include "libknot/endian.h"
+#include "libknot/error.h"
+
+#define LMDB_DIR_MODE 0770
+#define LMDB_FILE_MODE 0660
+
+/*!
+ * \brief Translate a return code of an mdb_* call into a KNOT_E* code, in place.
+ *
+ * \param err  In: the code to translate. Out: the translated code.
+ */
+static void err_to_knot(int *err)
+{
+	const int code = *err;
+
+	if (code == MDB_SUCCESS) {
+		*err = KNOT_EOK;
+	} else if (code == MDB_NOTFOUND) {
+		*err = KNOT_ENOENT;
+	} else if (code == MDB_TXN_FULL) {
+		*err = KNOT_ELIMIT;
+	} else if (code == MDB_MAP_FULL || code == ENOSPC) {
+		*err = KNOT_ESPACE;
+	} else {
+		// anything else is passed through, only forced to be negative
+		*err = (code > 0 ? -code : code);
+	}
+}
+
+/*!
+ * \brief Fill in the DB handling structure; the environment is not opened here.
+ *
+ * \note 'path' is duplicated (db->path stays NULL if the duplication fails),
+ *       whereas 'dbname' is stored as the passed pointer.
+ */
+void knot_lmdb_init(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags, const char *dbname)
+{
+#ifdef __OpenBSD__
+	/*
+	 * Enforce that MDB_WRITEMAP is set.
+	 *
+	 * MDB assumes a unified buffer cache.
+	 *
+	 * See https://www.openldap.org/pub/hyc/mdm-paper.pdf section 3.1,
+	 * references 17, 18, and 19.
+	 *
+	 * From Howard Chu: "This requirement can be relaxed in the
+	 * current version of the library. If you create the environment
+	 * with the MDB_WRITEMAP option then all reads and writes are
+	 * performed using mmap, so the file buffer cache is irrelevant.
+	 * Of course then you lose the protection that the read-only
+	 * map offers."
+	 */
+	env_flags |= MDB_WRITEMAP;
+#endif
+	// static options with their defaults
+	db->maxdbs = 2;
+	db->maxreaders = conf_lmdb_readers(conf());
+
+	// internal options taken from the parameters
+	db->path = strdup(path);
+	db->dbname = dbname;
+	db->mapsize = mapsize;
+	db->env_flags = env_flags;
+
+	// run-time state: closed so far
+	db->env = NULL;
+	pthread_mutex_init(&db->opening_mutex, NULL);
+}
+
+/*!
+ * \brief Call stat() on the data file ("data.mdb") inside the LMDB directory.
+ *
+ * \param lmdb_path  Path of the LMDB directory.
+ * \param st         Output: result of stat().
+ *
+ * \retval KNOT_EOK    The data file exists and is not empty.
+ * \retval KNOT_ENODB  The data file is missing or empty.
+ * \return Mapped errno if stat() failed for any other reason.
+ */
+static int lmdb_stat(const char *lmdb_path, struct stat *st)
+{
+	// room for "/data.mdb" (9 characters) and the terminating zero
+	char data_mdb[strlen(lmdb_path) + 10];
+	(void)snprintf(data_mdb, sizeof(data_mdb), "%s/data.mdb", lmdb_path);
+
+	if (stat(data_mdb, st) != 0) {
+		return (errno == ENOENT) ? KNOT_ENODB : knot_map_errno();
+	}
+	return (st->st_size > 0) ? KNOT_EOK : KNOT_ENODB;
+}
+
+int knot_lmdb_exists(knot_lmdb_db_t *db)
+{
+	if (db->env == NULL) {
+		if (db->path == NULL) {
+			return KNOT_ENODB;
+		}
+		// only the return code matters, the stat data are dropped
+		struct stat ignored;
+		return lmdb_stat(db->path, &ignored);
+	}
+	// an open environment implies that the DB exists
+	return KNOT_EOK;
+}
+
+/*!
+ * \brief If no mapsize is configured, derive it from the current DB size and switch to read-only.
+ *
+ * \return KNOT_EOK, or the error from inspecting the DB file.
+ */
+static int fix_mapsize(knot_lmdb_db_t *db)
+{
+	if (db->mapsize != 0) {
+		return KNOT_EOK;
+	}
+
+	struct stat st;
+	int ret = lmdb_stat(db->path, &st);
+	if (ret == KNOT_EOK) {
+		db->mapsize = st.st_size * 2; // twice the size as DB might grow while we read it
+		db->env_flags |= MDB_RDONLY;
+	}
+	return ret;
+}
+
+size_t knot_lmdb_copy_size(knot_lmdb_db_t *to_copy)
+{
+	struct stat st;
+	size_t copy_size = 1048576; // base size, used alone if the DB file can't be examined
+
+	if (lmdb_stat(to_copy->path, &st) != KNOT_EOK) {
+		return copy_size;
+	}
+	copy_size += st.st_size * 2;
+	return copy_size;
+}
+
+/*!
+ * \brief Open the LMDB environment and the (sub-)database described by 'db'.
+ *
+ * Does nothing if the environment is already open. On any failure the
+ * environment is closed again and db->env is left NULL.
+ *
+ * \note Within this file it is called with db->opening_mutex held.
+ *
+ * \param db  Initialised DB handling structure.
+ *
+ * \return KNOT_E*
+ */
+static int lmdb_open(knot_lmdb_db_t *db)
+{
+	MDB_txn *init_txn = NULL;
+
+	if (db->env != NULL) {
+		return KNOT_EOK;
+	}
+
+	if (db->path == NULL) {
+		// duplicating the path string has failed earlier
+		return KNOT_ENOMEM;
+	}
+
+	int ret = fix_mapsize(db);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	ret = make_dir(db->path, LMDB_DIR_MODE, true);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	long page_size = sysconf(_SC_PAGESIZE);
+	if (page_size <= 0) {
+		return KNOT_ERROR;
+	}
+	// round the map size up to a whole number of pages
+	size_t mapsize = (db->mapsize / page_size + 1) * page_size;
+
+	ret = mdb_env_create(&db->env);
+	if (ret != MDB_SUCCESS) {
+		err_to_knot(&ret);
+		return ret;
+	}
+
+	ret = mdb_env_set_mapsize(db->env, mapsize);
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_env_set_maxdbs(db->env, db->maxdbs);
+	}
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_env_set_maxreaders(db->env, db->maxreaders);
+	}
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_env_open(db->env, db->path, db->env_flags, LMDB_FILE_MODE);
+	}
+	if (ret == MDB_SUCCESS) {
+		unsigned init_txn_flags = (db->env_flags & MDB_RDONLY);
+		ret = mdb_txn_begin(db->env, NULL, init_txn_flags, &init_txn);
+		if (ret == MDB_READERS_FULL) {
+			// try to free reader slots left by dead readers, then retry once
+			int cleared = 0;
+			ret = mdb_reader_check(db->env, &cleared);
+			if (ret == MDB_SUCCESS) {
+				ret = mdb_txn_begin(db->env, NULL, init_txn_flags, &init_txn);
+			}
+		}
+	}
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_dbi_open(init_txn, db->dbname, MDB_CREATE, &db->dbi);
+	}
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_txn_commit(init_txn);
+		// The transaction handle is freed by the commit whatever its outcome,
+		// so it must not be aborted in the error path below.
+		init_txn = NULL;
+	}
+
+	if (ret != MDB_SUCCESS) {
+		if (init_txn != NULL) {
+			mdb_txn_abort(init_txn);
+		}
+		mdb_env_close(db->env);
+		db->env = NULL;
+	}
+	err_to_knot(&ret);
+	return ret;
+}
+
+int knot_lmdb_open(knot_lmdb_db_t *db)
+{
+	int ret;
+
+	// (re)opening is serialized by the mutex
+	pthread_mutex_lock(&db->opening_mutex);
+	ret = lmdb_open(db);
+	pthread_mutex_unlock(&db->opening_mutex);
+
+	return ret;
+}
+
+/*!
+ * \brief Close the database and its environment, if open.
+ *
+ * \note Within this file it is called with db->opening_mutex held.
+ */
+static void lmdb_close(knot_lmdb_db_t *db)
+{
+	if (db->env == NULL) {
+		return; // not open, nothing to do
+	}
+
+	mdb_dbi_close(db->env, db->dbi);
+	mdb_env_close(db->env);
+	db->env = NULL;
+}
+
+void knot_lmdb_close(knot_lmdb_db_t *db)
+{
+	pthread_mutex_t *lock = &db->opening_mutex;
+
+	// closing is serialized with opening and re-initialisation
+	pthread_mutex_lock(lock);
+	lmdb_close(db);
+	pthread_mutex_unlock(lock);
+}
+
+/*!
+ * \brief Replace the parameters of a DB that is not open.
+ *
+ * \note Within this file it is called with db->opening_mutex held.
+ *
+ * \param db         The DB to be modified.
+ * \param path       New path to the DB.
+ * \param mapsize    New mapsize.
+ * \param env_flags  New LMDB environment flags.
+ *
+ * \retval KNOT_EOK      Parameters are unchanged or have been replaced.
+ * \retval KNOT_EISCONN  Parameters differ, but the DB is open.
+ */
+static int lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags)
+{
+#ifdef __OpenBSD__
+	env_flags |= MDB_WRITEMAP;
+#endif
+	// db->path is NULL if duplicating the path has failed earlier; such DB is never "unchanged"
+	if (db->path != NULL && strcmp(db->path, path) == 0 &&
+	    db->mapsize == mapsize && db->env_flags == env_flags) {
+		return KNOT_EOK;
+	}
+	if (db->env != NULL) {
+		return KNOT_EISCONN;
+	}
+	free(db->path);
+	db->path = strdup(path);
+	db->mapsize = mapsize;
+	db->env_flags = env_flags;
+	return KNOT_EOK;
+}
+
+int knot_lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags)
+{
+	int ret;
+
+	// locked wrapper around lmdb_reinit()
+	pthread_mutex_lock(&db->opening_mutex);
+	ret = lmdb_reinit(db, path, mapsize, env_flags);
+	pthread_mutex_unlock(&db->opening_mutex);
+
+	return ret;
+}
+
+int knot_lmdb_reconfigure(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags)
+{
+	pthread_mutex_lock(&db->opening_mutex);
+
+	// first attempt succeeds if nothing has changed or the DB is not open
+	int ret = lmdb_reinit(db, path, mapsize, env_flags);
+	if (ret != KNOT_EOK) {
+		// the DB is open with different parameters: close, re-initialise, open again
+		lmdb_close(db);
+		ret = lmdb_reinit(db, path, mapsize, env_flags);
+		ret = (ret == KNOT_EOK) ? lmdb_open(db) : ret;
+	}
+
+	pthread_mutex_unlock(&db->opening_mutex);
+	return ret;
+}
+
+void knot_lmdb_deinit(knot_lmdb_db_t *db)
+{
+	// close the DB under the lock while the mutex still exists...
+	pthread_mutex_lock(&db->opening_mutex);
+	lmdb_close(db);
+	pthread_mutex_unlock(&db->opening_mutex);
+
+	// ...and then release what knot_lmdb_init() has set up
+	pthread_mutex_destroy(&db->opening_mutex);
+	free(db->path);
+}
+
+void knot_lmdb_begin(knot_lmdb_db_t *db, knot_lmdb_txn_t *txn, bool rw)
+{
+	const unsigned flags = (rw ? 0 : MDB_RDONLY);
+
+	txn->ret = mdb_txn_begin(db->env, NULL, flags, &txn->txn);
+	err_to_knot(&txn->ret);
+	if (txn->ret != KNOT_EOK) {
+		return; // the rest of the structure is left untouched on failure
+	}
+
+	txn->db = db;
+	txn->is_rw = rw;
+	txn->opened = true;
+}
+
+void knot_lmdb_abort(knot_lmdb_txn_t *txn)
+{
+	if (!txn->opened) {
+		return; // nothing to abort
+	}
+
+	// the cursor goes first, then the transaction itself
+	if (txn->cursor != NULL) {
+		mdb_cursor_close(txn->cursor);
+		txn->cursor = NULL;
+	}
+	mdb_txn_abort(txn->txn);
+	txn->opened = false;
+}
+
+/*!
+ * \brief Check that the transaction is open and has not failed so far.
+ *
+ * A transaction that is not open gets KNOT_ESEMCHECK (unless it already carries
+ * an error); a failed transaction is aborted.
+ *
+ * \return True if the transaction may be used further.
+ */
+static bool txn_semcheck(knot_lmdb_txn_t *txn)
+{
+	if (txn->ret == KNOT_EOK) {
+		if (txn->opened) {
+			return true;
+		}
+		txn->ret = KNOT_ESEMCHECK;
+	}
+	knot_lmdb_abort(txn);
+	return false;
+}
+
+void knot_lmdb_commit(knot_lmdb_txn_t *txn)
+{
+	if (txn_semcheck(txn)) {
+		// release the cursor before the transaction ends
+		MDB_cursor *cur = txn->cursor;
+		if (cur != NULL) {
+			txn->cursor = NULL;
+			mdb_cursor_close(cur);
+		}
+
+		txn->ret = mdb_txn_commit(txn->txn);
+		err_to_knot(&txn->ret);
+		txn->opened = false;
+	}
+	// otherwise the failed transaction has already been aborted by the check
+}
+
+/*!
+ * \brief Fail the transaction with KNOT_ENOMEM if the given key/value has no data.
+ *
+ * Saves the callers from checking for allocation failure after creating each search key.
+ *
+ * \return True if 'tocheck' has data, false if the transaction has been aborted.
+ */
+static bool txn_enomem(knot_lmdb_txn_t *txn, const MDB_val *tocheck)
+{
+	if (tocheck->mv_data != NULL) {
+		return true;
+	}
+
+	txn->ret = KNOT_ENOMEM;
+	knot_lmdb_abort(txn);
+	return false;
+}
+
+/*!
+ * \brief Open the cursor of the transaction, unless it is open already.
+ *
+ * \return True if the cursor is available, false if the transaction has been aborted.
+ */
+static bool init_cursor(knot_lmdb_txn_t *txn)
+{
+	if (txn->cursor != NULL) {
+		return true;
+	}
+
+	txn->ret = mdb_cursor_open(txn->txn, txn->db->dbi, &txn->cursor);
+	err_to_knot(&txn->ret);
+	if (txn->ret == KNOT_EOK) {
+		return true;
+	}
+
+	knot_lmdb_abort(txn);
+	return false;
+}
+
+/*!
+ * \brief Move the cursor with the given operation, filling txn->cur_key and txn->cur_val.
+ *
+ * \note "Not found" is not treated as an error: txn->ret stays KNOT_EOK.
+ *
+ * \return True if a record was found.
+ */
+static bool curget(knot_lmdb_txn_t *txn, MDB_cursor_op op)
+{
+	txn->ret = mdb_cursor_get(txn->cursor, &txn->cur_key, &txn->cur_val, op);
+	err_to_knot(&txn->ret);
+
+	if (txn->ret != KNOT_ENOENT) {
+		return (txn->ret == KNOT_EOK);
+	}
+	txn->ret = KNOT_EOK;
+	return false;
+}
+
+/*!
+ * \brief Duplicate the data of 'orig' into newly allocated memory.
+ *
+ * \param orig   Value to be copied.
+ * \param clone  Output: the copy; its mv_data has to be freed by the caller.
+ *
+ * \return KNOT_EOK or KNOT_ENOMEM.
+ */
+static int mdb_val_clone(const MDB_val *orig, MDB_val *clone)
+{
+	void *copy = malloc(orig->mv_size);
+
+	clone->mv_data = copy;
+	if (copy == NULL) {
+		return KNOT_ENOMEM;
+	}
+	clone->mv_size = orig->mv_size;
+	memcpy(copy, orig->mv_data, orig->mv_size);
+	return KNOT_EOK;
+}
+
+bool knot_lmdb_find(knot_lmdb_txn_t *txn, MDB_val *what, knot_lmdb_find_t how)
+{
+	if (!txn_semcheck(txn) || !init_cursor(txn) || !txn_enomem(txn, what)) {
+		return false;
+	}
+
+	// start the search from the requested key, with an empty value
+	txn->cur_key = *what;
+	txn->cur_val.mv_size = 0;
+	txn->cur_val.mv_data = NULL;
+
+	const knot_lmdb_find_t mode = (how & 3);
+	bool found = curget(txn, mode == KNOT_LMDB_EXACT ? MDB_SET : MDB_SET_RANGE);
+
+	// LEQ is not supported by LMDB, we use GEQ and go back
+	if (mode == KNOT_LMDB_LEQ && txn->ret == KNOT_EOK) {
+		if (!found) {
+			// nothing greater or equal: the last record (if any) is the answer
+			found = curget(txn, MDB_LAST);
+		} else if (txn->cur_key.mv_size != what->mv_size ||
+		           memcmp(txn->cur_key.mv_data, what->mv_data, what->mv_size) != 0) {
+			// strictly greater key found: step one record back
+			found = curget(txn, MDB_PREV);
+		}
+	}
+
+	if (!found && txn->ret == KNOT_EOK && (how & KNOT_LMDB_FORCE)) {
+		txn->ret = KNOT_ENOENT;
+	}
+
+	return found;
+}
+
+// This is not bulletproof thread-safe (in case of LMDB fail-teardown), but mostly OK.
+// Neither the cursor nor the cur_key/cur_val fields of txn are used here.
+int knot_lmdb_find_threadsafe(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val, knot_lmdb_find_t how)
+{
+	assert(how == KNOT_LMDB_EXACT);
+
+	if (key->mv_data == NULL) {
+		return KNOT_ENOMEM;
+	} else if (!txn->opened) {
+		return KNOT_EINVAL;
+	} else if (txn->ret != KNOT_EOK) {
+		return txn->ret;
+	}
+
+	MDB_val found = { 0 };
+	int ret = mdb_get(txn->txn, txn->db->dbi, key, &found);
+	err_to_knot(&ret);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+	// hand over a private copy of the value
+	return mdb_val_clone(&found, val);
+}
+
+bool knot_lmdb_first(knot_lmdb_txn_t *txn)
+{
+	if (!txn_semcheck(txn)) {
+		return false;
+	}
+	if (!init_cursor(txn)) {
+		return false;
+	}
+	// position at the lexicographically first record
+	return curget(txn, MDB_FIRST);
+}
+
+bool knot_lmdb_next(knot_lmdb_txn_t *txn)
+{
+	// iterating without a previous find/first is a usage error
+	const bool no_cursor = (txn->cursor == NULL);
+	if (no_cursor && txn->ret == KNOT_EOK) {
+		txn->ret = KNOT_EINVAL;
+	}
+
+	return txn_semcheck(txn) ? curget(txn, MDB_NEXT) : false;
+}
+
+bool knot_lmdb_is_prefix_of(const MDB_val *prefix, const MDB_val *of)
+{
+	// a longer key can never be a prefix of a shorter one
+	if (prefix->mv_size > of->mv_size) {
+		return false;
+	}
+	return memcmp(prefix->mv_data, of->mv_data, prefix->mv_size) == 0;
+}
+
+void knot_lmdb_del_cur(knot_lmdb_txn_t *txn)
+{
+	// Without a cursor there is no current record; fail the transaction
+	// (like knot_lmdb_next() does) instead of passing NULL to LMDB.
+	if (txn->cursor == NULL && txn->ret == KNOT_EOK) {
+		txn->ret = KNOT_EINVAL;
+	}
+	if (txn_semcheck(txn)) {
+		txn->ret = mdb_cursor_del(txn->cursor, 0);
+		err_to_knot(&txn->ret);
+	}
+}
+
+void knot_lmdb_del_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix)
+{
+	// walk all records starting with the prefix and delete them one by one
+	bool found = knot_lmdb_find(txn, prefix, KNOT_LMDB_GEQ);
+	while (found && knot_lmdb_is_prefix_of(prefix, &txn->cur_key)) {
+		knot_lmdb_del_cur(txn);
+		found = knot_lmdb_next(txn);
+	}
+}
+
+int knot_lmdb_apply_threadsafe(knot_lmdb_txn_t *txn, const MDB_val *key, bool prefix, lmdb_apply_cb cb, void *ctx)
+{
+	// a private cursor is used, so that the fields of txn stay untouched
+	MDB_cursor *cur;
+	int ret = mdb_cursor_open(txn->txn, txn->db->dbi, &cur);
+	err_to_knot(&ret);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	MDB_val cur_key = *key;
+	MDB_val cur_val = { 0 };
+	ret = mdb_cursor_get(cur, &cur_key, &cur_val, prefix ? MDB_SET_RANGE : MDB_SET);
+	err_to_knot(&ret);
+
+	if (!prefix) {
+		// exact match: a single callback call, a missing key is an error
+		if (ret == KNOT_EOK) {
+			ret = cb(&cur_key, &cur_val, ctx);
+		}
+	} else {
+		while (ret == KNOT_EOK && knot_lmdb_is_prefix_of(key, &cur_key)) {
+			ret = cb(&cur_key, &cur_val, ctx);
+			if (ret != KNOT_EOK) {
+				break;
+			}
+			ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
+			err_to_knot(&ret);
+		}
+		// running out of records is a regular end of the iteration
+		if (ret == KNOT_ENOENT) {
+			ret = KNOT_EOK;
+		}
+	}
+
+	mdb_cursor_close(cur);
+	return ret;
+}
+
+bool knot_lmdb_insert(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val)
+{
+	if (!txn_semcheck(txn) || !txn_enomem(txn, key)) {
+		return (txn->ret == KNOT_EOK);
+	}
+
+	// a value with a size but no data only reserves the space in the DB
+	unsigned flags = 0;
+	if (val->mv_size > 0 && val->mv_data == NULL) {
+		flags = MDB_RESERVE;
+	}
+
+	txn->ret = mdb_put(txn->txn, txn->db->dbi, key, val, flags);
+	err_to_knot(&txn->ret);
+	return (txn->ret == KNOT_EOK);
+}
+
+int knot_lmdb_quick_insert(knot_lmdb_db_t *db, MDB_val key, MDB_val val)
+{
+	int ret = KNOT_ENOMEM;
+
+	if (val.mv_data != NULL) {
+		knot_lmdb_txn_t txn = { 0 };
+		knot_lmdb_begin(db, &txn, true);
+		knot_lmdb_insert(&txn, &key, &val);
+		// the value is owned by this function and released here
+		free(val.mv_data);
+		free(key.mv_data);
+		knot_lmdb_commit(&txn);
+		return txn.ret;
+	}
+
+	// the value could not be allocated by the caller: just release the key
+	free(key.mv_data);
+	return ret;
+}
+
+int knot_lmdb_copy_prefix(knot_lmdb_txn_t *from, knot_lmdb_txn_t *to, MDB_val *prefix)
+{
+	// purge the matching records in the target first
+	knot_lmdb_del_prefix(to, prefix);
+	if (to->ret != KNOT_EOK) {
+		return to->ret;
+	}
+
+	// then insert every matching source record into the target
+	bool found = knot_lmdb_find(from, prefix, KNOT_LMDB_GEQ);
+	while (found && knot_lmdb_is_prefix_of(prefix, &from->cur_key)) {
+		knot_lmdb_insert(to, &from->cur_key, &from->cur_val);
+		found = knot_lmdb_next(from);
+	}
+
+	// a failure on the source side takes precedence
+	if (from->ret != KNOT_EOK) {
+		return from->ret;
+	}
+	return to->ret;
+}
+
+int knot_lmdb_copy_prefixes(knot_lmdb_db_t *from, knot_lmdb_db_t *to,
+                            MDB_val *prefixes, size_t n_prefixes)
+{
+	if (n_prefixes < 1) {
+		return KNOT_EOK;
+	}
+	if (from == NULL || to == NULL || prefixes == NULL) {
+		return KNOT_EINVAL;
+	}
+	int ret = knot_lmdb_open(from);
+	if (ret == KNOT_EOK) {
+		ret = knot_lmdb_open(to);
+	}
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+	knot_lmdb_txn_t tr = { 0 }, tw = { 0 };
+	knot_lmdb_begin(from, &tr, false);
+	knot_lmdb_begin(to, &tw, true);
+	for (size_t i = 0; i < n_prefixes && ret == KNOT_EOK; i++) {
+		ret = knot_lmdb_copy_prefix(&tr, &tw, &prefixes[i]);
+	}
+	if (ret != KNOT_EOK) {
+		// The target has possibly been purged and filled only partially
+		// (e.g. when reading the source failed): never commit such state.
+		knot_lmdb_abort(&tw);
+		knot_lmdb_abort(&tr);
+		return ret;
+	}
+	knot_lmdb_commit(&tw);
+	knot_lmdb_commit(&tr);
+	return tw.ret;
+}
+
+size_t knot_lmdb_usage(knot_lmdb_txn_t *txn)
+{
+	if (!txn_semcheck(txn)) {
+		return 0;
+	}
+
+	MDB_stat st = { 0 };
+	txn->ret = mdb_stat(txn->txn, txn->db->dbi, &st);
+	err_to_knot(&txn->ret);
+
+	// count all kinds of pages and convert the sum to bytes
+	size_t pages = st.ms_branch_pages;
+	pages += st.ms_leaf_pages;
+	pages += st.ms_overflow_pages;
+	return (pages * st.ms_psize);
+}
+
+/*!
+ * \brief Serialize the variadic parameters into the buffer according to 'format'.
+ *
+ * \param key_data  Buffer to be filled.
+ * \param key_len   Size of the buffer.
+ * \param format    One character per serialized item (B, H, I, L, S, N, D).
+ * \param arg       The values; the 'D' item takes two of them (pointer and size).
+ *
+ * \return True if nothing failed and the buffer was filled exactly to its end.
+ */
+static bool make_key_part(void *key_data, size_t key_len, const char *format, va_list arg)
+{
+	wire_ctx_t wire = wire_ctx_init(key_data, key_len);
+
+	const char *f = format;
+	while (*f != '\0') {
+		switch (*f++) {
+		case 'B':
+			wire_ctx_write_u8(&wire, va_arg(arg, int));
+			break;
+		case 'H':
+			wire_ctx_write_u16(&wire, va_arg(arg, int));
+			break;
+		case 'I':
+			wire_ctx_write_u32(&wire, va_arg(arg, uint32_t));
+			break;
+		case 'L':
+			wire_ctx_write_u64(&wire, va_arg(arg, uint64_t));
+			break;
+		case 'S': {
+			// the string is stored including its terminating zero
+			const char *str = va_arg(arg, const char *);
+			wire_ctx_write(&wire, str, strlen(str) + 1);
+			break;
+		}
+		case 'N': {
+			const knot_dname_t *dname = va_arg(arg, const knot_dname_t *);
+			wire_ctx_write(&wire, dname, knot_dname_size(dname));
+			break;
+		}
+		case 'D': {
+			const void *data = va_arg(arg, const void *);
+			size_t data_len = va_arg(arg, size_t);
+			wire_ctx_write(&wire, data, data_len);
+			break;
+		}
+		}
+	}
+
+	if (wire.error != KNOT_EOK) {
+		return false;
+	}
+	return wire_ctx_available(&wire) == 0;
+}
+
+MDB_val knot_lmdb_make_key(const char *format, ...)
+{
+	MDB_val key = { 0 };
+	va_list arg;
+	size_t total = 0;
+
+	// first pass: only sum up the sizes of all items
+	va_start(arg, format);
+	for (const char *f = format; *f != '\0'; f++) {
+		switch (*f) {
+		case 'B':
+			(void)va_arg(arg, int); // uint8_t will be promoted to int
+			total += sizeof(uint8_t);
+			break;
+		case 'H':
+			(void)va_arg(arg, int); // uint16_t will be promoted to int
+			total += sizeof(uint16_t);
+			break;
+		case 'I':
+			(void)va_arg(arg, uint32_t);
+			total += sizeof(uint32_t);
+			break;
+		case 'L':
+			(void)va_arg(arg, uint64_t);
+			total += sizeof(uint64_t);
+			break;
+		case 'S': {
+			const char *str = va_arg(arg, const char *);
+			total += strlen(str) + 1;
+			break;
+		}
+		case 'N': {
+			const knot_dname_t *dname = va_arg(arg, const knot_dname_t *);
+			total += knot_dname_size(dname);
+			break;
+		}
+		case 'D':
+			(void)va_arg(arg, const void *);
+			total += va_arg(arg, size_t);
+			break;
+		}
+	}
+	va_end(arg);
+	key.mv_size = total;
+
+	// second pass: allocate the key and fill it in
+	if (total == 0) {
+		return key;
+	}
+	key.mv_data = malloc(total);
+	if (key.mv_data == NULL) {
+		return key; // the caller recognizes the failure by NULL mv_data
+	}
+	va_start(arg, format);
+	bool filled = make_key_part(key.mv_data, key.mv_size, format, arg);
+	va_end(arg);
+	assert(filled);
+	(void)filled;
+	return key;
+}
+
+bool knot_lmdb_make_key_part(void *key_data, size_t key_len, const char *format, ...)
+{
+	bool filled;
+	va_list ap;
+
+	// variadic front-end of make_key_part()
+	va_start(ap, format);
+	filled = make_key_part(key_data, key_len, format, ap);
+	va_end(ap);
+
+	return filled;
+}
+
+/*!
+ * \brief Deserialize items from the buffer according to 'format'.
+ *
+ * \param key_data  Buffer to be parsed.
+ * \param key_len   Size of the buffer.
+ * \param format    One character per item (B, H, I, L, S, N, D).
+ * \param arg       For each item a pointer where to store it (NULL to skip the item);
+ *                  the 'D' item takes an additional size_t with the data size.
+ *
+ * \note Parsing stops as soon as the buffer is exhausted, even if 'format' continues.
+ *
+ * \return True if nothing failed and the whole buffer has been consumed.
+ */
+static bool unmake_key_part(const void *key_data, size_t key_len, const char *format, va_list arg)
+{
+	if (key_data == NULL) {
+		return false;
+	}
+	wire_ctx_t wire = wire_ctx_init_const(key_data, key_len);
+	for (const char *f = format; *f != '\0' && wire.error == KNOT_EOK && wire_ctx_available(&wire) > 0; f++) {
+		void *tmp = va_arg(arg, void *);
+		size_t tmsize;
+		switch (*f) {
+		case 'B':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint8_t));
+			} else {
+				*(uint8_t *)tmp = wire_ctx_read_u8(&wire);
+			}
+			break;
+		case 'H':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint16_t));
+			} else {
+				*(uint16_t *)tmp = wire_ctx_read_u16(&wire);
+			}
+			break;
+		case 'I':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint32_t));
+			} else {
+				*(uint32_t *)tmp = wire_ctx_read_u32(&wire);
+			}
+			break;
+		case 'L':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint64_t));
+			} else {
+				*(uint64_t *)tmp = wire_ctx_read_u64(&wire);
+			}
+			break;
+		case 'S':
+			// The terminating zero must lie inside the rest of the buffer,
+			// otherwise strlen() below would read beyond its end.
+			if (memchr(wire.position, '\0', wire_ctx_available(&wire)) == NULL) {
+				return false;
+			}
+			if (tmp != NULL) {
+				*(const char **)tmp = (const char *)wire.position;
+			}
+			wire_ctx_skip(&wire, strlen((const char *)wire.position) + 1);
+			break;
+		case 'N':
+			// NOTE(review): knot_dname_size() is not given the remaining buffer
+			// length here; confirm that stored names are always well-formed.
+			if (tmp != NULL) {
+				*(const knot_dname_t **)tmp = (const knot_dname_t *)wire.position;
+			}
+			wire_ctx_skip(&wire, knot_dname_size((const knot_dname_t *)wire.position));
+			break;
+		case 'D':
+			tmsize = va_arg(arg, size_t);
+			// refuse to copy more than what is left in the buffer
+			if (tmsize > wire_ctx_available(&wire)) {
+				return false;
+			}
+			if (tmp != NULL) {
+				memcpy(tmp, wire.position, tmsize);
+			}
+			wire_ctx_skip(&wire, tmsize);
+			break;
+		}
+	}
+	return (wire.error == KNOT_EOK && wire_ctx_available(&wire) == 0);
+}
+
+bool knot_lmdb_unmake_key(const void *key_data, size_t key_len, const char *format, ...)
+{
+	bool parsed;
+	va_list ap;
+
+	// variadic front-end of unmake_key_part()
+	va_start(ap, format);
+	parsed = unmake_key_part(key_data, key_len, format, ap);
+	va_end(ap);
+
+	return parsed;
+}
+
+bool knot_lmdb_unmake_curval(knot_lmdb_txn_t *txn, const char *format, ...)
+{
+	const MDB_val *val = &txn->cur_val;
+	va_list ap;
+
+	va_start(ap, format);
+	bool parsed = unmake_key_part(val->mv_data, val->mv_size, format, ap);
+	va_end(ap);
+
+	if (parsed) {
+		return true;
+	}
+	// a value that can't be parsed fails the transaction, unless it has failed already
+	if (txn->ret == KNOT_EOK) {
+		txn->ret = KNOT_EMALF;
+	}
+	return false;
+}
diff --git a/src/knot/journal/knot_lmdb.h b/src/knot/journal/knot_lmdb.h
new file mode 100644
index 0000000..6214a10
--- /dev/null
+++ b/src/knot/journal/knot_lmdb.h
@@ -0,0 +1,446 @@
+/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <lmdb.h>
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <pthread.h>
+
+/*! \brief Handle of one LMDB environment together with one (sub-)database in it. */
+typedef struct knot_lmdb_db {
+ MDB_dbi dbi; // handle of the (sub-)database, obtained when the DB is opened
+ MDB_env *env; // LMDB environment; NULL while the DB is not open
+ pthread_mutex_t opening_mutex; // serializes opening, closing and re-initialisation
+
+ // These are static options. Set them after knot_lmdb_init(), which fills in defaults.
+ unsigned maxdbs;
+ unsigned maxreaders;
+
+ // These are internal options. Please don't touch them directly.
+ size_t mapsize; // zero means: derive from the current DB size and open read-only
+ unsigned env_flags; // MDB_NOTLS, MDB_RDONLY, MDB_WRITEMAP, MDB_DUPSORT, MDB_NOSYNC, MDB_MAPASYNC
+ const char *dbname; // stored as passed to knot_lmdb_init(), not copied
+ char *path; // private copy of the path; NULL if the allocation failed
+} knot_lmdb_db_t;
+
+/*!
+ * \brief DB transaction with a cursor and sticky error state.
+ *
+ * \note Initialise the structure with zeroes before passing it to knot_lmdb_begin().
+ */
+typedef struct {
+ MDB_txn *txn; // underlying LMDB transaction
+ MDB_cursor *cursor; // opened on first search/iteration; NULL until then
+ MDB_val cur_key; // key of the record the cursor is pointing to
+ MDB_val cur_val; // value of the record the cursor is pointing to
+
+ bool opened; // true between successful begin and commit/abort
+ bool is_rw; // true for read-write transaction
+ int ret; // KNOT_EOK, or the error that made the transaction fail
+ knot_lmdb_db_t *db; // the DB this transaction belongs to
+} knot_lmdb_txn_t;
+
+/*!
+ * \brief Method of comparing keys in knot_lmdb_find().
+ *
+ * \note The comparison method occupies the two lowest bits; KNOT_LMDB_FORCE is
+ *       a separate flag that may be OR-ed to any of the methods.
+ */
+typedef enum {
+ KNOT_LMDB_EXACT = 3, /*! \brief Search for exactly matching key. */
+ KNOT_LMDB_LEQ = 1, /*! \brief Search lexicographically lower or equal key. */
+ KNOT_LMDB_GEQ = 2, /*! \brief Search lexicographically greater or equal key. */
+ KNOT_LMDB_FORCE = 4, /*! \brief If no matching key found, consider it a transaction failure (KNOT_ENOENT). */
+} knot_lmdb_find_t;
+
+/*!
+ * \brief Callback used in sweep functions.
+ *
+ * \retval true for zones to preserve.
+ * \retval false for zones to remove.
+ */
+typedef bool (*sweep_cb)(const uint8_t *zone, void *data);
+
+/*!
+ * \brief Callback used in copy functions.
+ *
+ * \retval true if the current record shall be copied
+ * \retval false if the current record shall be skipped
+ */
+typedef bool (*knot_lmdb_copy_cb)(MDB_val *cur_key, MDB_val *cur_val);
+
+/*!
+ * \brief Initialise the DB handling structure.
+ *
+ * \param db DB handling structure.
+ * \param path Path to LMDB database on filesystem.
+ * \param mapsize Maximum size of the DB on FS.
+ * \param env_flags LMDB environment flags (e.g. MDB_RDONLY)
+ * \param dbname Optional: name of the sub-database.
+ */
+void knot_lmdb_init(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags, const char *dbname);
+
+/*!
+ * \brief Check if the database exists on the filesystem.
+ *
+ * \param db The DB in question.
+ *
+ * \retval KNOT_EOK The database exists (and is accessible for stat() ).
+ * \retval KNOT_ENODB The database doesn't exist.
+ * \return KNOT_E* explaining why stat() failed.
+ */
+int knot_lmdb_exists(knot_lmdb_db_t *db);
+
+/*!
+ * \brief Big enough mapsize for new database to hold a copy of to_copy.
+ */
+size_t knot_lmdb_copy_size(knot_lmdb_db_t *to_copy);
+
+/*!
+ * \brief Open the previously initialised DB.
+ *
+ * \param db The DB to be opened.
+ *
+ * \note If db->mapsize is zero, it will be set to twice the current size, and DB opened read-only!
+ *
+ * \return KNOT_E*
+ */
+int knot_lmdb_open(knot_lmdb_db_t *db);
+
+/*!
+ * \brief Close the database, but keep it initialised.
+ *
+ * \param db The DB to be closed.
+ */
+void knot_lmdb_close(knot_lmdb_db_t *db);
+
+/*!
+ * \brief Re-initialise existing DB with modified parameters.
+ *
+ * \note If the parameters differ and DB is open, it will be refused.
+ *
+ * \param db The DB to be modified.
+ * \param path New path to the DB.
+ * \param mapsize New mapsize.
+ * \param env_flags New LMDB environment flags.
+ *
+ * \return KNOT_EOK on success, KNOT_EISCONN if not possible.
+ */
+int knot_lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags);
+
+/*!
+ * \brief Re-open opened DB with modified parameters.
+ *
+ * \note The DB will be first closed, re-initialised and finally opened again.
+ *
+ * \note There must not be any DB transaction during this process.
+ *
+ * \param db The DB to be modified.
+ * \param path New path to the DB.
+ * \param mapsize New mapsize.
+ * \param env_flags New LMDB environment flags.
+ *
+ * \return KNOT_E*
+ */
+int knot_lmdb_reconfigure(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags);
+
+/*!
+ * \brief Close and de-initialise DB.
+ *
+ * \param db DB to be deinitialized.
+ */
+void knot_lmdb_deinit(knot_lmdb_db_t *db);
+
+/*!
+ * \brief Tell whether the DB is currently open.
+ *
+ * \param db  DB handling structure, may be NULL.
+ *
+ * \return True if 'db' is not NULL and its environment is open.
+ */
+inline static bool knot_lmdb_is_open(knot_lmdb_db_t *db)
+{
+	if (db == NULL) {
+		return false;
+	}
+	return db->env != NULL;
+}
+
+/*!
+ * \brief Start a DB transaction.
+ *
+ * \param db The database.
+ * \param txn Transaction handling structure to be initialised.
+ * \param rw True for read-write transaction, false for read-only.
+ *
+ * \note The error code will be stored in txn->ret.
+ */
+void knot_lmdb_begin(knot_lmdb_db_t *db, knot_lmdb_txn_t *txn, bool rw);
+
+/*!
+ * \brief Abort a transaction.
+ *
+ * \param txn Transaction to be aborted.
+ */
+void knot_lmdb_abort(knot_lmdb_txn_t *txn);
+
+/*!
+ * \brief Commit a transaction, or abort it if it has failed.
+ *
+ * \param txn Transaction to be committed.
+ *
+ * \note If txn->ret equals KNOT_EOK afterwards, whole DB transaction was successful.
+ */
+void knot_lmdb_commit(knot_lmdb_txn_t *txn);
+
+/*!
+ * \brief Find a key in database. The matched key will be in txn->cur_key and its value in txn->cur_val.
+ *
+ * \param txn DB transaction.
+ * \param what Key to be searched for.
+ * \param how Method of comparing keys. See comments at knot_lmdb_find_t.
+ *
+ * \note It's possible to use knot_lmdb_next() subsequently to iterate over following keys.
+ *
+ * \return True if a key found, false if none or failure.
+ */
+bool knot_lmdb_find(knot_lmdb_txn_t *txn, MDB_val *what, knot_lmdb_find_t how);
+
+/*!
+ * \brief Simple database lookup for the case that txn is shared among threads.
+ *
+ * \param txn DB transaction shared among threads.
+ * \param key Key to be searched for.
+ * \param val Output: database value.
+ * \param how Must be KNOT_LMDB_EXACT.
+ *
+ * \note Free val->mv_data afterwards!
+ *
+ * \retval KNOT_ENOENT no such key in DB.
+ * \return KNOT_E*
+ */
+int knot_lmdb_find_threadsafe(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val, knot_lmdb_find_t how);
+
+/*!
+ * \brief Start iterating over the whole DB from the lexicographically first key.
+ *
+ * \note The first DB record will be in txn->cur_key and txn->cur_val.
+ *
+ * \param txn DB transaction.
+ *
+ * \return True if ok, false if no key at all or failure.
+ */
+bool knot_lmdb_first(knot_lmdb_txn_t *txn);
+
+/*!
+ * \brief Iterate to the lexicographically next key (sets txn->cur_key and txn->cur_val).
+ *
+ * \param txn DB transaction.
+ *
+ * \return True if ok, false if behind the end of DB or failure.
+ */
+bool knot_lmdb_next(knot_lmdb_txn_t *txn);
+
+/*!
+ * \brief Check if one DB key is a prefix of another.
+ *
+ * \param prefix DB key prefix.
+ * \param of Another DB key.
+ *
+ * \return True iff 'prefix' is a prefix of 'of'.
+ */
+bool knot_lmdb_is_prefix_of(const MDB_val *prefix, const MDB_val *of);
+
+/*!
+ * \brief Position at the leftmost DB key that starts with the given prefix.
+ *
+ * \param txn     DB transaction.
+ * \param prefix  Prefix searched for.
+ *
+ * \return True if such key exists, false if there is none or on failure.
+ */
+inline static bool knot_lmdb_find_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix)
+{
+	// the first key greater or equal to the prefix is the only candidate
+	if (!knot_lmdb_find(txn, prefix, KNOT_LMDB_GEQ)) {
+		return false;
+	}
+	return knot_lmdb_is_prefix_of(prefix, &txn->cur_key);
+}
+
+/*!
+ * \brief Execute following block of commands for every key in DB matching given prefix.
+ *
+ * \note Both arguments are evaluated repeatedly, so pass expressions without side effects.
+ *
+ * \note The loop also ends when the transaction fails; check txn->ret afterwards.
+ *
+ * \param txn DB transaction.
+ * \param prefix Prefix searched for.
+ */
+#define knot_lmdb_foreach(txn, prefix) \
+ for (bool _knot_lmdb_foreach_found = knot_lmdb_find((txn), (prefix), KNOT_LMDB_GEQ); \
+ _knot_lmdb_foreach_found && knot_lmdb_is_prefix_of((prefix), &(txn)->cur_key); \
+ _knot_lmdb_foreach_found = knot_lmdb_next((txn)))
+
+/*!
+ * \brief Execute following block of commands for every key in DB.
+ *
+ * \note The argument is evaluated repeatedly, so pass an expression without side effects.
+ *
+ * \note The loop also ends when the transaction fails; check txn->ret afterwards.
+ *
+ * \param txn DB transaction.
+ */
+#define knot_lmdb_forwhole(txn) \
+ for (bool _knot_lmdb_forwhole_any = knot_lmdb_first((txn)); \
+ _knot_lmdb_forwhole_any; \
+ _knot_lmdb_forwhole_any = knot_lmdb_next((txn)))
+
+/*!
+ * \brief Delete the one DB record, that the iteration is currently pointing to.
+ *
+ * \note It's safe to delete during an uncomplicated iteration, e.g. knot_lmdb_foreach().
+ *
+ * \param txn DB transaction.
+ */
+void knot_lmdb_del_cur(knot_lmdb_txn_t *txn);
+
+/*!
+ * \brief Delete all DB records matching given key prefix.
+ *
+ * \param txn DB transaction.
+ * \param prefix Prefix to be deleted.
+ */
+void knot_lmdb_del_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix);
+
+/*!
+ * \brief Callback used in knot_lmdb_apply_threadsafe().
+ *
+ * \param key  Key of the matching record.
+ * \param val  Value of the matching record.
+ * \param ctx  Arbitrary context passed through from the caller.
+ *
+ * \return KNOT_EOK to continue with the next record, any other code to stop.
+ */
+typedef int (*lmdb_apply_cb)(MDB_val *key, MDB_val *val, void *ctx);
+
+/*!
+ * \brief Call a callback for any item matching given key.
+ *
+ * \note This function does not affect fields within txn struct,
+ * thus can be used on txn shared between threads.
+ *
+ * \param txn DB transaction.
+ * \param key Key to be searched for.
+ * \param prefix The 'key' is in fact prefix, apply on all items matching prefix.
+ * \param cb Callback to be called.
+ * \param ctx Arbitrary context for the callback.
+ *
+ * \return KNOT_E*
+ */
+int knot_lmdb_apply_threadsafe(knot_lmdb_txn_t *txn, const MDB_val *key, bool prefix, lmdb_apply_cb cb, void *ctx);
+
+/*!
+ * \brief Insert a new record into the DB.
+ *
+ * \note If a record with equal key already exists in the DB, its value will be quietly overwritten.
+ *
+ * \param txn DB transaction.
+ * \param key Inserted key.
+ * \param val Inserted value.
+ *
+ * \return False if failure.
+ */
+bool knot_lmdb_insert(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val);
+
+/*!
+ * \brief Open a transaction, insert a record, commit and free key's and val's mv_data.
+ *
+ * \param db DB to be inserted into.
+ * \param key Inserted key.
+ * \param val Inserted val.
+ *
+ * \return KNOT_E*
+ */
+int knot_lmdb_quick_insert(knot_lmdb_db_t *db, MDB_val key, MDB_val val);
+
+/*!
+ * \brief Copy all records matching given key prefix.
+ *
+ * \param from Open RO/RW transaction in the database to copy from.
+ * \param to Open RW txn in the DB to copy to.
+ * \param prefix Prefix for matching records to be copied.
+ *
+ * \note Prior to copying, all records from the target DB, matching the prefix, will be deleted!
+ *
+ * \return KNOT_E*
+ *
+ * \note KNOT_EOK even if no records matched the prefix (and none were copied).
+ */
+int knot_lmdb_copy_prefix(knot_lmdb_txn_t *from, knot_lmdb_txn_t *to, MDB_val *prefix);
+
+/*!
+ * \brief Copy all records matching any of multiple prefixes.
+ *
+ * \param from DB to copy from.
+ * \param to DB to copy to.
+ * \param prefixes List of prefixes to match.
+ * \param n_prefixes Number of prefixes in the list.
+ *
+ * \note Prior to copying, all records from the target DB, matching any of the prefixes, will be deleted!
+ *
+ * \return KNOT_E*
+ */
+int knot_lmdb_copy_prefixes(knot_lmdb_db_t *from, knot_lmdb_db_t *to,
+ MDB_val *prefixes, size_t n_prefixes);
+
+/*!
+ * \brief Amount of bytes used by the DB storage.
+ *
+ * \note According to LMDB design, it will be a multiple of page size, which is usually 4096.
+ *
+ * \param txn DB transaction.
+ *
+ * \return DB usage.
+ */
+size_t knot_lmdb_usage(knot_lmdb_txn_t *txn);
+
+/*!
+ * \brief Serialize various parameters into a DB key.
+ *
+ * \param format Specifies the number and type of parameters.
+ * \param ... For each character in 'format', one or two parameters with the actual values.
+ *
+ * \return DB key structure. 'mv_data' needs to be freed later. 'mv_data' is NULL on failure.
+ *
+ * Possible format characters are:
+ * - B for a byte
+ * - H for uint16
+ * - I for uint32
+ * - L for uint64, like H and I, the serialization converts them to big endian
+ * - S for zero-terminated string
+ * - N for a domain name (in knot_dname_t* format)
+ * - D for fixed-size data (takes two params: void* and size_t)
+ */
+MDB_val knot_lmdb_make_key(const char *format, ...);
+
+/*!
+ * \brief Serialize various parameters into prepared buffer.
+ *
+ * \param key_data Pointer to the buffer.
+ * \param key_len Size of the buffer.
+ * \param format Specifies the number and type of parameters.
+ * \param ... For each character in 'format', one or two parameters with the actual values.
+ *
+ * \note See comment at knot_lmdb_make_key().
+ *
+ * \return True if ok and the serialization took exactly 'key_len', false on failure.
+ */
+bool knot_lmdb_make_key_part(void *key_data, size_t key_len, const char *format, ...);
+
+/*!
+ * \brief Deserialize various parameters from a buffer.
+ *
+ * \note 'format' must exactly correspond with what the data in buffer actually are.
+ *
+ * \param key_data Pointer to the buffer.
+ * \param key_len Size of the buffer.
+ * \param format Specifies the number and type of parameters.
+ * \param ... For each character in 'format', pointer to where the values will be stored.
+ *
+ * \note For B, H, I, L; provide simply pointers to variables of corresponding type.
+ * \note For S, N; provide pointer to pointer - it will be set to pointing inside the buffer, so no allocation here.
+ * \note For D, provide void* and size_t, the data will be copied.
+ *
+ * \return True if no failure.
+ */
+bool knot_lmdb_unmake_key(const void *key_data, size_t key_len, const char *format, ...);
+
+/*!
+ * \brief Deserialize various parameters from txn->cur_val. Set txn->ret to KNOT_EMALF if failure.
+ *
+ * \param txn DB transaction.
+ * \param format Specifies the number and type of parameters.
+ * \param ... For each character in 'format', pointer to where the values will be stored.
+ *
+ * \note See comment at knot_lmdb_unmake_key().
+ *
+ * \return True if no failure.
+ */
+bool knot_lmdb_unmake_curval(knot_lmdb_txn_t *txn, const char *format, ...);
diff --git a/src/knot/journal/serialization.c b/src/knot/journal/serialization.c
new file mode 100644
index 0000000..5758481
--- /dev/null
+++ b/src/knot/journal/serialization.c
@@ -0,0 +1,501 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <assert.h>
+
+#include "knot/journal/serialization.h"
+#include "knot/zone/zone-tree.h"
+
+// Value of serialize_ctx.rrset_phase meaning that the RRset header
+// (owner, type, class, RR count) is the next thing to be serialized.
+#define SERIALIZE_RRSET_INIT (-1)
+// NOTE(review): not referenced in the visible part of this file; the value is
+// larger than any RR count that fits the 16-bit count field, presumably
+// marking an RRset that is completely serialized - confirm before relying on it.
+#define SERIALIZE_RRSET_DONE ((1L<<16)+1)
+
+/*! \brief Position of the serializer within the data being serialized. */
+typedef enum {
+ PHASE_ZONE_SOA, // zone / zone diff: the apex SOA is returned next
+ PHASE_ZONE_NODES, // zone / zone diff: iterating the tree of regular nodes
+ PHASE_ZONE_NSEC3, // zone / zone diff: iterating the tree of NSEC3 nodes
+ PHASE_SOA_1, // changeset: soa_from is returned next
+ PHASE_REM, // changeset: iterating the removals
+ PHASE_SOA_2, // changeset: soa_to is returned next
+ PHASE_ADD, // changeset: iterating the additions
+ PHASE_END, // nothing more to serialize
+} serialize_phase_t;
+
+// Capacity of serialize_ctx.rrset_buf, i.e. the maximum number of RRsets
+// that serialize_prepare() buffers for one chunk.
+#define RRSET_BUF_MAXSIZE 256
+
+/*! \brief State of an ongoing (chunked) serialization of a changeset, zone or zone diff. */
+struct serialize_ctx {
+ zone_diff_t zdiff; // zone or zone diff being serialized (ZONE_* phases)
+ zone_tree_it_t zit; // iterator over zdiff.nodes or zdiff.nsec3s
+ zone_node_t *n; // node whose RRsets are currently being returned
+ uint16_t node_pos; // index of the next RRset within 'n'
+ bool zone_diff; // set by serialize_zone_diff_init()
+ bool zone_diff_add; // zone diff: the first pass is done and the diff was reversed back
+ int ret; // error code recorded during serialization, returned by serialize_deinit()
+
+ const changeset_t *ch; // changeset being serialized (set by serialize_init())
+ changeset_iter_t it; // iterator over the removals or additions of 'ch'
+ serialize_phase_t changeset_phase; // what get_next_rrset() returns next
+ long rrset_phase; // SERIALIZE_RRSET_INIT, or index of the next rdata to be written
+ knot_rrset_t rrset_buf[RRSET_BUF_MAXSIZE]; // RRsets selected by serialize_prepare() for the next chunk
+ size_t rrset_buf_size; // number of valid items in rrset_buf
+ list_t free_rdatasets; // rdata buffers allocated by get_next_rrset(), to be freed later
+};
+
+/*!
+ * \brief Allocate a context for serializing a changeset.
+ *
+ * The serialization starts with soa_from if the changeset has one,
+ * otherwise directly with soa_to.
+ *
+ * \param ch  Changeset to be serialized.
+ *
+ * \return New zero-initialized context, NULL if the allocation failed.
+ */
+serialize_ctx_t *serialize_init(const changeset_t *ch)
+{
+	serialize_ctx_t *ctx = calloc(1, sizeof(*ctx));
+	if (ctx != NULL) {
+		init_list(&ctx->free_rdatasets);
+		ctx->rrset_buf_size = 0;
+		ctx->rrset_phase = SERIALIZE_RRSET_INIT;
+		if (ch->soa_from == NULL) {
+			ctx->changeset_phase = PHASE_SOA_2;
+		} else {
+			ctx->changeset_phase = PHASE_SOA_1;
+		}
+		ctx->ch = ch;
+	}
+	return ctx;
+}
+
+/*!
+ * \brief Allocate a context for serializing whole zone contents.
+ *
+ * \param z  Zone contents to be serialized.
+ *
+ * \return New zero-initialized context, NULL if the allocation failed.
+ */
+serialize_ctx_t *serialize_zone_init(const zone_contents_t *z)
+{
+	serialize_ctx_t *ctx = calloc(1, sizeof(*ctx));
+	if (ctx != NULL) {
+		init_list(&ctx->free_rdatasets);
+		ctx->rrset_buf_size = 0;
+		ctx->rrset_phase = SERIALIZE_RRSET_INIT;
+		ctx->changeset_phase = PHASE_ZONE_SOA;
+		zone_diff_from_zone(&ctx->zdiff, z);
+	}
+	return ctx;
+}
+
+/*!
+ * \brief Allocate a context for serializing a zone diff.
+ *
+ * The context keeps its own copy of the diff descriptor, reversed so that
+ * the counterpart side is walked first.
+ *
+ * \param z  Zone diff to be serialized.
+ *
+ * \return New zero-initialized context, NULL if the allocation failed.
+ */
+serialize_ctx_t *serialize_zone_diff_init(const zone_diff_t *z)
+{
+	serialize_ctx_t *ctx = calloc(1, sizeof(*ctx));
+	if (ctx != NULL) {
+		init_list(&ctx->free_rdatasets);
+		ctx->rrset_buf_size = 0;
+		ctx->rrset_phase = SERIALIZE_RRSET_INIT;
+		ctx->changeset_phase = PHASE_ZONE_SOA;
+
+		// start with removals of counterparts
+		ctx->zdiff = *z;
+		zone_diff_reverse(&ctx->zdiff);
+		ctx->zone_diff = true;
+	}
+	return ctx;
+}
+
+/*!
+ * \brief Return the next RRset to be serialized and advance ctx->changeset_phase.
+ *
+ * Changeset phases (SOA_1, REM, SOA_2, ADD) walk ctx->ch, the ZONE_* phases
+ * walk the node trees of ctx->zdiff.
+ *
+ * \note In zone-diff mode the returned RRset may use a freshly allocated rdata
+ *       buffer; it is remembered in ctx->free_rdatasets so it can be freed later.
+ *       If that allocation fails, the error is stored in ctx->ret and the phase
+ *       is set to PHASE_END.
+ *
+ * \note Whenever the phase has become PHASE_END, the returned value is not
+ *       meant to be serialized (the caller checks ctx->changeset_phase).
+ *
+ * \param ctx  Serializing context.
+ *
+ * \return Next RRset (a shallow structure copy).
+ */
+static knot_rrset_t get_next_rrset(serialize_ctx_t *ctx)
+{
+ knot_rrset_t res;
+ knot_rrset_init_empty(&res);
+ switch (ctx->changeset_phase) {
+ case PHASE_ZONE_SOA:
+ // The apex SOA goes first; prepare the iteration over regular nodes.
+ zone_tree_it_begin(&ctx->zdiff.nodes, &ctx->zit);
+ ctx->changeset_phase = PHASE_ZONE_NODES;
+ return node_rrset(ctx->zdiff.apex, KNOT_RRTYPE_SOA);
+ case PHASE_ZONE_NODES:
+ case PHASE_ZONE_NSEC3:
+ // Find a node which still has an RRset to return.
+ while (ctx->n == NULL || ctx->node_pos >= ctx->n->rrset_count) {
+ if (zone_tree_it_finished(&ctx->zit)) {
+ zone_tree_it_free(&ctx->zit);
+ if (ctx->changeset_phase == PHASE_ZONE_NSEC3 ||
+ zone_tree_is_empty(&ctx->zdiff.nsec3s)) {
+ // Both trees are done (or there are no NSEC3 nodes).
+ if (ctx->zone_diff && !ctx->zone_diff_add) {
+ // Zone diff, first pass finished: reverse the diff back
+ // and start the second pass with the other apex SOA.
+ ctx->zone_diff_add = true;
+ zone_diff_reverse(&ctx->zdiff);
+ zone_tree_it_begin(&ctx->zdiff.nodes, &ctx->zit);
+ ctx->changeset_phase = PHASE_ZONE_NODES;
+ return node_rrset(ctx->zdiff.apex, KNOT_RRTYPE_SOA);
+ } else {
+ ctx->changeset_phase = PHASE_END;
+ return res;
+ }
+ } else {
+ // Regular nodes are done, continue with NSEC3 nodes.
+ zone_tree_it_begin(&ctx->zdiff.nsec3s, &ctx->zit);
+ ctx->changeset_phase = PHASE_ZONE_NSEC3;
+ }
+ }
+ ctx->n = zone_tree_it_val(&ctx->zit);
+ zone_tree_it_next(&ctx->zit);
+ ctx->node_pos = 0;
+ }
+ res = node_rrset_at(ctx->n, ctx->node_pos++);
+ // The apex SOA has already been returned explicitly, skip it here.
+ if (ctx->n == ctx->zdiff.apex && res.type == KNOT_RRTYPE_SOA) {
+ return get_next_rrset(ctx);
+ }
+ if (ctx->zone_diff) {
+ // Only rdata that the counterpart node lacks belong to the diff
+ // (applies when the counterpart RRset exists and has the same TTL).
+ knot_rrset_t counter_rr = node_rrset(binode_counterpart(ctx->n), res.type);
+ if (counter_rr.ttl == res.ttl && !knot_rrset_empty(&counter_rr)) {
+ if (knot_rdataset_subset(&res.rrs, &counter_rr.rrs)) {
+ // Nothing differs in this RRset, go for the next one.
+ return get_next_rrset(ctx);
+ }
+ knot_rdataset_t rd_copy;
+ ctx->ret = knot_rdataset_copy(&rd_copy, &res.rrs, NULL);
+ if (ctx->ret == KNOT_EOK) {
+ knot_rdataset_subtract(&rd_copy, &counter_rr.rrs, NULL);
+ // Remember the allocated rdata, it is freed by the context later.
+ ptrlist_add(&ctx->free_rdatasets, rd_copy.rdata, NULL);
+ res.rrs = rd_copy;
+ assert(!knot_rrset_empty(&res));
+ } else {
+ // Copy failed: stop the serialization, ctx->ret holds the error.
+ ctx->changeset_phase = PHASE_END;
+ }
+ }
+ }
+ return res;
+ case PHASE_SOA_1:
+ changeset_iter_rem(&ctx->it, ctx->ch);
+ ctx->changeset_phase = PHASE_REM;
+ return *ctx->ch->soa_from;
+ case PHASE_REM:
+ res = changeset_iter_next(&ctx->it);
+ if (knot_rrset_empty(&res)) {
+ // Removals exhausted: switch the iterator to additions, return soa_to.
+ changeset_iter_clear(&ctx->it);
+ changeset_iter_add(&ctx->it, ctx->ch);
+ ctx->changeset_phase = PHASE_ADD;
+ return *ctx->ch->soa_to;
+ }
+ return res;
+ case PHASE_SOA_2:
+ if (ctx->it.node != NULL) {
+ changeset_iter_clear(&ctx->it);
+ }
+ changeset_iter_add(&ctx->it, ctx->ch);
+ ctx->changeset_phase = PHASE_ADD;
+ return *ctx->ch->soa_to;
+ case PHASE_ADD:
+ res = changeset_iter_next(&ctx->it);
+ if (knot_rrset_empty(&res)) {
+ changeset_iter_clear(&ctx->it);
+ ctx->changeset_phase = PHASE_END;
+ }
+ return res;
+ default:
+ // PHASE_END (or unknown): return the empty RRset.
+ return res;
+ }
+}
+
+/*!
+ * \brief Select the RRsets for the next chunk and compute the chunk's exact size.
+ *
+ * Fills ctx->rrset_buf and simulates the serialization starting at
+ * ctx->rrset_phase (which itself is left untouched), summing up the sizes of
+ * RRset headers and rdata until a limit is reached.
+ *
+ * \param ctx          Serializing context.
+ * \param thresh_size  Stop as soon as the chunk reaches at least this size.
+ * \param max_size     The chunk never exceeds this size.
+ * \param realsize     Output: size of the next chunk; 0 if nothing fits or
+ *                     nothing is left.
+ */
+void serialize_prepare(serialize_ctx_t *ctx, size_t thresh_size,
+ size_t max_size, size_t *realsize)
+{
+ *realsize = 0;
+
+ // check if we are in middle of a rrset
+ if (ctx->rrset_buf_size > 0) {
+ // Continue with the last RRset buffered by the previous call.
+ ctx->rrset_buf[0] = ctx->rrset_buf[ctx->rrset_buf_size - 1];
+ ctx->rrset_buf_size = 1;
+
+ // memory optimization: free all buffered rrsets except last one
+ ptrnode_t *n, *next;
+ WALK_LIST_DELSAFE(n, next, ctx->free_rdatasets) {
+ if (n != TAIL(ctx->free_rdatasets)) {
+ free(n->d);
+ rem_node(&n->n);
+ free(n);
+ }
+ }
+ } else {
+ ctx->rrset_buf[0] = get_next_rrset(ctx);
+ if (ctx->changeset_phase == PHASE_END) {
+ ctx->rrset_buf_size = 0;
+ return;
+ }
+ ctx->rrset_buf_size = 1;
+ }
+
+ // 'candidate' is the chunk size including the item just considered,
+ // '*realsize' only gets updated while the candidate fits into max_size.
+ size_t candidate = 0;
+ long tmp_phase = ctx->rrset_phase;
+ while (1) {
+ if (tmp_phase >= ctx->rrset_buf[ctx->rrset_buf_size - 1].rrs.count) {
+ // The last buffered RRset is fully accounted for, fetch another one.
+ if (ctx->rrset_buf_size >= RRSET_BUF_MAXSIZE) {
+ return;
+ }
+ ctx->rrset_buf[ctx->rrset_buf_size++] = get_next_rrset(ctx);
+ if (ctx->changeset_phase == PHASE_END) {
+ ctx->rrset_buf_size--;
+ return;
+ }
+ tmp_phase = SERIALIZE_RRSET_INIT;
+ }
+ if (tmp_phase == SERIALIZE_RRSET_INIT) {
+ // RRset header: owner + type + class + RR count.
+ candidate += 3 * sizeof(uint16_t) +
+ knot_dname_size(ctx->rrset_buf[ctx->rrset_buf_size - 1].owner);
+ } else {
+ // One RR: TTL + rdata length + rdata.
+ candidate += sizeof(uint32_t) + sizeof(uint16_t) +
+ knot_rdataset_at(&ctx->rrset_buf[ctx->rrset_buf_size - 1].rrs, tmp_phase)->len;
+ }
+ if (candidate > max_size) {
+ return;
+ }
+ *realsize = candidate;
+ if (candidate >= thresh_size) {
+ return;
+ }
+ tmp_phase++;
+ }
+}
+
+/*!
+ * \brief Write the RRsets buffered in ctx->rrset_buf into one chunk.
+ *
+ * Continues at ctx->rrset_phase within the first buffered RRset and stops
+ * when either the chunk has no room for the next item or all buffered RRsets
+ * are written. ctx->rrset_phase is left pointing at the next item to write.
+ *
+ * \param ctx         Serializing context.
+ * \param dst_chunk   Memory to be serialized into.
+ * \param chunk_size  Size of 'dst_chunk'.
+ */
+void serialize_chunk(serialize_ctx_t *ctx, uint8_t *dst_chunk, size_t chunk_size)
+{
+ wire_ctx_t wire = wire_ctx_init(dst_chunk, chunk_size);
+
+ for (size_t i = 0; ; ) {
+ if (ctx->rrset_phase >= ctx->rrset_buf[i].rrs.count) {
+ // All rdata of RRset 'i' are written, move on to the next buffered one.
+ if (++i >= ctx->rrset_buf_size) {
+ break;
+ }
+ ctx->rrset_phase = SERIALIZE_RRSET_INIT;
+ }
+ if (ctx->rrset_phase == SERIALIZE_RRSET_INIT) {
+ // RRset header: owner, type, class, RR count.
+ int size = knot_dname_to_wire(wire.position, ctx->rrset_buf[i].owner,
+ wire_ctx_available(&wire));
+ if (size < 0 || wire_ctx_available(&wire) < size + 3 * sizeof(uint16_t)) {
+ break;
+ }
+ wire_ctx_skip(&wire, size);
+ wire_ctx_write_u16(&wire, ctx->rrset_buf[i].type);
+ wire_ctx_write_u16(&wire, ctx->rrset_buf[i].rclass);
+ wire_ctx_write_u16(&wire, ctx->rrset_buf[i].rrs.count);
+ } else {
+ // One RR: TTL, rdata length, rdata.
+ const knot_rdata_t *rr = knot_rdataset_at(&ctx->rrset_buf[i].rrs,
+ ctx->rrset_phase);
+ assert(rr);
+ uint16_t rdlen = rr->len;
+ if (wire_ctx_available(&wire) < sizeof(uint32_t) + sizeof(uint16_t) + rdlen) {
+ break;
+ }
+ // Compatibility, but one TTL per rrset would be enough.
+ wire_ctx_write_u32(&wire, ctx->rrset_buf[i].ttl);
+ wire_ctx_write_u16(&wire, rdlen);
+ wire_ctx_write(&wire, rr->data, rdlen);
+ }
+ ctx->rrset_phase++;
+ }
+ assert(wire.error == KNOT_EOK);
+}
+
+/*! \brief Tell whether the serializer has not reached PHASE_END yet. */
+bool serialize_unfinished(serialize_ctx_t *ctx)
+{
+	const bool reached_end = (ctx->changeset_phase >= PHASE_END);
+	return !reached_end;
+}
+
+/*!
+ * \brief Release the iterators, the remembered rdata buffers and the context itself.
+ *
+ * \param ctx  Serializing context to be freed.
+ *
+ * \return The error code recorded in the context during serialization.
+ */
+int serialize_deinit(serialize_ctx_t *ctx)
+{
+	const int result = ctx->ret;
+
+	if (ctx->it.node != NULL) {
+		changeset_iter_clear(&ctx->it);
+	}
+	if (ctx->zit.tree != NULL) {
+		zone_tree_it_free(&ctx->zit);
+	}
+
+	// Each list item owns the rdata buffer it points to.
+	ptrnode_t *item, *following;
+	WALK_LIST_DELSAFE(item, following, ctx->free_rdatasets) {
+		void *rdata_buf = item->d;
+		rem_node(&item->n);
+		free(item);
+		free(rdata_buf);
+	}
+
+	free(ctx);
+	return result;
+}
+
+/*!
+ * \brief Compute the serialized size of an RRset.
+ *
+ * \param rrset  RRset (may be NULL).
+ *
+ * \return Size in bytes; 0 for NULL or an RRset without any RR.
+ */
+static uint64_t rrset_binary_size(const knot_rrset_t *rrset)
+{
+	if (rrset == NULL) {
+		return 0;
+	}
+	if (rrset->rrs.count == 0) {
+		return 0;
+	}
+
+	// Header: type + class + RR count + owner name.
+	uint64_t total = 3 * sizeof(uint16_t) + knot_dname_size(rrset->owner);
+
+	// Each RR: rdata + its length field + TTL.
+	knot_rdata_t *rd = rrset->rrs.rdata;
+	for (uint16_t left = rrset->rrs.count; left > 0; left--) {
+		total += rd->len + sizeof(uint16_t) + sizeof(uint32_t);
+		rd = knot_rdataset_next(rd);
+	}
+
+	return total;
+}
+
+/*!
+ * \brief Sum the serialized sizes of the node's RRsets that differ from
+ *        the RRset of the same type in the counterpart node.
+ */
+static size_t node_diff_size(zone_node_t *node)
+{
+	size_t total = 0;
+	for (int idx = 0; idx < node->rrset_count; idx++) {
+		knot_rrset_t own = node_rrset_at(node, idx);
+		knot_rrset_t other = node_rrset(binode_counterpart(node), own.type);
+		if (knot_rrset_equal(&own, &other, true)) {
+			continue; // unchanged, not a part of the diff
+		}
+		total += rrset_binary_size(&own);
+	}
+	return total;
+}
+
+/*!
+ * \brief Estimate the serialized size of a zone diff.
+ *
+ * Walks all nodes (including NSEC3 ones, if any) twice, once for each
+ * orientation of the diff, and sums the sizes of the differing RRsets.
+ *
+ * \param diff  Zone diff (passed by value, the caller's copy is not reversed).
+ *
+ * \return Summed size; 0 if an iterator could not be started.
+ */
+size_t zone_diff_serialized_size(zone_diff_t diff)
+{
+	size_t total = 0;
+
+	for (int pass = 0; pass < 2; pass++) {
+		zone_diff_reverse(&diff);
+
+		zone_tree_t *nsec3 = NULL;
+		if (diff.nsec3s.trie != NULL) {
+			nsec3 = &diff.nsec3s;
+		}
+
+		zone_tree_it_t it = { 0 };
+		if (zone_tree_it_double_begin(&diff.nodes, nsec3, &it) != KNOT_EOK) {
+			return 0;
+		}
+		for (; !zone_tree_it_finished(&it); zone_tree_it_next(&it)) {
+			total += node_diff_size(zone_tree_it_val(&it));
+		}
+		zone_tree_it_free(&it);
+	}
+
+	return total;
+}
+
+/*!
+ * \brief Compute the size of a changeset in serialized form.
+ *
+ * \param ch  Changeset (may be NULL).
+ *
+ * \return Size of both SOAs plus all contained RRsets; 0 for NULL.
+ */
+size_t changeset_serialized_size(const changeset_t *ch)
+{
+	if (ch == NULL) {
+		return 0;
+	}
+
+	size_t total = rrset_binary_size(ch->soa_from);
+	total += rrset_binary_size(ch->soa_to);
+
+	// Without a removal part, only the additions can be iterated.
+	changeset_iter_t iter;
+	if (ch->remove != NULL) {
+		changeset_iter_all(&iter, ch);
+	} else {
+		changeset_iter_add(&iter, ch);
+	}
+
+	for (knot_rrset_t rr = changeset_iter_next(&iter);
+	     !knot_rrset_empty(&rr);
+	     rr = changeset_iter_next(&iter)) {
+		total += rrset_binary_size(&rr);
+	}
+
+	changeset_iter_clear(&iter);
+
+	return total;
+}
+
+/*!
+ * \brief Serialize a whole RRset into a wire context, without chunking.
+ *
+ * Format: owner, type, class, RR count; then TTL, rdata length and rdata
+ * for each RR.
+ *
+ * \note Failures used to be handled by assert(0) only, so builds with NDEBUG
+ *       carried on with an error code used as a length and still returned
+ *       KNOT_EOK. They are reported to the caller now.
+ *
+ * \note On failure, a part of the RRset may already have been written.
+ *
+ * \param wire   Wire context to write into.
+ * \param rrset  RRset to be serialized.
+ *
+ * \return KNOT_EOK, an error code from knot_dname_to_wire(), KNOT_ESPACE if
+ *         the RRset does not fit, or the error state of the wire context.
+ */
+int serialize_rrset(wire_ctx_t *wire, const knot_rrset_t *rrset)
+{
+	assert(wire != NULL && rrset != NULL);
+
+	// write owner, type, class, rrcnt
+	int size = knot_dname_to_wire(wire->position, rrset->owner,
+	                              wire_ctx_available(wire));
+	if (size < 0) {
+		// Negative 'size' is an error code, it must not be used as a length.
+		return size;
+	}
+	if (wire_ctx_available(wire) < size + 3 * sizeof(uint16_t)) {
+		return KNOT_ESPACE;
+	}
+	wire_ctx_skip(wire, size);
+	wire_ctx_write_u16(wire, rrset->type);
+	wire_ctx_write_u16(wire, rrset->rclass);
+	wire_ctx_write_u16(wire, rrset->rrs.count);
+
+	for (size_t phase = 0; phase < rrset->rrs.count; phase++) {
+		const knot_rdata_t *rr = knot_rdataset_at(&rrset->rrs, phase);
+		assert(rr);
+		uint16_t rdlen = rr->len;
+		if (wire_ctx_available(wire) < sizeof(uint32_t) + sizeof(uint16_t) + rdlen) {
+			return KNOT_ESPACE;
+		}
+		// The RRset's TTL is repeated with every RR.
+		wire_ctx_write_u32(wire, rrset->ttl);
+		wire_ctx_write_u16(wire, rdlen);
+		wire_ctx_write(wire, rr->data, rdlen);
+	}
+
+	// Also covers a wire context that has been in an error state.
+	return wire->error;
+}
+
+/*!
+ * \brief Read one RRset from a wire context, in the format written by serialize_rrset().
+ *
+ * If rrset->owner is not NULL on entry, the deserialized owner must be equal
+ * to it; the previous contents of the RRset are then cleared and replaced.
+ *
+ * \param wire   Wire context to read from.
+ * \param rrset  Output RRset; it owns a newly allocated owner name on success.
+ *
+ * \return KNOT_EOK, KNOT_EMALF on malformed or truncated data, KNOT_ESEMCHECK
+ *         on owner mismatch, or the error of the wire context.
+ */
+int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset)
+{
+ assert(wire != NULL && rrset != NULL);
+
+ // Read owner, rtype, rclass and RR count.
+ // NOTE(review): knot_dname_size() presumably returns an unsigned size, in
+ // which case the 'size < 0' check below can never trigger - confirm.
+ int size = knot_dname_size(wire->position);
+ if (size < 0) {
+ assert(0);
+ }
+ // 'owner' is a private copy; it must be freed on every error path until
+ // it is handed over to the rrset by knot_rrset_init().
+ knot_dname_t *owner = knot_dname_copy(wire->position, NULL);
+ if (owner == NULL || wire_ctx_available(wire) < size + 3 * sizeof(uint16_t)) {
+ knot_dname_free(owner, NULL);
+ return KNOT_EMALF;
+ }
+ wire_ctx_skip(wire, size);
+ uint16_t type = wire_ctx_read_u16(wire);
+ uint16_t rclass = wire_ctx_read_u16(wire);
+ uint16_t rrcount = wire_ctx_read_u16(wire);
+ if (wire->error != KNOT_EOK) {
+ knot_dname_free(owner, NULL);
+ return wire->error;
+ }
+ if (rrset->owner != NULL) {
+ // Re-using a non-empty RRset is only allowed for the same owner.
+ if (knot_dname_cmp(owner, rrset->owner) != 0) {
+ knot_dname_free(owner, NULL);
+ return KNOT_ESEMCHECK;
+ }
+ knot_rrset_clear(rrset, NULL);
+ }
+ knot_rrset_init(rrset, owner, type, rclass, 0);
+
+ // NOTE(review): the loop also ends when the wire is exhausted, so fewer
+ // than 'rrcount' RRs may be read while KNOT_EOK is still returned.
+ for (size_t phase = 0; phase < rrcount && wire_ctx_available(wire) > 0; phase++) {
+ uint32_t ttl = wire_ctx_read_u32(wire);
+ uint32_t rdata_size = wire_ctx_read_u16(wire);
+ // The TTL of the first RR becomes the TTL of the whole RRset.
+ if (phase == 0) {
+ rrset->ttl = ttl;
+ }
+ if (wire->error != KNOT_EOK ||
+ wire_ctx_available(wire) < rdata_size ||
+ knot_rrset_add_rdata(rrset, wire->position, rdata_size,
+ NULL) != KNOT_EOK) {
+ knot_rrset_clear(rrset, NULL);
+ return KNOT_EMALF;
+ }
+ wire_ctx_skip(wire, rdata_size);
+ assert(wire->error == KNOT_EOK);
+ }
+
+ return KNOT_EOK;
+}
+
+/*!
+ * \brief Compute the space needed by serialize_rrset() for an RRset.
+ *
+ * \param rrset  RRset (may be NULL).
+ *
+ * \return Size in bytes; 0 for NULL.
+ */
+size_t rrset_serialized_size(const knot_rrset_t *rrset)
+{
+	if (rrset == NULL) {
+		return 0;
+	}
+
+	// Header: type + class + RR count + owner name.
+	size_t total = 3 * sizeof(uint16_t) + knot_dname_size(rrset->owner);
+
+	uint16_t idx = 0;
+	while (idx < rrset->rrs.count) {
+		const knot_rdata_t *rd = knot_rdataset_at(&rrset->rrs, idx);
+		assert(rd);
+		// Each RR: rdata + its length field + TTL.
+		total += rd->len + sizeof(uint16_t) + sizeof(uint32_t);
+		idx++;
+	}
+
+	return total;
+}
diff --git a/src/knot/journal/serialization.h b/src/knot/journal/serialization.h
new file mode 100644
index 0000000..621dcdb
--- /dev/null
+++ b/src/knot/journal/serialization.h
@@ -0,0 +1,169 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <stdint.h>
+
+#include "libknot/rrset.h"
+#include "libknot/rrtype/soa.h"
+#include "knot/updates/changesets.h"
+#include "contrib/wire_ctx.h"
+
+/*!
+ * \brief Shallow descriptor of a zone's node trees and apex.
+ *
+ * The tree descriptors are stored by value, so flipping their flags in
+ * zone_diff_reverse() does not modify the structures they were copied from.
+ */
+typedef struct zone_diff {
+ zone_tree_t nodes; // tree of regular nodes
+ zone_tree_t nsec3s; // tree of NSEC3 nodes, zeroed by zone_diff_from_zone() if the zone has none
+ zone_node_t *apex; // apex node
+} zone_diff_t;
+
+/*!
+ * \brief Switch the diff to its other side: replace the apex by its
+ *        counterpart and flip ZONE_TREE_BINO_SECOND in both trees' flags.
+ */
+inline static void zone_diff_reverse(zone_diff_t *diff)
+{
+	diff->apex = binode_counterpart(diff->apex);
+	diff->nsec3s.flags = diff->nsec3s.flags ^ ZONE_TREE_BINO_SECOND;
+	diff->nodes.flags = diff->nodes.flags ^ ZONE_TREE_BINO_SECOND;
+}
+
+/*!
+ * \brief Fill the diff descriptor from zone contents.
+ *
+ * \param diff  Output descriptor.
+ * \param z     Zone contents; its NSEC3 tree is optional.
+ */
+inline static void zone_diff_from_zone(zone_diff_t *diff, const zone_contents_t *z)
+{
+	diff->apex = z->apex;
+	if (z->nsec3_nodes == NULL) {
+		// No NSEC3 tree: keep the descriptor zeroed.
+		memset(&diff->nsec3s, 0, sizeof(diff->nsec3s));
+	} else {
+		diff->nsec3s = *z->nsec3_nodes;
+	}
+	diff->nodes = *z->nodes;
+}
+
+/*! \brief SOA serial found in the diff's current apex node. */
+inline static uint32_t zone_diff_to(const zone_diff_t *diff)
+{
+	const knot_rdataset_t *soa_rrs = node_rdataset(diff->apex, KNOT_RRTYPE_SOA);
+	return knot_soa_serial(soa_rrs->rdata);
+}
+
+/*! \brief SOA serial found in the counterpart of the diff's apex node. */
+inline static uint32_t zone_diff_from(const zone_diff_t *diff)
+{
+	const zone_node_t *other_apex = binode_counterpart(diff->apex);
+	const knot_rdataset_t *soa_rrs = node_rdataset(other_apex, KNOT_RRTYPE_SOA);
+	return knot_soa_serial(soa_rrs->rdata);
+}
+
+typedef struct serialize_ctx serialize_ctx_t;
+
+/*!
+ * \brief Init serialization context.
+ *
+ * \param ch Changeset to be serialized.
+ *
+ * \return Context.
+ */
+serialize_ctx_t *serialize_init(const changeset_t *ch);
+
+/*!
+ * \brief Init serialization context.
+ *
+ * \param z Zone to be serialized like zone-in-journal changeset.
+ *
+ * \return Context.
+ */
+serialize_ctx_t *serialize_zone_init(const zone_contents_t *z);
+
+/*!
+ * \brief Init serialization context.
+ *
+ * \param z Zone with binodes being updated.
+ *
+ * \return Context.
+ */
+serialize_ctx_t *serialize_zone_diff_init(const zone_diff_t *z);
+
+/*!
+ * \brief Pre-check and space computation before serializing a chunk.
+ *
+ * \note This MUST be called before each serialize_chunk() !
+ *
+ * \param ctx Serializing context.
+ * \param thresh_size Optimal size of next chunk.
+ * \param max_size Maximum size of next chunk.
+ * \param realsize Output: real exact size of next chunk.
+ */
+void serialize_prepare(serialize_ctx_t *ctx, size_t thresh_size,
+ size_t max_size, size_t *realsize);
+
+/*!
+ * \brief Perform one step of serialization: fill one chunk.
+ *
+ * \param ctx Serializing context.
+ * \param chunk Pointer to allocated memory to be serialized into.
+ * \param chunk_size Its size. It MUST be the same as returned from serialize_prepare().
+ */
+void serialize_chunk(serialize_ctx_t *ctx, uint8_t *chunk, size_t chunk_size);
+
+/*! \brief Tells if there remains something of the changeset
+ * to be serialized into next chunk(s) yet. */
+bool serialize_unfinished(serialize_ctx_t *ctx);
+
+/*!
+ * \brief Free serialization context.
+ *
+ * \return KNOT_E* if there were errors during serialization.
+ */
+int serialize_deinit(serialize_ctx_t *ctx);
+
+/*!
+ * \brief Returns size of serialized changeset from zone diff.
+ *
+ * \warning Not accurate! This is an upper bound, suitable for policy enforcement etc.
+ *
+ * \param[in] diff Zone diff structure to create changeset from.
+ *
+ * \return Size of the resulting changeset.
+ */
+size_t zone_diff_serialized_size(zone_diff_t diff);
+
+/*!
+ * \brief Returns size of changeset in serialized form.
+ *
+ * \param[in] ch Changeset whose size we want to compute.
+ *
+ * \return Size of the changeset.
+ */
+size_t changeset_serialized_size(const changeset_t *ch);
+
+/*!
+ * \brief Simply serialize RRset w/o any chunking.
+ *
+ * \param wire
+ * \param rrset
+ *
+ * \return KNOT_E*
+ */
+int serialize_rrset(wire_ctx_t *wire, const knot_rrset_t *rrset);
+
+/*!
+ * \brief Simply deserialize RRset w/o any chunking.
+ *
+ * \param wire
+ * \param rrset
+ *
+ * \return KNOT_E*
+ */
+int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset);
+
+/*!
+ * \brief Space needed to serialize RRset.
+ *
+ * \param rrset RRset.
+ *
+ * \return RRset binary size.
+ */
+size_t rrset_serialized_size(const knot_rrset_t *rrset);