Diffstat (limited to 'storage/tokudb/PerconaFT/ft/cachetable')
7 files changed, 6853 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/cachetable/background_job_manager.cc b/storage/tokudb/PerconaFT/ft/cachetable/background_job_manager.cc new file mode 100644 index 00000000..c109185f --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/cachetable/background_job_manager.cc @@ -0,0 +1,109 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 
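The background_job_manager implementation below is a small counting gate: bjm_add_background_job atomically admits a job while the manager is accepting, bjm_remove_background_job retires one, and bjm_wait_for_jobs_to_finish closes the gate and blocks until the count drains to zero. As a minimal sketch of the same accept/drain protocol, restated with standard C++ primitives instead of the toku_* wrappers (the class and member names here are illustrative, not part of PerconaFT):

// Sketch only: mirrors the accepting_jobs/num_jobs protocol of
// background_job_manager.cc using std:: primitives.
#include <cassert>
#include <condition_variable>
#include <mutex>

class job_gate {
    std::mutex lock_;                  // plays the role of jobs_lock
    std::condition_variable drained_;  // plays the role of jobs_wait
    bool accepting_ = true;
    unsigned num_jobs_ = 0;
public:
    bool add_job() {                   // cf. bjm_add_background_job
        std::lock_guard<std::mutex> g(lock_);
        if (!accepting_) return false; // manager is draining; refuse the job
        ++num_jobs_;
        return true;
    }
    void remove_job() {                // cf. bjm_remove_background_job
        std::lock_guard<std::mutex> g(lock_);
        assert(num_jobs_ > 0);
        if (--num_jobs_ == 0 && !accepting_)
            drained_.notify_all();     // wake the waiter below
    }
    void wait_for_jobs() {             // cf. bjm_wait_for_jobs_to_finish
        std::unique_lock<std::mutex> g(lock_);
        accepting_ = false;            // no new jobs from this point on
        drained_.wait(g, [this] { return num_jobs_ == 0; });
    }
};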
+ +#include <portability/toku_config.h> +#include <memory.h> +#include <toku_pthread.h> + +#include "cachetable/background_job_manager.h" + +toku_instr_key *bjm_jobs_lock_mutex_key; +toku_instr_key *bjm_jobs_wait_key; + +struct background_job_manager_struct { + bool accepting_jobs; + uint32_t num_jobs; + toku_cond_t jobs_wait; + toku_mutex_t jobs_lock; +}; + +void bjm_init(BACKGROUND_JOB_MANAGER *pbjm) { + BACKGROUND_JOB_MANAGER XCALLOC(bjm); + toku_mutex_init(*bjm_jobs_lock_mutex_key, &bjm->jobs_lock, nullptr); + toku_cond_init(*bjm_jobs_wait_key, &bjm->jobs_wait, nullptr); + bjm->accepting_jobs = true; + bjm->num_jobs = 0; + *pbjm = bjm; +} + +void bjm_destroy(BACKGROUND_JOB_MANAGER bjm) { + assert(bjm->num_jobs == 0); + toku_cond_destroy(&bjm->jobs_wait); + toku_mutex_destroy(&bjm->jobs_lock); + toku_free(bjm); +} + +void bjm_reset(BACKGROUND_JOB_MANAGER bjm) { + toku_mutex_lock(&bjm->jobs_lock); + assert(bjm->num_jobs == 0); + bjm->accepting_jobs = true; + toku_mutex_unlock(&bjm->jobs_lock); +} + +int bjm_add_background_job(BACKGROUND_JOB_MANAGER bjm) { + int ret_val; + toku_mutex_lock(&bjm->jobs_lock); + if (bjm->accepting_jobs) { + bjm->num_jobs++; + ret_val = 0; + } + else { + ret_val = -1; + } + toku_mutex_unlock(&bjm->jobs_lock); + return ret_val; +} +void bjm_remove_background_job(BACKGROUND_JOB_MANAGER bjm){ + toku_mutex_lock(&bjm->jobs_lock); + assert(bjm->num_jobs > 0); + bjm->num_jobs--; + if (bjm->num_jobs == 0 && !bjm->accepting_jobs) { + toku_cond_broadcast(&bjm->jobs_wait); + } + toku_mutex_unlock(&bjm->jobs_lock); +} + +void bjm_wait_for_jobs_to_finish(BACKGROUND_JOB_MANAGER bjm) { + toku_mutex_lock(&bjm->jobs_lock); + bjm->accepting_jobs = false; + while (bjm->num_jobs > 0) { + toku_cond_wait(&bjm->jobs_wait, &bjm->jobs_lock); + } + toku_mutex_unlock(&bjm->jobs_lock); +} + diff --git a/storage/tokudb/PerconaFT/ft/cachetable/background_job_manager.h b/storage/tokudb/PerconaFT/ft/cachetable/background_job_manager.h new file mode 100644 index 00000000..ba654590 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/cachetable/background_job_manager.h @@ -0,0 +1,78 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. 
If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#pragma once + +// +// The background job manager keeps track of the existence of +// background jobs running. We use the background job manager +// to allow threads to perform background jobs on various pieces +// of the system (e.g. cachefiles and cloned pairs being written out +// for checkpoint) +// + +typedef struct background_job_manager_struct *BACKGROUND_JOB_MANAGER; + + +void bjm_init(BACKGROUND_JOB_MANAGER* bjm); +void bjm_destroy(BACKGROUND_JOB_MANAGER bjm); + +// +// Re-allows a background job manager to accept background jobs +// +void bjm_reset(BACKGROUND_JOB_MANAGER bjm); + +// +// add a background job. If return value is 0, then the addition of the job +// was successful and the user may perform the background job. If return +// value is non-zero, then adding of the background job failed and the user +// may not perform the background job. +// +int bjm_add_background_job(BACKGROUND_JOB_MANAGER bjm); + +// +// remove a background job +// +void bjm_remove_background_job(BACKGROUND_JOB_MANAGER bjm); + +// +// This function waits for all current background jobs to be removed. If the user +// calls bjm_add_background_job while this function is running, or after this function +// has completed, bjm_add_background_job returns an error. +// +void bjm_wait_for_jobs_to_finish(BACKGROUND_JOB_MANAGER bjm); diff --git a/storage/tokudb/PerconaFT/ft/cachetable/cachetable-internal.h b/storage/tokudb/PerconaFT/ft/cachetable/cachetable-internal.h new file mode 100644 index 00000000..05fb771d --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/cachetable/cachetable-internal.h @@ -0,0 +1,607 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 
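The contract of the header above is worth spelling out: every successful bjm_add_background_job must eventually be matched by exactly one bjm_remove_background_job, and a non-zero return from the add means the manager is draining (or drained) and the job must not run at all. A hedged caller-side sketch, where flush_one_pair is a hypothetical worker and only the bjm_* calls are the real API:

#include "cachetable/background_job_manager.h"

static void flush_one_pair(void) {
    // hypothetical: the actual background work would go here
}

static void try_schedule_flush(BACKGROUND_JOB_MANAGER bjm) {
    if (bjm_add_background_job(bjm) != 0) {
        return;  // manager is draining; the job must not start
    }
    // in PerconaFT this work typically runs on a kibbutz thread
    flush_one_pair();
    bjm_remove_background_job(bjm);  // one remove per successful add
}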
+ +#pragma once + +#include "cachetable/background_job_manager.h" +#include <portability/toku_random.h> +#include <util/frwlock.h> +#include <util/kibbutz.h> +#include <util/nb_mutex.h> +#include <util/partitioned_counter.h> + +////////////////////////////////////////////////////////////////////////////// +// +// This file contains the classes and structs that make up the cachetable. +// The structs are: +// - cachefile +// - ctpair +// - pair_list +// - cachefile_list +// - checkpointer +// - evictor +// - cleaner +// +// The rest of this comment assumes familiarity with the locks used in these +// classes/structs and what the locks protect. Nevertheless, here is +// a list of the locks that we have: +// - pair_list->list_lock +// - pair_list->pending_lock_expensive +// - pair_list->pending_lock_cheap +// - cachefile_list->lock +// - PAIR->mutex +// - PAIR->value_rwlock +// - PAIR->disk_nb_mutex +// +// Here are rules for how the locks interact: +// - To grab any of the pair_list's locks, or the cachefile_list's lock, +// the cachetable must be in existence +// - To grab the PAIR mutex, we must know the PAIR will not disappear: +// - the PAIR must be pinned (value_rwlock or disk_nb_mutex is held) +// - OR, the pair_list's list lock is held +// - As a result, to get rid of a PAIR from the pair_list, we must hold +// both the pair_list's list_lock and the PAIR's mutex +// - To grab PAIR->value_rwlock, we must hold the PAIR's mutex +// - To grab PAIR->disk_nb_mutex, we must hold the PAIR's mutex +// and hold PAIR->value_rwlock +// +// Now let's talk about ordering. Here is an order from outer to inner (top locks must be grabbed first) +// - pair_list->pending_lock_expensive +// - pair_list->list_lock +// - cachefile_list->lock +// - PAIR->mutex +// - pair_list->pending_lock_cheap <-- after grabbing this lock, +// NO other locks +// should be grabbed. +// - when grabbing PAIR->value_rwlock or PAIR->disk_nb_mutex, +// if the acquisition will not block, then it does not matter if any other locks are held, +// BUT if the acquisition will block, then NO other locks may be held besides +// PAIR->mutex. +// +// HERE ARE TWO EXAMPLES: +// To pin a PAIR on a client thread, the following must be done: +// - first grab the list lock and find the PAIR +// - with the list lock grabbed, grab PAIR->mutex +// - with PAIR->mutex held: +// - release list lock +// - pin PAIR +// - with PAIR pinned, grab pending_lock_cheap, +// - copy and clear PAIR->checkpoint_pending, +// - resolve checkpointing if necessary +// - return to user. +// The list lock may be held while pinning the PAIR if +// the PAIR has no contention. Otherwise, we may +// get a deadlock with another thread that has the PAIR pinned, +// tries to pin some other PAIR, and in doing so, grabs the list lock. +// +// To unpin a PAIR on a client thread: +// - because the PAIR is pinned, we don't need the pair_list's list_lock +// - so, simply acquire PAIR->mutex +// - unpin the PAIR +// - return +// +////////////////////////////////////////////////////////////////////////////// +class evictor; +class pair_list; + +/////////////////////////////////////////////////////////////////////////////// +// +// Maps to a file on disk. +// +struct cachefile { + // these next two fields are protected by cachetable's list lock + // they are managed whenever we add or remove a pair from + // the cachetable. 
As of Riddler, this linked list is only used to + // make cachetable_flush_cachefile more efficient + PAIR cf_head; // doubly linked list that is NOT circular + uint32_t num_pairs; // count of the pairs in the cachetable that belong to this cachefile + + bool for_checkpoint; //True if part of the in-progress checkpoint + + // If set and the cachefile closes, the file will be removed. + // Clients must not operate on the cachefile after setting this, + // nor attempt to open any cachefile with the same fname (dname) + // until this cachefile has been fully closed and unlinked. + bool unlink_on_close; + // If set then fclose will not be logged in recovery log. + bool skip_log_recover_on_close; + int fd; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */ + CACHETABLE cachetable; + struct fileid fileid; + // the filenum is used as an identifier of the cachefile + // for logging and recovery + FILENUM filenum; + // number used to generate hashes for blocks in the cachefile + // used in toku_cachetable_hash + // this used to be the filenum.fileid, but now it is separate + uint32_t hash_id; + char *fname_in_env; /* Used for logging */ + + void *userdata; + void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files. + void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function. + void (*free_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function. + void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. + void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function. + void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function. + void (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory + void (*note_unpin_by_checkpoint)(CACHEFILE cf, void *userdata); // remove the reference taken by note_pin_by_checkpoint, allowing the userdata to be removed from memory + BACKGROUND_JOB_MANAGER bjm; +}; + + +/////////////////////////////////////////////////////////////////////////////// +// +// The pair represents the data stored in the cachetable. +// +struct ctpair { + // these fields are essentially constants. They do not change. + CACHEFILE cachefile; + CACHEKEY key; + uint32_t fullhash; + CACHETABLE_FLUSH_CALLBACK flush_callback; + CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback; + CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback; + CACHETABLE_CLEANER_CALLBACK cleaner_callback; + CACHETABLE_CLONE_CALLBACK clone_callback; + CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK checkpoint_complete_callback; + void *write_extraargs; + + // access to these fields is protected by disk_nb_mutex + void* cloned_value_data; // cloned copy of value_data used for checkpointing + long cloned_value_size; // size of cloned_value_data, used for accounting of size_current + void* disk_data; // data used to fetch/flush value_data to and from disk. 
+ + // access to these fields is protected by value_rwlock + void* value_data; // data used by client threads, FTNODEs and ROLLBACK_LOG_NODEs + PAIR_ATTR attr; + enum cachetable_dirty dirty; + + // protected by PAIR->mutex + uint32_t count; // clock count + uint32_t refcount; // if > 0, then this PAIR is referenced by + // callers to the cachetable, and therefore cannot + // be evicted + uint32_t num_waiting_on_refs; // number of threads waiting on refcount to go to zero + toku_cond_t refcount_wait; // cond used to wait for refcount to go to zero + + // locks + toku::frwlock value_rwlock; + struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint + toku_mutex_t* mutex; // gotten from the pair list + + // Access to checkpoint_pending is protected by two mechanisms, + // the value_rwlock and the pair_list's pending locks (expensive and cheap). + // checkpoint_pending may be true or false. + // Here are the rules for reading/modifying this bit. + // - To transition this field from false to true during begin_checkpoint, + // we must be holding both of the pair_list's pending locks. + // - To transition this field from true to false during end_checkpoint, + // we must be holding the value_rwlock. + // - For a non-checkpoint thread to read the value, we must hold both the + // value_rwlock and one of the pair_list's pending locks + // - For the checkpoint thread to read the value, we must + // hold the value_rwlock + // + bool checkpoint_pending; // If this is set, then we must resolve checkpointing before modifying the PAIR. + + // these variables are only used to transfer information to background threads; + // we cache them here to avoid a malloc. In the future, we should investigate whether + // that is necessary, as having these fields here is not technically necessary + long size_evicting_estimate; + evictor* ev; + pair_list* list; + + // A PAIR is stored in a pair_list (which happens to be PAIR->list). + // These variables are protected by the list lock in the pair_list + // + // clock_next,clock_prev represent a circular doubly-linked list. + PAIR clock_next,clock_prev; // In clock. + PAIR hash_chain; + + // pending_next,pending_prev represent a non-circular doubly-linked list. + PAIR pending_next; + PAIR pending_prev; + + // cf_next, cf_prev represent a non-circular doubly-linked list. + // entries in linked list for PAIRs in a cachefile, these are protected + // by the list lock of the PAIR's pair_list. They are used to make + // cachetable_flush_cachefile cheaper so that we don't need + // to search the entire cachetable to find a particular cachefile's + // PAIRs + PAIR cf_next; + PAIR cf_prev; +}; + +// +// This initializes the fields and members of the pair. +// +void pair_init(PAIR p, + CACHEFILE cachefile, + CACHEKEY key, + void *value, + PAIR_ATTR attr, + enum cachetable_dirty dirty, + uint32_t fullhash, + CACHETABLE_WRITE_CALLBACK write_callback, + evictor *ev, + pair_list *list); + + +/////////////////////////////////////////////////////////////////////////////// +// +// The pair list maintains the set of PAIRs that make up +// the cachetable. 
+// +class pair_list { +public: + // + // the following fields are protected by the list lock + // + uint32_t m_n_in_table; // number of pairs in the hash table + uint32_t m_table_size; // number of buckets in the hash table + uint32_t m_num_locks; + PAIR *m_table; // hash table + toku_mutex_aligned_t *m_mutexes; + // + // The following fields are the heads of various linked lists. + // They are also protected by the list lock, but their + // usage is not as straightforward. For each of them, + // only ONE thread is allowed to iterate over them with + // a read lock on the list lock. All other threads + // that want to modify elements in the lists or iterate over + // the lists must hold the write list lock. Here is the + // association between each list and the thread that may hold a read lock + // on the list lock while iterating: + // - clock_head -> eviction thread (evictor) + // - cleaner_head -> cleaner thread (cleaner) + // - pending_head -> checkpoint thread (checkpointer) + // + PAIR m_clock_head; // head of clock. head is the next thing to be up for decrement. + PAIR m_cleaner_head; // for cleaner thread. head is the next thing to look at for possible cleaning. + PAIR m_checkpoint_head; // for begin checkpoint to iterate over PAIRs and mark as pending_checkpoint + PAIR m_pending_head; // list of pairs marked with checkpoint_pending + + // this field is public so we are still POD + + // usage of this lock is described above + toku_pthread_rwlock_t m_list_lock; + // + // these locks are the "pending locks" referenced + // in comments about PAIR->checkpoint_pending. There + // are two of them, but both serve the same purpose, which + // is to protect the transition of a PAIR's checkpoint pending + // value from false to true during begin_checkpoint. + // We use two locks, because threads that want to read the + // checkpoint_pending value may hold a lock for varying periods of time. + // Threads running eviction may need to protect checkpoint_pending + // while writing a node to disk, which is an expensive operation, + // so it uses pending_lock_expensive. Client threads that + // want to pin PAIRs will want to protect checkpoint_pending + // just long enough to read the value and wipe it out. This is + // a cheap operation, and as a result, uses pending_lock_cheap. 
+ // + // By having two locks, and making begin_checkpoint first + // grab pending_lock_expensive and then pending_lock_cheap, + // we ensure that threads that want to pin nodes can grab + // only pending_lock_cheap, and never block behind threads + // holding pending_lock_expensive and writing a node out to disk + // + toku_pthread_rwlock_t m_pending_lock_expensive; + toku_pthread_rwlock_t m_pending_lock_cheap; + void init(); + void destroy(); + void evict_completely(PAIR pair); + void evict_from_cachetable(PAIR pair); + void evict_from_cachefile(PAIR pair); + void add_to_cachetable_only(PAIR p); + void put(PAIR pair); + PAIR find_pair(CACHEFILE file, CACHEKEY key, uint32_t hash); + void pending_pairs_remove (PAIR p); + void verify(); + void get_state(int *num_entries, int *hash_size); + void read_list_lock(); + void read_list_unlock(); + void write_list_lock(); + void write_list_unlock(); + void read_pending_exp_lock(); + void read_pending_exp_unlock(); + void write_pending_exp_lock(); + void write_pending_exp_unlock(); + void read_pending_cheap_lock(); + void read_pending_cheap_unlock(); + void write_pending_cheap_lock(); + void write_pending_cheap_unlock(); + toku_mutex_t* get_mutex_for_pair(uint32_t fullhash); + void pair_lock_by_fullhash(uint32_t fullhash); + void pair_unlock_by_fullhash(uint32_t fullhash); + +private: + void pair_remove (PAIR p); + void remove_from_hash_chain(PAIR p); + void add_to_cf_list (PAIR p); + void add_to_clock (PAIR p); + void add_to_hash_chain(PAIR p); +}; + +/////////////////////////////////////////////////////////////////////////////// +// +// Wrapper for the head of our cachefile list. +// +class cachefile_list { +public: + void init(); + void destroy(); + void read_lock(); + void read_unlock(); + void write_lock(); + void write_unlock(); + int cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf); + int cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf); + void add_cf_unlocked(CACHEFILE newcf); + void add_stale_cf(CACHEFILE newcf); + void remove_cf(CACHEFILE cf); + void remove_stale_cf_unlocked(CACHEFILE cf); + FILENUM reserve_filenum(); + uint32_t get_new_hash_id_unlocked(); + CACHEFILE find_cachefile_unlocked(struct fileid* fileid); + CACHEFILE find_stale_cachefile_unlocked(struct fileid* fileid); + void verify_unused_filenum(FILENUM filenum); + bool evict_some_stale_pair(evictor* ev); + void free_stale_data(evictor* ev); + // access to these fields is protected by the lock + FILENUM m_next_filenum_to_use; + uint32_t m_next_hash_id_to_use; + toku_pthread_rwlock_t m_lock; // this field is public so we are still POD + toku::omt<CACHEFILE> m_active_filenum; + toku::omt<CACHEFILE> m_active_fileid; + toku::omt<CACHEFILE> m_stale_fileid; +private: + CACHEFILE find_cachefile_in_list_unlocked(CACHEFILE start, struct fileid* fileid); +}; + + +/////////////////////////////////////////////////////////////////////////////// +// +// The checkpointer handles starting and finishing checkpoints of the +// cachetable's data. 
+// +class checkpointer { +public: + int init(pair_list *_pl, TOKULOGGER _logger, evictor *_ev, cachefile_list *files); + void destroy(); + void set_checkpoint_period(uint32_t new_period); + uint32_t get_checkpoint_period(); + int shutdown(); + bool has_been_shutdown(); + void begin_checkpoint(); + void add_background_job(); + void remove_background_job(); + void end_checkpoint(void (*testcallback_f)(void*), void* testextra); + TOKULOGGER get_logger(); + // used during begin_checkpoint + void increment_num_txns(); +private: + uint32_t m_checkpoint_num_txns; // how many transactions are in the checkpoint + TOKULOGGER m_logger; + LSN m_lsn_of_checkpoint_in_progress; + uint32_t m_checkpoint_num_files; // how many cachefiles are in the checkpoint + struct minicron m_checkpointer_cron; // the periodic checkpointing thread + cachefile_list *m_cf_list; + pair_list *m_list; + evictor *m_ev; + bool m_checkpointer_cron_init; + bool m_checkpointer_init; + + // variable used by the checkpoint thread to know + // when all work induced by cloning on client threads is done + BACKGROUND_JOB_MANAGER m_checkpoint_clones_bjm; + // private methods for begin_checkpoint + void update_cachefiles(); + void log_begin_checkpoint(); + void turn_on_pending_bits(); + // private methods for end_checkpoint + void fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs); + void checkpoint_pending_pairs(); + void checkpoint_userdata(CACHEFILE* checkpoint_cfs); + void log_end_checkpoint(); + void end_checkpoint_userdata(CACHEFILE* checkpoint_cfs); + void remove_cachefiles(CACHEFILE* checkpoint_cfs); + + // Unit test struct needs access to private members. + friend struct checkpointer_test; +}; + +// +// This is how often we want the eviction thread +// to run, in seconds. +// +const int EVICTION_PERIOD = 1; + +/////////////////////////////////////////////////////////////////////////////// +// +// The evictor handles the removal of pairs from the pair list/cachetable. +// +class evictor { +public: + int init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period); + void destroy(); + void add_pair_attr(PAIR_ATTR attr); + void remove_pair_attr(PAIR_ATTR attr); + void change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr); + void add_cloned_data_size(long size); + void remove_cloned_data_size(long size); + uint64_t reserve_memory(double fraction, uint64_t upper_bound); + void release_reserved_memory(uint64_t reserved_memory); + void run_eviction_thread(); + void do_partial_eviction(PAIR p); + void evict_pair(PAIR p, bool checkpoint_pending); + void wait_for_cache_pressure_to_subside(); + void signal_eviction_thread(); + void signal_eviction_thread_locked(); + bool should_client_thread_sleep(); + bool should_client_wake_eviction_thread(); + // function needed for testing + void get_state(long *size_current_ptr, long *size_limit_ptr); + void fill_engine_status(); + void set_enable_partial_eviction(bool enabled); + bool get_enable_partial_eviction(void) const; +private: + void add_to_size_current(long size); + void remove_from_size_current(long size); + void run_eviction(); + bool run_eviction_on_pair(PAIR p); + void try_evict_pair(PAIR p); + void decrease_size_evicting(long size_evicting_estimate); + bool should_sleeping_clients_wakeup(); + bool eviction_needed(); + + // We have some intentional races with these variables because we're ok with reading something a little bit old. 
+ // Provide some hooks for reading variables in an unsafe way so that there are function names we can stick in a valgrind suppression. + int64_t unsafe_read_size_current(void) const; + int64_t unsafe_read_size_evicting(void) const; + + pair_list* m_pl; + cachefile_list* m_cf_list; + int64_t m_size_current; // the sum of the sizes of the pairs in the cachetable + int64_t m_size_cloned_data; // stores amount of cloned data we have, only used for engine status + // changes to these two values are protected + // by ev_thread_lock + int64_t m_size_reserved; // How much memory is reserved (e.g., by the loader) + int64_t m_size_evicting; // the sum of the sizes of the pairs being written + + // these are constants + int64_t m_low_size_watermark; // target max size of cachetable that eviction thread aims for + int64_t m_low_size_hysteresis; // if cachetable grows to this size, client threads wake up eviction thread upon adding data + int64_t m_high_size_watermark; // if cachetable grows to this size, client threads sleep upon adding data + int64_t m_high_size_hysteresis; // if > cachetable size, then sleeping client threads may wake up + + bool m_enable_partial_eviction; // true if partial evictions are permitted + + // used to calculate random numbers + struct random_data m_random_data; + char m_random_statebuf[64]; + + // mutex that protects fields listed immediately below + toku_mutex_t m_ev_thread_lock; + // the eviction thread + toku_pthread_t m_ev_thread; + // condition variable that controls the sleeping period + // of the eviction thread + toku_cond_t m_ev_thread_cond; + // number of client threads that are currently sleeping + // due to an over-subscribed cachetable + uint32_t m_num_sleepers; + // states whether the eviction thread should run. Set to true + // in init, set to false during destroy + bool m_run_thread; + // bool that states if the eviction thread is currently running + bool m_ev_thread_is_running; + // period for which the eviction thread sleeps + uint32_t m_period_in_seconds; + // condition variable on which client threads wait when sleeping + // due to an over-subscribed cachetable + toku_cond_t m_flow_control_cond; + + // variables for engine status + PARTITIONED_COUNTER m_size_nonleaf; + PARTITIONED_COUNTER m_size_leaf; + PARTITIONED_COUNTER m_size_rollback; + PARTITIONED_COUNTER m_size_cachepressure; + PARTITIONED_COUNTER m_wait_pressure_count; + PARTITIONED_COUNTER m_wait_pressure_time; + PARTITIONED_COUNTER m_long_wait_pressure_count; + PARTITIONED_COUNTER m_long_wait_pressure_time; + + KIBBUTZ m_kibbutz; + + // this variable is ONLY used for testing purposes + uint64_t m_num_eviction_thread_runs; + + bool m_ev_thread_init; + bool m_evictor_init; + + friend class evictor_test_helpers; + friend class evictor_unit_test; +}; + +/////////////////////////////////////////////////////////////////////////////// +// +// Iterates over the cleaner head in the pair list, calling the cleaner +// callback on each node in that list. 
+// +class cleaner { +public: + int init(uint32_t cleaner_iterations, pair_list* _pl, CACHETABLE _ct); + void destroy(void); + uint32_t get_iterations(void); + void set_iterations(uint32_t new_iterations); + uint32_t get_period_unlocked(void); + void set_period(uint32_t new_period); + int run_cleaner(void); + +private: + pair_list* m_pl; + CACHETABLE m_ct; + struct minicron m_cleaner_cron; // the periodic cleaner thread + uint32_t m_cleaner_iterations; // how many times to run the cleaner per + // cleaner period (minicron has a + // minimum period of 1s so if you want + // more frequent cleaner runs you must + // use this) + bool m_cleaner_cron_init; + bool m_cleaner_init; +}; + +/////////////////////////////////////////////////////////////////////////////// +// +// The cachetable is as close to an ENV as we get. +// +struct cachetable { + pair_list list; + cleaner cl; + evictor ev; + checkpointer cp; + cachefile_list cf_list; + + KIBBUTZ client_kibbutz; // pool of worker threads and jobs to do asynchronously for the client. + KIBBUTZ ct_kibbutz; // pool of worker threads and jobs to do asynchronously for the cachetable + KIBBUTZ checkpointing_kibbutz; // small pool for checkpointing cloned pairs + + char *env_dir; +}; diff --git a/storage/tokudb/PerconaFT/ft/cachetable/cachetable.cc b/storage/tokudb/PerconaFT/ft/cachetable/cachetable.cc new file mode 100644 index 00000000..034d5442 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/cachetable/cachetable.cc @@ -0,0 +1,5018 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 
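The lock-ordering rules documented in cachetable-internal.h above are easiest to see as code. The following is a simplified sketch of the client pin path, not the real toku_cachetable_get_and_pin: fetch-on-miss, blocking pins, and full checkpoint resolution are elided, and it assumes the pair_list and frwlock interfaces declared above:

// Sketch of the pin protocol from cachetable-internal.h (simplified).
static PAIR pin_pair_sketch(CACHETABLE ct, CACHEFILE cf,
                            CACHEKEY key, uint32_t fullhash) {
    ct->list.read_list_lock();              // 1. find the PAIR under the list lock
    PAIR p = ct->list.find_pair(cf, key, fullhash);
    if (p == nullptr) {
        ct->list.read_list_unlock();
        return nullptr;                     // caller would fetch from disk here
    }
    toku_mutex_lock(p->mutex);              // 2. PAIR mutex while the list lock is held
    ct->list.read_list_unlock();            // 3. drop the list lock before a blocking pin
    p->value_rwlock.write_lock(true);       // 4. pin; only p->mutex may be held here
    toku_mutex_unlock(p->mutex);
    ct->list.read_pending_cheap_lock();     // 5. innermost lock: pending (cheap)
    bool pending = p->checkpoint_pending;   //    copy and clear the pending bit
    p->checkpoint_pending = false;
    ct->list.read_pending_cheap_unlock();
    if (pending) {
        // resolve checkpointing here (write or clone the old version)
    }
    return p;
}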
+ +#include <my_global.h> +#include <string.h> +#include <time.h> +#include <stdarg.h> + +#include <portability/memory.h> +#include <portability/toku_race_tools.h> +#include <portability/toku_atomic.h> +#include <portability/toku_pthread.h> +#include <portability/toku_portability.h> +#include <portability/toku_stdlib.h> +#include <portability/toku_time.h> + +#include "ft/cachetable/cachetable.h" +#include "ft/cachetable/cachetable-internal.h" +#include "ft/cachetable/checkpoint.h" +#include "ft/logger/log-internal.h" +#include "util/rwlock.h" +#include "util/scoped_malloc.h" +#include "util/status.h" +#include "util/context.h" + +toku_instr_key *cachetable_m_mutex_key; +toku_instr_key *cachetable_ev_thread_lock_mutex_key; + +toku_instr_key *cachetable_m_list_lock_key; +toku_instr_key *cachetable_m_pending_lock_expensive_key; +toku_instr_key *cachetable_m_pending_lock_cheap_key; +toku_instr_key *cachetable_m_lock_key; + +toku_instr_key *cachetable_value_key; +toku_instr_key *cachetable_disk_nb_rwlock_key; + +toku_instr_key *cachetable_p_refcount_wait_key; +toku_instr_key *cachetable_m_flow_control_cond_key; +toku_instr_key *cachetable_m_ev_thread_cond_key; + +toku_instr_key *cachetable_disk_nb_mutex_key; +toku_instr_key *log_internal_lock_mutex_key; +toku_instr_key *eviction_thread_key; + +/////////////////////////////////////////////////////////////////////////////////// +// Engine status +// +// Status is intended for display to humans to help understand system behavior. +// It does not need to be perfectly thread-safe. + +// These should be in the cachetable object, but we make them file-wide so that gdb can get them easily. +// They were left here after engine status cleanup (#2949, rather than moved into the status struct) +// so they are still easily available to the debugger and to save lots of typing. +static uint64_t cachetable_miss; +static uint64_t cachetable_misstime; // time spent waiting for disk read +static uint64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable? +static uint64_t cachetable_evictions; +static uint64_t cleaner_executions; // number of times the cleaner thread's loop has executed + + +// Note, toku_cachetable_get_status() is below, after declaration of cachetable. 
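These counters are bumped without any synchronization, which the comment above explicitly tolerates: the numbers are for humans, and an occasional lost increment is acceptable. For contrast, a hedged sketch of what the race-free formulation would look like if exact totals ever mattered (this is not what the code does; relaxed atomics give atomicity without ordering cost):

#include <atomic>
#include <cstdint>

// Contrast sketch: a status counter with well-defined concurrent updates.
static std::atomic<uint64_t> cachetable_miss_atomic{0};

static inline void count_cachetable_miss(void) {
    cachetable_miss_atomic.fetch_add(1, std::memory_order_relaxed);
}

static inline uint64_t read_cachetable_miss(void) {
    return cachetable_miss_atomic.load(std::memory_order_relaxed);
}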
+ +static void * const zero_value = nullptr; +static PAIR_ATTR const zero_attr = { + .size = 0, + .nonleaf_size = 0, + .leaf_size = 0, + .rollback_size = 0, + .cache_pressure_size = 0, + .is_valid = true +}; + + +static inline void ctpair_destroy(PAIR p) { + p->value_rwlock.deinit(); + paranoid_invariant(p->refcount == 0); + nb_mutex_destroy(&p->disk_nb_mutex); + toku_cond_destroy(&p->refcount_wait); + toku_free(p); +} + +static inline void pair_lock(PAIR p) { + toku_mutex_lock(p->mutex); +} + +static inline void pair_unlock(PAIR p) { + toku_mutex_unlock(p->mutex); +} + +// adds a reference to the PAIR +// on input and output, PAIR mutex is held +static void pair_add_ref_unlocked(PAIR p) { + p->refcount++; +} + +// releases a reference to the PAIR +// on input and output, PAIR mutex is held +static void pair_release_ref_unlocked(PAIR p) { + paranoid_invariant(p->refcount > 0); + p->refcount--; + if (p->refcount == 0 && p->num_waiting_on_refs > 0) { + toku_cond_broadcast(&p->refcount_wait); + } +} + +static void pair_wait_for_ref_release_unlocked(PAIR p) { + p->num_waiting_on_refs++; + while (p->refcount > 0) { + toku_cond_wait(&p->refcount_wait, p->mutex); + } + p->num_waiting_on_refs--; +} + +bool toku_ctpair_is_write_locked(PAIR pair) { + return pair->value_rwlock.writers() == 1; +} + +void +toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) { + ct_status.init(); + CT_STATUS_VAL(CT_MISS) = cachetable_miss; + CT_STATUS_VAL(CT_MISSTIME) = cachetable_misstime; + CT_STATUS_VAL(CT_PREFETCHES) = cachetable_prefetches; + CT_STATUS_VAL(CT_EVICTIONS) = cachetable_evictions; + CT_STATUS_VAL(CT_CLEANER_EXECUTIONS) = cleaner_executions; + CT_STATUS_VAL(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct); + CT_STATUS_VAL(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct); + toku_kibbutz_get_status(ct->client_kibbutz, + &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS), + &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS_ACTIVE), + &CT_STATUS_VAL(CT_POOL_CLIENT_QUEUE_SIZE), + &CT_STATUS_VAL(CT_POOL_CLIENT_MAX_QUEUE_SIZE), + &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED), + &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME)); + toku_kibbutz_get_status(ct->ct_kibbutz, + &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS), + &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE), + &CT_STATUS_VAL(CT_POOL_CACHETABLE_QUEUE_SIZE), + &CT_STATUS_VAL(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE), + &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED), + &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME)); + toku_kibbutz_get_status(ct->checkpointing_kibbutz, + &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS), + &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE), + &CT_STATUS_VAL(CT_POOL_CHECKPOINT_QUEUE_SIZE), + &CT_STATUS_VAL(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE), + &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED), + &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME)); + ct->ev.fill_engine_status(); + *statp = ct_status; +} + +// FIXME global with no toku prefix +void remove_background_job_from_cf(CACHEFILE cf) +{ + bjm_remove_background_job(cf->bjm); +} + +// FIXME global with no toku prefix +void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra) +// The function f must call remove_background_job_from_cf when it completes +{ + int r = bjm_add_background_job(cf->bjm); + // if client is adding a background job, then it must be done + // at a time when the manager is accepting background jobs, otherwise + // the client is screwing up + assert_zero(r); + 
toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra); +} + +static int +checkpoint_thread (void *checkpointer_v) +// Effect: If checkpoint_period>0 then periodically run a checkpoint. +// If someone changes the checkpoint_period (calling toku_set_checkpoint_period), then the checkpoint will run sooner or later. +// If someone sets the checkpoint_shutdown boolean, then this thread exits. +// This thread notices those changes by waiting on a condition variable. +{ + CHECKPOINTER CAST_FROM_VOIDP(cp, checkpointer_v); + int r = toku_checkpoint(cp, cp->get_logger(), NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT); + invariant_zero(r); + return r; +} + +void toku_set_checkpoint_period (CACHETABLE ct, uint32_t new_period) { + ct->cp.set_checkpoint_period(new_period); +} + +uint32_t toku_get_checkpoint_period_unlocked (CACHETABLE ct) { + return ct->cp.get_checkpoint_period(); +} + +void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period) { + if(force_recovery) { + return; + } + ct->cl.set_period(new_period); +} + +uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct) { + return ct->cl.get_period_unlocked(); +} + +void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations) { + ct->cl.set_iterations(new_iterations); +} + +uint32_t toku_get_cleaner_iterations (CACHETABLE ct) { + return ct->cl.get_iterations(); +} + +uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) { + return ct->cl.get_iterations(); +} + +void toku_set_enable_partial_eviction (CACHETABLE ct, bool enabled) { + ct->ev.set_enable_partial_eviction(enabled); +} + +bool toku_get_enable_partial_eviction (CACHETABLE ct) { + return ct->ev.get_enable_partial_eviction(); +} + +// reserve 25% as "unreservable". The loader cannot have it. +#define unreservable_memory(size) ((size)/4) + +int toku_cachetable_create_ex(CACHETABLE *ct_result, long size_limit, + unsigned long client_pool_threads, + unsigned long cachetable_pool_threads, + unsigned long checkpoint_pool_threads, + LSN UU(initial_lsn), TOKULOGGER logger) { + int result = 0; + int r; + + if (size_limit == 0) { + size_limit = 128*1024*1024; + } + + CACHETABLE XCALLOC(ct); + ct->list.init(); + ct->cf_list.init(); + + int num_processors = toku_os_get_number_active_processors(); + int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1; + r = toku_kibbutz_create(client_pool_threads ? client_pool_threads : num_processors, + &ct->client_kibbutz); + if (r != 0) { + result = r; + goto cleanup; + } + r = toku_kibbutz_create(cachetable_pool_threads ? cachetable_pool_threads : 2*num_processors, + &ct->ct_kibbutz); + if (r != 0) { + result = r; + goto cleanup; + } + r = toku_kibbutz_create(checkpoint_pool_threads ? checkpoint_pool_threads : checkpointing_nworkers, + &ct->checkpointing_kibbutz); + if (r != 0) { + result = r; + goto cleanup; + } + // must be done after creating ct_kibbutz + r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD); + if (r != 0) { + result = r; + goto cleanup; + } + r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list); + if (r != 0) { + result = r; + goto cleanup; + } + r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration + if (r != 0) { + result = r; + goto cleanup; + } + ct->env_dir = toku_xstrdup("."); +cleanup: + if (result == 0) { + *ct_result = ct; + } else { + toku_cachetable_close(&ct); + } + return result; +} + +// Returns a pointer to the checkpointer contained within +// the given cachetable. 
+CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) { + return &ct->cp; +} + +uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction, uint64_t upper_bound) { + uint64_t reserved_memory = ct->ev.reserve_memory(fraction, upper_bound); + return reserved_memory; +} + +void toku_cachetable_release_reserved_memory(CACHETABLE ct, uint64_t reserved_memory) { + ct->ev.release_reserved_memory(reserved_memory); +} + +void +toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir) { + toku_free(ct->env_dir); + ct->env_dir = toku_xstrdup(env_dir); +} + +// What cachefile goes with particular iname (iname relative to env)? +// The transaction that is adding the reference might not have a reference +// to the ft, therefore the cachefile might be closing. +// If closing, we want to return that it is not there, but must wait till after +// the close has finished. +// Once the close has finished, there must not be a cachefile with that name +// in the cachetable. +int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf) { + return ct->cf_list.cachefile_of_iname_in_env(iname_in_env, cf); +} + +// What cachefile goes with particular fd? +// This function can only be called if the ft is still open, so file must +// still be open +int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) { + return ct->cf_list.cachefile_of_filenum(filenum, cf); +} + +// TEST-ONLY function +// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile. +int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_in_env) { + FILENUM filenum = toku_cachetable_reserve_filenum(ct); + bool was_open; + return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_in_env, filenum, &was_open); +} + +// Get a unique filenum from the cachetable +FILENUM +toku_cachetable_reserve_filenum(CACHETABLE ct) { + return ct->cf_list.reserve_filenum(); +} + +static void create_new_cachefile( + CACHETABLE ct, + FILENUM filenum, + uint32_t hash_id, + int fd, + const char *fname_in_env, + struct fileid fileid, + CACHEFILE *cfptr + ) { + // File is not open. Make a new cachefile. + CACHEFILE newcf = NULL; + XCALLOC(newcf); + newcf->cachetable = ct; + newcf->hash_id = hash_id; + newcf->fileid = fileid; + + newcf->filenum = filenum; + newcf->fd = fd; + newcf->fname_in_env = toku_xstrdup(fname_in_env); + bjm_init(&newcf->bjm); + *cfptr = newcf; +} + +int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd, + const char *fname_in_env, + FILENUM filenum, bool* was_open) { + int r; + CACHEFILE newcf; + struct fileid fileid; + + assert(filenum.fileid != FILENUM_NONE.fileid); + r = toku_os_get_unique_file_id(fd, &fileid); + if (r != 0) { + r = get_error_errno(); + close(fd); + return r; + } + ct->cf_list.write_lock(); + CACHEFILE existing_cf = ct->cf_list.find_cachefile_unlocked(&fileid); + if (existing_cf) { + *was_open = true; + // Reuse an existing cachefile and close the caller's fd, whose + // responsibility has been passed to us. 
+ r = close(fd); + assert(r == 0); + *cfptr = existing_cf; + r = 0; + goto exit; + } + *was_open = false; + ct->cf_list.verify_unused_filenum(filenum); + // now let's try to find it in the stale cachefiles + existing_cf = ct->cf_list.find_stale_cachefile_unlocked(&fileid); + // found the stale file, + if (existing_cf) { + // fix up the fields in the cachefile + existing_cf->filenum = filenum; + existing_cf->fd = fd; + existing_cf->fname_in_env = toku_xstrdup(fname_in_env); + bjm_init(&existing_cf->bjm); + + // now we need to move all the PAIRs in it back into the cachetable + ct->list.write_list_lock(); + for (PAIR curr_pair = existing_cf->cf_head; curr_pair; curr_pair = curr_pair->cf_next) { + pair_lock(curr_pair); + ct->list.add_to_cachetable_only(curr_pair); + pair_unlock(curr_pair); + } + ct->list.write_list_unlock(); + // move the cachefile back to the list of active cachefiles + ct->cf_list.remove_stale_cf_unlocked(existing_cf); + ct->cf_list.add_cf_unlocked(existing_cf); + *cfptr = existing_cf; + r = 0; + goto exit; + } + + create_new_cachefile( + ct, + filenum, + ct->cf_list.get_new_hash_id_unlocked(), + fd, + fname_in_env, + fileid, + &newcf + ); + + ct->cf_list.add_cf_unlocked(newcf); + + *cfptr = newcf; + r = 0; + exit: + ct->cf_list.write_unlock(); + return r; +} + +static void cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, bool evict_completely); + +//TEST_ONLY_FUNCTION +int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in_env, int flags, mode_t mode) { + char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env); + int fd = open(fname_in_cwd, flags+O_BINARY, mode); + int r; + if (fd < 0) { + r = get_error_errno(); + } else { + r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env); + } + toku_free(fname_in_cwd); + return r; +} + +char * +toku_cachefile_fname_in_env (CACHEFILE cf) { + if (cf) { + return cf->fname_in_env; + } + return nullptr; +} + +void toku_cachefile_set_fname_in_env(CACHEFILE cf, char *new_fname_in_env) { + cf->fname_in_env = new_fname_in_env; +} + +int +toku_cachefile_get_fd (CACHEFILE cf) { + return cf->fd; +} + +static void cachefile_destroy(CACHEFILE cf) { + if (cf->free_userdata) { + cf->free_userdata(cf, cf->userdata); + } + toku_free(cf); +} + +void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) { + CACHEFILE cf = *cfp; + CACHETABLE ct = cf->cachetable; + + bjm_wait_for_jobs_to_finish(cf->bjm); + + // Clients should never attempt to close a cachefile that is being + // checkpointed. We notify clients this is happening in the + // note_pin_by_checkpoint callback. + assert(!cf->for_checkpoint); + + // Flush the cachefile and remove all of its pairs from the cachetable, + // but keep the PAIRs linked in the cachefile. We will store the cachefile + // away in case it gets opened immediately + // + // if we are unlinking on close, then we want to evict completely, + // otherwise, we will keep the PAIRs and cachefile around in case + // a subsequent open comes soon + cachetable_flush_cachefile(ct, cf, cf->unlink_on_close); + + // Call the close userdata callback to notify the client this cachefile + // and its underlying file are going to be closed + if (cf->close_userdata) { + cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn); + } + // fsync and close the fd. 
+ toku_file_fsync_without_accounting(cf->fd); + int r = close(cf->fd); + assert(r == 0); + cf->fd = -1; + + // destroy the parts of the cachefile + // that do not persist across opens/closes + bjm_destroy(cf->bjm); + cf->bjm = NULL; + + // remove the cf from the list of active cachefiles + ct->cf_list.remove_cf(cf); + cf->filenum = FILENUM_NONE; + + // Unlink the file if the bit was set + if (cf->unlink_on_close) { + char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(cf->cachetable, cf->fname_in_env); + r = unlink(fname_in_cwd); + assert_zero(r); + toku_free(fname_in_cwd); + } + toku_free(cf->fname_in_env); + cf->fname_in_env = NULL; + + // we destroy the cf if the unlink bit was set or if no PAIRs exist + // if no PAIRs exist, there is no sense in keeping the cachefile around + bool destroy_cf = cf->unlink_on_close || (cf->cf_head == NULL); + if (destroy_cf) { + cachefile_destroy(cf); + } + else { + ct->cf_list.add_stale_cf(cf); + } +} + +// This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c +// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number. +// Instead we can use a bitmask on a table of size power of two. +// This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan +static inline uint32_t rot(uint32_t x, uint32_t k) { + return (x<<k) | (x>>(32-k)); +} +static inline uint32_t final (uint32_t a, uint32_t b, uint32_t c) { + c ^= b; c -= rot(b,14); + a ^= c; a -= rot(c,11); + b ^= a; b -= rot(a,25); + c ^= b; c -= rot(b,16); + a ^= c; a -= rot(c,4); + b ^= a; b -= rot(a,14); + c ^= b; c -= rot(b,24); + return c; +} + +uint32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key) +// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two. +{ + return final(cachefile->hash_id, (uint32_t)(key.b>>32), (uint32_t)key.b); +} + +#define CLOCK_SATURATION 15 +#define CLOCK_INITIAL_COUNT 3 + +// Requires pair's mutex to be held +static void pair_touch (PAIR p) { + p->count = (p->count < CLOCK_SATURATION) ? p->count+1 : CLOCK_SATURATION; +} + +// Remove a pair from the cachetable, requires write list lock to be held and p->mutex to be held +// Effects: the pair is removed from the LRU list and from the cachetable's hash table. +// The size of the objects in the cachetable is adjusted by the size of the pair being +// removed. +static void cachetable_remove_pair (pair_list* list, evictor* ev, PAIR p) { + list->evict_completely(p); + ev->remove_pair_attr(p->attr); +} + +static void cachetable_free_pair(PAIR p) { + CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback; + CACHEKEY key = p->key; + void *value = p->value_data; + void* disk_data = p->disk_data; + void *write_extraargs = p->write_extraargs; + PAIR_ATTR old_attr = p->attr; + + cachetable_evictions++; + PAIR_ATTR new_attr = p->attr; + // Note that flush_callback is called with write_me false, so the only purpose of this + // call is to tell the ft layer to evict the node (keep_me is false). 
+ // Also, because we have already removed the PAIR from the cachetable in + // cachetable_remove_pair, we cannot pass in p->cachefile and p->cachefile->fd + // for the first two parameters, as these may be invalid (#5171), so, we + // pass in NULL and -1, dummy values + flush_callback(NULL, -1, key, value, &disk_data, write_extraargs, old_attr, &new_attr, false, false, true, false); + + ctpair_destroy(p); +} + +// assumes value_rwlock and disk_nb_mutex held on entry +// responsibility of this function is to only write a locked PAIR to disk +// and NOTHING else. We do not manipulate the state of the PAIR +// of the cachetable here (with the exception of ct->size_current for clones) +// +// No pair_list lock should be held, and the PAIR mutex should not be held +// +static void cachetable_only_write_locked_data( + evictor* ev, + PAIR p, + bool for_checkpoint, + PAIR_ATTR* new_attr, + bool is_clone + ) +{ + CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback; + CACHEFILE cachefile = p->cachefile; + CACHEKEY key = p->key; + void *value = is_clone ? p->cloned_value_data : p->value_data; + void *disk_data = p->disk_data; + void *write_extraargs = p->write_extraargs; + PAIR_ATTR old_attr; + // we do this for drd. If we are a cloned pair and only + // have the disk_nb_mutex, it is a race to access p->attr. + // Luckily, old_attr here is only used for some test applications, + // so inaccurate non-size fields are ok. + if (is_clone) { + old_attr = make_pair_attr(p->cloned_value_size); + } + else { + old_attr = p->attr; + } + bool dowrite = true; + + // write callback + flush_callback( + cachefile, + cachefile->fd, + key, + value, + &disk_data, + write_extraargs, + old_attr, + new_attr, + dowrite, + is_clone ? false : true, // keep_me (only keep if this is not cloned pointer) + for_checkpoint, + is_clone //is_clone + ); + p->disk_data = disk_data; + if (is_clone) { + p->cloned_value_data = NULL; + ev->remove_cloned_data_size(p->cloned_value_size); + p->cloned_value_size = 0; + } +} + + +// +// This function writes a PAIR's value out to disk. Currently, it is called +// by get_and_pin functions that write a PAIR out for checkpoint, by +// evictor threads that evict dirty PAIRS, and by the checkpoint thread +// that needs to write out a dirty node for checkpoint. 
+// +// Requires that p->mutex NOT be held on entry; otherwise, +// calling cachetable_only_write_locked_data will be very expensive +// +static void cachetable_write_locked_pair( + evictor* ev, + PAIR p, + bool for_checkpoint + ) +{ + PAIR_ATTR old_attr = p->attr; + PAIR_ATTR new_attr = p->attr; + // grabbing the disk_nb_mutex here ensures that + // after this point, no one is writing out a cloned value + // if we grab the disk_nb_mutex inside the if clause, + // then we may try to evict a PAIR that is in the process + // of having its clone be written out + pair_lock(p); + nb_mutex_lock(&p->disk_nb_mutex, p->mutex); + pair_unlock(p); + // make sure that assumption about cloned_value_data is true + // if we have grabbed the disk_nb_mutex, then that means that + // there should be no cloned value data + assert(p->cloned_value_data == NULL); + if (p->dirty) { + cachetable_only_write_locked_data(ev, p, for_checkpoint, &new_attr, false); + // + // now let's update variables + // + if (new_attr.is_valid) { + p->attr = new_attr; + ev->change_pair_attr(old_attr, new_attr); + } + } + // the pair is no longer dirty once written + p->dirty = CACHETABLE_CLEAN; + pair_lock(p); + nb_mutex_unlock(&p->disk_nb_mutex); + pair_unlock(p); +} + +// Worker thread function that writes and evicts a pair from memory to its cachefile +static void cachetable_evicter(void* extra) { + PAIR p = (PAIR)extra; + pair_list* pl = p->list; + CACHEFILE cf = p->cachefile; + pl->read_pending_exp_lock(); + bool for_checkpoint = p->checkpoint_pending; + p->checkpoint_pending = false; + // per the contract of evictor::evict_pair, + // the pair's mutex, p->mutex, must be held on entry + pair_lock(p); + p->ev->evict_pair(p, for_checkpoint); + pl->read_pending_exp_unlock(); + bjm_remove_background_job(cf->bjm); +} + +static void cachetable_partial_eviction(void* extra) { + PAIR p = (PAIR)extra; + CACHEFILE cf = p->cachefile; + p->ev->do_partial_eviction(p); + bjm_remove_background_job(cf->bjm); +} + +void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair) { + void* old_value = old_pair->value_data; + void* new_value = new_pair->value_data; + old_pair->value_data = new_value; + new_pair->value_data = old_value; +} + +void toku_cachetable_maybe_flush_some(CACHETABLE ct) { + // TODO: <CER> Maybe move this... + ct->ev.signal_eviction_thread(); +} + +// Initializes a pair's members. +// +void pair_init(PAIR p, + CACHEFILE cachefile, + CACHEKEY key, + void *value, + PAIR_ATTR attr, + enum cachetable_dirty dirty, + uint32_t fullhash, + CACHETABLE_WRITE_CALLBACK write_callback, + evictor *ev, + pair_list *list) +{ + p->cachefile = cachefile; + p->key = key; + p->value_data = value; + p->cloned_value_data = NULL; + p->cloned_value_size = 0; + p->disk_data = NULL; + p->attr = attr; + p->dirty = dirty; + p->fullhash = fullhash; + + p->flush_callback = write_callback.flush_callback; + p->pe_callback = write_callback.pe_callback; + p->pe_est_callback = write_callback.pe_est_callback; + p->cleaner_callback = write_callback.cleaner_callback; + p->clone_callback = write_callback.clone_callback; + p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback; + p->write_extraargs = write_callback.write_extraargs; + + p->count = 0; // <CER> Is zero the correct init value? 
+    p->refcount = 0;
+    p->num_waiting_on_refs = 0;
+    toku_cond_init(*cachetable_p_refcount_wait_key, &p->refcount_wait, nullptr);
+    p->checkpoint_pending = false;
+
+    p->mutex = list->get_mutex_for_pair(fullhash);
+    assert(p->mutex);
+    p->value_rwlock.init(p->mutex
+#ifdef TOKU_MYSQL_WITH_PFS
+                         ,
+                         *cachetable_value_key
+#endif
+                         );
+    nb_mutex_init(*cachetable_disk_nb_mutex_key,
+                  *cachetable_disk_nb_rwlock_key,
+                  &p->disk_nb_mutex);
+
+    p->size_evicting_estimate = 0;  // <CER> Is zero the correct init value?
+
+    p->ev = ev;
+    p->list = list;
+
+    p->clock_next = p->clock_prev = NULL;
+    p->pending_next = p->pending_prev = NULL;
+    p->cf_next = p->cf_prev = NULL;
+    p->hash_chain = NULL;
+}
+
+// has ct locked on entry
+// This function MUST NOT release and reacquire the cachetable lock
+// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
+//
+// Requires pair list's write lock to be held on entry.
+// the pair's mutex must be held as well
+//
+//
+static PAIR cachetable_insert_at(CACHETABLE ct,
+                                 CACHEFILE cachefile, CACHEKEY key, void *value,
+                                 uint32_t fullhash,
+                                 PAIR_ATTR attr,
+                                 CACHETABLE_WRITE_CALLBACK write_callback,
+                                 enum cachetable_dirty dirty) {
+    PAIR MALLOC(p);
+    assert(p);
+    memset(p, 0, sizeof *p);
+    pair_init(p,
+        cachefile,
+        key,
+        value,
+        attr,
+        dirty,
+        fullhash,
+        write_callback,
+        &ct->ev,
+        &ct->list
+        );
+
+    ct->list.put(p);
+    ct->ev.add_pair_attr(attr);
+    return p;
+}
+
+// on input, the write list lock must be held AND
+// the pair's mutex must be held as well
+static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) {
+    ct->list.put(p);
+    ct->ev.add_pair_attr(attr);
+}
+
+
+// has ct locked on entry
+// This function MUST NOT release and reacquire the cachetable lock
+// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
+//
+// Requires pair list's write lock to be held on entry
+//
+static void cachetable_put_internal(
+    CACHEFILE cachefile,
+    PAIR p,
+    void *value,
+    PAIR_ATTR attr,
+    CACHETABLE_PUT_CALLBACK put_callback
+    )
+{
+    CACHETABLE ct = cachefile->cachetable;
+    //
+    // TODO: (Zardosht), make code run in debug only
+    //
+    //PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash);
+    //invariant_null(dummy_p);
+    cachetable_insert_pair_at(ct, p, attr);
+    invariant_notnull(put_callback);
+    put_callback(p->key, value, p);
+}
+
+// Pair mutex (p->mutex) may or may not be held on entry;
+// holding the pair mutex on entry is not important
+// for performance or correctness
+// Pair is pinned on entry
+static void
+clone_pair(evictor* ev, PAIR p) {
+    PAIR_ATTR old_attr = p->attr;
+    PAIR_ATTR new_attr;
+    long clone_size = 0;
+
+    // act of cloning should be fast,
+    // not sure if we have to release
+    // and regrab the cachetable lock,
+    // but doing it for now
+    p->clone_callback(
+        p->value_data,
+        &p->cloned_value_data,
+        &clone_size,
+        &new_attr,
+        true,
+        p->write_extraargs
+        );
+
+    // now we need to do the same actions we would do
+    // if the PAIR had been written to disk
+    //
+    // because we hold the value_rwlock,
+    // it doesn't matter whether we clear
+    // the pending bit before the clone
+    // or after the clone
+    p->dirty = CACHETABLE_CLEAN;
+    if (new_attr.is_valid) {
+        p->attr = new_attr;
+        ev->change_pair_attr(old_attr, new_attr);
+    }
+    p->cloned_value_size = clone_size;
+    ev->add_cloned_data_size(p->cloned_value_size);
+}
+
+static void checkpoint_cloned_pair(void* extra) {
+    PAIR p = (PAIR)extra;
+    CACHETABLE ct = p->cachefile->cachetable;
+    PAIR_ATTR new_attr;
+    // note that pending lock is not needed here because
+    // we KNOW we are in the middle of a checkpoint
+    // and that a begin_checkpoint cannot happen
+    cachetable_only_write_locked_data(
+        p->ev,
+        p,
+        true, //for_checkpoint
+        &new_attr,
+        true //is_clone
+        );
+    pair_lock(p);
+    nb_mutex_unlock(&p->disk_nb_mutex);
+    pair_unlock(p);
+    ct->cp.remove_background_job();
+}
+
+static void
+checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
+    toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p);
+}
+
+
+//
+// Given a PAIR p with the value_rwlock already held, do the following:
+//  - If the PAIR needs to be written out to disk for checkpoint:
+//    - If the PAIR is cloneable, clone the PAIR and place the work
+//      of writing the PAIR on a background thread.
+//    - If the PAIR is not cloneable, write the PAIR to disk for checkpoint
+//      on the current thread
+//
+// On entry, pair's mutex is NOT held
+//
+static void
+write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending)
+{
+    if (checkpoint_pending && p->checkpoint_complete_callback) {
+        p->checkpoint_complete_callback(p->value_data);
+    }
+    if (p->dirty && checkpoint_pending) {
+        if (p->clone_callback) {
+            pair_lock(p);
+            nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
+            pair_unlock(p);
+            assert(!p->cloned_value_data);
+            clone_pair(&ct->ev, p);
+            assert(p->cloned_value_data);
+            // place it on the background thread and continue
+            // responsibility of writer thread to release disk_nb_mutex
+            ct->cp.add_background_job();
+            checkpoint_cloned_pair_on_writer_thread(ct, p);
+        }
+        else {
+            // The pair is not cloneable, just write the pair to disk
+            // we already have p->value_rwlock and we just do the write in our own thread.
+ cachetable_write_locked_pair(&ct->ev, p, true); // keeps the PAIR's write lock + } + } +} + +// On entry and exit: hold the pair's mutex (p->mutex) +// Method: take write lock +// maybe write out the node +// Else release write lock +// +static void +write_pair_for_checkpoint_thread (evictor* ev, PAIR p) +{ + // Grab an exclusive lock on the pair. + // If we grab an expensive lock, then other threads will return + // TRY_AGAIN rather than waiting. In production, the only time + // another thread will check if grabbing a lock is expensive is when + // we have a clone_callback (FTNODEs), so the act of checkpointing + // will be cheap. Also, much of the time we'll just be clearing + // pending bits and that's definitely cheap. (see #5427) + p->value_rwlock.write_lock(false); + if (p->checkpoint_pending && p->checkpoint_complete_callback) { + p->checkpoint_complete_callback(p->value_data); + } + if (p->dirty && p->checkpoint_pending) { + if (p->clone_callback) { + nb_mutex_lock(&p->disk_nb_mutex, p->mutex); + assert(!p->cloned_value_data); + clone_pair(ev, p); + assert(p->cloned_value_data); + } + else { + // The pair is not cloneable, just write the pair to disk + // we already have p->value_rwlock and we just do the write in our own thread. + // this will grab and release disk_nb_mutex + pair_unlock(p); + cachetable_write_locked_pair(ev, p, true); // keeps the PAIR's write lock + pair_lock(p); + } + p->checkpoint_pending = false; + + // now release value_rwlock, before we write the PAIR out + // so that the PAIR is available to client threads + p->value_rwlock.write_unlock(); // didn't call cachetable_evict_pair so we have to unlock it ourselves. + if (p->clone_callback) { + // note that pending lock is not needed here because + // we KNOW we are in the middle of a checkpoint + // and that a begin_checkpoint cannot happen + PAIR_ATTR attr; + pair_unlock(p); + cachetable_only_write_locked_data( + ev, + p, + true, //for_checkpoint + &attr, + true //is_clone + ); + pair_lock(p); + nb_mutex_unlock(&p->disk_nb_mutex); + } + } + else { + // + // we may clear the pending bit here because we have + // both the cachetable lock and the PAIR lock. + // The rule, as mentioned in toku_cachetable_begin_checkpoint, + // is that to clear the bit, we must have both the PAIR lock + // and the pending lock + // + p->checkpoint_pending = false; + p->value_rwlock.write_unlock(); + } +} + +// +// For each PAIR associated with these CACHEFILEs and CACHEKEYs +// if the checkpoint_pending bit is set and the PAIR is dirty, write the PAIR +// to disk. +// We assume the PAIRs passed in have been locked by the client that made calls +// into the cachetable that eventually make it here. 
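+// [Editor's note: context, hedged. The cachetable treats the dependent
+// pairs as an opaque array; in the FT layer they are typically the
+// already-pinned ancestor nodes whose pending checkpoint writes must be
+// resolved before a new node is added beneath them. A hypothetical caller
+// of toku_cachetable_put_with_dep_pairs (declared below) might look like:
+//
+//    PAIR deps[1] = { parent_pair };                          // hypothetical
+//    enum cachetable_dirty dep_dirty[1] = { CACHETABLE_DIRTY };
+//    toku_cachetable_put_with_dep_pairs(cf, get_key_cb, new_node, attr,
+//                                       wc, cb_extra, 1, deps, dep_dirty,
+//                                       &key, &fullhash, put_cb);
+// ]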
+//
+static void checkpoint_dependent_pairs(
+    CACHETABLE ct,
+    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
+    PAIR* dependent_pairs,
+    bool* checkpoint_pending,
+    enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
+    )
+{
+    for (uint32_t i = 0; i < num_dependent_pairs; i++) {
+        PAIR curr_dep_pair = dependent_pairs[i];
+        // we need to update the dirtiness of the dependent pair,
+        // because the client may have dirtied it while holding its lock,
+        // and if the pair is pending a checkpoint, it needs to be written out
+        if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY;
+        if (checkpoint_pending[i]) {
+            write_locked_pair_for_checkpoint(ct, curr_dep_pair, checkpoint_pending[i]);
+        }
+    }
+}
+
+void toku_cachetable_put_with_dep_pairs(
+    CACHEFILE cachefile,
+    CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
+    void *value,
+    PAIR_ATTR attr,
+    CACHETABLE_WRITE_CALLBACK write_callback,
+    void *get_key_and_fullhash_extra,
+    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
+    PAIR* dependent_pairs,
+    enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
+    CACHEKEY* key,
+    uint32_t* fullhash,
+    CACHETABLE_PUT_CALLBACK put_callback
+    )
+{
+    //
+    // need to get the key and fullhash
+    //
+    CACHETABLE ct = cachefile->cachetable;
+    if (ct->ev.should_client_thread_sleep()) {
+        ct->ev.wait_for_cache_pressure_to_subside();
+    }
+    if (ct->ev.should_client_wake_eviction_thread()) {
+        ct->ev.signal_eviction_thread();
+    }
+
+    PAIR p = NULL;
+    XMALLOC(p);
+    memset(p, 0, sizeof *p);
+
+    ct->list.write_list_lock();
+    get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
+    pair_init(
+        p,
+        cachefile,
+        *key,
+        value,
+        attr,
+        CACHETABLE_DIRTY,
+        *fullhash,
+        write_callback,
+        &ct->ev,
+        &ct->list
+        );
+    pair_lock(p);
+    p->value_rwlock.write_lock(true);
+    cachetable_put_internal(
+        cachefile,
+        p,
+        value,
+        attr,
+        put_callback
+        );
+    pair_unlock(p);
+    bool checkpoint_pending[num_dependent_pairs];
+    ct->list.write_pending_cheap_lock();
+    for (uint32_t i = 0; i < num_dependent_pairs; i++) {
+        checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
+        dependent_pairs[i]->checkpoint_pending = false;
+    }
+    ct->list.write_pending_cheap_unlock();
+    ct->list.write_list_unlock();
+
+    //
+    // now that we have inserted the row, let's checkpoint the
+    // dependent nodes, if they need checkpointing
+    //
+    checkpoint_dependent_pairs(
+        ct,
+        num_dependent_pairs,
+        dependent_pairs,
+        checkpoint_pending,
+        dependent_dirty
+        );
+}
+
+void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void*value, PAIR_ATTR attr,
+                        CACHETABLE_WRITE_CALLBACK write_callback,
+                        CACHETABLE_PUT_CALLBACK put_callback
+                        ) {
+    CACHETABLE ct = cachefile->cachetable;
+    if (ct->ev.should_client_thread_sleep()) {
+        ct->ev.wait_for_cache_pressure_to_subside();
+    }
+    if (ct->ev.should_client_wake_eviction_thread()) {
+        ct->ev.signal_eviction_thread();
+    }
+
+    PAIR p = NULL;
+    XMALLOC(p);
+    memset(p, 0, sizeof *p);
+
+    ct->list.write_list_lock();
+    pair_init(
+        p,
+        cachefile,
+        key,
+        value,
+        attr,
+        CACHETABLE_DIRTY,
+        fullhash,
+        write_callback,
+        &ct->ev,
+        &ct->list
+        );
+    pair_lock(p);
+    p->value_rwlock.write_lock(true);
+    cachetable_put_internal(
+        cachefile,
+        p,
+        value,
+        attr,
+        put_callback
+        );
+    pair_unlock(p);
+    ct->list.write_list_unlock();
+}
+
+static uint64_t get_tnow(void) {
+    struct timeval tv;
+    int r = gettimeofday(&tv, NULL);
+    assert(r == 0);
+    return tv.tv_sec * 1000000ULL + tv.tv_usec;
+}
+
+//
+// No locks are held on entry (besides the rwlock write lock of the PAIR)
+//
+static void
+do_partial_fetch(
+    CACHETABLE ct,
+    CACHEFILE cachefile,
+    PAIR p,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    void *read_extraargs,
+    bool keep_pair_locked
+    )
+{
+    PAIR_ATTR old_attr = p->attr;
+    PAIR_ATTR new_attr = zero_attr;
+    // As of Dr. No, only clean PAIRs may have pieces missing,
+    // so we do a sanity check here.
+    assert(!p->dirty);
+
+    pair_lock(p);
+    invariant(p->value_rwlock.writers());
+    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
+    pair_unlock(p);
+    int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr);
+    lazy_assert_zero(r);
+    p->attr = new_attr;
+    ct->ev.change_pair_attr(old_attr, new_attr);
+    pair_lock(p);
+    nb_mutex_unlock(&p->disk_nb_mutex);
+    if (!keep_pair_locked) {
+        p->value_rwlock.write_unlock();
+    }
+    pair_unlock(p);
+}
+
+void toku_cachetable_pf_pinned_pair(
+    void* value,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    void* read_extraargs,
+    CACHEFILE cf,
+    CACHEKEY key,
+    uint32_t fullhash
+    )
+{
+    PAIR_ATTR attr;
+    PAIR p = NULL;
+    CACHETABLE ct = cf->cachetable;
+    ct->list.pair_lock_by_fullhash(fullhash);
+    p = ct->list.find_pair(cf, key, fullhash);
+    assert(p != NULL);
+    assert(p->value_data == value);
+    assert(p->value_rwlock.writers());
+    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
+    pair_unlock(p);
+
+    int fd = cf->fd;
+    pf_callback(value, p->disk_data, read_extraargs, fd, &attr);
+
+    pair_lock(p);
+    nb_mutex_unlock(&p->disk_nb_mutex);
+    pair_unlock(p);
+}
+
+int toku_cachetable_get_and_pin (
+    CACHEFILE cachefile,
+    CACHEKEY key,
+    uint32_t fullhash,
+    void**value,
+    CACHETABLE_WRITE_CALLBACK write_callback,
+    CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    bool may_modify_value,
+    void* read_extraargs // parameter for fetch_callback, pf_req_callback, and pf_callback
+    )
+{
+    pair_lock_type lock_type = may_modify_value ? PL_WRITE_EXPENSIVE : PL_READ;
+    // We have separate parameters of read_extraargs and write_extraargs because
+    // the lifetimes of the two parameters are different. write_extraargs may be used
+    // long after this function call (e.g. after a flush to disk), whereas read_extraargs
+    // will not be used after this function returns. As a result, the caller may allocate
+    // read_extraargs on the stack, whereas write_extraargs must be allocated
+    // on the heap.
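+    // [Editor's note: illustrative sketch, not part of the original commit.
+    // A hypothetical caller, showing the allocation rule described above:
+    //
+    //    struct my_fetch_extra fe = {...};    // read_extraargs: stack is fine
+    //    CACHETABLE_WRITE_CALLBACK wc = ...;  // wc.write_extraargs must outlive
+    //                                         // this call (heap / long-lived)
+    //    void *node;
+    //    int r = toku_cachetable_get_and_pin(cf, key, fullhash, &node, wc,
+    //                                        my_fetch_cb, my_pf_req_cb, my_pf_cb,
+    //                                        true /* may_modify_value */, &fe);
+    //    // on success the pair is pinned; release it later with
+    //    // toku_cachetable_unpin(...)
+    // ]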
+ return toku_cachetable_get_and_pin_with_dep_pairs ( + cachefile, + key, + fullhash, + value, + write_callback, + fetch_callback, + pf_req_callback, + pf_callback, + lock_type, + read_extraargs, + 0, // number of dependent pairs that we may need to checkpoint + NULL, // array of dependent pairs + NULL // array stating dirty/cleanness of dependent pairs + ); +} + +// Read a pair from a cachefile into memory using the pair's fetch callback +// on entry, pair mutex (p->mutex) is NOT held, but pair is pinned +static void cachetable_fetch_pair( + CACHETABLE ct, + CACHEFILE cf, + PAIR p, + CACHETABLE_FETCH_CALLBACK fetch_callback, + void* read_extraargs, + bool keep_pair_locked + ) +{ + // helgrind + CACHEKEY key = p->key; + uint32_t fullhash = p->fullhash; + + void *toku_value = NULL; + void *disk_data = NULL; + PAIR_ATTR attr; + + // FIXME this should be enum cachetable_dirty, right? + int dirty = 0; + + pair_lock(p); + nb_mutex_lock(&p->disk_nb_mutex, p->mutex); + pair_unlock(p); + + int r; + r = fetch_callback(cf, p, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs); + if (dirty) { + p->dirty = CACHETABLE_DIRTY; + } + assert(r == 0); + + p->value_data = toku_value; + p->disk_data = disk_data; + p->attr = attr; + ct->ev.add_pair_attr(attr); + pair_lock(p); + nb_mutex_unlock(&p->disk_nb_mutex); + if (!keep_pair_locked) { + p->value_rwlock.write_unlock(); + } + pair_unlock(p); +} + +static bool get_checkpoint_pending(PAIR p, pair_list* pl) { + bool checkpoint_pending = false; + pl->read_pending_cheap_lock(); + checkpoint_pending = p->checkpoint_pending; + p->checkpoint_pending = false; + pl->read_pending_cheap_unlock(); + return checkpoint_pending; +} + +static void checkpoint_pair_and_dependent_pairs( + CACHETABLE ct, + PAIR p, + bool p_is_pending_checkpoint, + uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint + PAIR* dependent_pairs, + bool* dependent_pairs_pending_checkpoint, + enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs + ) +{ + + // + // A checkpoint must not begin while we are checking dependent pairs or pending bits. + // Here is why. + // + // Now that we have all of the locks on the pairs we + // care about, we can take care of the necessary checkpointing. + // For each pair, we simply need to write the pair if it is + // pending a checkpoint. If no pair is pending a checkpoint, + // then all of this work will be done with the cachetable lock held, + // so we don't need to worry about a checkpoint beginning + // in the middle of any operation below. If some pair + // is pending a checkpoint, then the checkpoint thread + // will not complete its current checkpoint until it can + // successfully grab a lock on the pending pair and + // remove it from its list of pairs pending a checkpoint. + // This cannot be done until we release the lock + // that we have, which is not done in this function. + // So, the point is, it is impossible for a checkpoint + // to begin while we write any of these locked pairs + // for checkpoint, even though writing a pair releases + // the cachetable lock. 
+    //
+    write_locked_pair_for_checkpoint(ct, p, p_is_pending_checkpoint);
+
+    checkpoint_dependent_pairs(
+        ct,
+        num_dependent_pairs,
+        dependent_pairs,
+        dependent_pairs_pending_checkpoint,
+        dependent_dirty
+        );
+}
+
+static void unpin_pair(PAIR p, bool read_lock_grabbed) {
+    if (read_lock_grabbed) {
+        p->value_rwlock.read_unlock();
+    }
+    else {
+        p->value_rwlock.write_unlock();
+    }
+}
+
+
+// on input, the pair's mutex is held,
+// on output, the pair's mutex is not held.
+// returns true if the caller must try again (the pair was not pinned);
+// returns false if we succeeded and the pair is pinned
+static bool try_pin_pair(
+    PAIR p,
+    CACHETABLE ct,
+    CACHEFILE cachefile,
+    pair_lock_type lock_type,
+    uint32_t num_dependent_pairs,
+    PAIR* dependent_pairs,
+    enum cachetable_dirty* dependent_dirty,
+    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    void* read_extraargs,
+    bool already_slept
+    )
+{
+    bool dep_checkpoint_pending[num_dependent_pairs];
+    bool try_again = true;
+    bool expensive = (lock_type == PL_WRITE_EXPENSIVE);
+    if (lock_type != PL_READ) {
+        p->value_rwlock.write_lock(expensive);
+    }
+    else {
+        p->value_rwlock.read_lock();
+    }
+    pair_touch(p);
+    pair_unlock(p);
+
+    bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
+
+    if (partial_fetch_required) {
+        toku::context pf_ctx(CTX_PARTIAL_FETCH);
+
+        if (ct->ev.should_client_thread_sleep() && !already_slept) {
+            pair_lock(p);
+            unpin_pair(p, (lock_type == PL_READ));
+            pair_unlock(p);
+            try_again = true;
+            goto exit;
+        }
+        if (ct->ev.should_client_wake_eviction_thread()) {
+            ct->ev.signal_eviction_thread();
+        }
+        //
+        // Just because the PAIR exists does not necessarily mean that all the data the caller requires
+        // is in memory. A partial fetch may be required, which is evaluated above.
+        // If the variable is true, a partial fetch is required, so we must grab the PAIR's write lock
+        // and then call a callback to retrieve what we need
+        //
+        assert(partial_fetch_required);
+        // As of Dr. No, only clean PAIRs may have pieces missing,
+        // so we do a sanity check here.
+        assert(!p->dirty);
+
+        if (lock_type == PL_READ) {
+            pair_lock(p);
+            p->value_rwlock.read_unlock();
+            p->value_rwlock.write_lock(true);
+            pair_unlock(p);
+        }
+        else if (lock_type == PL_WRITE_CHEAP) {
+            pair_lock(p);
+            p->value_rwlock.write_unlock();
+            p->value_rwlock.write_lock(true);
+            pair_unlock(p);
+        }
+
+        partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
+        if (partial_fetch_required) {
+            do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, true);
+        }
+        if (lock_type == PL_READ) {
+            //
+            // TODO: Zardosht, somehow ensure that a partial eviction cannot happen
+            // between these two calls
+            //
+            pair_lock(p);
+            p->value_rwlock.write_unlock();
+            p->value_rwlock.read_lock();
+            pair_unlock(p);
+        }
+        else if (lock_type == PL_WRITE_CHEAP) {
+            pair_lock(p);
+            p->value_rwlock.write_unlock();
+            p->value_rwlock.write_lock(false);
+            pair_unlock(p);
+        }
+        // small hack here for #5439,
+        // for queries, pf_req_callback does some work for the caller,
+        // that information may be out of date after a write_unlock
+        // followed by a relock, so we do it again.
+ bool pf_required = pf_req_callback(p->value_data,read_extraargs); + assert(!pf_required); + } + + if (lock_type != PL_READ) { + ct->list.read_pending_cheap_lock(); + bool p_checkpoint_pending = p->checkpoint_pending; + p->checkpoint_pending = false; + for (uint32_t i = 0; i < num_dependent_pairs; i++) { + dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending; + dependent_pairs[i]->checkpoint_pending = false; + } + ct->list.read_pending_cheap_unlock(); + checkpoint_pair_and_dependent_pairs( + ct, + p, + p_checkpoint_pending, + num_dependent_pairs, + dependent_pairs, + dep_checkpoint_pending, + dependent_dirty + ); + } + + try_again = false; +exit: + return try_again; +} + +int toku_cachetable_get_and_pin_with_dep_pairs ( + CACHEFILE cachefile, + CACHEKEY key, + uint32_t fullhash, + void**value, + CACHETABLE_WRITE_CALLBACK write_callback, + CACHETABLE_FETCH_CALLBACK fetch_callback, + CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, + CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, + pair_lock_type lock_type, + void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback + uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint + PAIR* dependent_pairs, + enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs + ) +// See cachetable/cachetable.h +{ + CACHETABLE ct = cachefile->cachetable; + bool wait = false; + bool already_slept = false; + bool dep_checkpoint_pending[num_dependent_pairs]; + + // + // If in the process of pinning the node we add data to the cachetable via a partial fetch + // or a full fetch, we may need to first sleep because there is too much data in the + // cachetable. In those cases, we set the bool wait to true and goto try_again, so that + // we can do our sleep and then restart the function. + // +beginning: + if (wait) { + // We shouldn't be holding the read list lock while + // waiting for the evictor to remove pairs. + already_slept = true; + ct->ev.wait_for_cache_pressure_to_subside(); + } + + ct->list.pair_lock_by_fullhash(fullhash); + PAIR p = ct->list.find_pair(cachefile, key, fullhash); + if (p) { + // on entry, holds p->mutex (which is locked via pair_lock_by_fullhash) + // on exit, does not hold p->mutex + bool try_again = try_pin_pair( + p, + ct, + cachefile, + lock_type, + num_dependent_pairs, + dependent_pairs, + dependent_dirty, + pf_req_callback, + pf_callback, + read_extraargs, + already_slept + ); + if (try_again) { + wait = true; + goto beginning; + } + else { + goto got_value; + } + } + else { + toku::context fetch_ctx(CTX_FULL_FETCH); + + ct->list.pair_unlock_by_fullhash(fullhash); + // we only want to sleep once per call to get_and_pin. If we have already + // slept and there is still cache pressure, then we might as + // well just complete the call, because the sleep did not help + // By sleeping only once per get_and_pin, we prevent starvation and ensure + // that we make progress (however slow) on each thread, which allows + // assumptions of the form 'x will eventually happen'. + // This happens in extreme scenarios. + if (ct->ev.should_client_thread_sleep() && !already_slept) { + wait = true; + goto beginning; + } + if (ct->ev.should_client_wake_eviction_thread()) { + ct->ev.signal_eviction_thread(); + } + // Since the pair was not found, we need the write list + // lock to add it. So, we have to release the read list lock + // first. 
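+        // [Editor's note: gloss, hedged. The block below is the classic
+        // double-checked insertion idiom:
+        //    drop the per-fullhash pair lock   -> others may insert meanwhile
+        //    take the write list lock          -> serializes all inserts
+        //    re-search for the pair            -> if found, pin that one instead
+        //    otherwise insert our own PAIR     -> safe under the write list lock
+        // ]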
+        ct->list.write_list_lock();
+        ct->list.pair_lock_by_fullhash(fullhash);
+        p = ct->list.find_pair(cachefile, key, fullhash);
+        if (p != NULL) {
+            ct->list.write_list_unlock();
+            // on entry, holds p->mutex,
+            // on exit, does not hold p->mutex
+            bool try_again = try_pin_pair(
+                p,
+                ct,
+                cachefile,
+                lock_type,
+                num_dependent_pairs,
+                dependent_pairs,
+                dependent_dirty,
+                pf_req_callback,
+                pf_callback,
+                read_extraargs,
+                already_slept
+                );
+            if (try_again) {
+                wait = true;
+                goto beginning;
+            }
+            else {
+                goto got_value;
+            }
+        }
+        assert(p == NULL);
+
+        // Insert a PAIR into the cachetable
+        // NOTE: At this point we still have the write list lock held.
+        p = cachetable_insert_at(
+            ct,
+            cachefile,
+            key,
+            zero_value,
+            fullhash,
+            zero_attr,
+            write_callback,
+            CACHETABLE_CLEAN
+            );
+        invariant_notnull(p);
+
+        // Pin the pair.
+        p->value_rwlock.write_lock(true);
+        pair_unlock(p);
+
+
+        if (lock_type != PL_READ) {
+            ct->list.read_pending_cheap_lock();
+            invariant(!p->checkpoint_pending);
+            for (uint32_t i = 0; i < num_dependent_pairs; i++) {
+                dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
+                dependent_pairs[i]->checkpoint_pending = false;
+            }
+            ct->list.read_pending_cheap_unlock();
+        }
+        // We should release the lock before we perform
+        // these expensive operations.
+        ct->list.write_list_unlock();
+
+        if (lock_type != PL_READ) {
+            checkpoint_dependent_pairs(
+                ct,
+                num_dependent_pairs,
+                dependent_pairs,
+                dep_checkpoint_pending,
+                dependent_dirty
+                );
+        }
+        uint64_t t0 = get_tnow();
+
+        // Retrieve the value of the PAIR from disk.
+        // The pair being fetched will be marked as pending if a checkpoint happens during the
+        // fetch because begin_checkpoint will mark as pending any pair that is locked even if it is clean.
+        cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, true);
+        cachetable_miss++;
+        cachetable_misstime += get_tnow() - t0;
+
+        // If the lock_type requested was a PL_READ, we downgrade to PL_READ,
+        // but if the request was for a PL_WRITE_CHEAP, we don't bother
+        // downgrading, because we would have to possibly resolve the
+        // checkpointing again, and that would just make this function even
+        // messier.
+        //
+        // TODO(yoni): in case of PL_WRITE_CHEAP, write and use
+        // p->value_rwlock.write_change_status_to_not_expensive(); (Also name it better)
+        // to downgrade from an expensive write lock to a cheap one
+        if (lock_type == PL_READ) {
+            pair_lock(p);
+            p->value_rwlock.write_unlock();
+            p->value_rwlock.read_lock();
+            pair_unlock(p);
+            // small hack here for #5439,
+            // for queries, pf_req_callback does some work for the caller,
+            // that information may be out of date after a write_unlock
+            // followed by a read_lock, so we do it again.
+            bool pf_required = pf_req_callback(p->value_data,read_extraargs);
+            assert(!pf_required);
+        }
+        goto got_value;
+    }
+got_value:
+    *value = p->value_data;
+    return 0;
+}
+
+// Lookup a key in the cachetable. If it is found and it is not being written, then
+// acquire a read lock on the pair, update the LRU list, and return success.
+//
+// However, if the page is clean or has checkpoint pending, don't return success.
+// This will minimize the number of dirty nodes.
+// Rationale: maybe_get_and_pin is used when the system has an alternative to modifying a node.
+// In the context of checkpointing, we don't want to gratuitously dirty a page, because it causes an I/O.
+// For example, if we can either modify a bit in a dirty parent or modify a bit in a clean child, then we should modify
+// the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write to modify one bit.
+// Similarly, if the checkpoint is actually pending, we don't want to block on it.
+int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
+    CACHETABLE ct = cachefile->cachetable;
+    int r = -1;
+    ct->list.pair_lock_by_fullhash(fullhash);
+    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
+    if (p) {
+        const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
+        bool got_lock = false;
+        switch (lock_type) {
+        case PL_READ:
+            if (p->value_rwlock.try_read_lock()) {
+                got_lock = p->dirty;
+
+                if (!got_lock) {
+                    p->value_rwlock.read_unlock();
+                }
+            }
+            break;
+        case PL_WRITE_CHEAP:
+        case PL_WRITE_EXPENSIVE:
+            if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
+                // we got the lock fast, so continue
+                ct->list.read_pending_cheap_lock();
+
+                // if pending a checkpoint, then we don't want to return
+                // the value to the user, because we are responsible for
+                // handling the checkpointing, which we do not want to do,
+                // because it is expensive
+                got_lock = p->dirty && !p->checkpoint_pending;
+
+                ct->list.read_pending_cheap_unlock();
+                if (!got_lock) {
+                    p->value_rwlock.write_unlock();
+                }
+            }
+            break;
+        }
+        if (got_lock) {
+            pair_touch(p);
+            *value = p->value_data;
+            r = 0;
+        }
+    }
+    ct->list.pair_unlock_by_fullhash(fullhash);
+    return r;
+}
+
+//Used by flusher threads to possibly pin child on client thread if pinning is cheap
+//Same as toku_cachetable_maybe_get_and_pin except that we don't care if the node is clean or dirty (return the node regardless).
+//All other conditions remain the same.
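+// [Editor's note: illustrative sketch, not part of the original commit.
+// A hypothetical flusher-thread caller, per the comment above:
+//
+//    void *child;
+//    if (toku_cachetable_maybe_get_and_pin_clean(cf, child_key, child_fullhash,
+//                                                PL_WRITE_CHEAP, &child) == 0) {
+//        // pinning was cheap: flush the child on this (client) thread, then unpin
+//    } else {
+//        // pinning would have blocked: defer the flush to background work
+//    }
+// ]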
+int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
+    CACHETABLE ct = cachefile->cachetable;
+    int r = -1;
+    ct->list.pair_lock_by_fullhash(fullhash);
+    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
+    if (p) {
+        const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
+        bool got_lock = false;
+        switch (lock_type) {
+        case PL_READ:
+            if (p->value_rwlock.try_read_lock()) {
+                got_lock = true;
+            } else if (!p->value_rwlock.read_lock_is_expensive()) {
+                p->value_rwlock.write_lock(lock_is_expensive);
+                got_lock = true;
+            }
+            if (got_lock) {
+                pair_touch(p);
+            }
+            pair_unlock(p);
+            break;
+        case PL_WRITE_CHEAP:
+        case PL_WRITE_EXPENSIVE:
+            if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
+                got_lock = true;
+            } else if (!p->value_rwlock.write_lock_is_expensive()) {
+                p->value_rwlock.write_lock(lock_is_expensive);
+                got_lock = true;
+            }
+            if (got_lock) {
+                pair_touch(p);
+            }
+            pair_unlock(p);
+            if (got_lock) {
+                bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
+                write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
+            }
+            break;
+        }
+        if (got_lock) {
+            *value = p->value_data;
+            r = 0;
+        }
+    } else {
+        ct->list.pair_unlock_by_fullhash(fullhash);
+    }
+    return r;
+}
+
+int toku_cachetable_get_attr (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, PAIR_ATTR *attr) {
+    CACHETABLE ct = cachefile->cachetable;
+    int r;
+    ct->list.pair_lock_by_fullhash(fullhash);
+    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
+    if (p) {
+        // Assumes pair lock and full hash lock are the same mutex
+        *attr = p->attr;
+        r = 0;
+    } else {
+        r = -1;
+    }
+    ct->list.pair_unlock_by_fullhash(fullhash);
+    return r;
+}
+
+//
+// internal function to unpin a PAIR.
+// As of Clayface, this may be called in two ways:
+//  - with flush false
+//  - with flush true
+// The first is for when this is run during run_unlockers in
+// toku_cachetable_get_and_pin_nonblocking; the second is during
+// normal operations. Only during normal operations do we want to possibly
+// induce evictions or sleep.
+// +static int +cachetable_unpin_internal( + CACHEFILE cachefile, + PAIR p, + enum cachetable_dirty dirty, + PAIR_ATTR attr, + bool flush + ) +{ + invariant_notnull(p); + + CACHETABLE ct = cachefile->cachetable; + bool added_data_to_cachetable = false; + + // hack for #3969, only exists in case where we run unlockers + pair_lock(p); + PAIR_ATTR old_attr = p->attr; + PAIR_ATTR new_attr = attr; + if (dirty) { + p->dirty = CACHETABLE_DIRTY; + } + if (attr.is_valid) { + p->attr = attr; + } + bool read_lock_grabbed = p->value_rwlock.readers() != 0; + unpin_pair(p, read_lock_grabbed); + pair_unlock(p); + + if (attr.is_valid) { + if (new_attr.size > old_attr.size) { + added_data_to_cachetable = true; + } + ct->ev.change_pair_attr(old_attr, new_attr); + } + + // see comments above this function to understand this code + if (flush && added_data_to_cachetable) { + if (ct->ev.should_client_thread_sleep()) { + ct->ev.wait_for_cache_pressure_to_subside(); + } + if (ct->ev.should_client_wake_eviction_thread()) { + ct->ev.signal_eviction_thread(); + } + } + return 0; +} + +int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { + return cachetable_unpin_internal(cachefile, p, dirty, attr, true); +} +int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) { + return cachetable_unpin_internal(cachefile, p, dirty, attr, false); +} + +static void +run_unlockers (UNLOCKERS unlockers) { + while (unlockers) { + assert(unlockers->locked); + unlockers->locked = false; + unlockers->f(unlockers->extra); + unlockers=unlockers->next; + } +} + +// +// This function tries to pin the pair without running the unlockers. +// If it can pin the pair cheaply, it does so, and returns 0. +// If the pin will be expensive, it runs unlockers, +// pins the pair, then releases the pin, +// and then returns TOKUDB_TRY_AGAIN +// +// on entry, pair mutex is held, +// on exit, pair mutex is NOT held +static int +maybe_pin_pair( + PAIR p, + pair_lock_type lock_type, + UNLOCKERS unlockers + ) +{ + int retval = 0; + bool expensive = (lock_type == PL_WRITE_EXPENSIVE); + + // we can pin the PAIR. In each case, we check to see + // if acquiring the pin is expensive. If so, we run the unlockers, set the + // retval to TOKUDB_TRY_AGAIN, pin AND release the PAIR. 
+    // If not, then we pin the PAIR, keep retval at 0, and do not
+    // run the unlockers, as we intend to return the value to the user
+    if (lock_type == PL_READ) {
+        if (p->value_rwlock.read_lock_is_expensive()) {
+            pair_add_ref_unlocked(p);
+            pair_unlock(p);
+            run_unlockers(unlockers);
+            retval = TOKUDB_TRY_AGAIN;
+            pair_lock(p);
+            pair_release_ref_unlocked(p);
+        }
+        p->value_rwlock.read_lock();
+    }
+    else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP) {
+        if (p->value_rwlock.write_lock_is_expensive()) {
+            pair_add_ref_unlocked(p);
+            pair_unlock(p);
+            run_unlockers(unlockers);
+            // change expensive to false because
+            // we will unpin the pair immediately
+            // after pinning it
+            expensive = false;
+            retval = TOKUDB_TRY_AGAIN;
+            pair_lock(p);
+            pair_release_ref_unlocked(p);
+        }
+        p->value_rwlock.write_lock(expensive);
+    }
+    else {
+        abort();
+    }
+
+    if (retval == TOKUDB_TRY_AGAIN) {
+        unpin_pair(p, (lock_type == PL_READ));
+    }
+    pair_touch(p);
+    pair_unlock(p);
+    return retval;
+}
+
+int toku_cachetable_get_and_pin_nonblocking(
+    CACHEFILE cf,
+    CACHEKEY key,
+    uint32_t fullhash,
+    void**value,
+    CACHETABLE_WRITE_CALLBACK write_callback,
+    CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    pair_lock_type lock_type,
+    void *read_extraargs,
+    UNLOCKERS unlockers
+    )
+// See cachetable/cachetable.h.
+{
+    CACHETABLE ct = cf->cachetable;
+    assert(lock_type == PL_READ ||
+        lock_type == PL_WRITE_CHEAP ||
+        lock_type == PL_WRITE_EXPENSIVE
+        );
+try_again:
+    ct->list.pair_lock_by_fullhash(fullhash);
+    PAIR p = ct->list.find_pair(cf, key, fullhash);
+    if (p == NULL) {
+        toku::context fetch_ctx(CTX_FULL_FETCH);
+
+        // Not found
+        ct->list.pair_unlock_by_fullhash(fullhash);
+        ct->list.write_list_lock();
+        ct->list.pair_lock_by_fullhash(fullhash);
+        p = ct->list.find_pair(cf, key, fullhash);
+        if (p != NULL) {
+            // we just did another search with the write list lock and
+            // found the pair. This means that in between our
+            // releasing the read list lock and grabbing the write list lock,
+            // another thread snuck in and inserted the PAIR into
+            // the cachetable. For simplicity, we just return
+            // to the top and restart the function
+            ct->list.write_list_unlock();
+            ct->list.pair_unlock_by_fullhash(fullhash);
+            goto try_again;
+        }
+
+        p = cachetable_insert_at(
+            ct,
+            cf,
+            key,
+            zero_value,
+            fullhash,
+            zero_attr,
+            write_callback,
+            CACHETABLE_CLEAN
+            );
+        assert(p);
+        // grab expensive write lock, because we are about to do a fetch
+        // off disk
+        // No one can access this pair because
+        // we hold the write list lock and we just injected
+        // the pair into the cachetable. Therefore, this lock acquisition
+        // will not block.
+        p->value_rwlock.write_lock(true);
+        pair_unlock(p);
+        run_unlockers(unlockers); // we hold the write list_lock.
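+        // [Editor's note: context, hedged. Running the unlockers here, while
+        // still holding the write list lock, releases whatever locks the
+        // caller registered before we block on disk I/O below. The caller-side
+        // contract, sketched from the comments above (names hypothetical):
+        //
+        //    while (toku_cachetable_get_and_pin_nonblocking(..., &unlockers)
+        //           == TOKUDB_TRY_AGAIN) {
+        //        // our locks were released via the unlockers; reacquire
+        //        // them and retry the operation from scratch
+        //    }
+        // ]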
+ ct->list.write_list_unlock(); + + // at this point, only the pair is pinned, + // and no pair mutex held, and + // no list lock is held + uint64_t t0 = get_tnow(); + cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, false); + cachetable_miss++; + cachetable_misstime += get_tnow() - t0; + + if (ct->ev.should_client_thread_sleep()) { + ct->ev.wait_for_cache_pressure_to_subside(); + } + if (ct->ev.should_client_wake_eviction_thread()) { + ct->ev.signal_eviction_thread(); + } + + return TOKUDB_TRY_AGAIN; + } + else { + int r = maybe_pin_pair(p, lock_type, unlockers); + if (r == TOKUDB_TRY_AGAIN) { + return TOKUDB_TRY_AGAIN; + } + assert_zero(r); + + if (lock_type != PL_READ) { + bool checkpoint_pending = get_checkpoint_pending(p, &ct->list); + write_locked_pair_for_checkpoint(ct, p, checkpoint_pending); + } + + // At this point, we have pinned the PAIR + // and resolved its checkpointing. The pair's + // mutex is not held. The read list lock IS held. Before + // returning the PAIR to the user, we must + // still check for partial fetch + bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); + if (partial_fetch_required) { + toku::context fetch_ctx(CTX_PARTIAL_FETCH); + + run_unlockers(unlockers); + + // we are now getting an expensive write lock, because we + // are doing a partial fetch. So, if we previously have + // either a read lock or a cheap write lock, we need to + // release and reacquire the correct lock type + if (lock_type == PL_READ) { + pair_lock(p); + p->value_rwlock.read_unlock(); + p->value_rwlock.write_lock(true); + pair_unlock(p); + } + else if (lock_type == PL_WRITE_CHEAP) { + pair_lock(p); + p->value_rwlock.write_unlock(); + p->value_rwlock.write_lock(true); + pair_unlock(p); + } + + // Now wait for the I/O to occur. + partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); + if (partial_fetch_required) { + do_partial_fetch(ct, cf, p, pf_callback, read_extraargs, false); + } + else { + pair_lock(p); + p->value_rwlock.write_unlock(); + pair_unlock(p); + } + + if (ct->ev.should_client_thread_sleep()) { + ct->ev.wait_for_cache_pressure_to_subside(); + } + if (ct->ev.should_client_wake_eviction_thread()) { + ct->ev.signal_eviction_thread(); + } + + return TOKUDB_TRY_AGAIN; + } + else { + *value = p->value_data; + return 0; + } + } + // We should not get here. Above code should hit a return in all cases. 
+ abort(); +} + +struct cachefile_prefetch_args { + PAIR p; + CACHETABLE_FETCH_CALLBACK fetch_callback; + void* read_extraargs; +}; + +struct cachefile_partial_prefetch_args { + PAIR p; + CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback; + void *read_extraargs; +}; + +// Worker thread function to read a pair from a cachefile to memory +static void cachetable_reader(void* extra) { + struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra; + CACHEFILE cf = cpargs->p->cachefile; + CACHETABLE ct = cf->cachetable; + cachetable_fetch_pair( + ct, + cpargs->p->cachefile, + cpargs->p, + cpargs->fetch_callback, + cpargs->read_extraargs, + false + ); + bjm_remove_background_job(cf->bjm); + toku_free(cpargs); +} + +static void cachetable_partial_reader(void* extra) { + struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra; + CACHEFILE cf = cpargs->p->cachefile; + CACHETABLE ct = cf->cachetable; + do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs, false); + bjm_remove_background_job(cf->bjm); + toku_free(cpargs); +} + +int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash, + CACHETABLE_WRITE_CALLBACK write_callback, + CACHETABLE_FETCH_CALLBACK fetch_callback, + CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, + CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, + void *read_extraargs, + bool *doing_prefetch) +// Effect: See the documentation for this function in cachetable/cachetable.h +{ + int r = 0; + PAIR p = NULL; + if (doing_prefetch) { + *doing_prefetch = false; + } + CACHETABLE ct = cf->cachetable; + // if cachetable has too much data, don't bother prefetching + if (ct->ev.should_client_thread_sleep()) { + goto exit; + } + ct->list.pair_lock_by_fullhash(fullhash); + // lookup + p = ct->list.find_pair(cf, key, fullhash); + // if not found then create a pair and fetch it + if (p == NULL) { + cachetable_prefetches++; + ct->list.pair_unlock_by_fullhash(fullhash); + ct->list.write_list_lock(); + ct->list.pair_lock_by_fullhash(fullhash); + p = ct->list.find_pair(cf, key, fullhash); + if (p != NULL) { + ct->list.write_list_unlock(); + goto found_pair; + } + + r = bjm_add_background_job(cf->bjm); + assert_zero(r); + p = cachetable_insert_at( + ct, + cf, + key, + zero_value, + fullhash, + zero_attr, + write_callback, + CACHETABLE_CLEAN + ); + assert(p); + p->value_rwlock.write_lock(true); + pair_unlock(p); + ct->list.write_list_unlock(); + + struct cachefile_prefetch_args *MALLOC(cpargs); + cpargs->p = p; + cpargs->fetch_callback = fetch_callback; + cpargs->read_extraargs = read_extraargs; + toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs); + if (doing_prefetch) { + *doing_prefetch = true; + } + goto exit; + } + +found_pair: + // at this point, p is found, pair's mutex is grabbed, and + // no list lock is held + // TODO(leif): should this also just go ahead and wait if all there + // are to wait for are readers? 
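+    // [Editor's note: illustrative usage, not part of the original commit.
+    // Prefetch is best-effort: try_write_lock below never blocks, and a busy
+    // pair is simply skipped. A hypothetical caller checks doing_prefetch to
+    // learn whether background I/O was actually scheduled:
+    //
+    //    bool doing;
+    //    toku_cachefile_prefetch(cf, key, fullhash, wc, fetch_cb,
+    //                            pf_req_cb, pf_cb, extra, &doing);
+    //    if (!doing) { /* already cached, busy, or cache too full */ }
+    // ]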
+ if (p->value_rwlock.try_write_lock(true)) { + // nobody else is using the node, so we should go ahead and prefetch + pair_touch(p); + pair_unlock(p); + bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs); + + if (partial_fetch_required) { + r = bjm_add_background_job(cf->bjm); + assert_zero(r); + struct cachefile_partial_prefetch_args *MALLOC(cpargs); + cpargs->p = p; + cpargs->pf_callback = pf_callback; + cpargs->read_extraargs = read_extraargs; + toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs); + if (doing_prefetch) { + *doing_prefetch = true; + } + } + else { + pair_lock(p); + p->value_rwlock.write_unlock(); + pair_unlock(p); + } + } + else { + // Couldn't get the write lock cheaply + pair_unlock(p); + } +exit: + return 0; +} + +void toku_cachefile_verify (CACHEFILE cf) { + toku_cachetable_verify(cf->cachetable); +} + +void toku_cachetable_verify (CACHETABLE ct) { + ct->list.verify(); +} + + + +struct pair_flush_for_close{ + PAIR p; + BACKGROUND_JOB_MANAGER bjm; +}; + +static void cachetable_flush_pair_for_close(void* extra) { + struct pair_flush_for_close *CAST_FROM_VOIDP(args, extra); + PAIR p = args->p; + CACHEFILE cf = p->cachefile; + CACHETABLE ct = cf->cachetable; + PAIR_ATTR attr; + cachetable_only_write_locked_data( + &ct->ev, + p, + false, // not for a checkpoint, as we assert above + &attr, + false // not a clone + ); + p->dirty = CACHETABLE_CLEAN; + bjm_remove_background_job(args->bjm); + toku_free(args); +} + + +static void flush_pair_for_close_on_background_thread( + PAIR p, + BACKGROUND_JOB_MANAGER bjm, + CACHETABLE ct + ) +{ + pair_lock(p); + assert(p->value_rwlock.users() == 0); + assert(nb_mutex_users(&p->disk_nb_mutex) == 0); + assert(!p->cloned_value_data); + if (p->dirty == CACHETABLE_DIRTY) { + int r = bjm_add_background_job(bjm); + assert_zero(r); + struct pair_flush_for_close *XMALLOC(args); + args->p = p; + args->bjm = bjm; + toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args); + } + pair_unlock(p); +} + +static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) { + pair_lock(p); + assert(p->value_rwlock.users() == 0); + assert(nb_mutex_users(&p->disk_nb_mutex) == 0); + assert(!p->cloned_value_data); + assert(p->dirty == CACHETABLE_CLEAN); + assert(p->refcount == 0); + if (completely) { + cachetable_remove_pair(&ct->list, &ct->ev, p); + pair_unlock(p); + // TODO: Eventually, we should not hold the write list lock during free + cachetable_free_pair(p); + } + else { + // if we are not evicting completely, + // we only want to remove the PAIR from the cachetable, + // that is, remove from the hashtable and various linked + // list, but we will keep the PAIRS and the linked list + // in the cachefile intact, as they will be cached away + // in case an open comes soon. 
+        ct->list.evict_from_cachetable(p);
+        pair_unlock(p);
+    }
+}
+
+// helper function for cachetable_flush_cachefile, which happens on a close
+// writes out the dirty pairs on background threads and returns when
+// the writing is done
+static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
+    BACKGROUND_JOB_MANAGER bjm = NULL;
+    bjm_init(&bjm);
+    ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here
+    PAIR p = NULL;
+    // write out dirty PAIRs
+    uint32_t i;
+    if (cf) {
+        for (i = 0, p = cf->cf_head;
+            i < cf->num_pairs;
+            i++, p = p->cf_next)
+        {
+            flush_pair_for_close_on_background_thread(p, bjm, ct);
+        }
+    }
+    else {
+        for (i = 0, p = ct->list.m_checkpoint_head;
+            i < ct->list.m_n_in_table;
+            i++, p = p->clock_next)
+        {
+            flush_pair_for_close_on_background_thread(p, bjm, ct);
+        }
+    }
+    ct->list.write_list_unlock();
+    bjm_wait_for_jobs_to_finish(bjm);
+    bjm_destroy(bjm);
+}
+
+static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf, bool evict_completely) {
+    ct->list.write_list_lock();
+    if (cf) {
+        if (evict_completely) {
+            // if we are evicting completely, then the PAIRs will
+            // be removed from the linked list managed by the
+            // cachefile, so this while loop works
+            while (cf->num_pairs > 0) {
+                PAIR p = cf->cf_head;
+                remove_pair_for_close(p, ct, evict_completely);
+            }
+        }
+        else {
+            // on the other hand, if we are not evicting completely,
+            // then the cachefile's linked list stays intact, and we must
+            // iterate like this.
+            for (PAIR p = cf->cf_head; p; p = p->cf_next) {
+                remove_pair_for_close(p, ct, evict_completely);
+            }
+        }
+    }
+    else {
+        while (ct->list.m_n_in_table > 0) {
+            PAIR p = ct->list.m_checkpoint_head;
+            // if there is no cachefile, then we better
+            // be evicting completely because we have no
+            // cachefile to save the PAIRs to. At least,
+            // we have no guarantees that the cachefile
+            // will remain good
+            invariant(evict_completely);
+            remove_pair_for_close(p, ct, true);
+        }
+    }
+    ct->list.write_list_unlock();
+}
+
+static void verify_cachefile_flushed(CACHETABLE ct UU(), CACHEFILE cf UU()) {
+#ifdef TOKU_DEBUG_PARANOID
+    // assert here that cachefile is flushed by checking
+    // pair_list and finding no pairs belonging to this cachefile
+    if (cf) {
+        ct->list.write_list_lock();
+        uint32_t i;
+        PAIR p = NULL;
+        for (i = 0, p = ct->list.m_checkpoint_head;
+             i < ct->list.m_n_in_table;
+             i++, p = p->clock_next)
+        {
+            assert(p->cachefile != cf);
+        }
+        ct->list.write_list_unlock();
+    }
+#endif
+}
+
+// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
+// the cachefile is NULL).
+// Must be holding cachetable lock on entry.
+//
+// This function assumes that no client thread is accessing or
+// trying to access the cachefile while this function is executing.
+// This implies no client thread will be trying to lock any nodes
+// belonging to the cachefile.
+//
+// This function also assumes that the cachefile is not in the process
+// of being used by a checkpoint. If a checkpoint is currently happening,
+// it does NOT include this cachefile.
+// +static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf, bool evict_completely) { + // + // Because work on a kibbutz is always done by the client thread, + // and this function assumes that no client thread is doing any work + // on the cachefile, we assume that no client thread will be adding jobs + // to this cachefile's kibbutz. + // + // The caller of this function must ensure that there are + // no jobs added to the kibbutz. This implies that the only work other + // threads may be doing is work by the writer threads. + // + // first write out dirty PAIRs + write_dirty_pairs_for_close(ct, cf); + + // now that everything is clean, get rid of everything + remove_all_pairs_for_close(ct, cf, evict_completely); + + verify_cachefile_flushed(ct, cf); +} + +/* Requires that no locks be held that are used by the checkpoint logic */ +void +toku_cachetable_minicron_shutdown(CACHETABLE ct) { + int r = ct->cp.shutdown(); + assert(r==0); + ct->cl.destroy(); +} + +void toku_cachetable_prepare_close(CACHETABLE ct UU()) { + extern bool toku_serialize_in_parallel; + toku_unsafe_set(&toku_serialize_in_parallel, true); +} + +/* Requires that it all be flushed. */ +void toku_cachetable_close (CACHETABLE *ctp) { + CACHETABLE ct = *ctp; + ct->cp.destroy(); + ct->cl.destroy(); + ct->cf_list.free_stale_data(&ct->ev); + cachetable_flush_cachefile(ct, NULL, true); + ct->ev.destroy(); + ct->list.destroy(); + ct->cf_list.destroy(); + + if (ct->client_kibbutz) + toku_kibbutz_destroy(ct->client_kibbutz); + if (ct->ct_kibbutz) + toku_kibbutz_destroy(ct->ct_kibbutz); + if (ct->checkpointing_kibbutz) + toku_kibbutz_destroy(ct->checkpointing_kibbutz); + toku_free(ct->env_dir); + toku_free(ct); + *ctp = 0; +} + +static PAIR test_get_pair(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, bool have_ct_lock) { + CACHETABLE ct = cachefile->cachetable; + + if (!have_ct_lock) { + ct->list.read_list_lock(); + } + + PAIR p = ct->list.find_pair(cachefile, key, fullhash); + assert(p != NULL); + if (!have_ct_lock) { + ct->list.read_list_unlock(); + } + return p; +} + +//test-only wrapper +int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) { + // By default we don't have the lock + PAIR p = test_get_pair(cachefile, key, fullhash, false); + return toku_cachetable_unpin(cachefile, p, dirty, attr); // assume read lock is not grabbed, and that it is a write lock +} + +//test-only wrapper +int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) { + // We hold the cachetable mutex. + PAIR p = test_get_pair(cachefile, key, fullhash, true); + return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr); +} + +//test-only wrapper +int toku_test_cachetable_unpin_and_remove ( + CACHEFILE cachefile, + CACHEKEY key, + CACHETABLE_REMOVE_KEY remove_key, + void* remove_key_extra) +{ + uint32_t fullhash = toku_cachetable_hash(cachefile, key); + PAIR p = test_get_pair(cachefile, key, fullhash, false); + return toku_cachetable_unpin_and_remove(cachefile, p, remove_key, remove_key_extra); +} + +int toku_cachetable_unpin_and_remove ( + CACHEFILE cachefile, + PAIR p, + CACHETABLE_REMOVE_KEY remove_key, + void* remove_key_extra + ) +{ + invariant_notnull(p); + int r = ENOENT; + CACHETABLE ct = cachefile->cachetable; + + p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it. 
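+    // [Editor's note: hedged rationale for the line above. The pair is about
+    // to be removed and its key reclaimed via the remove_key callback below,
+    // so writing the value out first would be wasted I/O; clearing the dirty
+    // bit ensures the teardown path treats the pair as clean.]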
+ // grab disk_nb_mutex to ensure any background thread writing + // out a cloned value completes + pair_lock(p); + assert(p->value_rwlock.writers()); + nb_mutex_lock(&p->disk_nb_mutex, p->mutex); + pair_unlock(p); + assert(p->cloned_value_data == NULL); + + // + // take care of key removal + // + ct->list.write_list_lock(); + ct->list.read_pending_cheap_lock(); + bool for_checkpoint = p->checkpoint_pending; + // now let's wipe out the pending bit, because we are + // removing the PAIR + p->checkpoint_pending = false; + + // For the PAIR to not be picked by the + // cleaner thread, we mark the cachepressure_size to be 0 + // (This is redundant since we have the write_list_lock) + // This should not be an issue because we call + // cachetable_remove_pair before + // releasing the cachetable lock. + // + CACHEKEY key_to_remove = p->key; + p->attr.cache_pressure_size = 0; + // + // callback for removing the key + // for FTNODEs, this leads to calling + // toku_free_blocknum + // + if (remove_key) { + remove_key( + &key_to_remove, + for_checkpoint, + remove_key_extra + ); + } + ct->list.read_pending_cheap_unlock(); + + pair_lock(p); + p->value_rwlock.write_unlock(); + nb_mutex_unlock(&p->disk_nb_mutex); + // + // As of Clayface (6.5), only these threads may be + // blocked waiting to lock this PAIR: + // - the checkpoint thread (because a checkpoint is in progress + // and the PAIR was in the list of pending pairs) + // - a client thread running get_and_pin_nonblocking, who + // ran unlockers, then waited on the PAIR lock. + // While waiting on a PAIR lock, another thread comes in, + // locks the PAIR, and ends up calling unpin_and_remove, + // all while get_and_pin_nonblocking is waiting on the PAIR lock. + // We did not realize this at first, which caused bug #4357 + // The following threads CANNOT be blocked waiting on + // the PAIR lock: + // - a thread trying to run eviction via run_eviction. + // That cannot happen because run_eviction only + // attempts to lock PAIRS that are not locked, and this PAIR + // is locked. + // - cleaner thread, for the same reason as a thread running + // eviction + // - client thread doing a normal get_and_pin. The client is smart + // enough to not try to lock a PAIR that another client thread + // is trying to unpin and remove. Note that this includes work + // done on kibbutzes. + // - writer thread. Writer threads do not grab PAIR locks. They + // get PAIR locks transferred to them by client threads. + // + + // first thing we do is remove the PAIR from the various + // cachetable data structures, so no other thread can possibly + // access it. We do not want to risk some other thread + // trying to lock this PAIR if we release the write list lock + // below. If some thread is already waiting on the lock, + // then we let that thread grab the lock and finish, but + // we don't want any NEW threads to try to grab the PAIR + // lock. + // + // Because we call cachetable_remove_pair and wait, + // the threads that may be waiting + // on this PAIR lock must be careful to do NOTHING with the PAIR + // As per our analysis above, we only need + // to make sure the checkpoint thread and get_and_pin_nonblocking do + // nothing, and looking at those functions, it is clear they do nothing. 
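+    // [Editor's note: a compact summary of the removal sequence below, added
+    // for readability:
+    //    1. cachetable_remove_pair: unhook p from the shared structures
+    //       (hash chain, clock, pending and cachefile lists) under the
+    //       write list lock, so no NEW thread can find it
+    //    2. wait for p->refcount to drain to zero
+    //    3. let any already-waiting lockers of p->value_rwlock acquire and
+    //       release once, by taking and dropping the write lock ourselves
+    //    4. free the PAIR
+    // ]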
+ // + cachetable_remove_pair(&ct->list, &ct->ev, p); + ct->list.write_list_unlock(); + if (p->refcount > 0) { + pair_wait_for_ref_release_unlocked(p); + } + if (p->value_rwlock.users() > 0) { + // Need to wait for everyone else to leave + // This write lock will be granted only after all waiting + // threads are done. + p->value_rwlock.write_lock(true); + assert(p->refcount == 0); + assert(p->value_rwlock.users() == 1); // us + assert(!p->checkpoint_pending); + assert(p->attr.cache_pressure_size == 0); + p->value_rwlock.write_unlock(); + } + // just a sanity check + assert(nb_mutex_users(&p->disk_nb_mutex) == 0); + assert(p->cloned_value_data == NULL); + //Remove pair. + pair_unlock(p); + cachetable_free_pair(p); + r = 0; + return r; +} + +int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array); +int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array) { + array[index] = toku_cachefile_filenum(ft->cf); + return 0; +} + +static int log_open_txn (TOKUTXN txn, void* extra) { + int r; + checkpointer* cp = (checkpointer *)extra; + TOKULOGGER logger = txn->logger; + FILENUMS open_filenums; + uint32_t num_filenums = txn->open_fts.size(); + FILENUM array[num_filenums]; + if (toku_txn_is_read_only(txn)) { + goto cleanup; + } + else { + cp->increment_num_txns(); + } + + open_filenums.num = num_filenums; + open_filenums.filenums = array; + //Fill in open_filenums + r = txn->open_fts.iterate<FILENUM, set_filenum_in_array>(array); + invariant(r==0); + switch (toku_txn_get_state(txn)) { + case TOKUTXN_LIVE:{ + toku_log_xstillopen(logger, NULL, 0, txn, + toku_txn_get_txnid(txn), + toku_txn_get_txnid(toku_logger_txn_parent(txn)), + txn->roll_info.rollentry_raw_count, + open_filenums, + txn->force_fsync_on_commit, + txn->roll_info.num_rollback_nodes, + txn->roll_info.num_rollentries, + txn->roll_info.spilled_rollback_head, + txn->roll_info.spilled_rollback_tail, + txn->roll_info.current_rollback); + goto cleanup; + } + case TOKUTXN_PREPARING: { + TOKU_XA_XID xa_xid; + toku_txn_get_prepared_xa_xid(txn, &xa_xid); + toku_log_xstillopenprepared(logger, NULL, 0, txn, + toku_txn_get_txnid(txn), + &xa_xid, + txn->roll_info.rollentry_raw_count, + open_filenums, + txn->force_fsync_on_commit, + txn->roll_info.num_rollback_nodes, + txn->roll_info.num_rollentries, + txn->roll_info.spilled_rollback_head, + txn->roll_info.spilled_rollback_tail, + txn->roll_info.current_rollback); + goto cleanup; + } + case TOKUTXN_RETIRED: + case TOKUTXN_COMMITTING: + case TOKUTXN_ABORTING: { + assert(0); + } + } + // default is an error + assert(0); +cleanup: + return 0; +} + +// Requires: All three checkpoint-relevant locks must be held (see checkpoint.c). +// Algorithm: Write a checkpoint record to the log, noting the LSN of that record. +// Use the begin_checkpoint callback to take necessary snapshots (header, btt) +// Mark every dirty node as "pending." ("Pending" means that the node must be +// written to disk before it can be modified.) +void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, TOKULOGGER UU(logger)) { + cp->begin_checkpoint(); +} + + +// This is used by the cachetable_race test. +static volatile int toku_checkpointing_user_data_status = 0; +static void toku_cachetable_set_checkpointing_user_data_status (int v) { + toku_checkpointing_user_data_status = v; +} +int toku_cachetable_get_checkpointing_user_data_status (void) { + return toku_checkpointing_user_data_status; +} + +// Requires: The big checkpoint lock must be held (see checkpoint.c). 
+// Algorithm: Write all pending nodes to disk +// Use checkpoint callback to write snapshot information to disk (header, btt) +// Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks +// Note: If testcallback is non-null (for testing purposes only), it is called after writing the dictionary but before writing the log +void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger), + void (*testcallback_f)(void*), void* testextra) { + cp->end_checkpoint(testcallback_f, testextra); +} + +TOKULOGGER toku_cachefile_logger (CACHEFILE cf) { + return cf->cachetable->cp.get_logger(); +} + +FILENUM toku_cachefile_filenum (CACHEFILE cf) { + return cf->filenum; +} + +// debug functions + +int toku_cachetable_assert_all_unpinned (CACHETABLE ct) { + uint32_t i; + int some_pinned=0; + ct->list.read_list_lock(); + for (i=0; i<ct->list.m_table_size; i++) { + PAIR p; + for (p=ct->list.m_table[i]; p; p=p->hash_chain) { + pair_lock(p); + if (p->value_rwlock.users()) { + //printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data); + some_pinned=1; + } + pair_unlock(p); + } + } + ct->list.read_list_unlock(); + return some_pinned; +} + +int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) { + assert(cf != NULL); + int n_pinned=0; + CACHETABLE ct = cf->cachetable; + ct->list.read_list_lock(); + + // Iterate over all the pairs to find pairs specific to the + // given cachefile. + for (uint32_t i = 0; i < ct->list.m_table_size; i++) { + for (PAIR p = ct->list.m_table[i]; p; p = p->hash_chain) { + if (p->cachefile == cf) { + pair_lock(p); + if (p->value_rwlock.users()) { + if (print_them) { + printf("%s:%d pinned: %" PRId64 " (%p)\n", + __FILE__, + __LINE__, + p->key.b, + p->value_data); + } + n_pinned++; + } + pair_unlock(p); + } + } + } + + ct->list.read_list_unlock(); + return n_pinned; +} + +void toku_cachetable_print_state (CACHETABLE ct) { + uint32_t i; + ct->list.read_list_lock(); + for (i=0; i<ct->list.m_table_size; i++) { + PAIR p = ct->list.m_table[i]; + if (p != 0) { + pair_lock(p); + printf("t[%u]=", i); + // walk the chain with a separate iterator so that we still + // hold a valid PAIR to unlock when the loop finishes + for (PAIR q=ct->list.m_table[i]; q; q=q->hash_chain) { + printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}", q->key.b, q->cachefile, (int) q->dirty, q->value_rwlock.users(), q->attr.size); + } + printf("\n"); + pair_unlock(p); + } + } + ct->list.read_list_unlock(); +} + +void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) { + ct->list.get_state(num_entries_ptr, hash_size_ptr); + ct->ev.get_state(size_current_ptr, size_limit_ptr); +} + +int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr, + int *dirty_ptr, long long *pin_ptr, long *size_ptr) { + int r = -1; + uint32_t fullhash = toku_cachetable_hash(cf, key); + ct->list.read_list_lock(); + PAIR p = ct->list.find_pair(cf, key, fullhash); + if (p) { + pair_lock(p); + if (value_ptr) + *value_ptr = p->value_data; + if (dirty_ptr) + *dirty_ptr = p->dirty; + if (pin_ptr) + *pin_ptr = p->value_rwlock.users(); + if (size_ptr) + *size_ptr = p->attr.size; + r = 0; + pair_unlock(p); + } + ct->list.read_list_unlock(); + return r; +} + +void +toku_cachefile_set_userdata (CACHEFILE cf, + void *userdata, + void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), + void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), + void (*free_userdata)(CACHEFILE, void*), + void (*checkpoint_userdata)(CACHEFILE, int, void*), + void (*begin_checkpoint_userdata)(LSN, void*), + void
(*end_checkpoint_userdata)(CACHEFILE, int, void*), + void (*note_pin_by_checkpoint)(CACHEFILE, void*), + void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) { + cf->userdata = userdata; + cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint; + cf->close_userdata = close_userdata; + cf->free_userdata = free_userdata; + cf->checkpoint_userdata = checkpoint_userdata; + cf->begin_checkpoint_userdata = begin_checkpoint_userdata; + cf->end_checkpoint_userdata = end_checkpoint_userdata; + cf->note_pin_by_checkpoint = note_pin_by_checkpoint; + cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint; +} + +void *toku_cachefile_get_userdata(CACHEFILE cf) { + return cf->userdata; +} + +CACHETABLE +toku_cachefile_get_cachetable(CACHEFILE cf) { + return cf->cachetable; +} + +CACHEFILE toku_pair_get_cachefile(PAIR pair) { + return pair->cachefile; +} + +//Only called by ft_end_checkpoint +//Must have access to cf->fd (must be protected) +void toku_cachefile_fsync(CACHEFILE cf) { + toku_file_fsync(cf->fd); +} + +// Make it so when the cachefile closes, the underlying file is unlinked +void toku_cachefile_unlink_on_close(CACHEFILE cf) { + assert(!cf->unlink_on_close); + cf->unlink_on_close = true; +} + +// is this cachefile marked as unlink on close? +bool toku_cachefile_is_unlink_on_close(CACHEFILE cf) { + return cf->unlink_on_close; +} + +void toku_cachefile_skip_log_recover_on_close(CACHEFILE cf) { + cf->skip_log_recover_on_close = true; +} + +void toku_cachefile_do_log_recover_on_close(CACHEFILE cf) { + cf->skip_log_recover_on_close = false; +} + +bool toku_cachefile_is_skip_log_recover_on_close(CACHEFILE cf) { + return cf->skip_log_recover_on_close; +} + +uint64_t toku_cachefile_size(CACHEFILE cf) { + int64_t file_size; + int fd = toku_cachefile_get_fd(cf); + int r = toku_os_get_file_size(fd, &file_size); + assert_zero(r); + return file_size; +} + +char * +toku_construct_full_name(int count, ...) 
{ + va_list ap; + char *name = NULL; + size_t n = 0; + int i; + va_start(ap, count); + for (i=0; i<count; i++) { + char *arg = va_arg(ap, char *); + if (arg) { + n += 1 + strlen(arg) + 1; + char *XMALLOC_N(n, newname); + if (name && !toku_os_is_absolute_name(arg)) + snprintf(newname, n, "%s/%s", name, arg); + else + snprintf(newname, n, "%s", arg); + toku_free(name); + name = newname; + } + } + va_end(ap); + + return name; +} + +char * +toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env) { + return toku_construct_full_name(2, ct->env_dir, fname_in_env); +} + +static long +cleaner_thread_rate_pair(PAIR p) +{ + return p->attr.cache_pressure_size; +} + +static int const CLEANER_N_TO_CHECK = 8; + +int toku_cleaner_thread_for_test (CACHETABLE ct) { + return ct->cl.run_cleaner(); +} + +int toku_cleaner_thread (void *cleaner_v) { + cleaner* cl = (cleaner *) cleaner_v; + assert(cl); + return cl->run_cleaner(); +} + +///////////////////////////////////////////////////////////////////////// +// +// cleaner methods +// +ENSURE_POD(cleaner); + +extern uint force_recovery; + +int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) { + // default is no cleaner, for now + m_cleaner_cron_init = false; + if (force_recovery) return 0; + int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this); + if (r == 0) { + m_cleaner_cron_init = true; + } + TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations); + m_cleaner_iterations = _cleaner_iterations; + m_pl = _pl; + m_ct = _ct; + m_cleaner_init = true; + return r; +} + +// this function is allowed to be called multiple times +void cleaner::destroy(void) { + if (!m_cleaner_init) { + return; + } + if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) { + // for test code only, production code uses toku_cachetable_minicron_shutdown() + int r = toku_minicron_shutdown(&m_cleaner_cron); + assert(r==0); + } +} + +uint32_t cleaner::get_iterations(void) { + return m_cleaner_iterations; +} + +void cleaner::set_iterations(uint32_t new_iterations) { + m_cleaner_iterations = new_iterations; +} + +uint32_t cleaner::get_period_unlocked(void) { + return toku_minicron_get_period_in_seconds_unlocked(&m_cleaner_cron); +} + +// +// Sets how often the cleaner thread will run, in seconds +// +void cleaner::set_period(uint32_t new_period) { + toku_minicron_change_period(&m_cleaner_cron, new_period*1000); +} + +// Effect: runs a cleaner. +// +// We look through some number of nodes: among the first N that we see which +// are unlocked and not involved in a cachefile flush, we pick one and call +// the cleaner callback. While we're picking a node, we hold the +// cachetable lock the whole time, so we don't need any extra +// synchronization. Once we have the one we want, we lock it and notify the +// cachefile that we're doing some background work (so a flush won't +// start). At this point, we can safely unlock the cachetable, do the +// work (callback), and unlock/release our claim to the cachefile.
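+// +// As a concrete reading of the loop below: starting from m_cleaner_head, we +// examine up to CLEANER_N_TO_CHECK (8) PAIRs that have no users, keep the one +// with the highest nonzero cache_pressure_size, and hand it to the cleaner +// callback; a PAIR whose rating is zero is never chosen.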
+int cleaner::run_cleaner(void) { + toku::context cleaner_ctx(CTX_CLEANER); + + int r; + uint32_t num_iterations = this->get_iterations(); + for (uint32_t i = 0; i < num_iterations; ++i) { + cleaner_executions++; + m_pl->read_list_lock(); + PAIR best_pair = NULL; + int n_seen = 0; + long best_score = 0; + const PAIR first_pair = m_pl->m_cleaner_head; + if (first_pair == NULL) { + // nothing in the cachetable, just get out now + m_pl->read_list_unlock(); + break; + } + // here we select a PAIR for cleaning + // look at some number of PAIRS, and + // pick what we think is the best one for cleaning + //***** IMPORTANT ****** + // we MUST not pick a PAIR whose rating is 0. We have + // numerous assumptions in other parts of the code that + // this is the case: + // - this is how rollback nodes and leaf nodes are not selected for cleaning + // - this is how a thread that is calling unpin_and_remove will prevent + // the cleaner thread from picking its PAIR (see comments in that function) + do { + // + // We are already holding onto best_pair; if we run across a pair that + // has the same mutex due to a collision in the hashtable, we need + // to be careful. + // + if (best_pair && m_pl->m_cleaner_head->mutex == best_pair->mutex) { + // Advance the cleaner head. + long score = 0; + // only bother with this pair if it has no current users + if (m_pl->m_cleaner_head->value_rwlock.users() == 0) { + score = cleaner_thread_rate_pair(m_pl->m_cleaner_head); + if (score > best_score) { + best_score = score; + best_pair = m_pl->m_cleaner_head; + } + } + m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next; + continue; + } + pair_lock(m_pl->m_cleaner_head); + if (m_pl->m_cleaner_head->value_rwlock.users() > 0) { + pair_unlock(m_pl->m_cleaner_head); + } + else { + n_seen++; + long score = 0; + score = cleaner_thread_rate_pair(m_pl->m_cleaner_head); + if (score > best_score) { + best_score = score; + // Since we found a new best pair, we need to + // unlock the old best pair. + if (best_pair) { + pair_unlock(best_pair); + } + best_pair = m_pl->m_cleaner_head; + } + else { + pair_unlock(m_pl->m_cleaner_head); + } + } + // Advance the cleaner head. + m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next; + } while (m_pl->m_cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK); + m_pl->read_list_unlock(); + + // + // at this point, if we have found a PAIR for cleaning, + // that is, best_pair != NULL, we do the clean + // + // if best_pair != NULL, then best_pair->mutex is held + // no list lock is held + // + if (best_pair) { + CACHEFILE cf = best_pair->cachefile; + // try to add a background job to the manager + // if we can't, that means the cachefile is flushing, so + // we simply continue the for loop and this iteration + // becomes a no-op + r = bjm_add_background_job(cf->bjm); + if (r) { + pair_unlock(best_pair); + continue; + } + best_pair->value_rwlock.write_lock(true); + pair_unlock(best_pair); + // verify a key assumption. + assert(cleaner_thread_rate_pair(best_pair) > 0); + // check the checkpoint_pending bit + m_pl->read_pending_cheap_lock(); + bool checkpoint_pending = best_pair->checkpoint_pending; + best_pair->checkpoint_pending = false; + m_pl->read_pending_cheap_unlock(); + if (checkpoint_pending) { + write_locked_pair_for_checkpoint(m_ct, best_pair, true); + } + + bool cleaner_callback_called = false; + + // it's theoretically possible that after writing a PAIR for checkpoint, the + // PAIR's heuristic tells us nothing needs to be done. It is not possible
+ // in Dr. Noga, but unit tests verify this behavior works properly. + if (cleaner_thread_rate_pair(best_pair) > 0) { + r = best_pair->cleaner_callback(best_pair->value_data, + best_pair->key, + best_pair->fullhash, + best_pair->write_extraargs); + assert_zero(r); + cleaner_callback_called = true; + } + + // The cleaner callback must have unlocked the pair, so we + // don't need to unlock it if the cleaner callback is called. + if (!cleaner_callback_called) { + pair_lock(best_pair); + best_pair->value_rwlock.write_unlock(); + pair_unlock(best_pair); + } + // We need to make sure the cachefile sticks around so a close + // can't come along and destroy it. That's the purpose of this + // "add/remove_background_job" business, which means the + // cachefile is still valid here, even though the cleaner + // callback unlocks the pair. + bjm_remove_background_job(cf->bjm); + } + else { + // If we didn't find anything this time around the cachetable, + // we probably won't find anything if we run around again, so + // just break out from the for-loop now and + // we'll try again when the cleaner thread runs again. + break; + } + } + return 0; +} + +static_assert(std::is_pod<pair_list>::value, "pair_list isn't POD"); + +const uint32_t INITIAL_PAIR_LIST_SIZE = 1<<20; +uint32_t PAIR_LOCK_SIZE = 1<<20; + +void toku_pair_list_set_lock_size(uint32_t num_locks) { + PAIR_LOCK_SIZE = num_locks; +} + +static void evict_pair_from_cachefile(PAIR p) { + CACHEFILE cf = p->cachefile; + if (p->cf_next) { + p->cf_next->cf_prev = p->cf_prev; + } + if (p->cf_prev) { + p->cf_prev->cf_next = p->cf_next; + } + else if (p->cachefile->cf_head == p) { + cf->cf_head = p->cf_next; + } + p->cf_prev = p->cf_next = NULL; + cf->num_pairs--; +} + +// Allocates the hash table of pairs inside this pair list. +// +void pair_list::init() { + m_table_size = INITIAL_PAIR_LIST_SIZE; + m_num_locks = PAIR_LOCK_SIZE; + m_n_in_table = 0; + m_clock_head = NULL; + m_cleaner_head = NULL; + m_checkpoint_head = NULL; + m_pending_head = NULL; + m_table = NULL; + + + pthread_rwlockattr_t attr; + pthread_rwlockattr_init(&attr); +#if defined(HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP) + pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); +#else +// TODO: need to figure out how to make writer-preferential rwlocks +// happen on osx +#endif + toku_pthread_rwlock_init(*cachetable_m_list_lock_key, &m_list_lock, &attr); + toku_pthread_rwlock_init(*cachetable_m_pending_lock_expensive_key, + &m_pending_lock_expensive, + &attr); + toku_pthread_rwlock_init( + *cachetable_m_pending_lock_cheap_key, &m_pending_lock_cheap, &attr); + XCALLOC_N(m_table_size, m_table); + XCALLOC_N(m_num_locks, m_mutexes); + for (uint64_t i = 0; i < m_num_locks; i++) { + toku_mutex_init( +#ifdef TOKU_PFS_MUTEX_EXTENDED_CACHETABLEMMUTEX + *cachetable_m_mutex_key, +#else + toku_uninstrumented, +#endif + &m_mutexes[i].aligned_mutex, + nullptr); + } +} + +// Frees the pair_list hash table. It is expected to be empty by +// the time this is called. Asserts that there are no +// pairs in any of the hash table slots. +void pair_list::destroy() { + // Check if any entries exist in the hash table.
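+ // (invariant_null below aborts if any bucket still holds a PAIR; callers + // are expected to have evicted every PAIR before destroying the list.)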
+ for (uint32_t i = 0; i < m_table_size; ++i) { + invariant_null(m_table[i]); + } + for (uint64_t i = 0; i < m_num_locks; i++) { + toku_mutex_destroy(&m_mutexes[i].aligned_mutex); + } + toku_pthread_rwlock_destroy(&m_list_lock); + toku_pthread_rwlock_destroy(&m_pending_lock_expensive); + toku_pthread_rwlock_destroy(&m_pending_lock_cheap); + toku_free(m_table); + toku_free(m_mutexes); +} + +// adds a PAIR to the cachetable's structures, +// but does NOT add it to the list maintained by +// the cachefile +void pair_list::add_to_cachetable_only(PAIR p) { + // sanity check to make sure that the PAIR does not already exist + PAIR pp = this->find_pair(p->cachefile, p->key, p->fullhash); + assert(pp == NULL); + + this->add_to_clock(p); + this->add_to_hash_chain(p); + m_n_in_table++; +} + +// This places the given pair inside of the pair list. +// +// requires caller to have grabbed write lock on list. +// requires caller to have p->mutex held as well +// +void pair_list::put(PAIR p) { + this->add_to_cachetable_only(p); + this->add_to_cf_list(p); +} + +// This removes the given pair completely from the pair list. +// +// requires caller to have grabbed write lock on list, and p->mutex held +// +void pair_list::evict_completely(PAIR p) { + this->evict_from_cachetable(p); + this->evict_from_cachefile(p); +} + +// Removes the PAIR from the cachetable's lists, +// but does NOT impact the list maintained by the cachefile +void pair_list::evict_from_cachetable(PAIR p) { + this->pair_remove(p); + this->pending_pairs_remove(p); + this->remove_from_hash_chain(p); + + assert(m_n_in_table > 0); + m_n_in_table--; +} + +// Removes the PAIR from the cachefile's list of PAIRs +void pair_list::evict_from_cachefile(PAIR p) { + evict_pair_from_cachefile(p); +} + +// +// Remove pair from linked list for cleaner/clock +// +// +// requires caller to have grabbed write lock on list. +// +void pair_list::pair_remove (PAIR p) { + if (p->clock_prev == p) { + invariant(m_clock_head == p); + invariant(p->clock_next == p); + invariant(m_cleaner_head == p); + invariant(m_checkpoint_head == p); + m_clock_head = NULL; + m_cleaner_head = NULL; + m_checkpoint_head = NULL; + } + else { + if (p == m_clock_head) { + m_clock_head = m_clock_head->clock_next; + } + if (p == m_cleaner_head) { + m_cleaner_head = m_cleaner_head->clock_next; + } + if (p == m_checkpoint_head) { + m_checkpoint_head = m_checkpoint_head->clock_next; + } + p->clock_prev->clock_next = p->clock_next; + p->clock_next->clock_prev = p->clock_prev; + } + p->clock_prev = p->clock_next = NULL; +} + +//Remove a pair from the list of pairs that were marked with the +//pending bit for the in-progress checkpoint. +// +// requires that if the caller is the checkpoint thread, then a read lock +// is grabbed on the list. Otherwise, must have write lock on list. +// +void pair_list::pending_pairs_remove (PAIR p) { + if (p->pending_next) { + p->pending_next->pending_prev = p->pending_prev; + } + if (p->pending_prev) { + p->pending_prev->pending_next = p->pending_next; + } + else if (m_pending_head==p) { + m_pending_head = p->pending_next; + } + p->pending_prev = p->pending_next = NULL; +} + +void pair_list::remove_from_hash_chain(PAIR p) { + // Remove it from the hash chain.
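+ // The bucket is chosen by masking fullhash with (m_table_size - 1), which + // assumes m_table_size is a power of two; e.g. with the default table size + // of 1<<20, h = p->fullhash & 0xFFFFF.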
+ unsigned int h = p->fullhash&(m_table_size - 1); + paranoid_invariant(m_table[h] != NULL); + if (m_table[h] == p) { + m_table[h] = p->hash_chain; + } + else { + PAIR curr = m_table[h]; + while (curr->hash_chain != p) { + curr = curr->hash_chain; + } + // remove p from the singly-linked list + curr->hash_chain = p->hash_chain; + } + p->hash_chain = NULL; +} + +// Returns a pair from the pair list, using the given +// pair. If the pair cannot be found, null is returned. +// +// requires caller to have grabbed either a read lock on the list or +// bucket's mutex. +// +PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) { + PAIR found_pair = nullptr; + for (PAIR p = m_table[fullhash&(m_table_size - 1)]; p; p = p->hash_chain) { + if (p->key.b == key.b && p->cachefile == file) { + found_pair = p; + break; + } + } + return found_pair; +} + +// Add PAIR to linked list shared by cleaner thread and clock +// +// requires caller to have grabbed write lock on list. +// +void pair_list::add_to_clock (PAIR p) { + // requires that p is not currently in the table. + // inserts p into the clock list at the tail. + + p->count = CLOCK_INITIAL_COUNT; + //assert either both head and tail are set or they are both NULL + // tail and head exist + if (m_clock_head) { + assert(m_cleaner_head); + assert(m_checkpoint_head); + // insert right before the head + p->clock_next = m_clock_head; + p->clock_prev = m_clock_head->clock_prev; + + p->clock_prev->clock_next = p; + p->clock_next->clock_prev = p; + + } + // this is the first element in the list + else { + m_clock_head = p; + p->clock_next = p->clock_prev = m_clock_head; + m_cleaner_head = p; + m_checkpoint_head = p; + } +} + +// add the pair to the linked list of PAIRs belonging +// to the same cachefile. This linked list is used +// in cachetable_flush_cachefile. +void pair_list::add_to_cf_list(PAIR p) { + CACHEFILE cf = p->cachefile; + if (cf->cf_head) { + cf->cf_head->cf_prev = p; + } + p->cf_next = cf->cf_head; + p->cf_prev = NULL; + cf->cf_head = p; + cf->num_pairs++; +} + +// Add PAIR to the hashtable +// +// requires caller to have grabbed write lock on list +// and to have grabbed the p->mutex. +void pair_list::add_to_hash_chain(PAIR p) { + uint32_t h = p->fullhash & (m_table_size - 1); + p->hash_chain = m_table[h]; + m_table[h] = p; +} + +// test function +// +// grabs and releases write list lock +// +void pair_list::verify() { + this->write_list_lock(); + uint32_t num_found = 0; + + // First count all the pairs by going through the hash chains + { + uint32_t i; + for (i = 0; i < m_table_size; i++) { + PAIR p; + for (p = m_table[i]; p; p = p->hash_chain) { + num_found++; + } + } + } + assert(num_found == m_n_in_table); + num_found = 0; + // Now go through the clock chain, make sure everything in the clock chain is hashed. + { + PAIR p; + bool is_first = true; + for (p = m_clock_head; m_clock_head != NULL && (p != m_clock_head || is_first); p=p->clock_next) { + is_first=false; + PAIR p2; + uint32_t fullhash = p->fullhash; + //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key)); + for (p2 = m_table[fullhash&(m_table_size-1)]; p2; p2=p2->hash_chain) { + if (p2==p) { + /* found it */ + num_found++; + goto next; + } + } + fprintf(stderr, "Something in the clock chain is not hashed\n"); + assert(0); + next:; + } + assert (num_found == m_n_in_table); + } + this->write_list_unlock(); +} + +// If given pointers are not null, assign the hash table size of +// this pair list and the number of pairs in this pair list.
+// +// +// grabs and releases read list lock +// +void pair_list::get_state(int *num_entries, int *hash_size) { + this->read_list_lock(); + if (num_entries) { + *num_entries = m_n_in_table; + } + if (hash_size) { + *hash_size = m_table_size; + } + this->read_list_unlock(); +} + +void pair_list::read_list_lock() { + toku_pthread_rwlock_rdlock(&m_list_lock); +} + +void pair_list::read_list_unlock() { + toku_pthread_rwlock_rdunlock(&m_list_lock); +} + +void pair_list::write_list_lock() { + toku_pthread_rwlock_wrlock(&m_list_lock); +} + +void pair_list::write_list_unlock() { + toku_pthread_rwlock_wrunlock(&m_list_lock); +} + +void pair_list::read_pending_exp_lock() { + toku_pthread_rwlock_rdlock(&m_pending_lock_expensive); +} + +void pair_list::read_pending_exp_unlock() { + toku_pthread_rwlock_rdunlock(&m_pending_lock_expensive); +} + +void pair_list::write_pending_exp_lock() { + toku_pthread_rwlock_wrlock(&m_pending_lock_expensive); +} + +void pair_list::write_pending_exp_unlock() { + toku_pthread_rwlock_wrunlock(&m_pending_lock_expensive); +} + +void pair_list::read_pending_cheap_lock() { + toku_pthread_rwlock_rdlock(&m_pending_lock_cheap); +} + +void pair_list::read_pending_cheap_unlock() { + toku_pthread_rwlock_rdunlock(&m_pending_lock_cheap); +} + +void pair_list::write_pending_cheap_lock() { + toku_pthread_rwlock_wrlock(&m_pending_lock_cheap); +} + +void pair_list::write_pending_cheap_unlock() { + toku_pthread_rwlock_wrunlock(&m_pending_lock_cheap); +} + +toku_mutex_t* pair_list::get_mutex_for_pair(uint32_t fullhash) { + return &m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex; +} + +void pair_list::pair_lock_by_fullhash(uint32_t fullhash) { + toku_mutex_lock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex); +} + +void pair_list::pair_unlock_by_fullhash(uint32_t fullhash) { + toku_mutex_unlock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex); +} + + +ENSURE_POD(evictor); + +// +// This is the function that runs eviction on its own thread. +// +static void *eviction_thread(void *evictor_v) { + evictor *CAST_FROM_VOIDP(evictor, evictor_v); + evictor->run_eviction_thread(); + return toku_pthread_done(evictor_v); +} + +// +// Starts the eviction thread, assigns external object references, +// and initializes all counters and condition variables. +// +int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) { + TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running); + TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting); + + // set max difference to around 500MB + int64_t max_diff = (1 << 29); + + m_low_size_watermark = _size_limit; + // these values are selected kind of arbitrarily right now as + // being a percentage more than low_size_watermark, which is provided + // by the caller. 
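+ // For example (illustrative numbers only): with _size_limit = 1024MB and + // max_diff = 512MB, the ladder below yields low_size_watermark = 1024MB, + // low_size_hysteresis = 1126MB (10% more), high_size_hysteresis = 1280MB + // (25% more), and high_size_watermark = 1536MB (50% more); each step is + // additionally capped at max_diff above the step before it.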
+ m_low_size_hysteresis = (11 * _size_limit)/10; //10% more + if ((m_low_size_hysteresis - m_low_size_watermark) > max_diff) { + m_low_size_hysteresis = m_low_size_watermark + max_diff; + } + m_high_size_hysteresis = (5 * _size_limit)/4; // 25% more + if ((m_high_size_hysteresis - m_low_size_hysteresis) > max_diff) { + m_high_size_hysteresis = m_low_size_hysteresis + max_diff; + } + m_high_size_watermark = (3 * _size_limit)/2; // 50% more + if ((m_high_size_watermark - m_high_size_hysteresis) > max_diff) { + m_high_size_watermark = m_high_size_hysteresis + max_diff; + } + + m_enable_partial_eviction = true; + + m_size_reserved = unreservable_memory(_size_limit); + m_size_current = 0; + m_size_cloned_data = 0; + m_size_evicting = 0; + + m_size_nonleaf = create_partitioned_counter(); + m_size_leaf = create_partitioned_counter(); + m_size_rollback = create_partitioned_counter(); + m_size_cachepressure = create_partitioned_counter(); + m_wait_pressure_count = create_partitioned_counter(); + m_wait_pressure_time = create_partitioned_counter(); + m_long_wait_pressure_count = create_partitioned_counter(); + m_long_wait_pressure_time = create_partitioned_counter(); + + m_pl = _pl; + m_cf_list = _cf_list; + m_kibbutz = _kibbutz; + toku_mutex_init( + *cachetable_ev_thread_lock_mutex_key, &m_ev_thread_lock, nullptr); + toku_cond_init( + *cachetable_m_flow_control_cond_key, &m_flow_control_cond, nullptr); + toku_cond_init( + *cachetable_m_ev_thread_cond_key, &m_ev_thread_cond, nullptr); + m_num_sleepers = 0; + m_ev_thread_is_running = false; + m_period_in_seconds = eviction_period; + + unsigned int seed = (unsigned int) time(NULL); + int r = myinitstate_r(seed, m_random_statebuf, sizeof m_random_statebuf, &m_random_data); + assert_zero(r); + + // start the background thread + m_run_thread = true; + m_num_eviction_thread_runs = 0; + m_ev_thread_init = false; + r = toku_pthread_create( + *eviction_thread_key, &m_ev_thread, nullptr, eviction_thread, this); + if (r == 0) { + m_ev_thread_init = true; + } + m_evictor_init = true; + return r; +} + +// +// This stops the eviction thread and clears the condition variable. +// +// NOTE: This should only be called if there are no evictions in progress. +// +void evictor::destroy() { + if (!m_evictor_init) { + return; + } + assert(m_size_evicting == 0); + // + // commented out as of Ming, because we could not finish + // #5672. Once #5672 is solved, we should restore this + // + //assert(m_size_current == 0); + + // Stop the eviction thread.
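+ // (Clearing m_run_thread and signaling while m_ev_thread_lock is held + // means the eviction thread cannot miss the wakeup: it only checks + // m_run_thread and waits on m_ev_thread_cond under that same lock.)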
+ if (m_ev_thread_init) { + toku_mutex_lock(&m_ev_thread_lock); + m_run_thread = false; + this->signal_eviction_thread_locked(); + toku_mutex_unlock(&m_ev_thread_lock); + void *ret; + int r = toku_pthread_join(m_ev_thread, &ret); + assert_zero(r); + assert(!m_ev_thread_is_running); + } + destroy_partitioned_counter(m_size_nonleaf); + m_size_nonleaf = NULL; + destroy_partitioned_counter(m_size_leaf); + m_size_leaf = NULL; + destroy_partitioned_counter(m_size_rollback); + m_size_rollback = NULL; + destroy_partitioned_counter(m_size_cachepressure); + m_size_cachepressure = NULL; + + destroy_partitioned_counter(m_wait_pressure_count); m_wait_pressure_count = NULL; + destroy_partitioned_counter(m_wait_pressure_time); m_wait_pressure_time = NULL; + destroy_partitioned_counter(m_long_wait_pressure_count); m_long_wait_pressure_count = NULL; + destroy_partitioned_counter(m_long_wait_pressure_time); m_long_wait_pressure_time = NULL; + + toku_cond_destroy(&m_flow_control_cond); + toku_cond_destroy(&m_ev_thread_cond); + toku_mutex_destroy(&m_ev_thread_lock); +} + +// +// Increases status variables and the current size variable +// of the evictor based on the given pair attribute. +// +void evictor::add_pair_attr(PAIR_ATTR attr) { + assert(attr.is_valid); + add_to_size_current(attr.size); + increment_partitioned_counter(m_size_nonleaf, attr.nonleaf_size); + increment_partitioned_counter(m_size_leaf, attr.leaf_size); + increment_partitioned_counter(m_size_rollback, attr.rollback_size); + increment_partitioned_counter(m_size_cachepressure, attr.cache_pressure_size); +} + +// +// Decreases status variables and the current size variable +// of the evictor based on the given pair attribute. +// +void evictor::remove_pair_attr(PAIR_ATTR attr) { + assert(attr.is_valid); + remove_from_size_current(attr.size); + increment_partitioned_counter(m_size_nonleaf, 0 - attr.nonleaf_size); + increment_partitioned_counter(m_size_leaf, 0 - attr.leaf_size); + increment_partitioned_counter(m_size_rollback, 0 - attr.rollback_size); + increment_partitioned_counter(m_size_cachepressure, 0 - attr.cache_pressure_size); +} + +// +// Updates this evictor's stats to match the "new" pair attribute given +// while also removing the given "old" pair attribute. +// +void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) { + this->add_pair_attr(new_attr); + this->remove_pair_attr(old_attr); +} + +// +// Adds the given size to the evictor's estimation of +// the size of the cachetable. +// +void evictor::add_to_size_current(long size) { + (void) toku_sync_fetch_and_add(&m_size_current, size); +} + +// +// Subtracts the given size from the evictor's current +// approximation of the cachetable size. 
+// +void evictor::remove_from_size_current(long size) { + (void) toku_sync_fetch_and_sub(&m_size_current, size); +} + +// +// Adds the size of cloned data to necessary variables in the evictor +// +void evictor::add_cloned_data_size(long size) { + (void) toku_sync_fetch_and_add(&m_size_cloned_data, size); + add_to_size_current(size); +} + +// +// Subtracts the size of cloned data from the necessary variables in the evictor +// +void evictor::remove_cloned_data_size(long size) { + (void) toku_sync_fetch_and_sub(&m_size_cloned_data, size); + remove_from_size_current(size); +} + +// +// TODO: (Zardosht) comment this function +// +uint64_t evictor::reserve_memory(double fraction, uint64_t upper_bound) { + toku_mutex_lock(&m_ev_thread_lock); + uint64_t reserved_memory = fraction * (m_low_size_watermark - m_size_reserved); + if (0) { // debug + fprintf(stderr, "%s %" PRIu64 " %" PRIu64 "\n", __PRETTY_FUNCTION__, reserved_memory, upper_bound); + } + if (upper_bound > 0 && reserved_memory > upper_bound) { + reserved_memory = upper_bound; + } + m_size_reserved += reserved_memory; + (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory); + this->signal_eviction_thread_locked(); + toku_mutex_unlock(&m_ev_thread_lock); + + if (this->should_client_thread_sleep()) { + this->wait_for_cache_pressure_to_subside(); + } + return reserved_memory; +} + +// +// TODO: (Zardosht) comment this function +// +void evictor::release_reserved_memory(uint64_t reserved_memory){ + (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory); + toku_mutex_lock(&m_ev_thread_lock); + m_size_reserved -= reserved_memory; + // signal the eviction thread in order to possibly wake up sleeping clients + if (m_num_sleepers > 0) { + this->signal_eviction_thread_locked(); + } + toku_mutex_unlock(&m_ev_thread_lock); +} + +// +// This function is the eviction thread. It runs for the lifetime of +// the evictor. Goes to sleep for period_in_seconds +// by waiting on m_ev_thread_cond. +// +void evictor::run_eviction_thread(){ + toku_mutex_lock(&m_ev_thread_lock); + while (m_run_thread) { + m_num_eviction_thread_runs++; // for test purposes only + m_ev_thread_is_running = true; + // responsibility of run_eviction to release and + // regrab ev_thread_lock as it sees fit + this->run_eviction(); + m_ev_thread_is_running = false; + + if (m_run_thread) { + // + // sleep until either we are signaled + // via signal_eviction_thread or + // m_period_in_seconds amount of time has passed + // + if (m_period_in_seconds) { + toku_timespec_t wakeup_time; + struct timeval tv; + gettimeofday(&tv, 0); + wakeup_time.tv_sec = tv.tv_sec; + wakeup_time.tv_nsec = tv.tv_usec * 1000LL; + wakeup_time.tv_sec += m_period_in_seconds; + toku_cond_timedwait( + &m_ev_thread_cond, + &m_ev_thread_lock, + &wakeup_time + ); + } + // for test purposes, we have an option of + // not waiting on a period, but rather sleeping indefinitely + else { + toku_cond_wait(&m_ev_thread_cond, &m_ev_thread_lock); + } + } + } + toku_mutex_unlock(&m_ev_thread_lock); +} + +// +// runs eviction. +// on entry, ev_thread_lock is grabbed, on exit, ev_thread_lock must still be grabbed +// it is the responsibility of this function to release and reacquire ev_thread_lock as it sees fit. +// +void evictor::run_eviction(){ + // + // These variables will help us detect if everything in the clock is currently being accessed. + // We must detect this case, otherwise we will end up in an infinite loop below.
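+ // (Concretely: num_pairs_examined_without_evicting counts consecutive + // PAIRs we failed to evict; once it exceeds m_pl->m_n_in_table we have + // made a full lap without progress, so we give up and let memory stay + // overfull rather than spin.)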
+ // + bool exited_early = false; + uint32_t num_pairs_examined_without_evicting = 0; + + while (this->eviction_needed()) { + if (m_num_sleepers > 0 && this->should_sleeping_clients_wakeup()) { + toku_cond_broadcast(&m_flow_control_cond); + } + // release ev_thread_lock so that eviction may run without holding mutex + toku_mutex_unlock(&m_ev_thread_lock); + + // first try to do an eviction from stale cachefiles + bool some_eviction_ran = m_cf_list->evict_some_stale_pair(this); + if (!some_eviction_ran) { + m_pl->read_list_lock(); + PAIR curr_in_clock = m_pl->m_clock_head; + // if nothing to evict, we need to exit + if (!curr_in_clock) { + m_pl->read_list_unlock(); + toku_mutex_lock(&m_ev_thread_lock); + exited_early = true; + goto exit; + } + if (num_pairs_examined_without_evicting > m_pl->m_n_in_table) { + // we have a cycle where everything in the clock is in use + // do not return an error + // just let memory be overfull + m_pl->read_list_unlock(); + toku_mutex_lock(&m_ev_thread_lock); + exited_early = true; + goto exit; + } + bool eviction_run = run_eviction_on_pair(curr_in_clock); + if (eviction_run) { + // reset the count + num_pairs_examined_without_evicting = 0; + } + else { + num_pairs_examined_without_evicting++; + } + // at this point, either curr_in_clock is still in the list because it has not been fully evicted, + // and we need to move m_pl->m_clock_head over. Otherwise, curr_in_clock has been fully evicted + // and we do NOT need to move m_pl->m_clock_head, as the removal of curr_in_clock + // modified m_pl->m_clock_head + if (m_pl->m_clock_head && (m_pl->m_clock_head == curr_in_clock)) { + m_pl->m_clock_head = m_pl->m_clock_head->clock_next; + } + m_pl->read_list_unlock(); + } + toku_mutex_lock(&m_ev_thread_lock); + } + +exit: + if (m_num_sleepers > 0 && (exited_early || this->should_sleeping_clients_wakeup())) { + toku_cond_broadcast(&m_flow_control_cond); + } + return; +} + +// +// NOTE: Cachetable lock held on entry. +// Runs eviction on the given PAIR. This may be a +// partial eviction or full eviction. +// +// on entry, pair mutex is NOT held, but pair list's read list lock +// IS held +// on exit, the same conditions must apply +// +bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { + uint32_t n_in_table; + int64_t size_current; + bool ret_val = false; + // function meant to be called on a PAIR that is not being accessed right now + CACHEFILE cf = curr_in_clock->cachefile; + int r = bjm_add_background_job(cf->bjm); + if (r) { + goto exit; + } + pair_lock(curr_in_clock); + // these are the circumstances under which we don't run eviction on a pair: + // - if other users are waiting on the lock + // - if the PAIR is referenced by users + // - if the PAIR's disk_nb_mutex is in use, implying that it is + // undergoing a checkpoint + if (curr_in_clock->value_rwlock.users() || + curr_in_clock->refcount > 0 || + nb_mutex_users(&curr_in_clock->disk_nb_mutex)) + { + pair_unlock(curr_in_clock); + bjm_remove_background_job(cf->bjm); + goto exit; + } + + // extract and use these values so that we don't risk them changing + // out from underneath us in calculations below.
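+ // (Both values can change as soon as the read list lock is dropped below; + // the snapshot only needs to keep the probabilistic-decrement arithmetic + // self-consistent, not exact.)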
+ n_in_table = m_pl->m_n_in_table; + size_current = m_size_current; + + // now that we have the pair mutex we care about, we can + // release the read list lock and reacquire it at the end of the function + m_pl->read_list_unlock(); + ret_val = true; + if (curr_in_clock->count > 0) { + toku::context pe_ctx(CTX_PARTIAL_EVICTION); + + uint32_t curr_size = curr_in_clock->attr.size; + // if the size of this PAIR is greater than the average size of PAIRs + // in the cachetable, then decrement it, otherwise, decrement + // probabilistically + if (curr_size*n_in_table >= size_current) { + curr_in_clock->count--; + } else { + // generate a random number between 0 and 2^16 - 1 + assert(size_current <= (INT64_MAX / ((1<<16)-1))); // to protect against possible overflows + int32_t rnd = myrandom_r(&m_random_data) % (1<<16); + // The if-statement below will be true with probability of + // curr_size/(average size of PAIR in cachetable) + // Here is how the math is done: + // average_size = size_current/n_in_table + // curr_size/average_size = curr_size*n_in_table/size_current + // we evaluate if a random number from 0 to 2^16 is less than + // curr_size/average_size * 2^16. So, our if-clause should be + // if (2^16*curr_size/average_size > rnd) + // this evaluates to: + // if (2^16*curr_size*n_in_table/size_current > rnd) + // by multiplying each side of the equation by size_current, we get + // if (2^16*curr_size*n_in_table > rnd*size_current) + // and dividing each side by 2^16, + // we get the if-clause below + // + if ((((int64_t)curr_size) * n_in_table) >= (((int64_t)rnd) * size_current)>>16) { + curr_in_clock->count--; + } + } + + if (m_enable_partial_eviction) { + // call the partial eviction callback + curr_in_clock->value_rwlock.write_lock(true); + + void *value = curr_in_clock->value_data; + void* disk_data = curr_in_clock->disk_data; + void *write_extraargs = curr_in_clock->write_extraargs; + enum partial_eviction_cost cost; + long bytes_freed_estimate = 0; + curr_in_clock->pe_est_callback(value, disk_data, + &bytes_freed_estimate, &cost, + write_extraargs); + if (cost == PE_CHEAP) { + pair_unlock(curr_in_clock); + curr_in_clock->size_evicting_estimate = 0; + this->do_partial_eviction(curr_in_clock); + bjm_remove_background_job(cf->bjm); + } else if (cost == PE_EXPENSIVE) { + // only bother running an expensive partial eviction + // if it is expected to free space + if (bytes_freed_estimate > 0) { + pair_unlock(curr_in_clock); + curr_in_clock->size_evicting_estimate = bytes_freed_estimate; + toku_mutex_lock(&m_ev_thread_lock); + m_size_evicting += bytes_freed_estimate; + toku_mutex_unlock(&m_ev_thread_lock); + toku_kibbutz_enq(m_kibbutz, cachetable_partial_eviction, + curr_in_clock); + } else { + curr_in_clock->value_rwlock.write_unlock(); + pair_unlock(curr_in_clock); + bjm_remove_background_job(cf->bjm); + } + } else { + assert(false); + } + } else { + pair_unlock(curr_in_clock); + bjm_remove_background_job(cf->bjm); + } + } else { + toku::context pe_ctx(CTX_FULL_EVICTION); + + // responsibility of try_evict_pair to eventually remove background job + // pair's mutex is still grabbed here + this->try_evict_pair(curr_in_clock); + } + // regrab the read list lock, because the caller assumes + // that it is held. The contract requires this.
+ m_pl->read_list_lock(); +exit: + return ret_val; +} + +struct pair_unpin_with_new_attr_extra { + pair_unpin_with_new_attr_extra(evictor *e, PAIR p) : + ev(e), pair(p) { + } + evictor *ev; + PAIR pair; +}; + +static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) { + struct pair_unpin_with_new_attr_extra *info = + reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra); + PAIR p = info->pair; + evictor *ev = info->ev; + + // change the attr in the evictor, then update the value in the pair + ev->change_pair_attr(p->attr, new_attr); + p->attr = new_attr; + + // unpin + pair_lock(p); + p->value_rwlock.write_unlock(); + pair_unlock(p); +} + +// +// on entry and exit, pair's mutex is not held +// on exit, PAIR is unpinned +// +void evictor::do_partial_eviction(PAIR p) { + // Copy the old attr + PAIR_ATTR old_attr = p->attr; + long long size_evicting_estimate = p->size_evicting_estimate; + + struct pair_unpin_with_new_attr_extra extra(this, p); + p->pe_callback(p->value_data, old_attr, p->write_extraargs, + // passed as the finalize continuation, which allows the + // pe_callback to unpin the node before doing expensive cleanup + pair_unpin_with_new_attr, &extra); + + // now that the pe_callback (and its pair_unpin_with_new_attr continuation) + // have finished, we can safely decrease size_evicting + this->decrease_size_evicting(size_evicting_estimate); +} + +// +// CT lock held on entry +// background job has been added for p->cachefile on entry +// responsibility of this function to make sure that background job is removed +// +// on entry, pair's mutex is held, on exit, the pair's mutex is NOT held +// +void evictor::try_evict_pair(PAIR p) { + CACHEFILE cf = p->cachefile; + // evictions that do not require a write, i.e. of unpinned PAIRs that + // are clean, can be run in the current thread + + // the only caller, run_eviction_on_pair, should call this function + // only if no one else is trying to use it + assert(!p->value_rwlock.users()); + p->value_rwlock.write_lock(true); + // if the PAIR is dirty, running eviction requires writing the + // PAIR out. if the disk_nb_mutex is grabbed, then running + // eviction requires waiting for the disk_nb_mutex to become available, + // which may be expensive. Hence, if either is true, we + // do the eviction on a writer thread + if (!p->dirty && (nb_mutex_writers(&p->disk_nb_mutex) == 0)) { + p->size_evicting_estimate = 0; + // + // This method will unpin PAIR and release PAIR mutex + // + // because the PAIR is not dirty, we can safely pass + // false for the for_checkpoint parameter + this->evict_pair(p, false); + bjm_remove_background_job(cf->bjm); + } + else { + pair_unlock(p); + toku_mutex_lock(&m_ev_thread_lock); + assert(m_size_evicting >= 0); + p->size_evicting_estimate = p->attr.size; + m_size_evicting += p->size_evicting_estimate; + assert(m_size_evicting >= 0); + toku_mutex_unlock(&m_ev_thread_lock); + toku_kibbutz_enq(m_kibbutz, cachetable_evicter, p); + } +} + +// +// Requires: This thread must hold the write lock (nb_mutex) for the pair. +// The pair's mutex (p->mutex) is also held.
+// on exit, neither is held +// +void evictor::evict_pair(PAIR p, bool for_checkpoint) { + if (p->dirty) { + pair_unlock(p); + cachetable_write_locked_pair(this, p, for_checkpoint); + pair_lock(p); + } + // one thing we can do here is extract the size_evicting estimate, + // have decrease_size_evicting take the estimate and not the pair, + // and do this work after we have called + // cachetable_maybe_remove_and_free_pair + this->decrease_size_evicting(p->size_evicting_estimate); + // if we are to remove this pair, we need the write list lock. + // To get it in a way that avoids deadlocks, we must first release + // the pair's mutex, then grab the write list lock, then regrab the + // pair's mutex. The pair cannot go anywhere because + // the pair is still pinned + nb_mutex_lock(&p->disk_nb_mutex, p->mutex); + pair_unlock(p); + m_pl->write_list_lock(); + pair_lock(p); + p->value_rwlock.write_unlock(); + nb_mutex_unlock(&p->disk_nb_mutex); + // at this point, we have the pair list's write list lock + // and we have the pair's mutex (p->mutex) held + + // this ensures that a clone running in the background first completes + bool removed = false; + if (p->value_rwlock.users() == 0 && p->refcount == 0) { + // assumption is that if we are about to remove the pair + // that no one has grabbed the disk_nb_mutex, + // and that there is no cloned_value_data, because + // no one is writing a cloned value out. + assert(nb_mutex_users(&p->disk_nb_mutex) == 0); + assert(p->cloned_value_data == NULL); + cachetable_remove_pair(m_pl, this, p); + removed = true; + } + pair_unlock(p); + m_pl->write_list_unlock(); + // do not want to hold the write list lock while freeing a pair + if (removed) { + cachetable_free_pair(p); + } +} + +// +// this function handles the responsibilities for writer threads when they +// decrease size_evicting. The responsibilities are: +// - decrease m_size_evicting in a thread safe manner +// - in some circumstances, signal the eviction thread +// +void evictor::decrease_size_evicting(long size_evicting_estimate) { + if (size_evicting_estimate > 0) { + toku_mutex_lock(&m_ev_thread_lock); + int64_t buffer = m_high_size_hysteresis - m_low_size_watermark; + // if size_evicting is transitioning from greater than buffer to below buffer, and + // some client threads are sleeping, we need to wake up the eviction thread. + // Here is why. In this scenario, we are in one of two cases: + // - size_current - size_evicting < low_size_watermark + // If this is true, then size_current < high_size_hysteresis, which + // means we need to wake up sleeping clients + // - size_current - size_evicting > low_size_watermark, + // which means more evictions must be run. + // The consequences of both cases are the responsibility + // of the eviction thread.
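+ // For example (illustrative numbers only): with low_size_watermark = + // 1024MB and high_size_hysteresis = 1280MB, buffer = 256MB; dropping + // m_size_evicting from 300MB to 200MB while clients sleep and the + // eviction thread is idle satisfies the signal predicate below.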
+ // + bool need_to_signal_ev_thread = + (m_num_sleepers > 0) && + !m_ev_thread_is_running && + (m_size_evicting > buffer) && + ((m_size_evicting - size_evicting_estimate) <= buffer); + m_size_evicting -= size_evicting_estimate; + assert(m_size_evicting >= 0); + if (need_to_signal_ev_thread) { + this->signal_eviction_thread_locked(); + } + toku_mutex_unlock(&m_ev_thread_lock); + } +} + +// +// Wait for cache table space to become available +// size_current is number of bytes currently occupied by data (referred to by pairs) +// size_evicting is number of bytes queued up to be evicted +// +void evictor::wait_for_cache_pressure_to_subside() { + uint64_t t0 = toku_current_time_microsec(); + toku_mutex_lock(&m_ev_thread_lock); + m_num_sleepers++; + this->signal_eviction_thread_locked(); + toku_cond_wait(&m_flow_control_cond, &m_ev_thread_lock); + m_num_sleepers--; + toku_mutex_unlock(&m_ev_thread_lock); + uint64_t t1 = toku_current_time_microsec(); + increment_partitioned_counter(m_wait_pressure_count, 1); + uint64_t tdelta = t1 - t0; + increment_partitioned_counter(m_wait_pressure_time, tdelta); + if (tdelta > 1000000) { + increment_partitioned_counter(m_long_wait_pressure_count, 1); + increment_partitioned_counter(m_long_wait_pressure_time, tdelta); + } +} + +// +// Get the status of the current estimated size of the cachetable, +// and the evictor's set limit. +// +void evictor::get_state(long *size_current_ptr, long *size_limit_ptr) { + if (size_current_ptr) { + *size_current_ptr = m_size_current; + } + if (size_limit_ptr) { + *size_limit_ptr = m_low_size_watermark; + } +} + +// +// Force the eviction thread to do some work. +// +// This function does not require any mutex to be held. +// As a result, scheduling is not guaranteed, but that is tolerable. +// +void evictor::signal_eviction_thread() { + toku_mutex_lock(&m_ev_thread_lock); + toku_cond_signal(&m_ev_thread_cond); + toku_mutex_unlock(&m_ev_thread_lock); +} + +void evictor::signal_eviction_thread_locked() { + toku_cond_signal(&m_ev_thread_cond); +} + +// +// Returns true if the cachetable is so oversubscribed that a client thread should sleep +// +// This function may be called in a thread-unsafe manner. Locks are not +// required to read size_current. The result is that +// the values may be a little off, but we think that is tolerable. +// +bool evictor::should_client_thread_sleep(){ + return unsafe_read_size_current() > m_high_size_watermark; +} + +// +// Returns true if a sleeping client should be woken up because +// the cachetable is not overly subscribed +// +// This function may be called in a thread-unsafe manner. Locks are not +// required to read size_current. The result is that +// the values may be a little off, but we think that is tolerable. +// +bool evictor::should_sleeping_clients_wakeup() { + return unsafe_read_size_current() <= m_high_size_hysteresis; +} + +// +// Returns true if a client thread should try to wake up the eviction +// thread because the client thread has noticed too much data taken +// up in the cachetable. +// +// This function may be called in a thread-unsafe manner. Locks are not +// required to read size_current or size_evicting. The result is that +// the values may be a little off, but we think that is tolerable. +// If the caller wants to ensure that ev_thread_is_running and size_evicting +// are accurate, then the caller must hold ev_thread_lock before +// calling this function.
+// +bool evictor::should_client_wake_eviction_thread() { + return + !m_ev_thread_is_running && + ((unsafe_read_size_current() - m_size_evicting) > m_low_size_hysteresis); +} + +// +// Determines if eviction is needed. If the current size of +// the cachetable exceeds the sum of our fixed size limit and +// the amount of data currently being evicted, then eviction is needed +// +bool evictor::eviction_needed() { + return (m_size_current - m_size_evicting) > m_low_size_watermark; +} + +inline int64_t evictor::unsafe_read_size_current(void) const { + return m_size_current; +} + +void evictor::fill_engine_status() { + CT_STATUS_VAL(CT_SIZE_CURRENT) = m_size_current; + CT_STATUS_VAL(CT_SIZE_LIMIT) = m_low_size_hysteresis; + CT_STATUS_VAL(CT_SIZE_WRITING) = m_size_evicting; + CT_STATUS_VAL(CT_SIZE_NONLEAF) = read_partitioned_counter(m_size_nonleaf); + CT_STATUS_VAL(CT_SIZE_LEAF) = read_partitioned_counter(m_size_leaf); + CT_STATUS_VAL(CT_SIZE_ROLLBACK) = read_partitioned_counter(m_size_rollback); + CT_STATUS_VAL(CT_SIZE_CACHEPRESSURE) = read_partitioned_counter(m_size_cachepressure); + CT_STATUS_VAL(CT_SIZE_CLONED) = m_size_cloned_data; + CT_STATUS_VAL(CT_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_wait_pressure_count); + CT_STATUS_VAL(CT_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_wait_pressure_time); + CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_long_wait_pressure_count); + CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_long_wait_pressure_time); +} + +void evictor::set_enable_partial_eviction(bool enabled) { + m_enable_partial_eviction = enabled; +} + +bool evictor::get_enable_partial_eviction(void) const { + return m_enable_partial_eviction; +} + +//////////////////////////////////////////////////////////////////////////////// + +ENSURE_POD(checkpointer); + +// +// Sets the cachetable reference in this checkpointer class; this is temporary. +// +int checkpointer::init(pair_list *_pl, + TOKULOGGER _logger, + evictor *_ev, + cachefile_list *files) { + m_list = _pl; + m_logger = _logger; + m_ev = _ev; + m_cf_list = files; + bjm_init(&m_checkpoint_clones_bjm); + + // Default is no checkpointing. + m_checkpointer_cron_init = false; + int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this); + if (r == 0) { + m_checkpointer_cron_init = true; + } + m_checkpointer_init = true; + return r; +} + +void checkpointer::destroy() { + if (!m_checkpointer_init) { + return; + } + if (m_checkpointer_cron_init && !this->has_been_shutdown()) { + // for test code only, production code uses toku_cachetable_minicron_shutdown() + int r = this->shutdown(); + assert(r == 0); + } + bjm_destroy(m_checkpoint_clones_bjm); +} + +// +// Sets how often the checkpoint thread will run, in seconds +// +void checkpointer::set_checkpoint_period(uint32_t new_period) { + toku_minicron_change_period(&m_checkpointer_cron, new_period*1000); +} + +// +// Returns how often the checkpoint thread will run, in seconds. +// +uint32_t checkpointer::get_checkpoint_period() { + return toku_minicron_get_period_in_seconds_unlocked(&m_checkpointer_cron); +} + +// +// Stops the checkpoint thread. +// +int checkpointer::shutdown() { + return toku_minicron_shutdown(&m_checkpointer_cron); +} + +// +// If checkpointing is running, this returns false.
+// +bool checkpointer::has_been_shutdown() { + return toku_minicron_has_been_shutdown(&m_checkpointer_cron); +} + +TOKULOGGER checkpointer::get_logger() { + return m_logger; +} + +void checkpointer::increment_num_txns() { + m_checkpoint_num_txns++; +} + +struct iterate_begin_checkpoint { + LSN lsn_of_checkpoint_in_progress; + iterate_begin_checkpoint(LSN lsn) : lsn_of_checkpoint_in_progress(lsn) { } + static int fn(const CACHEFILE &cf, const uint32_t UU(idx), struct iterate_begin_checkpoint *info) { + assert(cf->begin_checkpoint_userdata); + if (cf->for_checkpoint) { + cf->begin_checkpoint_userdata(info->lsn_of_checkpoint_in_progress, cf->userdata); + } + return 0; + } +}; + +// +// Update the user data in any cachefiles in our checkpoint list. +// +void checkpointer::update_cachefiles() { + struct iterate_begin_checkpoint iterate(m_lsn_of_checkpoint_in_progress); + int r = m_cf_list->m_active_fileid.iterate<struct iterate_begin_checkpoint, + iterate_begin_checkpoint::fn>(&iterate); + assert_zero(r); +} + +struct iterate_note_pin { + static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) { + assert(cf->note_pin_by_checkpoint); + cf->note_pin_by_checkpoint(cf, cf->userdata); + cf->for_checkpoint = true; + return 0; + } +}; + +// +// Sets up and kicks off a checkpoint. +// +void checkpointer::begin_checkpoint() { + // 1. Initialize the accountability counters. + m_checkpoint_num_txns = 0; + + // 2. Make list of cachefiles to be included in the checkpoint. + m_cf_list->read_lock(); + m_cf_list->m_active_fileid.iterate<void *, iterate_note_pin::fn>(nullptr); + m_checkpoint_num_files = m_cf_list->m_active_fileid.size(); + m_cf_list->read_unlock(); + + // 3. Create log entries for this checkpoint. + if (m_logger) { + this->log_begin_checkpoint(); + } + + bjm_reset(m_checkpoint_clones_bjm); + + m_list->write_pending_exp_lock(); + m_list->read_list_lock(); + m_cf_list->read_lock(); // needed for update_cachefiles + m_list->write_pending_cheap_lock(); + // 4. Turn on all the relevant checkpoint pending bits. + this->turn_on_pending_bits(); + + // 5. Update the user data in the cachefiles included in the checkpoint. + this->update_cachefiles(); + m_list->write_pending_cheap_unlock(); + m_cf_list->read_unlock(); + m_list->read_list_unlock(); + m_list->write_pending_exp_unlock(); +} + +struct iterate_log_fassociate { + static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) { + assert(cf->log_fassociate_during_checkpoint); + cf->log_fassociate_during_checkpoint(cf, cf->userdata); + return 0; + } +}; + +// +// Assuming the logger exists, this will write out the following +// information to the log. +// +// 1. Writes the BEGIN_CHECKPOINT to the log. +// 2. Writes the list of open dictionaries to the log. +// 3. Writes the list of open transactions to the log. +// 4. Writes the list of dictionaries that have had rollback logs suppressed. +// +// NOTE: This also has the side effect of setting the LSN +// of the checkpoint in progress. +// +void checkpointer::log_begin_checkpoint() { + int r = 0; + + // Write the BEGIN_CHECKPOINT to the log. + LSN begin_lsn={ .lsn = (uint64_t) -1 }; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed. + TXN_MANAGER mgr = toku_logger_get_txn_manager(m_logger); + TXNID last_xid = toku_txn_manager_get_last_xid(mgr); + toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid); + m_lsn_of_checkpoint_in_progress = begin_lsn; + + // Log the list of open dictionaries.
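+ // (Each fassociate record ties a FILENUM to its iname so that recovery + // can re-associate the dictionary with its underlying file.)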
+ m_cf_list->m_active_fileid.iterate<void *, iterate_log_fassociate::fn>(nullptr); + + // Write open transactions to the log. + r = toku_txn_manager_iter_over_live_txns( + m_logger->txn_manager, + log_open_txn, + this + ); + assert(r == 0); +} + +// +// Sets the pending bits of EVERY PAIR in the cachetable, regardless of +// whether the PAIR is clean or not. It will be the responsibility of +// end_checkpoint or client threads to simply clear the pending bit +// if the PAIR is clean. +// +// On entry and exit, the pair list's read list lock is grabbed, and +// both pending locks are grabbed +// +void checkpointer::turn_on_pending_bits() { + PAIR p = NULL; + uint32_t i; + for (i = 0, p = m_list->m_checkpoint_head; i < m_list->m_n_in_table; i++, p = p->clock_next) { + assert(!p->checkpoint_pending); + //Only include pairs belonging to cachefiles in the checkpoint + if (!p->cachefile->for_checkpoint) { + continue; + } + // Mark everything as pending a checkpoint + // + // The rule for the checkpoint_pending bit is as follows: + // - begin_checkpoint may set checkpoint_pending to true + // even though the pair lock on the node is not held. + // - any thread that wants to clear the pending bit must own + // the PAIR lock. Otherwise, + // we may end up clearing the pending bit before the + // current lock is ever released. + p->checkpoint_pending = true; + if (m_list->m_pending_head) { + m_list->m_pending_head->pending_prev = p; + } + p->pending_next = m_list->m_pending_head; + p->pending_prev = NULL; + m_list->m_pending_head = p; + } + invariant(p == m_list->m_checkpoint_head); +} + +void checkpointer::add_background_job() { + int r = bjm_add_background_job(m_checkpoint_clones_bjm); + assert_zero(r); +} +void checkpointer::remove_background_job() { + bjm_remove_background_job(m_checkpoint_clones_bjm); +} + +void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextra) { + toku::scoped_malloc checkpoint_cfs_buf(m_checkpoint_num_files * sizeof(CACHEFILE)); + CACHEFILE *checkpoint_cfs = reinterpret_cast<CACHEFILE *>(checkpoint_cfs_buf.get()); + + this->fill_checkpoint_cfs(checkpoint_cfs); + this->checkpoint_pending_pairs(); + this->checkpoint_userdata(checkpoint_cfs); + // For testing purposes only. Dictionary has been fsync-ed to disk but log has not yet been written.
+    if (testcallback_f) {
+        testcallback_f(testextra);
+    }
+    this->log_end_checkpoint();
+    this->end_checkpoint_userdata(checkpoint_cfs);
+
+    // Delete the list of cachefiles in the checkpoint.
+    this->remove_cachefiles(checkpoint_cfs);
+}
+
+struct iterate_checkpoint_cfs {
+    CACHEFILE *checkpoint_cfs;
+    uint32_t checkpoint_num_files;
+    uint32_t curr_index;
+    iterate_checkpoint_cfs(CACHEFILE *cfs, uint32_t num_files) :
+        checkpoint_cfs(cfs), checkpoint_num_files(num_files), curr_index(0) {
+    }
+    static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_checkpoint_cfs *info) {
+        if (cf->for_checkpoint) {
+            assert(info->curr_index < info->checkpoint_num_files);
+            info->checkpoint_cfs[info->curr_index] = cf;
+            info->curr_index++;
+        }
+        return 0;
+    }
+};
+
+void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
+    struct iterate_checkpoint_cfs iterate(checkpoint_cfs, m_checkpoint_num_files);
+
+    m_cf_list->read_lock();
+    m_cf_list->m_active_fileid.iterate<struct iterate_checkpoint_cfs, iterate_checkpoint_cfs::fn>(&iterate);
+    assert(iterate.curr_index == m_checkpoint_num_files);
+    m_cf_list->read_unlock();
+}
+
+void checkpointer::checkpoint_pending_pairs() {
+    PAIR p;
+    m_list->read_list_lock();
+    while ((p = m_list->m_pending_head)!=0) {
+        // <CER> TODO: Investigate why we move the pending head outside of the pending_pairs_remove() call.
+        m_list->m_pending_head = m_list->m_pending_head->pending_next;
+        m_list->pending_pairs_remove(p);
+        // if still pending, clear the pending bit and write out the node
+        pair_lock(p);
+        m_list->read_list_unlock();
+        write_pair_for_checkpoint_thread(m_ev, p);
+        pair_unlock(p);
+        m_list->read_list_lock();
+    }
+    assert(!m_list->m_pending_head);
+    m_list->read_list_unlock();
+    bjm_wait_for_jobs_to_finish(m_checkpoint_clones_bjm);
+}
+
+void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
+    // have just written data blocks, so next write the translation and header for each open dictionary
+    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
+        CACHEFILE cf = checkpoint_cfs[i];
+        assert(cf->for_checkpoint);
+        assert(cf->checkpoint_userdata);
+        toku_cachetable_set_checkpointing_user_data_status(1);
+        cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
+        toku_cachetable_set_checkpointing_user_data_status(0);
+    }
+}
+
+void checkpointer::log_end_checkpoint() {
+    if (m_logger) {
+        toku_log_end_checkpoint(m_logger, NULL,
+                                1, // want the end_checkpoint to be fsync'd
+                                m_lsn_of_checkpoint_in_progress,
+                                0,
+                                m_checkpoint_num_files,
+                                m_checkpoint_num_txns);
+        toku_logger_note_checkpoint(m_logger, m_lsn_of_checkpoint_in_progress);
+    }
+}
+
+void checkpointer::end_checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
+    // everything has been written to file and fsynced
+    // ... call checkpoint-end function in block translator
+    //     to free obsolete blocks on disk used by previous checkpoint
+    // cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
+    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
+        CACHEFILE cf = checkpoint_cfs[i];
+        assert(cf->for_checkpoint);
+        assert(cf->end_checkpoint_userdata);
+        cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
+    }
+}
+
+//
+// Deletes all the cachefiles in this checkpointer's cachefile list.
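+// Each of these cachefiles was pinned by note_pin_by_checkpoint in
+// begin_checkpoint; note_unpin_by_checkpoint drops that pin and may destroy
+// the cachefile, so the loop must not touch cf after making that call.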
+//
+void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) {
+    // note_unpin_by_checkpoint may destroy the cachefile, so do not use cf after calling it
+    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
+        CACHEFILE cf = checkpoint_cfs[i];
+        // Checking for function existing so that this function
+        // can be called from cachetable tests.
+        assert(cf->for_checkpoint);
+        cf->for_checkpoint = false;
+        assert(cf->note_unpin_by_checkpoint);
+        // Clear the bit saying this file is in the checkpoint.
+        cf->note_unpin_by_checkpoint(cf, cf->userdata);
+    }
+}
+
+
+////////////////////////////////////////////////////////
+//
+// cachefiles list
+//
+static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");
+
+void cachefile_list::init() {
+    m_next_filenum_to_use.fileid = 0;
+    m_next_hash_id_to_use = 0;
+    toku_pthread_rwlock_init(*cachetable_m_lock_key, &m_lock, nullptr);
+    m_active_filenum.create();
+    m_active_fileid.create();
+    m_stale_fileid.create();
+}
+
+void cachefile_list::destroy() {
+    m_active_filenum.destroy();
+    m_active_fileid.destroy();
+    m_stale_fileid.destroy();
+    toku_pthread_rwlock_destroy(&m_lock);
+}
+
+void cachefile_list::read_lock() {
+    toku_pthread_rwlock_rdlock(&m_lock);
+}
+
+void cachefile_list::read_unlock() {
+    toku_pthread_rwlock_rdunlock(&m_lock);
+}
+
+void cachefile_list::write_lock() {
+    toku_pthread_rwlock_wrlock(&m_lock);
+}
+
+void cachefile_list::write_unlock() {
+    toku_pthread_rwlock_wrunlock(&m_lock);
+}
+
+struct iterate_find_iname {
+    const char *iname_in_env;
+    CACHEFILE found_cf;
+    iterate_find_iname(const char *iname) : iname_in_env(iname), found_cf(nullptr) { }
+    static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_find_iname *info) {
+        if (cf->fname_in_env && strcmp(cf->fname_in_env, info->iname_in_env) == 0) {
+            info->found_cf = cf;
+            return -1;
+        }
+        return 0;
+    }
+};
+
+int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) {
+    struct iterate_find_iname iterate(iname_in_env);
+
+    read_lock();
+    int r = m_active_fileid.iterate<iterate_find_iname, iterate_find_iname::fn>(&iterate);
+    if (iterate.found_cf != nullptr) {
+        assert(strcmp(iterate.found_cf->fname_in_env, iname_in_env) == 0);
+        *cf = iterate.found_cf;
+        r = 0;
+    } else {
+        r = ENOENT;
+    }
+    read_unlock();
+    return r;
+}
+
+static int cachefile_find_by_filenum(const CACHEFILE &a_cf, const FILENUM &b) {
+    const FILENUM a = a_cf->filenum;
+    if (a.fileid < b.fileid) {
+        return -1;
+    } else if (a.fileid == b.fileid) {
+        return 0;
+    } else {
+        return 1;
+    }
+}
+
+int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
+    read_lock();
+    int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, cf, nullptr);
+    if (r == DB_NOTFOUND) {
+        r = ENOENT;
+    } else {
+        invariant_zero(r);
+    }
+    read_unlock();
+    return r;
+}
+
+static int cachefile_find_by_fileid(const CACHEFILE &a_cf, const struct fileid &b) {
+    return toku_fileid_cmp(a_cf->fileid, b);
+}
+
+void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
+    int r;
+    r = m_active_filenum.insert<FILENUM, cachefile_find_by_filenum>(cf, cf->filenum, nullptr);
+    assert_zero(r);
+    r = m_active_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
+    assert_zero(r);
+}
+
+void cachefile_list::add_stale_cf(CACHEFILE cf) {
+    write_lock();
+    int r = m_stale_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
+    assert_zero(r);
+    write_unlock();
+}
+
+void cachefile_list::remove_cf(CACHEFILE cf)
{ + write_lock(); + + uint32_t idx; + int r; + r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(cf->filenum, nullptr, &idx); + assert_zero(r); + r = m_active_filenum.delete_at(idx); + assert_zero(r); + + r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx); + assert_zero(r); + r = m_active_fileid.delete_at(idx); + assert_zero(r); + + write_unlock(); +} + +void cachefile_list::remove_stale_cf_unlocked(CACHEFILE cf) { + uint32_t idx; + int r; + r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx); + assert_zero(r); + r = m_stale_fileid.delete_at(idx); + assert_zero(r); +} + +FILENUM cachefile_list::reserve_filenum() { + // taking a write lock because we are modifying next_filenum_to_use + FILENUM filenum = FILENUM_NONE; + write_lock(); + while (1) { + int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(m_next_filenum_to_use, nullptr, nullptr); + if (r == 0) { + m_next_filenum_to_use.fileid++; + continue; + } + assert(r == DB_NOTFOUND); + + // skip the reserved value UINT32_MAX and wrap around to zero + if (m_next_filenum_to_use.fileid == FILENUM_NONE.fileid) { + m_next_filenum_to_use.fileid = 0; + continue; + } + + filenum = m_next_filenum_to_use; + m_next_filenum_to_use.fileid++; + break; + } + write_unlock(); + return filenum; +} + +uint32_t cachefile_list::get_new_hash_id_unlocked() { + uint32_t retval = m_next_hash_id_to_use; + m_next_hash_id_to_use++; + return retval; +} + +CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) { + CACHEFILE cf = nullptr; + int r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr); + if (r == 0) { + assert(!cf->unlink_on_close); + } + return cf; +} + +CACHEFILE cachefile_list::find_stale_cachefile_unlocked(struct fileid* fileid) { + CACHEFILE cf = nullptr; + int r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr); + if (r == 0) { + assert(!cf->unlink_on_close); + } + return cf; +} + +void cachefile_list::verify_unused_filenum(FILENUM filenum) { + int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, nullptr, nullptr); + assert(r == DB_NOTFOUND); +} + +// returns true if some eviction ran, false otherwise +bool cachefile_list::evict_some_stale_pair(evictor* ev) { + write_lock(); + if (m_stale_fileid.size() == 0) { + write_unlock(); + return false; + } + + CACHEFILE stale_cf = nullptr; + int r = m_stale_fileid.fetch(0, &stale_cf); + assert_zero(r); + + // we should not have a cf in the stale list + // that does not have any pairs + PAIR p = stale_cf->cf_head; + paranoid_invariant(p != NULL); + evict_pair_from_cachefile(p); + + // now that we have evicted something, + // let's check if the cachefile is needed anymore + // + // it is not needed if the latest eviction caused + // the cf_head for that cf to become null + bool destroy_cf = stale_cf->cf_head == nullptr; + if (destroy_cf) { + remove_stale_cf_unlocked(stale_cf); + } + + write_unlock(); + + ev->remove_pair_attr(p->attr); + cachetable_free_pair(p); + if (destroy_cf) { + cachefile_destroy(stale_cf); + } + return true; +} + +void cachefile_list::free_stale_data(evictor* ev) { + write_lock(); + while (m_stale_fileid.size() != 0) { + CACHEFILE stale_cf = nullptr; + int r = m_stale_fileid.fetch(0, &stale_cf); + assert_zero(r); + + // we should not have a cf in the stale list + // that does not have any pairs + PAIR p = stale_cf->cf_head; + 
paranoid_invariant(p != NULL);
+
+        evict_pair_from_cachefile(p);
+        ev->remove_pair_attr(p->attr);
+        cachetable_free_pair(p);
+
+        // now that we have evicted something,
+        // let's check if the cachefile is needed anymore
+        if (stale_cf->cf_head == NULL) {
+            remove_stale_cf_unlocked(stale_cf);
+            cachefile_destroy(stale_cf);
+        }
+    }
+    write_unlock();
+}
+
+void __attribute__((__constructor__)) toku_cachetable_helgrind_ignore(void);
+void
+toku_cachetable_helgrind_ignore(void) {
+    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
+    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
+    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
+    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
+    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
+    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status);
+}
diff --git a/storage/tokudb/PerconaFT/ft/cachetable/cachetable.h b/storage/tokudb/PerconaFT/ft/cachetable/cachetable.h
new file mode 100644
index 00000000..c5c21b49
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/cachetable/cachetable.h
@@ -0,0 +1,588 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License, version 2,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License, version 3,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#pragma once
+
+#include <fcntl.h>
+
+#include "ft/logger/logger.h"
+#include "ft/serialize/block_table.h"
+#include "ft/txn/txn.h"
+#include "ft/ft-status.h"
+#include "util/minicron.h"
+
+// Maintain a cache mapping from cachekeys to values (void*)
+// Some of the keys can be pinned. Don't pin too many or for too long.
+// If the cachetable is too full, it will call the flush_callback() function with the key, the value, and the otherargs
+// and then remove the key-value pair from the cache.
+// The flush callback will not be called on any of the currently pinned keys.
+// Also when flushing an object, the cachetable drops all references to it,
+// so you may need to free() it.
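+//
+// A minimal sketch of the intended usage pattern (the my_* callback and
+// extra-arg names are illustrative, not part of this API; the test-only
+// unpin wrapper is used for brevity, production code unpins by PAIR):
+//
+//    void *value;
+//    uint32_t hash = toku_cachetable_hash(cf, key);
+//    int r = toku_cachetable_get_and_pin(cf, key, hash, &value,
+//                                        my_write_callbacks,
+//                                        my_fetch, my_pf_req, my_pf,
+//                                        true /* may_modify_value */,
+//                                        my_read_extra);
+//    if (r == 0) {
+//        // ... read or modify the object behind value ...
+//        toku_test_cachetable_unpin(cf, key, hash, CACHETABLE_DIRTY,
+//                                   make_pair_attr(new_size));
+//    }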
+// Note: The cachetable should use a common pool of memory, flushing things across cachetables.
+//  (The first implementation doesn't)
+// If you pin something twice, you must unpin it twice.
+// table_size is the initial size of the cache table hash table (in number of entries)
+// size limit is the upper bound of the sum of the sizes of the entries in the cache table (total number of bytes)
+
+typedef BLOCKNUM CACHEKEY;
+
+class checkpointer;
+typedef class checkpointer *CHECKPOINTER;
+typedef struct cachetable *CACHETABLE;
+typedef struct cachefile *CACHEFILE;
+typedef struct ctpair *PAIR;
+
+// This struct holds information about values stored in the cachetable.
+// As one can tell from the names, we are probably violating an
+// abstraction layer by placing these names here.
+//
+// The purpose of having this struct is to have a way for the
+// cachetable to accumulate some totals we are interested in.
+// Breaking this abstraction layer by having these names was the
+// easiest way.
+//
+typedef struct pair_attr_s {
+    long size; // size PAIR's value takes in memory
+    long nonleaf_size; // size if PAIR is a nonleaf node, 0 otherwise, used only for engine status
+    long leaf_size; // size if PAIR is a leaf node, 0 otherwise, used only for engine status
+    long rollback_size; // size if PAIR is a rollback node, 0 otherwise, used only for engine status
+    long cache_pressure_size; // amount PAIR contributes to cache pressure, is sum of buffer sizes and workdone counts
+    bool is_valid;
+} PAIR_ATTR;
+
+static inline PAIR_ATTR make_pair_attr(long size) {
+    PAIR_ATTR result={
+        .size = size,
+        .nonleaf_size = 0,
+        .leaf_size = 0,
+        .rollback_size = 0,
+        .cache_pressure_size = 0,
+        .is_valid = true
+    };
+    return result;
+}
+
+void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period);
+uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct);
+void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations);
+uint32_t toku_get_cleaner_iterations (CACHETABLE ct);
+uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct);
+void toku_set_enable_partial_eviction (CACHETABLE ct, bool enabled);
+bool toku_get_enable_partial_eviction (CACHETABLE ct);
+
+// cachetable operations
+
+// create and initialize a cache table
+// size_limit is the upper limit on the sum of the sizes of the values in the table
+// pass 0 if you want the default
+int toku_cachetable_create_ex(CACHETABLE *result, long size_limit,
+                              unsigned long client_pool_threads,
+                              unsigned long cachetable_pool_threads,
+                              unsigned long checkpoint_pool_threads,
+                              LSN initial_lsn, struct tokulogger *logger);
+
+#define toku_cachetable_create(r, s, l, o) \
+    toku_cachetable_create_ex(r, s, 0, 0, 0, l, o);
+
+// Create a new cachetable.
+// Effects: a new cachetable is created and initialized.
+// The cachetable pointer is stored into result.
+// The limit on the sum of the sizes of the memory objects is set to size_limit, in whatever
+// units make sense to the user of the cachetable.
+// Returns: If success, returns 0 and result points to the new cachetable. Otherwise,
+// returns an error number.
+
+// Returns a pointer to the checkpointer within the given cachetable.
+CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct);
+
+// What is the cachefile that goes with a particular filenum?
+// During a transaction, we cannot reuse a filenum.
+int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
+
+// What is the cachefile that goes with a particular iname (relative to env)?
+// During a transaction, we cannot reuse an iname. +int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf); + +// Get the iname (within the cwd) associated with the cachefile +// Return the filename +char *toku_cachefile_fname_in_cwd (CACHEFILE cf); + +void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, struct tokulogger *logger); + +void toku_cachetable_end_checkpoint(CHECKPOINTER cp, struct tokulogger *logger, + void (*testcallback_f)(void*), void * testextra); + + +// Shuts down checkpoint thread +// Requires no locks be held that are taken by the checkpoint function +void toku_cachetable_minicron_shutdown(CACHETABLE ct); + +// Prepare to close the cachetable. This informs the cachetable that it is about to be closed +// so that it can tune its checkpoint resource use. +void toku_cachetable_prepare_close(CACHETABLE ct); + +// Close the cachetable. +// Effects: All of the memory objects are flushed to disk, and the cachetable is destroyed. +void toku_cachetable_close(CACHETABLE *ct); + +// Open a file and bind the file to a new cachefile object. (For use by test programs only.) +int toku_cachetable_openf(CACHEFILE *,CACHETABLE, const char *fname_in_env, int flags, mode_t mode); + +// Bind a file to a new cachefile object. +int toku_cachetable_openfd(CACHEFILE *,CACHETABLE, int fd, + const char *fname_relative_to_env); +int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int fd, + const char *fname_in_env, + FILENUM filenum, bool* was_open); + +// reserve a unique filenum +FILENUM toku_cachetable_reserve_filenum(CACHETABLE ct); + +// Effect: Reserve a fraction of the cachetable memory. +// Returns the amount reserved. +// To return the memory to the cachetable, call toku_cachetable_release_reserved_memory +// Requires 0<fraction<1. +uint64_t toku_cachetable_reserve_memory(CACHETABLE, double fraction, uint64_t upper_bound); +void toku_cachetable_release_reserved_memory(CACHETABLE, uint64_t); + +// cachefile operations + +// Does an fsync of a cachefile. +void toku_cachefile_fsync(CACHEFILE cf); + +enum partial_eviction_cost { + PE_CHEAP=0, // running partial eviction is cheap, and can be done on the client thread + PE_EXPENSIVE=1, // running partial eviction is expensive, and should not be done on the client thread +}; + +// cachetable pair clean or dirty WRT external memory +enum cachetable_dirty { + CACHETABLE_CLEAN=0, // the cached object is clean WRT the cachefile + CACHETABLE_DIRTY=1, // the cached object is dirty WRT the cachefile +}; + +// The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable. +// When write_me is true, the value should be written to storage. +// When keep_me is false, the value should be freed. +// When for_checkpoint is true, this was a 'pending' write +// Returns: 0 if success, otherwise an error number. +// Can access fd (fd is protected by a readlock during call) +typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void *value, void **disk_data, void *write_extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone); + +// The fetch callback is called when a thread is attempting to get and pin a memory +// object and it is not in the cachetable. +// Returns: 0 if success, otherwise an error number. The address and size of the object +// associated with the key are returned. 
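+//
+// A skeletal fetch callback might look like this (sketch; the my_* names
+// are hypothetical, not part of this API):
+//
+//    static int my_fetch(CACHEFILE f, PAIR p, int fd, CACHEKEY key,
+//                        uint32_t fullhash, void **value, void **disk_data,
+//                        PAIR_ATTR *sizep, int *dirtyp, void *read_extra) {
+//        my_node *node = my_read_node(fd, key);   // hypothetical reader
+//        *value = node;
+//        *sizep = make_pair_attr(node->size_in_memory);
+//        *dirtyp = 0;  // freshly fetched objects are clean
+//        return 0;
+//    }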
+// Can access fd (fd is protected by a readlock during call)
+typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, PAIR p, int fd, CACHEKEY key, uint32_t fullhash, void **value_data, void **disk_data, PAIR_ATTR *sizep, int *dirtyp, void *read_extraargs);
+
+// The cachetable calls the partial eviction estimate callback to determine if
+// partial eviction is a cheap operation that may be performed on the client thread
+// or whether partial eviction is expensive and should be done on a background (writer) thread.
+// The callback conveys this information by setting cost to either PE_CHEAP or PE_EXPENSIVE.
+// If cost is PE_EXPENSIVE, then the callback also sets bytes_freed_estimate
+// to return an estimate of the number of bytes it will free
+// so that the cachetable can estimate how much data is being evicted on background threads.
+// If cost is PE_CHEAP, then the callback does not set bytes_freed_estimate.
+typedef void (*CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK)(void *ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void *write_extraargs);
+
+// The cachetable calls the partial eviction callback to try to partially evict pieces
+// of the PAIR. The callback determines the strategy for what to evict. The callback may choose to free
+// nothing, or may choose to free as much as possible. When the partial eviction callback is finished,
+// it must call finalize with the new PAIR_ATTR and the given finalize_extra. After this point, the
+// write lock will be released on the PAIR and it is no longer safe to operate on any of the passed arguments.
+// This is useful for doing expensive cleanup work outside of the PAIR's write lock (such as destroying objects, etc)
+//
+// on entry, requires a write lock to be held on the PAIR in the cachetable while this function is called
+// on exit, the finalize continuation is called
+typedef int (*CACHETABLE_PARTIAL_EVICTION_CALLBACK)(void *ftnode_pv, PAIR_ATTR old_attr, void *write_extraargs,
+                                                    void (*finalize)(PAIR_ATTR new_attr, void *extra), void *finalize_extra);
+
+// The cachetable calls this function to determine if a get_and_pin call requires a partial fetch. If this function returns true,
+// then the cachetable will subsequently call CACHETABLE_PARTIAL_FETCH_CALLBACK to perform
+// a partial fetch. If this function returns false, then the PAIR's value is returned to the caller as is.
+//
+// An alternative to having this callback is to always call CACHETABLE_PARTIAL_FETCH_CALLBACK, and let
+// CACHETABLE_PARTIAL_FETCH_CALLBACK decide whether to do any partial fetching or not.
+// There is no particular reason why this alternative was not chosen.
+// Requires: a read lock to be held on the PAIR
+typedef bool (*CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK)(void *ftnode_pv, void *read_extraargs);
+
+// The cachetable calls the partial fetch callback when a thread needs to read or decompress a subset of a PAIR into memory.
+// An example is needing to read a basement node into memory. Another example is decompressing an internal node's
+// message buffer. The cachetable determines if a partial fetch is necessary by first calling CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK.
+// The new PAIR_ATTR of the PAIR is returned in sizep
+// Can access fd (fd is protected by a readlock during call)
+// Returns: 0 if success, otherwise an error number.
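+//
+// A skeletal partial fetch callback might look like this (sketch; the my_*
+// names are hypothetical):
+//
+//    static int my_pf(void *value, void *disk_data, void *read_extra,
+//                     int fd, PAIR_ATTR *sizep) {
+//        my_node *node = (my_node *) value;
+//        my_read_missing_partitions(node, fd);  // hypothetical
+//        *sizep = make_pair_attr(node->size_in_memory);
+//        return 0;
+//    }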
+typedef int (*CACHETABLE_PARTIAL_FETCH_CALLBACK)(void *value_data, void* disk_data, void *read_extraargs, int fd, PAIR_ATTR *sizep);
+
+// The cachetable calls the put callback during a cachetable_put command to provide the opaque PAIR.
+// The PAIR can then be used to later unpin the pair.
+typedef void (*CACHETABLE_PUT_CALLBACK)(CACHEKEY key, void *value_data, PAIR p);
+
+// TODO(leif) XXX TODO XXX
+typedef int (*CACHETABLE_CLEANER_CALLBACK)(void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *write_extraargs);
+
+typedef void (*CACHETABLE_CLONE_CALLBACK)(void* value_data, void** cloned_value_data, long* clone_size, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
+
+typedef void (*CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK)(void *value_data);
+
+typedef struct {
+    CACHETABLE_FLUSH_CALLBACK flush_callback;
+    CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback;
+    CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback;
+    CACHETABLE_CLEANER_CALLBACK cleaner_callback;
+    CACHETABLE_CLONE_CALLBACK clone_callback;
+    CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK checkpoint_complete_callback;
+    void* write_extraargs; // parameter for flush_callback, pe_est_callback, pe_callback, and cleaner_callback
+} CACHETABLE_WRITE_CALLBACK;
+
+typedef void (*CACHETABLE_GET_KEY_AND_FULLHASH)(CACHEKEY* cachekey, uint32_t* fullhash, void* extra);
+
+typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, void* extra);
+
+void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
+    void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
+    void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
+    void (*free_userdata)(CACHEFILE, void*),
+    void (*checkpoint_userdata)(CACHEFILE, int, void*),
+    void (*begin_checkpoint_userdata)(LSN, void*),
+    void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
+    void (*note_pin_by_checkpoint)(CACHEFILE, void*),
+    void (*note_unpin_by_checkpoint)(CACHEFILE, void*));
+// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata().
+// Before starting a checkpoint, we call begin_checkpoint_userdata().
+// When the cachefile needs to be checkpointed, we call checkpoint_userdata().
+// If userdata is already non-NULL, then we simply overwrite it.
+
+void *toku_cachefile_get_userdata(CACHEFILE);
+// Effect: Get the user data.
+
+CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
+// Effect: Get the cachetable.
+
+CACHEFILE toku_pair_get_cachefile(PAIR);
+// Effect: Get the cachefile of the pair
+
+void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair);
+// Effect: Swaps the value_data of old_pair and new_pair.
+// Requires: both old_pair and new_pair to be pinned with write locks.
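+
+// The write-side callbacks travel together as one bundle; a caller typically
+// fills in a CACHETABLE_WRITE_CALLBACK once per object type and reuses it
+// (sketch; the my_* callbacks are hypothetical):
+//
+//    CACHETABLE_WRITE_CALLBACK wc;
+//    wc.flush_callback = my_flush;
+//    wc.pe_est_callback = my_pe_est;
+//    wc.pe_callback = my_pe;
+//    wc.cleaner_callback = my_cleaner;
+//    wc.clone_callback = my_clone;
+//    wc.checkpoint_complete_callback = my_checkpoint_complete;
+//    wc.write_extraargs = my_extra;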
+ +typedef enum { + PL_READ = 0, + PL_WRITE_CHEAP, + PL_WRITE_EXPENSIVE +} pair_lock_type; + +// put something into the cachetable and checkpoint dependent pairs +// if the checkpointing is necessary +void toku_cachetable_put_with_dep_pairs( + CACHEFILE cachefile, + CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash, + void *value, + PAIR_ATTR attr, + CACHETABLE_WRITE_CALLBACK write_callback, + void *get_key_and_fullhash_extra, + uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint + PAIR* dependent_pairs, + enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs + CACHEKEY* key, + uint32_t* fullhash, + CACHETABLE_PUT_CALLBACK put_callback + ); + +// Put a memory object into the cachetable. +// Effects: Lookup the key in the cachetable. If the key is not in the cachetable, +// then insert the pair and pin it. Otherwise return an error. Some of the key +// value pairs may be evicted from the cachetable when the cachetable gets too big. +void toku_cachetable_put(CACHEFILE cf, CACHEKEY key, uint32_t fullhash, + void *value, PAIR_ATTR size, + CACHETABLE_WRITE_CALLBACK write_callback, + CACHETABLE_PUT_CALLBACK put_callback + ); + +// Get and pin the memory object of a PAIR, and write dependent pairs to disk +// if the dependent pairs are pending a checkpoint. +// Effects: If the memory object is in the cachetable, acquire a PAIR lock on it. +// Otherwise, fetch it from storage by calling the fetch callback. If the fetch +// succeeded, add the memory object to the cachetable with a PAIR lock on it. +// Before returning to the user, if the PAIR object being retrieved, or any of the +// dependent pairs passed in as parameters must be written to disk for checkpoint, +// then the required PAIRs are written to disk for checkpoint. +// KEY PROPERTY OF DEPENDENT PAIRS: They are already locked by the client +// Returns: 0 if the memory object is in memory, otherwise an error number. +int toku_cachetable_get_and_pin_with_dep_pairs ( + CACHEFILE cachefile, + CACHEKEY key, + uint32_t fullhash, + void**value, + CACHETABLE_WRITE_CALLBACK write_callback, + CACHETABLE_FETCH_CALLBACK fetch_callback, + CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback, + CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, + pair_lock_type lock_type, + void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback + uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint + PAIR* dependent_pairs, + enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs + ); + +// Get and pin a memory object. +// Effects: If the memory object is in the cachetable acquire the PAIR lock on it. +// Otherwise, fetch it from storage by calling the fetch callback. If the fetch +// succeeded, add the memory object to the cachetable with a read lock on it. +// Returns: 0 if the memory object is in memory, otherwise an error number. 
+int toku_cachetable_get_and_pin (
+    CACHEFILE cachefile,
+    CACHEKEY key,
+    uint32_t fullhash,
+    void**value,
+    CACHETABLE_WRITE_CALLBACK write_callback,
+    CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    bool may_modify_value,
+    void* read_extraargs // parameter for fetch_callback, pf_req_callback, and pf_callback
+    );
+
+// does partial fetch on a pinned pair
+void toku_cachetable_pf_pinned_pair(
+    void* value,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    void* read_extraargs,
+    CACHEFILE cf,
+    CACHEKEY key,
+    uint32_t fullhash
+    );
+
+struct unlockers {
+    bool locked;
+    void (*f)(void* extra);
+    void *extra;
+    struct unlockers *next;
+};
+typedef struct unlockers *UNLOCKERS;
+
+// Effect: If the block is in the cachetable, then return it.
+// Otherwise call the functions in unlockers, fetch the data (but don't pin it, since we'll just end up pinning it again later), and return TOKUDB_TRY_AGAIN.
+int toku_cachetable_get_and_pin_nonblocking (
+    CACHEFILE cf,
+    CACHEKEY key,
+    uint32_t fullhash,
+    void**value,
+    CACHETABLE_WRITE_CALLBACK write_callback,
+    CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
+    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+    pair_lock_type lock_type,
+    void *read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
+    UNLOCKERS unlockers
+    );
+
+int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, pair_lock_type, void**);
+// Effect: Maybe get and pin a memory object.
+// This function is similar to the get_and_pin function except that it
+// will not attempt to fetch a memory object that is not in the cachetable or requires any kind of blocking to get it.
+// Returns: If the item is already in memory, then return 0 and store it in the
+// void**. If the item is not in memory, then return a nonzero error number.
+
+int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, pair_lock_type, void**);
+// Effect: Like maybe get and pin, but may pin a clean pair.
+
+int toku_cachetable_get_attr(CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, PAIR_ATTR *);
+// Effect: get the attributes for cachekey
+// Returns: 0 if success, non-zero if cachekey is not cached
+// Notes: this function exists for tests
+
+int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
+// Effect: Unpin a memory object
+// Modifies: If the memory object is in the cachetable, then OR the dirty flag,
+// update the size, and release the read lock on the memory object.
+// Returns: 0 if success, otherwise returns an error number.
+// Requires: The ct is locked.
+
+int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
+// Effect: The same as toku_cachetable_unpin, except that the ct must not be locked.
+// Requires: The ct is NOT locked.
+
+int toku_cachetable_unpin_and_remove (CACHEFILE, PAIR, CACHETABLE_REMOVE_KEY, void*); /* Removing something already present is OK. */
+// Effect: Remove an object from the cachetable. Don't write it back.
+// Requires: The object must be pinned exactly once.
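+
+// A caller of the nonblocking variant is expected to loop on
+// TOKUDB_TRY_AGAIN, re-taking its own locks each time around (sketch; the
+// my_* callbacks and unlockers are the caller's own, hypothetical names):
+//
+//    int r;
+//    do {
+//        r = toku_cachetable_get_and_pin_nonblocking(
+//            cf, key, fullhash, &value, wc,
+//            my_fetch, my_pf_req, my_pf,
+//            PL_READ, my_read_extra, my_unlockers);
+//    } while (r == TOKUDB_TRY_AGAIN);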
+
+// test-only wrapper that uses CACHEKEY and fullhash
+int toku_test_cachetable_unpin(CACHEFILE, CACHEKEY, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR size);
+
+// test-only wrapper that uses CACHEKEY and fullhash
+int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, CACHEKEY, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR size);
+
+// test-only wrapper that uses CACHEKEY
+int toku_test_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY, CACHETABLE_REMOVE_KEY, void*); /* Removing something already present is OK. */
+
+int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
+                            CACHETABLE_WRITE_CALLBACK write_callback,
+                            CACHETABLE_FETCH_CALLBACK fetch_callback,
+                            CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
+                            CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
+                            void *read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
+                            bool *doing_prefetch);
+// Effect: Prefetch a memory object for a given key into the cachetable
+// Precondition: The cachetable mutex is NOT held.
+// Postcondition: The cachetable mutex is NOT held.
+// Returns: 0 if success
+// Implement Note:
+//  1) The pair's rwlock is acquired (for write) (there is not a deadlock here because the rwlock is a pthread_cond_wait using the cachetable mutex).
+//  Case A: Single-threaded.
+//    A1) Call cachetable_fetch_pair, which
+//      a) Obtains a readlock on the cachefile's fd (to prevent multiple readers at once)
+//      b) Unlocks the cachetable
+//      c) Does the fetch off disk.
+//      d) Locks the cachetable
+//      e) Unlocks the fd lock.
+//      f) Unlocks the pair rwlock.
+//  Case B: Multithreaded
+//      a) Enqueue a cachetable_reader into the workqueue.
+//      b) Unlock the cache table.
+//      c) The enqueue'd job later locks the cachetable, and calls cachetable_fetch_pair (doing the steps in A1 above).
+
+int toku_cachetable_assert_all_unpinned (CACHETABLE);
+
+int toku_cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
+
+// Close the cachefile.
+// Effects: All of the cached objects associated with the cachefile are evicted from
+// the cachetable. The flush callback is called for each of these objects. The
+// close function does not return until all of the objects are evicted. The cachefile
+// object is freed.
+// If oplsn_valid is true then use oplsn as the LSN of the close instead of asking the logger. oplsn_valid being true is only allowed during recovery, and requires that you are removing the last reference (otherwise the lsn wouldn't make it in.)
+void toku_cachefile_close (CACHEFILE*, bool oplsn_valid, LSN oplsn);
+
+// Return on success (different from pread and pwrite)
+//int cachefile_pwrite (CACHEFILE, const void *buf, size_t count, toku_off_t offset);
+//int cachefile_pread (CACHEFILE, void *buf, size_t count, toku_off_t offset);
+
+// Get the file descriptor associated with the cachefile
+// Return the file descriptor
+// Grabs a read lock protecting the fd
+int toku_cachefile_get_fd (CACHEFILE);
+
+// Get the iname (within the environment) associated with the cachefile
+// Return the filename
+char * toku_cachefile_fname_in_env (CACHEFILE cf);
+
+void toku_cachefile_set_fname_in_env(CACHEFILE cf, char *new_fname_in_env);
+
+// Make it so when the cachefile closes, the underlying file is unlinked
+void toku_cachefile_unlink_on_close(CACHEFILE cf);
+
+// is this cachefile marked as unlink on close?
+bool toku_cachefile_is_unlink_on_close(CACHEFILE cf); + +void toku_cachefile_skip_log_recover_on_close(CACHEFILE cf); +void toku_cachefile_do_log_recover_on_close(CACHEFILE cf); +bool toku_cachefile_is_skip_log_recover_on_close(CACHEFILE cf); + +// Return the logger associated with the cachefile +struct tokulogger *toku_cachefile_logger(CACHEFILE cf); + +// Return the filenum associated with the cachefile +FILENUM toku_cachefile_filenum(CACHEFILE cf); + +// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two. +uint32_t toku_cachetable_hash(CACHEFILE cf, CACHEKEY key); + +uint32_t toku_cachefile_fullhash_of_header(CACHEFILE cf); + +// debug functions + +// Print the contents of the cachetable. This is mainly used from gdb +void toku_cachetable_print_state (CACHETABLE ct); + +// Get the state of the cachetable. This is used to verify the cachetable +void toku_cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr); + +// Get the state of a cachetable entry by key. This is used to verify the cachetable +int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf, + void **value_ptr, + int *dirty_ptr, + long long *pin_ptr, + long *size_ptr); + +// Verify the whole cachetable that the cachefile is in. Slow. +void toku_cachefile_verify (CACHEFILE cf); + +// Verify the cachetable. Slow. +void toku_cachetable_verify (CACHETABLE t); + +// Not for use in production, but useful for testing. +void toku_cachetable_print_hash_histogram (void) __attribute__((__visibility__("default"))); + +void toku_cachetable_maybe_flush_some(CACHETABLE ct); + +// for stat64 +uint64_t toku_cachefile_size(CACHEFILE cf); + +void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS s); + +void toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir); +char * toku_construct_full_name(int count, ...); +char * toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env); + +void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra); +// Effect: Add a job to the cachetable's collection of work to do. Note that function f must call remove_background_job_from_cf() + +void remove_background_job_from_cf (CACHEFILE cf); +// Effect: When a kibbutz job or cleaner thread finishes in a cachefile, +// the cachetable must be notified. + +// test-only function +int toku_cachetable_get_checkpointing_user_data_status(void); + +// test-only function +int toku_cleaner_thread_for_test(CACHETABLE ct); +int toku_cleaner_thread(void *cleaner_v); + +// test function. Exported in the ydb layer and used by tests that want to run DRD +// The default of 1M is too high for drd tests, so this is a mechanism to set a smaller number. +void toku_pair_list_set_lock_size(uint32_t num_locks); + +// Used by ft-ops.cc to figure out if it has the write lock on a pair. +// Pretty hacky and not accurate enough, should be improved at the frwlock +// layer. +__attribute__((const,nonnull)) +bool toku_ctpair_is_write_locked(PAIR pair); diff --git a/storage/tokudb/PerconaFT/ft/cachetable/checkpoint.cc b/storage/tokudb/PerconaFT/ft/cachetable/checkpoint.cc new file mode 100644 index 00000000..aad018f4 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/cachetable/checkpoint.cc @@ -0,0 +1,333 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. 
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License, version 2,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License, version 3,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+/***********
+ * The purpose of this file is to implement the high-level logic for
+ * taking a checkpoint.
+ *
+ * There are three locks used for taking a checkpoint.  They are listed below.
+ *
+ * NOTE: The reader-writer locks may be held by either multiple clients
+ *       or the checkpoint function.  (The checkpoint function has the role
+ *       of the writer, the clients have the reader roles.)
+ *
+ *  - multi_operation_lock
+ *    This is a new reader-writer lock.
+ *    This lock is held by the checkpoint function only for as long as is required
+ *    to set all the "pending" bits and to create the checkpoint-in-progress versions
+ *    of the header and translation table (btt).
+ *    The following operations must take the multi_operation_lock:
+ *     - any set of operations that must be atomic with respect to begin checkpoint
+ *
+ *  - checkpoint_safe_lock
+ *    This is a new reader-writer lock.
+ *    This lock is held for the entire duration of the checkpoint.
+ *    It is used to prevent more than one checkpoint from happening at a time
+ *    (the checkpoint function is non-re-entrant), and to prevent certain operations
+ *    that should not happen during a checkpoint.
+ *    The following operations must take the checkpoint_safe lock:
+ *       - delete a dictionary
+ *       - rename a dictionary
+ *    The application can use this lock to disable checkpointing during other sensitive
+ *    operations, such as making a backup copy of the database.
+ *
+ * Once the "pending" bits are set and the snapshots are taken of the header and btt,
+ * most normal database operations are permitted to resume.
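+ *
+ * For example, a client operation that must be atomic with respect to
+ * begin checkpoint brackets itself with the reader side of the
+ * multi_operation lock (sketch):
+ *
+ *     toku_multi_operation_client_lock();
+ *     // ... insert into several dictionaries, atomically w.r.t. checkpoint ...
+ *     toku_multi_operation_client_unlock();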
+ * + * + * + *****/ + +#include <my_global.h> +#include <time.h> + +#include "portability/toku_portability.h" +#include "portability/toku_atomic.h" + +#include "ft/cachetable/cachetable.h" +#include "ft/cachetable/checkpoint.h" +#include "ft/ft.h" +#include "ft/logger/log-internal.h" +#include "ft/logger/recover.h" +#include "util/frwlock.h" +#include "util/status.h" + +toku_instr_key *checkpoint_safe_mutex_key; +toku_instr_key *checkpoint_safe_rwlock_key; +toku_instr_key *multi_operation_lock_key; +toku_instr_key *low_priority_multi_operation_lock_key; + +toku_instr_key *rwlock_cond_key; +toku_instr_key *rwlock_wait_read_key; +toku_instr_key *rwlock_wait_write_key; + +void toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS statp) { + cp_status.init(); + CP_STATUS_VAL(CP_PERIOD) = toku_get_checkpoint_period_unlocked(ct); + *statp = cp_status; +} + +static LSN last_completed_checkpoint_lsn; + +static toku_mutex_t checkpoint_safe_mutex; +static toku::frwlock checkpoint_safe_lock; +static toku_pthread_rwlock_t multi_operation_lock; +static toku_pthread_rwlock_t low_priority_multi_operation_lock; + +static bool initialized = false; // sanity check +static volatile bool locked_mo = false; // true when the multi_operation write lock is held (by checkpoint) +static volatile bool locked_cs = false; // true when the checkpoint_safe write lock is held (by checkpoint) +static volatile uint64_t toku_checkpoint_begin_long_threshold = 1000000; // 1 second +static volatile uint64_t toku_checkpoint_end_long_threshold = 1000000 * 60; // 1 minute + +// Note following static functions are called from checkpoint internal logic only, +// and use the "writer" calls for locking and unlocking. + +static void +multi_operation_lock_init(void) { + pthread_rwlockattr_t attr; + pthread_rwlockattr_init(&attr); +#if defined(HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP) + pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); +#else +// TODO: need to figure out how to make writer-preferential rwlocks +// happen on osx +#endif + toku_pthread_rwlock_init( + *multi_operation_lock_key, &multi_operation_lock, &attr); + toku_pthread_rwlock_init(*low_priority_multi_operation_lock_key, + &low_priority_multi_operation_lock, + &attr); + pthread_rwlockattr_destroy(&attr); + locked_mo = false; +} + +static void +multi_operation_lock_destroy(void) { + toku_pthread_rwlock_destroy(&multi_operation_lock); + toku_pthread_rwlock_destroy(&low_priority_multi_operation_lock); +} + +static void +multi_operation_checkpoint_lock(void) { + toku_pthread_rwlock_wrlock(&low_priority_multi_operation_lock); + toku_pthread_rwlock_wrlock(&multi_operation_lock); + locked_mo = true; +} + +static void +multi_operation_checkpoint_unlock(void) { + locked_mo = false; + toku_pthread_rwlock_wrunlock(&multi_operation_lock); + toku_pthread_rwlock_wrunlock(&low_priority_multi_operation_lock); +} + +static void checkpoint_safe_lock_init(void) { + toku_mutex_init( + *checkpoint_safe_mutex_key, &checkpoint_safe_mutex, nullptr); + checkpoint_safe_lock.init(&checkpoint_safe_mutex +#ifdef TOKU_MYSQL_WITH_PFS + , + *checkpoint_safe_rwlock_key +#endif + ); + locked_cs = false; +} + +static void +checkpoint_safe_lock_destroy(void) { + checkpoint_safe_lock.deinit(); + toku_mutex_destroy(&checkpoint_safe_mutex); +} + +static void +checkpoint_safe_checkpoint_lock(void) { + toku_mutex_lock(&checkpoint_safe_mutex); + checkpoint_safe_lock.write_lock(false); + toku_mutex_unlock(&checkpoint_safe_mutex); + locked_cs = true; +} + +static void 
+checkpoint_safe_checkpoint_unlock(void) { + locked_cs = false; + toku_mutex_lock(&checkpoint_safe_mutex); + checkpoint_safe_lock.write_unlock(); + toku_mutex_unlock(&checkpoint_safe_mutex); +} + +// toku_xxx_client_(un)lock() functions are only called from client code, +// never from checkpoint code, and use the "reader" interface to the lock functions. + +void +toku_multi_operation_client_lock(void) { + if (locked_mo) + (void) toku_sync_fetch_and_add(&CP_STATUS_VAL(CP_CLIENT_WAIT_ON_MO), 1); + toku_pthread_rwlock_rdlock(&multi_operation_lock); +} + +void +toku_multi_operation_client_unlock(void) { + toku_pthread_rwlock_rdunlock(&multi_operation_lock); +} + +void toku_low_priority_multi_operation_client_lock(void) { + toku_pthread_rwlock_rdlock(&low_priority_multi_operation_lock); +} + +void toku_low_priority_multi_operation_client_unlock(void) { + toku_pthread_rwlock_rdunlock(&low_priority_multi_operation_lock); +} + +void +toku_checkpoint_safe_client_lock(void) { + if (locked_cs) + (void) toku_sync_fetch_and_add(&CP_STATUS_VAL(CP_CLIENT_WAIT_ON_CS), 1); + toku_mutex_lock(&checkpoint_safe_mutex); + checkpoint_safe_lock.read_lock(); + toku_mutex_unlock(&checkpoint_safe_mutex); + toku_multi_operation_client_lock(); +} + +void +toku_checkpoint_safe_client_unlock(void) { + toku_mutex_lock(&checkpoint_safe_mutex); + checkpoint_safe_lock.read_unlock(); + toku_mutex_unlock(&checkpoint_safe_mutex); + toku_multi_operation_client_unlock(); +} + +// Initialize the checkpoint mechanism, must be called before any client operations. +void +toku_checkpoint_init(void) { + multi_operation_lock_init(); + checkpoint_safe_lock_init(); + initialized = true; +} + +void +toku_checkpoint_destroy(void) { + multi_operation_lock_destroy(); + checkpoint_safe_lock_destroy(); + initialized = false; +} + +#define SET_CHECKPOINT_FOOTPRINT(x) CP_STATUS_VAL(CP_FOOTPRINT) = footprint_offset + x + + +// Take a checkpoint of all currently open dictionaries +int +toku_checkpoint(CHECKPOINTER cp, TOKULOGGER logger, + void (*callback_f)(void*), void * extra, + void (*callback2_f)(void*), void * extra2, + checkpoint_caller_t caller_id) { + int footprint_offset = (int) caller_id * 1000; + + assert(initialized); + + (void) toku_sync_fetch_and_add(&CP_STATUS_VAL(CP_WAITERS_NOW), 1); + checkpoint_safe_checkpoint_lock(); + (void) toku_sync_fetch_and_sub(&CP_STATUS_VAL(CP_WAITERS_NOW), 1); + + if (CP_STATUS_VAL(CP_WAITERS_NOW) > CP_STATUS_VAL(CP_WAITERS_MAX)) + CP_STATUS_VAL(CP_WAITERS_MAX) = CP_STATUS_VAL(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock + + SET_CHECKPOINT_FOOTPRINT(10); + multi_operation_checkpoint_lock(); + SET_CHECKPOINT_FOOTPRINT(20); + toku_ft_open_close_lock(); + + SET_CHECKPOINT_FOOTPRINT(30); + CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_BEGIN) = time(NULL); + uint64_t t_checkpoint_begin_start = toku_current_time_microsec(); + toku_cachetable_begin_checkpoint(cp, logger); + uint64_t t_checkpoint_begin_end = toku_current_time_microsec(); + + toku_ft_open_close_unlock(); + multi_operation_checkpoint_unlock(); + + SET_CHECKPOINT_FOOTPRINT(40); + if (callback_f) { + callback_f(extra); // callback is called with checkpoint_safe_lock still held + } + + uint64_t t_checkpoint_end_start = toku_current_time_microsec(); + toku_cachetable_end_checkpoint(cp, logger, callback2_f, extra2); + uint64_t t_checkpoint_end_end = toku_current_time_microsec(); + + SET_CHECKPOINT_FOOTPRINT(50); + if (logger) { + last_completed_checkpoint_lsn = logger->last_completed_checkpoint_lsn; + toku_logger_maybe_trim_log(logger, 
last_completed_checkpoint_lsn); + CP_STATUS_VAL(CP_LAST_LSN) = last_completed_checkpoint_lsn.lsn; + } + + SET_CHECKPOINT_FOOTPRINT(60); + CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_END) = time(NULL); + CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_BEGIN_COMPLETE) = CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_BEGIN); + CP_STATUS_VAL(CP_CHECKPOINT_COUNT)++; + uint64_t duration = t_checkpoint_begin_end - t_checkpoint_begin_start; + CP_STATUS_VAL(CP_BEGIN_TIME) += duration; + if (duration >= toku_checkpoint_begin_long_threshold) { + CP_STATUS_VAL(CP_LONG_BEGIN_TIME) += duration; + CP_STATUS_VAL(CP_LONG_BEGIN_COUNT) += 1; + } + duration = t_checkpoint_end_end - t_checkpoint_end_start; + CP_STATUS_VAL(CP_END_TIME) += duration; + if (duration >= toku_checkpoint_end_long_threshold) { + CP_STATUS_VAL(CP_LONG_END_TIME) += duration; + CP_STATUS_VAL(CP_LONG_END_COUNT) += 1; + } + CP_STATUS_VAL(CP_TIME_CHECKPOINT_DURATION) += (uint64_t) ((time_t) CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_END)) - ((time_t) CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_BEGIN)); + CP_STATUS_VAL(CP_TIME_CHECKPOINT_DURATION_LAST) = (uint64_t) ((time_t) CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_END)) - ((time_t) CP_STATUS_VAL(CP_TIME_LAST_CHECKPOINT_BEGIN)); + CP_STATUS_VAL(CP_FOOTPRINT) = 0; + + checkpoint_safe_checkpoint_unlock(); + return 0; +} + +#include <toku_race_tools.h> +void __attribute__((__constructor__)) toku_checkpoint_helgrind_ignore(void); +void +toku_checkpoint_helgrind_ignore(void) { + TOKU_VALGRIND_HG_DISABLE_CHECKING(&cp_status, sizeof cp_status); + TOKU_VALGRIND_HG_DISABLE_CHECKING(&locked_mo, sizeof locked_mo); + TOKU_VALGRIND_HG_DISABLE_CHECKING(&locked_cs, sizeof locked_cs); +} + +#undef SET_CHECKPOINT_FOOTPRINT diff --git a/storage/tokudb/PerconaFT/ft/cachetable/checkpoint.h b/storage/tokudb/PerconaFT/ft/cachetable/checkpoint.h new file mode 100644 index 00000000..1aff1738 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/cachetable/checkpoint.h @@ -0,0 +1,120 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 
+
+#pragma once
+
+#include <stdint.h>
+
+#include "ft/cachetable/cachetable.h"
+
+// Effect: Change [end checkpoint (n) - begin checkpoint (n+1)] delay to
+//         new_period seconds.  0 means disable.
+void toku_set_checkpoint_period(CACHETABLE ct, uint32_t new_period);
+
+uint32_t toku_get_checkpoint_period_unlocked(CACHETABLE ct);
+
+
+/******
+ *
+ * NOTE: checkpoint_safe_lock is highest level lock
+ *       multi_operation_lock is next level lock
+ *       ydb_big_lock is next level lock
+ *
+ *       Locks must always be taken in this sequence (highest level first).
+ *
+ */
+
+
+/******
+ * Client code must hold the checkpoint_safe lock during the following operations:
+ *       - delete a dictionary via DB->remove
+ *       - delete a dictionary via DB_TXN->abort(txn) (where txn created a dictionary)
+ *       - rename a dictionary //TODO: Handlerton rename needs to take this
+ *                             //TODO: Handlerton rename needs to be recoded for transaction recovery
+ *****/
+
+void toku_checkpoint_safe_client_lock(void);
+
+void toku_checkpoint_safe_client_unlock(void);
+
+
+
+/******
+ * These functions are called from the ydb level.
+ * Client code must hold the multi_operation lock during the following operations:
+ *       - insertion into multiple indexes
+ *       - replace into (simultaneous delete/insert on a single key)
+ *****/
+
+void toku_multi_operation_client_lock(void);
+void toku_low_priority_multi_operation_client_lock(void);
+
+void toku_multi_operation_client_unlock(void);
+void toku_low_priority_multi_operation_client_unlock(void);
+
+
+// Initialize the checkpoint mechanism, must be called before any client operations.
+void toku_checkpoint_init(void);
+
+void toku_checkpoint_destroy(void);
+
+typedef enum {SCHEDULED_CHECKPOINT = 0, // "normal" checkpoint taken on checkpoint thread
+              CLIENT_CHECKPOINT    = 1, // induced by client, such as FLUSH LOGS or SAVEPOINT
+              INDEXER_CHECKPOINT   = 2,
+              STARTUP_CHECKPOINT   = 3,
+              UPGRADE_CHECKPOINT   = 4,
+              RECOVERY_CHECKPOINT  = 5,
+              SHUTDOWN_CHECKPOINT  = 6} checkpoint_caller_t;
+
+// Take a checkpoint of all currently open dictionaries
+// Callbacks are called during checkpoint procedure while checkpoint_safe lock is still held.
+// Callbacks are primarily intended for use in testing.
+// caller_id identifies why the checkpoint is being taken.
+int toku_checkpoint(CHECKPOINTER cp, struct tokulogger *logger,
+                    void (*callback_f)(void *extra), void *extra,
+                    void (*callback2_f)(void *extra2), void *extra2,
+                    checkpoint_caller_t caller_id);
+
+/******
+ * These functions are called from the ydb level.
+ * They return status information and have no side effects.
+ * Some status information may be incorrect because no locks are taken to collect status.
+ * (If checkpoint is in progress, it may overwrite status info while it is being read.)
+ *****/
+void toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS stat);
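+
+// A typical induced checkpoint with no test callbacks might look like this
+// (sketch; ct and logger come from the environment):
+//
+//    CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
+//    int r = toku_checkpoint(cp, logger,
+//                            NULL, NULL,   // callback_f, extra
+//                            NULL, NULL,   // callback2_f, extra2
+//                            CLIENT_CHECKPOINT);
+//    assert(r == 0);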