diff options
Diffstat (limited to '')
-rw-r--r-- | src/knot/journal/journal_basic.c | 92 | ||||
-rw-r--r-- | src/knot/journal/journal_basic.h | 118 | ||||
-rw-r--r-- | src/knot/journal/journal_metadata.c | 422 | ||||
-rw-r--r-- | src/knot/journal/journal_metadata.h | 187 | ||||
-rw-r--r-- | src/knot/journal/journal_read.c | 436 | ||||
-rw-r--r-- | src/knot/journal/journal_read.h | 158 | ||||
-rw-r--r-- | src/knot/journal/journal_write.c | 333 | ||||
-rw-r--r-- | src/knot/journal/journal_write.h | 121 | ||||
-rw-r--r-- | src/knot/journal/knot_lmdb.c | 770 | ||||
-rw-r--r-- | src/knot/journal/knot_lmdb.h | 446 | ||||
-rw-r--r-- | src/knot/journal/serialization.c | 501 | ||||
-rw-r--r-- | src/knot/journal/serialization.h | 169 |
12 files changed, 3753 insertions, 0 deletions
diff --git a/src/knot/journal/journal_basic.c b/src/knot/journal/journal_basic.c new file mode 100644 index 0000000..825130a --- /dev/null +++ b/src/knot/journal/journal_basic.c @@ -0,0 +1,92 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_basic.h" +#include "knot/journal/journal_metadata.h" +#include "libknot/error.h" + +MDB_val journal_changeset_id_to_key(bool zone_in_journal, uint32_t serial, const knot_dname_t *zone) +{ + if (zone_in_journal) { + return knot_lmdb_make_key("NIS", zone, (uint32_t)0, "bootstrap"); + } else { + return knot_lmdb_make_key("NII", zone, (uint32_t)0, serial); + } +} + +MDB_val journal_make_chunk_key(const knot_dname_t *apex, uint32_t ch_from, bool zij, uint32_t chunk_id) +{ + if (zij) { + return knot_lmdb_make_key("NISI", apex, (uint32_t)0, "bootstrap", chunk_id); + } else { + return knot_lmdb_make_key("NIII", apex, (uint32_t)0, ch_from, chunk_id); + } +} + +MDB_val journal_zone_prefix(const knot_dname_t *zone) +{ + return knot_lmdb_make_key("NI", zone, (uint32_t)0); +} + +void journal_del_zone(knot_lmdb_txn_t *txn, const knot_dname_t *zone) +{ + assert(txn->is_rw); + MDB_val prefix = journal_zone_prefix(zone); + knot_lmdb_del_prefix(txn, &prefix); + free(prefix.mv_data); +} + +void journal_make_header(void *chunk, uint32_t ch_serial_to) +{ + 
knot_lmdb_make_key_part(chunk, JOURNAL_HEADER_SIZE, "IILLL", ch_serial_to, + (uint32_t)0 /* we no longer care for # of chunks */, + (uint64_t)0, (uint64_t)0, (uint64_t)0); +} + +uint32_t journal_next_serial(const MDB_val *chunk) +{ + return knot_wire_read_u32(chunk->mv_data); +} + +bool journal_serial_to(knot_lmdb_txn_t *txn, bool zij, uint32_t serial, + const knot_dname_t *zone, uint32_t *serial_to) +{ + MDB_val key = journal_changeset_id_to_key(zij, serial, zone); + bool found = knot_lmdb_find_prefix(txn, &key); + if (found && serial_to != NULL) { + *serial_to = journal_next_serial(&txn->cur_val); + } + free(key.mv_data); + return found; +} + +bool journal_allow_flush(zone_journal_t j) +{ + conf_val_t val = conf_zone_get(j.conf, C_ZONEFILE_SYNC, j.zone); + return conf_int(&val) >= 0; +} + +size_t journal_conf_max_usage(zone_journal_t j) +{ + conf_val_t val = conf_zone_get(j.conf, C_JOURNAL_MAX_USAGE, j.zone); + return conf_int(&val); +} + +size_t journal_conf_max_changesets(zone_journal_t j) +{ + conf_val_t val = conf_zone_get(j.conf, C_JOURNAL_MAX_DEPTH, j.zone); + return conf_int(&val); +} diff --git a/src/knot/journal/journal_basic.h b/src/knot/journal/journal_basic.h new file mode 100644 index 0000000..8804d7b --- /dev/null +++ b/src/knot/journal/journal_basic.h @@ -0,0 +1,118 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. 
If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "knot/conf/conf.h" +#include "knot/journal/knot_lmdb.h" +#include "knot/updates/changesets.h" +#include "libknot/dname.h" + +typedef struct { + knot_lmdb_db_t *db; + const knot_dname_t *zone; + void *conf; // needed only for journal write operations +} zone_journal_t; + +#define JOURNAL_CHUNK_MAX (70 * 1024) // must be at least 64k + 6B +#define JOURNAL_CHUNK_THRESH (15 * 1024) +#define JOURNAL_HEADER_SIZE (32) + +/*! \brief Convert journal_mode to LMDB environment flags. */ +inline static unsigned journal_env_flags(int journal_mode, bool readonly) +{ + return (journal_mode == JOURNAL_MODE_ASYNC ? (MDB_WRITEMAP | MDB_MAPASYNC) : 0) | + (readonly ? MDB_RDONLY : 0); +} + +/*! + * \brief Create a database key prefix to search for a changeset. + * + * \param zone_in_journal True if searching for zone-in-journal special changeset. + * \param serial Serial-from of the changeset to be searched for. Ignored if 'zone_in_journal'. + * \param zone Name of the zone. + * + * \return DB key. 'mv_data' shall be freed later. 'mv_data' is NULL on failure. + */ +MDB_val journal_changeset_id_to_key(bool zone_in_journal, uint32_t serial, const knot_dname_t *zone); + +/*! + * \brief Create a database key for changeset chunk. + * + * \param apex Zone apex owner name. + * \param ch_from Serial "from" of the stored changeset. + * \param zij Zone-in-journal is stored. + * \param chunk_id Ordinal number of this changeset's chunk. + * + * \return DB key. 'mv_data' shall be freed later. 'mv_data' is NULL on failure. + */ +MDB_val journal_make_chunk_key(const knot_dname_t *apex, uint32_t ch_from, bool zij, uint32_t chunk_id); + +/*! + * \brief Return a key prefix to operate with all zone-related records. + */ +MDB_val journal_zone_prefix(const knot_dname_t *zone); + +/*! + * \brief Delete all zone-related records from journal with open read-write txn. 
+ */ +void journal_del_zone(knot_lmdb_txn_t *txn, const knot_dname_t *zone); + +/*! + * \brief Initialise chunk header. + * + * \param chunk Pointer to the changeset chunk. It must be at least JOURNAL_HEADER_SIZE, perhaps more. + * \param ch_serial_to Serial-to of the changeset being serialized. + */ +void journal_make_header(void *chunk, uint32_t ch_serial_to); + +/*! + * \brief Obtain serial-to of the serialized changeset. + * + * \param chunk Any chunk of a serialized changeset. + * + * \return The changeset's serial-to. + */ +uint32_t journal_next_serial(const MDB_val *chunk); + +/*! + * \brief Obtain serial-to of a changeset stored in journal. + * + * \param txn Journal DB transaction. + * \param zij True if changeset in question is zone-in-journal. + * \param serial Serial-from of the changeset in question. + * \param zone Zone name. + * \param serial_to Output: serial-to of the changeset in question. + * + * \return True if the changeset exists in the journal. + */ +bool journal_serial_to(knot_lmdb_txn_t *txn, bool zij, uint32_t serial, + const knot_dname_t *zone, uint32_t *serial_to); + +/*! \brief Return true if the changeset in question exists in the journal. */ +inline static bool journal_contains(knot_lmdb_txn_t *txn, bool zone, uint32_t serial, const knot_dname_t *zone_name) +{ + return journal_serial_to(txn, zone, serial, zone_name, NULL); +} + +/*! \brief Return true if the journal may be flushed according to conf. */ +bool journal_allow_flush(zone_journal_t j); + +/*! \brief Return configured maximal per-zone usage of journal DB. */ +size_t journal_conf_max_usage(zone_journal_t j); + +/*! \brief Return configured maximal depth of journal. */ +size_t journal_conf_max_changesets(zone_journal_t j); diff --git a/src/knot/journal/journal_metadata.c b/src/knot/journal/journal_metadata.c new file mode 100644 index 0000000..b133534 --- /dev/null +++ b/src/knot/journal/journal_metadata.c @@ -0,0 +1,422 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. 
<knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_metadata.h" + +#include "libknot/endian.h" +#include "libknot/error.h" + +static void fix_endian(void *data, size_t data_size, bool in) +{ + union { + uint8_t u8; + uint16_t u16; + uint32_t u32; + uint64_t u64; + } before, after; + + memcpy(&before, data, data_size); + switch (data_size) { + case sizeof(uint8_t): + return; + case sizeof(uint16_t): + after.u16 = in ? be16toh(before.u16) : htobe16(before.u16); + break; + case sizeof(uint32_t): + after.u32 = in ? be32toh(before.u32) : htobe32(before.u32); + break; + case sizeof(uint64_t): + after.u64 = in ? 
be64toh(before.u64) : htobe64(before.u64); + break; + default: + assert(0); + } + memcpy(data, &after, data_size); +} + +static MDB_val metadata_key(const knot_dname_t *zone, const char *metadata) +{ + if (zone == NULL) { + return knot_lmdb_make_key("IS", (uint32_t)0, metadata); + } else { + return knot_lmdb_make_key("NIS", zone, (uint32_t)0, metadata); + } +} + +static bool del_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata) +{ + MDB_val key = metadata_key(zone, metadata); + if (key.mv_data != NULL) { + knot_lmdb_del_prefix(txn, &key); + free(key.mv_data); + } + return (key.mv_data != NULL); +} + +static bool get_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata) +{ + MDB_val key = metadata_key(zone, metadata); + bool ret = knot_lmdb_find(txn, &key, KNOT_LMDB_EXACT); // not FORCE + free(key.mv_data); + return ret; +} + +static bool get_metadata_numeric(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + const char *metadata, void *result, size_t result_size) +{ + if (get_metadata(txn, zone, metadata)) { + if (txn->cur_val.mv_size == result_size) { + memcpy(result, txn->cur_val.mv_data, result_size); + fix_endian(result, result_size, true); + return true; + } else { + txn->ret = KNOT_EMALF; + } + } + return false; +} + +bool get_metadata32(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + const char *metadata, uint32_t *result) +{ + return get_metadata_numeric(txn, zone, metadata, result, sizeof(*result)); +} + +bool get_metadata64(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + const char *metadata, uint64_t *result) +{ + return get_metadata_numeric(txn, zone, metadata, result, sizeof(*result)); +} + +bool get_metadata64or32(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + const char *metadata, uint64_t *result) +{ + if (txn->ret != KNOT_EOK) { + return false; + } + bool ret = get_metadata64(txn, zone, metadata, result); + if (txn->ret == KNOT_EMALF) { + uint32_t res32 = 0; + txn->ret = KNOT_EOK; + ret = 
get_metadata32(txn, zone, metadata, &res32); + *result = res32; + } + return ret; +} + +void set_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata, + const void *valp, size_t val_size, bool numeric) +{ + MDB_val key = metadata_key(zone, metadata); + MDB_val val = { val_size, NULL }; + if (knot_lmdb_insert(txn, &key, &val)) { + memcpy(val.mv_data, valp, val_size); + if (numeric) { + fix_endian(val.mv_data, val_size, false); + } + } + free(key.mv_data); +} + +static int64_t last_occupied_diff(knot_lmdb_txn_t *txn) +{ + uint64_t occupied_now = knot_lmdb_usage(txn), occupied_last = 0; + (void)get_metadata64(txn, NULL, "last_total_occupied", &occupied_last); + return (int64_t)occupied_now - (int64_t)occupied_last; +} + +void update_last_inserter(knot_lmdb_txn_t *txn, const knot_dname_t *new_inserter) +{ + uint64_t occupied_now = knot_lmdb_usage(txn), lis_occupied = 0; + int64_t occupied_diff = last_occupied_diff(txn); + knot_dname_t *last_inserter = get_metadata(txn, NULL, "last_inserter_zone") ? 
+ knot_dname_copy(txn->cur_val.mv_data, NULL) : NULL; + if (occupied_diff == 0 || last_inserter == NULL) { + goto update_inserter; + } + (void)get_metadata64(txn, last_inserter, "occupied", &lis_occupied); + lis_occupied = MAX(0, (int64_t)lis_occupied + occupied_diff); + set_metadata(txn, last_inserter, "occupied", &lis_occupied, sizeof(lis_occupied), true); + +update_inserter: + if (new_inserter == NULL) { + del_metadata(txn, NULL, "last_inserter_zone"); + } else if (last_inserter == NULL || !knot_dname_is_equal(last_inserter, new_inserter)) { + set_metadata(txn, NULL, "last_inserter_zone", new_inserter, knot_dname_size(new_inserter), false); + } + free(last_inserter); + set_metadata(txn, NULL, "last_total_occupied", &occupied_now, sizeof(occupied_now), true); +} + +uint64_t journal_get_occupied(knot_lmdb_txn_t *txn, const knot_dname_t *zone) +{ + uint64_t res = 0; + get_metadata64(txn, zone, "occupied", &res); + return res; +} + +static int first_digit(char * of) +{ + unsigned maj, min; + return sscanf(of, "%u.%u", &maj, &min) == 2 ? 
maj : -1; +} + +void journal_load_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, journal_metadata_t *md) +{ + memset(md, 0, sizeof(*md)); + if (get_metadata(txn, NULL, "version")) { + switch (first_digit(txn->cur_val.mv_data)) { + case 3: + // TODO warning about downgrade + // FALLTHROUGH + case 1: + // still supported + // FALLTHROUGH + case 2: + // normal operation + break; + case 0: + // failed to read version + txn->ret = KNOT_ENOENT; + return; + default: + txn->ret = KNOT_ENOTSUP; + return; + } + } + md->_new_zone = !get_metadata32(txn, zone, "flags", &md->flags); + (void)get_metadata32(txn, zone, "first_serial", &md->first_serial); + (void)get_metadata32(txn, zone, "last_serial_to", &md->serial_to); + (void)get_metadata32(txn, zone, "merged_serial", &md->merged_serial); + (void)get_metadata32(txn, zone, "changeset_count", &md->changeset_count); + if (!get_metadata32(txn, zone, "flushed_upto", &md->flushed_upto)) { + // importing from version 1.0 + if ((md->flags & JOURNAL_LAST_FLUSHED_VALID)) { + uint32_t last_flushed = 0; + if (!get_metadata32(txn, zone, "last_flushed", &last_flushed) || + !journal_serial_to(txn, false, last_flushed, zone, &md->flushed_upto)) { + txn->ret = KNOT_EMALF; + } else { + md->flags &= ~JOURNAL_LAST_FLUSHED_VALID; + } + } else { + md->flushed_upto = md->first_serial; + } + } + +} + +void journal_store_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const journal_metadata_t *md) +{ + set_metadata(txn, zone, "first_serial", &md->first_serial, sizeof(md->first_serial), true); + set_metadata(txn, zone, "last_serial_to", &md->serial_to, sizeof(md->serial_to), true); + set_metadata(txn, zone, "flushed_upto", &md->flushed_upto, sizeof(md->flushed_upto), true); + set_metadata(txn, zone, "merged_serial", &md->merged_serial, sizeof(md->merged_serial), true); + set_metadata(txn, zone, "changeset_count", &md->changeset_count, sizeof(md->changeset_count), true); + set_metadata(txn, zone, "flags", &md->flags, sizeof(md->flags), 
true); + set_metadata(txn, NULL, "version", "2.0", 4, false); + if (md->_new_zone) { + uint64_t journal_count = 0; + (void)get_metadata64or32(txn, NULL, "journal_count", &journal_count); + ++journal_count; + set_metadata(txn, NULL, "journal_count", &journal_count, sizeof(journal_count), true); + } +} + +void journal_metadata_after_delete(journal_metadata_t *md, uint32_t deleted_upto, + size_t deleted_count) +{ + if (deleted_count == 0) { + return; + } + assert((md->flags & JOURNAL_SERIAL_TO_VALID)); + if (deleted_upto == md->serial_to) { + assert(md->flushed_upto == md->serial_to); + assert(md->changeset_count == deleted_count); + md->flags &= ~JOURNAL_SERIAL_TO_VALID; + } + md->first_serial = deleted_upto; + md->changeset_count -= deleted_count; +} + +void journal_metadata_after_merge(journal_metadata_t *md, bool merged_zij, uint32_t merged_serial, + uint32_t merged_serial_to, uint32_t original_serial_to) +{ + md->flushed_upto = merged_serial_to; + if ((md->flags & JOURNAL_MERGED_SERIAL_VALID)) { + assert(!merged_zij); + assert(merged_serial == md->merged_serial); + } else if (!merged_zij) { + md->merged_serial = merged_serial; + md->flags |= JOURNAL_MERGED_SERIAL_VALID; + assert(merged_serial == md->first_serial); + journal_metadata_after_delete(md, original_serial_to, 1); // the merged changeset writes itself instead of first one + } +} + +void journal_metadata_after_insert(journal_metadata_t *md, uint32_t serial, uint32_t serial_to) +{ + if (md->first_serial == md->serial_to) { // no changesets yet + md->first_serial = serial; + md->flushed_upto = serial; + } + md->serial_to = serial_to; + md->flags |= JOURNAL_SERIAL_TO_VALID; + md->changeset_count++; +} + +void journal_metadata_after_extra(journal_metadata_t *md, uint32_t serial, uint32_t serial_to) +{ + assert(!(md->flags & JOURNAL_MERGED_SERIAL_VALID)); + md->merged_serial = serial; + md->flushed_upto = serial_to; + md->flags |= (JOURNAL_MERGED_SERIAL_VALID | JOURNAL_LAST_FLUSHED_VALID); +} + +void 
journal_del_zone_txn(knot_lmdb_txn_t *txn, const knot_dname_t *zone) +{ + uint64_t md_occupied = 0; + (void)get_metadata64(txn, zone, "occupied", &md_occupied); + journal_del_zone(txn, zone); + set_metadata(txn, zone, "occupied", &md_occupied, sizeof(md_occupied), true); +} + +int journal_scrape_with_md(zone_journal_t j, bool check_existence) +{ + if (check_existence && !journal_is_existing(j)) { + return KNOT_EOK; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + + update_last_inserter(&txn, NULL); + journal_del_zone(&txn, j.zone); + + knot_lmdb_commit(&txn); + return txn.ret; +} + +int journal_copy_with_md(knot_lmdb_db_t *from, knot_lmdb_db_t *to, const knot_dname_t *zone) +{ + knot_lmdb_txn_t tr = { 0 }, tw = { 0 }; + tr.ret = knot_lmdb_open(from); + tw.ret = knot_lmdb_open(to); + if (tr.ret != KNOT_EOK || tw.ret != KNOT_EOK) { + goto done; + } + knot_lmdb_begin(from, &tr, true); + knot_lmdb_begin(to, &tw, true); + update_last_inserter(&tr, NULL); + MDB_val prefix = journal_zone_prefix(zone); + knot_lmdb_copy_prefix(&tr, &tw, &prefix); + free(prefix.mv_data); + knot_lmdb_commit(&tw); + knot_lmdb_commit(&tr); +done: + return tr.ret == KNOT_EOK ? 
tw.ret : tr.ret; +} + +int journal_set_flushed(zone_journal_t j) +{ + knot_lmdb_txn_t txn = { 0 }; + journal_metadata_t md = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + journal_load_metadata(&txn, j.zone, &md); + + md.flushed_upto = md.serial_to; + + journal_store_metadata(&txn, j.zone, &md); + knot_lmdb_commit(&txn); + return txn.ret; +} + +int journal_info(zone_journal_t j, bool *exists, uint32_t *first_serial, bool *has_zij, + uint32_t *serial_to, bool *has_merged, uint32_t *merged_serial, + uint64_t *occupied, uint64_t *occupied_total) +{ + if (knot_lmdb_exists(j.db) == KNOT_ENODB) { + *exists = false; + return KNOT_EOK; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + journal_metadata_t md = { 0 }; + knot_lmdb_begin(j.db, &txn, false); + journal_load_metadata(&txn, j.zone, &md); + *exists = (md.flags & JOURNAL_SERIAL_TO_VALID); + if (first_serial != NULL) { + *first_serial = md.first_serial; + } + if (has_zij != NULL) { + *has_zij = journal_contains(&txn, true, 0, j.zone); + } + if (serial_to != NULL) { + *serial_to = md.serial_to; + } + if (has_merged != NULL) { + *has_merged = (md.flags & JOURNAL_MERGED_SERIAL_VALID); + } + if (merged_serial != NULL) { + *merged_serial = md.merged_serial; + } + if (occupied != NULL) { + *occupied = 0; + get_metadata64(&txn, j.zone, "occupied", occupied); + + if (get_metadata(&txn, NULL, "last_inserter_zone") && + knot_dname_is_equal(j.zone, txn.cur_val.mv_data)) { + *occupied = MAX(0, (int64_t)*occupied + last_occupied_diff(&txn)); + } + } + if (occupied_total != NULL) { + *occupied_total = knot_lmdb_usage(&txn); + } + knot_lmdb_abort(&txn); + return txn.ret; +} + +int journals_walk(knot_lmdb_db_t *db, journals_walk_cb_t cb, void *ctx) +{ + int ret = knot_lmdb_exists(db); + if (ret == KNOT_EOK) { + ret = knot_lmdb_open(db); + } + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(db, &txn, false); + knot_dname_storage_t 
search_data = { 0 }; + MDB_val search = { 1, search_data }; + while (knot_lmdb_find(&txn, &search, KNOT_LMDB_GEQ)) { + knot_dname_t *found = txn.cur_key.mv_data; + uint32_t unused_flags; + if (get_metadata32(&txn, found, "flags", &unused_flags)) { + // matched journal DB key appears to be a zone name + txn.ret = cb(found, ctx); + } + + // update searched key to next after found zone + search.mv_size = knot_dname_size(found); + memcpy(search.mv_data, found, search.mv_size); + ((uint8_t *)search.mv_data)[search.mv_size - 1]++; + } + knot_lmdb_abort(&txn); + return txn.ret; +} diff --git a/src/knot/journal/journal_metadata.h b/src/knot/journal/journal_metadata.h new file mode 100644 index 0000000..246d899 --- /dev/null +++ b/src/knot/journal/journal_metadata.h @@ -0,0 +1,187 @@ +/* Copyright (C) 2020 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ */ + +#pragma once + +#include "knot/journal/journal_basic.h" + +typedef struct { + uint32_t first_serial; + uint32_t serial_to; + uint32_t flushed_upto; + uint32_t merged_serial; + uint32_t changeset_count; + uint32_t flags; // a bitmap of flags, see enum below + bool _new_zone; // private: if there were no metadata at all previously +} journal_metadata_t; + +enum journal_metadata_flags { + JOURNAL_LAST_FLUSHED_VALID = (1 << 0), // deprecated + JOURNAL_SERIAL_TO_VALID = (1 << 1), + JOURNAL_MERGED_SERIAL_VALID = (1 << 2), +}; + +typedef int (*journals_walk_cb_t)(const knot_dname_t *zone, void *ctx); + +/*! + * \brief Update the computation of DB resources used by each zone. + * + * Because the amount of used space is bigger than sum of changesets' serialized_sizes, + * journal uses a complicated way to compute each zone's used space: there is a metadata + * showing always the previously-inserting zone. Before the next insert, it is computed + * how the total usage of the DB changed during the previous insert (or delete), and the + * usage increase (or decrease) is accounted on the bill of the previous inserter. + * + * \param txn Journal DB transaction. + * \param new_inserter Name of the zone that is going to insert now. Might be NULL if no insert nor delete will be done. + */ +void update_last_inserter(knot_lmdb_txn_t *txn, const knot_dname_t *new_inserter); + +/*! \brief Return the journal database usage by the given zone. */ +uint64_t journal_get_occupied(knot_lmdb_txn_t *txn, const knot_dname_t *zone); + +/*! + * \brief Load the metadata from DB into structure. + * + * \param txn Journal DB transaction. + * \param zone Zone name. + * \param md Output: metadata structure. + */ +void journal_load_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, journal_metadata_t *md); + +/*! + * \brief Store the metadata from structure into DB. + * + * \param txn Journal DB transaction. + * \param zone Zone name. + * \param md Metadata structure. 
+ */ +void journal_store_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const journal_metadata_t *md); + +/*! + * \brief Update metadata according to what was deleted. + * + * \param md Metadata structure to be updated. + * \param deleted_upto Serial-to of the last deleted changeset. + * \param deleted_count Number of deleted changesets. + */ +void journal_metadata_after_delete(journal_metadata_t *md, uint32_t deleted_upto, + size_t deleted_count); + +/*! + * \brief Update metadata according to what was merged. + * + * \param md Metadata structure to be updated. + * \param merged_zij True if it was a merge into zone-in-journal. + * \param merged_serial Serial-from of the merged changeset (ignored if 'merged_zij'). + * \param merged_serial_to Serial-to of the merged changeset. + * \param original_serial_to Previous serial-to of the merged changeset before the merge. + */ +void journal_metadata_after_merge(journal_metadata_t *md, bool merged_zij, uint32_t merged_serial, + uint32_t merged_serial_to, uint32_t original_serial_to); + +/*! + * \brief Update metadata according to what was inserted. + * + * \param md Metadata structure to be updated. + * \param serial Serial-from of the inserted changeset. + * \param serial_to Serial-to of the inserted changeset. + */ +void journal_metadata_after_insert(journal_metadata_t *md, uint32_t serial, uint32_t serial_to); + +/*! + * \brief Update metadata according to inserted extra changeset. + * + * \param md Metadata structure to be updated. + * \param serial Serial-from of the inserted changeset. + * \param serial_to Serial-to of the inserted changeset. + */ +void journal_metadata_after_extra(journal_metadata_t *md, uint32_t serial, uint32_t serial_to); + +/*! + * \brief Delete all zone records in a txn that will later write to the same zone. + * + * \note The difference against journal_del_zone(), which purges even metadata, incl "occupied". + * \note This preserves keeping track of space occupied/freed by this zone. 
+ */ +void journal_del_zone_txn(knot_lmdb_txn_t *txn, const knot_dname_t *zone); + +/*! + * \brief Completely delete all journal records belonging to this zone, including metadata. + * + * \param j Journal to be scraped. + * \param check_existence Don't operate if the journal seems not to exist. + * + * \return KNOT_E* + */ +int journal_scrape_with_md(zone_journal_t j, bool check_existence); + +/*! + * \brief Copy all records related to this zone from one journal DB to another. + * + * \param from DB to copy from. + * \param to DB to copy to. + * \param zone Journal zone. + * + * \return KNOT_E* + */ +int journal_copy_with_md(knot_lmdb_db_t *from, knot_lmdb_db_t *to, const knot_dname_t *zone); + +/*! + * \brief Update the metadata stored in journal DB after a zone flush. + * + * \param j Journal to be notified about flush. + * + * \return KNOT_E* + */ +int journal_set_flushed(zone_journal_t j); + +/*! + * \brief Obtain information about the zone's journal from the DB (mostly metadata). + * + * \param j Zone journal. + * \param exists Output: bool if the zone exists in the journal. + * \param first_serial Optional output: serial-from of the first changeset in journal. + * \param has_zij Optional output: bool if there is zone-in-journal. + * \param serial_to Optional output: serial-to of the last changeset in journal. + * \param has_merged Optional output: bool if there is a special (non zone-in-journal) merged changeset. + * \param merged_serial Optional output: serial-from of the merged changeset. + * \param occupied Optional output: DB space occupied by this zone. + * \param occupied_total Optional output: DB space occupied in total by all zones. + * + * \return KNOT_E* + */ +int journal_info(zone_journal_t j, bool *exists, uint32_t *first_serial, bool *has_zij, + uint32_t *serial_to, bool *has_merged, uint32_t *merged_serial, + uint64_t *occupied, uint64_t *occupied_total); + +/*! \brief Return true if this zone exists in journal DB. 
*/ +inline static bool journal_is_existing(zone_journal_t j) { + bool ex = false; + (void)journal_info(j, &ex, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + return ex; +} + +/*! + * \brief Call a function for each zone being in the journal DB. + * + * \param db Journal database. + * \param cb Callback to be called for each zone-name found. + * \param ctx Arbitrary context to be passed to the callback. + * + * \return An error code from either journal operations or from the callback. + */ +int journals_walk(knot_lmdb_db_t *db, journals_walk_cb_t cb, void *ctx); diff --git a/src/knot/journal/journal_read.c b/src/knot/journal/journal_read.c new file mode 100644 index 0000000..6c4fc32 --- /dev/null +++ b/src/knot/journal/journal_read.c @@ -0,0 +1,436 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ */ + +#include "knot/journal/journal_read.h" + +#include "knot/journal/journal_metadata.h" +#include "knot/journal/knot_lmdb.h" + +#include "contrib/macros.h" +#include "contrib/ucw/lists.h" +#include "contrib/wire_ctx.h" +#include "libknot/error.h" + +#include <stdlib.h> + +struct journal_read { + knot_lmdb_txn_t txn; + MDB_val key_prefix; + const knot_dname_t *zone; + wire_ctx_t wire; + uint32_t next; +}; + +int journal_read_get_error(const journal_read_t *ctx, int another_error) +{ + return (ctx == NULL || ctx->txn.ret == KNOT_EOK ? another_error : ctx->txn.ret); +} + +static void update_ctx_wire(journal_read_t *ctx) +{ + ctx->wire = wire_ctx_init_const(ctx->txn.cur_val.mv_data, ctx->txn.cur_val.mv_size); + wire_ctx_skip(&ctx->wire, JOURNAL_HEADER_SIZE); +} + +static bool go_next_changeset(journal_read_t *ctx, bool go_zone, const knot_dname_t *zone) +{ + free(ctx->key_prefix.mv_data); + ctx->key_prefix = journal_changeset_id_to_key(go_zone, ctx->next, zone); + if (!knot_lmdb_find_prefix(&ctx->txn, &ctx->key_prefix)) { + return false; + } + if (!go_zone && ctx->next == journal_next_serial(&ctx->txn.cur_val)) { + ctx->txn.ret = KNOT_ELOOP; + return false; + } + ctx->next = journal_next_serial(&ctx->txn.cur_val); + update_ctx_wire(ctx); + return true; +} + +int journal_read_begin(zone_journal_t j, bool read_zone, uint32_t serial_from, journal_read_t **ctx) +{ + *ctx = NULL; + if (!journal_is_existing(j)) { // this also opens the LMDB if not already + return KNOT_ENOENT; + } + + journal_read_t *newctx = calloc(1, sizeof(*newctx)); + if (newctx == NULL) { + return KNOT_ENOMEM; + } + + newctx->zone = j.zone; + newctx->next = serial_from; + + knot_lmdb_begin(j.db, &newctx->txn, false); + + if (go_next_changeset(newctx, read_zone, j.zone)) { + *ctx = newctx; + return KNOT_EOK; + } else { + journal_read_end(newctx); + return KNOT_ENOENT; + } +} + +void journal_read_end(journal_read_t *ctx) +{ + if (ctx != NULL) { + free(ctx->key_prefix.mv_data); + 
knot_lmdb_abort(&ctx->txn); + free(ctx); + } +} + +static bool make_data_available(journal_read_t *ctx) +{ + if (wire_ctx_available(&ctx->wire) == 0) { + if (!knot_lmdb_next(&ctx->txn)) { + return false; + } + if (!knot_lmdb_is_prefix_of(&ctx->key_prefix, &ctx->txn.cur_key)) { + return false; + } + if (ctx->next != journal_next_serial(&ctx->txn.cur_val)) { + // consistency check, see also MR !1270 + ctx->txn.ret = KNOT_EMALF; + return false; + } + update_ctx_wire(ctx); + } + return true; +} + +// thoughts for next design of journal serialization: +// - one TTL per rrset +// - endian +// - optionally storing whole rdataset at once? + +bool journal_read_rrset(journal_read_t *ctx, knot_rrset_t *rrset, bool allow_next_changeset) +{ + //knot_rdataset_clear(&rrset->rrs, NULL); + //memset(rrset, 0, sizeof(*rrset)); + if (!make_data_available(ctx)) { + if (!allow_next_changeset || !go_next_changeset(ctx, false, ctx->zone)) { + return false; + } + } + rrset->owner = knot_dname_copy(ctx->wire.position, NULL); + wire_ctx_skip(&ctx->wire, knot_dname_size(rrset->owner)); + rrset->type = wire_ctx_read_u16(&ctx->wire); + rrset->rclass = wire_ctx_read_u16(&ctx->wire); + uint16_t rrs_count = wire_ctx_read_u16(&ctx->wire); + for (int i = 0; i < rrs_count && ctx->wire.error == KNOT_EOK; i++) { + if (!make_data_available(ctx)) { + ctx->wire.error = KNOT_EFEWDATA; + } + // TODO think of how to export serialized rr directly to knot_rdataset_add + // focus on: even address aligning + uint32_t ttl = wire_ctx_read_u32(&ctx->wire); + if (i == 0) { + rrset->ttl = ttl; + } + uint16_t len = wire_ctx_read_u16(&ctx->wire); + if (ctx->wire.error == KNOT_EOK) { + ctx->wire.error = knot_rrset_add_rdata(rrset, ctx->wire.position, len, NULL); + } + wire_ctx_skip(&ctx->wire, len); + } + if (ctx->txn.ret == KNOT_EOK) { + ctx->txn.ret = ctx->wire.error == KNOT_ERANGE ? 
KNOT_EMALF : ctx->wire.error; + } + if (ctx->txn.ret == KNOT_EOK) { + return true; + } else { + journal_read_clear_rrset(rrset); + return false; + } +} + +void journal_read_clear_rrset(knot_rrset_t *rr) +{ + knot_rrset_clear(rr, NULL); +} + +int journal_read_rrsets(journal_read_t *read, journal_read_cb_t cb, void *ctx) +{ + knot_rrset_t rr = { 0 }; + bool in_remove_section = false; + int ret = KNOT_EOK; + while (ret == KNOT_EOK && journal_read_rrset(read, &rr, true)) { + if (rr_is_apex_soa(&rr, read->zone)) { + in_remove_section = !in_remove_section; + } + ret = cb(in_remove_section, &rr, ctx); + journal_read_clear_rrset(&rr); + } + ret = journal_read_get_error(read, ret); + journal_read_end(read); + return ret; +} + +static int add_rr_to_contents(zone_contents_t *z, const knot_rrset_t *rrset) +{ + zone_node_t *n = NULL; + return zone_contents_add_rr(z, rrset, &n); + // Shall we ignore ETTL ? +} + +bool journal_read_changeset(journal_read_t *ctx, changeset_t *ch) +{ + zone_contents_t *tree = zone_contents_new(ctx->zone, false); + knot_rrset_t *soa = calloc(1, sizeof(*soa)), rr = { 0 }; + if (tree == NULL || soa == NULL) { + ctx->txn.ret = KNOT_ENOMEM; + goto fail; + } + memset(ch, 0, sizeof(*ch)); + + if (!journal_read_rrset(ctx, soa, true)) { + goto fail; + } + while (journal_read_rrset(ctx, &rr, false)) { + if (rr_is_apex_soa(&rr, ctx->zone)) { + if (ch->soa_from != NULL) { + ctx->txn.ret = KNOT_EMALF; + goto fail; + } + ch->soa_from = soa; + ch->remove = tree; + soa = malloc(sizeof(*soa)); + tree = zone_contents_new(ctx->zone, false); + if (tree == NULL || soa == NULL) { + ctx->txn.ret = KNOT_ENOMEM; + goto fail; + } + *soa = rr; // note this tricky assignment + memset(&rr, 0, sizeof(rr)); + } else { + ctx->txn.ret = add_rr_to_contents(tree, &rr); + journal_read_clear_rrset(&rr); + } + } + + if (ctx->txn.ret == KNOT_EOK) { + ch->soa_to = soa; + ch->add = tree; + return true; + } else { +fail: + journal_read_clear_rrset(&rr); + journal_read_clear_rrset(soa); + 
free(soa); + changeset_clear(ch); + zone_contents_deep_free(tree); + return false; + } +} + +void journal_read_clear_changeset(changeset_t *ch) +{ + changeset_clear(ch); + memset(ch, 0, sizeof(*ch)); +} + +static int just_load_md(zone_journal_t j, journal_metadata_t *md, bool *has_zij) +{ + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(j.db, &txn, false); + journal_load_metadata(&txn, j.zone, md); + if (has_zij != NULL) { + *has_zij = journal_contains(&txn, true, 0, j.zone); + } + knot_lmdb_abort(&txn); + return txn.ret; +} + +int journal_walk_from(zone_journal_t j, uint32_t from, + journal_walk_cb_t cb, void *ctx) +{ + bool at_least_one = false; + journal_metadata_t md = { 0 }; + journal_read_t *read = NULL; + changeset_t ch; + + int ret = just_load_md(j, &md, NULL); + if (ret != KNOT_EOK) { + return ret; + } + + if ((md.flags & JOURNAL_SERIAL_TO_VALID) && from != md.serial_to && + ret == KNOT_EOK) { + ret = journal_read_begin(j, false, from, &read); + while (ret == KNOT_EOK && journal_read_changeset(read, &ch)) { + ret = cb(false, &ch, ctx); + at_least_one = true; + journal_read_clear_changeset(&ch); + } + ret = journal_read_get_error(read, ret); + journal_read_end(read); + } + if (!at_least_one && ret == KNOT_EOK) { + ret = cb(false, NULL, ctx); + } + return ret; +} + +// beware, this function does not operate in single txn! 
+int journal_walk(zone_journal_t j, journal_walk_cb_t cb, void *ctx) +{ + int ret = knot_lmdb_exists(j.db); + if (ret == KNOT_ENODB) { + ret = cb(true, NULL, ctx); + if (ret == KNOT_EOK) { + ret = cb(false, NULL, ctx); + } + return ret; + } else if (ret == KNOT_EOK) { + ret = knot_lmdb_open(j.db); + } + if (ret != KNOT_EOK) { + return ret; + } + journal_metadata_t md = { 0 }; + journal_read_t *read = NULL; + changeset_t ch; + bool zone_in_j = false; + ret = just_load_md(j, &md, &zone_in_j); + if (ret != KNOT_EOK) { + return ret; + } + if (zone_in_j) { + ret = journal_read_begin(j, true, 0, &read); + goto read_one_special; + } else if ((md.flags & JOURNAL_MERGED_SERIAL_VALID)) { + ret = journal_read_begin(j, false, md.merged_serial, &read); +read_one_special: + if (ret == KNOT_EOK && journal_read_changeset(read, &ch)) { + ret = cb(true, &ch, ctx); + journal_read_clear_changeset(&ch); + } + ret = journal_read_get_error(read, ret); + journal_read_end(read); + read = NULL; + } else { + ret = cb(true, NULL, ctx); + } + + if (ret == KNOT_EOK) { + ret = journal_walk_from(j, md.first_serial, cb, ctx); + } + return ret; +} + +typedef struct { + size_t observed_count; + size_t observed_merged; + uint32_t merged_serial; + size_t observed_zij; + uint32_t first_serial; + bool first_serial_valid; + uint32_t last_serial; + bool last_serial_valid; +} check_ctx_t; + +static int check_cb(bool special, const changeset_t *ch, void *vctx) +{ + check_ctx_t *ctx = vctx; + if (special && ch != NULL) { + if (ch->remove == NULL) { + ctx->observed_zij++; + ctx->last_serial = changeset_to(ch); + ctx->last_serial_valid = true; + } else { + ctx->merged_serial = changeset_from(ch); + ctx->observed_merged++; + } + } else if (ch != NULL) { + if (!ctx->first_serial_valid) { + ctx->first_serial = changeset_from(ch); + ctx->first_serial_valid = true; + } + ctx->last_serial = changeset_to(ch); + ctx->last_serial_valid = true; + ctx->observed_count++; + } + return KNOT_EOK; +} + +static bool eq(bool a, 
bool b) +{ + return a ? b : !b; +} + +int journal_sem_check(zone_journal_t j) +{ + check_ctx_t ctx = { 0 }; + journal_metadata_t md = { 0 }; + bool has_zij = false; + + if (!journal_is_existing(j)) { + return KNOT_EOK; + } + + int ret = just_load_md(j, &md, &has_zij); + if (ret == KNOT_EOK) { + ret = journal_walk(j, check_cb, &ctx); + } + if (ret != KNOT_EOK) { + return ret; + } + + if (!eq((md.flags & JOURNAL_SERIAL_TO_VALID), ctx.last_serial_valid)) { + return 101; + } + if (ctx.last_serial_valid && ctx.last_serial != md.serial_to) { + return 102; + } + if (!eq((md.flags & JOURNAL_MERGED_SERIAL_VALID), (ctx.observed_merged > 0))) { + return 103; + } + if (ctx.observed_merged > 1) { + return 104; + } + if (ctx.observed_merged == 1 && ctx.merged_serial != md.merged_serial) { + return 105; + } + if (!eq(has_zij, (ctx.observed_zij > 0))) { + return 106; + } + if (ctx.observed_zij > 1) { + return 107; + } + if (ctx.observed_zij + ctx.observed_merged > 1) { + return 108; + } + if (!eq(((md.flags & JOURNAL_SERIAL_TO_VALID) && md.first_serial != md.serial_to), ctx.first_serial_valid)) { + return 109; + } + if (!eq(ctx.first_serial_valid, (ctx.observed_count > 0))) { + return 110; + } + if (ctx.first_serial_valid && ctx.first_serial != md.first_serial) { + return 111; + } + if (ctx.observed_count != md.changeset_count) { + return 112; + } + if (ctx.observed_merged > 0 && ctx.observed_count == 0) { + return 113; + } + return KNOT_EOK; +} diff --git a/src/knot/journal/journal_read.h b/src/knot/journal/journal_read.h new file mode 100644 index 0000000..92cad9f --- /dev/null +++ b/src/knot/journal/journal_read.h @@ -0,0 +1,158 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "knot/journal/journal_basic.h" + +typedef struct journal_read journal_read_t; + +typedef int (*journal_read_cb_t)(bool in_remove_section, const knot_rrset_t *rr, void *ctx); + +typedef int (*journal_walk_cb_t)(bool special, const changeset_t *ch, void *ctx); + +/*! + * \brief Start reading journal from specified changeset. + * + * \param j Journal to be read. + * \param read_zone True if reading shall start with zone-in-journal. + * \param serial_from Serial-from of the changeset to be started at (ignored if 'read_zone'). + * \param ctx Output: journal reading context initialised. + * + * \return KNOT_E* + */ +int journal_read_begin(zone_journal_t j, bool read_zone, uint32_t serial_from, journal_read_t **ctx); + +/*! + * \brief Read a single RRSet from a journal changeset. + * + * \param ctx Journal reading context. + * \param rr Output: RRSet to be filled with serialized data. + * \param allow_next_changeset True to allow jumping to next changeset. + * + * \return False if no more RRSet in this changeset/journal, or failure. + */ +bool journal_read_rrset(journal_read_t *ctx, knot_rrset_t *rr, bool allow_next_changeset); + +/*! + * \brief Free up heap allocations by journal_read_rrset(). + * + * \param rr RRSet initialised by journal_read_rrset(). + */ +void journal_read_clear_rrset(knot_rrset_t *rr); + +// TODO move somewhere. Libknot? +inline static bool rr_is_apex_soa(const knot_rrset_t *rr, const knot_dname_t *apex) +{ + return (rr->type == KNOT_RRTYPE_SOA && knot_dname_is_equal(rr->owner, apex)); +} + +/*! 
+ * \brief Read all RRSets up to the end of journal, calling a function for each. + * + * \note Closes reading context at the end. + * + * \param read Journal reading context. + * \param cb Callback to be called on each read. + * \param ctx Arbitrary context to be passed to the callback. + * + * \return An error code from either journal operations or from the callback. + */ +int journal_read_rrsets(journal_read_t *read, journal_read_cb_t cb, void *ctx); + +/*! + * \brief Read a single changeset from journal. + * + * \param ctx Journal reading context. + * \param ch Output: changeset to be filled with serialized data. + * + * \return False if no more changesets in the journal, or failure. + */ +bool journal_read_changeset(journal_read_t *ctx, changeset_t *ch); + +/*! + * \brief Free up heap allocations by journal_read_changeset(). + * + * \param ch Changeset initialised by journal_read_changeset(). + */ +void journal_read_clear_changeset(changeset_t *ch); + +/*! + * \brief Obtain error code from the journal_read operations previously performed. + * + * \param ctx Journal reading context. + * \param another_error An error code from outside the reading operations to be combined. + * + * \return KNOT_EOK if completely every operation succeeded, KNOT_E* + */ +int journal_read_get_error(const journal_read_t *ctx, int another_error); + +/*! + * \brief Finalise journal reading. + * + * \param ctx Journal reading context (will be freed). + */ +void journal_read_end(journal_read_t *ctx); + +/*! + * \brief Call a function for each changeset in journal. + * + * This is a variant of journal_walk() see below. + * The difference is that iteration starts at specified serial. + * Similarly to how IXFR works. + * The callback is called for each found changeset, or just once + * with ch=NULL if none is found. + * + * \param j Zone journal to be read. + * \param from SOA serial to start at. + * \param cb Callback to be called for each changeset (or its non-existence). 
+ * \param ctx Arbitrary context to be passed to the callback. + * + * \return An error code from either journal operations or from the callback. + * \retval KNOT_ENOENT if the journal is not empty, but the requested serial is not present. + */ +int journal_walk_from(zone_journal_t j, uint32_t from, + journal_walk_cb_t cb, void *ctx); + +/*! + * \brief Call a function for each changeset stored in journal. + * + * First, the callback will be called for the special changeset - + * either zone-in-journal or merged changeset, with special=true. + * If there is no such, it will be called anyway with ch=NULL. + * + * Then, the callback will be called for each regular changeset + * with special=false. If there is none, it will be called once + * with ch=NULL. + * + * \param j Zone journal to be read. + * \param cb Callback to be called for each changeset (or its non-existence). + * \param ctx Arbitrary context to be passed to the callback. + * + * \return An error code from either journal operations or from the callback. + */ +int journal_walk(zone_journal_t j, journal_walk_cb_t cb, void *ctx); + +/*! + * \brief Perform semantic check of the zone journal (consistency, metadata...). + * + * \param j Zone journal to be checked. + * + * \retval KNOT_E* ( < 0 ) if an error during journal operation. + * \retval > 100 if some inconsistency found. + * \return KNOT_EOK if all ok. + */ +int journal_sem_check(zone_journal_t j); diff --git a/src/knot/journal/journal_write.c b/src/knot/journal/journal_write.c new file mode 100644 index 0000000..ad1247b --- /dev/null +++ b/src/knot/journal/journal_write.c @@ -0,0 +1,333 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_write.h" + +#include "contrib/macros.h" +#include "knot/journal/journal_metadata.h" +#include "knot/journal/journal_read.h" +#include "knot/journal/serialization.h" +#include "libknot/error.h" + +static void journal_write_serialize(knot_lmdb_txn_t *txn, serialize_ctx_t *ser, + const knot_dname_t *apex, bool zij, uint32_t ch_from, uint32_t ch_to) +{ + MDB_val chunk; + uint32_t i = 0; + while (serialize_unfinished(ser) && txn->ret == KNOT_EOK) { + serialize_prepare(ser, JOURNAL_CHUNK_THRESH - JOURNAL_HEADER_SIZE, + JOURNAL_CHUNK_MAX - JOURNAL_HEADER_SIZE, &chunk.mv_size); + if (chunk.mv_size == 0) { + break; // beware! If this is omitted, it creates empty chunk => EMALF when reading. 
+ } + chunk.mv_size += JOURNAL_HEADER_SIZE; + chunk.mv_data = NULL; + MDB_val key = journal_make_chunk_key(apex, ch_from, zij, i); + if (knot_lmdb_insert(txn, &key, &chunk)) { + journal_make_header(chunk.mv_data, ch_to); + serialize_chunk(ser, chunk.mv_data + JOURNAL_HEADER_SIZE, chunk.mv_size - JOURNAL_HEADER_SIZE); + } + free(key.mv_data); + i++; + } + int ret = serialize_deinit(ser); + if (txn->ret == KNOT_EOK) { + txn->ret = ret; + } +} + +void journal_write_changeset(knot_lmdb_txn_t *txn, const changeset_t *ch) +{ + serialize_ctx_t *ser = serialize_init(ch); + if (ser == NULL) { + txn->ret = KNOT_ENOMEM; + return; + } + if (ch->remove == NULL) { + journal_write_serialize(txn, ser, ch->soa_to->owner, true, 0, changeset_to(ch)); + } else { + journal_write_serialize(txn, ser, ch->soa_to->owner, false, changeset_from(ch), changeset_to(ch)); + } +} + +void journal_write_zone(knot_lmdb_txn_t *txn, const zone_contents_t *z) +{ + serialize_ctx_t *ser = serialize_zone_init(z); + if (ser == NULL) { + txn->ret = KNOT_ENOMEM; + return; + } + journal_write_serialize(txn, ser, z->apex->owner, true, 0, zone_contents_serial(z)); +} + +void journal_write_zone_diff(knot_lmdb_txn_t *txn, const zone_diff_t *z) +{ + serialize_ctx_t *ser = serialize_zone_diff_init(z); + if (ser == NULL) { + txn->ret = KNOT_ENOMEM; + return; + } + journal_write_serialize(txn, ser, z->apex->owner, false, zone_diff_from(z), zone_diff_to(z)); +} + +static bool delete_one(knot_lmdb_txn_t *txn, bool del_zij, uint32_t del_serial, + const knot_dname_t *zone, uint64_t *freed, uint32_t *next_serial) +{ + *freed = 0; + MDB_val prefix = journal_changeset_id_to_key(del_zij, del_serial, zone); + knot_lmdb_foreach(txn, &prefix) { + *freed += txn->cur_val.mv_size; + *next_serial = journal_next_serial(&txn->cur_val); + knot_lmdb_del_cur(txn); + } + free(prefix.mv_data); + return (*freed > 0); +} + +static int merge_cb(bool remove, const knot_rrset_t *rr, void *ctx) +{ + changeset_t *ch = ctx; + return remove ? 
(rr_is_apex_soa(rr, ch->soa_to->owner) ? + KNOT_EOK : changeset_add_removal(ch, rr, CHANGESET_CHECK)) + : changeset_add_addition(ch, rr, CHANGESET_CHECK); +} + +void journal_merge(zone_journal_t j, knot_lmdb_txn_t *txn, bool merge_zij, + uint32_t merge_serial, uint32_t *original_serial_to) +{ + changeset_t merge; + memset(&merge, 0, sizeof(merge)); + journal_read_t *read = NULL; + txn->ret = journal_read_begin(j, merge_zij, merge_serial, &read); + if (txn->ret != KNOT_EOK) { + return; + } + if (journal_read_changeset(read, &merge)) { + *original_serial_to = changeset_to(&merge); + } + txn->ret = journal_read_rrsets(read, merge_cb, &merge); + + // deleting seems redundant since the merge changeset will be overwritten + // but it would cause EMALF or invalid data if the new merged has less chunks than before + uint32_t del_next_serial; + uint64_t del_freed; + delete_one(txn, merge_zij, merge_serial, j.zone, &del_freed, &del_next_serial); + assert(del_freed > 0 && del_next_serial == *original_serial_to); + + journal_write_changeset(txn, &merge); + journal_read_clear_changeset(&merge); +} + +static void delete_merged(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + journal_metadata_t *md, uint64_t *freed) +{ + if (!(md->flags & JOURNAL_MERGED_SERIAL_VALID)) { + return; + } + uint32_t unused = 0; + delete_one(txn, false, md->merged_serial, zone, freed, &unused); + md->merged_serial = 0; + md->flags &= ~JOURNAL_MERGED_SERIAL_VALID; +} + +bool journal_delete(knot_lmdb_txn_t *txn, uint32_t from, const knot_dname_t *zone, + uint64_t tofree_size, size_t tofree_count, uint32_t stop_at_serial, + uint64_t *freed_size, size_t *freed_count, uint32_t *stopped_at) +{ + *freed_size = 0; + *freed_count = 0; + uint64_t freed_now; + while (from != stop_at_serial && + (*freed_size < tofree_size || *freed_count < tofree_count) && + delete_one(txn, false, from, zone, &freed_now, stopped_at)) { + *freed_size += freed_now; + ++(*freed_count); + from = *stopped_at; + } + return 
(*freed_count > 0); +} + +void journal_try_flush(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md) +{ + bool flush = journal_allow_flush(j); + uint32_t merge_orig = 0; + if (journal_contains(txn, true, 0, j.zone)) { + journal_merge(j, txn, true, 0, &merge_orig); + if (!flush) { + journal_metadata_after_merge(md, true, 0, md->serial_to, merge_orig); + } + } else if (!flush) { + uint32_t merge_serial = ((md->flags & JOURNAL_MERGED_SERIAL_VALID) ? md->merged_serial : md->first_serial); + journal_merge(j, txn, false, merge_serial, &merge_orig); + journal_metadata_after_merge(md, false, merge_serial, md->serial_to, merge_orig); + } + + if (flush) { + // delete merged serial if (very unlikely) exists + if ((md->flags & JOURNAL_MERGED_SERIAL_VALID)) { + uint64_t unused64; + uint32_t unused32; + (void)delete_one(txn, false, md->merged_serial, j.zone, &unused64, &unused32); + md->flags &= ~JOURNAL_MERGED_SERIAL_VALID; + } + + // commit partial job and ask zone to flush itself + journal_store_metadata(txn, j.zone, md); + knot_lmdb_commit(txn); + if (txn->ret == KNOT_EOK) { + txn->ret = KNOT_EBUSY; + } + } +} + +#define U_MINUS(minuend, subtrahend) ((minuend) - MIN((minuend), (subtrahend))) + +void journal_fix_occupation(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md, + int64_t max_usage, ssize_t max_count) +{ + uint64_t occupied = journal_get_occupied(txn, j.zone), freed; + uint64_t need_tofree = U_MINUS(occupied, max_usage); + size_t count = md->changeset_count, removed; + size_t need_todel = U_MINUS(count, max_count); + + while ((need_tofree > 0 || need_todel > 0) && txn->ret == KNOT_EOK) { + uint32_t del_from = md->first_serial; // don't move this line outside of the loop + uint32_t del_upto = md->flushed_upto; + (void)journal_serial_to(txn, true, 0, j.zone, &del_upto); // in case zij present and wrong flushed_upto, avoid discontinuity + freed = 0; + removed = 0; + journal_delete(txn, del_from, j.zone, need_tofree, need_todel, + del_upto, 
&freed, &removed, &del_from); + if (freed == 0) { + if (del_upto != md->serial_to) { + journal_try_flush(j, txn, md); + } else { + txn->ret = KNOT_ESPACE; + break; + } + } else { + journal_metadata_after_delete(md, del_from, removed); + need_tofree = U_MINUS(need_tofree, freed); + need_todel = U_MINUS(need_todel, removed); + } + } +} + +int journal_insert_zone(zone_journal_t j, const zone_contents_t *z) +{ + changeset_t fake_ch = { .add = (zone_contents_t *)z }; + size_t ch_size = changeset_serialized_size(&fake_ch); + size_t max_usage = journal_conf_max_usage(j); + if (ch_size >= max_usage) { + return KNOT_ESPACE; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + + update_last_inserter(&txn, j.zone); + journal_del_zone_txn(&txn, j.zone); + + journal_write_zone(&txn, z); + + journal_metadata_t md = { 0 }; + md.flags = JOURNAL_SERIAL_TO_VALID; + md.serial_to = zone_contents_serial(z); + md.first_serial = md.serial_to; + journal_store_metadata(&txn, j.zone, &md); + + knot_lmdb_commit(&txn); + return txn.ret; +} + +int journal_insert(zone_journal_t j, const changeset_t *ch, const changeset_t *extra, + const zone_diff_t *zdiff) +{ + assert(zdiff == NULL || (ch == NULL && extra == NULL)); + + size_t ch_size = zdiff == NULL ? changeset_serialized_size(ch) : + zone_diff_serialized_size(*zdiff); + size_t max_usage = journal_conf_max_usage(j); + if (ch_size >= max_usage) { + return KNOT_ESPACE; + } + + uint32_t ch_from = zdiff == NULL ? changeset_from(ch) : zone_diff_from(zdiff); + uint32_t ch_to = zdiff == NULL ? 
changeset_to(ch) : zone_diff_to(zdiff); + if (extra != NULL && (changeset_to(extra) != ch_to || + changeset_from(extra) == ch_from)) { + return KNOT_EINVAL; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + journal_metadata_t md = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + journal_load_metadata(&txn, j.zone, &md); + + update_last_inserter(&txn, j.zone); + + if (extra != NULL) { + if (journal_contains(&txn, true, 0, j.zone)) { + txn.ret = KNOT_ESEMCHECK; + } + uint64_t merged_freed = 0; + delete_merged(&txn, j.zone, &md, &merged_freed); + ch_size += changeset_serialized_size(extra); + ch_size -= merged_freed; + md.flushed_upto = md.serial_to; // set temporarily + md.flags |= JOURNAL_LAST_FLUSHED_VALID; + } + + size_t chs_limit = journal_conf_max_changesets(j); + journal_fix_occupation(j, &txn, &md, max_usage - ch_size, chs_limit - 1); + + // avoid discontinuity + if ((md.flags & JOURNAL_SERIAL_TO_VALID) && md.serial_to != ch_from) { + if (journal_contains(&txn, true, 0, j.zone)) { + txn.ret = KNOT_ESEMCHECK; + } else { + journal_del_zone_txn(&txn, j.zone); + memset(&md, 0, sizeof(md)); + } + } + + // avoid cycle + if (journal_contains(&txn, false, ch_to, j.zone)) { + journal_fix_occupation(j, &txn, &md, INT64_MAX, 1); + } + + if (zdiff == NULL) { + journal_write_changeset(&txn, ch); + } else { + journal_write_zone_diff(&txn, zdiff); + } + journal_metadata_after_insert(&md, ch_from, ch_to); + + if (extra != NULL) { + journal_write_changeset(&txn, extra); + journal_metadata_after_extra(&md, changeset_from(extra), changeset_to(extra)); + } + + journal_store_metadata(&txn, j.zone, &md); + knot_lmdb_commit(&txn); + return txn.ret; +} diff --git a/src/knot/journal/journal_write.h b/src/knot/journal/journal_write.h new file mode 100644 index 0000000..a55fd34 --- /dev/null +++ b/src/knot/journal/journal_write.h @@ -0,0 +1,121 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. 
<knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "knot/journal/journal_basic.h" +#include "knot/journal/journal_metadata.h" +#include "knot/journal/serialization.h" + +/*! + * \brief Serialize a changeset into chunks and write it into DB with no checks and metadata update. + * + * \param txn Journal DB transaction. + * \param ch Changeset to be written. + */ +void journal_write_changeset(knot_lmdb_txn_t *txn, const changeset_t *ch); + +/*! + * \brief Serialize zone contents aka "bootstrap" changeset into journal, no checks. + * + * \param txn Journal DB transaction. + * \param z Zone contents to be written. + */ +void journal_write_zone(knot_lmdb_txn_t *txn, const zone_contents_t *z); + +/*! + * \brief Merge all following changesets into one of the journal changesets. + * + * \param j Zone journal. + * \param txn Journal DB transaction. + * \param merge_zij True if we shall merge into zone-in-journal. + * \param merge_serial Serial-from of the changeset to be merged into (ignored if 'merge_zij'). + * \param original_serial_to Output: previous serial-to of the merged changeset before merge. + * + * \note The error code will be in txn->ret. + */ +void journal_merge(zone_journal_t j, knot_lmdb_txn_t *txn, bool merge_zij, + uint32_t merge_serial, uint32_t *original_serial_to); + +/*! 
+ * \brief Delete some journal changesets in attempt to fulfill usage quotas. + * + * \param txn Journal DB transaction. + * \param from Serial-from of the first changeset to be deleted. + * \param zone Zone name. + * \param tofree_size Amount of data (in bytes) to be at least deleted. + * \param tofree_count Number of changesets to be at least deleted. + * \param stop_at_serial Must not delete the changeset with this serial-from. + * \param freed_size Output: amount of data really deleted. + * \param freed_count Output: number of changesets really freed. + * \param stopped_at Output: serial-to of the last deleted changeset. + * + * \return True if something was deleted (not necessarily fulfilling tofree_*). + */ +bool journal_delete(knot_lmdb_txn_t *txn, uint32_t from, const knot_dname_t *zone, + uint64_t tofree_size, size_t tofree_count, uint32_t stop_at_serial, + uint64_t *freed_size, size_t *freed_count, uint32_t *stopped_at); + +/*! + * \brief Perform a merge or zone flush in order to enable deleting more changesets. + * + * \param j Zone journal. + * \param txn Journal DB transaction. + * \param md Journal metadata. + * + * \note It might set txn->ret to KNOT_EBUSY to fail out from this operation and let the zone flush itself. + */ +void journal_try_flush(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md); + +/*! + * \brief Perform delete/merge/flush operations to fulfill configured journal quotas. + * + * \param j Zone journal. + * \param txn Journal DB transaction. + * \param md Journal metadata. + * \param max_usage Configured maximum usage (in bytes) of journal DB by this zone. + * \param max_count Configured maximum number of changesets. + */ +void journal_fix_occupation(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md, + int64_t max_usage, ssize_t max_count); + +/*! + * \brief Store zone-in-journal into the journal, update metadata. + * + * \param j Zone journal. + * \param z Zone contents to be stored. 
+ * + * \return KNOT_E* + */ +int journal_insert_zone(zone_journal_t j, const zone_contents_t *z); + +/*! + * \brief Store changeset into journal, fulfilling quotas and updating metadata. + * + * \param j Zone journal. + * \param ch Changeset to be stored. + * \param extra Extra changeset to be stored in the role of merged changeset. + * \param zdiff Zone diff to be stored instead of changeset. + * + * \note The extra changeset is being stored on zone load, it is basically the diff + * between zonefile and loaded zone contents. Afterwards, it will be treated + * the same as a merged changeset. Inserting it requires no zone-in-journal + * present and leads to deleting any previous merged changeset. + * + * \return KNOT_E* + */ +int journal_insert(zone_journal_t j, const changeset_t *ch, const changeset_t *extra, + const zone_diff_t *zdiff); diff --git a/src/knot/journal/knot_lmdb.c b/src/knot/journal/knot_lmdb.c new file mode 100644 index 0000000..bc17462 --- /dev/null +++ b/src/knot/journal/knot_lmdb.c @@ -0,0 +1,770 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ */ + +#include <stdarg.h> +#include <stdio.h> // snprintf +#include <stdlib.h> +#include <sys/stat.h> +#include <unistd.h> + +#include "knot/journal/knot_lmdb.h" + +#include "knot/conf/conf.h" +#include "contrib/files.h" +#include "contrib/wire_ctx.h" +#include "libknot/dname.h" +#include "libknot/endian.h" +#include "libknot/error.h" + +#define LMDB_DIR_MODE 0770 +#define LMDB_FILE_MODE 0660 + +static void err_to_knot(int *err) +{ + switch (*err) { + case MDB_SUCCESS: + *err = KNOT_EOK; + break; + case MDB_NOTFOUND: + *err = KNOT_ENOENT; + break; + case MDB_TXN_FULL: + *err = KNOT_ELIMIT; + break; + case MDB_MAP_FULL: + case ENOSPC: + *err = KNOT_ESPACE; + break; + default: + *err = (*err < 0 ? *err : -*err); + } +} + +void knot_lmdb_init(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags, const char *dbname) +{ +#ifdef __OpenBSD__ + /* + * Enforce that MDB_WRITEMAP is set. + * + * MDB assumes a unified buffer cache. + * + * See https://www.openldap.org/pub/hyc/mdm-paper.pdf section 3.1, + * references 17, 18, and 19. + * + * From Howard Chu: "This requirement can be relaxed in the + * current version of the library. If you create the environment + * with the MDB_WRITEMAP option then all reads and writes are + * performed using mmap, so the file buffer cache is irrelevant. + * Of course then you lose the protection that the read-only + * map offers." + */ + env_flags |= MDB_WRITEMAP; +#endif + db->env = NULL; + db->path = strdup(path); + db->mapsize = mapsize; + db->env_flags = env_flags; + db->dbname = dbname; + pthread_mutex_init(&db->opening_mutex, NULL); + db->maxdbs = 2; + db->maxreaders = conf_lmdb_readers(conf()); +} + +static int lmdb_stat(const char *lmdb_path, struct stat *st) +{ + char data_mdb[strlen(lmdb_path) + 10]; + (void)snprintf(data_mdb, sizeof(data_mdb), "%s/data.mdb", lmdb_path); + if (stat(data_mdb, st) == 0) { + return st->st_size > 0 ? 
KNOT_EOK : KNOT_ENODB;
+	} else if (errno == ENOENT) {
+		return KNOT_ENODB;
+	} else {
+		return knot_map_errno();
+	}
+}
+
+int knot_lmdb_exists(knot_lmdb_db_t *db)
+{
+	// an open environment implies that the DB exists
+	if (db->env != NULL) {
+		return KNOT_EOK;
+	}
+	if (db->path == NULL) {
+		return KNOT_ENODB;
+	}
+	struct stat unused;
+	return lmdb_stat(db->path, &unused);
+}
+
+/*!
+ * \brief If no mapsize was configured (zero), derive it from the data file and force read-only mode.
+ *
+ * \return KNOT_EOK, or the lmdb_stat() error if the data file cannot be examined.
+ */
+static int fix_mapsize(knot_lmdb_db_t *db)
+{
+	if (db->mapsize == 0) {
+		struct stat st;
+		int ret = lmdb_stat(db->path, &st);
+		if (ret != KNOT_EOK) {
+			return ret;
+		}
+		db->mapsize = st.st_size * 2; // twice the size as DB might grow while we read it
+		db->env_flags |= MDB_RDONLY;
+	}
+	return KNOT_EOK;
+}
+
+size_t knot_lmdb_copy_size(knot_lmdb_db_t *to_copy)
+{
+	// 1 MiB base, plus twice the data file size if the file exists and is not empty
+	size_t copy_size = 1048576;
+	struct stat st;
+	if (lmdb_stat(to_copy->path, &st) == KNOT_EOK) {
+		copy_size += st.st_size * 2;
+	}
+	return copy_size;
+}
+
+/*!
+ * \brief Create and open the LMDB environment and the (sub)database handle.
+ *
+ * Does nothing if the environment is already open. This function takes no lock
+ * itself; knot_lmdb_open() and knot_lmdb_reconfigure() call it with
+ * db->opening_mutex held.
+ *
+ * \return KNOT_E*, db->env is left NULL on failure.
+ */
+static int lmdb_open(knot_lmdb_db_t *db)
+{
+	MDB_txn *init_txn = NULL;
+
+	if (db->env != NULL) {
+		return KNOT_EOK;
+	}
+
+	// the path is NULL only if strdup() failed when the path was set
+	if (db->path == NULL) {
+		return KNOT_ENOMEM;
+	}
+
+	int ret = fix_mapsize(db);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	ret = make_dir(db->path, LMDB_DIR_MODE, true);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	long page_size = sysconf(_SC_PAGESIZE);
+	if (page_size <= 0) {
+		return KNOT_ERROR;
+	}
+	// next multiple of the page size strictly above the configured mapsize
+	size_t mapsize = (db->mapsize / page_size + 1) * page_size;
+
+	ret = mdb_env_create(&db->env);
+	if (ret != MDB_SUCCESS) {
+		err_to_knot(&ret);
+		return ret;
+	}
+
+	// from here on 'ret' holds raw LMDB codes; it is converted once at the end
+	ret = mdb_env_set_mapsize(db->env, mapsize);
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_env_set_maxdbs(db->env, db->maxdbs);
+	}
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_env_set_maxreaders(db->env, db->maxreaders);
+	}
+	if (ret == MDB_SUCCESS) {
+		ret = mdb_env_open(db->env, db->path, db->env_flags, LMDB_FILE_MODE);
+	}
+	if (ret == MDB_SUCCESS) {
+		// the initial txn is read-only exactly when the environment is
+		unsigned init_txn_flags = (db->env_flags & MDB_RDONLY);
+		ret = mdb_txn_begin(db->env, NULL, init_txn_flags, &init_txn);
+		if (ret == MDB_READERS_FULL) {
+			// reader table is full: check it for stale entries and retry once
+			int cleared = 0;
+			ret = 
mdb_reader_check(db->env, &cleared); + if (ret == MDB_SUCCESS) { + ret = mdb_txn_begin(db->env, NULL, init_txn_flags, &init_txn); + } + } + } + if (ret == MDB_SUCCESS) { + ret = mdb_dbi_open(init_txn, db->dbname, MDB_CREATE, &db->dbi); + } + if (ret == MDB_SUCCESS) { + ret = mdb_txn_commit(init_txn); + } + + if (ret != MDB_SUCCESS) { + if (init_txn != NULL) { + mdb_txn_abort(init_txn); + } + mdb_env_close(db->env); + db->env = NULL; + } + err_to_knot(&ret); + return ret; +} + +int knot_lmdb_open(knot_lmdb_db_t *db) +{ + pthread_mutex_lock(&db->opening_mutex); + int ret = lmdb_open(db); + pthread_mutex_unlock(&db->opening_mutex); + return ret; +} + +static void lmdb_close(knot_lmdb_db_t *db) +{ + if (db->env != NULL) { + mdb_dbi_close(db->env, db->dbi); + mdb_env_close(db->env); + db->env = NULL; + } +} + +void knot_lmdb_close(knot_lmdb_db_t *db) +{ + pthread_mutex_lock(&db->opening_mutex); + lmdb_close(db); + pthread_mutex_unlock(&db->opening_mutex); +} + +static int lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags) +{ +#ifdef __OpenBSD__ + env_flags |= MDB_WRITEMAP; +#endif + if (strcmp(db->path, path) == 0 && db->mapsize == mapsize && db->env_flags == env_flags) { + return KNOT_EOK; + } + if (db->env != NULL) { + return KNOT_EISCONN; + } + free(db->path); + db->path = strdup(path); + db->mapsize = mapsize; + db->env_flags = env_flags; + return KNOT_EOK; +} + +int knot_lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags) +{ + pthread_mutex_lock(&db->opening_mutex); + int ret = lmdb_reinit(db, path, mapsize, env_flags); + pthread_mutex_unlock(&db->opening_mutex); + return ret; +} + +int knot_lmdb_reconfigure(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags) +{ + pthread_mutex_lock(&db->opening_mutex); + int ret = lmdb_reinit(db, path, mapsize, env_flags); + if (ret != KNOT_EOK) { + lmdb_close(db); + ret = lmdb_reinit(db, path, mapsize, env_flags); + if (ret == KNOT_EOK) { + 
ret = lmdb_open(db);
+		}
+	}
+	pthread_mutex_unlock(&db->opening_mutex);
+	return ret;
+}
+
+void knot_lmdb_deinit(knot_lmdb_db_t *db)
+{
+	knot_lmdb_close(db);
+	pthread_mutex_destroy(&db->opening_mutex);
+	free(db->path);
+}
+
+void knot_lmdb_begin(knot_lmdb_db_t *db, knot_lmdb_txn_t *txn, bool rw)
+{
+	txn->ret = mdb_txn_begin(db->env, NULL, rw ? 0 : MDB_RDONLY, &txn->txn);
+	err_to_knot(&txn->ret);
+	// on failure only txn->ret is written; the other fields keep the caller's values
+	// NOTE(review): presumably callers pass a zero-initialised txn (as knot_lmdb_quick_insert() does) - confirm
+	if (txn->ret == KNOT_EOK) {
+		txn->opened = true;
+		txn->db = db;
+		txn->is_rw = rw;
+	}
+}
+
+void knot_lmdb_abort(knot_lmdb_txn_t *txn)
+{
+	// no-op for a txn that is not open, so repeated calls are harmless
+	if (txn->opened) {
+		if (txn->cursor != NULL) {
+			mdb_cursor_close(txn->cursor);
+			txn->cursor = NULL;
+		}
+		mdb_txn_abort(txn->txn);
+		txn->opened = false;
+	}
+}
+
+/*!
+ * \brief Check that the transaction is open and has no error recorded.
+ *
+ * A txn that is not open and has no error yet gets KNOT_ESEMCHECK. Whenever an
+ * error is recorded, the txn is aborted.
+ *
+ * \return True iff the txn is open and txn->ret is KNOT_EOK.
+ */
+static bool txn_semcheck(knot_lmdb_txn_t *txn)
+{
+	if (!txn->opened && txn->ret == KNOT_EOK) {
+		txn->ret = KNOT_ESEMCHECK;
+	}
+	if (txn->ret != KNOT_EOK) {
+		knot_lmdb_abort(txn);
+		return false;
+	}
+	return true;
+}
+
+void knot_lmdb_commit(knot_lmdb_txn_t *txn)
+{
+	// a failed txn has just been aborted by txn_semcheck(); txn->ret keeps the error
+	if (!txn_semcheck(txn)) {
+		return;
+	}
+	if (txn->cursor != NULL) {
+		mdb_cursor_close(txn->cursor);
+		txn->cursor = NULL;
+	}
+	txn->ret = mdb_txn_commit(txn->txn);
+	err_to_knot(&txn->ret);
+	txn->opened = false; // marked closed regardless of the commit result
+}
+
+// save the programmer's frequent checking for ENOMEM when creating search keys
+// (a key with NULL mv_data fails the txn with KNOT_ENOMEM and aborts it)
+static bool txn_enomem(knot_lmdb_txn_t *txn, const MDB_val *tocheck)
+{
+	if (tocheck->mv_data == NULL) {
+		txn->ret = KNOT_ENOMEM;
+		knot_lmdb_abort(txn);
+		return false;
+	}
+	return true;
+}
+
+/*!
+ * \brief Open txn->cursor if it is not open yet; the txn is aborted on failure.
+ *
+ * \return True iff a cursor is available afterwards.
+ */
+static bool init_cursor(knot_lmdb_txn_t *txn)
+{
+	if (txn->cursor == NULL) {
+		txn->ret = mdb_cursor_open(txn->txn, txn->db->dbi, &txn->cursor);
+		err_to_knot(&txn->ret);
+		if (txn->ret != KNOT_EOK) {
+			knot_lmdb_abort(txn);
+			return false;
+		}
+	}
+	return true;
+}
+
+/*!
+ * \brief Move the cursor by 'op' and load txn->cur_key and txn->cur_val.
+ *
+ * "Not found" is not an error here: txn->ret is reset to KNOT_EOK and false is returned.
+ *
+ * \return True iff a record was loaded.
+ */
+static bool curget(knot_lmdb_txn_t *txn, MDB_cursor_op op)
+{
+	txn->ret = mdb_cursor_get(txn->cursor, &txn->cur_key, &txn->cur_val, op);
+	err_to_knot(&txn->ret);
+	if (txn->ret == KNOT_ENOENT) {
+		txn->ret = KNOT_EOK;
+		return false;
+	
}
+	return (txn->ret == KNOT_EOK);
+}
+
+/*!
+ * \brief Copy a value into freshly allocated memory; the caller frees clone->mv_data.
+ *
+ * \return KNOT_EOK or KNOT_ENOMEM.
+ */
+static int mdb_val_clone(const MDB_val *orig, MDB_val *clone)
+{
+	// NOTE(review): for a zero-sized value malloc(0) may return NULL, which is reported as KNOT_ENOMEM
+	clone->mv_data = malloc(orig->mv_size);
+	if (clone->mv_data == NULL) {
+		return KNOT_ENOMEM;
+	}
+	clone->mv_size = orig->mv_size;
+	memcpy(clone->mv_data, orig->mv_data, clone->mv_size);
+	return KNOT_EOK;
+}
+
+bool knot_lmdb_find(knot_lmdb_txn_t *txn, MDB_val *what, knot_lmdb_find_t how)
+{
+	if (!txn_semcheck(txn) || !init_cursor(txn) || !txn_enomem(txn, what)) {
+		return false;
+	}
+	txn->cur_key.mv_size = what->mv_size;
+	txn->cur_key.mv_data = what->mv_data;
+	txn->cur_val.mv_size = 0;
+	txn->cur_val.mv_data = NULL;
+	// the two lowest bits select EXACT/LEQ/GEQ; KNOT_LMDB_FORCE is a separate flag bit
+	knot_lmdb_find_t cmp = (how & 3);
+	bool succ = curget(txn, cmp == KNOT_LMDB_EXACT ? MDB_SET : MDB_SET_RANGE);
+	if (cmp == KNOT_LMDB_LEQ && txn->ret == KNOT_EOK) {
+		// LEQ is not supported by LMDB, we use GEQ and go back
+		if (succ) {
+			// step back only if the key found is not the searched one itself
+			if (txn->cur_key.mv_size != what->mv_size ||
+			    memcmp(txn->cur_key.mv_data, what->mv_data, what->mv_size) != 0) {
+				succ = curget(txn, MDB_PREV);
+			}
+		} else {
+			// nothing greater or equal: the last record (if any) is the answer
+			succ = curget(txn, MDB_LAST);
+		}
+	}
+
+	// with KNOT_LMDB_FORCE, a miss becomes a transaction failure
+	if ((how & KNOT_LMDB_FORCE) && !succ && txn->ret == KNOT_EOK) {
+		txn->ret = KNOT_ENOENT;
+	}
+
+	return succ;
+}
+
+// this is not bulletproof thread-safe (in case of LMDB fail-teardown), but mostly OK
+// (txn is only read here: no cursor is used and txn->ret/cur_key/cur_val are not written)
+int knot_lmdb_find_threadsafe(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val, knot_lmdb_find_t how)
+{
+	assert(how == KNOT_LMDB_EXACT);
+	if (key->mv_data == NULL) {
+		return KNOT_ENOMEM;
+	}
+	if (!txn->opened) {
+		return KNOT_EINVAL;
+	}
+	if (txn->ret != KNOT_EOK) {
+		return txn->ret;
+	}
+	MDB_val tmp = { 0 };
+	int ret = mdb_get(txn->txn, txn->db->dbi, key, &tmp);
+	err_to_knot(&ret);
+	if (ret == KNOT_EOK) {
+		// hand out a private copy of the value
+		ret = mdb_val_clone(&tmp, val);
+	}
+	return ret;
+}
+
+bool knot_lmdb_first(knot_lmdb_txn_t *txn)
+{
+	return txn_semcheck(txn) && init_cursor(txn) && curget(txn, MDB_FIRST);
+}
+
+bool knot_lmdb_next(knot_lmdb_txn_t *txn)
+{
+	// iterating without a cursor (no preceding find/first) is a caller error
+	if (txn->cursor == NULL && txn->ret == KNOT_EOK) {
+		
txn->ret = KNOT_EINVAL;
+	}
+	if (!txn_semcheck(txn)) {
+		return false;
+	}
+	return curget(txn, MDB_NEXT);
+}
+
+bool knot_lmdb_is_prefix_of(const MDB_val *prefix, const MDB_val *of)
+{
+	return prefix->mv_size <= of->mv_size &&
+	       memcmp(prefix->mv_data, of->mv_data, prefix->mv_size) == 0;
+}
+
+void knot_lmdb_del_cur(knot_lmdb_txn_t *txn)
+{
+	// NOTE(review): unlike knot_lmdb_next(), txn->cursor is not checked for NULL here;
+	// the caller must have positioned the cursor (find/first/next) beforehand
+	if (txn_semcheck(txn)) {
+		txn->ret = mdb_cursor_del(txn->cursor, 0);
+		err_to_knot(&txn->ret);
+	}
+}
+
+void knot_lmdb_del_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix)
+{
+	knot_lmdb_foreach(txn, prefix) {
+		knot_lmdb_del_cur(txn);
+	}
+}
+
+int knot_lmdb_apply_threadsafe(knot_lmdb_txn_t *txn, const MDB_val *key, bool prefix, lmdb_apply_cb cb, void *ctx)
+{
+	// a private cursor and private key/value are used, so no field of txn is written
+	MDB_cursor *cursor;
+	int ret = mdb_cursor_open(txn->txn, txn->db->dbi, &cursor);
+	err_to_knot(&ret);
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+
+	MDB_val getkey = *key, getval = { 0 };
+	ret = mdb_cursor_get(cursor, &getkey, &getval, prefix ? MDB_SET_RANGE : MDB_SET);
+	err_to_knot(&ret);
+	if (ret != KNOT_EOK) {
+		mdb_cursor_close(cursor);
+		// in prefix mode, no key at or after the prefix simply means "nothing to apply on"
+		if (prefix && ret == KNOT_ENOENT) {
+			return KNOT_EOK;
+		}
+		return ret;
+	}
+
+	if (prefix) {
+		// the loop ends at the first key not matching the prefix, or on the first non-OK code
+		while (knot_lmdb_is_prefix_of(key, &getkey) && ret == KNOT_EOK) {
+			ret = cb(&getkey, &getval, ctx);
+			if (ret == KNOT_EOK) {
+				ret = mdb_cursor_get(cursor, &getkey, &getval, MDB_NEXT);
+				err_to_knot(&ret);
+			}
+		}
+		// running off the end of the DB is fine
+		// NOTE(review): KNOT_ENOENT returned by the callback is turned into KNOT_EOK here as well
+		if (ret == KNOT_ENOENT) {
+			ret = KNOT_EOK;
+		}
+	} else {
+		ret = cb(&getkey, &getval, ctx);
+	}
+	mdb_cursor_close(cursor);
+	return ret;
+}
+
+bool knot_lmdb_insert(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val)
+{
+	if (txn_semcheck(txn) && txn_enomem(txn, key)) {
+		// a non-zero size with NULL data selects MDB_RESERVE instead of a plain put
+		unsigned flags = (val->mv_size > 0 && val->mv_data == NULL ? 
MDB_RESERVE : 0);
+		txn->ret = mdb_put(txn->txn, txn->db->dbi, key, val, flags);
+		err_to_knot(&txn->ret);
+	}
+	return (txn->ret == KNOT_EOK);
+}
+
+int knot_lmdb_quick_insert(knot_lmdb_db_t *db, MDB_val key, MDB_val val)
+{
+	// a NULL value means the caller failed to allocate it; the key is still freed as promised
+	if (val.mv_data == NULL) {
+		free(key.mv_data);
+		return KNOT_ENOMEM;
+	}
+	knot_lmdb_txn_t txn = { 0 };
+	knot_lmdb_begin(db, &txn, true);
+	// a NULL key or a failed begin is caught inside knot_lmdb_insert()
+	knot_lmdb_insert(&txn, &key, &val);
+	free(key.mv_data);
+	free(val.mv_data);
+	knot_lmdb_commit(&txn);
+	return txn.ret;
+}
+
+int knot_lmdb_copy_prefix(knot_lmdb_txn_t *from, knot_lmdb_txn_t *to, MDB_val *prefix)
+{
+	// first purge the records matching the prefix from the target
+	knot_lmdb_foreach(to, prefix) {
+		knot_lmdb_del_cur(to);
+	}
+	if (to->ret != KNOT_EOK) {
+		return to->ret;
+	}
+	knot_lmdb_foreach(from, prefix) {
+		knot_lmdb_insert(to, &from->cur_key, &from->cur_val);
+	}
+	// an error of the source txn takes precedence over the target's one
+	return from->ret == KNOT_EOK ? to->ret : from->ret;
+}
+
+int knot_lmdb_copy_prefixes(knot_lmdb_db_t *from, knot_lmdb_db_t *to,
+                            MDB_val *prefixes, size_t n_prefixes)
+{
+	if (n_prefixes < 1) {
+		return KNOT_EOK;
+	}
+	if (from == NULL || to == NULL || prefixes == NULL) {
+		return KNOT_EINVAL;
+	}
+	int ret = knot_lmdb_open(from);
+	if (ret == KNOT_EOK) {
+		ret = knot_lmdb_open(to);
+	}
+	if (ret != KNOT_EOK) {
+		return ret;
+	}
+	knot_lmdb_txn_t tr = { 0 }, tw = { 0 };
+	knot_lmdb_begin(from, &tr, false);
+	knot_lmdb_begin(to, &tw, true);
+	// a failed begin shows up as an error of the first knot_lmdb_copy_prefix() call
+	for (size_t i = 0; i < n_prefixes && ret == KNOT_EOK; i++) {
+		ret = knot_lmdb_copy_prefix(&tr, &tw, &prefixes[i]);
+	}
+	if (ret == KNOT_EOK) {
+		knot_lmdb_commit(&tw);
+	} else {
+		// The failure may come from the source txn only, leaving tw.ret at KNOT_EOK;
+		// committing would persist a purged or half-copied target.
+		knot_lmdb_abort(&tw);
+	}
+	knot_lmdb_commit(&tr);
+	return ret == KNOT_EOK ? 
tw.ret : ret;
+}
+
+size_t knot_lmdb_usage(knot_lmdb_txn_t *txn)
+{
+	if (!txn_semcheck(txn)) {
+		return 0;
+	}
+	// 'st' stays zeroed unless mdb_stat() fills it in; the mdb_stat() result goes to txn->ret
+	MDB_stat st = { 0 };
+	txn->ret = mdb_stat(txn->txn, txn->db->dbi, &st);
+	err_to_knot(&txn->ret);
+
+	size_t pgs_used = st.ms_branch_pages + st.ms_leaf_pages + st.ms_overflow_pages;
+	return (pgs_used * st.ms_psize);
+}
+
+/*!
+ * \brief Serialize the arguments described by 'format' into a prepared buffer.
+ *
+ * \param key_data Output buffer.
+ * \param key_len Size of the output buffer.
+ * \param format One character per item: B, H, I, L, S, N or D.
+ * \param arg The values; 'D' takes two arguments (pointer and size_t length).
+ *
+ * \note Characters other than those listed are skipped without consuming an argument.
+ *
+ * \return True iff nothing failed and the buffer was filled exactly.
+ */
+static bool make_key_part(void *key_data, size_t key_len, const char *format, va_list arg)
+{
+	wire_ctx_t wire = wire_ctx_init(key_data, key_len);
+	const char *tmp_s;
+	const knot_dname_t *tmp_d;
+	const void *tmp_v;
+	size_t tmp;
+
+	for (const char *f = format; *f != '\0'; f++) {
+		switch (*f) {
+		case 'B':
+			// read as int: narrower integer arguments are promoted in variadic calls
+			wire_ctx_write_u8(&wire, va_arg(arg, int));
+			break;
+		case 'H':
+			wire_ctx_write_u16(&wire, va_arg(arg, int));
+			break;
+		case 'I':
+			wire_ctx_write_u32(&wire, va_arg(arg, uint32_t));
+			break;
+		case 'L':
+			wire_ctx_write_u64(&wire, va_arg(arg, uint64_t));
+			break;
+		case 'S':
+			// the string is stored including its terminating NUL
+			tmp_s = va_arg(arg, const char *);
+			wire_ctx_write(&wire, tmp_s, strlen(tmp_s) + 1);
+			break;
+		case 'N':
+			tmp_d = va_arg(arg, const knot_dname_t *);
+			wire_ctx_write(&wire, tmp_d, knot_dname_size(tmp_d));
+			break;
+		case 'D':
+			tmp_v = va_arg(arg, const void *);
+			tmp = va_arg(arg, size_t);
+			wire_ctx_write(&wire, tmp_v, tmp);
+			break;
+		}
+	}
+
+	return wire.error == KNOT_EOK && wire_ctx_available(&wire) == 0;
+}
+
+MDB_val knot_lmdb_make_key(const char *format, ...) 
+{
+	MDB_val key = { 0 };
+	va_list arg;
+	const char *tmp_s;
+	const knot_dname_t *tmp_d;
+
+	// first, just determine the size of the key
+	// (the arguments must be consumed exactly as make_key_part() consumes them)
+	va_start(arg, format);
+	for (const char *f = format; *f != '\0'; f++) {
+		switch (*f) {
+		case 'B':
+			key.mv_size += sizeof(uint8_t);
+			(void)va_arg(arg, int); // uint8_t will be promoted to int
+			break;
+		case 'H':
+			key.mv_size += sizeof(uint16_t);
+			(void)va_arg(arg, int); // uint16_t will be promoted to int
+			break;
+		case 'I':
+			key.mv_size += sizeof(uint32_t);
+			(void)va_arg(arg, uint32_t);
+			break;
+		case 'L':
+			key.mv_size += sizeof(uint64_t);
+			(void)va_arg(arg, uint64_t);
+			break;
+		case 'S':
+			tmp_s = va_arg(arg, const char *);
+			key.mv_size += strlen(tmp_s) + 1;
+			break;
+		case 'N':
+			tmp_d = va_arg(arg, const knot_dname_t *);
+			key.mv_size += knot_dname_size(tmp_d);
+			break;
+		case 'D':
+			(void)va_arg(arg, const void *);
+			key.mv_size += va_arg(arg, size_t);
+			break;
+		}
+	}
+	va_end(arg);
+
+	// second, alloc the key and fill it
+	// (an empty key is not allocated at all, so it is returned with mv_data == NULL, too)
+	if (key.mv_size > 0) {
+		key.mv_data = malloc(key.mv_size);
+	}
+	if (key.mv_data == NULL) {
+		return key;
+	}
+	va_start(arg, format);
+	bool succ = make_key_part(key.mv_data, key.mv_size, format, arg);
+	assert(succ);
+	(void)succ; // keeps 'succ' used when assert() is compiled out
+	va_end(arg);
+	return key;
+}
+
+bool knot_lmdb_make_key_part(void *key_data, size_t key_len, const char *format, ...) 
+{
+	va_list arg;
+	va_start(arg, format);
+	bool succ = make_key_part(key_data, key_len, format, arg);
+	va_end(arg);
+	return succ;
+}
+
+/*!
+ * \brief Deserialize the items described by 'format' from a buffer.
+ *
+ * \param key_data Input buffer; NULL yields false.
+ * \param key_len Size of the input buffer.
+ * \param format One character per item: B, H, I, L, S, N or D.
+ * \param arg One output pointer per item ('D' takes a pointer and a size_t length).
+ *            A NULL pointer means the item is skipped without being stored.
+ *
+ * \note Parsing stops as soon as the buffer is exhausted, even if 'format' has items
+ *       left; the outputs of those items are not written.
+ *
+ * \return True iff nothing failed and the whole buffer was consumed.
+ */
+static bool unmake_key_part(const void *key_data, size_t key_len, const char *format, va_list arg)
+{
+	if (key_data == NULL) {
+		return false;
+	}
+	wire_ctx_t wire = wire_ctx_init_const(key_data, key_len);
+	for (const char *f = format; *f != '\0' && wire.error == KNOT_EOK && wire_ctx_available(&wire) > 0; f++) {
+		void *tmp = va_arg(arg, void *);
+		size_t tmsize;
+		switch (*f) {
+		case 'B':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint8_t));
+			} else {
+				*(uint8_t *)tmp = wire_ctx_read_u8(&wire);
+			}
+			break;
+		case 'H':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint16_t));
+			} else {
+				*(uint16_t *)tmp = wire_ctx_read_u16(&wire);
+			}
+			break;
+		case 'I':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint32_t));
+			} else {
+				*(uint32_t *)tmp = wire_ctx_read_u32(&wire);
+			}
+			break;
+		case 'L':
+			if (tmp == NULL) {
+				wire_ctx_skip(&wire, sizeof(uint64_t));
+			} else {
+				*(uint64_t *)tmp = wire_ctx_read_u64(&wire);
+			}
+			break;
+		case 'S':
+			// the terminator must lie within the remaining data, otherwise
+			// strlen() below would read past the end of the buffer
+			if (memchr(wire.position, '\0', wire_ctx_available(&wire)) == NULL) {
+				return false;
+			}
+			if (tmp != NULL) {
+				*(const char **)tmp = (const char *)wire.position;
+			}
+			wire_ctx_skip(&wire, strlen((const char *)wire.position) + 1);
+			break;
+		case 'N':
+			// NOTE(review): knot_dname_size() is not bounded by the remaining data;
+			// a malformed name may still be read past the end of the buffer
+			if (tmp != NULL) {
+				*(const knot_dname_t **)tmp = (const knot_dname_t *)wire.position;
+			}
+			wire_ctx_skip(&wire, knot_dname_size((const knot_dname_t *)wire.position));
+			break;
+		case 'D':
+			tmsize = va_arg(arg, size_t);
+			// refuse before copying: the data must fit into what is left
+			if (tmsize > wire_ctx_available(&wire)) {
+				return false;
+			}
+			if (tmp != NULL) {
+				memcpy(tmp, wire.position, tmsize);
+			}
+			wire_ctx_skip(&wire, tmsize);
+			break;
+		}
+	}
+	return (wire.error == KNOT_EOK && wire_ctx_available(&wire) == 0);
+}
+
+bool knot_lmdb_unmake_key(const void *key_data, size_t key_len, const char *format, ...) 
+{
+	va_list arg;
+	va_start(arg, format);
+	bool succ = unmake_key_part(key_data, key_len, format, arg);
+	va_end(arg);
+	return succ;
+}
+
+bool knot_lmdb_unmake_curval(knot_lmdb_txn_t *txn, const char *format, ...)
+{
+	va_list arg;
+	va_start(arg, format);
+	bool succ = unmake_key_part(txn->cur_val.mv_data, txn->cur_val.mv_size, format, arg);
+	va_end(arg);
+	// an error already recorded in the txn is not overwritten
+	if (!succ && txn->ret == KNOT_EOK) {
+		txn->ret = KNOT_EMALF;
+	}
+	return succ;
+}
diff --git a/src/knot/journal/knot_lmdb.h b/src/knot/journal/knot_lmdb.h
new file mode 100644
index 0000000..6214a10
--- /dev/null
+++ b/src/knot/journal/knot_lmdb.h
@@ -0,0 +1,446 @@
+/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <lmdb.h>
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <pthread.h>
+
+typedef struct knot_lmdb_db {
+	MDB_dbi dbi; // handle of the (sub)database, valid while the DB is open
+	MDB_env *env; // LMDB environment, NULL while the DB is closed
+	pthread_mutex_t opening_mutex; // held while opening, closing or re-initialising the DB
+
+	// those are static options. Set them after knot_lmdb_init().
+	unsigned maxdbs;
+	unsigned maxreaders;
+
+	// those are internal options. Please don't touch them directly.
+ size_t mapsize; + unsigned env_flags; // MDB_NOTLS, MDB_RDONLY, MDB_WRITEMAP, MDB_DUPSORT, MDB_NOSYNC, MDB_MAPASYNC + const char *dbname; + char *path; +} knot_lmdb_db_t; + +typedef struct { + MDB_txn *txn; + MDB_cursor *cursor; + MDB_val cur_key; + MDB_val cur_val; + + bool opened; + bool is_rw; + int ret; + knot_lmdb_db_t *db; +} knot_lmdb_txn_t; + +typedef enum { + KNOT_LMDB_EXACT = 3, /*! \brief Search for exactly matching key. */ + KNOT_LMDB_LEQ = 1, /*! \brief Search lexicographically lower or equal key. */ + KNOT_LMDB_GEQ = 2, /*! \brief Search lexicographically greater or equal key. */ + KNOT_LMDB_FORCE = 4, /*! \brief If no matching key found, consider it a transaction failure (KNOT_ENOENT). */ +} knot_lmdb_find_t; + +/*! + * \brief Callback used in sweep functions. + * + * \retval true for zones to preserve. + * \retval false for zones to remove. + */ +typedef bool (*sweep_cb)(const uint8_t *zone, void *data); + +/*! + * \brief Callback used in copy functions. + * + * \retval true if the current record shall be copied + * \retval false if the current record shall be skipped + */ +typedef bool (*knot_lmdb_copy_cb)(MDB_val *cur_key, MDB_val *cur_val); + +/*! + * \brief Initialise the DB handling structure. + * + * \param db DB handling structure. + * \param path Path to LMDB database on filesystem. + * \param mapsize Maximum size of the DB on FS. + * \param env_flags LMDB environment flags (e.g. MDB_RDONLY) + * \param dbname Optional: name of the sub-database. + */ +void knot_lmdb_init(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags, const char *dbname); + +/*! + * \brief Check if the database exists on the filesystem. + * + * \param db The DB in question. + * + * \retval KNOT_EOK The database exists (and is accessible for stat() ). + * \retval KNOT_ENODB The database doesn't exist. + * \return KNOT_E* explaining why stat() failed. + */ +int knot_lmdb_exists(knot_lmdb_db_t *db); + +/*! 
+ * \brief Big enough mapsize for new database to hold a copy of to_copy. + */ +size_t knot_lmdb_copy_size(knot_lmdb_db_t *to_copy); + +/*! + * \brief Open the previously initialised DB. + * + * \param db The DB to be opened. + * + * \note If db->mapsize is zero, it will be set to twice the current size, and DB opened read-only! + * + * \return KNOT_E* + */ +int knot_lmdb_open(knot_lmdb_db_t *db); + +/*! + * \brief Close the database, but keep it initialised. + * + * \param db The DB to be closed. + */ +void knot_lmdb_close(knot_lmdb_db_t *db); + +/*! + * \brief Re-initialise existing DB with modified parameters. + * + * \note If the parameters differ and DB is open, it will be refused. + * + * \param db The DB to be modified. + * \param path New path to the DB. + * \param mapsize New mapsize. + * \param env_flags New LMDB environment flags. + * + * \return KNOT_EOK on success, KNOT_EISCONN if not possible. + */ +int knot_lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags); + +/*! + * \brief Re-open opened DB with modified parameters. + * + * \note The DB will be first closed, re-initialised and finally opened again. + * + * \note There must not be any DB transaction during this process. + * + * \param db The DB to be modified. + * \param path New path to the DB. + * \param mapsize New mapsize. + * \param env_flags New LMDB environment flags. + * + * \return KNOT_E* + */ +int knot_lmdb_reconfigure(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags); + +/*! + * \brief Close and de-initialise DB. + * + * \param db DB to be deinitialized. + */ +void knot_lmdb_deinit(knot_lmdb_db_t *db); + +/*! + * \brief Return true if DB is open. + */ +inline static bool knot_lmdb_is_open(knot_lmdb_db_t *db) { return db != NULL && db->env != NULL; } + +/*! + * \brief Start a DB transaction. + * + * \param db The database. + * \param txn Transaction handling structure to be initialised. 
+ * \param rw True for read-write transaction, false for read-only. + * + * \note The error code will be stored in txn->ret. + */ +void knot_lmdb_begin(knot_lmdb_db_t *db, knot_lmdb_txn_t *txn, bool rw); + +/*! + * \brief Abort a transaction. + * + * \param txn Transaction to be aborted. + */ +void knot_lmdb_abort(knot_lmdb_txn_t *txn); + +/*! + * \brief Commit a transaction, or abort it if it had failed. + * + * \param txn Transaction to be committed. + * + * \note If txn->ret equals KNOT_EOK afterwards, whole DB transaction was successful. + */ +void knot_lmdb_commit(knot_lmdb_txn_t *txn); + +/*! + * \brief Find a key in database. The matched key will be in txn->cur_key and its value in txn->cur_val. + * + * \param txn DB transaction. + * \param what Key to be searched for. + * \param how Method of comparing keys. See comments at knot_lmdb_find_t. + * + * \note It's possible to use knot_lmdb_next() subsequently to iterate over following keys. + * + * \return True if a key found, false if none or failure. + */ +bool knot_lmdb_find(knot_lmdb_txn_t *txn, MDB_val *what, knot_lmdb_find_t how); + +/*! + * \brief Simple database lookup in case the txn is shared among threads. + * + * \param txn DB transaction shared among threads. + * \param key Key to be searched for. + * \param val Output: database value. + * \param how Must be KNOT_LMDB_EXACT. + * + * \note Free val->mv_data afterwards! + * + * \retval KNOT_ENOENT no such key in DB. + * \return KNOT_E* + */ +int knot_lmdb_find_threadsafe(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val, knot_lmdb_find_t how); + +/*! + * \brief Start iterating over the whole DB from the lexicographically first key. + * + * \note The first DB record will be in txn->cur_key and txn->cur_val. + * + * \param txn DB transaction. + * + * \return True if ok, false if no key at all or failure. + */ +bool knot_lmdb_first(knot_lmdb_txn_t *txn); + +/*! + * \brief Iterate to the lexicographically next key (sets txn->cur_key and txn->cur_val). 
+ * + * \param txn DB transaction. + * + * \return True if ok, false if behind the end of DB or failure. + */ +bool knot_lmdb_next(knot_lmdb_txn_t *txn); + +/*! + * \brief Check if one DB key is a prefix of another, + * + * \param prefix DB key prefix. + * \param of Another DB key. + * + * \return True iff 'prefix' is a prefix of 'of'. + */ +bool knot_lmdb_is_prefix_of(const MDB_val *prefix, const MDB_val *of); + +/*! + * \brief Find leftmost key in DB matching given prefix. + * + * \param txn DB transaction. + * \param prefix Prefix searched for. + * + * \return True if found, false if none or failure. + */ +inline static bool knot_lmdb_find_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix) +{ + return knot_lmdb_find(txn, prefix, KNOT_LMDB_GEQ) && + knot_lmdb_is_prefix_of(prefix, &txn->cur_key); +} + +/*! + * \brief Execute following block of commands for every key in DB matching given prefix. + * + * \param txn DB transaction. + * \param prefix Prefix searched for. + */ +#define knot_lmdb_foreach(txn, prefix) \ + for (bool _knot_lmdb_foreach_found = knot_lmdb_find((txn), (prefix), KNOT_LMDB_GEQ); \ + _knot_lmdb_foreach_found && knot_lmdb_is_prefix_of((prefix), &(txn)->cur_key); \ + _knot_lmdb_foreach_found = knot_lmdb_next((txn))) + +/*! + * \brief Execute following block of commands for every key in DB. + * + * \param txn DB transaction. + */ +#define knot_lmdb_forwhole(txn) \ + for (bool _knot_lmdb_forwhole_any = knot_lmdb_first((txn)); \ + _knot_lmdb_forwhole_any; \ + _knot_lmdb_forwhole_any = knot_lmdb_next((txn))) + +/*! + * \brief Delete the one DB record, that the iteration is currently pointing to. + * + * \note It's safe to delete during an uncomplicated iteration, e.g. knot_lmdb_foreach(). + * + * \param txn DB transaction. + */ +void knot_lmdb_del_cur(knot_lmdb_txn_t *txn); + +/*! + * \brief Delete all DB records matching given key prefix. + * + * \param txn DB transaction. + * \param prefix Prefix to be deleted. 
+ */ +void knot_lmdb_del_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix); + +typedef int (*lmdb_apply_cb)(MDB_val *key, MDB_val *val, void *ctx); + +/*! + * \brief Call a callback for any item matching given key. + * + * \note This function does not affect fields within txn struct, + * thus can be used on txn shared between threads. + * + * \param txn DB transaction. + * \param key Key to be searched for. + * \param prefix The 'key' is in fact prefix, apply on all items matching prefix. + * \param cb Callback to be called. + * \param ctx Arbitrary context for the callback. + * + * \return KNOT_E* + */ +int knot_lmdb_apply_threadsafe(knot_lmdb_txn_t *txn, const MDB_val *key, bool prefix, lmdb_apply_cb cb, void *ctx); + +/*! + * \brief Insert a new record into the DB. + * + * \note If a record with equal key already exists in the DB, its value will be quietly overwritten. + * + * \param txn DB transaction. + * \param key Inserted key. + * \param val Inserted value. + * + * \return False if failure. + */ +bool knot_lmdb_insert(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val); + +/*! + * \brief Open a transaction, insert a record, commit and free key's and val's mv_data. + * + * \param db DB to be inserted into. + * \param key Inserted key. + * \param val Inserted val. + * + * \return KNOT_E* + */ +int knot_lmdb_quick_insert(knot_lmdb_db_t *db, MDB_val key, MDB_val val); + +/*! + * \brief Copy all records matching given key prefix. + * + * \param from Open RO/RW transaction in the database to copy from. + * \param to Open RW txn in the DB to copy to. + * \param prefix Prefix for matching records to be copied. + * + * \note Prior to copying, all records from the target DB, matching the prefix, will be deleted! + * + * \return KNOT_E* + * + * \note KNOT_EOK even if none records matched the prefix (and were copied). + */ +int knot_lmdb_copy_prefix(knot_lmdb_txn_t *from, knot_lmdb_txn_t *to, MDB_val *prefix); + +/*! 
+ * \brief Copy all records matching any of multiple prefixes. + * + * \param from DB to copy from. + * \param to DB to copy to. + * \param prefixes List of prefixes to match. + * \param n_prefixes Number of prefixes in the list. + * + * \note Prior to copying, all records from the target DB, matching any of the prefixes, will be deleted! + * + * \return KNOT_E* + */ +int knot_lmdb_copy_prefixes(knot_lmdb_db_t *from, knot_lmdb_db_t *to, + MDB_val *prefixes, size_t n_prefixes); + +/*! + * \brief Amount of bytes used by the DB storage. + * + * \note According to LMDB design, it will be a multiple of page size, which is usually 4096. + * + * \param txn DB transaction. + * + * \return DB usage. + */ +size_t knot_lmdb_usage(knot_lmdb_txn_t *txn); + +/*! + * \brief Serialize various parameters into a DB key. + * + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', one or two parameters with the actual values. + * + * \return DB key structure. 'mv_data' needs to be freed later. 'mv_data' is NULL on failure. + * + * Possible format characters are: + * - B for a byte + * - H for uint16 + * - I for uint32 + * - L for uint64, like H and I, the serialization converts them to big endian + * - S for zero-terminated string + * - N for a domain name (in knot_dname_t* format) + * - D for fixed-size data (takes two params: void* and size_t) + */ +MDB_val knot_lmdb_make_key(const char *format, ...); + +/*! + * \brief Serialize various parameters into prepared buffer. + * + * \param key_data Pointer to the buffer. + * \param key_len Size of the buffer. + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', one or two parameters with the actual values. + * + * \note See comment at knot_lmdb_make_key(). + * + * \return True if ok and the serialization took exactly 'key_len', false on failure. 
+ */ +bool knot_lmdb_make_key_part(void *key_data, size_t key_len, const char *format, ...); + +/*! + * \brief Deserialize various parameters from a buffer. + * + * \note 'format' must exactly correspond with what the data in buffer actually are. + * + * \param key_data Pointer to the buffer. + * \param key_len Size of the buffer. + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', pointer to where the values will be stored. + * + * \note For B, H, I, L; provide simply pointers to variables of corresponding type. + * \note For S, N; provide pointer to pointer - it will be set to pointing inside the buffer, so no allocation here. + * \note For D, provide void* and size_t, the data will be copied. + * + * \return True if no failure. + */ +bool knot_lmdb_unmake_key(const void *key_data, size_t key_len, const char *format, ...); + +/*! + * \brief Deserialize various parameters from txn->cur_val. Set txn->ret to KNOT_EMALF if failure. + * + * \param txn DB transaction. + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', pointer to where the values will be stored. + * + * \note See comment at knot_lmdb_unmake_key(). + * + * \return True if no failure. + */ +bool knot_lmdb_unmake_curval(knot_lmdb_txn_t *txn, const char *format, ...); diff --git a/src/knot/journal/serialization.c b/src/knot/journal/serialization.c new file mode 100644 index 0000000..5758481 --- /dev/null +++ b/src/knot/journal/serialization.c @@ -0,0 +1,501 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <assert.h>
+
+#include "knot/journal/serialization.h"
+#include "knot/zone/zone-tree.h"
+
+// special values of serialize_ctx.rrset_phase; every init function starts with SERIALIZE_RRSET_INIT
+#define SERIALIZE_RRSET_INIT (-1)
+#define SERIALIZE_RRSET_DONE ((1L<<16)+1)
+
+// value of serialize_ctx.changeset_phase: which part of the input is being walked
+typedef enum {
+	PHASE_ZONE_SOA,
+	PHASE_ZONE_NODES,
+	PHASE_ZONE_NSEC3,
+	PHASE_SOA_1,
+	PHASE_REM,
+	PHASE_SOA_2,
+	PHASE_ADD,
+	PHASE_END,
+} serialize_phase_t;
+
+#define RRSET_BUF_MAXSIZE 256
+
+struct serialize_ctx {
+	zone_diff_t zdiff; // the zone (diff) trees walked when serializing a zone or a zone diff
+	zone_tree_it_t zit; // iterator over zdiff.nodes or zdiff.nsec3s
+	zone_node_t *n;
+	uint16_t node_pos;
+	bool zone_diff; // set by serialize_zone_diff_init()
+	bool zone_diff_add; // set once the second pass over the re-reversed zone diff has started
+	int ret;
+
+	const changeset_t *ch; // set by serialize_init()
+	changeset_iter_t it;
+	serialize_phase_t changeset_phase;
+	long rrset_phase;
+	knot_rrset_t rrset_buf[RRSET_BUF_MAXSIZE];
+	size_t rrset_buf_size;
+	list_t free_rdatasets;
+};
+
+serialize_ctx_t *serialize_init(const changeset_t *ch)
+{
+	// calloc() zeroes every field that is not set explicitly below
+	serialize_ctx_t *ctx = calloc(1, sizeof(*ctx));
+	if (ctx == NULL) {
+		return NULL;
+	}
+
+	ctx->ch = ch;
+	// without soa_from the context starts directly at PHASE_SOA_2
+	ctx->changeset_phase = ch->soa_from != NULL ? 
PHASE_SOA_1 : PHASE_SOA_2; + ctx->rrset_phase = SERIALIZE_RRSET_INIT; + ctx->rrset_buf_size = 0; + init_list(&ctx->free_rdatasets); + + return ctx; +} + +serialize_ctx_t *serialize_zone_init(const zone_contents_t *z) +{ + serialize_ctx_t *ctx = calloc(1, sizeof(*ctx)); + if (ctx == NULL) { + return NULL; + } + + zone_diff_from_zone(&ctx->zdiff, z); + ctx->changeset_phase = PHASE_ZONE_SOA; + ctx->rrset_phase = SERIALIZE_RRSET_INIT; + ctx->rrset_buf_size = 0; + init_list(&ctx->free_rdatasets); + + return ctx; +} + +serialize_ctx_t *serialize_zone_diff_init(const zone_diff_t *z) +{ + serialize_ctx_t *ctx = calloc(1, sizeof(*ctx)); + if (ctx == NULL) { + return NULL; + } + + ctx->zone_diff = true; + ctx->zdiff = *z; + zone_diff_reverse(&ctx->zdiff); // start with removals of counterparts + + ctx->changeset_phase = PHASE_ZONE_SOA; + ctx->rrset_phase = SERIALIZE_RRSET_INIT; + ctx->rrset_buf_size = 0; + init_list(&ctx->free_rdatasets); + + return ctx; +} + +static knot_rrset_t get_next_rrset(serialize_ctx_t *ctx) +{ + knot_rrset_t res; + knot_rrset_init_empty(&res); + switch (ctx->changeset_phase) { + case PHASE_ZONE_SOA: + zone_tree_it_begin(&ctx->zdiff.nodes, &ctx->zit); + ctx->changeset_phase = PHASE_ZONE_NODES; + return node_rrset(ctx->zdiff.apex, KNOT_RRTYPE_SOA); + case PHASE_ZONE_NODES: + case PHASE_ZONE_NSEC3: + while (ctx->n == NULL || ctx->node_pos >= ctx->n->rrset_count) { + if (zone_tree_it_finished(&ctx->zit)) { + zone_tree_it_free(&ctx->zit); + if (ctx->changeset_phase == PHASE_ZONE_NSEC3 || + zone_tree_is_empty(&ctx->zdiff.nsec3s)) { + if (ctx->zone_diff && !ctx->zone_diff_add) { + ctx->zone_diff_add = true; + zone_diff_reverse(&ctx->zdiff); + zone_tree_it_begin(&ctx->zdiff.nodes, &ctx->zit); + ctx->changeset_phase = PHASE_ZONE_NODES; + return node_rrset(ctx->zdiff.apex, KNOT_RRTYPE_SOA); + } else { + ctx->changeset_phase = PHASE_END; + return res; + } + } else { + zone_tree_it_begin(&ctx->zdiff.nsec3s, &ctx->zit); + ctx->changeset_phase = PHASE_ZONE_NSEC3; 
+ } + } + ctx->n = zone_tree_it_val(&ctx->zit); + zone_tree_it_next(&ctx->zit); + ctx->node_pos = 0; + } + res = node_rrset_at(ctx->n, ctx->node_pos++); + if (ctx->n == ctx->zdiff.apex && res.type == KNOT_RRTYPE_SOA) { + return get_next_rrset(ctx); + } + if (ctx->zone_diff) { + knot_rrset_t counter_rr = node_rrset(binode_counterpart(ctx->n), res.type); + if (counter_rr.ttl == res.ttl && !knot_rrset_empty(&counter_rr)) { + if (knot_rdataset_subset(&res.rrs, &counter_rr.rrs)) { + return get_next_rrset(ctx); + } + knot_rdataset_t rd_copy; + ctx->ret = knot_rdataset_copy(&rd_copy, &res.rrs, NULL); + if (ctx->ret == KNOT_EOK) { + knot_rdataset_subtract(&rd_copy, &counter_rr.rrs, NULL); + ptrlist_add(&ctx->free_rdatasets, rd_copy.rdata, NULL); + res.rrs = rd_copy; + assert(!knot_rrset_empty(&res)); + } else { + ctx->changeset_phase = PHASE_END; + } + } + } + return res; + case PHASE_SOA_1: + changeset_iter_rem(&ctx->it, ctx->ch); + ctx->changeset_phase = PHASE_REM; + return *ctx->ch->soa_from; + case PHASE_REM: + res = changeset_iter_next(&ctx->it); + if (knot_rrset_empty(&res)) { + changeset_iter_clear(&ctx->it); + changeset_iter_add(&ctx->it, ctx->ch); + ctx->changeset_phase = PHASE_ADD; + return *ctx->ch->soa_to; + } + return res; + case PHASE_SOA_2: + if (ctx->it.node != NULL) { + changeset_iter_clear(&ctx->it); + } + changeset_iter_add(&ctx->it, ctx->ch); + ctx->changeset_phase = PHASE_ADD; + return *ctx->ch->soa_to; + case PHASE_ADD: + res = changeset_iter_next(&ctx->it); + if (knot_rrset_empty(&res)) { + changeset_iter_clear(&ctx->it); + ctx->changeset_phase = PHASE_END; + } + return res; + default: + return res; + } +} + +void serialize_prepare(serialize_ctx_t *ctx, size_t thresh_size, + size_t max_size, size_t *realsize) +{ + *realsize = 0; + + // check if we are in middle of a rrset + if (ctx->rrset_buf_size > 0) { + ctx->rrset_buf[0] = ctx->rrset_buf[ctx->rrset_buf_size - 1]; + ctx->rrset_buf_size = 1; + + // memory optimization: free all buffered rrsets 
except last one + ptrnode_t *n, *next; + WALK_LIST_DELSAFE(n, next, ctx->free_rdatasets) { + if (n != TAIL(ctx->free_rdatasets)) { + free(n->d); + rem_node(&n->n); + free(n); + } + } + } else { + ctx->rrset_buf[0] = get_next_rrset(ctx); + if (ctx->changeset_phase == PHASE_END) { + ctx->rrset_buf_size = 0; + return; + } + ctx->rrset_buf_size = 1; + } + + size_t candidate = 0; + long tmp_phase = ctx->rrset_phase; + while (1) { + if (tmp_phase >= ctx->rrset_buf[ctx->rrset_buf_size - 1].rrs.count) { + if (ctx->rrset_buf_size >= RRSET_BUF_MAXSIZE) { + return; + } + ctx->rrset_buf[ctx->rrset_buf_size++] = get_next_rrset(ctx); + if (ctx->changeset_phase == PHASE_END) { + ctx->rrset_buf_size--; + return; + } + tmp_phase = SERIALIZE_RRSET_INIT; + } + if (tmp_phase == SERIALIZE_RRSET_INIT) { + candidate += 3 * sizeof(uint16_t) + + knot_dname_size(ctx->rrset_buf[ctx->rrset_buf_size - 1].owner); + } else { + candidate += sizeof(uint32_t) + sizeof(uint16_t) + + knot_rdataset_at(&ctx->rrset_buf[ctx->rrset_buf_size - 1].rrs, tmp_phase)->len; + } + if (candidate > max_size) { + return; + } + *realsize = candidate; + if (candidate >= thresh_size) { + return; + } + tmp_phase++; + } +} + +void serialize_chunk(serialize_ctx_t *ctx, uint8_t *dst_chunk, size_t chunk_size) +{ + wire_ctx_t wire = wire_ctx_init(dst_chunk, chunk_size); + + for (size_t i = 0; ; ) { + if (ctx->rrset_phase >= ctx->rrset_buf[i].rrs.count) { + if (++i >= ctx->rrset_buf_size) { + break; + } + ctx->rrset_phase = SERIALIZE_RRSET_INIT; + } + if (ctx->rrset_phase == SERIALIZE_RRSET_INIT) { + int size = knot_dname_to_wire(wire.position, ctx->rrset_buf[i].owner, + wire_ctx_available(&wire)); + if (size < 0 || wire_ctx_available(&wire) < size + 3 * sizeof(uint16_t)) { + break; + } + wire_ctx_skip(&wire, size); + wire_ctx_write_u16(&wire, ctx->rrset_buf[i].type); + wire_ctx_write_u16(&wire, ctx->rrset_buf[i].rclass); + wire_ctx_write_u16(&wire, ctx->rrset_buf[i].rrs.count); + } else { + const knot_rdata_t *rr = 
knot_rdataset_at(&ctx->rrset_buf[i].rrs, + ctx->rrset_phase); + assert(rr); + uint16_t rdlen = rr->len; + if (wire_ctx_available(&wire) < sizeof(uint32_t) + sizeof(uint16_t) + rdlen) { + break; + } + // Compatibility, but one TTL per rrset would be enough. + wire_ctx_write_u32(&wire, ctx->rrset_buf[i].ttl); + wire_ctx_write_u16(&wire, rdlen); + wire_ctx_write(&wire, rr->data, rdlen); + } + ctx->rrset_phase++; + } + assert(wire.error == KNOT_EOK); +} + +bool serialize_unfinished(serialize_ctx_t *ctx) +{ + return ctx->changeset_phase < PHASE_END; +} + +int serialize_deinit(serialize_ctx_t *ctx) +{ + if (ctx->it.node != NULL) { + changeset_iter_clear(&ctx->it); + } + if (ctx->zit.tree != NULL) { + zone_tree_it_free(&ctx->zit); + } + ptrnode_t *n, *next; + WALK_LIST_DELSAFE(n, next, ctx->free_rdatasets) { + free(n->d); + rem_node(&n->n); + free(n); + } + int ret = ctx->ret; + free(ctx); + return ret; +} + +static uint64_t rrset_binary_size(const knot_rrset_t *rrset) +{ + if (rrset == NULL || rrset->rrs.count == 0) { + return 0; + } + + // Owner size + type + class + RR count. + uint64_t size = knot_dname_size(rrset->owner) + 3 * sizeof(uint16_t); + + // RRs. + knot_rdata_t *rr = rrset->rrs.rdata; + for (uint16_t i = 0; i < rrset->rrs.count; i++) { + // TTL + RR size + RR. + size += sizeof(uint32_t) + sizeof(uint16_t) + rr->len; + rr = knot_rdataset_next(rr); + } + + return size; +} + +static size_t node_diff_size(zone_node_t *node) +{ + size_t res = 0; + knot_rrset_t rr, counter_rr; + for (int i = 0; i < node->rrset_count; i++) { + rr = node_rrset_at(node, i); + counter_rr = node_rrset(binode_counterpart(node), rr.type); + if (!knot_rrset_equal(&rr, &counter_rr, true)) { + res += rrset_binary_size(&rr); + } + } + return res; +} + +size_t zone_diff_serialized_size(zone_diff_t diff) +{ + size_t res = 0; + for (int i = 0; i < 2; i++) { + zone_diff_reverse(&diff); + zone_tree_it_t it = { 0 }; + int ret = zone_tree_it_double_begin(&diff.nodes, diff.nsec3s.trie != NULL ? 
+ &diff.nsec3s : NULL, &it); + if (ret != KNOT_EOK) { + return 0; + } + while (!zone_tree_it_finished(&it)) { + res += node_diff_size(zone_tree_it_val(&it)); + zone_tree_it_next(&it); + } + zone_tree_it_free(&it); + } + return res; +} + +size_t changeset_serialized_size(const changeset_t *ch) +{ + if (ch == NULL) { + return 0; + } + + size_t soa_from_size = rrset_binary_size(ch->soa_from); + size_t soa_to_size = rrset_binary_size(ch->soa_to); + + changeset_iter_t it; + if (ch->remove == NULL) { + changeset_iter_add(&it, ch); + } else { + changeset_iter_all(&it, ch); + } + + size_t change_size = 0; + knot_rrset_t rrset = changeset_iter_next(&it); + while (!knot_rrset_empty(&rrset)) { + change_size += rrset_binary_size(&rrset); + rrset = changeset_iter_next(&it); + } + + changeset_iter_clear(&it); + + return soa_from_size + soa_to_size + change_size; +} + +int serialize_rrset(wire_ctx_t *wire, const knot_rrset_t *rrset) +{ + assert(wire != NULL && rrset != NULL); + + // write owner, type, class, rrcnt + int size = knot_dname_to_wire(wire->position, rrset->owner, + wire_ctx_available(wire)); + if (size < 0 || wire_ctx_available(wire) < size + 3 * sizeof(uint16_t)) { + assert(0); + } + wire_ctx_skip(wire, size); + wire_ctx_write_u16(wire, rrset->type); + wire_ctx_write_u16(wire, rrset->rclass); + wire_ctx_write_u16(wire, rrset->rrs.count); + + for (size_t phase = 0; phase < rrset->rrs.count; phase++) { + const knot_rdata_t *rr = knot_rdataset_at(&rrset->rrs, phase); + assert(rr); + uint16_t rdlen = rr->len; + if (wire_ctx_available(wire) < sizeof(uint32_t) + sizeof(uint16_t) + rdlen) { + assert(0); + } + wire_ctx_write_u32(wire, rrset->ttl); + wire_ctx_write_u16(wire, rdlen); + wire_ctx_write(wire, rr->data, rdlen); + assert(wire->error == KNOT_EOK); + } + + return KNOT_EOK; +} + +int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset) +{ + assert(wire != NULL && rrset != NULL); + + // Read owner, rtype, rclass and RR count. 
+ int size = knot_dname_size(wire->position); + if (size < 0) { + assert(0); + } + knot_dname_t *owner = knot_dname_copy(wire->position, NULL); + if (owner == NULL || wire_ctx_available(wire) < size + 3 * sizeof(uint16_t)) { + knot_dname_free(owner, NULL); + return KNOT_EMALF; + } + wire_ctx_skip(wire, size); + uint16_t type = wire_ctx_read_u16(wire); + uint16_t rclass = wire_ctx_read_u16(wire); + uint16_t rrcount = wire_ctx_read_u16(wire); + if (wire->error != KNOT_EOK) { + knot_dname_free(owner, NULL); + return wire->error; + } + if (rrset->owner != NULL) { + if (knot_dname_cmp(owner, rrset->owner) != 0) { + knot_dname_free(owner, NULL); + return KNOT_ESEMCHECK; + } + knot_rrset_clear(rrset, NULL); + } + knot_rrset_init(rrset, owner, type, rclass, 0); + + for (size_t phase = 0; phase < rrcount && wire_ctx_available(wire) > 0; phase++) { + uint32_t ttl = wire_ctx_read_u32(wire); + uint32_t rdata_size = wire_ctx_read_u16(wire); + if (phase == 0) { + rrset->ttl = ttl; + } + if (wire->error != KNOT_EOK || + wire_ctx_available(wire) < rdata_size || + knot_rrset_add_rdata(rrset, wire->position, rdata_size, + NULL) != KNOT_EOK) { + knot_rrset_clear(rrset, NULL); + return KNOT_EMALF; + } + wire_ctx_skip(wire, rdata_size); + assert(wire->error == KNOT_EOK); + } + + return KNOT_EOK; +} + +size_t rrset_serialized_size(const knot_rrset_t *rrset) +{ + if (rrset == NULL) { + return 0; + } + + // Owner size + type + class + RR count. + size_t size = knot_dname_size(rrset->owner) + 3 * sizeof(uint16_t); + + for (uint16_t i = 0; i < rrset->rrs.count; i++) { + const knot_rdata_t *rr = knot_rdataset_at(&rrset->rrs, i); + assert(rr); + // TTL + RR size + RR. + size += sizeof(uint32_t) + sizeof(uint16_t) + rr->len; + } + + return size; +} diff --git a/src/knot/journal/serialization.h b/src/knot/journal/serialization.h new file mode 100644 index 0000000..621dcdb --- /dev/null +++ b/src/knot/journal/serialization.h @@ -0,0 +1,169 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. 
<knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <string.h>
+
+#include "libknot/rrset.h"
+#include "libknot/rrtype/soa.h"
+#include "knot/updates/changesets.h"
+#include "contrib/wire_ctx.h"
+
+/*!
+ * \brief Zone (or difference between two versions of a zone) to be serialized.
+ *
+ * The trees are stored by value, i.e. as shallow copies of the tree headers.
+ */
+typedef struct zone_diff {
+	zone_tree_t nodes;  /*!< Tree of regular nodes. */
+	zone_tree_t nsec3s; /*!< Tree of NSEC3 nodes; zeroed if the zone has none. */
+	zone_node_t *apex;  /*!< Zone apex node. */
+} zone_diff_t;
+
+/*!
+ * \brief Switch the diff to the other side of its bi-nodes.
+ *
+ * Toggles ZONE_TREE_BINO_SECOND on both trees and replaces the apex with its
+ * binode counterpart. Calling it twice restores the original state.
+ */
+inline static void zone_diff_reverse(zone_diff_t *diff)
+{
+	diff->nodes.flags ^= ZONE_TREE_BINO_SECOND;
+	diff->nsec3s.flags ^= ZONE_TREE_BINO_SECOND;
+	diff->apex = binode_counterpart(diff->apex);
+}
+
+/*!
+ * \brief Fill the diff structure from zone contents.
+ *
+ * \param diff  Output: structure to be filled.
+ * \param z     Zone contents; z->nodes must not be NULL, z->nsec3_nodes may be.
+ */
+inline static void zone_diff_from_zone(zone_diff_t *diff, const zone_contents_t *z)
+{
+	diff->nodes = *z->nodes;
+	if (z->nsec3_nodes != NULL) {
+		diff->nsec3s = *z->nsec3_nodes;
+	} else {
+		memset(&diff->nsec3s, 0, sizeof(diff->nsec3s));
+	}
+	diff->apex = z->apex;
+}
+
+/*!
+ * \brief SOA serial found at diff->apex.
+ *
+ * \note The rdataset returned by node_rdataset() is dereferenced unchecked,
+ *       so the apex is expected to hold a SOA RRset.
+ */
+inline static uint32_t zone_diff_to(const zone_diff_t *diff)
+{
+	return knot_soa_serial(node_rdataset(diff->apex, KNOT_RRTYPE_SOA)->rdata);
+}
+
+/*!
+ * \brief SOA serial found at the binode counterpart of diff->apex.
+ *
+ * \note The rdataset returned by node_rdataset() is dereferenced unchecked,
+ *       so the counterpart of the apex is expected to hold a SOA RRset.
+ */
+inline static uint32_t zone_diff_from(const zone_diff_t *diff)
+{
+	return knot_soa_serial(node_rdataset(binode_counterpart(diff->apex), KNOT_RRTYPE_SOA)->rdata);
+}
+
+typedef struct serialize_ctx serialize_ctx_t;
+
+/*!
+ * \brief Init serialization context.
+ *
+ * \param ch Changeset to be serialized.
+ *
+ * \return Context, or NULL if it could not be allocated.
+ */
+serialize_ctx_t *serialize_init(const changeset_t *ch);
+
+/*! 
+ * \brief Init serialization context. + * + * \param z Zone to be serialized like zone-in-journal changeset. + * + * \return Context. + */ +serialize_ctx_t *serialize_zone_init(const zone_contents_t *z); + +/*! + * \brief Init serialization context. + * + * \param z Zone with binodes being updated. + * + * \return Context. + */ +serialize_ctx_t *serialize_zone_diff_init(const zone_diff_t *z); + +/*! + * \brief Pre-check and space computation before serializing a chunk. + * + * \note This MUST be called before each serialize_chunk()! + * + * \param ctx Serializing context. + * \param thresh_size Optimal size of next chunk. + * \param max_size Maximum size of next chunk. + * \param realsize Output: real exact size of next chunk. + */ +void serialize_prepare(serialize_ctx_t *ctx, size_t thresh_size, + size_t max_size, size_t *realsize); + +/*! + * \brief Perform one step of serialization: fill one chunk. + * + * \param ctx Serializing context. + * \param chunk Pointer to allocated memory to be serialized into. + * \param chunk_size Its size. It MUST be the same as returned from serialize_prepare(). + */ +void serialize_chunk(serialize_ctx_t *ctx, uint8_t *chunk, size_t chunk_size); + +/*! \brief Tells whether some part of the changeset + * still remains to be serialized into further chunk(s). */ +bool serialize_unfinished(serialize_ctx_t *ctx); + +/*! + * \brief Free serialization context. + * + * \return KNOT_E* if there were errors during serialization. + */ +int serialize_deinit(serialize_ctx_t *ctx); + +/*! + * \brief Returns size of serialized changeset from zone diff. + * + * \warning Not accurate! This is an upper bound, suitable for policy enforcement etc. + * + * \param[in] diff Zone diff structure to create changeset from. + * + * \return Size of the resulting changeset. + */ +size_t zone_diff_serialized_size(zone_diff_t diff); + +/*! + * \brief Returns size of changeset in serialized form. + * + * \param[in] ch Changeset whose size we want to compute. 
+ * + * \return Size of the changeset. + */ +size_t changeset_serialized_size(const changeset_t *ch); + +/*! + * \brief Simply serialize RRset w/o any chunking. + * + * \param wire + * \param rrset + * + * \return KNOT_E* + */ +int serialize_rrset(wire_ctx_t *wire, const knot_rrset_t *rrset); + +/*! + * \brief Simply deserialize RRset w/o any chunking. + * + * \param wire + * \param rrset + * + * \return KNOT_E* + */ +int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset); + +/*! + * \brief Space needed to serialize RRset. + * + * \param rrset RRset. + * + * \return RRset binary size. + */ +size_t rrset_serialized_size(const knot_rrset_t *rrset); |