diff options
Diffstat (limited to 'storage/tokudb/ha_tokudb_admin.cc')
-rw-r--r-- | storage/tokudb/ha_tokudb_admin.cc | 1131 |
1 files changed, 1131 insertions, 0 deletions
diff --git a/storage/tokudb/ha_tokudb_admin.cc b/storage/tokudb/ha_tokudb_admin.cc new file mode 100644 index 00000000..514aabbf --- /dev/null +++ b/storage/tokudb/ha_tokudb_admin.cc @@ -0,0 +1,1131 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of TokuDB + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + TokuDB is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + TokuDB is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with TokuDB. If not, see <http://www.gnu.org/licenses/>. + +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "tokudb_sysvars.h" +#include "toku_time.h" + +namespace tokudb { +namespace analyze { + +class recount_rows_t : public tokudb::background::job_manager_t::job_t { +public: + void* operator new(size_t sz); + void operator delete(void* p); + + recount_rows_t( + bool user_schedued, + THD* thd, + TOKUDB_SHARE* share, + DB_TXN* txn); + + virtual ~recount_rows_t(); + + virtual const char* key(); + virtual const char* database(); + virtual const char* table(); + virtual const char* type(); + virtual const char* parameters(); + virtual const char* status(); + +protected: + virtual void on_run(); + + virtual void on_destroy(); + +private: + // to be provided by the initiator of recount rows + THD* _thd; + TOKUDB_SHARE* _share; + DB_TXN* _txn; + ulonglong _throttle; + + // for recount rows status reporting + char _parameters[256]; + char _status[1024]; + int _result; + ulonglong _recount_start; // in microseconds + ulonglong _total_elapsed_time; // in microseconds + + bool _local_txn; + ulonglong _rows; + ulonglong _deleted_rows; + ulonglong _ticks; + + static int analyze_recount_rows_progress( + uint64_t count, + uint64_t deleted, + void* extra); + int analyze_recount_rows_progress(uint64_t count, uint64_t deleted); +}; + +void* recount_rows_t::operator new(size_t sz) { + return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE)); +} +void recount_rows_t::operator delete(void* p) { + tokudb::memory::free(p); +} +recount_rows_t::recount_rows_t( + bool user_scheduled, + THD* thd, + TOKUDB_SHARE* share, + DB_TXN* txn) : + tokudb::background::job_manager_t::job_t(user_scheduled), + _share(share), + _result(HA_ADMIN_OK), + _recount_start(0), + _total_elapsed_time(0), + _local_txn(false), + _rows(0), + _deleted_rows(0), + _ticks(0) { + + assert_debug(thd != NULL); + assert_debug(share != NULL); + + if (tokudb::sysvars::analyze_in_background(thd)) { + _thd = NULL; + _txn = NULL; + } else { + _thd = thd; + _txn = txn; + } + + _throttle = tokudb::sysvars::analyze_throttle(thd); + + snprintf(_parameters, + sizeof(_parameters), + "TOKUDB_ANALYZE_THROTTLE=%llu;", + _throttle); + _status[0] = '\0'; +} +recount_rows_t::~recount_rows_t() { +} +void recount_rows_t::on_run() { + const char* orig_proc_info = NULL; + if (_thd) + orig_proc_info = tokudb_thd_get_proc_info(_thd); + _recount_start = tokudb::time::microsec(); + _total_elapsed_time = 0; + + if (_txn == NULL) { + _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED); + + if (_result != 0) { + _txn = NULL; + _result = HA_ADMIN_FAILED; + goto error; + } + _local_txn = true; + } else { + _local_txn = false; + } + + _result = + _share->file->recount_rows( + _share->file, + analyze_recount_rows_progress, + this); + + if (_result != 0) { + if (_local_txn) { + _txn->abort(_txn); + _txn = NULL; + } + _result = HA_ADMIN_FAILED; + goto error; + } + + DB_BTREE_STAT64 dict_stats; + _result = _share->file->stat64(_share->file, _txn, &dict_stats); + if (_result == 0) { + _share->set_row_count(dict_stats.bt_ndata, false); + } + if (_result != 0) + _result = HA_ADMIN_FAILED; + + if (_local_txn) { + if (_result == HA_ADMIN_OK) { + _txn->commit(_txn, 0); + } else { + _txn->abort(_txn); + } + _txn = NULL; + } + + sql_print_information( + "tokudb analyze recount rows %d counted %lld", + _result, + _share->row_count()); +error: + if(_thd) + tokudb_thd_set_proc_info(_thd, orig_proc_info); + return; +} +void recount_rows_t::on_destroy() { + _share->release(); +} +const char* recount_rows_t::key() { + return _share->full_table_name(); +} +const char* recount_rows_t::database() { + return _share->database_name(); +} +const char* recount_rows_t::table() { + return _share->table_name(); +} +const char* recount_rows_t::type() { + static const char* type = "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS"; + return type; +} +const char* recount_rows_t::parameters() { + return _parameters; +} +const char* recount_rows_t::status() { + return _status; +} +int recount_rows_t::analyze_recount_rows_progress( + uint64_t count, + uint64_t deleted, + void* extra) { + + recount_rows_t* context = (recount_rows_t*)extra; + return context->analyze_recount_rows_progress(count, deleted); +} +int recount_rows_t::analyze_recount_rows_progress( + uint64_t count, + uint64_t deleted) { + + _rows = count; + _deleted_rows += deleted; + deleted > 0 ? _ticks += deleted : _ticks++; + + if (_ticks > 1000) { + _ticks = 0; + uint64_t now = tokudb::time::microsec(); + _total_elapsed_time = now - _recount_start; + if ((_thd && thd_kill_level(_thd)) || cancelled()) { + // client killed + return ER_ABORTING_CONNECTION; + } + + // rebuild status + // There is a slight race condition here, + // _status is used here for tokudb_thd_set_proc_info and it is also used + // for the status column in i_s.background_job_status. + // If someone happens to be querying/building the i_s table + // at the exact same time that the status is being rebuilt here, + // the i_s table could get some garbage status. + // This solution is a little heavy handed but it works, it prevents us + // from changing the status while someone might be immediately observing + // us and it prevents someone from observing us while we change the + // status + tokudb::background::_job_manager->lock(); + snprintf(_status, + sizeof(_status), + "recount_rows %s.%s counted %llu rows and %llu deleted " + "in %llu seconds.", + _share->database_name(), + _share->table_name(), + _rows, + _deleted_rows, + _total_elapsed_time / tokudb::time::MICROSECONDS); + tokudb::background::_job_manager->unlock(); + + // report + if (_thd) + tokudb_thd_set_proc_info(_thd, _status); + + // throttle + // given the throttle value, lets calculate the maximum number of rows + // we should have seen so far in a .1 sec resolution + if (_throttle > 0) { + uint64_t estimated_rows = _total_elapsed_time / 100000; + estimated_rows = estimated_rows * (_throttle / 10); + if (_rows + _deleted_rows > estimated_rows) { + // sleep for 1/10 of a second + tokudb::time::sleep_microsec(100000); + } + } + } + return 0; +} + +class standard_t : public tokudb::background::job_manager_t::job_t { +public: + void* operator new(size_t sz); + void operator delete(void* p); + + standard_t(bool user_scheduled, THD* thd, TOKUDB_SHARE* share, DB_TXN* txn); + + virtual ~standard_t(); + + virtual const char* key(void); + virtual const char* database(); + virtual const char* table(); + virtual const char* type(); + virtual const char* parameters(); + virtual const char* status(); + +protected: + virtual void on_run(); + + virtual void on_destroy(); + +private: + // to be provided by initiator of analyze + THD* _thd; + TOKUDB_SHARE* _share; + DB_TXN* _txn; + ulonglong _throttle; // in microseconds + ulonglong _time_limit; // in microseconds + double _delete_fraction; + + // for analyze status reporting, may also use other state + char _parameters[256]; + char _status[1024]; + int _result; + ulonglong _analyze_start; // in microseconds + ulonglong _total_elapsed_time; // in microseconds + + // for analyze internal use, pretty much these are per-key/index + ulonglong _current_key; + bool _local_txn; + ulonglong _half_time; + ulonglong _half_rows; + ulonglong _rows; + ulonglong _deleted_rows; + ulonglong _ticks; + ulonglong _analyze_key_start; // in microseconds + ulonglong _key_elapsed_time; // in microseconds + uint _scan_direction; + + static bool analyze_standard_cursor_callback( + void* extra, + uint64_t deleted_rows); + bool analyze_standard_cursor_callback(uint64_t deleted_rows); + + int analyze_key_progress(); + int analyze_key(uint64_t* rec_per_key_part); +}; + +void* standard_t::operator new(size_t sz) { + return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE)); +} +void standard_t::operator delete(void* p) { + tokudb::memory::free(p); +} +standard_t::standard_t( + bool user_scheduled, + THD* thd, + TOKUDB_SHARE* share, + DB_TXN* txn) : + tokudb::background::job_manager_t::job_t(user_scheduled), + _share(share), + _result(HA_ADMIN_OK), + _analyze_start(0), + _total_elapsed_time(0), + _current_key(0), + _local_txn(false), + _half_time(0), + _half_rows(0), + _rows(0), + _deleted_rows(0), + _ticks(0), + _analyze_key_start(0), + _key_elapsed_time(0), + _scan_direction(0) { + + assert_debug(thd != NULL); + assert_debug(share != NULL); + + if (tokudb::sysvars::analyze_in_background(thd)) { + _thd = NULL; + _txn = NULL; + } else { + _thd = thd; + _txn = txn; + } + _throttle = tokudb::sysvars::analyze_throttle(thd); + _time_limit = + tokudb::sysvars::analyze_time(thd) * tokudb::time::MICROSECONDS; + _delete_fraction = tokudb::sysvars::analyze_delete_fraction(thd); + + snprintf(_parameters, + sizeof(_parameters), + "TOKUDB_ANALYZE_DELETE_FRACTION=%f; " + "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;", + _delete_fraction, + _time_limit / tokudb::time::MICROSECONDS, + _throttle); + + _status[0] = '\0'; +} +standard_t::~standard_t() { +} +void standard_t::on_run() { + DB_BTREE_STAT64 stat64; + uint64_t rec_per_key_part[_share->_max_key_parts]; + uint64_t total_key_parts = 0; + const char* orig_proc_info = NULL; + if (_thd) + orig_proc_info = tokudb_thd_get_proc_info(_thd); + + _analyze_start = tokudb::time::microsec(); + _half_time = _time_limit > 0 ? _time_limit/2 : 0; + + if (_txn == NULL) { + _result = db_env->txn_begin(db_env, NULL, &_txn, DB_READ_UNCOMMITTED); + + if (_result != 0) { + _txn = NULL; + _result = HA_ADMIN_FAILED; + goto error; + } + _local_txn = true; + } else { + _local_txn = false; + } + + assert_always(_share->key_file[0] != NULL); + _result = _share->key_file[0]->stat64(_share->key_file[0], _txn, &stat64); + if (_result != 0) { + _result = HA_ADMIN_FAILED; + goto cleanup; + } + _half_rows = stat64.bt_ndata / 2; + + for (ulonglong current_key = 0; + _result == HA_ADMIN_OK && current_key < _share->_keys; + current_key++) { + + _current_key = current_key; + _rows = _deleted_rows = _ticks = 0; + _result = analyze_key(&rec_per_key_part[total_key_parts]); + + if ((_result != 0 && _result != ETIME) || + (_result != 0 && _rows == 0 && _deleted_rows > 0)) { + _result = HA_ADMIN_FAILED; + } + if (_thd && (_result == HA_ADMIN_FAILED || + static_cast<double>(_deleted_rows) > + _delete_fraction * (_rows + _deleted_rows))) { + + char name[256]; int namelen; + namelen = + snprintf( + name, + sizeof(name), + "%s.%s.%s", + _share->database_name(), + _share->table_name(), + _share->_key_descriptors[_current_key]._name); + _thd->protocol->prepare_for_resend(); + _thd->protocol->store(name, namelen, system_charset_info); + _thd->protocol->store("analyze", 7, system_charset_info); + _thd->protocol->store("info", 4, system_charset_info); + char rowmsg[256]; + int rowmsglen; + rowmsglen = + snprintf( + rowmsg, + sizeof(rowmsg), + "rows processed %llu rows deleted %llu", + _rows, + _deleted_rows); + _thd->protocol->store(rowmsg, rowmsglen, system_charset_info); + _thd->protocol->write(); + + sql_print_information( + "tokudb analyze on %.*s %.*s", + namelen, + name, + rowmsglen, + rowmsg); + } + + total_key_parts += _share->_key_descriptors[_current_key]._parts; + } + if (_result == HA_ADMIN_OK) { + int error = + tokudb::set_card_in_status( + _share->status_block, + _txn, + total_key_parts, + rec_per_key_part); + if (error) + _result = HA_ADMIN_FAILED; + + _share->lock(); + _share->update_cardinality_counts(total_key_parts, rec_per_key_part); + _share->allow_auto_analysis(true); + _share->unlock(); + } + +cleanup: + if (_local_txn) { + if (_result == HA_ADMIN_OK) { + _txn->commit(_txn, 0); + } else { + _txn->abort(_txn); + } + _txn = NULL; + } + +error: + if (_thd) + tokudb_thd_set_proc_info(_thd, orig_proc_info); + return; +} +void standard_t::on_destroy() { + _share->lock(); + _share->allow_auto_analysis(false); + _share->unlock(); + _share->release(); +} +const char* standard_t::key() { + return _share->full_table_name(); +} +const char* standard_t::database() { + return _share->database_name(); +} +const char* standard_t::table() { + return _share->table_name(); +} +const char* standard_t::type() { + static const char* type = "TOKUDB_ANALYZE_MODE_STANDARD"; + return type; +} +const char* standard_t::parameters() { + return _parameters; +} +const char* standard_t::status() { + return _status; +} +bool standard_t::analyze_standard_cursor_callback( + void* extra, + uint64_t deleted_rows) { + standard_t* context = (standard_t*)extra; + return context->analyze_standard_cursor_callback(deleted_rows); +} +bool standard_t::analyze_standard_cursor_callback(uint64_t deleted_rows) { + _deleted_rows += deleted_rows; + _ticks += deleted_rows; + return analyze_key_progress() != 0; +} +int standard_t::analyze_key_progress(void) { + if (_ticks > 1000) { + _ticks = 0; + uint64_t now = tokudb::time::microsec(); + _total_elapsed_time = now - _analyze_start; + _key_elapsed_time = now - _analyze_key_start; + if ((_thd && thd_kill_level(_thd)) || cancelled()) { + // client killed + return ER_ABORTING_CONNECTION; + } else if (_time_limit > 0 && + static_cast<uint64_t>(_key_elapsed_time) > _time_limit) { + // time limit reached + return ETIME; + } + + // rebuild status + // There is a slight race condition here, + // _status is used here for tokudb_thd_set_proc_info and it is also used + // for the status column in i_s.background_job_status. + // If someone happens to be querying/building the i_s table + // at the exact same time that the status is being rebuilt here, + // the i_s table could get some garbage status. + // This solution is a little heavy handed but it works, it prevents us + // from changing the status while someone might be immediately observing + // us and it prevents someone from observing us while we change the + // status. + static const char* scan_direction_str[] = {"not scanning", + "scanning forward", + "scanning backward", + "scan unknown"}; + + const char* scan_direction = NULL; + switch (_scan_direction) { + case 0: + scan_direction = scan_direction_str[0]; + break; + case DB_NEXT: + scan_direction = scan_direction_str[1]; + break; + case DB_PREV: + scan_direction = scan_direction_str[2]; + break; + default: + scan_direction = scan_direction_str[3]; + break; + } + + float progress_rows = 0.0; + if (_share->row_count() > 0) + progress_rows = static_cast<float>(_rows) / + static_cast<float>(_share->row_count()); + float progress_time = 0.0; + if (_time_limit > 0) + progress_time = static_cast<float>(_key_elapsed_time) / + static_cast<float>(_time_limit); + tokudb::background::_job_manager->lock(); + snprintf( + _status, + sizeof(_status), + "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% " + "time, %s", + _share->database_name(), + _share->table_name(), + _share->_key_descriptors[_current_key]._name, + _current_key, + _share->_keys, + progress_rows * 100.0, + progress_time * 100.0, + scan_direction); + tokudb::background::_job_manager->unlock(); + + // report + if (_thd) + tokudb_thd_set_proc_info(_thd, _status); + + // throttle + // given the throttle value, lets calculate the maximum number of rows + // we should have seen so far in a .1 sec resolution + if (_throttle > 0) { + uint64_t estimated_rows = _key_elapsed_time / 100000; + estimated_rows = estimated_rows * (_throttle / 10); + if (_rows + _deleted_rows > estimated_rows) { + // sleep for 1/10 of a second + tokudb::time::sleep_microsec(100000); + } + } + } + return 0; +} +int standard_t::analyze_key(uint64_t* rec_per_key_part) { + int error = 0; + DB* db = _share->key_file[_current_key]; + assert_always(db != NULL); + uint64_t num_key_parts = _share->_key_descriptors[_current_key]._parts; + uint64_t unique_rows[num_key_parts]; + bool is_unique = _share->_key_descriptors[_current_key]._is_unique; + DBC* cursor = NULL; + int close_error = 0; + DBT key, prev_key; + bool copy_key = false; + + _analyze_key_start = tokudb::time::microsec(); + _key_elapsed_time = 0; + _scan_direction = DB_NEXT; + + if (is_unique && num_key_parts == 1) { + // don't compute for unique keys with a single part. we already know + // the answer. + _rows = unique_rows[0] = 1; + goto done; + } + + for (uint64_t i = 0; i < num_key_parts; i++) + unique_rows[i] = 1; + + // stop looking when the entire dictionary was analyzed, or a + // cap on execution time was reached, or the analyze was killed. + while (1) { + if (cursor == NULL) { + error = db->cursor(db, _txn, &cursor, 0); + if (error != 0) + goto done; + + cursor->c_set_check_interrupt_callback( + cursor, + analyze_standard_cursor_callback, + this); + + memset(&key, 0, sizeof(DBT)); + memset(&prev_key, 0, sizeof(DBT)); + copy_key = true; + } + + error = cursor->c_get(cursor, &key, 0, _scan_direction); + if (error != 0) { + if (error == DB_NOTFOUND || error == TOKUDB_INTERRUPTED) + error = 0; // not an error + break; + } else if (cancelled()) { + error = ER_ABORTING_CONNECTION; + break; + } + + _rows++; + _ticks++; + + // if copy_key is false at this pont, we have some value sitting in + // prev_key that we can compare to + // if the comparison reveals a unique key, we must set copy_key to true + // so the code following can copy he current key into prev_key for the + // next iteration + if (copy_key == false) { + // compare this key with the previous key. ignore + // appended PK for SK's. + // TODO if a prefix is different, then all larger keys + // that include the prefix are also different. + // TODO if we are comparing the entire primary key or + // the entire unique secondary key, then the cardinality + // must be 1, so we can avoid computing it. + for (uint64_t i = 0; i < num_key_parts; i++) { + int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1); + if (cmp != 0) { + unique_rows[i]++; + copy_key = true; + } + } + } + + // prev_key = key or prev_key is NULL + if (copy_key) { + prev_key.data = + tokudb::memory::realloc( + prev_key.data, + key.size, + MYF(MY_WME|MY_ZEROFILL|MY_FAE)); + assert_always(prev_key.data); + prev_key.size = key.size; + memcpy(prev_key.data, key.data, prev_key.size); + copy_key = false; + } + + error = analyze_key_progress(); + if (error == ETIME) { + error = 0; + break; + } else if (error) { + break; + } + + // if we have a time limit, are scanning forward and have exceed the + // _half_time and not passed the _half_rows number of the rows in the + // index: clean up the keys, close the cursor and reverse direction. + if (TOKUDB_UNLIKELY(_half_time > 0 && + _scan_direction == DB_NEXT && + _key_elapsed_time >= _half_time && + _rows < _half_rows)) { + + tokudb::memory::free(prev_key.data); prev_key.data = NULL; + close_error = cursor->c_close(cursor); + assert_always(close_error == 0); + cursor = NULL; + _scan_direction = DB_PREV; + } + } + // cleanup + if (prev_key.data) tokudb::memory::free(prev_key.data); + if (cursor) close_error = cursor->c_close(cursor); + assert_always(close_error == 0); + +done: + // in case we timed out (bunch of deleted records) without hitting a + // single row + if (_rows == 0) + _rows = 1; + + // return cardinality + for (uint64_t i = 0; i < num_key_parts; i++) { + rec_per_key_part[i] = _rows / unique_rows[i]; + } + return error; +} + +} // namespace analyze +} // namespace tokudb + + +int ha_tokudb::analyze(THD *thd, TOKUDB_UNUSED(HA_CHECK_OPT *check_opt)) { + TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name()); + int result = HA_ADMIN_OK; + tokudb::sysvars::analyze_mode_t mode = tokudb::sysvars::analyze_mode(thd); + + switch (mode) { + case tokudb::sysvars::TOKUDB_ANALYZE_RECOUNT_ROWS: + result = share->analyze_recount_rows(thd, transaction); + break; + case tokudb::sysvars::TOKUDB_ANALYZE_STANDARD: + share->lock(); + result = share->analyze_standard(thd, transaction); + share->unlock(); + break; + case tokudb::sysvars::TOKUDB_ANALYZE_CANCEL: + share->cancel_background_jobs(); + break; + default: + break; // no-op + } + TOKUDB_HANDLER_DBUG_RETURN(result); +} + +int TOKUDB_SHARE::analyze_recount_rows(THD* thd,DB_TXN* txn) { + TOKUDB_HANDLER_DBUG_ENTER("%s", table_name()); + + assert_always(thd != NULL); + + int result = HA_ADMIN_OK; + + tokudb::analyze::recount_rows_t* job + = new tokudb::analyze::recount_rows_t(true, thd, this, txn); + assert_always(job != NULL); + + // job->destroy will drop the ref + addref(); + unlock(); + + bool ret = tokudb::background::_job_manager-> + run_job(job, tokudb::sysvars::analyze_in_background(thd)); + + if (!ret) { + job->destroy(); + delete job; + result = HA_ADMIN_FAILED; + } + + TOKUDB_HANDLER_DBUG_RETURN(result); +} + +// on entry, if txn is !NULL, it is a user session invoking ANALYZE directly +// and no lock will be held on 'this', else if txn is NULL it is an auto and +// 'this' will be locked. +int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) { + TOKUDB_HANDLER_DBUG_ENTER("%s", table_name()); + + assert_always(thd != NULL); + assert_debug(_mutex.is_owned_by_me() == true); + + int result = HA_ADMIN_OK; + + // stub out analyze if optimize is remapped to alter recreate + analyze + // when not auto analyze or if this is an alter + if ((txn && + thd_sql_command(thd) != SQLCOM_ANALYZE && + thd_sql_command(thd) != SQLCOM_ALTER_TABLE) || + thd_sql_command(thd) == SQLCOM_ALTER_TABLE) { + TOKUDB_HANDLER_DBUG_RETURN(result); + } + + tokudb::analyze::standard_t* job + = new tokudb::analyze::standard_t(txn == NULL ? false : true, thd, + this, txn); + assert_always(job != NULL); + + // akin to calling addref, but we know, right here, right now, everything + // in the share is set up, files open, etc... + // job->destroy will drop the ref + _use_count++; + + // don't want any autos kicking off while we are analyzing + disallow_auto_analysis(); + + unlock(); + + bool ret = + tokudb::background::_job_manager->run_job( + job, + tokudb::sysvars::analyze_in_background(thd)); + + if (!ret) { + job->destroy(); + delete job; + result = HA_ADMIN_FAILED; + } + + lock(); + + TOKUDB_HANDLER_DBUG_RETURN(result); +} + + +typedef struct hot_optimize_context { + THD* thd; + char* write_status_msg; + ha_tokudb* ha; + uint progress_stage; + uint current_table; + uint num_tables; + float progress_limit; + uint64_t progress_last_time; + uint64_t throttle; +} *HOT_OPTIMIZE_CONTEXT; + +static int hot_optimize_progress_fun(void *extra, float progress) { + HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra; + if (thd_kill_level(context->thd)) { + sprintf( + context->write_status_msg, + "The process has been killed, aborting hot optimize."); + return ER_ABORTING_CONNECTION; + } + float percentage = progress * 100; + sprintf( + context->write_status_msg, + "Optimization of index %u of %u about %.lf%% done", + context->current_table + 1, + context->num_tables, + percentage); + thd_proc_info(context->thd, context->write_status_msg); +#ifdef HA_TOKUDB_HAS_THD_PROGRESS + if (context->progress_stage < context->current_table) { + // the progress stage is behind the current table, so move up + // to the next stage and set the progress stage to current. + thd_progress_next_stage(context->thd); + context->progress_stage = context->current_table; + } + // the percentage we report here is for the current stage/db + thd_progress_report(context->thd, (unsigned long long) percentage, 100); +#endif + + // throttle the optimize table + if (context->throttle) { + uint64_t time_now = toku_current_time_microsec(); + uint64_t dt = time_now - context->progress_last_time; + uint64_t throttle_time = 1000000ULL / context->throttle; + if (throttle_time > dt) { + usleep(throttle_time - dt); + } + context->progress_last_time = toku_current_time_microsec(); + } + + // return 1 if progress has reach the progress limit + return progress >= context->progress_limit; +} + +// flatten all DB's in this table, to do so, peform hot optimize on each db +int ha_tokudb::do_optimize(THD* thd) { + TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name()); + int error = 0; + const char* orig_proc_info = tokudb_thd_get_proc_info(thd); + uint curr_num_DBs = table->s->keys + tokudb_test(hidden_primary_key); + +#ifdef HA_TOKUDB_HAS_THD_PROGRESS + // each DB is its own stage. as HOT goes through each db, we'll + // move on to the next stage. + thd_progress_init(thd, curr_num_DBs); +#endif + + // for each DB, run optimize and hot_optimize + for (uint i = 0; i < curr_num_DBs; i++) { + // only optimize the index if it matches the optimize_index_name + // session variable + const char* optimize_index_name = + tokudb::sysvars::optimize_index_name(thd); + if (optimize_index_name) { + const char* this_index_name = + i >= table_share->keys ? + "primary" : + table_share->key_info[i].name.str; + if (strcasecmp(optimize_index_name, this_index_name) != 0) { + continue; + } + } + + DB* db = share->key_file[i]; + assert_always(db != NULL); + error = db->optimize(db); + if (error) { + goto cleanup; + } + + struct hot_optimize_context hc; + memset(&hc, 0, sizeof hc); + hc.thd = thd; + hc.write_status_msg = this->write_status_msg; + hc.ha = this; + hc.current_table = i; + hc.num_tables = curr_num_DBs; + hc.progress_limit = tokudb::sysvars::optimize_index_fraction(thd); + hc.progress_last_time = toku_current_time_microsec(); + hc.throttle = tokudb::sysvars::optimize_throttle(thd); + uint64_t loops_run; + error = + db->hot_optimize( + db, + NULL, + NULL, + hot_optimize_progress_fun, + &hc, + &loops_run); + if (error) { + goto cleanup; + } + } + error = 0; + +cleanup: +#ifdef HA_TOKUDB_HAS_THD_PROGRESS + thd_progress_end(thd); +#endif + thd_proc_info(thd, orig_proc_info); + TOKUDB_HANDLER_DBUG_RETURN(error); +} + +int ha_tokudb::optimize(TOKUDB_UNUSED(THD* thd), + TOKUDB_UNUSED(HA_CHECK_OPT* check_opt)) { + TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name()); + int error; +#if TOKU_OPTIMIZE_WITH_RECREATE + error = HA_ADMIN_TRY_ALTER; +#else + error = do_optimize(thd); +#endif + TOKUDB_HANDLER_DBUG_RETURN(error); +} + +struct check_context { + THD* thd; +}; + +static int ha_tokudb_check_progress(void* extra, + TOKUDB_UNUSED(float progress)) { + struct check_context* context = (struct check_context*)extra; + int result = 0; + if (thd_kill_level(context->thd)) + result = ER_ABORTING_CONNECTION; + return result; +} + +static void ha_tokudb_check_info(THD* thd, TABLE* table, const char* msg) { + if (thd->vio_ok()) { + char tablename[ + table->s->db.length + 1 + + table->s->table_name.length + 1]; + snprintf( + tablename, + sizeof(tablename), + "%.*s.%.*s", + (int)table->s->db.length, + table->s->db.str, + (int)table->s->table_name.length, + table->s->table_name.str); + thd->protocol->prepare_for_resend(); + thd->protocol->store(tablename, strlen(tablename), system_charset_info); + thd->protocol->store("check", 5, system_charset_info); + thd->protocol->store("info", 4, system_charset_info); + thd->protocol->store(msg, strlen(msg), system_charset_info); + thd->protocol->write(); + } +} + +int ha_tokudb::check(THD* thd, HA_CHECK_OPT* check_opt) { + TOKUDB_HANDLER_DBUG_ENTER("%s", share->table_name()); + const char* orig_proc_info = tokudb_thd_get_proc_info(thd); + int result = HA_ADMIN_OK; + int r; + + int keep_going = 1; + if (check_opt->flags & T_QUICK) { + keep_going = 0; + } + if (check_opt->flags & T_EXTEND) { + keep_going = 1; + } + + r = acquire_table_lock(transaction, lock_write); + if (r != 0) + result = HA_ADMIN_INTERNAL_ERROR; + if (result == HA_ADMIN_OK) { + uint32_t num_DBs = table_share->keys + tokudb_test(hidden_primary_key); + snprintf( + write_status_msg, + sizeof(write_status_msg), + "%s primary=%d num=%d", + share->table_name(), + primary_key, + num_DBs); + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) { + ha_tokudb_check_info(thd, table, write_status_msg); + time_t now = time(0); + char timebuf[32]; + TOKUDB_HANDLER_TRACE( + "%.24s %s", + ctime_r(&now, timebuf), + write_status_msg); + } + for (uint i = 0; i < num_DBs; i++) { + DB* db = share->key_file[i]; + assert_always(db != NULL); + const char* kname = + i == primary_key ? "primary" : table_share->key_info[i].name.str; + snprintf( + write_status_msg, + sizeof(write_status_msg), + "%s key=%s %u", + share->table_name(), + kname, + i); + thd_proc_info(thd, write_status_msg); + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) { + ha_tokudb_check_info(thd, table, write_status_msg); + time_t now = time(0); + char timebuf[32]; + TOKUDB_HANDLER_TRACE( + "%.24s %s", + ctime_r(&now, timebuf), + write_status_msg); + } + struct check_context check_context = { thd }; + r = db->verify_with_progress( + db, + ha_tokudb_check_progress, + &check_context, + (tokudb::sysvars::debug & TOKUDB_DEBUG_CHECK) != 0, + keep_going); + if (r != 0) { + char msg[32 + strlen(kname)]; + sprintf(msg, "Corrupt %s", kname); + ha_tokudb_check_info(thd, table, msg); + } + snprintf( + write_status_msg, + sizeof(write_status_msg), + "%s key=%s %u result=%d", + share->full_table_name(), + kname, + i, + r); + thd_proc_info(thd, write_status_msg); + if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_CHECK))) { + ha_tokudb_check_info(thd, table, write_status_msg); + time_t now = time(0); + char timebuf[32]; + TOKUDB_HANDLER_TRACE( + "%.24s %s", + ctime_r(&now, timebuf), + write_status_msg); + } + if (result == HA_ADMIN_OK && r != 0) { + result = HA_ADMIN_CORRUPT; + if (!keep_going) + break; + } + } + } + thd_proc_info(thd, orig_proc_info); + TOKUDB_HANDLER_DBUG_RETURN(result); +} |