diff options
Diffstat (limited to '')
-rw-r--r-- | sql/log_event_server.cc | 2024 |
1 files changed, 547 insertions, 1477 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 6e22a3ab..f7029b39 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -53,6 +53,7 @@ #include "wsrep_mysqld.h" #include "sql_insert.h" #include "sql_table.h" +#include <mysql/service_wsrep.h> #include <my_bitmap.h> #include "rpl_utility.h" @@ -232,19 +233,12 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error, #if defined(HAVE_REPLICATION) static void set_thd_db(THD *thd, Rpl_filter *rpl_filter, - const char *db, uint32 db_len) + const LEX_CSTRING &db) { - char lcase_db_buf[NAME_LEN +1]; - LEX_CSTRING new_db; - new_db.length= db_len; - if (lower_case_table_names == 1) - { - strmov(lcase_db_buf, db); - my_casedn_str(system_charset_info, lcase_db_buf); - new_db.str= lcase_db_buf; - } - else - new_db.str= db; + IdentBuffer<NAME_LEN> lcase_db_buf; + LEX_CSTRING new_db= lower_case_table_names == 1 ? + lcase_db_buf.copy_casedn(db).to_lex_cstring() : + db; /* TODO WARNING this makes rewrite_db respect lower_case_table_names values * for more info look MDEV-17446 */ new_db.str= rpl_filter->get_rewrite_db(new_db.str, &new_db.length); @@ -367,37 +361,6 @@ inline bool unexpected_error_code(int unexpected_error) } } -/* - pretty_print_str() -*/ - -static void -pretty_print_str(String *packet, const char *str, int len) -{ - const char *end= str + len; - packet->append(STRING_WITH_LEN("'")); - while (str < end) - { - char c; - switch ((c=*str++)) { - case '\n': packet->append(STRING_WITH_LEN("\\n")); break; - case '\r': packet->append(STRING_WITH_LEN("\\r")); break; - case '\\': packet->append(STRING_WITH_LEN("\\\\")); break; - case '\b': packet->append(STRING_WITH_LEN("\\b")); break; - case '\t': packet->append(STRING_WITH_LEN("\\t")); break; - case '\'': packet->append(STRING_WITH_LEN("\\'")); break; - case 0 : packet->append(STRING_WITH_LEN("\\0")); break; - default: - packet->append(&c, 1); - break; - } - } - packet->append(STRING_WITH_LEN("'")); -} -#endif /* HAVE_REPLICATION */ - - -#if defined(HAVE_REPLICATION) /** Create a prefix for the temporary files that is to be used for @@ -540,7 +503,7 @@ int append_query_string(CHARSET_INFO *csinfo, String *to, beg= (char*) to->ptr() + to->length(); ptr= beg; if (csinfo->escape_with_backslash_is_dangerous) - ptr= str_to_hex(ptr, str, len); + ptr= str_to_hex(ptr, (uchar*)str, len); else { *ptr++= '\''; @@ -574,8 +537,8 @@ int append_query_string(CHARSET_INFO *csinfo, String *to, **************************************************************************/ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) - :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg), - checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) + :log_pos(0), temp_buf(0), exec_time(0), + slave_exec_mode(SLAVE_EXEC_MODE_STRICT), thd(thd_arg) { server_id= thd->variables.server_id; when= thd->start_time; @@ -599,7 +562,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) Log_event::Log_event() :temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE), - thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) + slave_exec_mode(SLAVE_EXEC_MODE_STRICT), thd(0) { server_id= global_system_variables.server_id; /* @@ -620,29 +583,17 @@ int Log_event::do_update_pos(rpl_group_info *rgi) Relay_log_info *rli= rgi->rli; DBUG_ENTER("Log_event::do_update_pos"); + DBUG_ASSERT(rli); DBUG_ASSERT(!rli->belongs_to_client()); + /* - rli is null when (as far as I (Guilhem) know) the caller is - Load_log_event::do_apply_event *and* that one is called from - Execute_load_log_event::do_apply_event. In this case, we don't - do anything here ; Execute_load_log_event::do_apply_event will - call Log_event::do_apply_event again later with the proper rli. - Strictly speaking, if we were sure that rli is null only in the - case discussed above, 'if (rli)' is useless here. But as we are - not 100% sure, keep it for now. - - Matz: I don't think we will need this check with this refactoring. + In parallel execution, delay position update for the events that are + not part of event groups (format description, rotate, and such) until + the actual event execution reaches that point. */ - if (rli) - { - /* - In parallel execution, delay position update for the events that are - not part of event groups (format description, rotate, and such) until - the actual event execution reaches that point. - */ - if (!rgi->is_parallel_exec || is_group_event(get_type_code())) - rli->stmt_done(log_pos, thd, rgi); - } + if (!rgi->is_parallel_exec || is_group_event(get_type_code())) + rli->stmt_done(log_pos, thd, rgi); + DBUG_RETURN(0); // Cannot fail currently } @@ -736,85 +687,6 @@ void Log_event::init_show_field_list(THD *thd, List<Item>* field_list) mem_root); } -/** - A decider of whether to trigger checksum computation or not. - To be invoked in Log_event::write() stack. - The decision is positive - - S,M) if it's been marked for checksumming with @c checksum_alg - - M) otherwise, if @@global.binlog_checksum is not NONE and the event is - directly written to the binlog file. - The to-be-cached event decides at @c write_cache() time. - - Otherwise the decision is negative. - - @note A side effect of the method is altering Log_event::checksum_alg - it the latter was undefined at calling. - - @return true Checksum should be used. Log_event::checksum_alg is set. - @return false No checksum -*/ - -my_bool Log_event::need_checksum() -{ - my_bool ret; - DBUG_ENTER("Log_event::need_checksum"); - - /* - few callers of Log_event::write - (incl FD::write, FD constructing code on the slave side, Rotate relay log - and Stop event) - provides their checksum alg preference through Log_event::checksum_alg. - */ - if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) - ret= checksum_alg != BINLOG_CHECKSUM_ALG_OFF; - else - { - ret= binlog_checksum_options && cache_type == Log_event::EVENT_NO_CACHE; - checksum_alg= ret ? (enum_binlog_checksum_alg)binlog_checksum_options - : BINLOG_CHECKSUM_ALG_OFF; - } - /* - FD calls the methods before data_written has been calculated. - The following invariant claims if the current is not the first - call (and therefore data_written is not zero) then `ret' must be - TRUE. It may not be null because FD is always checksummed. - */ - - DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret || - data_written == 0); - - DBUG_ASSERT(!ret || - ((checksum_alg == binlog_checksum_options || - /* - Stop event closes the relay-log and its checksum alg - preference is set by the caller can be different - from the server's binlog_checksum_options. - */ - get_type_code() == STOP_EVENT || - /* - Rotate:s can be checksummed regardless of the server's - binlog_checksum_options. That applies to both - the local RL's Rotate and the master's Rotate - which IO thread instantiates via queue_binlog_ver_3_event. - */ - get_type_code() == ROTATE_EVENT || - get_type_code() == START_ENCRYPTION_EVENT || - /* FD is always checksummed */ - get_type_code() == FORMAT_DESCRIPTION_EVENT) && - checksum_alg != BINLOG_CHECKSUM_ALG_OFF)); - - DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - - DBUG_ASSERT(((get_type_code() != ROTATE_EVENT && - get_type_code() != STOP_EVENT) || - get_type_code() != FORMAT_DESCRIPTION_EVENT) || - cache_type == Log_event::EVENT_NO_CACHE); - - DBUG_RETURN(ret); -} - int Log_event_writer::write_internal(const uchar *pos, size_t len) { DBUG_ASSERT(!ctx || encrypt_or_write == &Log_event_writer::encrypt_and_write); @@ -954,7 +826,7 @@ int Log_event_writer::write_footer() Log_event::write_header() */ -bool Log_event::write_header(size_t event_data_length) +bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length) { uchar header[LOG_EVENT_HEADER_LEN]; ulong now; @@ -963,8 +835,6 @@ bool Log_event::write_header(size_t event_data_length) (longlong) writer->pos(), event_data_length, (int) get_type_code())); - writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0; - /* Store number of bytes that will be written by this event */ data_written= event_data_length + sizeof(header) + writer->checksum_len; @@ -973,11 +843,17 @@ bool Log_event::write_header(size_t event_data_length) change the position */ - if (is_artificial_event()) + if (is_artificial_event() || + cache_type == Log_event::EVENT_STMT_CACHE || + cache_type == Log_event::EVENT_TRANSACTIONAL_CACHE) { /* Artificial events are automatically generated and do not exist in master's binary log, so log_pos should be set to 0. + + Events written through transaction or statement cache have log_pos set + to 0 so that they can be copied directly to the binlog without having + to compute the real end_log_pos. */ log_pos= 0; } @@ -1112,7 +988,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src, will print! */ -bool Query_log_event::write() +bool Query_log_event::write(Log_event_writer *writer) { uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS]; uchar *start, *start_of_status; @@ -1184,7 +1060,7 @@ bool Query_log_event::write() if (catalog_len) // i.e. this var is inited (false for 4.0 events) { store_str_with_code_and_len(&start, - catalog, catalog_len, Q_CATALOG_NZ_CODE); + catalog, catalog_len, (uint) Q_CATALOG_NZ_CODE); /* In 5.0.x where x<4 masters we used to store the end zero here. This was a waste of one byte so we don't do it in x>=4 masters. We change code to @@ -1211,6 +1087,14 @@ bool Query_log_event::write() int2store(start+2, auto_increment_offset); start+= 4; } + + if (thd && (thd->used & THD::CHARACTER_SET_COLLATIONS_USED)) + { + *start++= Q_CHARACTER_SET_COLLATIONS; + size_t len= thd->variables.character_set_collations.to_binary((char*)start); + start+= len; + } + if (charset_inited) { *start++= Q_CHARSET_CODE; @@ -1244,18 +1128,6 @@ bool Query_log_event::write() int8store(start, table_map_for_update); start+= 8; } - if (master_data_written != 0) - { - /* - Q_MASTER_DATA_WRITTEN_CODE only exists in relay logs where the master - has binlog_version<4 and the slave has binlog_version=4. See comment - for master_data_written in log_event.h for details. - */ - *start++= Q_MASTER_DATA_WRITTEN_CODE; - int4store(start, master_data_written); - start+= 4; - } - if (thd && thd->need_binlog_invoker()) { LEX_CSTRING user; @@ -1365,16 +1237,16 @@ bool Query_log_event::write() event_length= ((uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len); - return write_header(event_length) || - write_data(buf, QUERY_HEADER_LEN) || - write_post_header_for_derived() || - write_data(start_of_status, (uint) status_vars_len) || - write_data(db, db_len + 1) || - write_data(query, q_len) || - write_footer(); + return write_header(writer, event_length) || + write_data(writer, buf, QUERY_HEADER_LEN) || + write_post_header_for_derived(writer) || + write_data(writer, start_of_status, (uint) status_vars_len) || + write_data(writer, db, db_len + 1) || + write_data(writer, query, q_len) || + write_footer(writer); } -bool Query_compressed_log_event::write() +bool Query_compressed_log_event::write(Log_event_writer *writer) { uchar *buffer; uint32 alloc_size, compressed_size; @@ -1393,7 +1265,7 @@ bool Query_compressed_log_event::write() uint32 q_len_tmp= q_len; query= (char*) buffer; q_len= compressed_size; - ret= Query_log_event::write(); + ret= Query_log_event::write(writer); query= query_tmp; q_len= q_len_tmp; } @@ -1451,7 +1323,6 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, lc_time_names_number(thd_arg->variables.lc_time_names->number), charset_database_number(0), table_map_for_update((ulonglong)thd_arg->table_map_for_update), - master_data_written(0), gtid_flags_extra(thd_arg->get_binlog_flags_for_alter()), sa_seq_no(0) { @@ -1939,7 +1810,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, goto end; } - set_thd_db(thd, rpl_filter, db, db_len); + set_thd_db(thd, rpl_filter, LEX_CSTRING{db, db_len}); /* Setting the character set and collation of the current database thd->db. @@ -2009,6 +1880,17 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, thd->variables.sql_mode= (sql_mode_t) ((thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE) | (sql_mode & ~(sql_mode_t) MODE_NO_DIR_IN_CREATE)); + + size_t cslen= thd->variables.character_set_collations.from_binary( + character_set_collations.str, + character_set_collations.length); + if (cslen != character_set_collations.length) + { + // Fatal: either a broken even, or an unknown collation ID + thd->variables.character_set_collations.init(); + goto compare_errors; // QQ: report an error here? + } + if (charset_inited) { rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info; @@ -2295,7 +2177,7 @@ compare_errors: expected_error, actual_error ? thd->get_stmt_da()->message() : "no error", actual_error, - print_slave_db_safe(db), query_arg); + safe_str(db), query_arg); thd->is_slave_error= 1; } /* @@ -2465,7 +2347,7 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi) bool Query_log_event::peek_is_commit_rollback(const uchar *event_start, size_t event_len, - enum enum_binlog_checksum_alg + enum_binlog_checksum_alg checksum_alg) { if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) @@ -2485,23 +2367,11 @@ Query_log_event::peek_is_commit_rollback(const uchar *event_start, !memcmp(event_start + (event_len-9), "\0ROLLBACK", 9); } -#endif - - -/************************************************************************** - Start_log_event_v3 methods -**************************************************************************/ - -Start_log_event_v3::Start_log_event_v3() - :Log_event(), created(0), binlog_version(BINLOG_VERSION), - dont_set_created(0) -{ - memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); -} - +/*************************************************************************** + Format_description_log_event methods +****************************************************************************/ -#if defined(HAVE_REPLICATION) -void Start_log_event_v3::pack_info(Protocol *protocol) +void Format_description_log_event::pack_info(Protocol *protocol) { char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos; pos= strmov(buf, "Server ver: "); @@ -2510,115 +2380,13 @@ void Start_log_event_v3::pack_info(Protocol *protocol) pos= int10_to_str(binlog_version, pos, 10); protocol->store(buf, (uint) (pos-buf), &my_charset_bin); } -#endif - - -bool Start_log_event_v3::write() -{ - char buff[START_V3_HEADER_LEN]; - int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); - memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); - if (!dont_set_created) - created= get_time(); // this sets when and when_sec_part as a side effect - int4store(buff + ST_CREATED_OFFSET,created); - return write_header(sizeof(buff)) || - write_data(buff, sizeof(buff)) || - write_footer(); -} - - -#if defined(HAVE_REPLICATION) - -/** - Start_log_event_v3::do_apply_event() . - The master started - - IMPLEMENTATION - - To handle the case where the master died without having time to write - DROP TEMPORARY TABLE, DO RELEASE_LOCK (prepared statements' deletion is - TODO), we clean up all temporary tables that we got, if we are sure we - can (see below). - - @todo - - Remove all active user locks. - Guilhem 2003-06: this is true but not urgent: the worst it can cause is - the use of a bit of memory for a user lock which will not be used - anymore. If the user lock is later used, the old one will be released. In - other words, no deadlock problem. -*/ - -int Start_log_event_v3::do_apply_event(rpl_group_info *rgi) -{ - DBUG_ENTER("Start_log_event_v3::do_apply_event"); - int error= 0; - Relay_log_info *rli= rgi->rli; - - switch (binlog_version) - { - case 3: - case 4: - /* - This can either be 4.x (then a Start_log_event_v3 is only at master - startup so we are sure the master has restarted and cleared his temp - tables; the event always has 'created'>0) or 5.0 (then we have to test - 'created'). - */ - if (created) - { - rli->close_temporary_tables(); - - /* - The following is only false if we get here with a BINLOG statement - */ - if (rli->mi) - cleanup_load_tmpdir(&rli->mi->cmp_connection_name); - } - break; - - /* - Now the older formats; in that case load_tmpdir is cleaned up by the I/O - thread. - */ - case 1: - if (strncmp(rli->relay_log.description_event_for_exec->server_version, - "3.23.57",7) >= 0 && created) - { - /* - Can distinguish, based on the value of 'created': this event was - generated at master startup. - */ - rli->close_temporary_tables(); - } - /* - Otherwise, can't distinguish a Start_log_event generated at - master startup and one generated by master FLUSH LOGS, so cannot - be sure temp tables have to be dropped. So do nothing. - */ - break; - default: - /* - This case is not expected. It can be either an event corruption or an - unsupported binary log version. - */ - rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, - ER_THD(thd, ER_SLAVE_FATAL_ERROR), - "Binlog version not supported"); - DBUG_RETURN(1); - } - DBUG_RETURN(error); -} #endif /* defined(HAVE_REPLICATION) */ -/*************************************************************************** - Format_description_log_event methods -****************************************************************************/ - -bool Format_description_log_event::write() +bool Format_description_log_event::write(Log_event_writer *writer) { bool ret; - bool no_checksum; /* - We don't call Start_log_event_v3::write() because this would make 2 + We don't call Start_log_event_v::write() because this would make 2 my_b_safe_write(). */ uchar buff[START_V3_HEADER_LEN+1]; @@ -2639,11 +2407,9 @@ bool Format_description_log_event::write() FD_queue checksum_alg value. */ compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1); -#ifdef DBUG_ASSERT_EXISTS - data_written= 0; // to prepare for need_checksum assert -#endif - uint8 checksum_byte= (uint8) - (need_checksum() ? checksum_alg : BINLOG_CHECKSUM_ALG_OFF); + uint8 checksum_byte= (uint8) (used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF ? + used_checksum_alg : BINLOG_CHECKSUM_ALG_OFF); + DBUG_ASSERT(used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); /* FD of checksum-aware server is always checksum-equipped, (V) is in, regardless of @@global.binlog_checksum policy. @@ -2657,17 +2423,14 @@ bool Format_description_log_event::write() 1 + 4 bytes bigger comparing to the former FD. */ - if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF))) - { - checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway - } - ret= write_header(rec_size) || - write_data(buff, sizeof(buff)) || - write_data(post_header_len, number_of_event_types) || - write_data(&checksum_byte, sizeof(checksum_byte)) || - write_footer(); - if (no_checksum) - checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + uint orig_checksum_len= writer->checksum_len; + writer->checksum_len= BINLOG_CHECKSUM_LEN; + ret= write_header(writer, rec_size) || + write_data(writer, buff, sizeof(buff)) || + write_data(writer, post_header_len, number_of_event_types) || + write_data(writer, &checksum_byte, sizeof(checksum_byte)) || + write_footer(writer); + writer->checksum_len= orig_checksum_len; return ret; } @@ -2740,9 +2503,8 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi) } /* - If this event comes from ourselves, there is no cleaning task to - perform, we don't call Start_log_event_v3::do_apply_event() - (this was just to update the log's description event). + If this event comes from ourselves, there is no cleaning task to perform, + we don't do cleanup (this was just to update the log's description event). */ if (server_id != (uint32) global_system_variables.server_id) { @@ -2755,7 +2517,24 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi) 0, then 96, then jump to first really asked event (which is >96). So this is ok. */ - ret= Start_log_event_v3::do_apply_event(rgi); + switch (binlog_version) + { + case 4: + if (created) + { + rli->close_temporary_tables(); + + /* The following is only false if we get here with a BINLOG statement */ + if (rli->mi) + cleanup_load_tmpdir(&rli->mi->cmp_connection_name); + } + break; + default: + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER_THD(thd, ER_SLAVE_FATAL_ERROR), + "Binlog version not supported"); + ret= 1; + } } if (!ret) @@ -2824,568 +2603,6 @@ int Start_encryption_log_event::do_update_pos(rpl_group_info *rgi) /************************************************************************** - Load_log_event methods -**************************************************************************/ - -#if defined(HAVE_REPLICATION) -bool Load_log_event::print_query(THD *thd, bool need_db, const char *cs, - String *buf, my_off_t *fn_start, - my_off_t *fn_end, const char *qualify_db) -{ - if (need_db && db && db_len) - { - buf->append(STRING_WITH_LEN("use ")); - append_identifier(thd, buf, db, db_len); - buf->append(STRING_WITH_LEN("; ")); - } - - buf->append(STRING_WITH_LEN("LOAD DATA ")); - - if (is_concurrent) - buf->append(STRING_WITH_LEN("CONCURRENT ")); - - if (fn_start) - *fn_start= buf->length(); - - if (check_fname_outside_temp_buf()) - buf->append(STRING_WITH_LEN("LOCAL ")); - buf->append(STRING_WITH_LEN("INFILE '")); - buf->append_for_single_quote(fname, fname_len); - buf->append(STRING_WITH_LEN("' ")); - - if (sql_ex.opt_flags & REPLACE_FLAG) - buf->append(STRING_WITH_LEN("REPLACE ")); - else if (sql_ex.opt_flags & IGNORE_FLAG) - buf->append(STRING_WITH_LEN("IGNORE ")); - - buf->append(STRING_WITH_LEN("INTO")); - - if (fn_end) - *fn_end= buf->length(); - - buf->append(STRING_WITH_LEN(" TABLE ")); - if (qualify_db) - { - append_identifier(thd, buf, qualify_db, strlen(qualify_db)); - buf->append(STRING_WITH_LEN(".")); - } - append_identifier(thd, buf, table_name, table_name_len); - - if (cs != NULL) - { - buf->append(STRING_WITH_LEN(" CHARACTER SET ")); - buf->append(cs, strlen(cs)); - } - - /* We have to create all optional fields as the default is not empty */ - buf->append(STRING_WITH_LEN(" FIELDS TERMINATED BY ")); - pretty_print_str(buf, sql_ex.field_term, sql_ex.field_term_len); - if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) - buf->append(STRING_WITH_LEN(" OPTIONALLY ")); - buf->append(STRING_WITH_LEN(" ENCLOSED BY ")); - pretty_print_str(buf, sql_ex.enclosed, sql_ex.enclosed_len); - - buf->append(STRING_WITH_LEN(" ESCAPED BY ")); - pretty_print_str(buf, sql_ex.escaped, sql_ex.escaped_len); - - buf->append(STRING_WITH_LEN(" LINES TERMINATED BY ")); - pretty_print_str(buf, sql_ex.line_term, sql_ex.line_term_len); - if (sql_ex.line_start_len) - { - buf->append(STRING_WITH_LEN(" STARTING BY ")); - pretty_print_str(buf, sql_ex.line_start, sql_ex.line_start_len); - } - - if ((long) skip_lines > 0) - { - buf->append(STRING_WITH_LEN(" IGNORE ")); - buf->append_ulonglong(skip_lines); - buf->append(STRING_WITH_LEN(" LINES ")); - } - - if (num_fields) - { - uint i; - const char *field= fields; - buf->append(STRING_WITH_LEN(" (")); - for (i = 0; i < num_fields; i++) - { - if (i) - { - /* - Yes, the space and comma is reversed here. But this is mostly dead - code, at most used when reading really old binlogs from old servers, - so better just leave it as is... - */ - buf->append(STRING_WITH_LEN(" ,")); - } - append_identifier(thd, buf, field, field_lens[i]); - field+= field_lens[i] + 1; - } - buf->append(STRING_WITH_LEN(")")); - } - return 0; -} - - -void Load_log_event::pack_info(Protocol *protocol) -{ - char query_buffer[1024]; - String query_str(query_buffer, sizeof(query_buffer), system_charset_info); - - query_str.length(0); - print_query(protocol->thd, TRUE, NULL, &query_str, 0, 0, NULL); - protocol->store(query_str.ptr(), query_str.length(), &my_charset_bin); -} -#endif /* defined(HAVE_REPLICATION) */ - - -bool Load_log_event::write_data_header() -{ - char buf[LOAD_HEADER_LEN]; - int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id); - int4store(buf + L_EXEC_TIME_OFFSET, exec_time); - int4store(buf + L_SKIP_LINES_OFFSET, skip_lines); - buf[L_TBL_LEN_OFFSET] = (char)table_name_len; - buf[L_DB_LEN_OFFSET] = (char)db_len; - int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); - return write_data(buf, LOAD_HEADER_LEN) != 0; -} - - -bool Load_log_event::write_data_body() -{ - if (sql_ex.write_data(writer)) - return 1; - if (num_fields && fields && field_lens) - { - if (write_data(field_lens, num_fields) || - write_data(fields, field_block_len)) - return 1; - } - return (write_data(table_name, table_name_len + 1) || - write_data(db, db_len + 1) || - write_data(fname, fname_len)); -} - - -Load_log_event::Load_log_event(THD *thd_arg, const sql_exchange *ex, - const char *db_arg, const char *table_name_arg, - List<Item> &fields_arg, - bool is_concurrent_arg, - enum enum_duplicates handle_dup, - bool ignore, bool using_trans) - :Log_event(thd_arg, - (thd_arg->used & THD::THREAD_SPECIFIC_USED) - ? LOG_EVENT_THREAD_SPECIFIC_F : 0, - using_trans), - thread_id(thd_arg->thread_id), - slave_proxy_id((ulong)thd_arg->variables.pseudo_thread_id), - num_fields(0),fields(0), - field_lens(0),field_block_len(0), - table_name(table_name_arg ? table_name_arg : ""), - db(db_arg), fname(ex->file_name), local_fname(FALSE), - is_concurrent(is_concurrent_arg) -{ - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd_arg->start_time); - /* db can never be a zero pointer in 4.0 */ - db_len = (uint32) strlen(db); - table_name_len = (uint32) strlen(table_name); - fname_len = (fname) ? (uint) strlen(fname) : 0; - sql_ex.field_term = ex->field_term->ptr(); - sql_ex.field_term_len = (uint8) ex->field_term->length(); - sql_ex.enclosed = ex->enclosed->ptr(); - sql_ex.enclosed_len = (uint8) ex->enclosed->length(); - sql_ex.line_term = ex->line_term->ptr(); - sql_ex.line_term_len = (uint8) ex->line_term->length(); - sql_ex.line_start = ex->line_start->ptr(); - sql_ex.line_start_len = (uint8) ex->line_start->length(); - sql_ex.escaped = ex->escaped->ptr(); - sql_ex.escaped_len = (uint8) ex->escaped->length(); - sql_ex.opt_flags = 0; - sql_ex.cached_new_format = -1; - - if (ex->dumpfile) - sql_ex.opt_flags|= DUMPFILE_FLAG; - if (ex->opt_enclosed) - sql_ex.opt_flags|= OPT_ENCLOSED_FLAG; - - sql_ex.empty_flags= 0; - - switch (handle_dup) { - case DUP_REPLACE: - sql_ex.opt_flags|= REPLACE_FLAG; - break; - case DUP_UPDATE: // Impossible here - case DUP_ERROR: - break; - } - if (ignore) - sql_ex.opt_flags|= IGNORE_FLAG; - - if (!ex->field_term->length()) - sql_ex.empty_flags |= FIELD_TERM_EMPTY; - if (!ex->enclosed->length()) - sql_ex.empty_flags |= ENCLOSED_EMPTY; - if (!ex->line_term->length()) - sql_ex.empty_flags |= LINE_TERM_EMPTY; - if (!ex->line_start->length()) - sql_ex.empty_flags |= LINE_START_EMPTY; - if (!ex->escaped->length()) - sql_ex.empty_flags |= ESCAPED_EMPTY; - - skip_lines = ex->skip_lines; - - List_iterator<Item> li(fields_arg); - field_lens_buf.length(0); - fields_buf.length(0); - Item* item; - while ((item = li++)) - { - num_fields++; - uchar len= (uchar) item->name.length; - field_block_len += len + 1; - fields_buf.append(item->name.str, len + 1); - field_lens_buf.append((char*)&len, 1); - } - - field_lens = (const uchar*)field_lens_buf.ptr(); - fields = fields_buf.ptr(); -} - - -/** - Load_log_event::set_fields() - - @note - This function can not use the member variable - for the database, since LOAD DATA INFILE on the slave - can be for a different database than the current one. - This is the reason for the affected_db argument to this method. -*/ - -void Load_log_event::set_fields(const char* affected_db, - List<Item> &field_list, - Name_resolution_context *context) -{ - uint i; - const char* field = fields; - for (i= 0; i < num_fields; i++) - { - LEX_CSTRING field_name= {field, field_lens[i] }; - field_list.push_back(new (thd->mem_root) - Item_field(thd, context, - Lex_cstring_strlen(affected_db), - Lex_cstring_strlen(table_name), - field_name), - thd->mem_root); - field+= field_lens[i] + 1; - } -} - - -#if defined(HAVE_REPLICATION) -/** - Does the data loading job when executing a LOAD DATA on the slave. - - @param net - @param rli - @param use_rli_only_for_errors If set to 1, rli is provided to - Load_log_event::exec_event only for this - function to have RPL_LOG_NAME and - rli->last_slave_error, both being used by - error reports. rli's position advancing - is skipped (done by the caller which is - Execute_load_log_event::exec_event). - If set to 0, rli is provided for full use, - i.e. for error reports and position - advancing. - - @todo - fix this; this can be done by testing rules in - Create_file_log_event::exec_event() and then discarding Append_block and - al. - @todo - this is a bug - this needs to be moved to the I/O thread - - @retval - 0 Success - @retval - 1 Failure -*/ - -int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi, - bool use_rli_only_for_errors) -{ - Relay_log_info const *rli= rgi->rli; - Rpl_filter *rpl_filter= rli->mi->rpl_filter; - DBUG_ENTER("Load_log_event::do_apply_event"); - - DBUG_ASSERT(thd->query() == 0); - set_thd_db(thd, rpl_filter, db, db_len); - thd->clear_error(1); - - /* see Query_log_event::do_apply_event() and BUG#13360 */ - DBUG_ASSERT(!rgi->m_table_map.count()); - /* - Usually lex_start() is called by mysql_parse(), but we need it here - as the present method does not call mysql_parse(). - */ - lex_start(thd); - thd->lex->local_file= local_fname; - thd->reset_for_next_command(0); // Errors are cleared above - - /* - We test replicate_*_db rules. Note that we have already prepared - the file to load, even if we are going to ignore and delete it - now. So it is possible that we did a lot of disk writes for - nothing. In other words, a big LOAD DATA INFILE on the master will - still consume a lot of space on the slave (space in the relay log - + space of temp files: twice the space of the file to load...) - even if it will finally be ignored. TODO: fix this; this can be - done by testing rules in Create_file_log_event::do_apply_event() - and then discarding Append_block and al. Another way is do the - filtering in the I/O thread (more efficient: no disk writes at - all). - - Note: We do not need to execute reset_one_shot_variables() if this - db_ok() test fails. - Reason: The db stored in binlog events is the same for SET and for - its companion query. If the SET is ignored because of - db_ok(), the companion query will also be ignored, and if - the companion query is ignored in the db_ok() test of - ::do_apply_event(), then the companion SET also have so - we don't need to reset_one_shot_variables(). - */ - if (rpl_filter->db_ok(thd->db.str)) - { -#ifdef WITH_WSREP - if (!wsrep_thd_is_applying(thd)) -#endif - thd->set_time(when, when_sec_part); - thd->set_query_id(next_query_id()); - thd->get_stmt_da()->opt_clear_warning_info(thd->query_id); - - TABLE_LIST tables; - LEX_CSTRING db_name= { thd->strmake(thd->db.str, thd->db.length), thd->db.length }; - if (lower_case_table_names) - my_casedn_str(system_charset_info, (char *)table_name); - LEX_CSTRING tbl_name= { table_name, strlen(table_name) }; - tables.init_one_table(&db_name, &tbl_name, 0, TL_WRITE); - tables.updating= 1; - - // the table will be opened in mysql_load - if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db.str, &tables)) - { - // TODO: this is a bug - this needs to be moved to the I/O thread - if (net) - skip_load_data_infile(net); - } - else - { - enum enum_duplicates handle_dup; - bool ignore= 0; - char query_buffer[1024]; - String query_str(query_buffer, sizeof(query_buffer), system_charset_info); - char *load_data_query; - - query_str.length(0); - /* - Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST - and written to slave's binlog if binlogging is on. - */ - print_query(thd, FALSE, NULL, &query_str, NULL, NULL, NULL); - if (!(load_data_query= (char *)thd->strmake(query_str.ptr(), - query_str.length()))) - { - /* - This will set thd->fatal_error in case of OOM. So we surely will notice - that something is wrong. - */ - goto error; - } - - thd->set_query(load_data_query, (uint) (query_str.length())); - - if (sql_ex.opt_flags & REPLACE_FLAG) - handle_dup= DUP_REPLACE; - else if (sql_ex.opt_flags & IGNORE_FLAG) - { - ignore= 1; - handle_dup= DUP_ERROR; - } - else - { - /* - When replication is running fine, if it was DUP_ERROR on the - master then we could choose IGNORE here, because if DUP_ERROR - suceeded on master, and data is identical on the master and slave, - then there should be no uniqueness errors on slave, so IGNORE is - the same as DUP_ERROR. But in the unlikely case of uniqueness errors - (because the data on the master and slave happen to be different - (user error or bug), we want LOAD DATA to print an error message on - the slave to discover the problem. - - If reading from net (a 3.23 master), mysql_load() will change this - to IGNORE. - */ - handle_dup= DUP_ERROR; - } - /* - We need to set thd->lex->sql_command and thd->lex->duplicates - since InnoDB tests these variables to decide if this is a LOAD - DATA ... REPLACE INTO ... statement even though mysql_parse() - is not called. This is not needed in 5.0 since there the LOAD - DATA ... statement is replicated using mysql_parse(), which - sets the thd->lex fields correctly. - */ - thd->lex->sql_command= SQLCOM_LOAD; - thd->lex->duplicates= handle_dup; - - sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG); - String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs); - String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs); - String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs); - String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs); - String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs); - ex.field_term= &field_term; - ex.enclosed= &enclosed; - ex.line_term= &line_term; - ex.line_start= &line_start; - ex.escaped= &escaped; - - ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); - if (sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); - - ex.skip_lines = skip_lines; - List<Item> field_list; - thd->lex->first_select_lex()->context.resolve_in_table_list_only(&tables); - set_fields(tables.db.str, - field_list, &thd->lex->first_select_lex()->context); - thd->variables.pseudo_thread_id= thread_id; - if (net) - { - // mysql_load will use thd->net to read the file - thd->net.vio = net->vio; - // Make sure the client does not get confused about the packet sequence - thd->net.pkt_nr = net->pkt_nr; - } - /* - It is safe to use tmp_list twice because we are not going to - update it inside mysql_load(). - */ - List<Item> tmp_list; - if (thd->open_temporary_tables(&tables) || - mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list, - handle_dup, ignore, net != 0)) - thd->is_slave_error= 1; - if (thd->cuted_fields) - { - /* log_pos is the position of the LOAD event in the master log */ - sql_print_warning("Slave: load data infile on table '%s' at " - "log position %llu in log '%s' produced %ld " - "warning(s). Default database: '%s'", - (char*) table_name, log_pos, RPL_LOG_NAME, - (ulong) thd->cuted_fields, - thd->get_db()); - } - if (net) - net->pkt_nr= thd->net.pkt_nr; - } - } - else - { - /* - We will just ask the master to send us /dev/null if we do not - want to load the data. - TODO: this a bug - needs to be done in I/O thread - */ - if (net) - skip_load_data_infile(net); - } - -error: - thd->net.vio = 0; - const char *remember_db= thd->get_db(); - thd->catalog= 0; - thd->set_db(&null_clex_str); /* will free the current database */ - thd->reset_query(); - thd->get_stmt_da()->set_overwrite_status(true); - thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN); - thd->get_stmt_da()->set_overwrite_status(false); - close_thread_tables(thd); - /* - - If transaction rollback was requested due to deadlock - perform it and release metadata locks. - - If inside a multi-statement transaction, - defer the release of metadata locks until the current - transaction is either committed or rolled back. This prevents - other statements from modifying the table for the entire - duration of this transaction. This provides commit ordering - and guarantees serializability across multiple transactions. - - If in autocommit mode, or outside a transactional context, - automatically release metadata locks of the current statement. - */ - if (thd->transaction_rollback_request) - { - trans_rollback_implicit(thd); - thd->release_transactional_locks(); - } - else if (! thd->in_multi_stmt_transaction_mode()) - thd->release_transactional_locks(); - else - thd->mdl_context.release_statement_locks(); - - DBUG_EXECUTE_IF("LOAD_DATA_INFILE_has_fatal_error", - thd->is_slave_error= 0; thd->is_fatal_error= 1;); - - if (unlikely(thd->is_slave_error)) - { - /* this err/sql_errno code is copy-paste from net_send_error() */ - const char *err; - int sql_errno; - if (thd->is_error()) - { - err= thd->get_stmt_da()->message(); - sql_errno= thd->get_stmt_da()->sql_errno(); - } - else - { - sql_errno=ER_UNKNOWN_ERROR; - err= ER_THD(thd, sql_errno); - } - rli->report(ERROR_LEVEL, sql_errno, rgi->gtid_info(), "\ -Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", - err, (char*)table_name, remember_db); - free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); - DBUG_RETURN(1); - } - free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); - - if (unlikely(thd->is_fatal_error)) - { - char buf[256]; - my_snprintf(buf, sizeof(buf), - "Running LOAD DATA INFILE on table '%-.64s'." - " Default database: '%-.64s'", - (char*)table_name, - remember_db); - - rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(), - ER_THD(thd, ER_SLAVE_FATAL_ERROR), buf); - DBUG_RETURN(1); - } - - DBUG_RETURN( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rgi) ); -} -#endif - - -/************************************************************************** Rotate_log_event methods **************************************************************************/ @@ -3421,14 +2638,14 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg, } -bool Rotate_log_event::write() +bool Rotate_log_event::write(Log_event_writer *writer) { char buf[ROTATE_HEADER_LEN]; int8store(buf + R_POS_OFFSET, pos); - return (write_header(ROTATE_HEADER_LEN + ident_len) || - write_data(buf, ROTATE_HEADER_LEN) || - write_data(new_log_ident, (uint) ident_len) || - write_footer()); + return (write_header(writer, ROTATE_HEADER_LEN + ident_len) || + write_data(writer, buf, ROTATE_HEADER_LEN) || + write_data(writer, new_log_ident, (uint) ident_len) || + write_footer(writer)); } @@ -3578,14 +2795,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( } -bool Binlog_checkpoint_log_event::write() +bool Binlog_checkpoint_log_event::write(Log_event_writer *writer) { uchar buf[BINLOG_CHECKPOINT_HEADER_LEN]; int4store(buf, binlog_file_len); - return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || - write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) || - write_data(binlog_file_name, binlog_file_len) || - write_footer(); + return write_header(writer, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || + write_data(writer, buf, BINLOG_CHECKPOINT_HEADER_LEN) || + write_data(writer, binlog_file_name, binlog_file_len) || + write_footer(writer); } @@ -3683,7 +2900,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, */ bool Gtid_log_event::peek(const uchar *event_start, size_t event_len, - enum enum_binlog_checksum_alg checksum_alg, + enum_binlog_checksum_alg checksum_alg, uint32 *domain_id, uint32 *server_id, uint64 *seq_no, uchar *flags2, const Format_description_log_event *fdev) { @@ -3714,7 +2931,7 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len, bool -Gtid_log_event::write() +Gtid_log_event::write(Log_event_writer *writer) { uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4]; size_t write_len= 13; @@ -3777,9 +2994,9 @@ Gtid_log_event::write() bzero(buf+write_len, GTID_HEADER_LEN-write_len); write_len= GTID_HEADER_LEN; } - return write_header(write_len) || - write_data(buf, write_len) || - write_footer(); + return write_header(writer, write_len) || + write_data(writer, buf, write_len) || + write_footer(writer); } @@ -3795,7 +3012,7 @@ Gtid_log_event::write() int Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, ulong ev_offset, - enum enum_binlog_checksum_alg checksum_alg) + enum_binlog_checksum_alg checksum_alg) { uchar flags2; if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) @@ -4067,7 +3284,7 @@ Gtid_list_log_event::to_packet(String *packet) bool -Gtid_list_log_event::write() +Gtid_list_log_event::write(Log_event_writer *writer) { char buf[128]; String packet(buf, sizeof(buf), system_charset_info); @@ -4075,9 +3292,9 @@ Gtid_list_log_event::write() packet.length(0); if (to_packet(&packet)) return true; - return write_header(get_data_size()) || - write_data(packet.ptr(), packet.length()) || - write_footer(); + return write_header(writer, get_data_size()) || + write_data(writer, packet.ptr(), packet.length()) || + write_footer(writer); } @@ -4171,14 +3388,14 @@ void Intvar_log_event::pack_info(Protocol *protocol) #endif -bool Intvar_log_event::write() +bool Intvar_log_event::write(Log_event_writer *writer) { uchar buf[9]; buf[I_TYPE_OFFSET]= (uchar) type; int8store(buf + I_VAL_OFFSET, val); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -4250,14 +3467,14 @@ void Rand_log_event::pack_info(Protocol *protocol) #endif -bool Rand_log_event::write() +bool Rand_log_event::write(Log_event_writer *writer) { uchar buf[16]; int8store(buf + RAND_SEED1_OFFSET, seed1); int8store(buf + RAND_SEED2_OFFSET, seed2); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -4529,12 +3746,12 @@ int Xid_log_event::do_commit() #endif -bool Xid_log_event::write() +bool Xid_log_event::write(Log_event_writer *writer) { DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return write_header(sizeof(xid)) || - write_data((uchar*)&xid, sizeof(xid)) || - write_footer(); + return write_header(writer, sizeof(xid)) || + write_data(writer, (uchar*)&xid, sizeof(xid)) || + write_footer(writer); } /************************************************************************** @@ -4580,7 +3797,7 @@ int XA_prepare_log_event::do_commit() #endif // HAVE_REPLICATION -bool XA_prepare_log_event::write() +bool XA_prepare_log_event::write(Log_event_writer *writer) { uchar data[1 + 4 + 4 + 4]= {one_phase,}; uint8 one_phase_byte= one_phase; @@ -4591,14 +3808,14 @@ bool XA_prepare_log_event::write() DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1); - return write_header(sizeof(one_phase_byte) + xid_subheader_no_data + + return write_header(writer, sizeof(one_phase_byte) + xid_subheader_no_data + static_cast<XID*>(xid)->gtrid_length + static_cast<XID*>(xid)->bqual_length) || - write_data(data, sizeof(data)) || - write_data((uchar*) static_cast<XID*>(xid)->data, + write_data(writer, data, sizeof(data)) || + write_data(writer, (uchar*) static_cast<XID*>(xid)->data, static_cast<XID*>(xid)->gtrid_length + static_cast<XID*>(xid)->bqual_length) || - write_footer(); + write_footer(writer); } @@ -4713,7 +3930,7 @@ void User_var_log_event::pack_info(Protocol* protocol) MY_CS_COLLATION_NAME_SIZE)) return; beg= const_cast<char *>(buf.ptr()) + old_len; - end= str_to_hex(beg, val, val_len); + end= str_to_hex(beg, (uchar*)val, val_len); buf.length(old_len + (end - beg)); if (buf.append(STRING_WITH_LEN(" COLLATE ")) || buf.append(cs->coll_name)) @@ -4732,7 +3949,7 @@ void User_var_log_event::pack_info(Protocol* protocol) #endif // HAVE_REPLICATION -bool User_var_log_event::write() +bool User_var_log_event::write(Log_event_writer *writer) { char buf[UV_NAME_LEN_SIZE]; char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + @@ -4796,17 +4013,19 @@ bool User_var_log_event::write() uchar unsig= m_is_unsigned ? CHUNK_UNSIGNED : CHUNK_SIGNED; uchar data_type_name_length= (uchar) m_data_type_name.length; - return write_header(event_length) || - write_data(buf, sizeof(buf)) || - write_data(name, name_len) || - write_data(buf1, buf1_length) || - write_data(pos, val_len) || - write_data(&unsig, unsigned_len) || - write_data(&data_type_name_chunk_signature, + return write_header(writer, event_length) || + write_data(writer, buf, sizeof(buf)) || + write_data(writer, name, name_len) || + write_data(writer, buf1, buf1_length) || + write_data(writer, pos, val_len) || + write_data(writer, &unsig, unsigned_len) || + write_data(writer, &data_type_name_chunk_signature, data_type_name_chunk_signature_length) || - write_data(&data_type_name_length, data_type_name_length_length) || - write_data(m_data_type_name.str, (uint) m_data_type_name.length) || - write_footer(); + write_data(writer, &data_type_name_length, + data_type_name_length_length) || + write_data(writer, m_data_type_name.str, + (uint) m_data_type_name.length) || + write_footer(writer); } @@ -4958,10 +4177,9 @@ User_var_log_event::do_shall_skip(rpl_group_info *rgi) written all DROP TEMPORARY TABLE (prepared statements' deletion is TODO only when we binlog prep stmts). We used to clean up slave_load_tmpdir, but this is useless as it has been cleared at the - end of LOAD DATA INFILE. So we have nothing to do here. The place - were we must do this cleaning is in - Start_log_event_v3::do_apply_event(), not here. Because if we come - here, the master was sane. + end of LOAD DATA INFILE. So we have nothing to do here. The place were we + must do this cleaning is in Format_description_log_event::do_apply_event(), + not here. Because if we come here, the master was sane. This must only be called from the Slave SQL thread, since it calls Relay_log_info::flush(). @@ -4995,178 +4213,6 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi) /************************************************************************** - Create_file_log_event methods -**************************************************************************/ - -Create_file_log_event:: -Create_file_log_event(THD* thd_arg, sql_exchange* ex, - const char* db_arg, const char* table_name_arg, - List<Item>& fields_arg, - bool is_concurrent_arg, - enum enum_duplicates handle_dup, - bool ignore, - uchar* block_arg, uint block_len_arg, bool using_trans) - :Load_log_event(thd_arg, ex, db_arg, table_name_arg, fields_arg, - is_concurrent_arg, - handle_dup, ignore, using_trans), - fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg), - file_id(thd_arg->file_id = mysql_bin_log.next_file_id()) -{ - DBUG_ENTER("Create_file_log_event"); - sql_ex.force_new_format(); - DBUG_VOID_RETURN; -} - - -/* - Create_file_log_event::write_data_body() -*/ - -bool Create_file_log_event::write_data_body() -{ - bool res; - if ((res= Load_log_event::write_data_body()) || fake_base) - return res; - return write_data("", 1) || - write_data(block, block_len); -} - - -/* - Create_file_log_event::write_data_header() -*/ - -bool Create_file_log_event::write_data_header() -{ - bool res; - uchar buf[CREATE_FILE_HEADER_LEN]; - if ((res= Load_log_event::write_data_header()) || fake_base) - return res; - int4store(buf + CF_FILE_ID_OFFSET, file_id); - return write_data(buf, CREATE_FILE_HEADER_LEN) != 0; -} - - -/* - Create_file_log_event::write_base() -*/ - -bool Create_file_log_event::write_base() -{ - bool res; - fake_base= 1; // pretend we are Load event - res= write(); - fake_base= 0; - return res; -} - - -#if defined(HAVE_REPLICATION) -void Create_file_log_event::pack_info(Protocol *protocol) -{ - char buf[SAFE_NAME_LEN*2 + 30 + 21*2], *pos; - pos= strmov(buf, "db="); - memcpy(pos, db, db_len); - pos= strmov(pos + db_len, ";table="); - memcpy(pos, table_name, table_name_len); - pos= strmov(pos + table_name_len, ";file_id="); - pos= int10_to_str((long) file_id, pos, 10); - pos= strmov(pos, ";block_len="); - pos= int10_to_str((long) block_len, pos, 10); - protocol->store(buf, (uint) (pos-buf), &my_charset_bin); -} -#endif /* defined(HAVE_REPLICATION) */ - - -/** - Create_file_log_event::do_apply_event() - Constructor for Create_file_log_event to intantiate an event - from the relay log on the slave. - - @retval - 0 Success - @retval - 1 Failure -*/ - -#if defined(HAVE_REPLICATION) -int Create_file_log_event::do_apply_event(rpl_group_info *rgi) -{ - char fname_buf[FN_REFLEN]; - char *ext; - int fd = -1; - IO_CACHE file; - Log_event_writer lew(&file, 0); - int error = 1; - Relay_log_info const *rli= rgi->rli; - - THD_STAGE_INFO(thd, stage_making_temp_file_create_before_load_data); - bzero((char*)&file, sizeof(file)); - ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info", - &rli->mi->connection_name); - /* old copy may exist already */ - mysql_file_delete(key_file_log_event_info, fname_buf, MYF(0)); - if ((fd= mysql_file_create(key_file_log_event_info, - fname_buf, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, - MYF(MY_WME))) < 0 || - init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, - MYF(MY_WME|MY_NABP))) - { - rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), - "Error in Create_file event: could not open file '%s'", - fname_buf); - goto err; - } - - // a trick to avoid allocating another buffer - fname= fname_buf; - fname_len= (uint) (strmov(ext, ".data") - fname); - writer= &lew; - if (write_base()) - { - strmov(ext, ".info"); // to have it right in the error message - rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), - "Error in Create_file event: could not write to file '%s'", - fname_buf); - goto err; - } - end_io_cache(&file); - mysql_file_close(fd, MYF(0)); - - // fname_buf now already has .data, not .info, because we did our trick - /* old copy may exist already */ - mysql_file_delete(key_file_log_event_data, fname_buf, MYF(0)); - if ((fd= mysql_file_create(key_file_log_event_data, - fname_buf, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, - MYF(MY_WME))) < 0) - { - rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), - "Error in Create_file event: could not open file '%s'", - fname_buf); - goto err; - } - if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP))) - { - rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), - "Error in Create_file event: write to '%s' failed", - fname_buf); - goto err; - } - error=0; // Everything is ok - -err: - if (unlikely(error)) - end_io_cache(&file); - if (likely(fd >= 0)) - mysql_file_close(fd, MYF(0)); - return error != 0; -} -#endif /* defined(HAVE_REPLICATION) */ - - -/************************************************************************** Append_block_log_event methods **************************************************************************/ @@ -5181,14 +4227,14 @@ Append_block_log_event::Append_block_log_event(THD *thd_arg, } -bool Append_block_log_event::write() +bool Append_block_log_event::write(Log_event_writer *writer) { uchar buf[APPEND_BLOCK_HEADER_LEN]; int4store(buf + AB_FILE_ID_OFFSET, file_id); - return write_header(APPEND_BLOCK_HEADER_LEN + block_len) || - write_data(buf, APPEND_BLOCK_HEADER_LEN) || - write_data(block, block_len) || - write_footer(); + return write_header(writer, APPEND_BLOCK_HEADER_LEN + block_len) || + write_data(writer, buf, APPEND_BLOCK_HEADER_LEN) || + write_data(writer, block, block_len) || + write_footer(writer); } @@ -5291,13 +4337,13 @@ Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg, } -bool Delete_file_log_event::write() +bool Delete_file_log_event::write(Log_event_writer *writer) { uchar buf[DELETE_FILE_HEADER_LEN]; int4store(buf + DF_FILE_ID_OFFSET, file_id); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -5328,130 +4374,6 @@ int Delete_file_log_event::do_apply_event(rpl_group_info *rgi) /************************************************************************** - Execute_load_log_event methods -**************************************************************************/ - -Execute_load_log_event::Execute_load_log_event(THD *thd_arg, - const char* db_arg, - bool using_trans) - :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg) -{ -} - - -bool Execute_load_log_event::write() -{ - uchar buf[EXEC_LOAD_HEADER_LEN]; - int4store(buf + EL_FILE_ID_OFFSET, file_id); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); -} - - -#if defined(HAVE_REPLICATION) -void Execute_load_log_event::pack_info(Protocol *protocol) -{ - char buf[64]; - uint length; - length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id); - protocol->store(buf, (int32) length, &my_charset_bin); -} - - -/* - Execute_load_log_event::do_apply_event() -*/ - -int Execute_load_log_event::do_apply_event(rpl_group_info *rgi) -{ - char fname[FN_REFLEN+10]; - char *ext; - int fd; - int error= 1; - IO_CACHE file; - Load_log_event *lev= 0; - Relay_log_info const *rli= rgi->rli; - - ext= slave_load_file_stem(fname, file_id, server_id, ".info", - &rli->mi->cmp_connection_name); - if ((fd= mysql_file_open(key_file_log_event_info, - fname, O_RDONLY | O_BINARY | O_NOFOLLOW, - MYF(MY_WME))) < 0 || - init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, - MYF(MY_WME|MY_NABP))) - { - rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), - "Error in Exec_load event: could not open file '%s'", - fname); - goto err; - } - if (!(lev= (Load_log_event*) - Log_event::read_log_event(&file, - rli->relay_log.description_event_for_exec, - opt_slave_sql_verify_checksum)) || - lev->get_type_code() != NEW_LOAD_EVENT) - { - rli->report(ERROR_LEVEL, 0, rgi->gtid_info(), "Error in Exec_load event: " - "file '%s' appears corrupted", fname); - goto err; - } - lev->thd = thd; - /* - lev->do_apply_event should use rli only for errors i.e. should - not advance rli's position. - - lev->do_apply_event is the place where the table is loaded (it - calls mysql_load()). - */ - - if (lev->do_apply_event(0,rgi,1)) - { - /* - We want to indicate the name of the file that could not be loaded - (SQL_LOADxxx). - But as we are here we are sure the error is in rli->last_slave_error and - rli->last_slave_errno (example of error: duplicate entry for key), so we - don't want to overwrite it with the filename. - What we want instead is add the filename to the current error message. - */ - char *tmp= my_strdup(PSI_INSTRUMENT_ME, rli->last_error().message, MYF(MY_WME)); - if (tmp) - { - rli->report(ERROR_LEVEL, rli->last_error().number, rgi->gtid_info(), - "%s. Failed executing load from '%s'", tmp, fname); - my_free(tmp); - } - goto err; - } - /* - We have an open file descriptor to the .info file; we need to close it - or Windows will refuse to delete the file in mysql_file_delete(). - */ - if (fd >= 0) - { - mysql_file_close(fd, MYF(0)); - end_io_cache(&file); - fd= -1; - } - mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME)); - memcpy(ext, ".data", 6); - mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME)); - error = 0; - -err: - delete lev; - if (fd >= 0) - { - mysql_file_close(fd, MYF(0)); - end_io_cache(&file); - } - return error; -} - -#endif /* defined(HAVE_REPLICATION) */ - -/************************************************************************** Begin_load_query_log_event methods **************************************************************************/ @@ -5504,14 +4426,14 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, bool -Execute_load_query_log_event::write_post_header_for_derived() +Execute_load_query_log_event::write_post_header_for_derived(Log_event_writer *writer) { uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN]; int4store(buf, file_id); int4store(buf + 4, fn_pos_start); int4store(buf + 4 + 4, fn_pos_end); *(buf + 4 + 4 + 4)= (uchar) dup_handling; - return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); + return write_data(writer, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); } @@ -5800,6 +4722,7 @@ inline void restore_empty_query_table_list(LEX *lex) int Rows_log_event::do_apply_event(rpl_group_info *rgi) { + DBUG_ASSERT(rgi); Relay_log_info const *rli= rgi->rli; TABLE* table; DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)"); @@ -5840,12 +4763,11 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) thd->set_query_timer(); /* - If there is no locks taken, this is the first binrow event seen - after the table map events. We should then lock all the tables - used in the transaction and proceed with execution of the actual - event. + If there are no tables open, this must be the first row event seen + after the table map events. We should then open and lock all tables + used in the transaction and proceed with execution of the actual event. */ - if (!thd->lock) + if (!thd->open_tables) { /* Lock_tables() reads the contents of thd->lex, so they must be @@ -6086,17 +5008,13 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) } } -#ifdef HAVE_QUERY_CACHE /* Moved invalidation right before the call to rows_event_stmt_cleanup(), to avoid query cache being polluted with stale entries, + Query cache is not invalidated on wsrep applier here */ -# ifdef WITH_WSREP - /* Query cache is not invalidated on wsrep applier here */ if (!(WSREP(thd) && wsrep_thd_is_applying(thd))) -# endif /* WITH_WSREP */ query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); -#endif /* HAVE_QUERY_CACHE */ } table= m_table= rgi->m_table_map.get_table(m_table_id); @@ -6121,7 +5039,9 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) if (m_width == table->s->fields && bitmap_is_set_all(&m_cols)) set_flags(COMPLETE_ROWS_F); - /* + Rpl_table_data rpl_data= *(RPL_TABLE_LIST*)table->pos_in_table_list; + + /* Set tables write and read sets. Read_set contains all slave columns (in case we are going to fetch @@ -6134,17 +5054,24 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) DBUG_PRINT_BITSET("debug", "Setting table's read_set from: %s", &m_cols); bitmap_set_all(table->read_set); - if (get_general_type_code() == DELETE_ROWS_EVENT || - get_general_type_code() == UPDATE_ROWS_EVENT) - bitmap_intersect(table->read_set,&m_cols); - bitmap_set_all(table->write_set); table->rpl_write_set= table->write_set; - /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */ - MY_BITMAP *after_image= ((get_general_type_code() == UPDATE_ROWS_EVENT) ? - &m_cols_ai : &m_cols); - bitmap_intersect(table->write_set, after_image); + if (rpl_data.copy_fields) + /* always full rows, all bits set */; + else + if (get_general_type_code() == WRITE_ROWS_EVENT) + bitmap_copy(table->write_set, &m_cols); // for sequences + else // If online alter, leave all columns set (i.e. skip intersects) + if (!thd->slave_thread || !table->s->online_alter_binlog) + { + bitmap_intersect(table->read_set,&m_cols); + if (get_general_type_code() == UPDATE_ROWS_EVENT) + bitmap_intersect(table->write_set, &m_cols_ai); + table->mark_columns_per_binlog_row_image(); + if (table->vfield) + table->mark_virtual_columns_for_write(0); + } if (table->versioned()) { @@ -6155,10 +5082,11 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) } m_table->mark_columns_per_binlog_row_image(); - this->slave_exec_mode= slave_exec_mode_options; // fix the mode + if (!rpl_data.is_online_alter()) + this->slave_exec_mode= (enum_slave_exec_mode)slave_exec_mode_options; // Do event specific preparations - error= do_before_row_operations(rli); + error= do_before_row_operations(rgi); /* Bug#56662 Assertion failed: next_insert_id == 0, file handler.cc @@ -6170,7 +5098,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) */ sql_mode_t saved_sql_mode= thd->variables.sql_mode; if (!is_auto_inc_in_extra_columns()) - thd->variables.sql_mode= MODE_NO_AUTO_VALUE_ON_ZERO; + thd->variables.sql_mode= (rpl_data.copy_fields ? saved_sql_mode : 0) + | MODE_NO_AUTO_VALUE_ON_ZERO; // row processing loop @@ -6183,10 +5112,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) THD_STAGE_INFO(thd, stage_executing); do { - /* in_use can have been set to NULL in close_tables_for_reopen */ - THD* old_thd= table->in_use; - if (!table->in_use) - table->in_use= thd; + DBUG_ASSERT(table->in_use); error= do_exec_row(rgi); @@ -6194,8 +5120,6 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) DBUG_PRINT("info", ("error: %s", HA_ERR(error))); DBUG_ASSERT(error != HA_ERR_RECORD_DELETED); - table->in_use = old_thd; - if (unlikely(error)) { int actual_error= convert_handler_error(error, thd, table); @@ -6205,7 +5129,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) ignored_error_code(actual_error) : 0); #ifdef WITH_WSREP - if (WSREP(thd) && thd->wsrep_applier && + if (WSREP(thd) && wsrep_thd_is_applying(thd) && wsrep_ignored_error_code(this, actual_error)) { idempotent_error= true; @@ -6245,6 +5169,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) thd->transaction->stmt.modified_non_trans_table= TRUE; if (likely(error == 0)) { + m_row_count++; error= thd->killed_errno(); if (error && !thd->is_error()) my_error(error, MYF(0)); @@ -6252,6 +5177,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) } // row processing loop while (error == 0 && (m_curr_row != m_rows_end)); + thd->inc_examined_row_count(m_row_count); + /* Restore the sql_mode after the rows event is processed. */ @@ -6268,7 +5195,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) const_cast<Relay_log_info*>(rli)->abort_slave= 1;); } - if (unlikely(error= do_after_row_operations(rli, error)) && + if (unlikely(error= do_after_row_operations(error)) && ignored_error_code(convert_handler_error(error, thd, table))) { @@ -6279,34 +5206,40 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) thd->clear_error(1); error= 0; } - } // if (table) + if (unlikely(error)) + { + if (rpl_data.is_online_alter()) + goto err; + slave_rows_error_report(ERROR_LEVEL, error, rgi, thd, table, + get_type_str(), + RPL_LOG_NAME, log_pos); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. - if (unlikely(error)) - { - slave_rows_error_report(ERROR_LEVEL, error, rgi, thd, table, - get_type_str(), - RPL_LOG_NAME, log_pos); - /* - @todo We should probably not call - reset_current_stmt_binlog_format_row() from here. + Note: this applies to log_event_old.cc too. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); + thd->is_slave_error= 1; + /* remove trigger's tables */ + goto err; + } + } // if (table) - Note: this applies to log_event_old.cc too. - /Sven - */ - thd->reset_current_stmt_binlog_format_row(); - thd->is_slave_error= 1; - /* remove trigger's tables */ - goto err; - } + DBUG_ASSERT(error == 0); - /* remove trigger's tables */ - restore_empty_query_table_list(thd->lex); + /* + Remove trigger's tables. In case of ONLINE ALTER TABLE, event doesn't own + the table (hence, no tables are locked), and therefore no cleanup should be + done after each event. + */ + if (rgi->tables_to_lock_count) + restore_empty_query_table_list(thd->lex); -#if defined(WITH_WSREP) && defined(HAVE_QUERY_CACHE) if (WSREP(thd) && wsrep_thd_is_applying(thd)) - query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); -#endif /* WITH_WSREP && HAVE_QUERY_CACHE */ + query_cache_invalidate_locked_for_write(thd, rgi->tables_to_lock); if (get_flags(STMT_END_F)) { @@ -6322,8 +5255,11 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) DBUG_RETURN(error); err: - restore_empty_query_table_list(thd->lex); - rgi->slave_close_thread_tables(thd); + if (rgi->tables_to_lock_count) + { + restore_empty_query_table_list(thd->lex); + rgi->slave_close_thread_tables(thd); + } thd->reset_query_timer(); DBUG_RETURN(error); } @@ -6476,7 +5412,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) #endif /* defined(HAVE_REPLICATION) */ -bool Rows_log_event::write_data_header() +bool Rows_log_event::write_data_header(Log_event_writer *writer) { uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer DBUG_ASSERT(m_table_id != UINT32_MAX); @@ -6484,14 +5420,14 @@ bool Rows_log_event::write_data_header() { int4store(buf + 0, (ulong) m_table_id); int2store(buf + 4, m_flags); - return (write_data(buf, 6)); + return (write_data(writer, buf, 6)); }); int6store(buf + RW_MAPID_OFFSET, m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); - return write_data(buf, ROWS_HEADER_LEN); + return write_data(writer, buf, ROWS_HEADER_LEN_V1); } -bool Rows_log_event::write_data_body() +bool Rows_log_event::write_data_body(Log_event_writer *writer) { /* Note that this should be the number of *bits*, not the number of @@ -6506,13 +5442,13 @@ bool Rows_log_event::write_data_body() DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); - res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf)); + res= res || write_data(writer, sbuf, (size_t) (sbuf_end - sbuf)); bitmap= (uchar*) my_alloca(bitmap_size); bitmap_export(bitmap, &m_cols); DBUG_DUMP("m_cols", bitmap, bitmap_size); - res= res || write_data(bitmap, bitmap_size); + res= res || write_data(writer, bitmap, bitmap_size); /* TODO[refactor write]: Remove the "down cast" here (and elsewhere). */ @@ -6522,17 +5458,17 @@ bool Rows_log_event::write_data_body() bitmap_export(bitmap, &m_cols_ai); DBUG_DUMP("m_cols_ai", bitmap, bitmap_size); - res= res || write_data(bitmap, bitmap_size); + res= res || write_data(writer, bitmap, bitmap_size); } DBUG_DUMP("rows", m_rows_buf, data_size); - res= res || write_data(m_rows_buf, (size_t) data_size); + res= res || write_data(writer, m_rows_buf, (size_t) data_size); my_afree(bitmap); return res; } -bool Rows_log_event::write_compressed() +bool Rows_log_event::write_compressed(Log_event_writer *writer) { uchar *m_rows_buf_tmp= m_rows_buf; uchar *m_rows_cur_tmp= m_rows_cur; @@ -6546,7 +5482,7 @@ bool Rows_log_event::write_compressed() (uint32)(m_rows_cur_tmp - m_rows_buf_tmp), &comlen)) { m_rows_cur= comlen + m_rows_buf; - ret= Log_event::write(); + ret= Log_event::write(writer); } my_safe_afree(m_rows_buf, alloc_size); m_rows_buf= m_rows_buf_tmp; @@ -6588,15 +5524,15 @@ Annotate_rows_log_event::Annotate_rows_log_event(THD *thd, } -bool Annotate_rows_log_event::write_data_header() +bool Annotate_rows_log_event::write_data_header(Log_event_writer *writer) { return 0; } -bool Annotate_rows_log_event::write_data_body() +bool Annotate_rows_log_event::write_data_body(Log_event_writer *writer) { - return write_data(m_query_txt, m_query_len); + return write_data(writer, m_query_txt, m_query_len); } @@ -6900,6 +5836,13 @@ check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list) DBUG_RETURN(res); } +table_def Table_map_log_event::get_table_def() +{ + return table_def(m_coltype, m_colcnt, + m_field_metadata, m_field_metadata_size, + m_null_bits, m_flags); +} + int Table_map_log_event::do_apply_event(rpl_group_info *rgi) { RPL_TABLE_LIST *table_list; @@ -6938,16 +5881,26 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) LEX_CSTRING tmp_db_name= {db_mem, db_mem_length }; LEX_CSTRING tmp_tbl_name= {tname_mem, tname_mem_length }; - table_list->init_one_table(&tmp_db_name, &tmp_tbl_name, 0, TL_WRITE); - table_list->table_id= DBUG_IF("inject_tblmap_same_id_maps_diff_table") ? 0 : m_table_id; - table_list->updating= 1; + /* + The memory allocated by the table_def structure (i.e., not the + memory allocated *for* the table_def structure) is released + inside rpl_group_info::clear_tables_to_lock() by calling the + table_def destructor explicitly. + */ + new(table_list) RPL_TABLE_LIST(&tmp_db_name, &tmp_tbl_name, TL_WRITE, + get_table_def(), + m_flags & TM_BIT_HAS_TRIGGERS_F); + + table_list->table_id= DBUG_IF("inject_tblmap_same_id_maps_diff_table") ? + 0: m_table_id; table_list->required_type= TABLE_TYPE_NORMAL; + table_list->open_type= OT_BASE_ONLY; + DBUG_ASSERT(table_list->updating); DBUG_PRINT("debug", ("table: %s is mapped to %llu", table_list->table_name.str, table_list->table_id)); - table_list->master_had_triggers= ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? 1 : 0); - DBUG_PRINT("debug", ("table->master_had_triggers=%d", + DBUG_PRINT("debug", ("table->master_had_triggers=%d", (int)table_list->master_had_triggers)); enum_tbl_map_status tblmap_status= check_table_map(rgi, table_list); @@ -6956,23 +5909,6 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) DBUG_ASSERT(thd->lex->query_tables != table_list); /* - Use placement new to construct the table_def instance in the - memory allocated for it inside table_list. - - The memory allocated by the table_def structure (i.e., not the - memory allocated *for* the table_def structure) is released - inside Relay_log_info::clear_tables_to_lock() by calling the - table_def destructor explicitly. - */ - new (&table_list->m_tabledef) - table_def(m_coltype, m_colcnt, - m_field_metadata, m_field_metadata_size, - m_null_bits, m_flags); - table_list->m_tabledef_valid= TRUE; - table_list->m_conv_table= NULL; - table_list->open_type= OT_BASE_ONLY; - - /* We record in the slave's information that the table should be locked by linking the table into the list of tables to lock. */ @@ -7016,8 +5952,9 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) execute in a user session */ my_error(ER_SLAVE_FATAL_ERROR, MYF(0), buf); - } - + } + + table_list->~RPL_TABLE_LIST(); my_free(memory); } @@ -7042,7 +5979,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi) #endif /* defined(HAVE_REPLICATION) */ -bool Table_map_log_event::write_data_header() +bool Table_map_log_event::write_data_header(Log_event_writer *writer) { DBUG_ASSERT(m_table_id != UINT32_MAX); uchar buf[TABLE_MAP_HEADER_LEN]; @@ -7050,14 +5987,14 @@ bool Table_map_log_event::write_data_header() { int4store(buf + 0, (ulong) m_table_id); int2store(buf + 4, m_flags); - return (write_data(buf, 6)); + return (write_data(writer, buf, 6)); }); int6store(buf + TM_MAPID_OFFSET, m_table_id); int2store(buf + TM_FLAGS_OFFSET, m_flags); - return write_data(buf, TABLE_MAP_HEADER_LEN); + return write_data(writer, buf, TABLE_MAP_HEADER_LEN); } -bool Table_map_log_event::write_data_body() +bool Table_map_log_event::write_data_body(Log_event_writer *writer) { DBUG_ASSERT(m_dbnam != NULL); DBUG_ASSERT(m_tblnam != NULL); @@ -7078,17 +6015,17 @@ bool Table_map_log_event::write_data_body() uchar mbuf[MAX_INT_WIDTH]; uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size); - return write_data(dbuf, sizeof(dbuf)) || - write_data(m_dbnam, m_dblen+1) || - write_data(tbuf, sizeof(tbuf)) || - write_data(m_tblnam, m_tbllen+1) || - write_data(cbuf, (size_t) (cbuf_end - cbuf)) || - write_data(m_coltype, m_colcnt) || - write_data(mbuf, (size_t) (mbuf_end - mbuf)) || - write_data(m_field_metadata, m_field_metadata_size), - write_data(m_null_bits, (m_colcnt + 7) / 8) || - write_data((const uchar*) m_metadata_buf.ptr(), - m_metadata_buf.length()); + return write_data(writer, dbuf, sizeof(dbuf)) || + write_data(writer, m_dbnam, m_dblen+1) || + write_data(writer, tbuf, sizeof(tbuf)) || + write_data(writer, m_tblnam, m_tbllen+1) || + write_data(writer, cbuf, (size_t) (cbuf_end - cbuf)) || + write_data(writer, m_coltype, m_colcnt) || + write_data(writer, mbuf, (size_t) (mbuf_end - mbuf)) || + write_data(writer, m_field_metadata, m_field_metadata_size), + write_data(writer, m_null_bits, (m_colcnt + 7) / 8) || + write_data(writer, (const uchar*) m_metadata_buf.ptr(), + m_metadata_buf.length()); } /** @@ -7516,15 +6453,15 @@ Write_rows_compressed_log_event::Write_rows_compressed_log_event( m_type = WRITE_ROWS_COMPRESSED_EVENT_V1; } -bool Write_rows_compressed_log_event::write() +bool Write_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } #if defined(HAVE_REPLICATION) int -Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +Write_rows_log_event::do_before_row_operations(const rpl_group_info *) { int error= 0; @@ -7602,8 +6539,7 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability } int -Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, - int error) +Write_rows_log_event::do_after_row_operations(int error) { int local_error= 0; @@ -7734,12 +6670,11 @@ is_duplicate_key_error(int errcode) @c ha_update_row() or first deleted and then new record written. */ -int -Rows_log_event::write_row(rpl_group_info *rgi, - const bool overwrite) +int Rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite) { DBUG_ENTER("write_row"); - DBUG_ASSERT(m_table != NULL && thd != NULL); + DBUG_ASSERT(m_table != NULL); + DBUG_ASSERT(thd != NULL); TABLE *table= m_table; // pointer to event's table int error; @@ -7821,10 +6756,8 @@ Rows_log_event::write_row(rpl_group_info *rgi, error= update_sequence(); else while (unlikely(error= table->file->ha_write_row(table->record[0]))) { - if (error == HA_ERR_LOCK_DEADLOCK || - error == HA_ERR_LOCK_WAIT_TIMEOUT || - (keynum= table->file->get_dup_key(error)) < 0 || - !overwrite) + if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT || + (keynum= table->file->get_dup_key(error)) < 0 || !overwrite) { DBUG_PRINT("info",("get_dup_key returns %d)", keynum)); /* @@ -8098,7 +7031,7 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi) #if defined(HAVE_REPLICATION) -uint8 Write_rows_log_event::get_trg_event_map() +uint8 Write_rows_log_event::get_trg_event_map() const { return trg2bit(TRG_EVENT_INSERT) | trg2bit(TRG_EVENT_UPDATE) | trg2bit(TRG_EVENT_DELETE); @@ -8110,16 +7043,19 @@ uint8 Write_rows_log_event::get_trg_event_map() **************************************************************************/ #if defined(HAVE_REPLICATION) -/* - Compares table->record[0] and table->record[1] +/** + @brief Compares table->record[0] and table->record[1] - Returns TRUE if different. + @returns true if different. */ static bool record_compare(TABLE *table, bool vers_from_plain= false) { - bool result= FALSE; + bool result= false; + bool all_values_set= bitmap_is_set_all(&table->has_value_set); + /** Compare full record only if: + - all fields were given values - there are no blob fields (otherwise we would also need to compare blobs contents as well); - there are no varchar fields (otherwise we would also need @@ -8130,24 +7066,23 @@ static bool record_compare(TABLE *table, bool vers_from_plain= false) */ if ((table->s->blob_fields + table->s->varchar_fields + - table->s->null_fields) == 0) + table->s->null_fields) == 0 + && all_values_set) { - result= cmp_record(table,record[1]); + result= cmp_record(table, record[1]); goto record_compare_exit; } /* Compare null bits */ - if (memcmp(table->null_flags, - table->null_flags+table->s->rec_buff_length, - table->s->null_bytes)) - { - result= TRUE; // Diff in NULL value - goto record_compare_exit; - } + if (all_values_set && memcmp(table->null_flags, + table->null_flags + table->s->rec_buff_length, + table->s->null_bytes)) + goto record_compare_differ; // Diff in NULL value /* Compare fields */ for (Field **ptr=table->field ; *ptr ; ptr++) { + Field *f= *ptr; /* If the table is versioned, don't compare using the version if there is a primary key. If there isn't a primary key, we need the version to @@ -8157,27 +7092,118 @@ static bool record_compare(TABLE *table, bool vers_from_plain= false) because the implicit row_end value will be set to the maximum value for the latest row update (which is what we care about). */ - if (table->versioned() && (*ptr)->vers_sys_field() && + if (table->versioned() && f->vers_sys_field() && (table->s->primary_key < MAX_KEY || - (vers_from_plain && table->vers_start_field() == (*ptr)))) + (vers_from_plain && table->vers_start_field() == f))) continue; - /** - We only compare field contents that are not null. - NULL fields (i.e., their null bits) were compared - earlier. + + /* + We only compare fields that exist on the master (or in ONLINE + ALTER case, that were in the original table). */ - if (!(*(ptr))->is_null()) + if (!all_values_set) { - if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length)) - { - result= TRUE; - goto record_compare_exit; - } + if (!f->has_explicit_value() && + /* Don't skip row_end if replicating unversioned -> versioned */ + !(vers_from_plain && table->vers_end_field() == f)) + continue; + if (f->is_null() != f->is_null(table->s->rec_buff_length)) + goto record_compare_differ; } + + if (!f->is_null() && !f->vcol_info && + f->cmp_binary_offset(table->s->rec_buff_length)) + goto record_compare_differ; } record_compare_exit: return result; +record_compare_differ: + return true; +} +/** + Traverses default item expr of a field, and underlying field's default values. + If it is an extra field and has no value replicated, then its default expr + should be also checked. + */ +class Rpl_key_part_checker: public Field_enumerator +{ + bool online_alter; + Field *next_number_field; + bool field_usable; +public: + + + void visit_field(Item_field *item) override + { + if (!field_usable) + return; + field_usable= check_field(item->field); + } + + bool check_field(Field *f) + { + if (f->has_explicit_value()) + return true; + + if ((!f->vcol_info && !online_alter) || f == next_number_field) + return false; + + Virtual_column_info *computed= f->vcol_info ? f->vcol_info + : f->default_value; + + if (computed == NULL) + return true; // No DEFAULT, or constant DEFAULT + + // Deterministic DEFAULT or vcol expression + return !(computed->flags & VCOL_NOT_STRICTLY_DETERMINISTIC) + && !computed->expr->walk(&Item::enumerate_field_refs_processor, + false, this) + && field_usable; + } + + Rpl_key_part_checker(bool online_alter, Field *next_number_field): + online_alter(online_alter), next_number_field(next_number_field), + field_usable(true) {} +}; + + +/** + Newly added fields with non-deterministic defaults (i.e. DEFAULT(RANDOM()), + CURRENT_TIMESTAMP, AUTO_INCREMENT) should be excluded from key search. + Basically we exclude all the default-filled fields based on + has_explicit_value bitmap. +*/ +uint Rows_log_event::find_key_parts(const KEY *key) const +{ + RPL_TABLE_LIST *tl= (RPL_TABLE_LIST*)m_table->pos_in_table_list; + const bool online_alter= tl->m_online_alter_copy_fields; + uint p; + + if (!m_table->s->keys_in_use.is_set(uint(key - m_table->key_info))) + return 0; + + if (!online_alter) + { + if (m_cols.n_bits >= m_table->s->fields) // replicated more than slave has + return key->user_defined_key_parts; + if (m_table->s->virtual_fields == 0) + { + for (p= 0; p < key->user_defined_key_parts; p++) + if (key->key_part[p].fieldnr > m_cols.n_bits) // extra + break; + return p; + } + } + + Rpl_key_part_checker key_part_checker(online_alter, + m_table->found_next_number_field); + for (p= 0; p < key->user_defined_key_parts; p++) + { + if (!key_part_checker.check_field(key->key_part[p].field)) + break; + } + return p; } @@ -8187,74 +7213,127 @@ record_compare_exit: A primary key is preferred if it exists; otherwise a unique index is preferred. Else we pick the index with the smalles rec_per_key value. - If a suitable key is found, set @c m_key, @c m_key_nr and @c m_key_info - member fields appropriately. + If a suitable key is found, set @c m_key, @c m_key_nr, @c m_key_info, + and @c m_usable_key_parts member fields appropriately. @returns Error code on failure, 0 on success. */ -int Rows_log_event::find_key() +int Rows_log_event::find_key(const rpl_group_info *rgi) { - uint i, best_key_nr, last_part; - KEY *key, *UNINIT_VAR(best_key); + DBUG_ASSERT(m_table); + RPL_TABLE_LIST *tl= (RPL_TABLE_LIST*)m_table->pos_in_table_list; + uint i, best_key_nr= 0, best_usable_key_parts= 0; + KEY *key; ulong UNINIT_VAR(best_rec_per_key), tmp; DBUG_ENTER("Rows_log_event::find_key"); - DBUG_ASSERT(m_table); - - best_key_nr= MAX_KEY; - /* - Keys are sorted so that any primary key is first, followed by unique keys, - followed by any other. So we will automatically pick the primary key if - it exists. - */ - for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++) + if ((best_key_nr= tl->cached_key_nr) != ~0U) { - if (!m_table->s->keys_in_use.is_set(i)) - continue; + DBUG_ASSERT(best_key_nr <= MAX_KEY); // use the cached value + best_usable_key_parts= tl->cached_usable_key_parts; + } + else + { + best_key_nr= MAX_KEY; + /* - We cannot use a unique key with NULL-able columns to uniquely identify - a row (but we can still select it for range scan below if nothing better - is available). + if the source (in the row event) and destination (in m_table) records + don't have the same structure, some keys below might be unusable + for find_row(). + + If it's a replication and slave table (m_table) has less columns + than the master's - easy, all keys are usable. + + If slave's table has more columns, but none of them are generated - + then any column beyond m_cols.n_bits makes an index unusable. + + If slave's table has generated columns or it's the online alter table + where arbitrary structure conversion is possible (in the replication case + one table must be a prefix of the other, see table_def::compatible_with) + we cannot deduce what destination columns will be affected by m_cols, + we have to actually unpack one row and examine has_explicit_value() */ - if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) + + if (tl->m_online_alter_copy_fields || + (m_cols.n_bits < m_table->s->fields && + m_table->s->virtual_fields)) { - best_key_nr= i; - best_key= key; - break; + const uchar *curr_row_end= m_curr_row_end; + Check_level_instant_set clis(m_table->in_use, CHECK_FIELD_IGNORE); + if (int err= unpack_row(rgi, m_table, m_width, m_curr_row, &m_cols, + &curr_row_end, &m_master_reclength, m_rows_end)) + DBUG_RETURN(err); } + /* - We can only use a non-unique key if it allows range scans (ie. skip - FULLTEXT indexes and such). + Keys are sorted so that any primary key is first, followed by unique keys, + followed by any other. So we will automatically pick the primary key if + it exists. */ - last_part= key->user_defined_key_parts - 1; - DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu", - key->name.str, last_part, key->rec_per_key[last_part])); - if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT)) - continue; - - tmp= key->rec_per_key[last_part]; - if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key)) + for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++) { - best_key_nr= i; - best_key= key; - best_rec_per_key= tmp; + uint usable_key_parts= find_key_parts(key); + if (usable_key_parts == 0) + continue; + /* + We cannot use a unique key with NULL-able columns to uniquely identify + a row (but we can still select it for range scan below if nothing better + is available). + */ + if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME && + usable_key_parts == key->user_defined_key_parts) + { + best_key_nr= i; + best_usable_key_parts= usable_key_parts; + break; + } + /* + We can only use a non-unique key if it allows range scans (ie. skip + FULLTEXT indexes and such). + */ + uint last_part= usable_key_parts - 1; + DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu", + key->name.str, last_part, key->rec_per_key[last_part])); + if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT)) + continue; + + tmp= key->rec_per_key[last_part]; + if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key)) + { + best_key_nr= i; + best_usable_key_parts= usable_key_parts; + best_rec_per_key= tmp; + } } + tl->cached_key_nr= best_key_nr; + tl->cached_usable_key_parts= best_usable_key_parts; } + m_key_nr= best_key_nr; + m_usable_key_parts= best_usable_key_parts; if (best_key_nr == MAX_KEY) - { m_key_info= NULL; - DBUG_RETURN(0); + else + { + m_key_info= m_table->key_info + best_key_nr; + + if (!use_pk_position()) + { + // Allocate buffer for key searches + m_key= (uchar *) my_malloc(PSI_INSTRUMENT_ME, m_key_info->key_length, MYF(MY_WME)); + if (m_key == NULL) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + } } - // Allocate buffer for key searches - m_key= (uchar *) my_malloc(PSI_INSTRUMENT_ME, best_key->key_length, MYF(MY_WME)); - if (m_key == NULL) - DBUG_RETURN(HA_ERR_OUT_OF_MEM); - m_key_info= best_key; - m_key_nr= best_key_nr; + DBUG_EXECUTE_IF("rpl_report_chosen_key", + push_warning_printf(m_table->in_use, + Sql_condition::WARN_LEVEL_NOTE, + ER_UNKNOWN_ERROR, "Key chosen: %d", + m_key_nr == MAX_KEY ? + -1 : m_key_nr);); - DBUG_RETURN(0);; + DBUG_RETURN(0); } @@ -8315,6 +7394,14 @@ static int row_not_found_error(rpl_group_info *rgi) ? HA_ERR_KEY_NOT_FOUND : HA_ERR_RECORD_CHANGED; } +bool Rows_log_event::use_pk_position() const +{ + return m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION + && m_table->s->primary_key < MAX_KEY + && m_key_nr == m_table->s->primary_key + && m_usable_key_parts == m_table->key_info->user_defined_key_parts; +} + static int end_of_file_error(rpl_group_info *rgi) { return rgi->speculation != rpl_group_info::SPECULATE_OPTIMISTIC @@ -8356,11 +7443,13 @@ int Rows_log_event::find_row(rpl_group_info *rgi) { DBUG_ENTER("Rows_log_event::find_row"); - DBUG_ASSERT(m_table && m_table->in_use != NULL); + DBUG_ASSERT(m_table); + DBUG_ASSERT(m_table->in_use != NULL); TABLE *table= m_table; int error= 0; bool is_table_scan= false, is_index_scan= false; + Check_level_instant_set clis(table->in_use, CHECK_FIELD_IGNORE); /* rpl_row_tabledefs.test specifies that @@ -8391,8 +7480,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi) DBUG_PRINT("info",("looking for the following record")); DBUG_DUMP("record[0]", table->record[0], table->s->reclength); - if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && - table->s->primary_key < MAX_KEY) + if (use_pk_position()) { /* Use a more efficient method to fetch the record given by @@ -8413,7 +7501,6 @@ int Rows_log_event::find_row(rpl_group_info *rgi) table->s->reclength) == 0); */ - int error; DBUG_PRINT("info",("locating record using primary key (position)")); error= table->file->ha_rnd_pos_by_record(table->record[0]); @@ -8429,12 +7516,6 @@ int Rows_log_event::find_row(rpl_group_info *rgi) // We can't use position() - try other methods. - /* - We need to retrieve all fields - TODO: Move this out from this function to main loop - */ - table->use_all_columns(); - /* Save copy of the record in table->record[1]. It might be needed later if linear search is used to find exact match. @@ -8481,10 +7562,13 @@ int Rows_log_event::find_row(rpl_group_info *rgi) table->record[0][table->s->null_bytes - 1]|= 256U - (1U << table->s->last_null_bit_pos); - if (unlikely((error= table->file->ha_index_read_map(table->record[0], - m_key, - HA_WHOLE_KEY, - HA_READ_KEY_EXACT)))) + const enum ha_rkey_function find_flag= + m_usable_key_parts == m_key_info->user_defined_key_parts + ? HA_READ_KEY_EXACT : HA_READ_KEY_OR_NEXT; + error= table->file->ha_index_read_map(table->record[0], m_key, + make_keypart_map(m_usable_key_parts), + find_flag); + if (unlikely(error)) { DBUG_PRINT("info",("no record matching the key found in the table")); if (error == HA_ERR_KEY_NOT_FOUND) @@ -8516,10 +7600,10 @@ int Rows_log_event::find_row(rpl_group_info *rgi) found. I can see no scenario where it would be incorrect to chose the row to change only using a PK or an UNNI. */ - if (table->key_info->flags & HA_NOSAME) + if (find_flag == HA_READ_KEY_EXACT && table->key_info->flags & HA_NOSAME) { /* Unique does not have non nullable part */ - if (!(table->key_info->flags & (HA_NULL_PART_KEY))) + if (!(table->key_info->flags & HA_NULL_PART_KEY)) { error= 0; goto end; @@ -8592,9 +7676,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi) /* Continue until we find the right record or have made a full loop */ do { - error= table->file->ha_rnd_next(table->record[0]); - - if (unlikely(error)) + if (unlikely((error= table->file->ha_rnd_next(table->record[0])))) DBUG_PRINT("info", ("error: %s", HA_ERR(error))); switch (error) { @@ -8656,16 +7738,16 @@ Delete_rows_compressed_log_event::Delete_rows_compressed_log_event( m_type= DELETE_ROWS_COMPRESSED_EVENT_V1; } -bool Delete_rows_compressed_log_event::write() +bool Delete_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } #if defined(HAVE_REPLICATION) int -Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +Delete_rows_log_event::do_before_row_operations(const rpl_group_info *rgi) { /* Increment the global status delete count variable @@ -8673,23 +7755,14 @@ Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability if (get_flags(STMT_END_F)) status_var_increment(thd->status_var.com_stat[SQLCOM_DELETE]); - if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && - m_table->s->primary_key < MAX_KEY) - { - /* - We don't need to allocate any memory for m_key since it is not used. - */ - return 0; - } if (do_invoke_trigger()) m_table->prepare_triggers_for_delete_stmt_or_event(); - return find_key(); + return find_key(rgi); } int -Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, - int error) +Delete_rows_log_event::do_after_row_operations(int error) { m_table->file->ha_index_or_rnd_end(); my_free(m_key); @@ -8748,7 +7821,6 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) error= HA_ERR_GENERIC; // in case if error is not set yet if (likely(!error)) { - m_table->mark_columns_per_binlog_row_image(); if (m_vers_from_plain && m_table->versioned(VERS_TIMESTAMP)) { Field *end= m_table->vers_end_field(); @@ -8761,7 +7833,6 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) { error= m_table->file->ha_delete_row(m_table->record[0]); } - m_table->default_column_bitmaps(); } if (invoke_triggers && likely(!error) && unlikely(process_triggers(TRG_EVENT_DELETE, TRG_ACTION_AFTER, FALSE))) @@ -8775,7 +7846,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) #endif /* defined(HAVE_REPLICATION) */ #if defined(HAVE_REPLICATION) -uint8 Delete_rows_log_event::get_trg_event_map() +uint8 Delete_rows_log_event::get_trg_event_map() const { return trg2bit(TRG_EVENT_DELETE); } @@ -8805,9 +7876,9 @@ Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg, m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1; } -bool Update_rows_compressed_log_event::write() +bool Update_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } void Update_rows_log_event::init(MY_BITMAP const *cols) @@ -8827,7 +7898,7 @@ void Update_rows_log_event::init(MY_BITMAP const *cols) #if defined(HAVE_REPLICATION) int -Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +Update_rows_log_event::do_before_row_operations(const rpl_group_info *rgi) { /* Increment the global status update count variable @@ -8836,7 +7907,7 @@ Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability status_var_increment(thd->status_var.com_stat[SQLCOM_UPDATE]); int err; - if ((err= find_key())) + if ((err= find_key(rgi))) return err; if (do_invoke_trigger()) @@ -8846,8 +7917,7 @@ Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability } int -Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, - int error) +Update_rows_log_event::do_after_row_operations(int error) { /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ m_table->file->ha_index_or_rnd_end(); @@ -9004,7 +8074,7 @@ err: #if defined(HAVE_REPLICATION) -uint8 Update_rows_log_event::get_trg_event_map() +uint8 Update_rows_log_event::get_trg_event_map() const { return trg2bit(TRG_EVENT_UPDATE); } @@ -9089,23 +8159,23 @@ int Incident_log_event::do_apply_event(rpl_group_info *rgi) bool -Incident_log_event::write_data_header() +Incident_log_event::write_data_header(Log_event_writer *writer) { DBUG_ENTER("Incident_log_event::write_data_header"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); uchar buf[sizeof(int16)]; int2store(buf, (int16) m_incident); - DBUG_RETURN(write_data(buf, sizeof(buf))); + DBUG_RETURN(write_data(writer, buf, sizeof(buf))); } bool -Incident_log_event::write_data_body() +Incident_log_event::write_data_body(Log_event_writer *writer) { uchar tmp[1]; DBUG_ENTER("Incident_log_event::write_data_body"); tmp[0]= (uchar) m_message.length; - DBUG_RETURN(write_data(tmp, sizeof(tmp)) || - write_data(m_message.str, m_message.length)); + DBUG_RETURN(write_data(writer, tmp, sizeof(tmp)) || + write_data(writer, m_message.str, m_message.length)); } |