summaryrefslogtreecommitdiffstats
path: root/storage/tokudb/PerconaFT/ft/serialize/ft_node-serialize.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/serialize/ft_node-serialize.cc')
-rw-r--r--storage/tokudb/PerconaFT/ft/serialize/ft_node-serialize.cc3259
1 files changed, 3259 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/serialize/ft_node-serialize.cc b/storage/tokudb/PerconaFT/ft/serialize/ft_node-serialize.cc
new file mode 100644
index 00000000..e6648b76
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/serialize/ft_node-serialize.cc
@@ -0,0 +1,3259 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "portability/toku_atomic.h"
+
+#include "ft/cachetable/cachetable.h"
+#include "ft/ft.h"
+#include "ft/ft-internal.h"
+#include "ft/node.h"
+#include "ft/logger/log-internal.h"
+#include "ft/txn/rollback.h"
+#include "ft/serialize/block_allocator.h"
+#include "ft/serialize/block_table.h"
+#include "ft/serialize/compress.h"
+#include "ft/serialize/ft_node-serialize.h"
+#include "ft/serialize/sub_block.h"
+#include "util/sort.h"
+#include "util/threadpool.h"
+#include "util/status.h"
+#include "util/scoped_malloc.h"
+
+static FT_UPGRADE_STATUS_S ft_upgrade_status;
+
+#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ft_upgrade_status, k, c, t, "ft upgrade: " l, inc)
+
+static void
+status_init(void)
+{
+ // Note, this function initializes the keyname, type, and legend fields.
+ // Value fields are initialized to zero by compiler.
+ STATUS_INIT(FT_UPGRADE_FOOTPRINT, nullptr, UINT64, "footprint", TOKU_ENGINE_STATUS);
+ ft_upgrade_status.initialized = true;
+}
+#undef STATUS_INIT
+
+#define UPGRADE_STATUS_VALUE(x) ft_upgrade_status.status[x].value.num
+
+void
+toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s) {
+ if (!ft_upgrade_status.initialized) {
+ status_init();
+ }
+ UPGRADE_STATUS_VALUE(FT_UPGRADE_FOOTPRINT) = toku_log_upgrade_get_footprint();
+ *s = ft_upgrade_status;
+}
+
+static int num_cores = 0; // cache the number of cores for the parallelization
+static struct toku_thread_pool *ft_pool = NULL;
+bool toku_serialize_in_parallel;
+
+int get_num_cores(void) {
+ return num_cores;
+}
+
+struct toku_thread_pool *get_ft_pool(void) {
+ return ft_pool;
+}
+
+void toku_serialize_set_parallel(bool in_parallel) {
+ toku_unsafe_set(&toku_serialize_in_parallel, in_parallel);
+}
+
+void toku_ft_serialize_layer_init(void) {
+ num_cores = toku_os_get_number_active_processors();
+ int r = toku_thread_pool_create(&ft_pool, num_cores);
+ lazy_assert_zero(r);
+ toku_serialize_in_parallel = false;
+}
+
+void toku_ft_serialize_layer_destroy(void) {
+ toku_thread_pool_destroy(&ft_pool);
+}
+
+enum { FILE_CHANGE_INCREMENT = (16 << 20) };
+
+static inline uint64_t
+alignup64(uint64_t a, uint64_t b) {
+ return ((a+b-1)/b)*b;
+}
+
+// safe_file_size_lock must be held.
+void
+toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_sizep)
+// Effect: If file size >= SIZE+32MiB, reduce file size.
+// (32 instead of 16.. hysteresis).
+// Return 0 on success, otherwise an error number.
+{
+ int64_t file_size;
+ {
+ int r = toku_os_get_file_size(fd, &file_size);
+ lazy_assert_zero(r);
+ invariant(file_size >= 0);
+ }
+ invariant(expected_size == (uint64_t)file_size);
+ // If file space is overallocated by at least 32M
+ if ((uint64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
+ toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
+ invariant(new_size < file_size);
+ invariant(new_size >= 0);
+ int r = ftruncate(fd, new_size);
+ lazy_assert_zero(r);
+ *new_sizep = new_size;
+ }
+ else {
+ *new_sizep = file_size;
+ }
+ return;
+}
+
+static int64_t
+min64(int64_t a, int64_t b) {
+ if (a<b) return a;
+ return b;
+}
+
+void
+toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size)
+// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
+// Return 0 on success, otherwise an error number.
+{
+ int64_t file_size = 0;
+ //TODO(yoni): Allow variable stripe_width (perhaps from ft) for larger raids
+ const uint64_t stripe_width = 4096;
+ {
+ int r = toku_os_get_file_size(fd, &file_size);
+ if (r != 0) { // debug #2463
+ int the_errno = get_maybe_error_errno();
+ fprintf(stderr, "%s:%d fd=%d size=%" PRIu64 " r=%d errno=%d\n", __FUNCTION__, __LINE__, fd, size, r, the_errno); fflush(stderr);
+ }
+ lazy_assert_zero(r);
+ }
+ invariant(file_size >= 0);
+ invariant(expected_size == file_size);
+ // We want to double the size of the file, or add 16MiB, whichever is less.
+ // We emulate calling this function repeatedly until it satisfies the request.
+ int64_t to_write = 0;
+ if (file_size == 0) {
+ // Prevent infinite loop by starting with stripe_width as a base case.
+ to_write = stripe_width;
+ }
+ while (file_size + to_write < size) {
+ to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width);
+ }
+ if (to_write > 0) {
+ assert(to_write%512==0);
+ toku::scoped_malloc_aligned wbuf_aligned(to_write, 512);
+ char *wbuf = reinterpret_cast<char *>(wbuf_aligned.get());
+ memset(wbuf, 0, to_write);
+ toku_off_t start_write = alignup64(file_size, stripe_width);
+ invariant(start_write >= file_size);
+ toku_os_full_pwrite(fd, wbuf, to_write, start_write);
+ *new_size = start_write + to_write;
+ }
+ else {
+ *new_size = file_size;
+ }
+}
+
+// Don't include the sub_block header
+// Overhead calculated in same order fields are written to wbuf
+enum {
+ node_header_overhead = (8+ // magic "tokunode" or "tokuleaf" or "tokuroll"
+ 4+ // layout_version
+ 4+ // layout_version_original
+ 4), // build_id
+};
+
+// uncompressed header offsets
+enum {
+ uncompressed_magic_offset = 0,
+ uncompressed_version_offset = 8,
+};
+
+static uint32_t
+serialize_node_header_size(FTNODE node) {
+ uint32_t retval = 0;
+ retval += 8; // magic
+ retval += sizeof(node->layout_version);
+ retval += sizeof(node->layout_version_original);
+ retval += 4; // BUILD_ID
+ retval += 4; // n_children
+ retval += node->n_children*8; // encode start offset and length of each partition
+ retval += 4; // checksum
+ return retval;
+}
+
+static void
+serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) {
+ if (node->height == 0)
+ wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8);
+ else
+ wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8);
+ paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION);
+ wbuf_nocrc_int(wbuf, node->layout_version);
+ wbuf_nocrc_int(wbuf, node->layout_version_original);
+ wbuf_nocrc_uint(wbuf, BUILD_ID);
+ wbuf_nocrc_int (wbuf, node->n_children);
+ for (int i=0; i<node->n_children; i++) {
+ assert(BP_SIZE(ndd,i)>0);
+ wbuf_nocrc_int(wbuf, BP_START(ndd, i)); // save the beginning of the partition
+ wbuf_nocrc_int(wbuf, BP_SIZE (ndd, i)); // and the size
+ }
+ // checksum the header
+ uint32_t end_to_end_checksum = toku_x1764_memory(wbuf->buf, wbuf_get_woffset(wbuf));
+ wbuf_nocrc_int(wbuf, end_to_end_checksum);
+ invariant(wbuf->ndone == wbuf->size);
+}
+
+static uint32_t
+serialize_ftnode_partition_size (FTNODE node, int i)
+{
+ uint32_t result = 0;
+ paranoid_invariant(node->bp[i].state == PT_AVAIL);
+ result++; // Byte that states what the partition is
+ if (node->height > 0) {
+ NONLEAF_CHILDINFO bnc = BNC(node, i);
+ // number of messages (4 bytes) plus size of the buffer
+ result += (4 + toku_bnc_nbytesinbuf(bnc));
+ // number of offsets (4 bytes) plus an array of 4 byte offsets, for each message tree
+ result += (4 + (4 * bnc->fresh_message_tree.size()));
+ result += (4 + (4 * bnc->stale_message_tree.size()));
+ result += (4 + (4 * bnc->broadcast_list.size()));
+ }
+ else {
+ result += 4 + bn_data::HEADER_LENGTH; // n_entries in buffer table + basement header
+ result += BLB_NBYTESINDATA(node, i);
+ }
+ result += 4; // checksum
+ return result;
+}
+
+#define FTNODE_PARTITION_DMT_LEAVES 0xaa
+#define FTNODE_PARTITION_MSG_BUFFER 0xbb
+
+UU() static int
+assert_fresh(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
+ bool is_fresh = msg_buffer->get_freshness(offset);
+ assert(is_fresh);
+ return 0;
+}
+
+UU() static int
+assert_stale(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
+ bool is_fresh = msg_buffer->get_freshness(offset);
+ assert(!is_fresh);
+ return 0;
+}
+
+static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) {
+#ifdef TOKU_DEBUG_PARANOID
+ bnc->fresh_message_tree.iterate<message_buffer, assert_fresh>(&bnc->msg_buffer);
+ bnc->stale_message_tree.iterate<message_buffer, assert_stale>(&bnc->msg_buffer);
+#endif
+}
+
+static int
+wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *const wb) {
+ wbuf_nocrc_int(wb, offset);
+ return 0;
+}
+
+static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
+ unsigned char ch = FTNODE_PARTITION_MSG_BUFFER;
+ wbuf_nocrc_char(wb, ch);
+
+ // serialize the message buffer
+ bnc->msg_buffer.serialize_to_wbuf(wb);
+
+ // serialize the message trees (num entries, offsets array):
+ // first, verify their contents are consistent with the message buffer
+ bnc_verify_message_trees(bnc);
+
+ // fresh
+ wbuf_nocrc_int(wb, bnc->fresh_message_tree.size());
+ bnc->fresh_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
+
+ // stale
+ wbuf_nocrc_int(wb, bnc->stale_message_tree.size());
+ bnc->stale_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
+
+ // broadcast
+ wbuf_nocrc_int(wb, bnc->broadcast_list.size());
+ bnc->broadcast_list.iterate<struct wbuf, wbuf_write_offset>(wb);
+}
+
+//
+// Serialize the i'th partition of node into sb
+// For leaf nodes, this would be the i'th basement node
+// For internal nodes, this would be the i'th internal node
+//
+static void
+serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
+ // Caller should have allocated memory.
+ invariant_notnull(sb->uncompressed_ptr);
+ invariant(sb->uncompressed_size > 0);
+ paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i));
+
+ //
+ // Now put the data into sb->uncompressed_ptr
+ //
+ struct wbuf wb;
+ wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
+ if (node->height > 0) {
+ // TODO: (Zardosht) possibly exit early if there are no messages
+ serialize_child_buffer(BNC(node, i), &wb);
+ }
+ else {
+ unsigned char ch = FTNODE_PARTITION_DMT_LEAVES;
+ bn_data* bd = BLB_DATA(node, i);
+
+ wbuf_nocrc_char(&wb, ch);
+ wbuf_nocrc_uint(&wb, bd->num_klpairs());
+
+ bd->serialize_to_wbuf(&wb);
+ }
+ uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
+ wbuf_nocrc_int(&wb, end_to_end_checksum);
+ invariant(wb.ndone == wb.size);
+ invariant(sb->uncompressed_size==wb.ndone);
+}
+
+//
+// Takes the data in sb->uncompressed_ptr, and compresses it
+// into a newly allocated buffer sb->compressed_ptr
+//
+static void
+compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) {
+ invariant(sb->compressed_ptr != nullptr);
+ invariant(sb->compressed_size_bound > 0);
+ paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size));
+
+ //
+ // This probably seems a bit complicated. Here is what is going on.
+ // In PerconaFT 5.0, sub_blocks were compressed and the compressed data
+ // was checksummed. The checksum did NOT include the size of the compressed data
+ // and the size of the uncompressed data. The fields of sub_block only reference the
+ // compressed data, and it is the responsibility of the user of the sub_block
+ // to write the length
+ //
+ // For Dr. No, we want the checksum to also include the size of the compressed data, and the
+ // size of the decompressed data, because this data
+ // may be read off of disk alone, so it must be verifiable alone.
+ //
+ // So, we pass in a buffer to compress_nocrc_sub_block that starts 8 bytes after the beginning
+ // of sb->compressed_ptr, so we have space to put in the sizes, and then run the checksum.
+ //
+ sb->compressed_size = compress_nocrc_sub_block(
+ sb,
+ (char *)sb->compressed_ptr + 8,
+ sb->compressed_size_bound,
+ method
+ );
+
+ uint32_t* extra = (uint32_t *)(sb->compressed_ptr);
+ // store the compressed and uncompressed size at the beginning
+ extra[0] = toku_htod32(sb->compressed_size);
+ extra[1] = toku_htod32(sb->uncompressed_size);
+ // now checksum the entire thing
+ sb->compressed_size += 8; // now add the eight bytes that we saved for the sizes
+ sb->xsum = toku_x1764_memory(sb->compressed_ptr,sb->compressed_size);
+
+ //
+ // This is the end result for Dr. No and forward. For ftnodes, sb->compressed_ptr contains
+ // two integers at the beginning, the size and uncompressed size, and then the compressed
+ // data. sb->xsum contains the checksum of this entire thing.
+ //
+ // In PerconaFT 5.0, sb->compressed_ptr only contained the compressed data, sb->xsum
+ // checksummed only the compressed data, and the checksumming of the sizes were not
+ // done here.
+ //
+}
+
+//
+// Returns the size needed to serialize the ftnode info
+// Does not include header information that is common with rollback logs
+// such as the magic, layout_version, and build_id
+// Includes only node specific info such as pivot information, n_children, and so on
+//
+static uint32_t
+serialize_ftnode_info_size(FTNODE node)
+{
+ uint32_t retval = 0;
+ retval += 8; // max_msn_applied_to_node_on_disk
+ retval += 4; // nodesize
+ retval += 4; // flags
+ retval += 4; // height;
+ retval += 8; // oldest_referenced_xid_known
+ retval += node->pivotkeys.serialized_size();
+ retval += (node->n_children-1)*4; // encode length of each pivot
+ if (node->height > 0) {
+ retval += node->n_children*8; // child blocknum's
+ }
+ retval += 4; // checksum
+ return retval;
+}
+
+static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) {
+ // Memory must have been allocated by our caller.
+ invariant(sb->uncompressed_size > 0);
+ invariant_notnull(sb->uncompressed_ptr);
+ paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node));
+
+ struct wbuf wb;
+ wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
+
+ wbuf_MSN(&wb, node->max_msn_applied_to_node_on_disk);
+ wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be
+ wbuf_nocrc_uint(&wb, node->flags);
+ wbuf_nocrc_int (&wb, node->height);
+ wbuf_TXNID(&wb, node->oldest_referenced_xid_known);
+ node->pivotkeys.serialize_to_wbuf(&wb);
+
+ // child blocks, only for internal nodes
+ if (node->height > 0) {
+ for (int i = 0; i < node->n_children; i++) {
+ wbuf_nocrc_BLOCKNUM(&wb, BP_BLOCKNUM(node,i));
+ }
+ }
+
+ uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
+ wbuf_nocrc_int(&wb, end_to_end_checksum);
+ invariant(wb.ndone == wb.size);
+ invariant(sb->uncompressed_size==wb.ndone);
+}
+
+// This is the size of the uncompressed data, not including the compression headers
+unsigned int
+toku_serialize_ftnode_size (FTNODE node) {
+ unsigned int result = 0;
+ //
+ // As of now, this seems to be called if and only if the entire node is supposed
+ // to be in memory, so we will assert it.
+ //
+ toku_ftnode_assert_fully_in_memory(node);
+ result += serialize_node_header_size(node);
+ result += serialize_ftnode_info_size(node);
+ for (int i = 0; i < node->n_children; i++) {
+ result += serialize_ftnode_partition_size(node,i);
+ }
+ return result;
+}
+
+struct serialize_times {
+ tokutime_t serialize_time;
+ tokutime_t compress_time;
+};
+
+static void
+serialize_and_compress_partition(FTNODE node,
+ int childnum,
+ enum toku_compression_method compression_method,
+ SUB_BLOCK sb,
+ struct serialize_times *st)
+{
+ // serialize, compress, update status
+ tokutime_t t0 = toku_time_now();
+ serialize_ftnode_partition(node, childnum, sb);
+ tokutime_t t1 = toku_time_now();
+ compress_ftnode_sub_block(sb, compression_method);
+ tokutime_t t2 = toku_time_now();
+
+ st->serialize_time += t1 - t0;
+ st->compress_time += t2 - t1;
+}
+
+void
+toku_create_compressed_partition_from_available(
+ FTNODE node,
+ int childnum,
+ enum toku_compression_method compression_method,
+ SUB_BLOCK sb
+ )
+{
+ tokutime_t t0 = toku_time_now();
+
+ // serialize
+ sb->uncompressed_size = serialize_ftnode_partition_size(node, childnum);
+ toku::scoped_malloc uncompressed_buf(sb->uncompressed_size);
+ sb->uncompressed_ptr = uncompressed_buf.get();
+ serialize_ftnode_partition(node, childnum, sb);
+
+ tokutime_t t1 = toku_time_now();
+
+ // compress. no need to pad with extra bytes for sizes/xsum - we're not storing them
+ set_compressed_size_bound(sb, compression_method);
+ sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound);
+ sb->compressed_size = compress_nocrc_sub_block(
+ sb,
+ sb->compressed_ptr,
+ sb->compressed_size_bound,
+ compression_method
+ );
+ sb->uncompressed_ptr = NULL;
+
+ tokutime_t t2 = toku_time_now();
+
+ toku_ft_status_update_serialize_times(node, t1 - t0, t2 - t1);
+}
+
+static void
+serialize_and_compress_serially(FTNODE node,
+ int npartitions,
+ enum toku_compression_method compression_method,
+ struct sub_block sb[],
+ struct serialize_times *st) {
+ for (int i = 0; i < npartitions; i++) {
+ serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
+ }
+}
+
+struct serialize_compress_work {
+ struct work base;
+ FTNODE node;
+ int i;
+ enum toku_compression_method compression_method;
+ struct sub_block *sb;
+ struct serialize_times st;
+};
+
+static void *
+serialize_and_compress_worker(void *arg) {
+ struct workset *ws = (struct workset *) arg;
+ while (1) {
+ struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
+ if (w == NULL)
+ break;
+ int i = w->i;
+ serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
+ }
+ workset_release_ref(ws);
+ return arg;
+}
+
+static void
+serialize_and_compress_in_parallel(FTNODE node,
+ int npartitions,
+ enum toku_compression_method compression_method,
+ struct sub_block sb[],
+ struct serialize_times *st) {
+ if (npartitions == 1) {
+ serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
+ } else {
+ int T = num_cores;
+ if (T > npartitions)
+ T = npartitions;
+ if (T > 0)
+ T = T - 1;
+ struct workset ws;
+ ZERO_STRUCT(ws);
+ workset_init(&ws);
+ struct serialize_compress_work work[npartitions];
+ workset_lock(&ws);
+ for (int i = 0; i < npartitions; i++) {
+ work[i] = (struct serialize_compress_work) { .base = {{NULL, NULL}},
+ .node = node,
+ .i = i,
+ .compression_method = compression_method,
+ .sb = sb,
+ .st = { .serialize_time = 0, .compress_time = 0} };
+ workset_put_locked(&ws, &work[i].base);
+ }
+ workset_unlock(&ws);
+ toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
+ workset_add_ref(&ws, T);
+ serialize_and_compress_worker(&ws);
+ workset_join(&ws);
+ workset_destroy(&ws);
+
+ // gather up the statistics from each thread's work item
+ for (int i = 0; i < npartitions; i++) {
+ st->serialize_time += work[i].st.serialize_time;
+ st->compress_time += work[i].st.compress_time;
+ }
+ }
+}
+
+static void
+serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb,
+ enum toku_compression_method compression_method, struct serialize_times *st) {
+ // serialize, compress, update serialize times.
+ tokutime_t t0 = toku_time_now();
+ serialize_ftnode_info(node, sb);
+ tokutime_t t1 = toku_time_now();
+ compress_ftnode_sub_block(sb, compression_method);
+ tokutime_t t2 = toku_time_now();
+
+ st->serialize_time += t1 - t0;
+ st->compress_time += t2 - t1;
+}
+
+int toku_serialize_ftnode_to_memory(FTNODE node,
+ FTNODE_DISK_DATA* ndd,
+ unsigned int basementnodesize,
+ enum toku_compression_method compression_method,
+ bool do_rebalancing,
+ bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
+ /*out*/ size_t *n_bytes_to_write,
+ /*out*/ size_t *n_uncompressed_bytes,
+ /*out*/ char **bytes_to_write)
+// Effect: Writes out each child to a separate malloc'd buffer, then compresses
+// all of them, and writes the uncompressed header, to bytes_to_write,
+// which is malloc'd.
+//
+// The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
+// 512-byte padding is for O_DIRECT to work.
+{
+ toku_ftnode_assert_fully_in_memory(node);
+
+ if (do_rebalancing && node->height == 0) {
+ toku_ftnode_leaf_rebalance(node, basementnodesize);
+ }
+ const int npartitions = node->n_children;
+
+ // Each partition represents a compressed sub block
+ // For internal nodes, a sub block is a message buffer
+ // For leaf nodes, a sub block is a basement node
+ toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
+ struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
+ XREALLOC_N(npartitions, *ndd);
+
+ //
+ // First, let's serialize and compress the individual sub blocks
+ //
+
+ // determine how large our serialization and compression buffers need to be.
+ size_t serialize_buf_size = 0, compression_buf_size = 0;
+ for (int i = 0; i < node->n_children; i++) {
+ sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
+ sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
+ serialize_buf_size += sb[i].uncompressed_size;
+ compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
+ }
+
+ // give each sub block a base pointer to enough buffer space for serialization and compression
+ toku::scoped_malloc serialize_buf(serialize_buf_size);
+ toku::scoped_malloc compression_buf(compression_buf_size);
+ for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
+ sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
+ sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
+ uncompressed_offset += sb[i].uncompressed_size;
+ compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
+ invariant(uncompressed_offset <= serialize_buf_size);
+ invariant(compressed_offset <= compression_buf_size);
+ }
+
+ // do the actual serialization now that we have buffer space
+ struct serialize_times st = { 0, 0 };
+ if (in_parallel) {
+ serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
+ } else {
+ serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
+ }
+
+ //
+ // Now lets create a sub-block that has the common node information,
+ // This does NOT include the header
+ //
+
+ // determine how large our serialization and copmression buffers need to be
+ struct sub_block sb_node_info;
+ sub_block_init(&sb_node_info);
+ size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
+ size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
+ toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
+ toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
+ sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
+ sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
+ sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
+ sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
+
+ // do the actual serialization now that we have buffer space
+ serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
+
+ //
+ // At this point, we have compressed each of our pieces into individual sub_blocks,
+ // we can put the header and all the subblocks into a single buffer and return it.
+ //
+
+ // update the serialize times, ignore the header for simplicity. we captured all
+ // of the partitions' serialize times so that's probably good enough.
+ toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
+
+ // The total size of the node is:
+ // size of header + disk size of the n+1 sub_block's created above
+ uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
+ + sb_node_info.compressed_size // compressed nodeinfo (without its checksum)
+ + 4); // nodeinfo's checksum
+ uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
+ + sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum)
+ + 4); // nodeinfo's checksum
+ // store the BP_SIZESs
+ for (int i = 0; i < node->n_children; i++) {
+ uint32_t len = sb[i].compressed_size + 4; // data and checksum
+ BP_SIZE (*ndd,i) = len;
+ BP_START(*ndd,i) = total_node_size;
+ total_node_size += sb[i].compressed_size + 4;
+ total_uncompressed_size += sb[i].uncompressed_size + 4;
+ }
+
+ // now create the final serialized node
+ uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
+ char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
+ char *curr_ptr = data;
+
+ // write the header
+ struct wbuf wb;
+ wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
+ serialize_node_header(node, *ndd, &wb);
+ assert(wb.ndone == wb.size);
+ curr_ptr += serialize_node_header_size(node);
+
+ // now write sb_node_info
+ memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
+ curr_ptr += sb_node_info.compressed_size;
+ // write the checksum
+ *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
+ curr_ptr += sizeof(sb_node_info.xsum);
+
+ for (int i = 0; i < npartitions; i++) {
+ memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
+ curr_ptr += sb[i].compressed_size;
+ // write the checksum
+ *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
+ curr_ptr += sizeof(sb[i].xsum);
+ }
+ // Zero the rest of the buffer
+ memset(data + total_node_size, 0, total_buffer_size - total_node_size);
+
+ assert((uint32_t) (curr_ptr - data) == total_node_size);
+ *bytes_to_write = data;
+ *n_bytes_to_write = total_buffer_size;
+ *n_uncompressed_bytes = total_uncompressed_size;
+
+ invariant(*n_bytes_to_write % 512 == 0);
+ invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
+ return 0;
+}
+
+int toku_serialize_ftnode_to(int fd,
+ BLOCKNUM blocknum,
+ FTNODE node,
+ FTNODE_DISK_DATA *ndd,
+ bool do_rebalancing,
+ FT ft,
+ bool for_checkpoint) {
+ size_t n_to_write;
+ size_t n_uncompressed_bytes;
+ char *compressed_buf = nullptr;
+
+ // because toku_serialize_ftnode_to is only called for
+ // in toku_ftnode_flush_callback, we pass false
+ // for in_parallel. The reasoning is that when we write
+ // nodes to disk via toku_ftnode_flush_callback, we
+ // assume that it is being done on a non-critical
+ // background thread (probably for checkpointing), and therefore
+ // should not hog CPU,
+ //
+ // Should the above facts change, we may want to revisit
+ // passing false for in_parallel here
+ //
+ // alternatively, we could have made in_parallel a parameter
+ // for toku_serialize_ftnode_to, but instead we did this.
+ int r = toku_serialize_ftnode_to_memory(
+ node,
+ ndd,
+ ft->h->basementnodesize,
+ ft->h->compression_method,
+ do_rebalancing,
+ toku_unsafe_fetch(&toku_serialize_in_parallel),
+ &n_to_write,
+ &n_uncompressed_bytes,
+ &compressed_buf);
+ if (r != 0) {
+ return r;
+ }
+
+ // If the node has never been written, then write the whole buffer,
+ // including the zeros
+ invariant(blocknum.b >= 0);
+ DISKOFF offset;
+
+ // Dirties the ft
+ ft->blocktable.realloc_on_disk(
+ blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
+
+ tokutime_t t0 = toku_time_now();
+ toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
+ tokutime_t t1 = toku_time_now();
+
+ tokutime_t io_time = t1 - t0;
+ toku_ft_status_update_flush_reason(
+ node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
+
+ toku_free(compressed_buf);
+ node->clear_dirty(); // See #1957. Must set the node to be clean after
+ // serializing it so that it doesn't get written again on
+ // the next checkpoint or eviction.
+ if (node->height == 0) {
+ for (int i = 0; i < node->n_children; i++) {
+ if (BP_STATE(node, i) == PT_AVAIL) {
+ BLB_LRD(node, i) = 0;
+ }
+ }
+ }
+ return 0;
+}
+
+static void
+sort_and_steal_offset_arrays(NONLEAF_CHILDINFO bnc,
+ const toku::comparator &cmp,
+ int32_t **fresh_offsets, int32_t nfresh,
+ int32_t **stale_offsets, int32_t nstale,
+ int32_t **broadcast_offsets, int32_t nbroadcast) {
+ // We always have fresh / broadcast offsets (even if they are empty)
+ // but we may not have stale offsets, in the case of v13 upgrade.
+ invariant(fresh_offsets != nullptr);
+ invariant(broadcast_offsets != nullptr);
+ invariant(cmp.valid());
+
+ typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> msn_sort;
+
+ const int32_t n_in_this_buffer = nfresh + nstale + nbroadcast;
+ struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer);
+ msn_sort::mergesort_r(*fresh_offsets, nfresh, extra);
+ bnc->fresh_message_tree.destroy();
+ bnc->fresh_message_tree.create_steal_sorted_array(fresh_offsets, nfresh, n_in_this_buffer);
+ if (stale_offsets) {
+ msn_sort::mergesort_r(*stale_offsets, nstale, extra);
+ bnc->stale_message_tree.destroy();
+ bnc->stale_message_tree.create_steal_sorted_array(stale_offsets, nstale, n_in_this_buffer);
+ }
+ bnc->broadcast_list.destroy();
+ bnc->broadcast_list.create_steal_sorted_array(broadcast_offsets, nbroadcast, n_in_this_buffer);
+}
+
+static MSN
+deserialize_child_buffer_v13(FT ft, NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
+ // We skip 'stale' offsets for upgraded nodes.
+ int32_t nfresh = 0, nbroadcast = 0;
+ int32_t *fresh_offsets = nullptr, *broadcast_offsets = nullptr;
+
+ // Only sort buffers if we have a valid comparison function. In certain scenarios,
+ // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
+ // for simple inspection and don't actually require that the message buffers are
+ // properly sorted. This is very ugly, but correct.
+ const bool sort = ft->cmp.valid();
+
+ MSN highest_msn_in_this_buffer =
+ bnc->msg_buffer.deserialize_from_rbuf_v13(rb, &ft->h->highest_unused_msn_for_upgrade,
+ sort ? &fresh_offsets : nullptr, &nfresh,
+ sort ? &broadcast_offsets : nullptr, &nbroadcast);
+
+ if (sort) {
+ sort_and_steal_offset_arrays(bnc, ft->cmp,
+ &fresh_offsets, nfresh,
+ nullptr, 0, // no stale offsets
+ &broadcast_offsets, nbroadcast);
+ }
+
+ return highest_msn_in_this_buffer;
+}
+
+static void
+deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rb, const toku::comparator &cmp) {
+ int32_t nfresh = 0, nstale = 0, nbroadcast = 0;
+ int32_t *fresh_offsets, *stale_offsets, *broadcast_offsets;
+
+ // Only sort buffers if we have a valid comparison function. In certain scenarios,
+ // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
+ // for simple inspection and don't actually require that the message buffers are
+ // properly sorted. This is very ugly, but correct.
+ const bool sort = cmp.valid();
+
+ // read in the message buffer
+ bnc->msg_buffer.deserialize_from_rbuf(rb,
+ sort ? &fresh_offsets : nullptr, &nfresh,
+ sort ? &stale_offsets : nullptr, &nstale,
+ sort ? &broadcast_offsets : nullptr, &nbroadcast);
+
+ if (sort) {
+ sort_and_steal_offset_arrays(bnc, cmp,
+ &fresh_offsets, nfresh,
+ &stale_offsets, nstale,
+ &broadcast_offsets, nbroadcast);
+ }
+}
+
+static void
+deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
+ // read in the message buffer
+ bnc->msg_buffer.deserialize_from_rbuf(rb,
+ nullptr, nullptr, // fresh_offsets, nfresh,
+ nullptr, nullptr, // stale_offsets, nstale,
+ nullptr, nullptr); // broadcast_offsets, nbroadcast
+
+ // read in each message tree (fresh, stale, broadcast)
+ int32_t nfresh = rbuf_int(rb);
+ int32_t *XMALLOC_N(nfresh, fresh_offsets);
+ for (int i = 0; i < nfresh; i++) {
+ fresh_offsets[i] = rbuf_int(rb);
+ }
+
+ int32_t nstale = rbuf_int(rb);
+ int32_t *XMALLOC_N(nstale, stale_offsets);
+ for (int i = 0; i < nstale; i++) {
+ stale_offsets[i] = rbuf_int(rb);
+ }
+
+ int32_t nbroadcast = rbuf_int(rb);
+ int32_t *XMALLOC_N(nbroadcast, broadcast_offsets);
+ for (int i = 0; i < nbroadcast; i++) {
+ broadcast_offsets[i] = rbuf_int(rb);
+ }
+
+ // build OMTs out of each offset array
+ bnc->fresh_message_tree.destroy();
+ bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, nfresh);
+ bnc->stale_message_tree.destroy();
+ bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, nstale);
+ bnc->broadcast_list.destroy();
+ bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, nbroadcast);
+}
+
+// dump a buffer to stderr
+// no locking around this for now
+void
+dump_bad_block(unsigned char *vp, uint64_t size) {
+ const uint64_t linesize = 64;
+ uint64_t n = size / linesize;
+ for (uint64_t i = 0; i < n; i++) {
+ fprintf(stderr, "%p: ", vp);
+ for (uint64_t j = 0; j < linesize; j++) {
+ unsigned char c = vp[j];
+ fprintf(stderr, "%2.2X", c);
+ }
+ fprintf(stderr, "\n");
+ vp += linesize;
+ }
+ size = size % linesize;
+ for (uint64_t i=0; i<size; i++) {
+ if ((i % linesize) == 0)
+ fprintf(stderr, "%p: ", vp+i);
+ fprintf(stderr, "%2.2X", vp[i]);
+ if (((i+1) % linesize) == 0)
+ fprintf(stderr, "\n");
+ }
+ fprintf(stderr, "\n");
+}
+
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+
+BASEMENTNODE toku_create_empty_bn(void) {
+ BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
+ bn->data_buffer.initialize_empty();
+ return bn;
+}
+
+BASEMENTNODE toku_clone_bn(BASEMENTNODE orig_bn) {
+ BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
+ bn->max_msn_applied = orig_bn->max_msn_applied;
+ bn->seqinsert = orig_bn->seqinsert;
+ bn->stale_ancestor_messages_applied = orig_bn->stale_ancestor_messages_applied;
+ bn->stat64_delta = orig_bn->stat64_delta;
+ bn->logical_rows_delta = orig_bn->logical_rows_delta;
+ bn->data_buffer.clone(&orig_bn->data_buffer);
+ return bn;
+}
+
+BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
+ BASEMENTNODE XMALLOC(bn);
+ bn->max_msn_applied.msn = 0;
+ bn->seqinsert = 0;
+ bn->stale_ancestor_messages_applied = false;
+ bn->stat64_delta = ZEROSTATS;
+ bn->logical_rows_delta = 0;
+ bn->data_buffer.init_zero();
+ return bn;
+}
+
+NONLEAF_CHILDINFO toku_create_empty_nl(void) {
+ NONLEAF_CHILDINFO XMALLOC(cn);
+ cn->msg_buffer.create();
+ cn->fresh_message_tree.create_no_array();
+ cn->stale_message_tree.create_no_array();
+ cn->broadcast_list.create_no_array();
+ memset(cn->flow, 0, sizeof cn->flow);
+ return cn;
+}
+
+// must clone the OMTs, since we serialize them along with the message buffer
+NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
+ NONLEAF_CHILDINFO XMALLOC(cn);
+ cn->msg_buffer.clone(&orig_childinfo->msg_buffer);
+ cn->fresh_message_tree.create_no_array();
+ cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree);
+ cn->stale_message_tree.create_no_array();
+ cn->stale_message_tree.clone(orig_childinfo->stale_message_tree);
+ cn->broadcast_list.create_no_array();
+ cn->broadcast_list.clone(orig_childinfo->broadcast_list);
+ memset(cn->flow, 0, sizeof cn->flow);
+ return cn;
+}
+
+void destroy_basement_node (BASEMENTNODE bn)
+{
+ bn->data_buffer.destroy();
+ toku_free(bn);
+}
+
+void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
+{
+ nl->msg_buffer.destroy();
+ nl->fresh_message_tree.destroy();
+ nl->stale_message_tree.destroy();
+ nl->broadcast_list.destroy();
+ toku_free(nl);
+}
+
+void read_block_from_fd_into_rbuf(
+ int fd,
+ BLOCKNUM blocknum,
+ FT ft,
+ struct rbuf *rb
+ )
+{
+ // get the file offset and block size for the block
+ DISKOFF offset, size;
+ ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
+ DISKOFF size_aligned = roundup_to_multiple(512, size);
+ uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
+ rbuf_init(rb, raw_block, size);
+ // read the block
+ ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
+ assert((DISKOFF)rlen >= size);
+ assert((DISKOFF)rlen <= size_aligned);
+}
+
+static const int read_header_heuristic_max = 32*1024;
+
+#ifndef MIN
+#define MIN(a,b) (((a)>(b)) ? (b) : (a))
+#endif
+
+// Effect: If the header part of the node is small enough, then read it into the rbuf. The rbuf will be allocated to be big enough in any case.
+static void read_ftnode_header_from_fd_into_rbuf_if_small_enough(int fd, BLOCKNUM blocknum,
+ FT ft, struct rbuf *rb,
+ ftnode_fetch_extra *bfe) {
+ DISKOFF offset, size;
+ ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
+ DISKOFF read_size = roundup_to_multiple(512, MIN(read_header_heuristic_max, size));
+ uint8_t *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, size), raw_block);
+ rbuf_init(rb, raw_block, read_size);
+
+ // read the block
+ tokutime_t t0 = toku_time_now();
+ ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset);
+ tokutime_t t1 = toku_time_now();
+
+ assert(rlen >= 0);
+ rbuf_init(rb, raw_block, rlen);
+
+ bfe->bytes_read = rlen;
+ bfe->io_time = t1 - t0;
+ toku_ft_status_update_pivot_fetch_reason(bfe);
+}
+
+//
+// read the compressed partition into the sub_block,
+// validate the checksum of the compressed data
+//
+int
+read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
+{
+ int r = 0;
+ sb->compressed_size = rbuf_int(rb);
+ sb->uncompressed_size = rbuf_int(rb);
+ const void **cp = (const void **) &sb->compressed_ptr;
+ rbuf_literal_bytes(rb, cp, sb->compressed_size);
+ sb->xsum = rbuf_int(rb);
+ // let's check the checksum
+ uint32_t actual_xsum = toku_x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size);
+ if (sb->xsum != actual_xsum) {
+ r = TOKUDB_BAD_CHECKSUM;
+ }
+ return r;
+}
+
+static int
+read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
+{
+ int r = 0;
+ r = read_compressed_sub_block(rb, sb);
+ if (r != 0) {
+ goto exit;
+ }
+
+ just_decompress_sub_block(sb);
+exit:
+ return r;
+}
+
+// Allocates space for the sub-block and de-compresses the data from
+// the supplied compressed pointer..
+void
+just_decompress_sub_block(struct sub_block *sb)
+{
+ // <CER> TODO: Add assert that the subblock was read in.
+ sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
+
+ toku_decompress(
+ (Bytef *) sb->uncompressed_ptr,
+ sb->uncompressed_size,
+ (Bytef *) sb->compressed_ptr,
+ sb->compressed_size
+ );
+}
+
+// verify the checksum
+int verify_ftnode_sub_block(struct sub_block *sb,
+ const char *fname,
+ BLOCKNUM blocknum) {
+ int r = 0;
+ // first verify the checksum
+ uint32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
+ uint32_t stored_xsum = toku_dtoh32(*((uint32_t *)((char *)sb->uncompressed_ptr + data_size)));
+ uint32_t actual_xsum = toku_x1764_memory(sb->uncompressed_ptr, data_size);
+ if (stored_xsum != actual_xsum) {
+ fprintf(
+ stderr,
+ "%s:%d:verify_ftnode_sub_block - "
+ "file[%s], blocknum[%lld], stored_xsum[%u] != actual_xsum[%u]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ stored_xsum,
+ actual_xsum);
+ dump_bad_block((Bytef *) sb->uncompressed_ptr, sb->uncompressed_size);
+ r = TOKUDB_BAD_CHECKSUM;
+ }
+ return r;
+}
+
+// This function deserializes the data stored by serialize_ftnode_info
+static int deserialize_ftnode_info(struct sub_block *sb, FTNODE node) {
+
+ // sb_node_info->uncompressed_ptr stores the serialized node information
+ // this function puts that information into node
+
+ // first verify the checksum
+ int r = 0;
+ const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
+ r = verify_ftnode_sub_block(sb, fname, node->blocknum);
+ if (r != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_info - "
+ "file[%s], blocknum[%lld], verify_ftnode_sub_block failed with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ r);
+ dump_bad_block(static_cast<unsigned char *>(sb->uncompressed_ptr),
+ sb->uncompressed_size);
+ goto exit;
+ }
+
+ uint32_t data_size;
+ data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
+
+ // now with the data verified, we can read the information into the node
+ struct rbuf rb;
+ rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
+
+ node->max_msn_applied_to_node_on_disk = rbuf_MSN(&rb);
+ (void)rbuf_int(&rb);
+ node->flags = rbuf_int(&rb);
+ node->height = rbuf_int(&rb);
+ if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) {
+ (void) rbuf_int(&rb); // optimized_for_upgrade
+ }
+ if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) {
+ rbuf_TXNID(&rb, &node->oldest_referenced_xid_known);
+ }
+
+ // now create the basement nodes or childinfos, depending on whether this is a
+ // leaf node or internal node
+ // now the subtree_estimates
+
+ // n_children is now in the header, nd the allocatio of the node->bp is in deserialize_ftnode_from_rbuf.
+
+ // now the pivots
+ if (node->n_children > 1) {
+ node->pivotkeys.deserialize_from_rbuf(&rb, node->n_children - 1);
+ } else {
+ node->pivotkeys.create_empty();
+ }
+
+ // if this is an internal node, unpack the block nums, and fill in necessary fields
+ // of childinfo
+ if (node->height > 0) {
+ for (int i = 0; i < node->n_children; i++) {
+ BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
+ BP_WORKDONE(node, i) = 0;
+ }
+ }
+
+ // make sure that all the data was read
+ if (data_size != rb.ndone) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_info - "
+ "file[%s], blocknum[%lld], data_size[%d] != rb.ndone[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ data_size,
+ rb.ndone);
+ dump_bad_block(rb.buf, rb.size);
+ abort();
+ }
+exit:
+ return r;
+}
+
+static void
+setup_available_ftnode_partition(FTNODE node, int i) {
+ if (node->height == 0) {
+ set_BLB(node, i, toku_create_empty_bn());
+ BLB_MAX_MSN_APPLIED(node,i) = node->max_msn_applied_to_node_on_disk;
+ }
+ else {
+ set_BNC(node, i, toku_create_empty_nl());
+ }
+}
+
+// Assign the child_to_read member of the bfe from the given ftnode
+// that has been brought into memory.
+static void
+update_bfe_using_ftnode(FTNODE node, ftnode_fetch_extra *bfe)
+{
+ if (bfe->type == ftnode_fetch_subset && bfe->search != NULL) {
+ // we do not take into account prefetching yet
+ // as of now, if we need a subset, the only thing
+ // we can possibly require is a single basement node
+ // we find out what basement node the query cares about
+ // and check if it is available
+ bfe->child_to_read = toku_ft_search_which_child(
+ bfe->ft->cmp,
+ node,
+ bfe->search
+ );
+ } else if (bfe->type == ftnode_fetch_keymatch) {
+ // we do not take into account prefetching yet
+ // as of now, if we need a subset, the only thing
+ // we can possibly require is a single basement node
+ // we find out what basement node the query cares about
+ // and check if it is available
+ if (node->height == 0) {
+ int left_child = bfe->leftmost_child_wanted(node);
+ int right_child = bfe->rightmost_child_wanted(node);
+ if (left_child == right_child) {
+ bfe->child_to_read = left_child;
+ }
+ }
+ }
+}
+
+// Using the search parameters in the bfe, this function will
+// initialize all of the given ftnode's partitions.
+static void
+setup_partitions_using_bfe(FTNODE node,
+ ftnode_fetch_extra *bfe,
+ bool data_in_memory)
+{
+ // Leftmost and Rightmost Child bounds.
+ int lc, rc;
+ if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch) {
+ lc = bfe->leftmost_child_wanted(node);
+ rc = bfe->rightmost_child_wanted(node);
+ } else {
+ lc = -1;
+ rc = -1;
+ }
+
+ //
+ // setup memory needed for the node
+ //
+ //printf("node height %d, blocknum %" PRId64 ", type %d lc %d rc %d\n", node->height, node->blocknum.b, bfe->type, lc, rc);
+ for (int i = 0; i < node->n_children; i++) {
+ BP_INIT_UNTOUCHED_CLOCK(node,i);
+ if (data_in_memory) {
+ BP_STATE(node, i) = ((bfe->wants_child_available(i) || (lc <= i && i <= rc))
+ ? PT_AVAIL : PT_COMPRESSED);
+ } else {
+ BP_STATE(node, i) = PT_ON_DISK;
+ }
+ BP_WORKDONE(node,i) = 0;
+
+ switch (BP_STATE(node,i)) {
+ case PT_AVAIL:
+ setup_available_ftnode_partition(node, i);
+ BP_TOUCH_CLOCK(node,i);
+ break;
+ case PT_COMPRESSED:
+ set_BSB(node, i, sub_block_creat());
+ break;
+ case PT_ON_DISK:
+ set_BNULL(node, i);
+ break;
+ case PT_INVALID:
+ abort();
+ }
+ }
+}
+
+static void setup_ftnode_partitions(FTNODE node, ftnode_fetch_extra *bfe, bool data_in_memory)
+// Effect: Used when reading a ftnode into main memory, this sets up the partitions.
+// We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations).
+// Arguments: Node: the node to set up.
+// bfe: Describes the key range needed.
+// data_in_memory: true if we have all the data (in which case we set the BP_STATE to be either PT_AVAIL or PT_COMPRESSED depending on the bfe.
+// false if we don't have the partitions in main memory (in which case we set the state to PT_ON_DISK.
+{
+ // Set bfe->child_to_read.
+ update_bfe_using_ftnode(node, bfe);
+
+ // Setup the partitions.
+ setup_partitions_using_bfe(node, bfe, data_in_memory);
+}
+
+/* deserialize the partition from the sub-block's uncompressed buffer
+ * and destroy the uncompressed buffer
+ */
+static int deserialize_ftnode_partition(
+ struct sub_block *sb,
+ FTNODE node,
+ int childnum, // which partition to deserialize
+ const toku::comparator &cmp) {
+
+ int r = 0;
+ const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
+ r = verify_ftnode_sub_block(sb, fname, node->blocknum);
+ if (r != 0) {
+ fprintf(stderr,
+ "%s:%d:deserialize_ftnode_partition - "
+ "file[%s], blocknum[%lld], "
+ "verify_ftnode_sub_block failed with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ r);
+ goto exit;
+ }
+ uint32_t data_size;
+ data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
+
+ // now with the data verified, we can read the information into the node
+ struct rbuf rb;
+ rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
+ unsigned char ch;
+ ch = rbuf_char(&rb);
+
+ if (node->height > 0) {
+ if (ch != FTNODE_PARTITION_MSG_BUFFER) {
+ fprintf(stderr,
+ "%s:%d:deserialize_ftnode_partition - "
+ "file[%s], blocknum[%lld], ch[%d] != "
+ "FTNODE_PARTITION_MSG_BUFFER[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ ch,
+ FTNODE_PARTITION_MSG_BUFFER);
+ dump_bad_block(rb.buf, rb.size);
+ assert(ch == FTNODE_PARTITION_MSG_BUFFER);
+ }
+ NONLEAF_CHILDINFO bnc = BNC(node, childnum);
+ if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) {
+ // Layout version <= 26 did not serialize sorted message trees to disk.
+ deserialize_child_buffer_v26(bnc, &rb, cmp);
+ } else {
+ deserialize_child_buffer(bnc, &rb);
+ }
+ BP_WORKDONE(node, childnum) = 0;
+ } else {
+ if (ch != FTNODE_PARTITION_DMT_LEAVES) {
+ fprintf(stderr,
+ "%s:%d:deserialize_ftnode_partition - "
+ "file[%s], blocknum[%lld], ch[%d] != "
+ "FTNODE_PARTITION_DMT_LEAVES[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ ch,
+ FTNODE_PARTITION_DMT_LEAVES);
+ dump_bad_block(rb.buf, rb.size);
+ assert(ch == FTNODE_PARTITION_DMT_LEAVES);
+ }
+
+ BLB_SEQINSERT(node, childnum) = 0;
+ uint32_t num_entries = rbuf_int(&rb);
+ // we are now at the first byte of first leafentry
+ data_size -= rb.ndone; // remaining bytes of leafentry data
+
+ BASEMENTNODE bn = BLB(node, childnum);
+ bn->data_buffer.deserialize_from_rbuf(
+ num_entries, &rb, data_size, node->layout_version_read_from_disk);
+ }
+ if (rb.ndone != rb.size) {
+ fprintf(stderr,
+ "%s:%d:deserialize_ftnode_partition - "
+ "file[%s], blocknum[%lld], rb.ndone[%d] != rb.size[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ rb.ndone,
+ rb.size);
+ dump_bad_block(rb.buf, rb.size);
+ assert(rb.ndone == rb.size);
+ }
+
+exit:
+ return r;
+}
+
+static int decompress_and_deserialize_worker(struct rbuf curr_rbuf,
+ struct sub_block curr_sb,
+ FTNODE node,
+ int child,
+ const toku::comparator &cmp,
+ tokutime_t *decompress_time) {
+ int r = 0;
+ tokutime_t t0 = toku_time_now();
+ r = read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
+ if (r != 0) {
+ const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
+ fprintf(stderr,
+ "%s:%d:decompress_and_deserialize_worker - "
+ "file[%s], blocknum[%lld], read_and_decompress_sub_block failed "
+ "with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ r);
+ dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
+ goto exit;
+ }
+ *decompress_time = toku_time_now() - t0;
+ // at this point, sb->uncompressed_ptr stores the serialized node partition
+ r = deserialize_ftnode_partition(&curr_sb, node, child, cmp);
+ if (r != 0) {
+ const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
+ fprintf(stderr,
+ "%s:%d:decompress_and_deserialize_worker - "
+ "file[%s], blocknum[%lld], deserialize_ftnode_partition failed "
+ "with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ r);
+ dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
+ goto exit;
+ }
+
+exit:
+ toku_free(curr_sb.uncompressed_ptr);
+ return r;
+}
+
+static int check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf,
+ struct sub_block curr_sb,
+ FTNODE node,
+ int child) {
+ int r = 0;
+ r = read_compressed_sub_block(&curr_rbuf, &curr_sb);
+ if (r != 0) {
+ goto exit;
+ }
+
+ SUB_BLOCK bp_sb;
+ bp_sb = BSB(node, child);
+ bp_sb->compressed_size = curr_sb.compressed_size;
+ bp_sb->uncompressed_size = curr_sb.uncompressed_size;
+ bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
+ memcpy(
+ bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size);
+exit:
+ return r;
+}
+
+static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) {
+// Effect: Allocate an FTNODE and fill in the values that are not read from
+ FTNODE XMALLOC(node);
+ node->fullhash = fullhash;
+ node->blocknum = blocknum;
+ node->clear_dirty();
+ node->oldest_referenced_xid_known = TXNID_NONE;
+ node->bp = nullptr;
+ node->ct_pair = nullptr;
+ return node;
+}
+
+static int deserialize_ftnode_header_from_rbuf_if_small_enough(
+ FTNODE *ftnode,
+ FTNODE_DISK_DATA *ndd,
+ BLOCKNUM blocknum,
+ uint32_t fullhash,
+ ftnode_fetch_extra *bfe,
+ struct rbuf *rb,
+ int fd)
+// If we have enough information in the rbuf to construct a header, then do so.
+// Also fetch in the basement node if needed.
+// Return 0 if it worked. If something goes wrong (including that we are
+// looking at some old data format that doesn't have partitions) then return
+// nonzero.
+{
+ int r = 0;
+
+ tokutime_t t0, t1;
+ tokutime_t decompress_time = 0;
+ tokutime_t deserialize_time = 0;
+ // we must get the name from bfe and not through
+ // toku_ftnode_get_cachefile_fname_in_env as the node is not set up yet
+ const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
+
+ t0 = toku_time_now();
+
+ FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
+
+ if (rb->size < 24) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], rb->size[%u] < 24\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ rb->size);
+ dump_bad_block(rb->buf, rb->size);
+ // TODO: What error do we return here?
+ // Does it even matter?
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+
+ const void *magic;
+ rbuf_literal_bytes(rb, &magic, 8);
+ if (memcmp(magic, "tokuleaf", 8) != 0 &&
+ memcmp(magic, "tokunode", 8) != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], unrecognized magic number "
+ "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ static_cast<const uint8_t*>(magic)[0],
+ static_cast<const uint8_t*>(magic)[1],
+ static_cast<const uint8_t*>(magic)[2],
+ static_cast<const uint8_t*>(magic)[3],
+ static_cast<const uint8_t*>(magic)[4],
+ static_cast<const uint8_t*>(magic)[5],
+ static_cast<const uint8_t*>(magic)[6],
+ static_cast<const uint8_t*>(magic)[7]);
+ dump_bad_block(rb->buf, rb->size);
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+
+ node->layout_version_read_from_disk = rbuf_int(rb);
+ if (node->layout_version_read_from_disk <
+ FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], node->layout_version_read_from_disk[%d] "
+ "< FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ node->layout_version_read_from_disk,
+ FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES);
+ dump_bad_block(rb->buf, rb->size);
+ // This code path doesn't have to worry about upgrade.
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+
+ // If we get here, we know the node is at least
+ // FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES. We haven't changed
+ // the serialization format since then (this comment is correct as of
+ // version 20, which is Deadshot) so we can go ahead and say the
+ // layout version is current (it will be as soon as we finish
+ // deserializing).
+ // TODO(leif): remove node->layout_version (#5174)
+ node->layout_version = FT_LAYOUT_VERSION;
+
+ node->layout_version_original = rbuf_int(rb);
+ node->build_id = rbuf_int(rb);
+ node->n_children = rbuf_int(rb);
+ // Guaranteed to be have been able to read up to here. If n_children
+ // is too big, we may have a problem, so check that we won't overflow
+ // while reading the partition locations.
+ unsigned int nhsize;
+ // we can do this because n_children is filled in.
+ nhsize = serialize_node_header_size(node);
+ unsigned int needed_size;
+ // we need 12 more so that we can read the compressed block size information
+ // that follows for the nodeinfo.
+ needed_size = nhsize + 12;
+ if (needed_size > rb->size) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], needed_size[%d] > rb->size[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ needed_size,
+ rb->size);
+ dump_bad_block(rb->buf, rb->size);
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+
+ XMALLOC_N(node->n_children, node->bp);
+ XMALLOC_N(node->n_children, *ndd);
+ // read the partition locations
+ for (int i=0; i<node->n_children; i++) {
+ BP_START(*ndd,i) = rbuf_int(rb);
+ BP_SIZE (*ndd,i) = rbuf_int(rb);
+ }
+
+ uint32_t checksum;
+ checksum = toku_x1764_memory(rb->buf, rb->ndone);
+ uint32_t stored_checksum;
+ stored_checksum = rbuf_int(rb);
+ if (stored_checksum != checksum) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], stored_checksum[%d] != checksum[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ stored_checksum,
+ checksum);
+ dump_bad_block(rb->buf, rb->size);
+ r = TOKUDB_BAD_CHECKSUM;
+ goto cleanup;
+ }
+
+ // Now we want to read the pivot information.
+ struct sub_block sb_node_info;
+ sub_block_init(&sb_node_info);
+ // we'll be able to read these because we checked the size earlier.
+ sb_node_info.compressed_size = rbuf_int(rb);
+ sb_node_info.uncompressed_size = rbuf_int(rb);
+ if (rb->size - rb->ndone < sb_node_info.compressed_size + 8) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], rb->size[%d] - rb->ndone[%d] < "
+ "sb_node_info.compressed_size[%d] + 8\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ rb->size,
+ rb->ndone,
+ sb_node_info.compressed_size);
+ dump_bad_block(rb->buf, rb->size);
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+
+ // Finish reading compressed the sub_block
+ const void **cp;
+ cp = (const void **) &sb_node_info.compressed_ptr;
+ rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
+ sb_node_info.xsum = rbuf_int(rb);
+ // let's check the checksum
+ uint32_t actual_xsum;
+ actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr - 8,
+ 8 + sb_node_info.compressed_size);
+ if (sb_node_info.xsum != actual_xsum) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], sb_node_info.xsum[%d] != actual_xsum[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ sb_node_info.xsum,
+ actual_xsum);
+ dump_bad_block(rb->buf, rb->size);
+ r = TOKUDB_BAD_CHECKSUM;
+ goto cleanup;
+ }
+
+ // Now decompress the subblock
+ {
+ toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
+ sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
+ tokutime_t decompress_t0 = toku_time_now();
+ toku_decompress((Bytef *)sb_node_info.uncompressed_ptr,
+ sb_node_info.uncompressed_size,
+ (Bytef *)sb_node_info.compressed_ptr,
+ sb_node_info.compressed_size);
+ tokutime_t decompress_t1 = toku_time_now();
+ decompress_time = decompress_t1 - decompress_t0;
+
+ // at this point sb->uncompressed_ptr stores the serialized node info.
+ r = deserialize_ftnode_info(&sb_node_info, node);
+ if (r != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], deserialize_ftnode_info failed with "
+ "%d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ r);
+ dump_bad_block(
+ static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
+ sb_node_info.uncompressed_size);
+ dump_bad_block(rb->buf, rb->size);
+ goto cleanup;
+ }
+ }
+
+ // Now we have the ftnode_info. We have a bunch more stuff in the
+ // rbuf, so we might be able to store the compressed data for some
+ // objects.
+ // We can proceed to deserialize the individual subblocks.
+
+ // setup the memory of the partitions
+ // for partitions being decompressed, create either message buffer or basement node
+ // for partitions staying compressed, create sub_block
+ setup_ftnode_partitions(node, bfe, false);
+
+ // We must capture deserialize and decompression time before
+ // the pf_callback, otherwise we would double-count.
+ t1 = toku_time_now();
+ deserialize_time = (t1 - t0) - decompress_time;
+
+ // do partial fetch if necessary
+ if (bfe->type != ftnode_fetch_none) {
+ PAIR_ATTR attr;
+ r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
+ if (r != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
+ "file[%s], blocknum[%lld], toku_ftnode_pf_callback failed with "
+ "%d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ r);
+ dump_bad_block(rb->buf, rb->size);
+ goto cleanup;
+ }
+ }
+
+ // handle clock
+ for (int i = 0; i < node->n_children; i++) {
+ if (bfe->wants_child_available(i)) {
+ paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
+ BP_TOUCH_CLOCK(node,i);
+ }
+ }
+ *ftnode = node;
+ r = 0;
+
+cleanup:
+ if (r == 0) {
+ bfe->deserialize_time += deserialize_time;
+ bfe->decompress_time += decompress_time;
+ toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
+ }
+ if (r != 0) {
+ if (node) {
+ toku_free(*ndd);
+ toku_free(node->bp);
+ toku_free(node);
+ }
+ }
+ return r;
+}
+
+// This function takes a deserialized version 13 or 14 buffer and
+// constructs the associated internal, non-leaf ftnode object. It
+// also creates MSN's for older messages created in older versions
+// that did not generate MSN's for messages. These new MSN's are
+// generated from the root downwards, counting backwards from MIN_MSN
+// and persisted in the ft header.
+static int deserialize_and_upgrade_internal_node(FTNODE node,
+ struct rbuf *rb,
+ ftnode_fetch_extra *bfe,
+ STAT64INFO info) {
+ int version = node->layout_version_read_from_disk;
+
+ if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
+ (void) rbuf_int(rb); // 10. fingerprint
+ }
+
+ node->n_children = rbuf_int(rb); // 11. n_children
+
+ // Sub-tree esitmates...
+ for (int i = 0; i < node->n_children; ++i) {
+ if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
+ (void) rbuf_int(rb); // 12. fingerprint
+ }
+ uint64_t nkeys = rbuf_ulonglong(rb); // 13. nkeys
+ uint64_t ndata = rbuf_ulonglong(rb); // 14. ndata
+ uint64_t dsize = rbuf_ulonglong(rb); // 15. dsize
+ (void) rbuf_char(rb); // 16. exact (char)
+ invariant(nkeys == ndata);
+ if (info) {
+ // info is non-null if we're trying to upgrade old subtree
+ // estimates to stat64info
+ info->numrows += nkeys;
+ info->numbytes += dsize;
+ }
+ }
+
+ // Pivot keys
+ node->pivotkeys.deserialize_from_rbuf(rb, node->n_children - 1);
+
+ // Create space for the child node buffers (a.k.a. partitions).
+ XMALLOC_N(node->n_children, node->bp);
+
+ // Set the child blocknums.
+ for (int i = 0; i < node->n_children; ++i) {
+ BP_BLOCKNUM(node, i) = rbuf_blocknum(rb); // 18. blocknums
+ BP_WORKDONE(node, i) = 0;
+ }
+
+ // Read in the child buffer maps.
+ for (int i = 0; i < node->n_children; ++i) {
+ // The following fields were previously used by the `sub_block_map'
+ // They include:
+ // - 4 byte index
+ (void) rbuf_int(rb);
+ // - 4 byte offset
+ (void) rbuf_int(rb);
+ // - 4 byte size
+ (void) rbuf_int(rb);
+ }
+
+ // We need to setup this node's partitions, but we can't call the
+ // existing call (setup_ftnode_paritions.) because there are
+ // existing optimizations that would prevent us from bringing all
+ // of this node's partitions into memory. Instead, We use the
+ // existing bfe and node to set the bfe's child_to_search member.
+ // Then we create a temporary bfe that needs all the nodes to make
+ // sure we properly intitialize our partitions before filling them
+ // in from our soon-to-be-upgraded node.
+ update_bfe_using_ftnode(node, bfe);
+ ftnode_fetch_extra temp_bfe;
+ temp_bfe.create_for_full_read(nullptr);
+ setup_partitions_using_bfe(node, &temp_bfe, true);
+
+ // Cache the highest MSN generated for the message buffers. This
+ // will be set in the ftnode.
+ //
+ // The way we choose MSNs for upgraded messages is delicate. The
+ // field `highest_unused_msn_for_upgrade' in the header is always an
+ // MSN that no message has yet. So when we have N messages that need
+ // MSNs, we decrement it by N, and then use it and the N-1 MSNs less
+ // than it, but we do not use the value we decremented it to.
+ //
+ // In the code below, we initialize `lowest' with the value of
+ // `highest_unused_msn_for_upgrade' after it is decremented, so we
+ // need to be sure to increment it once before we enqueue our first
+ // message.
+ MSN highest_msn;
+ highest_msn.msn = 0;
+
+ // Deserialize de-compressed buffers.
+ for (int i = 0; i < node->n_children; ++i) {
+ NONLEAF_CHILDINFO bnc = BNC(node, i);
+ MSN highest_msn_in_this_buffer = deserialize_child_buffer_v13(bfe->ft, bnc, rb);
+ if (highest_msn.msn == 0) {
+ highest_msn.msn = highest_msn_in_this_buffer.msn;
+ }
+ }
+
+ // Assign the highest msn from our upgrade message buffers
+ node->max_msn_applied_to_node_on_disk = highest_msn;
+ // Since we assigned MSNs to this node's messages, we need to dirty it.
+ node->set_dirty();
+
+ // Must compute the checksum now (rather than at the end, while we
+ // still have the pointer to the buffer).
+ if (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM) {
+ uint32_t expected_xsum = toku_dtoh32(*(uint32_t*)(rb->buf+rb->size-4)); // 27. checksum
+ uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size-4);
+ if (expected_xsum != actual_xsum) {
+ fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
+ __FUNCTION__,
+ __LINE__,
+ expected_xsum,
+ actual_xsum);
+ fprintf(stderr,
+ "Checksum failure while reading node in file %s.\n",
+ toku_cachefile_fname_in_env(bfe->ft->cf));
+ fflush(stderr);
+ return toku_db_badformat();
+ }
+ }
+
+ return 0;
+}
+
+// This function takes a deserialized version 13 or 14 buffer and
+// constructs the associated leaf ftnode object.
+static int
+deserialize_and_upgrade_leaf_node(FTNODE node,
+ struct rbuf *rb,
+ ftnode_fetch_extra *bfe,
+ STAT64INFO info)
+{
+ int r = 0;
+ int version = node->layout_version_read_from_disk;
+
+ // This is a leaf node, so the offsets in the buffer will be
+ // different from the internal node offsets above.
+ uint64_t nkeys = rbuf_ulonglong(rb); // 10. nkeys
+ uint64_t ndata = rbuf_ulonglong(rb); // 11. ndata
+ uint64_t dsize = rbuf_ulonglong(rb); // 12. dsize
+ invariant(nkeys == ndata);
+ if (info) {
+ // info is non-null if we're trying to upgrade old subtree
+ // estimates to stat64info
+ info->numrows += nkeys;
+ info->numbytes += dsize;
+ }
+
+ // This is the optimized for upgrade field.
+ if (version == FT_LAYOUT_VERSION_14) {
+ (void) rbuf_int(rb); // 13. optimized
+ }
+
+ // npartitions - This is really the number of leaf entries in
+ // our single basement node. There should only be 1 (ONE)
+ // partition, so there shouldn't be any pivot key stored. This
+ // means the loop will not iterate. We could remove the loop and
+ // assert that the value is indeed 1.
+ int npartitions = rbuf_int(rb); // 14. npartitions
+ assert(npartitions == 1);
+
+ // Set number of children to 1, since we will only have one
+ // basement node.
+ node->n_children = 1;
+ XMALLOC_N(node->n_children, node->bp);
+ node->pivotkeys.create_empty();
+
+ // Create one basement node to contain all the leaf entries by
+ // setting up the single partition and updating the bfe.
+ update_bfe_using_ftnode(node, bfe);
+ ftnode_fetch_extra temp_bfe;
+ temp_bfe.create_for_full_read(bfe->ft);
+ setup_partitions_using_bfe(node, &temp_bfe, true);
+
+ // 11. Deserialize the partition maps, though they are not used in the
+ // newer versions of ftnodes.
+ for (int i = 0; i < node->n_children; ++i) {
+ // The following fields were previously used by the `sub_block_map'
+ // They include:
+ // - 4 byte index
+ (void) rbuf_int(rb);
+ // - 4 byte offset
+ (void) rbuf_int(rb);
+ // - 4 byte size
+ (void) rbuf_int(rb);
+ }
+
+ // Copy all of the leaf entries into the single basement node.
+
+ // The number of leaf entries in buffer.
+ int n_in_buf = rbuf_int(rb); // 15. # of leaves
+ BLB_SEQINSERT(node,0) = 0;
+ BASEMENTNODE bn = BLB(node, 0);
+
+ // Read the leaf entries from the buffer, advancing the buffer
+ // as we go.
+ bool has_end_to_end_checksum = (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM);
+ if (version <= FT_LAYOUT_VERSION_13) {
+ // Create our mempool.
+ // Loop through
+ for (int i = 0; i < n_in_buf; ++i) {
+ LEAFENTRY_13 le = reinterpret_cast<LEAFENTRY_13>(&rb->buf[rb->ndone]);
+ uint32_t disksize = leafentry_disksize_13(le);
+ rb->ndone += disksize; // 16. leaf entry (13)
+ invariant(rb->ndone<=rb->size);
+ LEAFENTRY new_le;
+ size_t new_le_size;
+ void* key = NULL;
+ uint32_t keylen = 0;
+ r = toku_le_upgrade_13_14(le,
+ &key,
+ &keylen,
+ &new_le_size,
+ &new_le);
+ assert_zero(r);
+ // Copy the pointer value straight into the OMT
+ LEAFENTRY new_le_in_bn = nullptr;
+ void *maybe_free;
+ bn->data_buffer.get_space_for_insert(
+ i,
+ key,
+ keylen,
+ new_le_size,
+ &new_le_in_bn,
+ &maybe_free
+ );
+ if (maybe_free) {
+ toku_free(maybe_free);
+ }
+ memcpy(new_le_in_bn, new_le, new_le_size);
+ toku_free(new_le);
+ }
+ } else {
+ uint32_t data_size = rb->size - rb->ndone;
+ if (has_end_to_end_checksum) {
+ data_size -= sizeof(uint32_t);
+ }
+ bn->data_buffer.deserialize_from_rbuf(n_in_buf, rb, data_size, node->layout_version_read_from_disk);
+ }
+
+ // Whatever this is must be less than the MSNs of every message above
+ // it, so it's ok to take it here.
+ bn->max_msn_applied = bfe->ft->h->highest_unused_msn_for_upgrade;
+ bn->stale_ancestor_messages_applied = false;
+ node->max_msn_applied_to_node_on_disk = bn->max_msn_applied;
+
+ // Checksum (end to end) is only on version 14
+ if (has_end_to_end_checksum) {
+ uint32_t expected_xsum = rbuf_int(rb); // 17. checksum
+ uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size - 4);
+ if (expected_xsum != actual_xsum) {
+ fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
+ __FUNCTION__,
+ __LINE__,
+ expected_xsum,
+ actual_xsum);
+ fprintf(stderr,
+ "Checksum failure while reading node in file %s.\n",
+ toku_cachefile_fname_in_env(bfe->ft->cf));
+ fflush(stderr);
+ return toku_db_badformat();
+ }
+ }
+
+ // We should have read the whole block by this point.
+ if (rb->ndone != rb->size) {
+ // TODO: Error handling.
+ return 1;
+ }
+
+ return r;
+}
+
+static int read_and_decompress_block_from_fd_into_rbuf(
+ int fd,
+ BLOCKNUM blocknum,
+ DISKOFF offset,
+ DISKOFF size,
+ FT ft,
+ struct rbuf *rb,
+ /* out */ int *layout_version_p);
+
+// This function upgrades a version 14 or 13 ftnode to the current
+// version. NOTE: This code assumes the first field of the rbuf has
+// already been read from the buffer (namely the layout_version of the
+// ftnode.)
+static int deserialize_and_upgrade_ftnode(FTNODE node,
+ FTNODE_DISK_DATA *ndd,
+ BLOCKNUM blocknum,
+ ftnode_fetch_extra *bfe,
+ STAT64INFO info,
+ int fd) {
+ int r = 0;
+ int version;
+
+ // I. First we need to de-compress the entire node, only then can
+ // we read the different sub-sections.
+ // get the file offset and block size for the block
+ DISKOFF offset, size;
+ bfe->ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
+
+ struct rbuf rb;
+ r = read_and_decompress_block_from_fd_into_rbuf(fd,
+ blocknum,
+ offset,
+ size,
+ bfe->ft,
+ &rb,
+ &version);
+ if (r != 0) {
+ const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
+ fprintf(stderr,
+ "%s:%d:deserialize_and_upgrade_ftnode - "
+ "file[%s], blocknum[%lld], "
+ "read_and_decompress_block_from_fd_into_rbuf failed with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ r);
+ goto exit;
+ }
+
+ // Re-read the magic field from the previous call, since we are
+ // restarting with a fresh rbuf.
+ {
+ const void *magic;
+ rbuf_literal_bytes(&rb, &magic, 8); // 1. magic
+ }
+
+ // II. Start reading ftnode fields out of the decompressed buffer.
+
+ // Copy over old version info.
+ node->layout_version_read_from_disk = rbuf_int(&rb); // 2. layout version
+ version = node->layout_version_read_from_disk;
+ if (version > FT_LAYOUT_VERSION_14) {
+ const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
+ fprintf(stderr,
+ "%s:%d:deserialize_and_upgrade_ftnode - "
+ "file[%s], blocknum[%lld], version[%d] > "
+ "FT_LAYOUT_VERSION_14[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ version,
+ FT_LAYOUT_VERSION_14);
+ dump_bad_block(rb.buf, rb.size);
+ goto exit;
+ }
+ assert(version <= FT_LAYOUT_VERSION_14);
+ // Upgrade the current version number to the current version.
+ node->layout_version = FT_LAYOUT_VERSION;
+
+ node->layout_version_original = rbuf_int(&rb); // 3. original layout
+ node->build_id = rbuf_int(&rb); // 4. build id
+
+ // The remaining offsets into the rbuf do not map to the current
+ // version, so we need to fill in the blanks and ignore older
+ // fields.
+ (void)rbuf_int(&rb); // 5. nodesize
+ node->flags = rbuf_int(&rb); // 6. flags
+ node->height = rbuf_int(&rb); // 7. height
+
+ // If the version is less than 14, there are two extra ints here.
+ // we would need to ignore them if they are there.
+ // These are the 'fingerprints'.
+ if (version == FT_LAYOUT_VERSION_13) {
+ (void) rbuf_int(&rb); // 8. rand4
+ (void) rbuf_int(&rb); // 9. local
+ }
+
+ // The next offsets are dependent on whether this is a leaf node
+ // or not.
+
+ // III. Read in Leaf and Internal Node specific data.
+
+ // Check height to determine whether this is a leaf node or not.
+ if (node->height > 0) {
+ r = deserialize_and_upgrade_internal_node(node, &rb, bfe, info);
+ } else {
+ r = deserialize_and_upgrade_leaf_node(node, &rb, bfe, info);
+ }
+
+ XMALLOC_N(node->n_children, *ndd);
+ // Initialize the partition locations to zero, because version 14
+ // and below have no notion of partitions on disk.
+ for (int i=0; i<node->n_children; i++) {
+ BP_START(*ndd,i) = 0;
+ BP_SIZE (*ndd,i) = 0;
+ }
+
+ toku_free(rb.buf);
+exit:
+ return r;
+}
+
+// Effect: deserializes a ftnode that is in rb (with pointer of rb just past the
+// magic) into a FTNODE.
+static int deserialize_ftnode_from_rbuf(FTNODE *ftnode,
+ FTNODE_DISK_DATA *ndd,
+ BLOCKNUM blocknum,
+ uint32_t fullhash,
+ ftnode_fetch_extra *bfe,
+ STAT64INFO info,
+ struct rbuf *rb,
+ int fd) {
+ int r = 0;
+ struct sub_block sb_node_info;
+
+ tokutime_t t0, t1;
+ tokutime_t decompress_time = 0;
+ tokutime_t deserialize_time = 0;
+ const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
+
+ t0 = toku_time_now();
+
+ FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
+
+ // now start reading from rbuf
+ // first thing we do is read the header information
+ const void *magic;
+ rbuf_literal_bytes(rb, &magic, 8);
+ if (memcmp(magic, "tokuleaf", 8) != 0 &&
+ memcmp(magic, "tokunode", 8) != 0) {
+ fprintf(stderr,
+ "%s:%d:deserialize_ftnode_from_rbuf - "
+ "file[%s], blocknum[%lld], unrecognized magic number "
+ "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ static_cast<const uint8_t *>(magic)[0],
+ static_cast<const uint8_t *>(magic)[1],
+ static_cast<const uint8_t *>(magic)[2],
+ static_cast<const uint8_t *>(magic)[3],
+ static_cast<const uint8_t *>(magic)[4],
+ static_cast<const uint8_t *>(magic)[5],
+ static_cast<const uint8_t *>(magic)[6],
+ static_cast<const uint8_t *>(magic)[7]);
+ dump_bad_block(rb->buf, rb->size);
+
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+
+ node->layout_version_read_from_disk = rbuf_int(rb);
+ lazy_assert(node->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
+
+ // Check if we are reading in an older node version.
+ if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_14) {
+ int version = node->layout_version_read_from_disk;
+ // Perform the upgrade.
+ r = deserialize_and_upgrade_ftnode(node, ndd, blocknum, bfe, info, fd);
+ if (r != 0) {
+ fprintf(stderr,
+ "%s:%d:deserialize_ftnode_from_rbuf - "
+ "file[%s], blocknum[%lld], deserialize_and_upgrade_ftnode "
+ "failed with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ r);
+ dump_bad_block(rb->buf, rb->size);
+ goto cleanup;
+ }
+
+ if (version <= FT_LAYOUT_VERSION_13) {
+ // deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag
+ node->flags &= ~TOKU_DB_VALCMP_BUILTIN_13;
+ }
+
+ // If everything is ok, just re-assign the ftnode and retrn.
+ *ftnode = node;
+ r = 0;
+ goto cleanup;
+ }
+
+ // Upgrade versions after 14 to current. This upgrade is trivial, it
+ // removes the optimized for upgrade field, which has already been
+ // removed in the deserialization code (see
+ // deserialize_ftnode_info()).
+ node->layout_version = FT_LAYOUT_VERSION;
+ node->layout_version_original = rbuf_int(rb);
+ node->build_id = rbuf_int(rb);
+ node->n_children = rbuf_int(rb);
+ XMALLOC_N(node->n_children, node->bp);
+ XMALLOC_N(node->n_children, *ndd);
+ // read the partition locations
+ for (int i=0; i<node->n_children; i++) {
+ BP_START(*ndd,i) = rbuf_int(rb);
+ BP_SIZE (*ndd,i) = rbuf_int(rb);
+ }
+ // verify checksum of header stored
+ uint32_t checksum;
+ checksum = toku_x1764_memory(rb->buf, rb->ndone);
+ uint32_t stored_checksum;
+ stored_checksum = rbuf_int(rb);
+ if (stored_checksum != checksum) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_from_rbuf - "
+ "file[%s], blocknum[%lld], stored_checksum[%d] != checksum[%d]\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ stored_checksum,
+ checksum);
+ dump_bad_block(rb->buf, rb->size);
+ invariant(stored_checksum == checksum);
+ }
+
+ // now we read and decompress the pivot and child information
+ sub_block_init(&sb_node_info);
+ {
+ tokutime_t sb_decompress_t0 = toku_time_now();
+ r = read_and_decompress_sub_block(rb, &sb_node_info);
+ tokutime_t sb_decompress_t1 = toku_time_now();
+ decompress_time += sb_decompress_t1 - sb_decompress_t0;
+ if (r != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_from_rbuf - "
+ "file[%s], blocknum[%lld], read_and_decompress_sub_block failed "
+ "with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ r);
+ dump_bad_block(
+ static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
+ sb_node_info.uncompressed_size);
+ dump_bad_block(rb->buf, rb->size);
+ goto cleanup;
+ }
+ }
+
+ // at this point, sb->uncompressed_ptr stores the serialized node info
+ r = deserialize_ftnode_info(&sb_node_info, node);
+ if (r != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_from_rbuf - "
+ "file[%s], blocknum[%lld], deserialize_ftnode_info failed with "
+ "%d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ r);
+ dump_bad_block(rb->buf, rb->size);
+ goto cleanup;
+ }
+ toku_free(sb_node_info.uncompressed_ptr);
+
+ // now that the node info has been deserialized, we can proceed to
+ // deserialize the individual sub blocks
+
+ // setup the memory of the partitions
+ // for partitions being decompressed, create either message buffer or
+ // basement node
+ // for partitions staying compressed, create sub_block
+ setup_ftnode_partitions(node, bfe, true);
+
+ // This loop is parallelizeable, since we don't have a dependency on the
+ // work done so far.
+ for (int i = 0; i < node->n_children; i++) {
+ uint32_t curr_offset = BP_START(*ndd, i);
+ uint32_t curr_size = BP_SIZE(*ndd, i);
+ // the compressed, serialized partitions start at where rb is currently
+ // pointing, which would be rb->buf + rb->ndone
+ // we need to intialize curr_rbuf to point to this place
+ struct rbuf curr_rbuf = {.buf = nullptr, .size = 0, .ndone = 0};
+ rbuf_init(&curr_rbuf, rb->buf + curr_offset, curr_size);
+
+ //
+ // now we are at the point where we have:
+ // - read the entire compressed node off of disk,
+ // - decompressed the pivot and offset information,
+ // - have arrived at the individual partitions.
+ //
+ // Based on the information in bfe, we want to decompress a subset of
+ // of the compressed partitions (also possibly none or possibly all)
+ // The partitions that we want to decompress and make available
+ // to the node, we do, the rest we simply copy in compressed
+ // form into the node, and set the state of the partition to
+ // PT_COMPRESSED
+ //
+
+ struct sub_block curr_sb;
+ sub_block_init(&curr_sb);
+
+ // curr_rbuf is passed by value to decompress_and_deserialize_worker,
+ // so there's no ugly race condition.
+ // This would be more obvious if curr_rbuf were an array.
+
+ // deserialize_ftnode_info figures out what the state
+ // should be and sets up the memory so that we are ready to use it
+
+ switch (BP_STATE(node, i)) {
+ case PT_AVAIL: {
+ // case where we read and decompress the partition
+ tokutime_t partition_decompress_time;
+ r = decompress_and_deserialize_worker(
+ curr_rbuf,
+ curr_sb,
+ node,
+ i,
+ bfe->ft->cmp,
+ &partition_decompress_time);
+ decompress_time += partition_decompress_time;
+ if (r != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_from_rbuf - "
+ "file[%s], blocknum[%lld], childnum[%d], "
+ "decompress_and_deserialize_worker failed with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ i,
+ r);
+ dump_bad_block(rb->buf, rb->size);
+ goto cleanup;
+ }
+ break;
+ }
+ case PT_COMPRESSED:
+ // case where we leave the partition in the compressed state
+ r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
+ if (r != 0) {
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_from_rbuf - "
+ "file[%s], blocknum[%lld], childnum[%d], "
+ "check_and_copy_compressed_sub_block_worker failed with "
+ "%d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ i,
+ r);
+ dump_bad_block(rb->buf, rb->size);
+ goto cleanup;
+ }
+ break;
+ case PT_INVALID: // this is really bad
+ case PT_ON_DISK: // it's supposed to be in memory.
+ abort();
+ }
+ }
+ *ftnode = node;
+ r = 0;
+
+cleanup:
+ if (r == 0) {
+ t1 = toku_time_now();
+ deserialize_time = (t1 - t0) - decompress_time;
+ bfe->deserialize_time += deserialize_time;
+ bfe->decompress_time += decompress_time;
+ toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
+ }
+ if (r != 0) {
+ // NOTE: Right now, callers higher in the stack will assert on
+ // failure, so this is OK for production. However, if we
+ // create tools that use this function to search for errors in
+ // the FT, then we will leak memory.
+ if (node) {
+ toku_free(node);
+ }
+ }
+ return r;
+}
+
+int
+toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, ftnode_fetch_extra *bfe) {
+ int r = 0;
+ assert(BP_STATE(node,childnum) == PT_ON_DISK);
+ assert(node->bp[childnum].ptr.tag == BCT_NULL);
+
+ //
+ // setup the partition
+ //
+ setup_available_ftnode_partition(node, childnum);
+ BP_STATE(node,childnum) = PT_AVAIL;
+
+ //
+ // read off disk and make available in memory
+ //
+ // get the file offset and block size for the block
+ DISKOFF node_offset, total_node_disk_size;
+ bfe->ft->blocktable.translate_blocknum_to_offset_size(node->blocknum, &node_offset, &total_node_disk_size);
+
+ uint32_t curr_offset = BP_START(ndd, childnum);
+ uint32_t curr_size = BP_SIZE (ndd, childnum);
+
+ struct rbuf rb;
+ rbuf_init(&rb, nullptr, 0);
+
+ uint32_t pad_at_beginning = (node_offset+curr_offset)%512;
+ uint32_t padded_size = roundup_to_multiple(512, pad_at_beginning + curr_size);
+
+ toku::scoped_malloc_aligned raw_block_buf(padded_size, 512);
+ uint8_t *raw_block = reinterpret_cast<uint8_t *>(raw_block_buf.get());
+ rbuf_init(&rb, pad_at_beginning+raw_block, curr_size);
+ tokutime_t t0 = toku_time_now();
+
+ // read the block
+ assert(0==((unsigned long long)raw_block)%512); // for O_DIRECT
+ assert(0==(padded_size)%512);
+ assert(0==(node_offset+curr_offset-pad_at_beginning)%512);
+ ssize_t rlen = toku_os_pread(fd, raw_block, padded_size, node_offset+curr_offset-pad_at_beginning);
+ assert((DISKOFF)rlen >= pad_at_beginning + curr_size); // we read in at least enough to get what we wanted
+ assert((DISKOFF)rlen <= padded_size); // we didn't read in too much.
+
+ tokutime_t t1 = toku_time_now();
+
+ // read sub block
+ struct sub_block curr_sb;
+ sub_block_init(&curr_sb);
+ r = read_compressed_sub_block(&rb, &curr_sb);
+ if (r != 0) {
+ return r;
+ }
+ invariant(curr_sb.compressed_ptr != NULL);
+
+ // decompress
+ toku::scoped_malloc uncompressed_buf(curr_sb.uncompressed_size);
+ curr_sb.uncompressed_ptr = uncompressed_buf.get();
+ toku_decompress((Bytef *) curr_sb.uncompressed_ptr, curr_sb.uncompressed_size,
+ (Bytef *) curr_sb.compressed_ptr, curr_sb.compressed_size);
+
+ // deserialize
+ tokutime_t t2 = toku_time_now();
+
+ r = deserialize_ftnode_partition(&curr_sb, node, childnum, bfe->ft->cmp);
+
+ tokutime_t t3 = toku_time_now();
+
+ // capture stats
+ tokutime_t io_time = t1 - t0;
+ tokutime_t decompress_time = t2 - t1;
+ tokutime_t deserialize_time = t3 - t2;
+ bfe->deserialize_time += deserialize_time;
+ bfe->decompress_time += decompress_time;
+ toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
+
+ bfe->bytes_read = rlen;
+ bfe->io_time = io_time;
+
+ return r;
+}
+
+// Take a ftnode partition that is in the compressed state, and make it avail
+int toku_deserialize_bp_from_compressed(FTNODE node,
+ int childnum,
+ ftnode_fetch_extra *bfe) {
+
+ int r = 0;
+ assert(BP_STATE(node, childnum) == PT_COMPRESSED);
+ SUB_BLOCK curr_sb = BSB(node, childnum);
+
+ toku::scoped_malloc uncompressed_buf(curr_sb->uncompressed_size);
+ assert(curr_sb->uncompressed_ptr == NULL);
+ curr_sb->uncompressed_ptr = uncompressed_buf.get();
+
+ setup_available_ftnode_partition(node, childnum);
+ BP_STATE(node,childnum) = PT_AVAIL;
+
+ // decompress the sub_block
+ tokutime_t t0 = toku_time_now();
+
+ toku_decompress((Bytef *)curr_sb->uncompressed_ptr,
+ curr_sb->uncompressed_size,
+ (Bytef *)curr_sb->compressed_ptr,
+ curr_sb->compressed_size);
+
+ tokutime_t t1 = toku_time_now();
+
+ r = deserialize_ftnode_partition(curr_sb, node, childnum, bfe->ft->cmp);
+ if (r != 0) {
+ const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
+ fprintf(stderr,
+ "%s:%d:toku_deserialize_bp_from_compressed - "
+ "file[%s], blocknum[%lld], "
+ "deserialize_ftnode_partition failed with %d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)node->blocknum.b,
+ r);
+ dump_bad_block(static_cast<unsigned char *>(curr_sb->compressed_ptr),
+ curr_sb->compressed_size);
+ dump_bad_block(static_cast<unsigned char *>(curr_sb->uncompressed_ptr),
+ curr_sb->uncompressed_size);
+ }
+
+ tokutime_t t2 = toku_time_now();
+
+ tokutime_t decompress_time = t1 - t0;
+ tokutime_t deserialize_time = t2 - t1;
+ bfe->deserialize_time += deserialize_time;
+ bfe->decompress_time += decompress_time;
+ toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
+
+ toku_free(curr_sb->compressed_ptr);
+ toku_free(curr_sb);
+ return r;
+}
+
+static int deserialize_ftnode_from_fd(int fd,
+ BLOCKNUM blocknum,
+ uint32_t fullhash,
+ FTNODE *ftnode,
+ FTNODE_DISK_DATA *ndd,
+ ftnode_fetch_extra *bfe,
+ STAT64INFO info) {
+ struct rbuf rb = RBUF_INITIALIZER;
+
+ tokutime_t t0 = toku_time_now();
+ read_block_from_fd_into_rbuf(fd, blocknum, bfe->ft, &rb);
+ tokutime_t t1 = toku_time_now();
+
+ // Decompress and deserialize the ftnode. Time statistics
+ // are taken inside this function.
+ int r = deserialize_ftnode_from_rbuf(
+ ftnode, ndd, blocknum, fullhash, bfe, info, &rb, fd);
+ if (r != 0) {
+ const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
+ fprintf(
+ stderr,
+ "%s:%d:deserialize_ftnode_from_fd - "
+ "file[%s], blocknum[%lld], deserialize_ftnode_from_rbuf failed with "
+ "%d\n",
+ __FILE__,
+ __LINE__,
+ fname ? fname : "unknown",
+ (longlong)blocknum.b,
+ r);
+ dump_bad_block(rb.buf, rb.size);
+ }
+
+ bfe->bytes_read = rb.size;
+ bfe->io_time = t1 - t0;
+ toku_free(rb.buf);
+ return r;
+}
+
+// Effect: Read a node in. If possible, read just the header.
+// Perform version upgrade if necessary.
+int toku_deserialize_ftnode_from(int fd,
+ BLOCKNUM blocknum,
+ uint32_t fullhash,
+ FTNODE *ftnode,
+ FTNODE_DISK_DATA *ndd,
+ ftnode_fetch_extra *bfe) {
+ int r = 0;
+ struct rbuf rb = RBUF_INITIALIZER;
+
+ // each function below takes the appropriate io/decompression/deserialize
+ // statistics
+
+ if (!bfe->read_all_partitions) {
+ read_ftnode_header_from_fd_into_rbuf_if_small_enough(
+ fd, blocknum, bfe->ft, &rb, bfe);
+ r = deserialize_ftnode_header_from_rbuf_if_small_enough(
+ ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
+ } else {
+ // force us to do it the old way
+ r = -1;
+ }
+ if (r != 0) {
+ // Something went wrong, go back to doing it the old way.
+ r = deserialize_ftnode_from_fd(
+ fd, blocknum, fullhash, ftnode, ndd, bfe, nullptr);
+ }
+
+ toku_free(rb.buf);
+ return r;
+}
+
+void
+toku_verify_or_set_counts(FTNODE UU(node)) {
+}
+
+int
+toku_db_badformat(void) {
+ return DB_BADFORMAT;
+}
+
+static size_t
+serialize_rollback_log_size(ROLLBACK_LOG_NODE log) {
+ size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id
+ +16 //TXNID_PAIR
+ +8 //sequence
+ +8 //blocknum
+ +8 //previous (blocknum)
+ +8 //resident_bytecount
+ +8 //memarena size
+ +log->rollentry_resident_bytecount;
+ return size;
+}
+
+static void
+serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calculated_size, int UU(n_sub_blocks), struct sub_block UU(sub_block[])) {
+ struct wbuf wb;
+ wbuf_init(&wb, buf, calculated_size);
+ { //Serialize rollback log to local wbuf
+ wbuf_nocrc_literal_bytes(&wb, "tokuroll", 8);
+ lazy_assert(log->layout_version == FT_LAYOUT_VERSION);
+ wbuf_nocrc_int(&wb, log->layout_version);
+ wbuf_nocrc_int(&wb, log->layout_version_original);
+ wbuf_nocrc_uint(&wb, BUILD_ID);
+ wbuf_nocrc_TXNID_PAIR(&wb, log->txnid);
+ wbuf_nocrc_ulonglong(&wb, log->sequence);
+ wbuf_nocrc_BLOCKNUM(&wb, log->blocknum);
+ wbuf_nocrc_BLOCKNUM(&wb, log->previous);
+ wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount);
+ //Write down memarena size needed to restore
+ wbuf_nocrc_ulonglong(&wb, log->rollentry_arena.total_size_in_use());
+
+ {
+ //Store rollback logs
+ struct roll_entry *item;
+ size_t done_before = wb.ndone;
+ for (item = log->newest_logentry; item; item = item->prev) {
+ toku_logger_rollback_wbuf_nocrc_write(&wb, item);
+ }
+ lazy_assert(done_before + log->rollentry_resident_bytecount == wb.ndone);
+ }
+ }
+ lazy_assert(wb.ndone == wb.size);
+ lazy_assert(calculated_size==wb.ndone);
+}
+
+static void
+serialize_uncompressed_block_to_memory(char * uncompressed_buf,
+ int n_sub_blocks,
+ struct sub_block sub_block[/*n_sub_blocks*/],
+ enum toku_compression_method method,
+ /*out*/ size_t *n_bytes_to_write,
+ /*out*/ char **bytes_to_write)
+// Guarantees that the malloc'd BYTES_TO_WRITE is 512-byte aligned (so that O_DIRECT will work)
+{
+ // allocate space for the compressed uncompressed_buf
+ size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method);
+ size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
+ size_t header_len = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum
+ char *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, header_len + compressed_len), compressed_buf);
+
+ // copy the header
+ memcpy(compressed_buf, uncompressed_buf, node_header_overhead);
+ if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
+ uncompressed_buf[node_header_overhead], uncompressed_buf[node_header_overhead+1],
+ uncompressed_buf[node_header_overhead+2], uncompressed_buf[node_header_overhead+3]);
+
+ // compress all of the sub blocks
+ char *uncompressed_ptr = uncompressed_buf + node_header_overhead;
+ char *compressed_ptr = compressed_buf + header_len;
+ compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, ft_pool, method);
+
+ //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %" PRIu64 "\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
+
+ // serialize the sub block header
+ uint32_t *ptr = (uint32_t *)(compressed_buf + node_header_overhead);
+ *ptr++ = toku_htod32(n_sub_blocks);
+ for (int i=0; i<n_sub_blocks; i++) {
+ ptr[0] = toku_htod32(sub_block[i].compressed_size);
+ ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
+ ptr[2] = toku_htod32(sub_block[i].xsum);
+ ptr += 3;
+ }
+
+ // compute the header checksum and serialize it
+ uint32_t header_length = (char *)ptr - (char *)compressed_buf;
+ uint32_t xsum = toku_x1764_memory(compressed_buf, header_length);
+ *ptr = toku_htod32(xsum);
+
+ uint32_t padded_len = roundup_to_multiple(512, header_len + compressed_len);
+ // Zero out padding.
+ for (uint32_t i = header_len+compressed_len; i < padded_len; i++) {
+ compressed_buf[i] = 0;
+ }
+ *n_bytes_to_write = padded_len;
+ *bytes_to_write = compressed_buf;
+}
+
+void
+toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized) {
+ // get the size of the serialized node
+ size_t calculated_size = serialize_rollback_log_size(log);
+
+ serialized->len = calculated_size;
+ serialized->n_sub_blocks = 0;
+ // choose sub block parameters
+ int sub_block_size = 0;
+ size_t data_size = calculated_size - node_header_overhead;
+ choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &serialized->n_sub_blocks);
+ lazy_assert(0 < serialized->n_sub_blocks && serialized->n_sub_blocks <= max_sub_blocks);
+ lazy_assert(sub_block_size > 0);
+
+ // set the initial sub block size for all of the sub blocks
+ for (int i = 0; i < serialized->n_sub_blocks; i++)
+ sub_block_init(&serialized->sub_block[i]);
+ set_all_sub_block_sizes(data_size, sub_block_size, serialized->n_sub_blocks, serialized->sub_block);
+
+ // allocate space for the serialized node
+ XMALLOC_N(calculated_size, serialized->data);
+ // serialize the node into buf
+ serialize_rollback_log_node_to_buf(log, serialized->data, calculated_size, serialized->n_sub_blocks, serialized->sub_block);
+ serialized->blocknum = log->blocknum;
+}
+
+int toku_serialize_rollback_log_to(int fd,
+ ROLLBACK_LOG_NODE log,
+ SERIALIZED_ROLLBACK_LOG_NODE serialized_log,
+ bool is_serialized,
+ FT ft,
+ bool for_checkpoint) {
+ size_t n_to_write;
+ char *compressed_buf;
+ struct serialized_rollback_log_node serialized_local;
+
+ if (is_serialized) {
+ invariant_null(log);
+ } else {
+ invariant_null(serialized_log);
+ serialized_log = &serialized_local;
+ toku_serialize_rollback_log_to_memory_uncompressed(log, serialized_log);
+ }
+
+ BLOCKNUM blocknum = serialized_log->blocknum;
+ invariant(blocknum.b >= 0);
+
+ // Compress and malloc buffer to write
+ serialize_uncompressed_block_to_memory(serialized_log->data,
+ serialized_log->n_sub_blocks,
+ serialized_log->sub_block,
+ ft->h->compression_method,
+ &n_to_write,
+ &compressed_buf);
+
+ // Dirties the ft
+ DISKOFF offset;
+ ft->blocktable.realloc_on_disk(
+ blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
+
+ toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
+ toku_free(compressed_buf);
+ if (!is_serialized) {
+ toku_static_serialized_rollback_log_destroy(&serialized_local);
+ log->dirty = false; // See #1957. Must set the node to be clean after
+ // serializing it so that it doesn't get written again
+ // on the next checkpoint or eviction.
+ }
+ return 0;
+}
+
+static int
+deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) {
+ ROLLBACK_LOG_NODE MALLOC(result);
+ int r;
+ if (result==NULL) {
+ r=get_error_errno();
+ if (0) { died0: toku_free(result); }
+ return r;
+ }
+
+ const void *magic;
+ rbuf_literal_bytes(rb, &magic, 8);
+ lazy_assert(!memcmp(magic, "tokuroll", 8));
+
+ result->layout_version = rbuf_int(rb);
+ lazy_assert((FT_LAYOUT_VERSION_25 <= result->layout_version && result->layout_version <= FT_LAYOUT_VERSION_27) ||
+ (result->layout_version == FT_LAYOUT_VERSION));
+ result->layout_version_original = rbuf_int(rb);
+ result->layout_version_read_from_disk = result->layout_version;
+ result->build_id = rbuf_int(rb);
+ result->dirty = false;
+ //TODO: Maybe add descriptor (or just descriptor version) here eventually?
+ //TODO: This is hard.. everything is shared in a single dictionary.
+ rbuf_TXNID_PAIR(rb, &result->txnid);
+ result->sequence = rbuf_ulonglong(rb);
+ result->blocknum = rbuf_blocknum(rb);
+ if (result->blocknum.b != blocknum.b) {
+ r = toku_db_badformat();
+ goto died0;
+ }
+ result->previous = rbuf_blocknum(rb);
+ result->rollentry_resident_bytecount = rbuf_ulonglong(rb);
+
+ size_t arena_initial_size = rbuf_ulonglong(rb);
+ result->rollentry_arena.create(arena_initial_size);
+ if (0) { died1: result->rollentry_arena.destroy(); goto died0; }
+
+ //Load rollback entries
+ lazy_assert(rb->size > 4);
+ //Start with empty list
+ result->oldest_logentry = result->newest_logentry = NULL;
+ while (rb->ndone < rb->size) {
+ struct roll_entry *item;
+ uint32_t rollback_fsize = rbuf_int(rb); //Already read 4. Rest is 4 smaller
+ const void *item_vec;
+ rbuf_literal_bytes(rb, &item_vec, rollback_fsize-4);
+ unsigned char* item_buf = (unsigned char*)item_vec;
+ r = toku_parse_rollback(item_buf, rollback_fsize-4, &item, &result->rollentry_arena);
+ if (r!=0) {
+ r = toku_db_badformat();
+ goto died1;
+ }
+ //Add to head of list
+ if (result->oldest_logentry) {
+ result->oldest_logentry->prev = item;
+ result->oldest_logentry = item;
+ item->prev = NULL;
+ }
+ else {
+ result->oldest_logentry = result->newest_logentry = item;
+ item->prev = NULL;
+ }
+ }
+
+ toku_free(rb->buf);
+ rb->buf = NULL;
+ *log_p = result;
+ return 0;
+}
+
+static int
+deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum,
+ ROLLBACK_LOG_NODE *log,
+ struct rbuf *rb) {
+ int r = 0;
+ ROLLBACK_LOG_NODE rollback_log_node = NULL;
+ invariant((FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || version == FT_LAYOUT_VERSION);
+ r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb);
+ if (r==0) {
+ *log = rollback_log_node;
+ }
+ return r;
+}
+
+int
+decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
+ int r = 0;
+ // get the number of compressed sub blocks
+ int n_sub_blocks;
+ n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead]));
+
+ // verify the number of sub blocks
+ invariant(0 <= n_sub_blocks);
+ invariant(n_sub_blocks <= max_sub_blocks);
+
+ { // verify the header checksum
+ uint32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks);
+ invariant(header_length <= raw_block_size);
+ uint32_t xsum = toku_x1764_memory(raw_block, header_length);
+ uint32_t stored_xsum = toku_dtoh32(*(uint32_t *)(raw_block + header_length));
+ if (xsum != stored_xsum) {
+ r = TOKUDB_BAD_CHECKSUM;
+ }
+ }
+
+ // deserialize the sub block header
+ struct sub_block sub_block[n_sub_blocks];
+ uint32_t *sub_block_header = (uint32_t *) &raw_block[node_header_overhead+4];
+ for (int i = 0; i < n_sub_blocks; i++) {
+ sub_block_init(&sub_block[i]);
+ sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
+ sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
+ sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
+ sub_block_header += 3;
+ }
+
+ // This predicate needs to be here and instead of where it is set
+ // for the compiler.
+ if (r == TOKUDB_BAD_CHECKSUM) {
+ goto exit;
+ }
+
+ // verify sub block sizes
+ for (int i = 0; i < n_sub_blocks; i++) {
+ uint32_t compressed_size = sub_block[i].compressed_size;
+ if (compressed_size<=0 || compressed_size>(1<<30)) {
+ r = toku_db_badformat();
+ goto exit;
+ }
+
+ uint32_t uncompressed_size = sub_block[i].uncompressed_size;
+ if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
+ if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
+ r = toku_db_badformat();
+ goto exit;
+ }
+ }
+
+ // sum up the uncompressed size of the sub blocks
+ size_t uncompressed_size;
+ uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
+
+ // allocate the uncompressed buffer
+ size_t size;
+ size = node_header_overhead + uncompressed_size;
+ unsigned char *buf;
+ XMALLOC_N(size, buf);
+ rbuf_init(rb, buf, size);
+
+ // copy the uncompressed node header to the uncompressed buffer
+ memcpy(rb->buf, raw_block, node_header_overhead);
+
+ // point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
+ unsigned char *compressed_data;
+ compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);
+
+ // point at the start of the uncompressed data
+ unsigned char *uncompressed_data;
+ uncompressed_data = rb->buf + node_header_overhead;
+
+ // decompress all the compressed sub blocks into the uncompressed buffer
+ r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, ft_pool);
+ if (r != 0) {
+ fprintf(stderr, "%s:%d block %" PRId64 " failed %d at %p size %zu\n", __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size);
+ dump_bad_block(raw_block, raw_block_size);
+ goto exit;
+ }
+
+ rb->ndone=0;
+exit:
+ return r;
+}
+
+static int decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
+ // This function exists solely to accommodate future changes in compression.
+ int r = 0;
+ if ((version == FT_LAYOUT_VERSION_13 || version == FT_LAYOUT_VERSION_14) ||
+ (FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) ||
+ version == FT_LAYOUT_VERSION) {
+ r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum);
+ } else {
+ abort();
+ }
+ return r;
+}
+
+static int read_and_decompress_block_from_fd_into_rbuf(
+ int fd,
+ BLOCKNUM blocknum,
+ DISKOFF offset,
+ DISKOFF size,
+ FT ft,
+ struct rbuf *rb,
+ /* out */ int *layout_version_p) {
+ int r = 0;
+ if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
+
+ DISKOFF size_aligned = roundup_to_multiple(512, size);
+ uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
+ {
+ // read the (partially compressed) block
+ ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
+ lazy_assert((DISKOFF)rlen >= size);
+ lazy_assert((DISKOFF)rlen <= size_aligned);
+ }
+ // get the layout_version
+ int layout_version;
+ {
+ uint8_t *magic = raw_block + uncompressed_magic_offset;
+ if (memcmp(magic, "tokuleaf", 8)!=0 &&
+ memcmp(magic, "tokunode", 8)!=0 &&
+ memcmp(magic, "tokuroll", 8)!=0) {
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+ uint8_t *version = raw_block + uncompressed_version_offset;
+ layout_version = toku_dtoh32(*(uint32_t*)version);
+ if (layout_version < FT_LAYOUT_MIN_SUPPORTED_VERSION || layout_version > FT_LAYOUT_VERSION) {
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+ }
+
+ r = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum);
+ if (r != 0) {
+ // We either failed the checksome, or there is a bad format in
+ // the buffer.
+ if (r == TOKUDB_BAD_CHECKSUM) {
+ fprintf(stderr,
+ "Checksum failure while reading raw block in file %s.\n",
+ toku_cachefile_fname_in_env(ft->cf));
+ abort();
+ } else {
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+ }
+
+ *layout_version_p = layout_version;
+cleanup:
+ if (r!=0) {
+ if (rb->buf) toku_free(rb->buf);
+ rb->buf = NULL;
+ }
+ if (raw_block) {
+ toku_free(raw_block);
+ }
+ return r;
+}
+
+// Read rollback log node from file into struct.
+// Perform version upgrade if necessary.
+int toku_deserialize_rollback_log_from(int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT ft) {
+ int layout_version = 0;
+ int r;
+
+ struct rbuf rb;
+ rbuf_init(&rb, nullptr, 0);
+
+ // get the file offset and block size for the block
+ DISKOFF offset, size;
+ ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
+
+ // if the size is 0, then the blocknum is unused
+ if (size == 0) {
+ // blocknum is unused, just create an empty one and get out
+ ROLLBACK_LOG_NODE XMALLOC(log);
+ rollback_empty_log_init(log);
+ log->blocknum.b = blocknum.b;
+ r = 0;
+ *logp = log;
+ goto cleanup;
+ }
+
+ r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, ft, &rb, &layout_version);
+ if (r!=0) goto cleanup;
+
+ {
+ uint8_t *magic = rb.buf + uncompressed_magic_offset;
+ if (memcmp(magic, "tokuroll", 8)!=0) {
+ r = toku_db_badformat();
+ goto cleanup;
+ }
+ }
+
+ r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb);
+
+cleanup:
+ if (rb.buf) {
+ toku_free(rb.buf);
+ }
+ return r;
+}
+
+int
+toku_upgrade_subtree_estimates_to_stat64info(int fd, FT ft)
+{
+ int r = 0;
+ // 15 was the last version with subtree estimates
+ invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_15);
+
+ FTNODE unused_node = NULL;
+ FTNODE_DISK_DATA unused_ndd = NULL;
+ ftnode_fetch_extra bfe;
+ bfe.create_for_min_read(ft);
+ r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &unused_node, &unused_ndd,
+ &bfe, &ft->h->on_disk_stats);
+ ft->in_memory_stats = ft->h->on_disk_stats;
+
+ if (unused_node) {
+ toku_ftnode_free(&unused_node);
+ }
+ if (unused_ndd) {
+ toku_free(unused_ndd);
+ }
+ return r;
+}
+
+int
+toku_upgrade_msn_from_root_to_header(int fd, FT ft)
+{
+ int r;
+ // 21 was the first version with max_msn_in_ft in the header
+ invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20);
+
+ FTNODE node;
+ FTNODE_DISK_DATA ndd;
+ ftnode_fetch_extra bfe;
+ bfe.create_for_min_read(ft);
+ r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr);
+ if (r != 0) {
+ goto exit;
+ }
+
+ ft->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk;
+ toku_ftnode_free(&node);
+ toku_free(ndd);
+ exit:
+ return r;
+}
+
+#undef UPGRADE_STATUS_VALUE