summaryrefslogtreecommitdiffstats
path: root/src/knot/journal/journal_read.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/knot/journal/journal_read.c')
-rw-r--r--src/knot/journal/journal_read.c436
1 files changed, 436 insertions, 0 deletions
diff --git a/src/knot/journal/journal_read.c b/src/knot/journal/journal_read.c
new file mode 100644
index 0000000..6c4fc32
--- /dev/null
+++ b/src/knot/journal/journal_read.c
@@ -0,0 +1,436 @@
+/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "knot/journal/journal_read.h"
+
+#include "knot/journal/journal_metadata.h"
+#include "knot/journal/knot_lmdb.h"
+
+#include "contrib/macros.h"
+#include "contrib/ucw/lists.h"
+#include "contrib/wire_ctx.h"
+#include "libknot/error.h"
+
+#include <stdlib.h>
+
+struct journal_read {
+ knot_lmdb_txn_t txn;
+ MDB_val key_prefix;
+ const knot_dname_t *zone;
+ wire_ctx_t wire;
+ uint32_t next;
+};
+
+int journal_read_get_error(const journal_read_t *ctx, int another_error)
+{
+ return (ctx == NULL || ctx->txn.ret == KNOT_EOK ? another_error : ctx->txn.ret);
+}
+
+static void update_ctx_wire(journal_read_t *ctx)
+{
+ ctx->wire = wire_ctx_init_const(ctx->txn.cur_val.mv_data, ctx->txn.cur_val.mv_size);
+ wire_ctx_skip(&ctx->wire, JOURNAL_HEADER_SIZE);
+}
+
+static bool go_next_changeset(journal_read_t *ctx, bool go_zone, const knot_dname_t *zone)
+{
+ free(ctx->key_prefix.mv_data);
+ ctx->key_prefix = journal_changeset_id_to_key(go_zone, ctx->next, zone);
+ if (!knot_lmdb_find_prefix(&ctx->txn, &ctx->key_prefix)) {
+ return false;
+ }
+ if (!go_zone && ctx->next == journal_next_serial(&ctx->txn.cur_val)) {
+ ctx->txn.ret = KNOT_ELOOP;
+ return false;
+ }
+ ctx->next = journal_next_serial(&ctx->txn.cur_val);
+ update_ctx_wire(ctx);
+ return true;
+}
+
+int journal_read_begin(zone_journal_t j, bool read_zone, uint32_t serial_from, journal_read_t **ctx)
+{
+ *ctx = NULL;
+ if (!journal_is_existing(j)) { // this also opens the LMDB if not already
+ return KNOT_ENOENT;
+ }
+
+ journal_read_t *newctx = calloc(1, sizeof(*newctx));
+ if (newctx == NULL) {
+ return KNOT_ENOMEM;
+ }
+
+ newctx->zone = j.zone;
+ newctx->next = serial_from;
+
+ knot_lmdb_begin(j.db, &newctx->txn, false);
+
+ if (go_next_changeset(newctx, read_zone, j.zone)) {
+ *ctx = newctx;
+ return KNOT_EOK;
+ } else {
+ journal_read_end(newctx);
+ return KNOT_ENOENT;
+ }
+}
+
+void journal_read_end(journal_read_t *ctx)
+{
+ if (ctx != NULL) {
+ free(ctx->key_prefix.mv_data);
+ knot_lmdb_abort(&ctx->txn);
+ free(ctx);
+ }
+}
+
+static bool make_data_available(journal_read_t *ctx)
+{
+ if (wire_ctx_available(&ctx->wire) == 0) {
+ if (!knot_lmdb_next(&ctx->txn)) {
+ return false;
+ }
+ if (!knot_lmdb_is_prefix_of(&ctx->key_prefix, &ctx->txn.cur_key)) {
+ return false;
+ }
+ if (ctx->next != journal_next_serial(&ctx->txn.cur_val)) {
+ // consistency check, see also MR !1270
+ ctx->txn.ret = KNOT_EMALF;
+ return false;
+ }
+ update_ctx_wire(ctx);
+ }
+ return true;
+}
+
+// thoughts for next design of journal serialization:
+// - one TTL per rrset
+// - endian
+// - optionally storing whole rdataset at once?
+
+bool journal_read_rrset(journal_read_t *ctx, knot_rrset_t *rrset, bool allow_next_changeset)
+{
+ //knot_rdataset_clear(&rrset->rrs, NULL);
+ //memset(rrset, 0, sizeof(*rrset));
+ if (!make_data_available(ctx)) {
+ if (!allow_next_changeset || !go_next_changeset(ctx, false, ctx->zone)) {
+ return false;
+ }
+ }
+ rrset->owner = knot_dname_copy(ctx->wire.position, NULL);
+ wire_ctx_skip(&ctx->wire, knot_dname_size(rrset->owner));
+ rrset->type = wire_ctx_read_u16(&ctx->wire);
+ rrset->rclass = wire_ctx_read_u16(&ctx->wire);
+ uint16_t rrs_count = wire_ctx_read_u16(&ctx->wire);
+ for (int i = 0; i < rrs_count && ctx->wire.error == KNOT_EOK; i++) {
+ if (!make_data_available(ctx)) {
+ ctx->wire.error = KNOT_EFEWDATA;
+ }
+ // TODO think of how to export serialized rr directly to knot_rdataset_add
+ // focus on: even address aligning
+ uint32_t ttl = wire_ctx_read_u32(&ctx->wire);
+ if (i == 0) {
+ rrset->ttl = ttl;
+ }
+ uint16_t len = wire_ctx_read_u16(&ctx->wire);
+ if (ctx->wire.error == KNOT_EOK) {
+ ctx->wire.error = knot_rrset_add_rdata(rrset, ctx->wire.position, len, NULL);
+ }
+ wire_ctx_skip(&ctx->wire, len);
+ }
+ if (ctx->txn.ret == KNOT_EOK) {
+ ctx->txn.ret = ctx->wire.error == KNOT_ERANGE ? KNOT_EMALF : ctx->wire.error;
+ }
+ if (ctx->txn.ret == KNOT_EOK) {
+ return true;
+ } else {
+ journal_read_clear_rrset(rrset);
+ return false;
+ }
+}
+
+void journal_read_clear_rrset(knot_rrset_t *rr)
+{
+ knot_rrset_clear(rr, NULL);
+}
+
+int journal_read_rrsets(journal_read_t *read, journal_read_cb_t cb, void *ctx)
+{
+ knot_rrset_t rr = { 0 };
+ bool in_remove_section = false;
+ int ret = KNOT_EOK;
+ while (ret == KNOT_EOK && journal_read_rrset(read, &rr, true)) {
+ if (rr_is_apex_soa(&rr, read->zone)) {
+ in_remove_section = !in_remove_section;
+ }
+ ret = cb(in_remove_section, &rr, ctx);
+ journal_read_clear_rrset(&rr);
+ }
+ ret = journal_read_get_error(read, ret);
+ journal_read_end(read);
+ return ret;
+}
+
+static int add_rr_to_contents(zone_contents_t *z, const knot_rrset_t *rrset)
+{
+ zone_node_t *n = NULL;
+ return zone_contents_add_rr(z, rrset, &n);
+ // Shall we ignore ETTL ?
+}
+
+bool journal_read_changeset(journal_read_t *ctx, changeset_t *ch)
+{
+ zone_contents_t *tree = zone_contents_new(ctx->zone, false);
+ knot_rrset_t *soa = calloc(1, sizeof(*soa)), rr = { 0 };
+ if (tree == NULL || soa == NULL) {
+ ctx->txn.ret = KNOT_ENOMEM;
+ goto fail;
+ }
+ memset(ch, 0, sizeof(*ch));
+
+ if (!journal_read_rrset(ctx, soa, true)) {
+ goto fail;
+ }
+ while (journal_read_rrset(ctx, &rr, false)) {
+ if (rr_is_apex_soa(&rr, ctx->zone)) {
+ if (ch->soa_from != NULL) {
+ ctx->txn.ret = KNOT_EMALF;
+ goto fail;
+ }
+ ch->soa_from = soa;
+ ch->remove = tree;
+ soa = malloc(sizeof(*soa));
+ tree = zone_contents_new(ctx->zone, false);
+ if (tree == NULL || soa == NULL) {
+ ctx->txn.ret = KNOT_ENOMEM;
+ goto fail;
+ }
+ *soa = rr; // note this tricky assignment
+ memset(&rr, 0, sizeof(rr));
+ } else {
+ ctx->txn.ret = add_rr_to_contents(tree, &rr);
+ journal_read_clear_rrset(&rr);
+ }
+ }
+
+ if (ctx->txn.ret == KNOT_EOK) {
+ ch->soa_to = soa;
+ ch->add = tree;
+ return true;
+ } else {
+fail:
+ journal_read_clear_rrset(&rr);
+ journal_read_clear_rrset(soa);
+ free(soa);
+ changeset_clear(ch);
+ zone_contents_deep_free(tree);
+ return false;
+ }
+}
+
+void journal_read_clear_changeset(changeset_t *ch)
+{
+ changeset_clear(ch);
+ memset(ch, 0, sizeof(*ch));
+}
+
+static int just_load_md(zone_journal_t j, journal_metadata_t *md, bool *has_zij)
+{
+ knot_lmdb_txn_t txn = { 0 };
+ knot_lmdb_begin(j.db, &txn, false);
+ journal_load_metadata(&txn, j.zone, md);
+ if (has_zij != NULL) {
+ *has_zij = journal_contains(&txn, true, 0, j.zone);
+ }
+ knot_lmdb_abort(&txn);
+ return txn.ret;
+}
+
+int journal_walk_from(zone_journal_t j, uint32_t from,
+ journal_walk_cb_t cb, void *ctx)
+{
+ bool at_least_one = false;
+ journal_metadata_t md = { 0 };
+ journal_read_t *read = NULL;
+ changeset_t ch;
+
+ int ret = just_load_md(j, &md, NULL);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ if ((md.flags & JOURNAL_SERIAL_TO_VALID) && from != md.serial_to &&
+ ret == KNOT_EOK) {
+ ret = journal_read_begin(j, false, from, &read);
+ while (ret == KNOT_EOK && journal_read_changeset(read, &ch)) {
+ ret = cb(false, &ch, ctx);
+ at_least_one = true;
+ journal_read_clear_changeset(&ch);
+ }
+ ret = journal_read_get_error(read, ret);
+ journal_read_end(read);
+ }
+ if (!at_least_one && ret == KNOT_EOK) {
+ ret = cb(false, NULL, ctx);
+ }
+ return ret;
+}
+
+// beware, this function does not operate in single txn!
+int journal_walk(zone_journal_t j, journal_walk_cb_t cb, void *ctx)
+{
+ int ret = knot_lmdb_exists(j.db);
+ if (ret == KNOT_ENODB) {
+ ret = cb(true, NULL, ctx);
+ if (ret == KNOT_EOK) {
+ ret = cb(false, NULL, ctx);
+ }
+ return ret;
+ } else if (ret == KNOT_EOK) {
+ ret = knot_lmdb_open(j.db);
+ }
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ journal_metadata_t md = { 0 };
+ journal_read_t *read = NULL;
+ changeset_t ch;
+ bool zone_in_j = false;
+ ret = just_load_md(j, &md, &zone_in_j);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ if (zone_in_j) {
+ ret = journal_read_begin(j, true, 0, &read);
+ goto read_one_special;
+ } else if ((md.flags & JOURNAL_MERGED_SERIAL_VALID)) {
+ ret = journal_read_begin(j, false, md.merged_serial, &read);
+read_one_special:
+ if (ret == KNOT_EOK && journal_read_changeset(read, &ch)) {
+ ret = cb(true, &ch, ctx);
+ journal_read_clear_changeset(&ch);
+ }
+ ret = journal_read_get_error(read, ret);
+ journal_read_end(read);
+ read = NULL;
+ } else {
+ ret = cb(true, NULL, ctx);
+ }
+
+ if (ret == KNOT_EOK) {
+ ret = journal_walk_from(j, md.first_serial, cb, ctx);
+ }
+ return ret;
+}
+
+typedef struct {
+ size_t observed_count;
+ size_t observed_merged;
+ uint32_t merged_serial;
+ size_t observed_zij;
+ uint32_t first_serial;
+ bool first_serial_valid;
+ uint32_t last_serial;
+ bool last_serial_valid;
+} check_ctx_t;
+
+static int check_cb(bool special, const changeset_t *ch, void *vctx)
+{
+ check_ctx_t *ctx = vctx;
+ if (special && ch != NULL) {
+ if (ch->remove == NULL) {
+ ctx->observed_zij++;
+ ctx->last_serial = changeset_to(ch);
+ ctx->last_serial_valid = true;
+ } else {
+ ctx->merged_serial = changeset_from(ch);
+ ctx->observed_merged++;
+ }
+ } else if (ch != NULL) {
+ if (!ctx->first_serial_valid) {
+ ctx->first_serial = changeset_from(ch);
+ ctx->first_serial_valid = true;
+ }
+ ctx->last_serial = changeset_to(ch);
+ ctx->last_serial_valid = true;
+ ctx->observed_count++;
+ }
+ return KNOT_EOK;
+}
+
+static bool eq(bool a, bool b)
+{
+ return a ? b : !b;
+}
+
+int journal_sem_check(zone_journal_t j)
+{
+ check_ctx_t ctx = { 0 };
+ journal_metadata_t md = { 0 };
+ bool has_zij = false;
+
+ if (!journal_is_existing(j)) {
+ return KNOT_EOK;
+ }
+
+ int ret = just_load_md(j, &md, &has_zij);
+ if (ret == KNOT_EOK) {
+ ret = journal_walk(j, check_cb, &ctx);
+ }
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ if (!eq((md.flags & JOURNAL_SERIAL_TO_VALID), ctx.last_serial_valid)) {
+ return 101;
+ }
+ if (ctx.last_serial_valid && ctx.last_serial != md.serial_to) {
+ return 102;
+ }
+ if (!eq((md.flags & JOURNAL_MERGED_SERIAL_VALID), (ctx.observed_merged > 0))) {
+ return 103;
+ }
+ if (ctx.observed_merged > 1) {
+ return 104;
+ }
+ if (ctx.observed_merged == 1 && ctx.merged_serial != md.merged_serial) {
+ return 105;
+ }
+ if (!eq(has_zij, (ctx.observed_zij > 0))) {
+ return 106;
+ }
+ if (ctx.observed_zij > 1) {
+ return 107;
+ }
+ if (ctx.observed_zij + ctx.observed_merged > 1) {
+ return 108;
+ }
+ if (!eq(((md.flags & JOURNAL_SERIAL_TO_VALID) && md.first_serial != md.serial_to), ctx.first_serial_valid)) {
+ return 109;
+ }
+ if (!eq(ctx.first_serial_valid, (ctx.observed_count > 0))) {
+ return 110;
+ }
+ if (ctx.first_serial_valid && ctx.first_serial != md.first_serial) {
+ return 111;
+ }
+ if (ctx.observed_count != md.changeset_count) {
+ return 112;
+ }
+ if (ctx.observed_merged > 0 && ctx.observed_count == 0) {
+ return 113;
+ }
+ return KNOT_EOK;
+}