summaryrefslogtreecommitdiffstats
path: root/storage/tokudb/PerconaFT/ft/loader/loader-internal.h
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/loader/loader-internal.h')
-rw-r--r--storage/tokudb/PerconaFT/ft/loader/loader-internal.h320
1 files changed, 320 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/loader/loader-internal.h b/storage/tokudb/PerconaFT/ft/loader/loader-internal.h
new file mode 100644
index 00000000..6f7b0147
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/loader/loader-internal.h
@@ -0,0 +1,320 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+#include <db.h>
+
+#include "portability/toku_pthread.h"
+
+#include "loader/dbufio.h"
+#include "loader/loader.h"
+#include "util/queue.h"
+
+enum { // Tuning constants for the loader. The *_SIZE / *_MEMORY / *_BUF values are presumably byte counts -- TODO confirm at the use sites.
+ EXTRACTOR_QUEUE_DEPTH = 2,
+ FILE_BUFFER_SIZE = 1<<24, // 16Mi
+ MIN_ROWSET_MEMORY = 1<<23, // 8Mi
+ MIN_MERGE_FANIN = 2,
+ FRACTAL_WRITER_QUEUE_DEPTH = 3,
+ FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2, // evaluates to 5
+ DBUFIO_DEPTH = 2,
+ TARGET_MERGE_BUF_SIZE = 1<<24, // 16Mi; we'd like the merge buffer to be this big.
+ MIN_MERGE_BUF_SIZE = 1<<20, // 1Mi; always use at least this much
+ MAX_UNCOMPRESSED_BUF = MIN_MERGE_BUF_SIZE // same value as MIN_MERGE_BUF_SIZE (1Mi)
+};
+
+/* These functions are exported to allow the tests to compile. */
+
+/* These structures maintain a collection of all the open temporary files used by the loader. */
+struct file_info { // Bookkeeping for one temporary file used by the loader.
+ bool is_open;
+ bool is_extant; // if true, the file must be unlinked.
+ char *fname; // presumably the file's path name -- TODO confirm who allocates and frees it
+ TOKU_FILE *file;
+ uint64_t n_rows; // how many rows were written into that file
+ size_t buffer_size; // presumably the size of `buffer` -- TODO confirm
+ void *buffer;
+};
+struct file_infos { // The collection of file_info entries; access is serialized by `lock`.
+ int n_files; // NOTE(review): presumably the number of entries in use -- confirm
+ int n_files_limit; // NOTE(review): presumably the allocated capacity of the file_infos array -- confirm
+ struct file_info *file_infos; // the entries; this array is REALLOC'ed (see the comment on `lock`)
+ int n_files_open, n_files_extant; // presumably how many entries have is_open / is_extant set -- TODO confirm
+ toku_mutex_t lock; // must protect this data structure because current activity performs a REALLOC(fi->file_infos).
+};
+typedef struct fidx { int idx; } FIDX; // A file index wrapped in a struct (presumably an index into file_infos -- TODO confirm).
+static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1}; // The null file index: idx == -1. Marked unused so that including files that never reference it do not warn.
+static int fidx_is_null(const FIDX f) __attribute__((__unused__)); // Prototype that carries the unused attribute for the definition on the next line.
+static int fidx_is_null(const FIDX f) { return f.idx == -1; } // Returns 1 when f.idx is -1 (the FIDX_NULL value), 0 otherwise.
+TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i); // presumably maps file index i to the TOKU_FILE tracked by bl -- TODO confirm
+// NOTE(review): the out-parameter name suggests *file_idx receives the index of the newly opened temp file; the int result is presumably 0 or an error code -- confirm.
+int ft_loader_open_temp_file(FTLOADER bl, FIDX *file_idx);
+
+/* These data structures are used for manipulating a collection of rows in main memory. */
+struct row { // Describes one row by its position and lengths within a data array (presumably rowset::data -- TODO confirm).
+ size_t off; // the offset in the data array.
+ int klen,vlen; // presumably the key length and the value length -- TODO confirm units and layout
+};
+struct rowset { // A collection of rows kept in main memory.
+ uint64_t memory_budget; // same name and type as the second argument of init_rowset()
+ size_t n_rows, n_rows_limit; // NOTE(review): presumably entries in use / capacity of `rows` -- confirm
+ struct row *rows;
+ size_t n_bytes, n_bytes_limit; // NOTE(review): presumably bytes in use / capacity of `data` -- confirm
+ char *data; // presumably the data array that row::off is an offset into -- TODO confirm
+};
+
+int init_rowset (struct rowset *rows, uint64_t memory_budget); // presumably returns 0 on success, nonzero on error -- TODO confirm
+void destroy_rowset(struct rowset *rows);
+int add_row(struct rowset *rows, DBT *key, DBT *val); // presumably appends the key/val pair to *rows -- TODO confirm
+
+int loader_write_row(DBT *key, // NOTE(review): the role of each argument is not documented in this header -- see the definition
+ DBT *val,
+ FIDX data,
+ TOKU_FILE *,
+ uint64_t *dataoff,
+ struct wbuf *wb,
+ FTLOADER bl);
+int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val); // presumably the reading counterpart of loader_write_row -- TODO confirm
+
+struct merge_fileset {
+ bool have_sorted_output; // Is there a previous key?
+ FIDX sorted_output; // this points to one of the data_fidxs. If output_is_sorted (NOTE(review): no field of that name exists here; presumably have_sorted_output is meant) then this is the file containing sorted data. It's still open
+ DBT prev_key; // What is it? If it's here, it's the last output in the merge fileset
+
+ int n_temp_files, n_temp_files_limit; // NOTE(review): presumably entries in use / capacity of data_fidxs -- confirm
+ FIDX *data_fidxs;
+};
+
+void init_merge_fileset (struct merge_fileset *fs); // presumably paired with destroy_merge_fileset -- TODO confirm
+void destroy_merge_fileset (struct merge_fileset *fs);
+
+struct poll_callback_s { // A poll function stored together with an opaque pointer.
+ ft_loader_poll_func poll_function;
+ void *poll_extra; // presumably handed back to poll_function when it is called -- TODO confirm
+};
+typedef struct poll_callback_s *ft_loader_poll_callback; // note: the typedef names a pointer type
+
+int ft_loader_init_poll_callback(ft_loader_poll_callback);
+
+void ft_loader_destroy_poll_callback(ft_loader_poll_callback);
+// The setter's two trailing arguments have the same names and types as the fields of struct poll_callback_s.
+void ft_loader_set_poll_function(ft_loader_poll_callback, ft_loader_poll_func poll_function, void *poll_extra);
+// NOTE(review): `progress` is presumably in the range 0.0 to 1.0 (the comment on ft_loader_s::progress mentions that conversion) -- confirm.
+int ft_loader_call_poll_function(ft_loader_poll_callback, float progress);
+
+struct error_callback_s { // Holds an error code, an error callback with its extra pointer, and db/key/val details.
+ int error;
+ ft_loader_error_func error_callback;
+ void *extra;
+ DB *db; // db, which_db, key and val have the same names as the arguments of ft_loader_set_error()
+ int which_db;
+ DBT key;
+ DBT val;
+ bool did_callback; // presumably set once error_callback has been invoked -- TODO confirm
+ toku_mutex_t mutex; // presumably guards the fields above -- TODO confirm
+};
+typedef struct error_callback_s *ft_loader_error_callback; // note: the typedef names a pointer type
+
+void ft_loader_init_error_callback(ft_loader_error_callback);
+// NOTE(review): this init returns void, whereas ft_loader_init_poll_callback returns int.
+void ft_loader_destroy_error_callback(ft_loader_error_callback);
+
+int ft_loader_get_error(ft_loader_error_callback);
+
+void ft_loader_set_error_function(ft_loader_error_callback, ft_loader_error_func error_function, void *extra);
+// The error/db/which_db/key/val arguments below share their names with fields of struct error_callback_s; presumably they are recorded there -- TODO confirm.
+int ft_loader_set_error(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
+
+int ft_loader_call_error_function(ft_loader_error_callback);
+// NOTE(review): judging by the name, the next function presumably combines ft_loader_set_error and ft_loader_call_error_function -- confirm.
+int ft_loader_set_error_and_callback(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
+
+struct ft_loader_s { // State of one loader. The arrays marked "N of these" have N elements.
+ // These two are set in the close function, and used while running close
+ struct error_callback_s error_callback;
+ struct poll_callback_s poll_callback;
+
+ generate_row_for_put_func generate_row_for_put;
+ ft_compare_func *bt_compare_funs; // NOTE(review): presumably also N of these (toku_ft_loader_internal_init takes bt_compare_functions[N]) -- confirm
+
+ DB *src_db;
+ int N; // the length of the arrays marked "N of these"
+ DB **dbs; // N of these
+ DESCRIPTOR *descriptors; // N of these.
+ TXNID *root_xids_that_created; // N of these.
+ const char **new_fnames_in_env; // N of these. The file names that the final data will be written to (relative to env).
+
+ uint64_t *extracted_datasizes; // N of these.
+
+ struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated.
+ struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread.
+
+ QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, and writes to file.
+ toku_pthread_t extractor_thread; // the thread that takes primary rowset and does extraction and the first level sort and write to file.
+ bool extractor_live; // presumably true while extractor_thread is a live thread (cf. fractal_threads_live below) -- TODO confirm
+
+ DBT *last_key; // for each rowset, remember the most recently output key. The system may choose not to keep this up-to-date when a rowset is unsorted. These keys are malloced and ulen maintains the size of the malloced block.
+
+ struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file.
+ uint64_t n_rows; // how many rows have been put?
+ struct merge_fileset *fs;
+
+ const char *temp_file_template;
+
+ CACHETABLE cachetable;
+ bool did_reserve_memory;
+ bool compress_intermediates;
+ bool allow_puts;
+ uint64_t reserved_memory; // how much memory are we allowed to use?
+
+ /* To make it easier to recover from errors, we don't use TOKU_FILE*,
+ * instead we use an index into the file_infos. */
+ struct file_infos file_infos;
+
+#define PROGRESS_MAX (1 << 16)
+ int progress; // Progress runs from 0 to PROGRESS_MAX (65536). When we call the poll function we convert to a float from 0.0 to 1.0
+ // We use an integer so that we can add to the progress using a fetch-and-add instruction.
+
+ int progress_callback_result; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again).
+
+ LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in ft headers made by this loader.
+ TXNID load_root_xid; //(Root) transaction that performed the load.
+
+ QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
+ toku_pthread_t *fractal_threads;
+ bool *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately).
+
+ unsigned fractal_workers; // number of fractal tree writer threads
+
+ toku_mutex_t mutex;
+ bool mutex_init; // presumably true once `mutex` has been initialized (cf. ft_loader_lock_init / ft_loader_lock_destroy) -- TODO confirm
+};
+
+// Set the number of rows in the loader. Used by tests.
+void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows);
+
+// Get the number of rows in the loader. Used by tests.
+uint64_t toku_ft_loader_get_n_rows(FTLOADER bl);
+
+// The data passed into a fractal_thread via pthread_create. Apart from errno_result, the fields correspond one-to-one to the parameters of toku_loader_write_ft_from_q_in_C (target_fanout matches its `fanout`).
+struct fractal_thread_args {
+ FTLOADER bl;
+ const DESCRIPTOR descriptor;
+ int fd; // write the ft into fd.
+ int progress_allocation;
+ QUEUE q;
+ uint64_t total_disksize_estimate;
+ int errno_result; // the final result.
+ int which_db;
+ uint32_t target_nodesize;
+ uint32_t target_basementnodesize;
+ enum toku_compression_method target_compression_method;
+ uint32_t target_fanout;
+};
+
+void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows); // NOTE(review): repeats the prototype already declared earlier in this header; redundant but harmless.
+uint64_t toku_ft_loader_get_n_rows(FTLOADER bl); // NOTE(review): likewise a repeated prototype.
+
+int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
+ int which_db, DB *dest_db, ft_compare_func,
+ FTLOADER,
+ struct rowset *);
+// The bracketed comments above give the expected array lengths: dest holds an+bn rows, a holds an rows, b holds bn rows.
+int merge_files (struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func, int progress_allocation, QUEUE);
+// NOTE(review): sort_and_write_rows takes its struct rowset by value, while ft_loader_sort_and_write_rows below takes a pointer.
+int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);
+
+int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *);
+
+// NOTE(review): commented-out prototype kept from the original; ft_loader_write_file_to_dbfile, declared further down, has the same parameter list: //int write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
+int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation);
+
+int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);
+
+// This is probably only for testing. The parameters carry the same names and types as the fields of struct fractal_thread_args (`fanout` matches target_fanout).
+int toku_loader_write_ft_from_q_in_C (FTLOADER bl,
+ const DESCRIPTOR descriptor,
+ int fd, // write to here
+ int progress_allocation,
+ QUEUE q,
+ uint64_t total_disksize_estimate,
+ int which_db,
+ uint32_t target_nodesize,
+ uint32_t target_basementnodesize,
+ enum toku_compression_method target_compression_method,
+ uint32_t fanout);
+
+int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *); // same parameter list as mergesort_row_array above
+// NOTE(review): from the parameter names, presumably infile is read and written out through the descriptor `outfile` -- confirm in the definition.
+int ft_loader_write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
+
+int ft_loader_init_file_infos (struct file_infos *fi); // the functions in this group all operate on a struct file_infos; entries are addressed by FIDX
+void ft_loader_fi_destroy (struct file_infos *fi, bool is_error);
+int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open); // NOTE(review): the effect of require_open is not visible in this header -- see the definition
+int ft_loader_fi_close_all (struct file_infos *fi);
+int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode); // `mode` is presumably an fopen-style mode string -- TODO confirm
+int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx);
+
+int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, // the array arguments below are annotated as having N elements each
+ CACHETABLE cachetable,
+ generate_row_for_put_func g,
+ DB *src_db,
+ int N, FT_HANDLE ft_hs[/*N*/], DB* dbs[/*N*/],
+ const char *new_fnames_in_env[/*N*/],
+ ft_compare_func bt_compare_functions[/*N*/],
+ const char *temp_file_template,
+ LSN load_lsn,
+ TOKUTXN txn,
+ bool reserve_memory,
+ uint64_t reserve_memory_size,
+ bool compress_intermediates,
+ bool allow_puts);
+// NOTE(review): presumably releases what toku_ft_loader_internal_init set up; the effect of is_error is not visible in this header -- confirm.
+void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);
+
+// For test purposes only. (In production, the rowset size is determined by negotiation with the cachetable for some memory. See #2613.)
+uint64_t toku_ft_loader_get_rowset_budget_for_testing (void);
+
+int toku_ft_loader_finish_extractor(FTLOADER bl);
+// NOTE(review): presumably stores the loader's error code in *loader_errno, with the int result reporting whether the query itself succeeded -- confirm.
+int toku_ft_loader_get_error(FTLOADER bl, int *loader_errno);
+// NOTE(review): the two lock functions below presumably initialize and destroy ft_loader_s::mutex (see mutex_init) -- confirm.
+void ft_loader_lock_init(FTLOADER bl);
+void ft_loader_lock_destroy(FTLOADER bl);
+void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl);