diff options
Diffstat (limited to 'src/knot/journal/journal_write.c')
-rw-r--r-- | src/knot/journal/journal_write.c | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/src/knot/journal/journal_write.c b/src/knot/journal/journal_write.c new file mode 100644 index 0000000..ad1247b --- /dev/null +++ b/src/knot/journal/journal_write.c @@ -0,0 +1,333 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_write.h" + +#include "contrib/macros.h" +#include "knot/journal/journal_metadata.h" +#include "knot/journal/journal_read.h" +#include "knot/journal/serialization.h" +#include "libknot/error.h" + +static void journal_write_serialize(knot_lmdb_txn_t *txn, serialize_ctx_t *ser, + const knot_dname_t *apex, bool zij, uint32_t ch_from, uint32_t ch_to) +{ + MDB_val chunk; + uint32_t i = 0; + while (serialize_unfinished(ser) && txn->ret == KNOT_EOK) { + serialize_prepare(ser, JOURNAL_CHUNK_THRESH - JOURNAL_HEADER_SIZE, + JOURNAL_CHUNK_MAX - JOURNAL_HEADER_SIZE, &chunk.mv_size); + if (chunk.mv_size == 0) { + break; // beware! If this is omitted, it creates empty chunk => EMALF when reading. + } + chunk.mv_size += JOURNAL_HEADER_SIZE; + chunk.mv_data = NULL; + MDB_val key = journal_make_chunk_key(apex, ch_from, zij, i); + if (knot_lmdb_insert(txn, &key, &chunk)) { + journal_make_header(chunk.mv_data, ch_to); + serialize_chunk(ser, chunk.mv_data + JOURNAL_HEADER_SIZE, chunk.mv_size - JOURNAL_HEADER_SIZE); + } + free(key.mv_data); + i++; + } + int ret = serialize_deinit(ser); + if (txn->ret == KNOT_EOK) { + txn->ret = ret; + } +} + +void journal_write_changeset(knot_lmdb_txn_t *txn, const changeset_t *ch) +{ + serialize_ctx_t *ser = serialize_init(ch); + if (ser == NULL) { + txn->ret = KNOT_ENOMEM; + return; + } + if (ch->remove == NULL) { + journal_write_serialize(txn, ser, ch->soa_to->owner, true, 0, changeset_to(ch)); + } else { + journal_write_serialize(txn, ser, ch->soa_to->owner, false, changeset_from(ch), changeset_to(ch)); + } +} + +void journal_write_zone(knot_lmdb_txn_t *txn, const zone_contents_t *z) +{ + serialize_ctx_t *ser = serialize_zone_init(z); + if (ser == NULL) { + txn->ret = KNOT_ENOMEM; + return; + } + journal_write_serialize(txn, ser, z->apex->owner, true, 0, zone_contents_serial(z)); +} + +void journal_write_zone_diff(knot_lmdb_txn_t *txn, const zone_diff_t *z) +{ + serialize_ctx_t *ser = serialize_zone_diff_init(z); + if (ser == NULL) { + txn->ret = KNOT_ENOMEM; + return; + } + journal_write_serialize(txn, ser, z->apex->owner, false, zone_diff_from(z), zone_diff_to(z)); +} + +static bool delete_one(knot_lmdb_txn_t *txn, bool del_zij, uint32_t del_serial, + const knot_dname_t *zone, uint64_t *freed, uint32_t *next_serial) +{ + *freed = 0; + MDB_val prefix = journal_changeset_id_to_key(del_zij, del_serial, zone); + knot_lmdb_foreach(txn, &prefix) { + *freed += txn->cur_val.mv_size; + *next_serial = journal_next_serial(&txn->cur_val); + knot_lmdb_del_cur(txn); + } + free(prefix.mv_data); + return (*freed > 0); +} + +static int merge_cb(bool remove, const knot_rrset_t *rr, void *ctx) +{ + changeset_t *ch = ctx; + return remove ? (rr_is_apex_soa(rr, ch->soa_to->owner) ? + KNOT_EOK : changeset_add_removal(ch, rr, CHANGESET_CHECK)) + : changeset_add_addition(ch, rr, CHANGESET_CHECK); +} + +void journal_merge(zone_journal_t j, knot_lmdb_txn_t *txn, bool merge_zij, + uint32_t merge_serial, uint32_t *original_serial_to) +{ + changeset_t merge; + memset(&merge, 0, sizeof(merge)); + journal_read_t *read = NULL; + txn->ret = journal_read_begin(j, merge_zij, merge_serial, &read); + if (txn->ret != KNOT_EOK) { + return; + } + if (journal_read_changeset(read, &merge)) { + *original_serial_to = changeset_to(&merge); + } + txn->ret = journal_read_rrsets(read, merge_cb, &merge); + + // deleting seems redundant since the merge changeset will be overwritten + // but it would cause EMALF or invalid data if the new merged has less chunks than before + uint32_t del_next_serial; + uint64_t del_freed; + delete_one(txn, merge_zij, merge_serial, j.zone, &del_freed, &del_next_serial); + assert(del_freed > 0 && del_next_serial == *original_serial_to); + + journal_write_changeset(txn, &merge); + journal_read_clear_changeset(&merge); +} + +static void delete_merged(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + journal_metadata_t *md, uint64_t *freed) +{ + if (!(md->flags & JOURNAL_MERGED_SERIAL_VALID)) { + return; + } + uint32_t unused = 0; + delete_one(txn, false, md->merged_serial, zone, freed, &unused); + md->merged_serial = 0; + md->flags &= ~JOURNAL_MERGED_SERIAL_VALID; +} + +bool journal_delete(knot_lmdb_txn_t *txn, uint32_t from, const knot_dname_t *zone, + uint64_t tofree_size, size_t tofree_count, uint32_t stop_at_serial, + uint64_t *freed_size, size_t *freed_count, uint32_t *stopped_at) +{ + *freed_size = 0; + *freed_count = 0; + uint64_t freed_now; + while (from != stop_at_serial && + (*freed_size < tofree_size || *freed_count < tofree_count) && + delete_one(txn, false, from, zone, &freed_now, stopped_at)) { + *freed_size += freed_now; + ++(*freed_count); + from = *stopped_at; + } + return (*freed_count > 0); +} + +void journal_try_flush(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md) +{ + bool flush = journal_allow_flush(j); + uint32_t merge_orig = 0; + if (journal_contains(txn, true, 0, j.zone)) { + journal_merge(j, txn, true, 0, &merge_orig); + if (!flush) { + journal_metadata_after_merge(md, true, 0, md->serial_to, merge_orig); + } + } else if (!flush) { + uint32_t merge_serial = ((md->flags & JOURNAL_MERGED_SERIAL_VALID) ? md->merged_serial : md->first_serial); + journal_merge(j, txn, false, merge_serial, &merge_orig); + journal_metadata_after_merge(md, false, merge_serial, md->serial_to, merge_orig); + } + + if (flush) { + // delete merged serial if (very unlikely) exists + if ((md->flags & JOURNAL_MERGED_SERIAL_VALID)) { + uint64_t unused64; + uint32_t unused32; + (void)delete_one(txn, false, md->merged_serial, j.zone, &unused64, &unused32); + md->flags &= ~JOURNAL_MERGED_SERIAL_VALID; + } + + // commit partial job and ask zone to flush itself + journal_store_metadata(txn, j.zone, md); + knot_lmdb_commit(txn); + if (txn->ret == KNOT_EOK) { + txn->ret = KNOT_EBUSY; + } + } +} + +#define U_MINUS(minuend, subtrahend) ((minuend) - MIN((minuend), (subtrahend))) + +void journal_fix_occupation(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md, + int64_t max_usage, ssize_t max_count) +{ + uint64_t occupied = journal_get_occupied(txn, j.zone), freed; + uint64_t need_tofree = U_MINUS(occupied, max_usage); + size_t count = md->changeset_count, removed; + size_t need_todel = U_MINUS(count, max_count); + + while ((need_tofree > 0 || need_todel > 0) && txn->ret == KNOT_EOK) { + uint32_t del_from = md->first_serial; // don't move this line outside of the loop + uint32_t del_upto = md->flushed_upto; + (void)journal_serial_to(txn, true, 0, j.zone, &del_upto); // in case zij present and wrong flushed_upto, avoid discontinuity + freed = 0; + removed = 0; + journal_delete(txn, del_from, j.zone, need_tofree, need_todel, + del_upto, &freed, &removed, &del_from); + if (freed == 0) { + if (del_upto != md->serial_to) { + journal_try_flush(j, txn, md); + } else { + txn->ret = KNOT_ESPACE; + break; + } + } else { + journal_metadata_after_delete(md, del_from, removed); + need_tofree = U_MINUS(need_tofree, freed); + need_todel = U_MINUS(need_todel, removed); + } + } +} + +int journal_insert_zone(zone_journal_t j, const zone_contents_t *z) +{ + changeset_t fake_ch = { .add = (zone_contents_t *)z }; + size_t ch_size = changeset_serialized_size(&fake_ch); + size_t max_usage = journal_conf_max_usage(j); + if (ch_size >= max_usage) { + return KNOT_ESPACE; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + + update_last_inserter(&txn, j.zone); + journal_del_zone_txn(&txn, j.zone); + + journal_write_zone(&txn, z); + + journal_metadata_t md = { 0 }; + md.flags = JOURNAL_SERIAL_TO_VALID; + md.serial_to = zone_contents_serial(z); + md.first_serial = md.serial_to; + journal_store_metadata(&txn, j.zone, &md); + + knot_lmdb_commit(&txn); + return txn.ret; +} + +int journal_insert(zone_journal_t j, const changeset_t *ch, const changeset_t *extra, + const zone_diff_t *zdiff) +{ + assert(zdiff == NULL || (ch == NULL && extra == NULL)); + + size_t ch_size = zdiff == NULL ? changeset_serialized_size(ch) : + zone_diff_serialized_size(*zdiff); + size_t max_usage = journal_conf_max_usage(j); + if (ch_size >= max_usage) { + return KNOT_ESPACE; + } + + uint32_t ch_from = zdiff == NULL ? changeset_from(ch) : zone_diff_from(zdiff); + uint32_t ch_to = zdiff == NULL ? changeset_to(ch) : zone_diff_to(zdiff); + if (extra != NULL && (changeset_to(extra) != ch_to || + changeset_from(extra) == ch_from)) { + return KNOT_EINVAL; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + journal_metadata_t md = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + journal_load_metadata(&txn, j.zone, &md); + + update_last_inserter(&txn, j.zone); + + if (extra != NULL) { + if (journal_contains(&txn, true, 0, j.zone)) { + txn.ret = KNOT_ESEMCHECK; + } + uint64_t merged_freed = 0; + delete_merged(&txn, j.zone, &md, &merged_freed); + ch_size += changeset_serialized_size(extra); + ch_size -= merged_freed; + md.flushed_upto = md.serial_to; // set temporarily + md.flags |= JOURNAL_LAST_FLUSHED_VALID; + } + + size_t chs_limit = journal_conf_max_changesets(j); + journal_fix_occupation(j, &txn, &md, max_usage - ch_size, chs_limit - 1); + + // avoid discontinuity + if ((md.flags & JOURNAL_SERIAL_TO_VALID) && md.serial_to != ch_from) { + if (journal_contains(&txn, true, 0, j.zone)) { + txn.ret = KNOT_ESEMCHECK; + } else { + journal_del_zone_txn(&txn, j.zone); + memset(&md, 0, sizeof(md)); + } + } + + // avoid cycle + if (journal_contains(&txn, false, ch_to, j.zone)) { + journal_fix_occupation(j, &txn, &md, INT64_MAX, 1); + } + + if (zdiff == NULL) { + journal_write_changeset(&txn, ch); + } else { + journal_write_zone_diff(&txn, zdiff); + } + journal_metadata_after_insert(&md, ch_from, ch_to); + + if (extra != NULL) { + journal_write_changeset(&txn, extra); + journal_metadata_after_extra(&md, changeset_from(extra), changeset_to(extra)); + } + + journal_store_metadata(&txn, j.zone, &md); + knot_lmdb_commit(&txn); + return txn.ret; +} |