#include "common_engine.h" #include "backup_copy.h" #include "xtrabackup.h" #include "common.h" #include "backup_debug.h" #include #include #include #include namespace common_engine { class Table { public: Table(std::string &db, std::string &table, std::string &fs_name) : m_db(std::move(db)), m_table(std::move(table)), m_fs_name(std::move(fs_name)) {} virtual ~Table() {} void add_file_name(const char *file_name) { m_fnames.push_back(file_name); } virtual bool copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool finalize, unsigned thread_num); std::string &get_db() { return m_db; } std::string &get_table() { return m_table; } std::string &get_version() { return m_version; } protected: std::string m_db; std::string m_table; std::string m_fs_name; std::string m_version; std::vector m_fnames; }; bool Table::copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool, unsigned thread_num) { static const size_t buf_size = 10 * 1024 * 1024; std::unique_ptr buf; bool result = false; File frm_file = -1; std::vector files; bool locked = false; std::string full_tname("`"); full_tname.append(m_db).append("`.`").append(m_table).append("`"); if (!no_lock && !backup_lock(con, full_tname.c_str())) { msg(thread_num, "Error on executing BACKUP LOCK for table %s", full_tname.c_str()); goto exit; } else locked = !no_lock; if ((frm_file = mysql_file_open(key_file_frm, (m_fs_name + ".frm").c_str(), O_RDONLY | O_SHARE, MYF(0))) < 0 && !m_fnames.empty() && !ends_with(m_fnames[0].c_str(), ".ARZ") && !ends_with(m_fnames[0].c_str(), ".ARM")) { // Don't treat it as error, as the table can be dropped after it // was added to queue for copying result = true; goto exit; } for (const auto &fname : m_fnames) { File file = mysql_file_open(0, fname.c_str(),O_RDONLY | O_SHARE, MYF(0)); if (file < 0) { msg(thread_num, "Error on file %s open during %s table copy", fname.c_str(), full_tname.c_str()); goto exit; } files.push_back(file); } if (locked && !backup_unlock(con)) { msg(thread_num, "Error on BACKUP UNLOCK for table %s", full_tname.c_str()); locked = false; goto exit; } locked = false; buf.reset(new uchar[buf_size]); for (size_t i = 0; i < m_fnames.size(); ++i) { ds_file_t *dst_file = nullptr; size_t bytes_read; size_t copied_size = 0; MY_STAT stat_info; if (my_fstat(files[i], &stat_info, MYF(0))) { msg(thread_num, "error: failed to get stat info for file %s of " "table %s", m_fnames[i].c_str(), full_tname.c_str()); goto exit; } const char *dst_path = (xtrabackup_copy_back || xtrabackup_move_back) ? 
			m_fnames[i].c_str() : trim_dotslash(m_fnames[i].c_str());
		dst_file = ds_open(ds, dst_path, &stat_info, false);
		if (!dst_file) {
			msg(thread_num, "error: cannot open destination stream for %s, table %s",
				dst_path, full_tname.c_str());
			goto exit;
		}
		while ((bytes_read = my_read(files[i], buf.get(), buf_size, MY_WME))) {
			if (bytes_read == size_t(-1)) {
				msg(thread_num, "error: file %s read for table %s",
					m_fnames[i].c_str(), full_tname.c_str());
				ds_close(dst_file);
				goto exit;
			}
			xtrabackup_io_throttling();
			if (ds_write(dst_file, buf.get(), bytes_read)) {
				msg(thread_num, "error: file %s write for table %s",
					dst_path, full_tname.c_str());
				ds_close(dst_file);
				goto exit;
			}
			copied_size += bytes_read;
		}
		mysql_file_close(files[i], MYF(MY_WME));
		files[i] = -1;
		ds_close(dst_file);
		msg(thread_num, "Copied file %s for table %s, %zu bytes",
			m_fnames[i].c_str(), full_tname.c_str(), copied_size);
	}
	result = true;

#ifndef DBUG_OFF
	{
		std::string sql_name(m_db);
		sql_name.append("/").append(m_table);
		DBUG_MARIABACKUP_EVENT_LOCK("after_ce_table_copy",
			fil_space_t::name_type(sql_name.data(), sql_name.size()));
	}
#endif // DBUG_OFF

exit:
	if (frm_file >= 0) {
		m_version = ::read_table_version_id(frm_file);
		mysql_file_close(frm_file, MYF(MY_WME));
	}
	if (locked && !backup_unlock(con)) {
		msg(thread_num, "Error on BACKUP UNLOCK for table %s",
			full_tname.c_str());
		result = false;
	}
	for (auto file : files)
		if (file >= 0)
			mysql_file_close(file, MYF(MY_WME));
	return result;
}

// Append-only tables (e.g. the server log tables). Source files and
// destination streams stay open, so successive copy() calls append only
// the data written since the previous call.
class LogTable : public Table {
	public:
		LogTable(std::string &db, std::string &table, std::string &fs_name) :
			Table(db, table, fs_name) {}
		virtual ~LogTable() { (void)close(); }
		bool copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool finalize,
			unsigned thread_num) override;
		bool close();

	private:
		bool open(ds_ctxt_t *ds, unsigned thread_num);
		std::vector<File> m_src;
		std::vector<ds_file_t *> m_dst;
};

bool LogTable::open(ds_ctxt_t *ds, unsigned thread_num) {
	DBUG_ASSERT(m_src.empty());
	DBUG_ASSERT(m_dst.empty());

	std::string full_tname("`");
	full_tname.append(m_db).append("`.`").append(m_table).append("`");

	for (const auto &fname : m_fnames) {
		File file = mysql_file_open(0, fname.c_str(), O_RDONLY | O_SHARE, MYF(0));
		if (file < 0) {
			msg(thread_num, "Error on file %s open during %s log table copy",
				fname.c_str(), full_tname.c_str());
			return false;
		}
		m_src.push_back(file);

		MY_STAT stat_info;
		if (my_fstat(file, &stat_info, MYF(0))) {
			msg(thread_num, "error: failed to get stat info for file %s of "
				"log table %s", fname.c_str(), full_tname.c_str());
			return false;
		}

		const char *dst_path = (xtrabackup_copy_back || xtrabackup_move_back) ?
			fname.c_str() : trim_dotslash(fname.c_str());
		ds_file_t *dst_file = ds_open(ds, dst_path, &stat_info, false);
		if (!dst_file) {
			msg(thread_num, "error: cannot open destination stream for %s, "
				"log table %s", dst_path, full_tname.c_str());
			return false;
		}
		m_dst.push_back(dst_file);
	}

	File frm_file;
	if ((frm_file = mysql_file_open(key_file_frm, (m_fs_name + ".frm").c_str(),
			O_RDONLY | O_SHARE, MYF(0))) < 0 &&
		!m_fnames.empty() &&
		!ends_with(m_fnames[0].c_str(), ".ARZ") &&
		!ends_with(m_fnames[0].c_str(), ".ARM")) {
		msg(thread_num, "Error on .frm file open for log table %s",
			full_tname.c_str());
		return false;
	}
	m_version = ::read_table_version_id(frm_file);
	mysql_file_close(frm_file, MYF(MY_WME));
	return true;
}

bool LogTable::close() {
	while (!m_src.empty()) {
		auto f = m_src.back();
		m_src.pop_back();
		mysql_file_close(f, MYF(MY_WME));
	}
	while (!m_dst.empty()) {
		auto f = m_dst.back();
		m_dst.pop_back();
		ds_close(f);
	}
	return true;
}

bool LogTable::copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool finalize,
	unsigned thread_num) {
	static const size_t buf_size = 10 * 1024 * 1024;
	DBUG_ASSERT(ds);
	DBUG_ASSERT(con);

	if (m_src.empty() && !open(ds, thread_num)) {
		close();
		return false;
	}
	DBUG_ASSERT(m_src.size() == m_dst.size());

	std::unique_ptr<uchar[]> buf(new uchar[buf_size]);

	for (size_t i = 0; i < m_src.size(); ++i) {
		// .CSM can be rewritten (see write_meta_file() usage in ha_tina.cc)
		if (!finalize && ends_with(m_fnames[i].c_str(), ".CSM"))
			continue;
		size_t bytes_read;
		size_t copied_size = 0;
		while ((bytes_read = my_read(m_src[i], buf.get(), buf_size, MY_WME))) {
			if (bytes_read == size_t(-1)) {
				msg(thread_num, "error: file %s read for log table %s",
					m_fnames[i].c_str(),
					std::string("`").append(m_db).append("`.`").
						append(m_table).append("`").c_str());
				close();
				return false;
			}
			xtrabackup_io_throttling();
			if (ds_write(m_dst[i], buf.get(), bytes_read)) {
				msg(thread_num, "error: file %s write for log table %s",
					m_fnames[i].c_str(),
					std::string("`").append(m_db).append("`.`").
						append(m_table).append("`").c_str());
				close();
				return false;
			}
			copied_size += bytes_read;
		}
		msg(thread_num, "Copied file %s for log table %s, %zu bytes",
			m_fnames[i].c_str(),
			std::string("`").append(m_db).append("`.`").
				append(m_table).append("`").c_str(), copied_size);
	}
	return true;
}

// Coordinates scanning of the data directory and copying of the collected
// tables on the shared thread pool.
class BackupImpl {
	public:
		BackupImpl(
			const char *datadir_path, ds_ctxt_t *datasink,
			std::vector<MYSQL *> &con_pool, ThreadPool &thread_pool) :
			m_datadir_path(datadir_path), m_ds(datasink), m_con_pool(con_pool),
			m_process_table_jobs(thread_pool) {}
		~BackupImpl() { }
		bool scan(
			const std::unordered_set<table_key_t> &exclude_tables,
			std::unordered_set<table_key_t> *out_processed_tables,
			bool no_lock, bool collect_log_and_stats);
		void set_post_copy_table_hook(const post_copy_table_hook_t &hook) {
			m_table_post_copy_hook = hook;
		}
		bool copy_log_tables(bool finalize);
		bool copy_stats_tables();
		bool wait_for_finish();
		bool close_log_tables();

	private:
		void process_table_job(Table *table, bool no_lock, bool delete_table,
			bool finalize, unsigned thread_num);

		const char *m_datadir_path;
		ds_ctxt_t *m_ds;
		std::vector<MYSQL *> &m_con_pool;
		TasksGroup m_process_table_jobs;
		post_copy_table_hook_t m_table_post_copy_hook;
		std::unordered_map<table_key_t, std::unique_ptr<LogTable>> m_log_tables;
		std::unordered_map<table_key_t, std::unique_ptr<Table>> m_stats_tables;
};

void BackupImpl::process_table_job(Table *table, bool no_lock, bool delete_table,
	bool finalize, unsigned thread_num) {
	int result = 0;

	if (!m_process_table_jobs.get_result())
		goto exit;

	if (!table->copy(m_ds, m_con_pool[thread_num], no_lock, finalize, thread_num))
		goto exit;
	if (m_table_post_copy_hook)
		m_table_post_copy_hook(table->get_db(), table->get_table(),
			table->get_version());

	result = 1;

exit:
	if (delete_table)
		delete table;
	m_process_table_jobs.finish_task(result);
}

bool BackupImpl::scan(const std::unordered_set<table_key_t> &exclude_tables,
	std::unordered_set<table_key_t> *out_processed_tables, bool no_lock,
	bool collect_log_and_stats) {

	msg("Start scanning common engine tables, need backup locks: %d, "
		"collect log and stat tables: %d", no_lock, collect_log_and_stats);

	std::unordered_map<table_key_t, std::unique_ptr<Table>> found_tables;

	foreach_file_in_db_dirs(m_datadir_path, [&](const char *file_path)->bool {

		static const char *ext_list[] =
			{".MYD", ".MYI", ".MRG", ".ARM", ".ARZ", ".CSM", ".CSV", NULL};

		bool is_aria = ends_with(file_path, ".MAD") || ends_with(file_path, ".MAI");

		if (!collect_log_and_stats && is_aria)
			return true;

		if (!is_aria && !filename_matches(file_path, ext_list))
			return true;

		if (check_if_skip_table(file_path)) {
			msg("Skipping %s.", file_path);
			return true;
		}

		auto db_table_fs = convert_filepath_to_tablename(file_path);
		auto tk = table_key(std::get<0>(db_table_fs), std::get<1>(db_table_fs));

		// Log and stats tables are only collected in this function,
		// so there is no need to filter them out with exclude_tables.
		if (collect_log_and_stats) {
			if (is_log_table(std::get<0>(db_table_fs).c_str(),
				std::get<1>(db_table_fs).c_str())) {
				auto table_it = m_log_tables.find(tk);
				if (table_it == m_log_tables.end()) {
					msg("Log table found: %s", tk.c_str());
					table_it = m_log_tables.emplace(tk,
						std::unique_ptr<LogTable>(new LogTable(
							std::get<0>(db_table_fs),
							std::get<1>(db_table_fs),
							std::get<2>(db_table_fs)))).first;
				}
				msg("Collect log table file: %s", file_path);
				table_it->second->add_file_name(file_path);
				return true;
			}
			// Aria can handle statistics tables
			else if (is_stats_table(std::get<0>(db_table_fs).c_str(),
				std::get<1>(db_table_fs).c_str()) && !is_aria) {
				auto table_it = m_stats_tables.find(tk);
				if (table_it == m_stats_tables.end()) {
					msg("Stats table found: %s", tk.c_str());
					table_it = m_stats_tables.emplace(tk,
						std::unique_ptr<Table>(new Table(
							std::get<0>(db_table_fs),
							std::get<1>(db_table_fs),
							std::get<2>(db_table_fs)))).first;
				}
				msg("Collect stats table file: %s", file_path);
				table_it->second->add_file_name(file_path);
				return true;
			}
		}
		else if (is_log_table(std::get<0>(db_table_fs).c_str(),
				std::get<1>(db_table_fs).c_str()) ||
			is_stats_table(std::get<0>(db_table_fs).c_str(),
				std::get<1>(db_table_fs).c_str()))
			return true;

		if (is_aria)
			return true;

		if (exclude_tables.count(tk)) {
			msg("Skip table %s as it is in exclude list", tk.c_str());
			return true;
		}

		auto table_it = found_tables.find(tk);
		if (table_it == found_tables.end()) {
			table_it = found_tables.emplace(tk, std::unique_ptr<Table>
				(new Table(std::get<0>(db_table_fs),
					std::get<1>(db_table_fs),
					std::get<2>(db_table_fs)))).first;
		}
		table_it->second->add_file_name(file_path);
		return true;
	});

	for (auto &table_it : found_tables) {
		m_process_table_jobs.push_task(
			std::bind(&BackupImpl::process_table_job, this,
				table_it.second.release(), no_lock, true, false,
				std::placeholders::_1));
		if (out_processed_tables)
			out_processed_tables->insert(table_it.first);
	}

	msg("Stop scanning common engine tables");
	return true;
}

bool BackupImpl::copy_log_tables(bool finalize) {
	for (auto &table_it : m_log_tables) {
		// Do not execute BACKUP LOCK for log tables as it's supposed
		// that they must be copied under BLOCK_DDL and BLOCK_COMMIT locks.
		m_process_table_jobs.push_task(
			std::bind(&BackupImpl::process_table_job, this,
				table_it.second.get(), true, false, finalize,
				std::placeholders::_1));
	}
	return true;
}

bool BackupImpl::copy_stats_tables() {
	for (auto &table_it : m_stats_tables) {
		// Do not execute BACKUP LOCK for stats tables as it's supposed
		// that they must be copied under BLOCK_DDL and BLOCK_COMMIT locks.
		// Delete the stats table object after copy (see process_table_job()).
		m_process_table_jobs.push_task(
			std::bind(&BackupImpl::process_table_job, this,
				table_it.second.release(), true, true, false,
				std::placeholders::_1));
	}
	m_stats_tables.clear();
	return true;
}

bool BackupImpl::wait_for_finish() {
	/* Wait for threads to exit */
	return m_process_table_jobs.wait_for_finish();
}

bool BackupImpl::close_log_tables() {
	bool result = wait_for_finish();
	for (auto &table_it : m_log_tables)
		table_it.second->close();
	return result;
}

Backup::Backup(const char *datadir_path, ds_ctxt_t *datasink,
	std::vector<MYSQL *> &con_pool, ThreadPool &thread_pool) :
	m_backup_impl(
		new BackupImpl(datadir_path, datasink, con_pool, thread_pool)) { }

Backup::~Backup() {
	delete m_backup_impl;
}

bool Backup::scan(
	const std::unordered_set<table_key_t> &exclude_tables,
	std::unordered_set<table_key_t> *out_processed_tables,
	bool no_lock, bool collect_log_and_stats) {
	return m_backup_impl->scan(exclude_tables, out_processed_tables, no_lock,
		collect_log_and_stats);
}

bool Backup::copy_log_tables(bool finalize) {
	return m_backup_impl->copy_log_tables(finalize);
}

bool Backup::copy_stats_tables() {
	return m_backup_impl->copy_stats_tables();
}

bool Backup::wait_for_finish() {
	return m_backup_impl->wait_for_finish();
}

bool Backup::close_log_tables() {
	return m_backup_impl->close_log_tables();
}

void Backup::set_post_copy_table_hook(const post_copy_table_hook_t &hook) {
	m_backup_impl->set_post_copy_table_hook(hook);
}

} // namespace common_engine
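/*
  Usage sketch (illustrative only, not part of this translation unit): a
  rough outline of how a backup driver could use the Backup facade above.
  The connection pool, thread pool, datasink, variable names, and the exact
  call order are assumptions for the example, not definitions made by this
  file.

	common_engine::Backup backup(datadir, ds, con_pool, thread_pool);
	backup.set_post_copy_table_hook(hook);

	// Scan the datadir: queue regular tables for copying and remember
	// log and stats tables for later passes.
	backup.scan(exclude_tables, &processed_tables,
		false, // no_lock: take BACKUP LOCK per table
		true); // collect_log_and_stats

	// Append newly written log-table data while the backup is running.
	backup.copy_log_tables(false);

	// Under BLOCK_DDL/BLOCK_COMMIT: final log pass, stats tables, cleanup.
	backup.copy_log_tables(true);
	backup.copy_stats_tables();
	backup.wait_for_finish();
	backup.close_log_tables();
*/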