diff options
Diffstat (limited to '')
-rw-r--r-- | sql/log.cc | 1930 |
1 files changed, 1251 insertions, 679 deletions
@@ -40,11 +40,13 @@ #include "sql_audit.h" #include "mysqld.h" #include "ddl_log.h" +#include "gtid_index.h" #include <my_dir.h> #include <m_ctype.h> // For test_if_number #include <set_var.h> // for Sys_last_gtid_ptr +#include <ilist.h> #ifdef _WIN32 #include "message.h" @@ -58,6 +60,7 @@ #include "sp_rcontext.h" #include "sp_head.h" #include "sql_table.h" +#include "log_cache.h" #include "wsrep_mysqld.h" #ifdef WITH_WSREP @@ -74,9 +77,6 @@ /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_TIME_SIZE 32 -#define MY_OFF_T_UNDEF (~(my_off_t)0UL) -/* Truncate cache log files bigger than this */ -#define CACHE_FILE_TRUNC_SIZE 65536 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -107,6 +107,16 @@ static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; static my_bool opt_optimize_thread_scheduling= TRUE; +/* + The binlog_checksum_options value is accessed protected under LOCK_log. As + the checksum option used must be consistent across an entire binlog file, + and log rotation is needed whenever this is changed. + + As an exception, event checksums are precomputed using a non-locked read + of binlog_checksum_options. Later, the value is checked against the option + value, this time under LOCK_log, and checksums are re-computed if the value + differs. +*/ ulong binlog_checksum_options; #ifndef DBUG_OFF ulong opt_binlog_dbug_fsync_sleep= 0; @@ -155,12 +165,44 @@ static SHOW_VAR binlog_status_vars_detail[]= Variables for the binlog background thread. Protected by the MYSQL_BIN_LOG::LOCK_binlog_background_thread mutex. */ +struct Binlog_background_job +{ + union + { + MYSQL_BIN_LOG::xid_count_per_binlog *notify_entry; + struct { + Gtid_index_writer *gi; + rpl_gtid *gtid_list; + uint32 gtid_count; + uint32 offset; + } gtid_index_data; + }; + Binlog_background_job *next; + enum enum_job_type { + CHECKPOINT_NOTIFY, + GTID_INDEX_UPDATE, + GTID_INDEX_CLOSE, + SENTINEL + } job_type; +}; static bool binlog_background_thread_started= false; static bool binlog_background_thread_stop= false; -static MYSQL_BIN_LOG::xid_count_per_binlog * - binlog_background_thread_queue= NULL; +static bool binlog_background_thread_sentinel= false; +static Binlog_background_job *binlog_background_thread_queue= NULL; +static Binlog_background_job **binlog_background_thread_endptr= + &binlog_background_thread_queue; +static Binlog_background_job *binlog_background_freelist= NULL; static bool start_binlog_background_thread(); +static int queue_binlog_background_checkpoint_notify( + MYSQL_BIN_LOG::xid_count_per_binlog *entry); +static int queue_binlog_background_gtid_index_update(Gtid_index_writer *gi, + uint32 offset, + rpl_gtid *gtid_list, + uint32 count); +static int queue_binlog_background_gtid_index_close(Gtid_index_writer *gi); +static int queue_binlog_background_sentinel(); +static void binlog_background_wait_for_sentinel(); static rpl_binlog_state rpl_global_gtid_binlog_state; @@ -272,229 +314,6 @@ void make_default_log_name(char **out, const char* log_ext, bool once) Helper classes to store non-transactional and transactional data before copying it to the binary log. */ -class binlog_cache_data -{ -public: - binlog_cache_data(): m_pending(0), status(0), - before_stmt_pos(MY_OFF_T_UNDEF), - incident(FALSE), - saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), - ptr_binlog_cache_disk_use(0) - { } - - ~binlog_cache_data() - { - DBUG_ASSERT(empty()); - close_cached_file(&cache_log); - } - - /* - Return 1 if there is no relevant entries in the cache - - This is: - - Cache is empty - - There are row or critical (DDL?) events in the cache - - The status test is needed to avoid writing entries with only - a table map entry, which would crash in do_apply_event() on the slave - as it assumes that there is always a row entry after a table map. - */ - bool empty() const - { - return (pending() == NULL && - (my_b_write_tell(&cache_log) == 0 || - ((status & (LOGGED_ROW_EVENT | LOGGED_CRITICAL)) == 0))); - } - - Rows_log_event *pending() const - { - return m_pending; - } - - void set_pending(Rows_log_event *const pending_arg) - { - m_pending= pending_arg; - } - - void set_incident(void) - { - incident= TRUE; - } - - bool has_incident(void) - { - return(incident); - } - - void reset() - { - bool cache_was_empty= empty(); - bool truncate_file= (cache_log.file != -1 && - my_b_write_tell(&cache_log) > CACHE_FILE_TRUNC_SIZE); - truncate(0,1); // Forget what's in cache - if (!cache_was_empty) - compute_statistics(); - if (truncate_file) - my_chsize(cache_log.file, 0, 0, MYF(MY_WME)); - - status= 0; - incident= FALSE; - before_stmt_pos= MY_OFF_T_UNDEF; - DBUG_ASSERT(empty()); - } - - my_off_t get_byte_position() const - { - return my_b_tell(&cache_log); - } - - my_off_t get_prev_position() - { - return(before_stmt_pos); - } - - void set_prev_position(my_off_t pos) - { - before_stmt_pos= pos; - } - - void restore_prev_position() - { - truncate(before_stmt_pos); - } - - void restore_savepoint(my_off_t pos) - { - truncate(pos); - if (pos < before_stmt_pos) - before_stmt_pos= MY_OFF_T_UNDEF; - } - - void set_binlog_cache_info(my_off_t param_max_binlog_cache_size, - ulong *param_ptr_binlog_cache_use, - ulong *param_ptr_binlog_cache_disk_use) - { - /* - The assertions guarantee that the set_binlog_cache_info is - called just once and information passed as parameters are - never zero. - - This is done while calling the constructor binlog_cache_mngr. - We cannot set information in the constructor binlog_cache_data - because the space for binlog_cache_mngr is allocated through - a placement new. - - In the future, we can refactor this and change it to avoid - the set_binlog_info. - */ - DBUG_ASSERT(saved_max_binlog_cache_size == 0); - DBUG_ASSERT(param_max_binlog_cache_size != 0); - DBUG_ASSERT(ptr_binlog_cache_use == 0); - DBUG_ASSERT(param_ptr_binlog_cache_use != 0); - DBUG_ASSERT(ptr_binlog_cache_disk_use == 0); - DBUG_ASSERT(param_ptr_binlog_cache_disk_use != 0); - - saved_max_binlog_cache_size= param_max_binlog_cache_size; - ptr_binlog_cache_use= param_ptr_binlog_cache_use; - ptr_binlog_cache_disk_use= param_ptr_binlog_cache_disk_use; - cache_log.end_of_file= saved_max_binlog_cache_size; - } - - void add_status(enum_logged_status status_arg) - { - status|= status_arg; - } - - /* - Cache to store data before copying it to the binary log. - */ - IO_CACHE cache_log; - -private: - /* - Pending binrows event. This event is the event where the rows are currently - written. - */ - Rows_log_event *m_pending; - - /* - Bit flags for what has been writing to cache. Used to - discard logs without any data changes. - see enum_logged_status; - */ - uint32 status; - - /* - Binlog position before the start of the current statement. - */ - my_off_t before_stmt_pos; - - /* - This indicates that some events did not get into the cache and most likely - it is corrupted. - */ - bool incident; - - /** - This function computes binlog cache and disk usage. - */ - void compute_statistics() - { - statistic_increment(*ptr_binlog_cache_use, &LOCK_status); - if (cache_log.disk_writes != 0) - { -#ifdef REAL_STATISTICS - statistic_add(*ptr_binlog_cache_disk_use, - cache_log.disk_writes, &LOCK_status); -#else - statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status); -#endif - cache_log.disk_writes= 0; - } - } - - /* - Stores the values of maximum size of the cache allowed when this cache - is configured. This corresponds to either - . max_binlog_cache_size or max_binlog_stmt_cache_size. - */ - my_off_t saved_max_binlog_cache_size; - - /* - Stores a pointer to the status variable that keeps track of the in-memory - cache usage. This corresponds to either - . binlog_cache_use or binlog_stmt_cache_use. - */ - ulong *ptr_binlog_cache_use; - - /* - Stores a pointer to the status variable that keeps track of the disk - cache usage. This corresponds to either - . binlog_cache_disk_use or binlog_stmt_cache_disk_use. - */ - ulong *ptr_binlog_cache_disk_use; - - /* - It truncates the cache to a certain position. This includes deleting the - pending event. - */ - void truncate(my_off_t pos, bool reset_cache=0) - { - DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); - cache_log.error=0; - if (pending()) - { - delete pending(); - set_pending(0); - } - reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache); - cache_log.end_of_file= saved_max_binlog_cache_size; - } - - binlog_cache_data& operator=(const binlog_cache_data& info); - binlog_cache_data(const binlog_cache_data& info); -}; - void Log_event_writer::add_status(enum_logged_status status) { @@ -508,6 +327,38 @@ void Log_event_writer::set_incident() } +/** + Select if and how to write checksum for an event written to the binlog. + + - When writing directly to the binlog, the user-configured checksum option + is used. + - When writing to a transaction or statement cache, we have + binlog_cache_data that contains the checksum option to use (pre-computed + checksums). + - Otherwise, no checksum used. +*/ +enum_binlog_checksum_alg +Log_event::select_checksum_alg(const binlog_cache_data *data) +{ + if (cache_type == Log_event::EVENT_NO_CACHE) + { + DBUG_ASSERT(!data); + /* + When we're selecting the checksum algorithm to write directly to the + actual binlog, we must be holding the LOCK_log, otherwise the checksum + configuration could change just after we read it. + */ + mysql_mutex_assert_owner(mysql_bin_log.get_log_lock()); + return (enum_binlog_checksum_alg)binlog_checksum_options; + } + + if (data) + return data->checksum_opt; + + return BINLOG_CHECKSUM_ALG_OFF; +} + + class binlog_cache_mngr { public: binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size, @@ -515,8 +366,10 @@ public: ulong *param_ptr_binlog_stmt_cache_use, ulong *param_ptr_binlog_stmt_cache_disk_use, ulong *param_ptr_binlog_cache_use, - ulong *param_ptr_binlog_cache_disk_use) - : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0) + ulong *param_ptr_binlog_cache_disk_use, + bool precompute_checksums) + : stmt_cache(precompute_checksums), trx_cache(precompute_checksums), + last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0) { stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size, param_ptr_binlog_stmt_cache_use, @@ -578,6 +431,7 @@ public: ulong binlog_id; /* Set if we get an error during commit that must be returned from unlog(). */ bool delayed_error; + //Will be reset when gtid is written into binlog uchar gtid_flags3; decltype (rpl_gtid::seq_no) sa_seq_no; @@ -1763,12 +1617,11 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) DBUG_ENTER("binlog_trans_log_truncate"); DBUG_PRINT("enter", ("pos: %lu", (ulong) pos)); - DBUG_ASSERT(thd_get_ha_data(thd, binlog_hton) != NULL); + DBUG_ASSERT(thd->binlog_get_cache_mngr() != NULL); /* Only true if binlog_trans_log_savepos() wasn't called before */ DBUG_ASSERT(pos != ~(my_off_t) 0); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); cache_mngr->trx_cache.restore_savepoint(pos); DBUG_VOID_RETURN; } @@ -1808,8 +1661,7 @@ int binlog_init(void *p) static int binlog_close_connection(handlerton *hton, THD *thd) { DBUG_ENTER("binlog_close_connection"); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); #ifdef WITH_WSREP if (WSREP(thd) && cache_mngr && !cache_mngr->trx_cache.empty()) { IO_CACHE* cache= cache_mngr->get_binlog_cache_log(true); @@ -2057,29 +1909,31 @@ static int binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { DBUG_ENTER("binlog_truncate_trx_cache"); + + if(!WSREP_EMULATE_BINLOG_NNULL(thd) && !mysql_bin_log.is_open()) + DBUG_RETURN(0); + int error=0; - /* - This function handles transactional changes and as such this flag - equals to true. - */ - bool const is_transactional= TRUE; DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s", FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), all ? "all" : "stmt")); - thd->binlog_remove_pending_rows_event(TRUE, is_transactional); + auto &trx_cache= cache_mngr->trx_cache; + MYSQL_BIN_LOG::remove_pending_rows_event(thd, &trx_cache); + thd->reset_binlog_for_next_statement(); + /* If rolling back an entire transaction or a single statement not inside a transaction, we reset the transaction cache. */ if (ending_trans(thd, all)) { - if (cache_mngr->trx_cache.has_incident()) + if (trx_cache.has_incident()) error= mysql_bin_log.write_incident(thd); - thd->reset_binlog_for_next_statement(); + DBUG_ASSERT(thd->binlog_table_maps == 0); cache_mngr->reset(false, true); } @@ -2088,9 +1942,9 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) transaction cache to remove the statement. */ else - cache_mngr->trx_cache.restore_prev_position(); + trx_cache.restore_prev_position(); - DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL); + DBUG_ASSERT(trx_cache.pending() == NULL); DBUG_RETURN(error); } @@ -2167,7 +2021,6 @@ int binlog_rollback_by_xid(handlerton *hton, XID *xid) DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK || (thd->transaction->xid_state.get_state_code() == XA_ROLLBACK_ONLY)); - rc= binlog_rollback(hton, thd, TRUE); thd->ha_data[hton->slot].ha_info[1].reset(); @@ -2250,6 +2103,7 @@ static int binlog_commit_flush_xa_prepare(THD *thd, bool all, return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } + /** This function is called once after each statement. @@ -2266,9 +2120,12 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc) PSI_stage_info org_stage; DBUG_ENTER("binlog_commit"); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + bool is_ending_transaction= ending_trans(thd, all); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); + /* + cache_mngr can be NULL in case if binlog logging is disabled. + */ if (!cache_mngr) { DBUG_ASSERT(WSREP(thd) || @@ -2328,7 +2185,7 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc) - We are in a transaction and a full transaction is committed. Otherwise, we accumulate the changes. */ - if (likely(!error) && ending_trans(thd, all)) + if (likely(!error) && is_ending_transaction) { bool is_xa_prepare= is_preparing_xa(thd); @@ -2368,9 +2225,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); + bool is_ending_trans= ending_trans(thd, all); int error= 0; - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); if (!cache_mngr) { @@ -2411,7 +2268,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) thd->reset_binlog_for_next_statement(); DBUG_RETURN(error); } - if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd)) + if (!wsrep_emulate_bin_log && Event_log::check_write_error(thd)) { /* "all == true" means that a "rollback statement" triggered the error and @@ -2429,7 +2286,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) else if (likely(!error)) { ulong binlog_format= thd->wsrep_binlog_format(thd->variables.binlog_format); - if (ending_trans(thd, all) && trans_cannot_safely_rollback(thd, all)) + if (is_ending_trans && trans_cannot_safely_rollback(thd, all)) error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* Truncate the cache if: @@ -2441,7 +2298,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) . the format is not MIXED or no temporary non-trans table was updated. */ - else if (ending_trans(thd, all) || + else if (is_ending_trans || (!(thd->transaction->stmt.has_created_dropped_temp_table() && !thd->is_current_stmt_binlog_format_row()) && (!stmt_has_updated_non_trans_table(thd) || @@ -2464,19 +2321,20 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) void binlog_reset_cache(THD *thd) { - binlog_cache_mngr *const cache_mngr= opt_bin_log ? - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton) : 0; + binlog_cache_mngr *const cache_mngr= opt_bin_log ? + thd->binlog_get_cache_mngr() : 0; DBUG_ENTER("binlog_reset_cache"); if (cache_mngr) { - thd->binlog_remove_pending_rows_event(TRUE, TRUE); + MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache); + thd->reset_binlog_for_next_statement(); cache_mngr->reset(true, true); } DBUG_VOID_RETURN; } -void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional) +void Event_log::set_write_error(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::set_write_error"); @@ -2516,7 +2374,7 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional) DBUG_VOID_RETURN; } -bool MYSQL_BIN_LOG::check_write_error(THD *thd) +bool Event_log::check_write_error(THD *thd) { DBUG_ENTER("MYSQL_BIN_LOG::check_write_error"); @@ -2545,7 +2403,7 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd) event to the binlog rather than write corrupt data to it. */ bool -MYSQL_BIN_LOG::check_cache_error(THD *thd, binlog_cache_data *cache_data) +Event_log::check_cache_error(THD *thd, binlog_cache_data *cache_data) { if (!cache_data) return false; @@ -2621,18 +2479,13 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. - */ -#ifdef WITH_WSREP - /* for streaming replication, we must replicate savepoint rollback so that - slaves can maintain SR transactions + + For streaming replication, we must replicate savepoint rollback so that + slaves can maintain SR transactions */ - if (unlikely(thd->wsrep_trx().is_streaming() || - (trans_has_updated_non_trans_table(thd)) || - (thd->variables.option_bits & OPTION_BINLOG_THIS_TRX))) -#else - if (unlikely(trans_has_updated_non_trans_table(thd) || - (thd->variables.option_bits & OPTION_BINLOG_THIS_TRX))) -#endif /* WITH_WSREP */ + if (IF_WSREP(thd->wsrep_trx().is_streaming(),0) || + trans_has_updated_non_trans_table(thd) || + (thd->variables.option_bits & OPTION_BINLOG_THIS_TRX)) { char buf[1024]; String log_query(buf, sizeof(buf), &my_charset_bin); @@ -3091,13 +2944,14 @@ void MYSQL_LOG::close(uint exiting) { end_io_cache(&log_file); - if (log_type == LOG_BIN && mysql_file_sync(log_file.file, MYF(MY_WME)) && ! write_error) + if (log_type == LOG_BIN && log_file.file >= 0 && + mysql_file_sync(log_file.file, MYF(MY_WME)) && ! write_error) { write_error= 1; sql_print_error(ER_DEFAULT(ER_ERROR_ON_WRITE), name, errno); } - if (!(exiting & LOG_CLOSE_DELAYED_CLOSE) && + if (!(exiting & LOG_CLOSE_DELAYED_CLOSE) && log_file.file >= 0 && mysql_file_close(log_file.file, MYF(MY_WME)) && ! write_error) { write_error= 1; @@ -3592,12 +3446,12 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) :reset_master_pending(0), mark_xid_done_waiting(0), - bytes_written(0), last_used_log_number(0), - file_id(1), open_count(1), + bytes_written(0), binlog_space_total(0), + last_used_log_number(0), file_id(1), open_count(1), group_commit_queue(0), group_commit_queue_busy(FALSE), num_commits(0), num_group_commits(0), group_commit_trigger_count(0), group_commit_trigger_timeout(0), - group_commit_trigger_lock_wait(0), + group_commit_trigger_lock_wait(0), gtid_index(nullptr), sync_period_ptr(sync_period), sync_counter(0), state_file_deleted(false), binlog_state_recover_done(false), is_relay_log(0), relay_signal_cnt(0), @@ -3610,7 +3464,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) We don't want to initialize locks here as such initialization depends on safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is called only in main(). Doing initialization here would make it happen - before main(). + before main(). init_pthread_objects() can be called for that purpose. */ index_file_name[0] = 0; bzero((char*) &index_file, sizeof(index_file)); @@ -3702,7 +3556,7 @@ void MYSQL_BIN_LOG::init(ulong max_size_arg) void MYSQL_BIN_LOG::init_pthread_objects() { - MYSQL_LOG::init_pthread_objects(); + Event_log::init_pthread_objects(); mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW); mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_init(key_BINLOG_LOCK_xid_list, @@ -3718,9 +3572,6 @@ void MYSQL_BIN_LOG::init_pthread_objects() &COND_binlog_background_thread, 0); mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end, &COND_binlog_background_thread_end, 0); - - mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos, - MY_MUTEX_INIT_SLOW); } @@ -3791,6 +3642,71 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, } +bool Event_log::open(enum cache_type io_cache_type_arg) +{ + bool error= init_io_cache(&log_file, -1, LOG_BIN_IO_SIZE, io_cache_type_arg, + 0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)); + + log_state= LOG_OPENED; + inited= true; + if (error) + return error; + + longlong bytes_written= write_description_event(BINLOG_CHECKSUM_ALG_OFF, + false, true, false); + status_var_add(current_thd->status_var.binlog_bytes_written, bytes_written); + return bytes_written < 0; +} + +longlong +Event_log::write_description_event(enum_binlog_checksum_alg checksum_alg, + bool encrypt, bool dont_set_created, + bool is_relay_log) +{ + Format_description_log_event s(BINLOG_VERSION, NULL, checksum_alg); + /* + don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache + as we won't be able to reset it later + */ + if (io_cache_type == WRITE_CACHE) + s.flags |= LOG_EVENT_BINLOG_IN_USE_F; + if (is_relay_log) + s.set_relay_log_event(); + + crypto.scheme = 0; + if (!s.is_valid()) + return -1; + s.dont_set_created= dont_set_created; + if (write_event(&s, checksum_alg, 0, &log_file)) + return -1; + + if (encrypt) + { + uint key_version= encryption_key_get_latest_version(ENCRYPTION_KEY_SYSTEM_DATA); + if (key_version == ENCRYPTION_KEY_VERSION_INVALID) + { + sql_print_error("Failed to enable encryption of binary logs"); + return -1; + } + + if (key_version != ENCRYPTION_KEY_NOT_ENCRYPTED) + { + if (my_random_bytes(crypto.nonce, sizeof(crypto.nonce))) + return -1; + + Start_encryption_log_event sele(1, key_version, crypto.nonce); + if (write_event(&sele, checksum_alg, 0, &log_file)) + return -1; + + // Start_encryption_log_event is written, enable the encryption + if (crypto.init(sele.crypto_scheme, key_version)) + return -1; + } + } + return (longlong)s.data_written; +} + + /** Open a (new) binlog file. @@ -3916,17 +3832,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, } { - /* - In 4.x we put Start event only in the first binlog. But from 5.0 we - want a Start event even if this is not the very first binlog. - */ - Format_description_log_event s(BINLOG_VERSION); - /* - don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache - as we won't be able to reset it later - */ - if (io_cache_type == WRITE_CACHE) - s.flags |= LOG_EVENT_BINLOG_IN_USE_F; + enum_binlog_checksum_alg alg; if (is_relay_log) { @@ -3934,45 +3840,16 @@ bool MYSQL_BIN_LOG::open(const char *log_name, relay_log_checksum_alg= opt_slave_sql_verify_checksum ? (enum_binlog_checksum_alg) binlog_checksum_options : BINLOG_CHECKSUM_ALG_OFF; - s.checksum_alg= relay_log_checksum_alg; - s.set_relay_log_event(); + alg= relay_log_checksum_alg; } else - s.checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; + alg= (enum_binlog_checksum_alg)binlog_checksum_options; - crypto.scheme = 0; - DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - if (!s.is_valid()) - goto err; - s.dont_set_created= null_created_arg; - if (write_event(&s)) + longlong written= write_description_event(alg, encrypt_binlog, + null_created_arg, is_relay_log); + if (written == -1) goto err; - bytes_written+= s.data_written; - - if (encrypt_binlog) - { - uint key_version= encryption_key_get_latest_version(ENCRYPTION_KEY_SYSTEM_DATA); - if (key_version == ENCRYPTION_KEY_VERSION_INVALID) - { - sql_print_error("Failed to enable encryption of binary logs"); - goto err; - } - - if (key_version != ENCRYPTION_KEY_NOT_ENCRYPTED) - { - if (my_random_bytes(crypto.nonce, sizeof(crypto.nonce))) - goto err; - - Start_encryption_log_event sele(1, key_version, crypto.nonce); - sele.checksum_alg= s.checksum_alg; - if (write_event(&sele)) - goto err; - - // Start_encryption_log_event is written, enable the encryption - if (crypto.init(sele.crypto_scheme, key_version)) - goto err; - } - } + bytes_written+= written; if (!is_relay_log) { @@ -4016,6 +3893,26 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (write_event(&gl_ev)) goto err; + /* Open an index file for this binlog file. */ + DBUG_ASSERT(!gtid_index); /* Binlog close should clear it. */ + if (gtid_index) + delete gtid_index; + if (opt_binlog_gtid_index) + { + my_off_t offset= my_b_tell(&log_file); + gtid_index= + new Gtid_index_writer(log_file_name, (uint32)offset, + &rpl_global_gtid_binlog_state, + (uint32)opt_binlog_gtid_index_page_size, + (my_off_t)opt_binlog_gtid_index_span_min); + if (!gtid_index) + sql_print_information("Could not create GTID index for binlog " + "file '%s'. Accesses to this binlog file will " + "fallback to slower sequential scan."); + } + else + gtid_index= nullptr; + /* Output a binlog checkpoint event at the start of the binlog file. */ /* @@ -4091,7 +3988,8 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* Don't set log_pos in event header */ description_event_for_queue->set_artificial_event(); - if (write_event(description_event_for_queue)) + if (write_event(description_event_for_queue, + description_event_for_queue->used_checksum_alg)) goto err; bytes_written+= description_event_for_queue->data_written; } @@ -4340,7 +4238,8 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, log_name ? log_name : "NULL", full_log_name)); /* As the file is flushed, we can't get an error here */ - (void) reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0); + error= reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0); + DBUG_ASSERT(!error); for (;;) { @@ -4561,12 +4460,31 @@ bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool create_new_log, no new ones will be written. So we can proceed to delete the logs. */ mysql_mutex_unlock(&LOCK_xid_list); + + /* + Push a sentinel through the binlog background thread and wait for it to + return. When it does, we know that no more GTID index operations are + pending as we are holding LOCK_log. + (This is normally already the case as we pushed a binlog checkpoint + request through. But if no XID-capable engines are enabled (eg. running + without InnoDB), then that is a no-op). + */ + queue_binlog_background_sentinel(); + binlog_background_wait_for_sentinel(); } /* Save variables so that we can reopen the log */ save_name=name; name=0; // Protect against free - close(LOG_CLOSE_TO_BE_OPENED); + + /* + Close the active log. + Close the active GTID index synchroneously. We don't want the close + running in the background while we delete the gtid index file. And we just + pushed a sentinel through the binlog background thread while holding + LOCK_log, so no other GTID index operations can be pending. + */ + close(LOG_CLOSE_TO_BE_OPENED|LOG_CLOSE_SYNC_GTID_INDEX); last_used_log_number= 0; // Reset log number cache @@ -4591,6 +4509,28 @@ bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool create_new_log, for (;;) { + /* Delete any GTID index file. */ + char buf[Gtid_index_base::GTID_INDEX_FILENAME_MAX_SIZE]; + Gtid_index_base::make_gtid_index_file_name(buf, sizeof(buf), + linfo.log_file_name); + if (my_delete(buf, MYF(0))) + { + /* If ENOENT, the GTID index file is already deleted or never existed. */ + if (my_errno != ENOENT) + { + if (thd) + { + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_CANT_DELETE_FILE, ER_THD(thd, ER_CANT_DELETE_FILE), + buf, my_errno); + } + sql_print_information("Failed to delete file '%s' (errno=%d)", + buf, my_errno); + } + my_errno= 0; + } + + /* Delete the binlog file. */ if (unlikely((error= my_delete(linfo.log_file_name, MYF(0))))) { if (my_errno == ENOENT) @@ -4699,6 +4639,7 @@ err: reset_master_pending--; reset_master_count++; mysql_mutex_unlock(&LOCK_xid_list); + binlog_space_total= 0; } mysql_mutex_unlock(&LOCK_index); @@ -5017,7 +4958,6 @@ int MYSQL_BIN_LOG::open_purge_index_file(bool destroy) { int error= 0; File file= -1; - DBUG_ENTER("MYSQL_BIN_LOG::open_purge_index_file"); if (destroy) @@ -5101,6 +5041,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, int error= 0; LOG_INFO log_info; LOG_INFO check_log_info; + char buf[Gtid_index_base::GTID_INDEX_FILENAME_MAX_SIZE]; DBUG_ASSERT(my_b_inited(&purge_index_file)); @@ -5134,6 +5075,24 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, /* Get rid of the trailing '\n' */ log_info.log_file_name[length-1]= 0; + Gtid_index_base::make_gtid_index_file_name(buf, sizeof(buf), + log_info.log_file_name); + if (my_delete(buf, MYF(0))) + { + /* If ENOENT, the GTID index file is already deleted or never existed. */ + if (my_errno != ENOENT) + { + if (thd) + { + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_CANT_DELETE_FILE, ER_THD(thd, ER_CANT_DELETE_FILE), + buf, my_errno); + } + sql_print_information("Failed to delete file '%s'", buf); + } + my_errno= 0; + } + if (unlikely(!mysql_file_stat(m_key_file_log, log_info.log_file_name, &s, MYF(0)))) { @@ -5299,8 +5258,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) if (unlikely((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/)))) goto err; - while (strcmp(log_file_name, log_info.log_file_name) && - can_purge_log(log_info.log_file_name)) + while (can_purge_log(log_info.log_file_name)) { if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &stat_area, MYF(0))) @@ -5338,46 +5296,246 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) } else { - if (stat_area.st_mtime < purge_time) - strmake_buf(to_log, log_info.log_file_name); - else + if (stat_area.st_mtime >= purge_time) break; + strmake_buf(to_log, log_info.log_file_name); } if (find_next_log(&log_info, 0)) break; } - error= (to_log[0] ? purge_logs(to_log, 1, 0, 1, (ulonglong *) 0) : 0); + if (to_log[0]) + { + ulonglong reclaimed_space= 0; + error= purge_logs(to_log, 1, 0, 1, &reclaimed_space); + binlog_space_total-= reclaimed_space; + } + +err: + mysql_mutex_unlock(&LOCK_index); + DBUG_RETURN(error); +} + + +/** + Purge old logs so that we have a total size lower than binlog_space_limit. + + @note + If any of the logs before the deleted one is in use, + only purge logs up to this one. + + @retval + 0 ok + @retval + LOG_INFO_FATAL if any other than ENOENT error from + mysql_file_stat() or mysql_file_delete() +*/ + +int MYSQL_BIN_LOG::real_purge_logs_by_size(ulonglong binlog_pos) +{ + int error= 0; + LOG_INFO log_info; + MY_STAT stat_area; + char to_log[FN_REFLEN]; + ulonglong found_space= 0; + DBUG_ENTER("real_purge_logs_by_size"); + + mysql_mutex_lock(&LOCK_index); + + /* Check if another user changed the value of binlog_space_limit just now */ + if (! binlog_space_limit) + goto err; + + if ((error = find_log_pos(&log_info, NullS, + false /*need_lock_index=false*/))) + goto err; + + to_log[0] = 0; + while (can_purge_log(log_info.log_file_name)) + { + if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &stat_area, + MYF(0))) + { + if (my_errno != ENOENT) + { + /* + Other than ENOENT are fatal + */ + THD *thd = current_thd; + if (thd) + { + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_BINLOG_PURGE_FATAL_ERR, + "A problem with getting info on being purged %s; " + "consider examining correspondence " + "of your binlog index file " + "to the actual binlog files", + log_info.log_file_name); + } + else + { + sql_print_warning("Failed to stat log file '%s'", + log_info.log_file_name); + } + error= LOG_INFO_FATAL; + goto err; + } + } + else + { + found_space+= stat_area.st_size; + strmake(to_log, log_info.log_file_name, sizeof(to_log) - 1); + if (binlog_space_total + binlog_pos - found_space <= binlog_space_limit) + break; // Found enough + if (find_next_log(&log_info, false /*need_lock_index=false*/)) + break; + } + } + if (found_space) + { + ulonglong reclaimed_space= 0; + purge_logs(to_log, true, false /*need_lock_index=false*/, + true /*need_update_threads=true*/, + &reclaimed_space); + DBUG_ASSERT(reclaimed_space == found_space); + binlog_space_total-= reclaimed_space; + /* + The following is here to handle cases where something goes wrong. + Like a bug or if somethings adds data to an existing log file. + */ + DBUG_ASSERT((longlong) binlog_space_total >= 0); + if ((longlong) binlog_space_total <= 0) + count_binlog_space(); + } err: mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } +/* + The following variables are here to allows us to quickly check if + the can_purge_log(log_file_name_arg) name will fail in the + 'log_in_use' call. + + waiting_for_slave_to_change_binlog is 1 if last log_in_use failed. + purge_binlog_name is the last failed log_file_name_arg. + + sending_new_binlog_file, cached in purge_sending_new_binlog_file, + is incremented every time a slave changes to use a new binary log. +*/ + +static bool waiting_for_slave_to_change_binlog= 0; +static ulonglong purge_sending_new_binlog_file= 0; +static char purge_binlog_name[FN_REFLEN]; bool MYSQL_BIN_LOG::can_purge_log(const char *log_file_name_arg) { - xid_count_per_binlog *b; + THD *thd= current_thd; // May be NULL at startup + bool res; - if (is_active(log_file_name_arg)) - return false; - mysql_mutex_lock(&LOCK_xid_list); + if (is_active(log_file_name_arg) || + (!is_relay_log && waiting_for_slave_to_change_binlog && + purge_sending_new_binlog_file == sending_new_binlog_file && + !strcmp(log_file_name_arg, purge_binlog_name))) + return false; + + DBUG_ASSERT(!is_relay_log || binlog_xid_count_list.is_empty()); + if (!is_relay_log) { - I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); - while ((b= it++) && - 0 != strncmp(log_file_name_arg+dirname_length(log_file_name_arg), - b->binlog_name, b->binlog_name_len)) - ; + xid_count_per_binlog *b; + mysql_mutex_lock(&LOCK_xid_list); + { + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + while ((b= it++) && + 0 != strncmp(log_file_name_arg+dirname_length(log_file_name_arg), + b->binlog_name, b->binlog_name_len)) + ; + } + mysql_mutex_unlock(&LOCK_xid_list); + if (b) + return false; } - mysql_mutex_unlock(&LOCK_xid_list); - if (b) - return false; - return !log_in_use(log_file_name_arg); + + if (!is_relay_log) + { + waiting_for_slave_to_change_binlog= 0; + purge_sending_new_binlog_file= sending_new_binlog_file; + } + if ((res= log_in_use(log_file_name_arg, + (is_relay_log || + (thd && thd->lex->sql_command == SQLCOM_PURGE)) ? + 0 : slave_connections_needed_for_purge))) + { + if (!is_relay_log) + { + waiting_for_slave_to_change_binlog= 1; + strmake(purge_binlog_name, log_file_name_arg, + sizeof(purge_binlog_name)-1); + } + } + return !res; } #endif /* HAVE_REPLICATION */ +/** + Count a total size of binary logs (except the active one) to the variable + binlog_space_total. + + @retval + 0 ok + @retval + LOG_INFO_FATAL if any other than ENOENT error from + mysql_file_stat() or mysql_file_delete() +*/ + +int MYSQL_BIN_LOG::count_binlog_space() +{ + int error; + LOG_INFO log_info; + DBUG_ENTER("count_binlog_space"); + + binlog_space_total = 0; + if ((error= find_log_pos(&log_info, NullS, false /*need_lock_index=false*/))) + goto done; + + MY_STAT stat_area; + while (!is_active(log_info.log_file_name)) + { + if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &stat_area, + MYF(0))) + { + if (my_errno != ENOENT) + { + error= LOG_INFO_FATAL; + goto done; + } + } + else + binlog_space_total+= stat_area.st_size; + if (find_next_log(&log_info, false /*need_lock_index=false*/)) + break; + } +done: + DBUG_RETURN(error); +} + + +ulonglong MYSQL_BIN_LOG::get_binlog_space_total() +{ + ulonglong used_space= 0; + mysql_mutex_lock(&LOCK_log); + /* Get position in current log file */ + used_space= my_b_tell(&log_file); + mysql_mutex_lock(&LOCK_index); + mysql_mutex_unlock(&LOCK_log); + used_space+= binlog_space_total; + mysql_mutex_unlock(&LOCK_index); + return used_space; +} + bool MYSQL_BIN_LOG::is_xidlist_idle() { @@ -5484,6 +5642,7 @@ int MYSQL_BIN_LOG::new_file_without_locking() @note The new file name is stored last in the index file + binlog_space_total will be updated if binlog_space_limit is set */ int MYSQL_BIN_LOG::new_file_impl() @@ -5530,17 +5689,19 @@ int MYSQL_BIN_LOG::new_file_impl() */ Rotate_log_event r(new_name + dirname_length(new_name), 0, LOG_EVENT_OFFSET, is_relay_log ? Rotate_log_event::RELAY_LOG : 0); + enum_binlog_checksum_alg checksum_alg = BINLOG_CHECKSUM_ALG_UNDEF; /* The current relay-log's closing Rotate event must have checksum value computed with an algorithm of the last relay-logged FD event. */ if (is_relay_log) - r.checksum_alg= relay_log_checksum_alg; - DBUG_ASSERT(!is_relay_log || - relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + checksum_alg= relay_log_checksum_alg; + else + checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; + DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if ((DBUG_IF("fault_injection_new_file_rotate_event") && (error= close_on_error= TRUE)) || - (error= write_event(&r))) + (error= write_event(&r, checksum_alg))) { DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno= 2;); close_on_error= TRUE; @@ -5563,7 +5724,6 @@ int MYSQL_BIN_LOG::new_file_impl() goto end; } update_binlog_end_pos(); - old_name=name; name=0; // Don't free name close_flag= LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX; @@ -5578,6 +5738,8 @@ int MYSQL_BIN_LOG::new_file_impl() old_file= log_file.file; close_flag|= LOG_CLOSE_DELAYED_CLOSE; delay_close= true; + if (binlog_space_limit) + binlog_space_total+= binlog_end_pos; } close(close_flag); if (checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF) @@ -5657,31 +5819,43 @@ end2: DBUG_RETURN(error); } -bool MYSQL_BIN_LOG::write_event(Log_event *ev, binlog_cache_data *cache_data, - IO_CACHE *file) +bool Event_log::write_event(Log_event *ev, binlog_cache_data *data, + IO_CACHE *file) +{ + return write_event(ev, ev->select_checksum_alg(data), data, file); +} + +bool MYSQL_BIN_LOG::write_event(Log_event *ev) { - Log_event_writer writer(file, 0, &crypto); + return write_event(ev, ev->select_checksum_alg(NULL), 0, &log_file); +} + +bool Event_log::write_event(Log_event *ev, + enum_binlog_checksum_alg checksum_alg, + binlog_cache_data *cache_data, IO_CACHE *file) +{ + Log_event_writer writer(file, cache_data, checksum_alg, &crypto); if (crypto.scheme && file == &log_file) { writer.ctx= alloca(crypto.ctx_size); writer.set_encrypted_writer(); } - if (cache_data) - cache_data->add_status(ev->logged_status()); return writer.write(ev); } -bool MYSQL_BIN_LOG::append(Log_event *ev) +bool MYSQL_BIN_LOG::append(Log_event *ev, + enum_binlog_checksum_alg checksum_alg) { bool res; mysql_mutex_lock(&LOCK_log); - res= append_no_lock(ev); + res= append_no_lock(ev, checksum_alg); mysql_mutex_unlock(&LOCK_log); return res; } -bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) +bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev, + enum_binlog_checksum_alg checksum_alg) { bool error = 0; DBUG_ENTER("MYSQL_BIN_LOG::append"); @@ -5689,7 +5863,7 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) mysql_mutex_assert_owner(&LOCK_log); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); - if (write_event(ev)) + if (write_event(ev, checksum_alg)) { error=1; goto err; @@ -5719,7 +5893,7 @@ bool MYSQL_BIN_LOG::write_event_buffer(uchar* buf, uint len) { DBUG_ASSERT(crypto.scheme == 1); - uint elen; + uint elen= len - 4; uchar iv[BINLOG_IV_LENGTH]; ebuf= (uchar*)my_safe_alloca(len); @@ -5821,8 +5995,7 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param) bool trans_has_updated_trans_table(const THD* thd) { - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); return (cache_mngr ? !cache_mngr->trx_cache.empty() : 0); } @@ -5871,8 +6044,7 @@ bool use_trans_cache(const THD* thd, bool is_transactional) { if (is_transactional) return 1; - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + auto *const cache_mngr= thd->binlog_get_cache_mngr(); return ((thd->is_current_stmt_binlog_format_row() || thd->variables.binlog_direct_non_trans_update) ? 0 : @@ -5943,17 +6115,11 @@ bool stmt_has_updated_non_trans_table(const THD* thd) binlog_hton, which has internal linkage. */ -binlog_cache_mngr *THD::binlog_setup_trx_data() +static binlog_cache_mngr *binlog_setup_cache_mngr(THD *thd) { - DBUG_ENTER("THD::binlog_setup_trx_data"); - binlog_cache_mngr *cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - - if (cache_mngr) - DBUG_RETURN(cache_mngr); // Already set up - - cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr, - sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); + auto *cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr, + sizeof(binlog_cache_mngr), + MYF(MY_ZEROFILL)); if (!cache_mngr || open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir, LOG_PREFIX, (size_t)binlog_stmt_cache_size, MYF(MY_WME)) || @@ -5961,17 +6127,42 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME))) { my_free(cache_mngr); - DBUG_RETURN(0); // Didn't manage to set it up + return NULL; } - thd_set_ha_data(this, binlog_hton, cache_mngr); + /* + Don't attempt to precompute checksums if: + - Disabled by user request, --binlog-legacy-event-pos + - Binlog is encrypted, cannot use precomputed checksums + - WSREP/Galera. + */ + bool precompute_checksums= + !WSREP_NNULL(thd) && !encrypt_binlog && !opt_binlog_legacy_event_pos; cache_mngr= new (cache_mngr) - binlog_cache_mngr(max_binlog_stmt_cache_size, - max_binlog_cache_size, - &binlog_stmt_cache_use, - &binlog_stmt_cache_disk_use, - &binlog_cache_use, - &binlog_cache_disk_use); + binlog_cache_mngr(max_binlog_stmt_cache_size, + max_binlog_cache_size, + &binlog_stmt_cache_use, + &binlog_stmt_cache_disk_use, + &binlog_cache_use, + &binlog_cache_disk_use, + precompute_checksums); + + return cache_mngr; +} + +binlog_cache_mngr *THD::binlog_setup_trx_data() +{ + DBUG_ENTER("THD::binlog_setup_trx_data"); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + if (!cache_mngr) + { + cache_mngr= binlog_setup_cache_mngr(this); + thd_set_ha_data(this, binlog_hton, cache_mngr); + } + + DBUG_RETURN(cache_mngr); } @@ -6038,7 +6229,7 @@ void THD::set_binlog_start_alter_seq_no(uint64 s_no) void THD::binlog_start_trans_and_stmt() { - binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= binlog_get_cache_mngr(); DBUG_ENTER("binlog_start_trans_and_stmt"); DBUG_PRINT("enter", ("cache_mngr: %p cache_mngr->trx_cache.get_prev_position(): %lu", cache_mngr, @@ -6081,7 +6272,8 @@ THD::binlog_start_trans_and_stmt() uchar *buf= 0; size_t len= 0; IO_CACHE tmp_io_cache; - Log_event_writer writer(&tmp_io_cache, 0); + // Replicated events in writeset doesn't have checksum + Log_event_writer writer(&tmp_io_cache, 0, BINLOG_CHECKSUM_ALG_OFF, NULL); if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, 128, MYF(MY_WME))) { @@ -6096,8 +6288,6 @@ THD::binlog_start_trans_and_stmt() } Gtid_log_event gtid_event(this, seqno, domain_id, true, LOG_EVENT_SUPPRESS_USE_F, true, 0); - // Replicated events in writeset doesn't have checksum - gtid_event.checksum_alg= BINLOG_CHECKSUM_ALG_OFF; gtid_event.server_id= server_id; writer.write(>id_event); wsrep_write_cache_buf(&tmp_io_cache, &buf, &len); @@ -6125,8 +6315,7 @@ THD::binlog_start_trans_and_stmt() } void THD::binlog_set_stmt_begin() { - binlog_cache_mngr *cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= binlog_get_cache_mngr(); /* The call to binlog_trans_log_savepos() might create the cache_mngr @@ -6136,7 +6325,7 @@ void THD::binlog_set_stmt_begin() { */ my_off_t pos= 0; binlog_trans_log_savepos(this, &pos); - cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + cache_mngr= binlog_get_cache_mngr(); cache_mngr->trx_cache.set_prev_position(pos); } @@ -6243,7 +6432,7 @@ bool THD::binlog_write_table_maps() } if (table->file->row_logging) { - if (binlog_write_table_map(table, with_annotate)) + if (mysql_bin_log.write_table_map(this, table, with_annotate)) DBUG_RETURN(1); with_annotate= 0; } @@ -6276,7 +6465,7 @@ bool THD::binlog_write_table_maps() nonzero if an error pops up when writing the table map event. */ -bool THD::binlog_write_table_map(TABLE *table, bool with_annotate) +bool MYSQL_BIN_LOG::write_table_map(THD *thd, TABLE *table, bool with_annotate) { int error= 1; bool is_transactional= table->file->row_logging_has_trans; @@ -6290,21 +6479,21 @@ bool THD::binlog_write_table_map(TABLE *table, bool with_annotate) (table->s->table_map_id & MAX_TABLE_MAP_ID) != 0); /* Ensure that all events in a GTID group are in the same cache */ - if (variables.option_bits & OPTION_GTID_BEGIN) + if (thd->variables.option_bits & OPTION_GTID_BEGIN) is_transactional= 1; Table_map_log_event - the_event(this, table, table->s->table_map_id, is_transactional); + the_event(thd, table, table->s->table_map_id, is_transactional); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); binlog_cache_data *cache_data= (cache_mngr-> get_binlog_cache_data(is_transactional)); IO_CACHE *file= &cache_data->cache_log; - Log_event_writer writer(file, cache_data); + Log_event_writer writer(file, cache_data, + the_event.select_checksum_alg(cache_data), NULL); if (with_annotate) - if (binlog_write_annotated_row(&writer)) + if (thd->binlog_write_annotated_row(&writer)) goto write_err; DBUG_EXECUTE_IF("table_map_write_error", @@ -6322,74 +6511,74 @@ bool THD::binlog_write_table_map(TABLE *table, bool with_annotate) DBUG_RETURN(0); write_err: - mysql_bin_log.set_write_error(this, is_transactional); + set_write_error(thd, is_transactional); /* For non-transactional engine or multi statement transaction with mixed engines, data is written to table but writing to binary log failed. In these scenarios rollback is not possible. Hence report an incident. */ - if (mysql_bin_log.check_cache_error(this, cache_data) && - lex->stmt_accessed_table(LEX::STMT_WRITES_NON_TRANS_TABLE) && + if (check_cache_error(thd, cache_data) && + thd->lex->stmt_accessed_table(LEX::STMT_WRITES_NON_TRANS_TABLE) && table->current_lock == F_WRLCK) cache_data->set_incident(); DBUG_RETURN(error); } +binlog_cache_mngr *THD::binlog_get_cache_mngr() const +{ + return (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); +} + + /** This function retrieves a pending row event from a cache which is specified through the parameter @c is_transactional. Respectively, when it is @c true, the pending event is returned from the transactional cache. Otherwise from the non-transactional cache. - @param is_transactional @c true indicates a transactional cache, + @param cache_mngr cache manager to return pending row from + @param use_trans_cache @c true indicates a transactional cache, otherwise @c false a non-transactional. @return The row event if any. */ -Rows_log_event* -THD::binlog_get_pending_rows_event(bool is_transactional) const +Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr, + bool use_trans_cache) { - Rows_log_event* rows= NULL; - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - - /* - This is less than ideal, but here's the story: If there is no cache_mngr, - prepare_pending_rows_event() has never been called (since the cache_mngr - is set up there). In that case, we just return NULL. - */ - if (cache_mngr) - { - binlog_cache_data *cache_data= - cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); - - rows= cache_data->pending(); - } - return (rows); + DBUG_ASSERT(cache_mngr); + return cache_mngr->get_binlog_cache_data(use_trans_cache)->pending(); } -/** - This function stores a pending row event into a cache which is specified - through the parameter @c is_transactional. Respectively, when it is @c - true, the pending event is stored into the transactional cache. Otherwise - into the non-transactional cache. - - @param evt a pointer to the row event. - @param is_transactional @c true indicates a transactional cache, - otherwise @c false a non-transactional. -*/ -void -THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) +binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr, + bool use_trans_cache) { - binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data(); - - DBUG_ASSERT(cache_mngr); + return cache_mngr->get_binlog_cache_data(use_trans_cache); +} - binlog_cache_data *cache_data= - cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); +int binlog_flush_pending_rows_event(THD *thd, bool stmt_end, + bool is_transactional, + Event_log *bin_log, + binlog_cache_data *cache_data) +{ + int error= 0; + auto *pending= cache_data->pending(); + if (pending) + { + /* + Mark the event as the last event of a statement if the stmt_end + flag is set. + */ + if (stmt_end) + { + pending->set_flags(Rows_log_event::STMT_END_F); + thd->reset_binlog_for_next_statement(); + } - cache_data->set_pending(ev); + error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_data, + is_transactional); + } + return error; } @@ -6403,18 +6592,10 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) otherwise @c false a non-transactional. */ int -MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) +MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data) { DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event"); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - - DBUG_ASSERT(cache_mngr); - - binlog_cache_data *cache_data= - cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional)); - if (Rows_log_event* pending= cache_data->pending()) { delete pending; @@ -6428,6 +6609,7 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) Moves the last bunch of rows from the pending Rows event to a cache (either transactional cache if is_transaction is @c true, or the non-transactional cache otherwise. Sets a new pending event. + In case of error during flushing, sets write_error=1 to itself. @param thd a pointer to the user thread. @param evt a pointer to the row event. @@ -6435,27 +6617,20 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) otherwise @c false a non-transactional. */ int -MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, - Rows_log_event* event, - bool is_transactional) +Event_log::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, + binlog_cache_data *cache_data, + bool is_transactional) { - DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); - DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()); + DBUG_ENTER("Event_log::flush_and_set_pending_rows_event(event)"); + DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || is_open()); DBUG_PRINT("enter", ("event: %p", event)); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - - DBUG_ASSERT(cache_mngr); - - binlog_cache_data *cache_data= - cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional)); - DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending())); if (Rows_log_event* pending= cache_data->pending()) { - Log_event_writer writer(&cache_data->cache_log, cache_data); + Log_event_writer writer(&cache_data->cache_log, cache_data, + pending->select_checksum_alg(cache_data), NULL); /* Write pending event to the cache. @@ -6495,11 +6670,89 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, delete pending; } - thd->binlog_set_pending_rows_event(event, is_transactional); + cache_data->set_pending(event); DBUG_RETURN(0); } +/* + Member function for ensuring that there is an rows log + event of the apropriate type before proceeding. + + POST CONDITION: + If a non-NULL pointer is returned, the pending event for thread 'thd' will + be an event created by callback hold by event_factory, and + will be either empty or have enough space to hold 'needed' bytes. + In addition, the columns bitmap will be correct for the row, meaning that + the pending event will be flushed if the columns in the event differ from + the columns suppled to the function. + + RETURNS + If no error, a non-NULL pending event (either one which already existed or + the newly created one). + If error, NULL. + */ + +Rows_log_event* +Event_log::prepare_pending_rows_event(THD *thd, TABLE* table, + binlog_cache_data *cache_data, + uint32 serv_id, size_t needed, + bool is_transactional, + Rows_event_factory event_factory) +{ + DBUG_ENTER("MYSQL_BIN_LOG::prepare_pending_rows_event"); + /* Pre-conditions */ + DBUG_ASSERT(table->s->table_map_id != ~0UL); + + /* + There is no good place to set up the transactional data, so we + have to do it here. + */ + Rows_log_event* pending= cache_data->pending(); + + if (unlikely(pending && !pending->is_valid())) + DBUG_RETURN(NULL); + + /* + Check if the current event is non-NULL and a write-rows + event. Also check if the table provided is mapped: if it is not, + then we have switched to writing to a new table. + If there is no pending event, we need to create one. If there is a pending + event, but it's not about the same table id, or not of the same type + (between Write, Update and Delete), or not the same affected columns, or + going to be too big, flush this event to disk and create a new pending + event. + */ + if (!pending || + pending->server_id != serv_id || + pending->get_table_id() != table->s->table_map_id || + pending->get_general_type_code() != event_factory.type_code || + pending->get_data_size() + needed > opt_binlog_rows_event_max_size || + pending->read_write_bitmaps_cmp(table) == FALSE) + { + /* Create a new RowsEventT... */ + Rows_log_event* const + ev= event_factory.create(thd, table, table->s->table_map_id, + is_transactional); + if (unlikely(!ev)) + DBUG_RETURN(NULL); + ev->server_id= serv_id; // I don't like this, it's too easy to forget. + /* + flush the pending event and replace it with the newly created + event... + */ + if (unlikely(flush_and_set_pending_rows_event(thd, ev, cache_data, + is_transactional))) + { + delete ev; + DBUG_RETURN(NULL); + } + + DBUG_RETURN(ev); /* This is the new pending event */ + } + DBUG_RETURN(pending); /* This is the current pending event */ +} + /* Generate a new global transaction ID, and write it to the binlog */ @@ -7014,6 +7267,8 @@ err: { bool synced; + update_gtid_index((uint32)offset, thd->get_last_commit_gtid()); + if ((error= flush_and_sync(&synced))) { } @@ -7091,6 +7346,30 @@ err: } +void +MYSQL_BIN_LOG::update_gtid_index(uint32 offset, rpl_gtid gtid) +{ + if (!unlikely(gtid_index)) + return; + + rpl_gtid *gtid_list; + uint32 gtid_count; + int err= gtid_index->process_gtid_check_batch(offset, >id, + >id_list, >id_count); + if (err) + return; + if (gtid_list) + { + /* + Perform the GTID index update in the binlog background thread, + as we are running under the critical LOCK_log mutex. + */ + if (queue_binlog_background_gtid_index_update(gtid_index, offset, + gtid_list, gtid_count)) + my_free(gtid_list); + } +} + int error_log_print(enum loglevel level, const char *format, va_list args) { @@ -7239,6 +7518,7 @@ MYSQL_BIN_LOG::do_checkpoint_request(ulong binlog_id) int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) { int error= 0; + ulonglong binlog_pos; DBUG_ENTER("MYSQL_BIN_LOG::rotate"); #ifdef WITH_WSREP @@ -7254,7 +7534,8 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) //todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log); *check_purge= false; - if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size)) + binlog_pos= my_b_tell(&log_file); + if (force_rotate || binlog_pos >= max_size) { ulong binlog_id= current_binlog_id; /* @@ -7296,18 +7577,31 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) mark_xid_done(binlog_id, false); } else + { *check_purge= true; + binlog_pos= my_b_tell(&log_file); + } } + /* + Purge by size here for every write. Purge based on timestamps is done + by purge() when rotate has been done(). + */ +#ifdef HAVE_REPLICATION + purge_logs_by_size(binlog_pos); +#endif DBUG_RETURN(error); } /** The method executes logs purging routine. + @param all If false, purge only based on binlog_expire_logs_seconds. + If true, purge also based on binlog_space_limit. @retval nonzero - error in rotating routine. */ -void MYSQL_BIN_LOG::purge() + +void MYSQL_BIN_LOG::purge(bool all) { mysql_mutex_assert_not_owner(&LOCK_log); #ifdef HAVE_REPLICATION @@ -7322,14 +7616,21 @@ void MYSQL_BIN_LOG::purge() } DEBUG_SYNC(current_thd, "after_purge_logs_before_date"); } + if (all && binlog_space_limit) + { + ulonglong binlog_pos; + mysql_mutex_lock(&LOCK_log); + binlog_pos= my_b_tell(&log_file); + purge_logs_by_size(binlog_pos); + mysql_mutex_unlock(&LOCK_log); + } #endif } - void MYSQL_BIN_LOG::checkpoint_and_purge(ulong binlog_id) { do_checkpoint_request(binlog_id); - purge(); + purge(0); } @@ -7509,37 +7810,27 @@ uint MYSQL_BIN_LOG::next_file_id() return res; } -class CacheWriter: public Log_event_writer -{ -public: - size_t remains; - - CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum, - Binlog_crypt_data *cr) - : Log_event_writer(file_arg, 0, cr), remains(0), thd(thd_arg), - first(true) - { checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; } - ~CacheWriter() - { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); } +int Event_log::write_cache_raw(THD *thd, IO_CACHE *cache) +{ + DBUG_ENTER("Event_log::write_cache_raw"); + mysql_mutex_assert_owner(&LOCK_log); + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + DBUG_RETURN(ER_ERROR_ON_WRITE); - int write(uchar* pos, size_t len) + IO_CACHE *file= get_log_file(); + IF_DBUG(size_t total= cache->end_of_file,); + do { - DBUG_ENTER("CacheWriter::write"); - if (first) - write_header(pos, len); - else - write_data(pos, len); - - remains -= len; - if ((first= !remains)) - write_footer(); - DBUG_RETURN(0); - } -private: - THD *thd; - bool first; -}; + size_t read_len= cache->read_end - cache->read_pos; + int res= my_b_safe_write(file, cache->read_pos, read_len); + if (unlikely(res)) + DBUG_RETURN(res); + IF_DBUG(total-= read_len,); + } while (my_b_fill(cache)); + DBUG_ASSERT(total == 0); + DBUG_RETURN(0); +} /* Write the contents of a cache to the binary log. @@ -7558,18 +7849,37 @@ private: events prior to fill in the binlog cache. */ -int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) +int Event_log::write_cache(THD *thd, binlog_cache_data *cache_data) { - DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); - + DBUG_ENTER("Event_log::write_cache"); + IO_CACHE *cache= &cache_data->cache_log; mysql_mutex_assert_owner(&LOCK_log); + + /* + If possible, just copy the cache over byte-by-byte with pre-computed + checksums. + */ + if (likely(binlog_checksum_options == (ulong)cache_data->checksum_opt) && + likely(!crypto.scheme) && + likely(!opt_binlog_legacy_event_pos)) + { + int res= my_b_copy_all_to_cache(cache, &log_file); + status_var_add(thd->status_var.binlog_bytes_written, my_b_tell(cache)); + DBUG_RETURN(res ? ER_ERROR_ON_WRITE : 0); + } + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) DBUG_RETURN(ER_ERROR_ON_WRITE); - size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; - size_t val; - size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t - uchar header[LOG_EVENT_HEADER_LEN]; - CacheWriter writer(thd, &log_file, binlog_checksum_options, &crypto); + /* Amount of remaining bytes in the IO_CACHE read buffer. */ + size_t log_file_pos; + uchar header_buf[LOG_EVENT_HEADER_LEN]; + Log_event_writer writer(get_log_file(), 0, + (enum_binlog_checksum_alg)binlog_checksum_options, + &crypto); + uint checksum_len= writer.checksum_len; + uint old_checksum_len= (cache_data->checksum_opt != BINLOG_CHECKSUM_ALG_OFF) ? + BINLOG_CHECKSUM_LEN : 0; + int err= 0; if (crypto.scheme) { @@ -7594,129 +7904,79 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) split. */ - group= (size_t)my_b_tell(&log_file); - hdr_offs= carry= 0; - - do + log_file_pos= (size_t)my_b_tell(get_log_file()); + for (;;) { /* - if we only got a partial header in the last iteration, - get the other half now and process a full header. + Empty cache at an event boundary means we are done (but empty cache + elsewhere is an error). */ - if (unlikely(carry > 0)) - { - DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN); - size_t tail= LOG_EVENT_HEADER_LEN - carry; - - /* assemble both halves */ - memcpy(&header[carry], (char *)cache->read_pos, tail); - - uint32 len= uint4korr(header + EVENT_LEN_OFFSET); - writer.remains= len; - - /* fix end_log_pos */ - end_log_pos_inc += writer.checksum_len; - val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; - int4store(header + LOG_POS_OFFSET, val); - - /* fix len */ - len+= writer.checksum_len; - int4store(header + EVENT_LEN_OFFSET, len); - - if (writer.write(header, LOG_EVENT_HEADER_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - - cache->read_pos+= tail; - length-= tail; - carry= 0; - - /* next event header at ... */ - hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len; - } - - /* if there is anything to write, process it. */ + if (my_b_tell(cache) == cache->end_of_file) + break; - if (likely(length > 0)) - { - DBUG_EXECUTE_IF("fail_binlog_write_1", - errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); - /* - process all event-headers in this (partial) cache. - if next header is beyond current read-buffer, - we'll get it later (though not necessarily in the - very next iteration, just "eventually"). - */ + DBUG_EXECUTE_IF("fail_binlog_write_1", + { + errno= 28; + goto error_in_write; + }); - if (hdr_offs >= length) + if (my_b_read(cache, header_buf, LOG_EVENT_HEADER_LEN)) + goto error_in_read; + + /* Adjust the length and end_log_pos appropriately. */ + uint ev_len= uint4korr(&header_buf[EVENT_LEN_OFFSET]); // netto len + DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN + old_checksum_len); + if (unlikely(ev_len < LOG_EVENT_HEADER_LEN + old_checksum_len)) + goto error_in_read; + uint new_len= ev_len - old_checksum_len + checksum_len; + int4store(&header_buf[EVENT_LEN_OFFSET], new_len); + log_file_pos+= new_len; + int4store(&header_buf[LOG_POS_OFFSET], log_file_pos); + + /* Write the header to the binlog. */ + if (writer.write_header(header_buf, LOG_EVENT_HEADER_LEN)) + goto error_in_write; + ev_len-= (LOG_EVENT_HEADER_LEN + old_checksum_len); + + /* Write the rest of the event. */ + size_t length= my_b_bytes_in_cache(cache); + while (ev_len > 0) + { + if (length == 0) { - if (writer.write(cache->read_pos, length)) - DBUG_RETURN(ER_ERROR_ON_WRITE); + if (!(length= my_b_fill(cache))) + goto error_in_read; } + uint chunk= MY_MIN(ev_len, (uint)length); + if (writer.write_data(cache->read_pos, chunk)) + goto error_in_write; + cache->read_pos+= chunk; + length-= chunk; + ev_len-= chunk; + } + /* + Discard any old precomputed checksum len (any needed checksum will be + written by writer.write_footer()). + */ + if (old_checksum_len > 0 && my_b_read(cache, header_buf, old_checksum_len)) + goto error_in_read; + if (writer.write_footer()) + goto error_in_write; - while (hdr_offs < length) - { - /* - finish off with remains of the last event that crawls - from previous into the current buffer - */ - if (writer.remains != 0) - { - if (writer.write(cache->read_pos, hdr_offs)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - } - - /* - partial header only? save what we can get, process once - we get the rest. - */ - if (hdr_offs + LOG_EVENT_HEADER_LEN > length) - { - carry= length - hdr_offs; - memcpy(header, (char *)cache->read_pos + hdr_offs, carry); - length= hdr_offs; - } - else - { - /* we've got a full event-header, and it came in one piece */ - uchar *ev= (uchar *)cache->read_pos + hdr_offs; - uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len - uchar *log_pos= ev + LOG_POS_OFFSET; - - end_log_pos_inc += writer.checksum_len; - /* fix end_log_pos */ - val= uint4korr(log_pos) + group + end_log_pos_inc; - int4store(log_pos, val); - - /* fix length */ - int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len); - - writer.remains= ev_len; - if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs))) - DBUG_RETURN(ER_ERROR_ON_WRITE); - - /* next event header at ... */ - hdr_offs += ev_len; // incr by the netto len - - DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length); - } - } + } + goto end; // All OK - /* - Adjust hdr_offs. Note that it may still point beyond the segment - read in the next iteration; if the current event is very long, - it may take a couple of read-iterations (and subsequent adjustments - of hdr_offs) for it to point into the then-current segment. - If we have a split header (!carry), hdr_offs will be set at the - beginning of the next iteration, overwriting the value we set here: - */ - hdr_offs -= length; - } - } while ((length= my_b_fill(cache))); +error_in_write: + err= ER_ERROR_ON_WRITE; + goto end; - DBUG_ASSERT(carry == 0); - DBUG_ASSERT(!writer.checksum_len || writer.remains == 0); +error_in_read: + err= ER_ERROR_ON_READ; + goto end; - DBUG_RETURN(0); // All OK +end: + status_var_add(thd->status_var.binlog_bytes_written, writer.bytes_written); + DBUG_RETURN(err); } /* @@ -7836,15 +8096,19 @@ MYSQL_BIN_LOG:: write_binlog_checkpoint_event_already_locked(const char *name_arg, uint len) { my_off_t offset; + bool err; Binlog_checkpoint_log_event ev(name_arg, len); + /* Note that we must sync the binlog checkpoint to disk. Otherwise a subsequent log purge could delete binlogs that XA recovery thinks are needed (even though they are not really). */ - if (!write_event(&ev) && !flush_and_sync(0)) + err= write_event(&ev) || flush_and_sync(0); + offset= my_b_tell(&log_file); + if (!err) { - update_binlog_end_pos(); + update_binlog_end_pos(offset); } else { @@ -7859,10 +8123,6 @@ write_binlog_checkpoint_event_already_locked(const char *name_arg, uint len) sql_print_error("Failed to write binlog checkpoint event to binary log"); } - offset= my_b_tell(&log_file); - - update_binlog_end_pos(offset); - /* Take mutex to protect against a reader seeing partial writes of 64-bit offset on 32-bit CPUs. @@ -8570,6 +8830,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) strmake_buf(cache_mngr->last_commit_pos_file, log_file_name); commit_offset= my_b_write_tell(&log_file); + update_gtid_index((uint32)commit_offset, + current->thd->get_last_commit_gtid()); cache_mngr->last_commit_pos_offset= commit_offset; if ((cache_mngr->using_xa && cache_mngr->xa_xid) || current->need_unlog) { @@ -8846,7 +9108,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_RETURN(ER_ERROR_ON_WRITE); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && - write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) + write_cache(entry->thd, mngr->get_binlog_cache_data(FALSE))) { entry->error_cache= &mngr->stmt_cache.cache_log; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -8857,7 +9119,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_EXECUTE_IF("crash_before_writing_xid", { if ((write_cache(entry->thd, - mngr->get_binlog_cache_log(TRUE)))) + mngr->get_binlog_cache_data(TRUE)))) DBUG_PRINT("info", ("error writing binlog cache")); else flush_and_sync(0); @@ -8866,7 +9128,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_SUICIDE(); }); - if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE))) + if (write_cache(entry->thd, mngr->get_binlog_cache_data(TRUE))) { entry->error_cache= &mngr->trx_cache.cache_log; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -9134,11 +9396,11 @@ void MYSQL_BIN_LOG::close(uint exiting) { Stop_log_event s; // the checksumming rule for relay-log case is similar to Rotate - s.checksum_alg= is_relay_log ? relay_log_checksum_alg - : (enum_binlog_checksum_alg)binlog_checksum_options; - DBUG_ASSERT(!is_relay_log || - relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - write_event(&s); + enum_binlog_checksum_alg checksum_alg= is_relay_log ? + relay_log_checksum_alg : + (enum_binlog_checksum_alg)binlog_checksum_options; + DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + write_event(&s, checksum_alg); bytes_written+= s.data_written; flush_io_cache(&log_file); update_binlog_end_pos(); @@ -9166,6 +9428,33 @@ void MYSQL_BIN_LOG::close(uint exiting) } #endif /* HAVE_REPLICATION */ + if (!is_relay_log && likely(gtid_index)) + { + if (exiting & (LOG_CLOSE_STOP_EVENT|LOG_CLOSE_SYNC_GTID_INDEX)) + { + /* + The binlog background thread is already stopped just close the final + GTID index synchronously. Or caller explicitly requested synchronous + close of the GTID index. + */ + gtid_index->close(); + delete gtid_index; + } + else + { + /* + Queue a close on the current GTID index. + Important that this is queued _before_ the checkpoint request is sent + (and thus before chechpoint notifications can be queued); this way, if + we crash before the GTID index is synced to disk, the checkpoint will + still be pending and the binlog file will be scanned during crash + recovery and the GTID index recovered. + */ + queue_binlog_background_gtid_index_close(gtid_index); + } + gtid_index= nullptr; + } + /* don't pwrite in a file opened with O_APPEND - it doesn't work */ if (log_file.type == WRITE_CACHE && !(exiting & LOG_CLOSE_DELAYED_CLOSE)) { @@ -10759,22 +11048,7 @@ void TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) { xid_count_per_binlog *entry= static_cast<xid_count_per_binlog *>(cookie); - bool found_entry= false; - mysql_mutex_lock(&LOCK_binlog_background_thread); - /* count the same notification kind from different engines */ - for (xid_count_per_binlog *link= binlog_background_thread_queue; - link && !found_entry; link= link->next_in_queue) - { - if ((found_entry= (entry == link))) - entry->notify_count++; - } - if (!found_entry) - { - entry->next_in_queue= binlog_background_thread_queue; - binlog_background_thread_queue= entry; - } - mysql_cond_signal(&COND_binlog_background_thread); - mysql_mutex_unlock(&LOCK_binlog_background_thread); + queue_binlog_background_checkpoint_notify(entry); } /* @@ -10793,7 +11067,9 @@ pthread_handler_t binlog_background_thread(void *arg __attribute__((unused))) { bool stop; - MYSQL_BIN_LOG::xid_count_per_binlog *queue, *next; + Binlog_background_job *queue, *next; + Binlog_background_job *freelist= nullptr; + Binlog_background_job **freelist_endptr= &freelist; THD *thd; my_thread_init(); DBUG_ENTER("binlog_background_thread"); @@ -10837,6 +11113,18 @@ binlog_background_thread(void *arg __attribute__((unused))) */ THD_STAGE_INFO(thd, stage_binlog_waiting_background_tasks); mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + + /* + Put back our job objects in the freelist, now that we own the mutex again. + */ + if (freelist) + { + *freelist_endptr= binlog_background_freelist; + binlog_background_freelist= freelist; + freelist= nullptr; + freelist_endptr= &freelist; + } + for (;;) { stop= binlog_background_thread_stop; @@ -10855,6 +11143,7 @@ binlog_background_thread(void *arg __attribute__((unused))) } /* Grab the queue, if any. */ binlog_background_thread_queue= NULL; + binlog_background_thread_endptr= &binlog_background_thread_queue; mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); /* Process any incoming commit_checkpoint_notify() calls. */ @@ -10870,17 +11159,40 @@ binlog_background_thread(void *arg __attribute__((unused))) #endif while (queue) { - long count= queue->notify_count; - THD_STAGE_INFO(thd, stage_binlog_processing_checkpoint_notify); - DEBUG_SYNC(thd, "binlog_background_thread_before_mark_xid_done"); - /* Set the thread start time */ - thd->set_time(); - /* Grab next pointer first, as mark_xid_done() may free the element. */ - next= queue->next_in_queue; - queue->notify_count= 0; - for (long i= 0; i <= count; i++) - mysql_bin_log.mark_xid_done(queue->binlog_id, true); - queue= next; + switch (queue->job_type) + { + case Binlog_background_job::CHECKPOINT_NOTIFY: + THD_STAGE_INFO(thd, stage_binlog_processing_checkpoint_notify); + DEBUG_SYNC(thd, "binlog_background_thread_before_mark_xid_done"); + /* Set the thread start time */ + thd->set_time(); + mysql_bin_log.mark_xid_done(queue->notify_entry->binlog_id, true); + break; + + case Binlog_background_job::GTID_INDEX_UPDATE: + queue->gtid_index_data.gi-> + async_update(queue->gtid_index_data.offset, + queue->gtid_index_data.gtid_list, + queue->gtid_index_data.gtid_count); + break; + + case Binlog_background_job::GTID_INDEX_CLOSE: + queue->gtid_index_data.gi->close(); + delete queue->gtid_index_data.gi; + break; + + case Binlog_background_job::SENTINEL: + /* + The sentinel is a way to signal to reset_logs() that all pending + background jobs prior to the sentinel have been processed. + */ + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + DBUG_ASSERT(binlog_background_thread_sentinel); + binlog_background_thread_sentinel= false; + mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + break; + } #ifdef ENABLED_DEBUG_SYNC DBUG_EXECUTE_IF("binlog_background_checkpoint_processed", @@ -10889,6 +11201,12 @@ binlog_background_thread(void *arg __attribute__((unused))) STRING_WITH_LEN("now SIGNAL binlog_background_checkpoint_processed"))); ); #endif + + next= queue->next; + queue->next= nullptr; + *freelist_endptr= queue; + freelist_endptr= &queue->next; + queue= next; } if (stop) @@ -10897,6 +11215,13 @@ binlog_background_thread(void *arg __attribute__((unused))) THD_STAGE_INFO(thd, stage_binlog_stopping_background_thread); + while (freelist) + { + next= freelist->next; + my_free(freelist); + freelist= next; + } + /* No need to use mutex as thd is not linked into other threads */ THD_count::count++; delete thd; @@ -10905,6 +11230,12 @@ binlog_background_thread(void *arg __attribute__((unused))) /* Signal that we are (almost) stopped. */ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + while (binlog_background_freelist) + { + next= binlog_background_freelist->next; + my_free(binlog_background_freelist); + binlog_background_freelist= next; + } binlog_background_thread_stop= false; mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end); mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); @@ -10948,6 +11279,139 @@ start_binlog_background_thread() return 0; } + + +static Binlog_background_job * +get_binlog_background_job() +{ + Binlog_background_job *job; + mysql_mutex_assert_owner(&mysql_bin_log.LOCK_binlog_background_thread); + + if ((job= binlog_background_freelist) != nullptr) + binlog_background_freelist= job->next; + else + job= (Binlog_background_job *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*job), + MYF(MY_WME)); + + return job; +} + + +static void +queue_binlog_background_job(Binlog_background_job *job) +{ + mysql_mutex_assert_owner(&mysql_bin_log.LOCK_binlog_background_thread); + + job->next= nullptr; + *binlog_background_thread_endptr= job; + binlog_background_thread_endptr= &job->next; + mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread); +} + + +static int +queue_binlog_background_checkpoint_notify( + MYSQL_BIN_LOG::xid_count_per_binlog *entry) +{ + int res; + + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + Binlog_background_job *job= get_binlog_background_job(); + if (!job) + res= 1; + else + { + job->job_type= Binlog_background_job::CHECKPOINT_NOTIFY; + job->notify_entry= entry; + queue_binlog_background_job(job); + res= 0; + } + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + return res; +} + + +static int +queue_binlog_background_gtid_index_update(Gtid_index_writer *gi, uint32 offset, + rpl_gtid *gtid_list, uint32 count) +{ + int res; + + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + Binlog_background_job *job= get_binlog_background_job(); + if (!unlikely(job)) + res= 1; + else + { + job->job_type= Binlog_background_job::GTID_INDEX_UPDATE; + job->gtid_index_data.gi= gi; + job->gtid_index_data.gtid_list= gtid_list; + job->gtid_index_data.gtid_count= count; + job->gtid_index_data.offset= offset; + queue_binlog_background_job(job); + res= 0; + } + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + + return res; +} + + +static int +queue_binlog_background_gtid_index_close(Gtid_index_writer *gi) +{ + int res; + + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + Binlog_background_job *job= get_binlog_background_job(); + if (!job) + return 1; + else + { + job->job_type= Binlog_background_job::GTID_INDEX_CLOSE; + job->gtid_index_data.gi= gi; + queue_binlog_background_job(job); + res= 0; + } + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + + return res; +} + + +static int +queue_binlog_background_sentinel() +{ + int res; + + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + DBUG_ASSERT(!binlog_background_thread_sentinel); + Binlog_background_job *job= get_binlog_background_job(); + if (!job) + return 1; + else + { + binlog_background_thread_sentinel= true; + job->job_type= Binlog_background_job::SENTINEL; + queue_binlog_background_job(job); + res= 0; + } + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + + return res; +} + +static void +binlog_background_wait_for_sentinel() +{ + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + while(binlog_background_thread_sentinel) + mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end, + &mysql_bin_log.LOCK_binlog_background_thread); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); +} + + #ifdef HAVE_REPLICATION class Recovery_context { @@ -11214,7 +11678,7 @@ bool Recovery_context::reset_truncate_coord(my_off_t pos) for (uint i= 0; i < gtid_maybe_to_truncate->elements(); i++) { rpl_gtid gtid= gtid_maybe_to_truncate->at(i); - if (rpl_global_gtid_binlog_state.update_nolock(>id, false)) + if (rpl_global_gtid_binlog_state.update_nolock(>id)) return true; } gtid_maybe_to_truncate->clear(); @@ -11306,7 +11770,7 @@ bool Recovery_context::decide_or_assess(xid_recovery_member *member, int round, if (truncate_gtid.seq_no == 0 /* was reset or never set */ || (truncate_set_in_1st && round == 2 /* reevaluted at round turn */)) { - if (set_truncate_coord(linfo, round, fdle->checksum_alg)) + if (set_truncate_coord(linfo, round, fdle->used_checksum_alg)) return true; } else @@ -11479,11 +11943,13 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, Format_description_log_event *fdle, bool do_xa) { Log_event *ev= NULL; + Gtid_index_writer *gtid_index_recover= NULL; HASH xids, ddl_log_ids; MEM_ROOT mem_root; char binlog_checkpoint_name[FN_REFLEN]; bool binlog_checkpoint_found; IO_CACHE log; + IO_CACHE *cur_log; File file= -1; const char *errmsg; #ifdef HAVE_REPLICATION @@ -11530,12 +11996,16 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, */ binlog_checkpoint_found= false; + cur_log= first_log; for (round= 1;;) { - while ((ev= Log_event::read_log_event(round == 1 ? first_log : &log, - fdle, opt_master_verify_checksum)) + while ((ev= Log_event::read_log_event(cur_log, fdle, + opt_master_verify_checksum)) && ev->is_valid()) { +#ifdef HAVE_REPLICATION + my_off_t end_pos= my_b_tell(cur_log); +#endif enum Log_event_type typ= ev->get_type_code(); switch (typ) { @@ -11552,7 +12022,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, member->decided_to_commit= true; } #else - if (ctx.decide_or_assess(member, round, fdle, linfo, ev->log_pos)) + if (ctx.decide_or_assess(member, round, fdle, linfo, end_pos)) goto err2; #endif } @@ -11610,6 +12080,8 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, /* Initialise the binlog state from the Gtid_list event. */ if (rpl_global_gtid_binlog_state.load(glev->list, glev->count)) goto err2; + if (opt_binlog_gtid_index) + gtid_index_recover= recover_gtid_index_start(last_log_name, end_pos); } break; @@ -11649,15 +12121,19 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback())))); - if (rpl_global_gtid_binlog_state.update_nolock(&ctx.last_gtid, false)) + recover_gtid_index_process(gtid_index_recover, end_pos, &ctx.last_gtid); + if (rpl_global_gtid_binlog_state.update_nolock(&ctx.last_gtid)) goto err2; ctx.last_gtid_valid= false; } - ctx.prev_event_pos= ev->log_pos; + ctx.prev_event_pos= end_pos; #endif delete ev; ev= NULL; } // end of while + recover_gtid_index_end(gtid_index_recover); + gtid_index_recover= NULL; + cur_log= &log; /* If the last binlog checkpoint event points to an older log, we have to @@ -11742,6 +12218,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, err2: delete ev; + recover_gtid_index_abort(gtid_index_recover); if (file >= 0) { end_io_cache(&log); @@ -11760,6 +12237,104 @@ err1: } +/* + Start recovery of the GTID index for a binlog file. + The old index is deleted and a new index is rebuilt while scanning the + binlog file during binlog recovery. + Errors are not fatal, as the code can fallback to slower full binlog file + scan when no GTID index is available. + + @param base_name File name of the binlog file. + @param offset End log pos of the GTID_LIST log event of the binlog file. + + @return Gtid_index_writer object or NULL. +*/ +Gtid_index_writer * +MYSQL_BIN_LOG::recover_gtid_index_start(const char *base_name, my_off_t offset) +{ + char buf[Gtid_index_base::GTID_INDEX_FILENAME_MAX_SIZE]; + + Gtid_index_base::make_gtid_index_file_name(buf, sizeof(buf), base_name); + if (my_delete(buf, MYF(0))) + { + /* If ENOENT, the GTID index file is already deleted or never existed. */ + if (my_errno != ENOENT) + { + sql_print_information("Failed to delete file '%s' (errno=%d)", buf, my_errno); + } + my_errno= 0; + } + Gtid_index_writer *gi= + new Gtid_index_writer(base_name, (uint32)offset, + &rpl_global_gtid_binlog_state, + (uint32)opt_binlog_gtid_index_page_size, + (my_off_t)opt_binlog_gtid_index_span_min); + return gi; +} + + +/* + Process one GTID during GTID index recovery. + + @param gi Gtid_index_writer object or NULL. + @param offset End log pos of the GTID event. + @param gev GTID log event to process. + + @return nothing +*/ +void +MYSQL_BIN_LOG::recover_gtid_index_process(Gtid_index_writer *gi, + my_off_t offset, + const rpl_gtid *gtid) +{ + if (gi) + { + gi->process_gtid((uint32)offset, gtid); + } +} + + +/* + Complete the recovery of one GTID index, syncing and closing it. + + @param gi Gtid_index_writer object or NULL. + + @return nothing +*/ +void +MYSQL_BIN_LOG::recover_gtid_index_end(Gtid_index_writer *gi) +{ + if (gi) + { + gi->close(); + delete gi; + } +} + + +/* + Abort the recovery of one GTID index, deleting any partially recovered index. + + @param gi Gtid_index_writer object or NULL. + + @return nothing +*/ +void +MYSQL_BIN_LOG::recover_gtid_index_abort(Gtid_index_writer *gi) +{ + if (gi) + { + char buf[Gtid_index_base::GTID_INDEX_FILENAME_MAX_SIZE]; + strmake(buf, gi->index_file_name, sizeof(buf)-1); + /* + Delete first the Gtid_index_writer object and then the partial index + (the writer still has the index file open and active until destructed). + */ + delete(gi); + my_delete(buf, MYF(0)); + } +} + int MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery) @@ -11881,7 +12456,7 @@ mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file) { binlog_cache_mngr *cache_mngr; if (opt_bin_log && - (cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton))) + (cache_mngr= thd->binlog_get_cache_mngr())) { *out_file= cache_mngr->last_commit_pos_file; *out_pos= (ulonglong)(cache_mngr->last_commit_pos_offset); @@ -11996,7 +12571,7 @@ TC_LOG_BINLOG::set_status_variables(THD *thd) if (thd && opt_bin_log) { mysql_mutex_lock(&thd->LOCK_thd_data); - auto cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + auto cache_mngr= thd->binlog_get_cache_mngr(); have_snapshot= cache_mngr && cache_mngr->last_commit_pos_file[0]; if (have_snapshot) { @@ -12120,8 +12695,7 @@ maria_declare_plugin_end; IO_CACHE *wsrep_get_cache(THD * thd, bool is_transactional) { DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF); - binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) - thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *cache_mngr = thd->binlog_get_cache_mngr(); if (cache_mngr) return cache_mngr->get_binlog_cache_log(is_transactional); @@ -12146,8 +12720,7 @@ void wsrep_thd_binlog_trx_reset(THD * thd) /* todo: fix autocommit select to not call the caller */ - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); if (cache_mngr) { cache_mngr->reset(false, true); @@ -12165,11 +12738,11 @@ void wsrep_thd_binlog_stmt_rollback(THD * thd) { DBUG_ENTER("wsrep_thd_binlog_stmt_rollback"); WSREP_DEBUG("wsrep_thd_binlog_stmt_rollback"); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); if (cache_mngr) { - thd->binlog_remove_pending_rows_event(TRUE, TRUE); + MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache); + thd->reset_binlog_for_next_statement(); cache_mngr->stmt_cache.reset(); } DBUG_VOID_RETURN; @@ -12191,8 +12764,7 @@ void wsrep_register_binlog_handler(THD *thd, bool trx) back a statement or a transaction. However, notifications do not happen if the binary log is set as read/write. */ - binlog_cache_mngr *cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *cache_mngr= thd->binlog_get_cache_mngr(); /* cache_mngr may be missing e.g. in mtr test ev51914.test */ if (cache_mngr) { |