path: root/sql/log_event_server.cc
Diffstat:
-rw-r--r--  sql/log_event_server.cc  2024
1 files changed, 547 insertions, 1477 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 6e22a3ab..f7029b39 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -53,6 +53,7 @@
#include "wsrep_mysqld.h"
#include "sql_insert.h"
#include "sql_table.h"
+#include <mysql/service_wsrep.h>
#include <my_bitmap.h>
#include "rpl_utility.h"
@@ -232,19 +233,12 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
#if defined(HAVE_REPLICATION)
static void set_thd_db(THD *thd, Rpl_filter *rpl_filter,
- const char *db, uint32 db_len)
+ const LEX_CSTRING &db)
{
- char lcase_db_buf[NAME_LEN +1];
- LEX_CSTRING new_db;
- new_db.length= db_len;
- if (lower_case_table_names == 1)
- {
- strmov(lcase_db_buf, db);
- my_casedn_str(system_charset_info, lcase_db_buf);
- new_db.str= lcase_db_buf;
- }
- else
- new_db.str= db;
+ IdentBuffer<NAME_LEN> lcase_db_buf;
+ LEX_CSTRING new_db= lower_case_table_names == 1 ?
+                          lcase_db_buf.copy_casedn(db).to_lex_cstring() :
+                          db;
/* TODO WARNING this makes rewrite_db respect lower_case_table_names values
* for more info look MDEV-17446 */
new_db.str= rpl_filter->get_rewrite_db(new_db.str, &new_db.length);
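
For illustration, a minimal standalone sketch of the pattern this hunk switches to: the case-folded copy lives in a small buffer object owned by the caller, and the helper simply selects between the folded copy and the original name. The types below are simplified stand-ins, not the server's IdentBuffer/LEX_CSTRING API.

#include <cctype>
#include <cstddef>
#include <string>

struct NameRefSketch { const char *str; std::size_t length; }; // stand-in for LEX_CSTRING

struct IdentBufSketch                                           // stand-in for IdentBuffer
{
  std::string buf;
  IdentBufSketch &copy_casedn(NameRefSketch src)
  {
    buf.assign(src.str, src.length);
    for (char &c : buf)
      c= (char) std::tolower((unsigned char) c);
    return *this;
  }
  NameRefSketch ref() const { return { buf.c_str(), buf.size() }; }
};

// The caller owns the buffer, so the returned reference stays valid in its scope.
static NameRefSketch normalize_db(IdentBufSketch &tmp, NameRefSketch db,
                                  bool lower_case_names)
{
  return lower_case_names ? tmp.copy_casedn(db).ref() : db;
}
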
@@ -367,37 +361,6 @@ inline bool unexpected_error_code(int unexpected_error)
}
}
-/*
- pretty_print_str()
-*/
-
-static void
-pretty_print_str(String *packet, const char *str, int len)
-{
- const char *end= str + len;
- packet->append(STRING_WITH_LEN("'"));
- while (str < end)
- {
- char c;
- switch ((c=*str++)) {
- case '\n': packet->append(STRING_WITH_LEN("\\n")); break;
- case '\r': packet->append(STRING_WITH_LEN("\\r")); break;
- case '\\': packet->append(STRING_WITH_LEN("\\\\")); break;
- case '\b': packet->append(STRING_WITH_LEN("\\b")); break;
- case '\t': packet->append(STRING_WITH_LEN("\\t")); break;
- case '\'': packet->append(STRING_WITH_LEN("\\'")); break;
- case 0 : packet->append(STRING_WITH_LEN("\\0")); break;
- default:
- packet->append(&c, 1);
- break;
- }
- }
- packet->append(STRING_WITH_LEN("'"));
-}
-#endif /* HAVE_REPLICATION */
-
-
-#if defined(HAVE_REPLICATION)
/**
Create a prefix for the temporary files that is to be used for
@@ -540,7 +503,7 @@ int append_query_string(CHARSET_INFO *csinfo, String *to,
beg= (char*) to->ptr() + to->length();
ptr= beg;
if (csinfo->escape_with_backslash_is_dangerous)
- ptr= str_to_hex(ptr, str, len);
+ ptr= str_to_hex(ptr, (uchar*)str, len);
else
{
*ptr++= '\'';
@@ -574,8 +537,8 @@ int append_query_string(CHARSET_INFO *csinfo, String *to,
**************************************************************************/
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg),
- checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+ :log_pos(0), temp_buf(0), exec_time(0),
+ slave_exec_mode(SLAVE_EXEC_MODE_STRICT), thd(thd_arg)
{
server_id= thd->variables.server_id;
when= thd->start_time;
@@ -599,7 +562,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
Log_event::Log_event()
:temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE),
- thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+ slave_exec_mode(SLAVE_EXEC_MODE_STRICT), thd(0)
{
server_id= global_system_variables.server_id;
/*
@@ -620,29 +583,17 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Log_event::do_update_pos");
+ DBUG_ASSERT(rli);
DBUG_ASSERT(!rli->belongs_to_client());
+
/*
- rli is null when (as far as I (Guilhem) know) the caller is
- Load_log_event::do_apply_event *and* that one is called from
- Execute_load_log_event::do_apply_event. In this case, we don't
- do anything here ; Execute_load_log_event::do_apply_event will
- call Log_event::do_apply_event again later with the proper rli.
- Strictly speaking, if we were sure that rli is null only in the
- case discussed above, 'if (rli)' is useless here. But as we are
- not 100% sure, keep it for now.
-
- Matz: I don't think we will need this check with this refactoring.
+ In parallel execution, delay position update for the events that are
+    not part of event groups (format description, rotate, and such) until
+    the actual event execution reaches that point.
*/
- if (rli)
- {
- /*
- In parallel execution, delay position update for the events that are
- not part of event groups (format description, rotate, and such) until
- the actual event execution reaches that point.
- */
- if (!rgi->is_parallel_exec || is_group_event(get_type_code()))
- rli->stmt_done(log_pos, thd, rgi);
- }
+ if (!rgi->is_parallel_exec || is_group_event(get_type_code()))
+    rli->stmt_done(log_pos, thd, rgi);
+
DBUG_RETURN(0); // Cannot fail currently
}
@@ -736,85 +687,6 @@ void Log_event::init_show_field_list(THD *thd, List<Item>* field_list)
mem_root);
}
-/**
- A decider of whether to trigger checksum computation or not.
- To be invoked in Log_event::write() stack.
- The decision is positive
-
- S,M) if it's been marked for checksumming with @c checksum_alg
-
- M) otherwise, if @@global.binlog_checksum is not NONE and the event is
- directly written to the binlog file.
- The to-be-cached event decides at @c write_cache() time.
-
- Otherwise the decision is negative.
-
- @note A side effect of the method is altering Log_event::checksum_alg
- it the latter was undefined at calling.
-
- @return true Checksum should be used. Log_event::checksum_alg is set.
- @return false No checksum
-*/
-
-my_bool Log_event::need_checksum()
-{
- my_bool ret;
- DBUG_ENTER("Log_event::need_checksum");
-
- /*
- few callers of Log_event::write
- (incl FD::write, FD constructing code on the slave side, Rotate relay log
- and Stop event)
- provides their checksum alg preference through Log_event::checksum_alg.
- */
- if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
- ret= checksum_alg != BINLOG_CHECKSUM_ALG_OFF;
- else
- {
- ret= binlog_checksum_options && cache_type == Log_event::EVENT_NO_CACHE;
- checksum_alg= ret ? (enum_binlog_checksum_alg)binlog_checksum_options
- : BINLOG_CHECKSUM_ALG_OFF;
- }
- /*
- FD calls the methods before data_written has been calculated.
- The following invariant claims if the current is not the first
- call (and therefore data_written is not zero) then `ret' must be
- TRUE. It may not be null because FD is always checksummed.
- */
-
- DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret ||
- data_written == 0);
-
- DBUG_ASSERT(!ret ||
- ((checksum_alg == binlog_checksum_options ||
- /*
- Stop event closes the relay-log and its checksum alg
- preference is set by the caller can be different
- from the server's binlog_checksum_options.
- */
- get_type_code() == STOP_EVENT ||
- /*
- Rotate:s can be checksummed regardless of the server's
- binlog_checksum_options. That applies to both
- the local RL's Rotate and the master's Rotate
- which IO thread instantiates via queue_binlog_ver_3_event.
- */
- get_type_code() == ROTATE_EVENT ||
- get_type_code() == START_ENCRYPTION_EVENT ||
- /* FD is always checksummed */
- get_type_code() == FORMAT_DESCRIPTION_EVENT) &&
- checksum_alg != BINLOG_CHECKSUM_ALG_OFF));
-
- DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
-
- DBUG_ASSERT(((get_type_code() != ROTATE_EVENT &&
- get_type_code() != STOP_EVENT) ||
- get_type_code() != FORMAT_DESCRIPTION_EVENT) ||
- cache_type == Log_event::EVENT_NO_CACHE);
-
- DBUG_RETURN(ret);
-}
-
int Log_event_writer::write_internal(const uchar *pos, size_t len)
{
DBUG_ASSERT(!ctx || encrypt_or_write == &Log_event_writer::encrypt_and_write);
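
The hunk above deletes need_checksum(). As a hedged, self-contained restatement of the decision rule that function encoded (the enum values and parameter names are illustrative, not the server's constants):

#include <cstdint>

enum ChecksumAlgSketch : uint8_t { ALG_UNDEF, ALG_OFF, ALG_CRC32 };
enum CacheTypeSketch   : uint8_t { EVENT_NO_CACHE, EVENT_STMT_CACHE,
                                   EVENT_TRANSACTIONAL_CACHE };

// Returns true when the event should carry a checksum; as a side effect the
// undecided algorithm is pinned, mirroring the removed code.
static bool need_checksum_sketch(ChecksumAlgSketch &alg, CacheTypeSketch cache,
                                 ChecksumAlgSketch global_alg)
{
  if (alg != ALG_UNDEF)                  // caller already expressed a preference
    return alg != ALG_OFF;
  // Undecided events follow the global setting, but only when written directly
  // to the binlog; cache-buffered events decide later, at cache flush time.
  bool ret= global_alg != ALG_OFF && cache == EVENT_NO_CACHE;
  alg= ret ? global_alg : ALG_OFF;
  return ret;
}
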
@@ -954,7 +826,7 @@ int Log_event_writer::write_footer()
Log_event::write_header()
*/
-bool Log_event::write_header(size_t event_data_length)
+bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length)
{
uchar header[LOG_EVENT_HEADER_LEN];
ulong now;
@@ -963,8 +835,6 @@ bool Log_event::write_header(size_t event_data_length)
(longlong) writer->pos(), event_data_length,
(int) get_type_code()));
- writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0;
-
/* Store number of bytes that will be written by this event */
data_written= event_data_length + sizeof(header) + writer->checksum_len;
@@ -973,11 +843,17 @@ bool Log_event::write_header(size_t event_data_length)
change the position
*/
- if (is_artificial_event())
+ if (is_artificial_event() ||
+ cache_type == Log_event::EVENT_STMT_CACHE ||
+ cache_type == Log_event::EVENT_TRANSACTIONAL_CACHE)
{
/*
Artificial events are automatically generated and do not exist
in master's binary log, so log_pos should be set to 0.
+
+ Events written through transaction or statement cache have log_pos set
+ to 0 so that they can be copied directly to the binlog without having
+ to compute the real end_log_pos.
*/
log_pos= 0;
}
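
A short sketch of the log_pos rule this hunk extends (assumed, simplified signature): artificial events and events buffered in the statement or transaction cache get log_pos 0 in their header, because their real end position is only known once the cache is copied into the binlog.

#include <cstdint>

// Value to store in the event header's log_pos field.
static uint64_t header_log_pos_sketch(bool artificial_event, bool cache_buffered,
                                      uint64_t pos_after_event_in_binlog)
{
  if (artificial_event || cache_buffered)
    return 0;                          // fixed up (or ignored) when copied out later
  return pos_after_event_in_binlog;    // direct writes know their end position
}
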
@@ -1112,7 +988,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src,
will print!
*/
-bool Query_log_event::write()
+bool Query_log_event::write(Log_event_writer *writer)
{
uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS];
uchar *start, *start_of_status;
@@ -1184,7 +1060,7 @@ bool Query_log_event::write()
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
{
store_str_with_code_and_len(&start,
- catalog, catalog_len, Q_CATALOG_NZ_CODE);
+ catalog, catalog_len, (uint) Q_CATALOG_NZ_CODE);
/*
In 5.0.x where x<4 masters we used to store the end zero here. This was
a waste of one byte so we don't do it in x>=4 masters. We change code to
@@ -1211,6 +1087,14 @@ bool Query_log_event::write()
int2store(start+2, auto_increment_offset);
start+= 4;
}
+
+ if (thd && (thd->used & THD::CHARACTER_SET_COLLATIONS_USED))
+ {
+ *start++= Q_CHARACTER_SET_COLLATIONS;
+    size_t len= thd->variables.character_set_collations.to_binary((char*)start);
+ start+= len;
+ }
+
if (charset_inited)
{
*start++= Q_CHARSET_CODE;
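
The new Q_CHARACTER_SET_COLLATIONS chunk follows the usual status-variable layout of Query_log_event: a one-byte code followed by a variable-length payload appended into the status area. A hedged sketch of that append pattern (the payload format itself is not reproduced here):

#include <cstddef>
#include <cstdint>
#include <vector>

// Append one optional status variable: <1-byte code><payload bytes>.
static void append_status_var_sketch(std::vector<uint8_t> &status, uint8_t code,
                                     const uint8_t *payload, std::size_t len)
{
  status.push_back(code);              // e.g. the Q_CHARACTER_SET_COLLATIONS code
  status.insert(status.end(), payload, payload + len);
}
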
@@ -1244,18 +1128,6 @@ bool Query_log_event::write()
int8store(start, table_map_for_update);
start+= 8;
}
- if (master_data_written != 0)
- {
- /*
- Q_MASTER_DATA_WRITTEN_CODE only exists in relay logs where the master
- has binlog_version<4 and the slave has binlog_version=4. See comment
- for master_data_written in log_event.h for details.
- */
- *start++= Q_MASTER_DATA_WRITTEN_CODE;
- int4store(start, master_data_written);
- start+= 4;
- }
-
if (thd && thd->need_binlog_invoker())
{
LEX_CSTRING user;
@@ -1365,16 +1237,16 @@ bool Query_log_event::write()
event_length= ((uint) (start-buf) + get_post_header_size_for_derived() +
db_len + 1 + q_len);
- return write_header(event_length) ||
- write_data(buf, QUERY_HEADER_LEN) ||
- write_post_header_for_derived() ||
- write_data(start_of_status, (uint) status_vars_len) ||
- write_data(db, db_len + 1) ||
- write_data(query, q_len) ||
- write_footer();
+ return write_header(writer, event_length) ||
+ write_data(writer, buf, QUERY_HEADER_LEN) ||
+ write_post_header_for_derived(writer) ||
+ write_data(writer, start_of_status, (uint) status_vars_len) ||
+ write_data(writer, db, db_len + 1) ||
+ write_data(writer, query, q_len) ||
+ write_footer(writer);
}
-bool Query_compressed_log_event::write()
+bool Query_compressed_log_event::write(Log_event_writer *writer)
{
uchar *buffer;
uint32 alloc_size, compressed_size;
@@ -1393,7 +1265,7 @@ bool Query_compressed_log_event::write()
uint32 q_len_tmp= q_len;
query= (char*) buffer;
q_len= compressed_size;
- ret= Query_log_event::write();
+ ret= Query_log_event::write(writer);
query= query_tmp;
q_len= q_len_tmp;
}
@@ -1451,7 +1323,6 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
lc_time_names_number(thd_arg->variables.lc_time_names->number),
charset_database_number(0),
table_map_for_update((ulonglong)thd_arg->table_map_for_update),
- master_data_written(0),
gtid_flags_extra(thd_arg->get_binlog_flags_for_alter()),
sa_seq_no(0)
{
@@ -1939,7 +1810,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
goto end;
}
- set_thd_db(thd, rpl_filter, db, db_len);
+ set_thd_db(thd, rpl_filter, LEX_CSTRING{db, db_len});
/*
Setting the character set and collation of the current database thd->db.
@@ -2009,6 +1880,17 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
thd->variables.sql_mode=
(sql_mode_t) ((thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE) |
(sql_mode & ~(sql_mode_t) MODE_NO_DIR_IN_CREATE));
+
+  size_t cslen= thd->variables.character_set_collations.from_binary(
+                                        character_set_collations.str,
+                                        character_set_collations.length);
+  if (cslen != character_set_collations.length)
+  {
+    // Fatal: either a broken event, or an unknown collation ID
+    thd->variables.character_set_collations.init();
+    goto compare_errors; // QQ: report an error here?
+  }
+
if (charset_inited)
{
rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
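
On the apply side, the hunk above treats a partial decode of the collation-override payload as corruption (or an unknown collation ID) and falls back to defaults. A hedged sketch of that check, with the decoder and reset step passed in as callables because the real THD/session-variable types are not reproduced here:

#include <cstddef>
#include <functional>

// decode returns the number of payload bytes it consumed; reset restores defaults.
static bool apply_collation_overrides_sketch(
    const char *payload, std::size_t len,
    const std::function<std::size_t(const char *, std::size_t)> &decode,
    const std::function<void()> &reset)
{
  if (decode(payload, len) != len)     // short read: broken event or unknown ID
  {
    reset();
    return false;                      // caller raises or compares the error
  }
  return true;
}
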
@@ -2295,7 +2177,7 @@ compare_errors:
expected_error,
actual_error ? thd->get_stmt_da()->message() : "no error",
actual_error,
- print_slave_db_safe(db), query_arg);
+ safe_str(db), query_arg);
thd->is_slave_error= 1;
}
/*
@@ -2465,7 +2347,7 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
bool
Query_log_event::peek_is_commit_rollback(const uchar *event_start,
size_t event_len,
- enum enum_binlog_checksum_alg
+ enum_binlog_checksum_alg
checksum_alg)
{
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
@@ -2485,23 +2367,11 @@ Query_log_event::peek_is_commit_rollback(const uchar *event_start,
!memcmp(event_start + (event_len-9), "\0ROLLBACK", 9);
}
-#endif
-
-
-/**************************************************************************
- Start_log_event_v3 methods
-**************************************************************************/
-
-Start_log_event_v3::Start_log_event_v3()
- :Log_event(), created(0), binlog_version(BINLOG_VERSION),
- dont_set_created(0)
-{
- memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
-}
-
+/***************************************************************************
+ Format_description_log_event methods
+****************************************************************************/
-#if defined(HAVE_REPLICATION)
-void Start_log_event_v3::pack_info(Protocol *protocol)
+void Format_description_log_event::pack_info(Protocol *protocol)
{
char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
pos= strmov(buf, "Server ver: ");
@@ -2510,115 +2380,13 @@ void Start_log_event_v3::pack_info(Protocol *protocol)
pos= int10_to_str(binlog_version, pos, 10);
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
}
-#endif
-
-
-bool Start_log_event_v3::write()
-{
- char buff[START_V3_HEADER_LEN];
- int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
- memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
- if (!dont_set_created)
- created= get_time(); // this sets when and when_sec_part as a side effect
- int4store(buff + ST_CREATED_OFFSET,created);
- return write_header(sizeof(buff)) ||
- write_data(buff, sizeof(buff)) ||
- write_footer();
-}
-
-
-#if defined(HAVE_REPLICATION)
-
-/**
- Start_log_event_v3::do_apply_event() .
- The master started
-
- IMPLEMENTATION
- - To handle the case where the master died without having time to write
- DROP TEMPORARY TABLE, DO RELEASE_LOCK (prepared statements' deletion is
- TODO), we clean up all temporary tables that we got, if we are sure we
- can (see below).
-
- @todo
- - Remove all active user locks.
- Guilhem 2003-06: this is true but not urgent: the worst it can cause is
- the use of a bit of memory for a user lock which will not be used
- anymore. If the user lock is later used, the old one will be released. In
- other words, no deadlock problem.
-*/
-
-int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
-{
- DBUG_ENTER("Start_log_event_v3::do_apply_event");
- int error= 0;
- Relay_log_info *rli= rgi->rli;
-
- switch (binlog_version)
- {
- case 3:
- case 4:
- /*
- This can either be 4.x (then a Start_log_event_v3 is only at master
- startup so we are sure the master has restarted and cleared his temp
- tables; the event always has 'created'>0) or 5.0 (then we have to test
- 'created').
- */
- if (created)
- {
- rli->close_temporary_tables();
-
- /*
- The following is only false if we get here with a BINLOG statement
- */
- if (rli->mi)
- cleanup_load_tmpdir(&rli->mi->cmp_connection_name);
- }
- break;
-
- /*
- Now the older formats; in that case load_tmpdir is cleaned up by the I/O
- thread.
- */
- case 1:
- if (strncmp(rli->relay_log.description_event_for_exec->server_version,
- "3.23.57",7) >= 0 && created)
- {
- /*
- Can distinguish, based on the value of 'created': this event was
- generated at master startup.
- */
- rli->close_temporary_tables();
- }
- /*
- Otherwise, can't distinguish a Start_log_event generated at
- master startup and one generated by master FLUSH LOGS, so cannot
- be sure temp tables have to be dropped. So do nothing.
- */
- break;
- default:
- /*
- This case is not expected. It can be either an event corruption or an
- unsupported binary log version.
- */
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
- ER_THD(thd, ER_SLAVE_FATAL_ERROR),
- "Binlog version not supported");
- DBUG_RETURN(1);
- }
- DBUG_RETURN(error);
-}
#endif /* defined(HAVE_REPLICATION) */
-/***************************************************************************
- Format_description_log_event methods
-****************************************************************************/
-
-bool Format_description_log_event::write()
+bool Format_description_log_event::write(Log_event_writer *writer)
{
bool ret;
- bool no_checksum;
/*
- We don't call Start_log_event_v3::write() because this would make 2
+ We don't call Start_log_event_v::write() because this would make 2
my_b_safe_write().
*/
uchar buff[START_V3_HEADER_LEN+1];
@@ -2639,11 +2407,9 @@ bool Format_description_log_event::write()
FD_queue checksum_alg value.
*/
compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1);
-#ifdef DBUG_ASSERT_EXISTS
- data_written= 0; // to prepare for need_checksum assert
-#endif
- uint8 checksum_byte= (uint8)
- (need_checksum() ? checksum_alg : BINLOG_CHECKSUM_ALG_OFF);
+ uint8 checksum_byte= (uint8) (used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF ?
+ used_checksum_alg : BINLOG_CHECKSUM_ALG_OFF);
+ DBUG_ASSERT(used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
/*
FD of checksum-aware server is always checksum-equipped, (V) is in,
regardless of @@global.binlog_checksum policy.
@@ -2657,17 +2423,14 @@ bool Format_description_log_event::write()
1 + 4 bytes bigger comparing to the former FD.
*/
- if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF)))
- {
- checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
- }
- ret= write_header(rec_size) ||
- write_data(buff, sizeof(buff)) ||
- write_data(post_header_len, number_of_event_types) ||
- write_data(&checksum_byte, sizeof(checksum_byte)) ||
- write_footer();
- if (no_checksum)
- checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+ uint orig_checksum_len= writer->checksum_len;
+ writer->checksum_len= BINLOG_CHECKSUM_LEN;
+ ret= write_header(writer, rec_size) ||
+ write_data(writer, buff, sizeof(buff)) ||
+ write_data(writer, post_header_len, number_of_event_types) ||
+ write_data(writer, &checksum_byte, sizeof(checksum_byte)) ||
+ write_footer(writer);
+ writer->checksum_len= orig_checksum_len;
return ret;
}
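
The rewritten Format_description_log_event::write() temporarily forces the writer's checksum length so the FD event always reserves its checksum footer, then restores the caller's setting. A small generic sketch of that save/force/restore pattern (the writer type and field name are assumptions):

struct WriterSketch
{
  unsigned checksum_len;       // bytes reserved for the per-event checksum footer
};

// Run write_event with checksum_len forced to forced_len, then restore it.
template <class WriteFn>
static bool write_with_forced_checksum_sketch(WriterSketch &w, unsigned forced_len,
                                              WriteFn write_event)
{
  unsigned saved= w.checksum_len;
  w.checksum_len= forced_len;          // e.g. BINLOG_CHECKSUM_LEN
  bool err= write_event(w);            // header + body + footer
  w.checksum_len= saved;
  return err;
}
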
@@ -2740,9 +2503,8 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
}
/*
- If this event comes from ourselves, there is no cleaning task to
- perform, we don't call Start_log_event_v3::do_apply_event()
- (this was just to update the log's description event).
+ If this event comes from ourselves, there is no cleaning task to perform,
+ we don't do cleanup (this was just to update the log's description event).
*/
if (server_id != (uint32) global_system_variables.server_id)
{
@@ -2755,7 +2517,24 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
0, then 96, then jump to first really asked event (which is
>96). So this is ok.
*/
- ret= Start_log_event_v3::do_apply_event(rgi);
+ switch (binlog_version)
+ {
+ case 4:
+ if (created)
+ {
+ rli->close_temporary_tables();
+
+ /* The following is only false if we get here with a BINLOG statement */
+ if (rli->mi)
+ cleanup_load_tmpdir(&rli->mi->cmp_connection_name);
+ }
+ break;
+ default:
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
+ "Binlog version not supported");
+ ret= 1;
+ }
}
if (!ret)
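
A hedged sketch of the version handling that this hunk folds into Format_description_log_event::do_apply_event(): only binlog_version 4 is accepted, and a non-zero 'created' timestamp (master restart) triggers slave-side cleanup. The callables stand in for the real cleanup and error-reporting calls.

#include <functional>

static int handle_fd_version_sketch(int binlog_version, bool created,
                                    const std::function<void()> &cleanup_temp_state,
                                    const std::function<void(const char *)> &report_fatal)
{
  if (binlog_version != 4)
  {
    report_fatal("Binlog version not supported");
    return 1;
  }
  if (created)                         // master restarted: drop leftover temp state
    cleanup_temp_state();
  return 0;
}
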
@@ -2824,568 +2603,6 @@ int Start_encryption_log_event::do_update_pos(rpl_group_info *rgi)
/**************************************************************************
- Load_log_event methods
-**************************************************************************/
-
-#if defined(HAVE_REPLICATION)
-bool Load_log_event::print_query(THD *thd, bool need_db, const char *cs,
- String *buf, my_off_t *fn_start,
- my_off_t *fn_end, const char *qualify_db)
-{
- if (need_db && db && db_len)
- {
- buf->append(STRING_WITH_LEN("use "));
- append_identifier(thd, buf, db, db_len);
- buf->append(STRING_WITH_LEN("; "));
- }
-
- buf->append(STRING_WITH_LEN("LOAD DATA "));
-
- if (is_concurrent)
- buf->append(STRING_WITH_LEN("CONCURRENT "));
-
- if (fn_start)
- *fn_start= buf->length();
-
- if (check_fname_outside_temp_buf())
- buf->append(STRING_WITH_LEN("LOCAL "));
- buf->append(STRING_WITH_LEN("INFILE '"));
- buf->append_for_single_quote(fname, fname_len);
- buf->append(STRING_WITH_LEN("' "));
-
- if (sql_ex.opt_flags & REPLACE_FLAG)
- buf->append(STRING_WITH_LEN("REPLACE "));
- else if (sql_ex.opt_flags & IGNORE_FLAG)
- buf->append(STRING_WITH_LEN("IGNORE "));
-
- buf->append(STRING_WITH_LEN("INTO"));
-
- if (fn_end)
- *fn_end= buf->length();
-
- buf->append(STRING_WITH_LEN(" TABLE "));
- if (qualify_db)
- {
- append_identifier(thd, buf, qualify_db, strlen(qualify_db));
- buf->append(STRING_WITH_LEN("."));
- }
- append_identifier(thd, buf, table_name, table_name_len);
-
- if (cs != NULL)
- {
- buf->append(STRING_WITH_LEN(" CHARACTER SET "));
- buf->append(cs, strlen(cs));
- }
-
- /* We have to create all optional fields as the default is not empty */
- buf->append(STRING_WITH_LEN(" FIELDS TERMINATED BY "));
- pretty_print_str(buf, sql_ex.field_term, sql_ex.field_term_len);
- if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
- buf->append(STRING_WITH_LEN(" OPTIONALLY "));
- buf->append(STRING_WITH_LEN(" ENCLOSED BY "));
- pretty_print_str(buf, sql_ex.enclosed, sql_ex.enclosed_len);
-
- buf->append(STRING_WITH_LEN(" ESCAPED BY "));
- pretty_print_str(buf, sql_ex.escaped, sql_ex.escaped_len);
-
- buf->append(STRING_WITH_LEN(" LINES TERMINATED BY "));
- pretty_print_str(buf, sql_ex.line_term, sql_ex.line_term_len);
- if (sql_ex.line_start_len)
- {
- buf->append(STRING_WITH_LEN(" STARTING BY "));
- pretty_print_str(buf, sql_ex.line_start, sql_ex.line_start_len);
- }
-
- if ((long) skip_lines > 0)
- {
- buf->append(STRING_WITH_LEN(" IGNORE "));
- buf->append_ulonglong(skip_lines);
- buf->append(STRING_WITH_LEN(" LINES "));
- }
-
- if (num_fields)
- {
- uint i;
- const char *field= fields;
- buf->append(STRING_WITH_LEN(" ("));
- for (i = 0; i < num_fields; i++)
- {
- if (i)
- {
- /*
- Yes, the space and comma is reversed here. But this is mostly dead
- code, at most used when reading really old binlogs from old servers,
- so better just leave it as is...
- */
- buf->append(STRING_WITH_LEN(" ,"));
- }
- append_identifier(thd, buf, field, field_lens[i]);
- field+= field_lens[i] + 1;
- }
- buf->append(STRING_WITH_LEN(")"));
- }
- return 0;
-}
-
-
-void Load_log_event::pack_info(Protocol *protocol)
-{
- char query_buffer[1024];
- String query_str(query_buffer, sizeof(query_buffer), system_charset_info);
-
- query_str.length(0);
- print_query(protocol->thd, TRUE, NULL, &query_str, 0, 0, NULL);
- protocol->store(query_str.ptr(), query_str.length(), &my_charset_bin);
-}
-#endif /* defined(HAVE_REPLICATION) */
-
-
-bool Load_log_event::write_data_header()
-{
- char buf[LOAD_HEADER_LEN];
- int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
- int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
- int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
- buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
- buf[L_DB_LEN_OFFSET] = (char)db_len;
- int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
- return write_data(buf, LOAD_HEADER_LEN) != 0;
-}
-
-
-bool Load_log_event::write_data_body()
-{
- if (sql_ex.write_data(writer))
- return 1;
- if (num_fields && fields && field_lens)
- {
- if (write_data(field_lens, num_fields) ||
- write_data(fields, field_block_len))
- return 1;
- }
- return (write_data(table_name, table_name_len + 1) ||
- write_data(db, db_len + 1) ||
- write_data(fname, fname_len));
-}
-
-
-Load_log_event::Load_log_event(THD *thd_arg, const sql_exchange *ex,
- const char *db_arg, const char *table_name_arg,
- List<Item> &fields_arg,
- bool is_concurrent_arg,
- enum enum_duplicates handle_dup,
- bool ignore, bool using_trans)
- :Log_event(thd_arg,
- (thd_arg->used & THD::THREAD_SPECIFIC_USED)
- ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
- using_trans),
- thread_id(thd_arg->thread_id),
- slave_proxy_id((ulong)thd_arg->variables.pseudo_thread_id),
- num_fields(0),fields(0),
- field_lens(0),field_block_len(0),
- table_name(table_name_arg ? table_name_arg : ""),
- db(db_arg), fname(ex->file_name), local_fname(FALSE),
- is_concurrent(is_concurrent_arg)
-{
- time_t end_time;
- time(&end_time);
- exec_time = (ulong) (end_time - thd_arg->start_time);
- /* db can never be a zero pointer in 4.0 */
- db_len = (uint32) strlen(db);
- table_name_len = (uint32) strlen(table_name);
- fname_len = (fname) ? (uint) strlen(fname) : 0;
- sql_ex.field_term = ex->field_term->ptr();
- sql_ex.field_term_len = (uint8) ex->field_term->length();
- sql_ex.enclosed = ex->enclosed->ptr();
- sql_ex.enclosed_len = (uint8) ex->enclosed->length();
- sql_ex.line_term = ex->line_term->ptr();
- sql_ex.line_term_len = (uint8) ex->line_term->length();
- sql_ex.line_start = ex->line_start->ptr();
- sql_ex.line_start_len = (uint8) ex->line_start->length();
- sql_ex.escaped = ex->escaped->ptr();
- sql_ex.escaped_len = (uint8) ex->escaped->length();
- sql_ex.opt_flags = 0;
- sql_ex.cached_new_format = -1;
-
- if (ex->dumpfile)
- sql_ex.opt_flags|= DUMPFILE_FLAG;
- if (ex->opt_enclosed)
- sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
-
- sql_ex.empty_flags= 0;
-
- switch (handle_dup) {
- case DUP_REPLACE:
- sql_ex.opt_flags|= REPLACE_FLAG;
- break;
- case DUP_UPDATE: // Impossible here
- case DUP_ERROR:
- break;
- }
- if (ignore)
- sql_ex.opt_flags|= IGNORE_FLAG;
-
- if (!ex->field_term->length())
- sql_ex.empty_flags |= FIELD_TERM_EMPTY;
- if (!ex->enclosed->length())
- sql_ex.empty_flags |= ENCLOSED_EMPTY;
- if (!ex->line_term->length())
- sql_ex.empty_flags |= LINE_TERM_EMPTY;
- if (!ex->line_start->length())
- sql_ex.empty_flags |= LINE_START_EMPTY;
- if (!ex->escaped->length())
- sql_ex.empty_flags |= ESCAPED_EMPTY;
-
- skip_lines = ex->skip_lines;
-
- List_iterator<Item> li(fields_arg);
- field_lens_buf.length(0);
- fields_buf.length(0);
- Item* item;
- while ((item = li++))
- {
- num_fields++;
- uchar len= (uchar) item->name.length;
- field_block_len += len + 1;
- fields_buf.append(item->name.str, len + 1);
- field_lens_buf.append((char*)&len, 1);
- }
-
- field_lens = (const uchar*)field_lens_buf.ptr();
- fields = fields_buf.ptr();
-}
-
-
-/**
- Load_log_event::set_fields()
-
- @note
- This function can not use the member variable
- for the database, since LOAD DATA INFILE on the slave
- can be for a different database than the current one.
- This is the reason for the affected_db argument to this method.
-*/
-
-void Load_log_event::set_fields(const char* affected_db,
- List<Item> &field_list,
- Name_resolution_context *context)
-{
- uint i;
- const char* field = fields;
- for (i= 0; i < num_fields; i++)
- {
- LEX_CSTRING field_name= {field, field_lens[i] };
- field_list.push_back(new (thd->mem_root)
- Item_field(thd, context,
- Lex_cstring_strlen(affected_db),
- Lex_cstring_strlen(table_name),
- field_name),
- thd->mem_root);
- field+= field_lens[i] + 1;
- }
-}
-
-
-#if defined(HAVE_REPLICATION)
-/**
- Does the data loading job when executing a LOAD DATA on the slave.
-
- @param net
- @param rli
- @param use_rli_only_for_errors If set to 1, rli is provided to
- Load_log_event::exec_event only for this
- function to have RPL_LOG_NAME and
- rli->last_slave_error, both being used by
- error reports. rli's position advancing
- is skipped (done by the caller which is
- Execute_load_log_event::exec_event).
- If set to 0, rli is provided for full use,
- i.e. for error reports and position
- advancing.
-
- @todo
- fix this; this can be done by testing rules in
- Create_file_log_event::exec_event() and then discarding Append_block and
- al.
- @todo
- this is a bug - this needs to be moved to the I/O thread
-
- @retval
- 0 Success
- @retval
- 1 Failure
-*/
-
-int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
- bool use_rli_only_for_errors)
-{
- Relay_log_info const *rli= rgi->rli;
- Rpl_filter *rpl_filter= rli->mi->rpl_filter;
- DBUG_ENTER("Load_log_event::do_apply_event");
-
- DBUG_ASSERT(thd->query() == 0);
- set_thd_db(thd, rpl_filter, db, db_len);
- thd->clear_error(1);
-
- /* see Query_log_event::do_apply_event() and BUG#13360 */
- DBUG_ASSERT(!rgi->m_table_map.count());
- /*
- Usually lex_start() is called by mysql_parse(), but we need it here
- as the present method does not call mysql_parse().
- */
- lex_start(thd);
- thd->lex->local_file= local_fname;
- thd->reset_for_next_command(0); // Errors are cleared above
-
- /*
- We test replicate_*_db rules. Note that we have already prepared
- the file to load, even if we are going to ignore and delete it
- now. So it is possible that we did a lot of disk writes for
- nothing. In other words, a big LOAD DATA INFILE on the master will
- still consume a lot of space on the slave (space in the relay log
- + space of temp files: twice the space of the file to load...)
- even if it will finally be ignored. TODO: fix this; this can be
- done by testing rules in Create_file_log_event::do_apply_event()
- and then discarding Append_block and al. Another way is do the
- filtering in the I/O thread (more efficient: no disk writes at
- all).
-
- Note: We do not need to execute reset_one_shot_variables() if this
- db_ok() test fails.
- Reason: The db stored in binlog events is the same for SET and for
- its companion query. If the SET is ignored because of
- db_ok(), the companion query will also be ignored, and if
- the companion query is ignored in the db_ok() test of
- ::do_apply_event(), then the companion SET also have so
- we don't need to reset_one_shot_variables().
- */
- if (rpl_filter->db_ok(thd->db.str))
- {
-#ifdef WITH_WSREP
- if (!wsrep_thd_is_applying(thd))
-#endif
- thd->set_time(when, when_sec_part);
- thd->set_query_id(next_query_id());
- thd->get_stmt_da()->opt_clear_warning_info(thd->query_id);
-
- TABLE_LIST tables;
- LEX_CSTRING db_name= { thd->strmake(thd->db.str, thd->db.length), thd->db.length };
- if (lower_case_table_names)
- my_casedn_str(system_charset_info, (char *)table_name);
- LEX_CSTRING tbl_name= { table_name, strlen(table_name) };
- tables.init_one_table(&db_name, &tbl_name, 0, TL_WRITE);
- tables.updating= 1;
-
- // the table will be opened in mysql_load
- if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db.str, &tables))
- {
- // TODO: this is a bug - this needs to be moved to the I/O thread
- if (net)
- skip_load_data_infile(net);
- }
- else
- {
- enum enum_duplicates handle_dup;
- bool ignore= 0;
- char query_buffer[1024];
- String query_str(query_buffer, sizeof(query_buffer), system_charset_info);
- char *load_data_query;
-
- query_str.length(0);
- /*
- Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
- and written to slave's binlog if binlogging is on.
- */
- print_query(thd, FALSE, NULL, &query_str, NULL, NULL, NULL);
- if (!(load_data_query= (char *)thd->strmake(query_str.ptr(),
- query_str.length())))
- {
- /*
- This will set thd->fatal_error in case of OOM. So we surely will notice
- that something is wrong.
- */
- goto error;
- }
-
- thd->set_query(load_data_query, (uint) (query_str.length()));
-
- if (sql_ex.opt_flags & REPLACE_FLAG)
- handle_dup= DUP_REPLACE;
- else if (sql_ex.opt_flags & IGNORE_FLAG)
- {
- ignore= 1;
- handle_dup= DUP_ERROR;
- }
- else
- {
- /*
- When replication is running fine, if it was DUP_ERROR on the
- master then we could choose IGNORE here, because if DUP_ERROR
- suceeded on master, and data is identical on the master and slave,
- then there should be no uniqueness errors on slave, so IGNORE is
- the same as DUP_ERROR. But in the unlikely case of uniqueness errors
- (because the data on the master and slave happen to be different
- (user error or bug), we want LOAD DATA to print an error message on
- the slave to discover the problem.
-
- If reading from net (a 3.23 master), mysql_load() will change this
- to IGNORE.
- */
- handle_dup= DUP_ERROR;
- }
- /*
- We need to set thd->lex->sql_command and thd->lex->duplicates
- since InnoDB tests these variables to decide if this is a LOAD
- DATA ... REPLACE INTO ... statement even though mysql_parse()
- is not called. This is not needed in 5.0 since there the LOAD
- DATA ... statement is replicated using mysql_parse(), which
- sets the thd->lex fields correctly.
- */
- thd->lex->sql_command= SQLCOM_LOAD;
- thd->lex->duplicates= handle_dup;
-
- sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
- String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
- String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
- String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
- String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
- String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
- ex.field_term= &field_term;
- ex.enclosed= &enclosed;
- ex.line_term= &line_term;
- ex.line_start= &line_start;
- ex.escaped= &escaped;
-
- ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
- if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
- ex.field_term->length(0);
-
- ex.skip_lines = skip_lines;
- List<Item> field_list;
- thd->lex->first_select_lex()->context.resolve_in_table_list_only(&tables);
- set_fields(tables.db.str,
- field_list, &thd->lex->first_select_lex()->context);
- thd->variables.pseudo_thread_id= thread_id;
- if (net)
- {
- // mysql_load will use thd->net to read the file
- thd->net.vio = net->vio;
- // Make sure the client does not get confused about the packet sequence
- thd->net.pkt_nr = net->pkt_nr;
- }
- /*
- It is safe to use tmp_list twice because we are not going to
- update it inside mysql_load().
- */
- List<Item> tmp_list;
- if (thd->open_temporary_tables(&tables) ||
- mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
- handle_dup, ignore, net != 0))
- thd->is_slave_error= 1;
- if (thd->cuted_fields)
- {
- /* log_pos is the position of the LOAD event in the master log */
- sql_print_warning("Slave: load data infile on table '%s' at "
- "log position %llu in log '%s' produced %ld "
- "warning(s). Default database: '%s'",
- (char*) table_name, log_pos, RPL_LOG_NAME,
- (ulong) thd->cuted_fields,
- thd->get_db());
- }
- if (net)
- net->pkt_nr= thd->net.pkt_nr;
- }
- }
- else
- {
- /*
- We will just ask the master to send us /dev/null if we do not
- want to load the data.
- TODO: this a bug - needs to be done in I/O thread
- */
- if (net)
- skip_load_data_infile(net);
- }
-
-error:
- thd->net.vio = 0;
- const char *remember_db= thd->get_db();
- thd->catalog= 0;
- thd->set_db(&null_clex_str); /* will free the current database */
- thd->reset_query();
- thd->get_stmt_da()->set_overwrite_status(true);
- thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
- thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN);
- thd->get_stmt_da()->set_overwrite_status(false);
- close_thread_tables(thd);
- /*
- - If transaction rollback was requested due to deadlock
- perform it and release metadata locks.
- - If inside a multi-statement transaction,
- defer the release of metadata locks until the current
- transaction is either committed or rolled back. This prevents
- other statements from modifying the table for the entire
- duration of this transaction. This provides commit ordering
- and guarantees serializability across multiple transactions.
- - If in autocommit mode, or outside a transactional context,
- automatically release metadata locks of the current statement.
- */
- if (thd->transaction_rollback_request)
- {
- trans_rollback_implicit(thd);
- thd->release_transactional_locks();
- }
- else if (! thd->in_multi_stmt_transaction_mode())
- thd->release_transactional_locks();
- else
- thd->mdl_context.release_statement_locks();
-
- DBUG_EXECUTE_IF("LOAD_DATA_INFILE_has_fatal_error",
- thd->is_slave_error= 0; thd->is_fatal_error= 1;);
-
- if (unlikely(thd->is_slave_error))
- {
- /* this err/sql_errno code is copy-paste from net_send_error() */
- const char *err;
- int sql_errno;
- if (thd->is_error())
- {
- err= thd->get_stmt_da()->message();
- sql_errno= thd->get_stmt_da()->sql_errno();
- }
- else
- {
- sql_errno=ER_UNKNOWN_ERROR;
- err= ER_THD(thd, sql_errno);
- }
- rli->report(ERROR_LEVEL, sql_errno, rgi->gtid_info(), "\
-Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
- err, (char*)table_name, remember_db);
- free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
- DBUG_RETURN(1);
- }
- free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
-
- if (unlikely(thd->is_fatal_error))
- {
- char buf[256];
- my_snprintf(buf, sizeof(buf),
- "Running LOAD DATA INFILE on table '%-.64s'."
- " Default database: '%-.64s'",
- (char*)table_name,
- remember_db);
-
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
- ER_THD(thd, ER_SLAVE_FATAL_ERROR), buf);
- DBUG_RETURN(1);
- }
-
- DBUG_RETURN( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rgi) );
-}
-#endif
-
-
-/**************************************************************************
Rotate_log_event methods
**************************************************************************/
@@ -3421,14 +2638,14 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
}
-bool Rotate_log_event::write()
+bool Rotate_log_event::write(Log_event_writer *writer)
{
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
- return (write_header(ROTATE_HEADER_LEN + ident_len) ||
- write_data(buf, ROTATE_HEADER_LEN) ||
- write_data(new_log_ident, (uint) ident_len) ||
- write_footer());
+ return (write_header(writer, ROTATE_HEADER_LEN + ident_len) ||
+ write_data(writer, buf, ROTATE_HEADER_LEN) ||
+ write_data(writer, new_log_ident, (uint) ident_len) ||
+ write_footer(writer));
}
@@ -3578,14 +2795,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
}
-bool Binlog_checkpoint_log_event::write()
+bool Binlog_checkpoint_log_event::write(Log_event_writer *writer)
{
uchar buf[BINLOG_CHECKPOINT_HEADER_LEN];
int4store(buf, binlog_file_len);
- return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
- write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
- write_data(binlog_file_name, binlog_file_len) ||
- write_footer();
+ return write_header(writer, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
+ write_data(writer, buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
+ write_data(writer, binlog_file_name, binlog_file_len) ||
+ write_footer(writer);
}
@@ -3683,7 +2900,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
*/
bool
Gtid_log_event::peek(const uchar *event_start, size_t event_len,
- enum enum_binlog_checksum_alg checksum_alg,
+ enum_binlog_checksum_alg checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2, const Format_description_log_event *fdev)
{
@@ -3714,7 +2931,7 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len,
bool
-Gtid_log_event::write()
+Gtid_log_event::write(Log_event_writer *writer)
{
uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4];
size_t write_len= 13;
@@ -3777,9 +2994,9 @@ Gtid_log_event::write()
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
write_len= GTID_HEADER_LEN;
}
- return write_header(write_len) ||
- write_data(buf, write_len) ||
- write_footer();
+ return write_header(writer, write_len) ||
+ write_data(writer, buf, write_len) ||
+ write_footer(writer);
}
@@ -3795,7 +3012,7 @@ Gtid_log_event::write()
int
Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
ulong ev_offset,
- enum enum_binlog_checksum_alg checksum_alg)
+ enum_binlog_checksum_alg checksum_alg)
{
uchar flags2;
if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
@@ -4067,7 +3284,7 @@ Gtid_list_log_event::to_packet(String *packet)
bool
-Gtid_list_log_event::write()
+Gtid_list_log_event::write(Log_event_writer *writer)
{
char buf[128];
String packet(buf, sizeof(buf), system_charset_info);
@@ -4075,9 +3292,9 @@ Gtid_list_log_event::write()
packet.length(0);
if (to_packet(&packet))
return true;
- return write_header(get_data_size()) ||
- write_data(packet.ptr(), packet.length()) ||
- write_footer();
+ return write_header(writer, get_data_size()) ||
+ write_data(writer, packet.ptr(), packet.length()) ||
+ write_footer(writer);
}
@@ -4171,14 +3388,14 @@ void Intvar_log_event::pack_info(Protocol *protocol)
#endif
-bool Intvar_log_event::write()
+bool Intvar_log_event::write(Log_event_writer *writer)
{
uchar buf[9];
buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val);
- return write_header(sizeof(buf)) ||
- write_data(buf, sizeof(buf)) ||
- write_footer();
+ return write_header(writer, sizeof(buf)) ||
+ write_data(writer, buf, sizeof(buf)) ||
+ write_footer(writer);
}
@@ -4250,14 +3467,14 @@ void Rand_log_event::pack_info(Protocol *protocol)
#endif
-bool Rand_log_event::write()
+bool Rand_log_event::write(Log_event_writer *writer)
{
uchar buf[16];
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
- return write_header(sizeof(buf)) ||
- write_data(buf, sizeof(buf)) ||
- write_footer();
+ return write_header(writer, sizeof(buf)) ||
+ write_data(writer, buf, sizeof(buf)) ||
+ write_footer(writer);
}
@@ -4529,12 +3746,12 @@ int Xid_log_event::do_commit()
#endif
-bool Xid_log_event::write()
+bool Xid_log_event::write(Log_event_writer *writer)
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
- return write_header(sizeof(xid)) ||
- write_data((uchar*)&xid, sizeof(xid)) ||
- write_footer();
+ return write_header(writer, sizeof(xid)) ||
+ write_data(writer, (uchar*)&xid, sizeof(xid)) ||
+ write_footer(writer);
}
/**************************************************************************
@@ -4580,7 +3797,7 @@ int XA_prepare_log_event::do_commit()
#endif // HAVE_REPLICATION
-bool XA_prepare_log_event::write()
+bool XA_prepare_log_event::write(Log_event_writer *writer)
{
uchar data[1 + 4 + 4 + 4]= {one_phase,};
uint8 one_phase_byte= one_phase;
@@ -4591,14 +3808,14 @@ bool XA_prepare_log_event::write()
DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1);
- return write_header(sizeof(one_phase_byte) + xid_subheader_no_data +
+ return write_header(writer, sizeof(one_phase_byte) + xid_subheader_no_data +
static_cast<XID*>(xid)->gtrid_length +
static_cast<XID*>(xid)->bqual_length) ||
- write_data(data, sizeof(data)) ||
- write_data((uchar*) static_cast<XID*>(xid)->data,
+ write_data(writer, data, sizeof(data)) ||
+ write_data(writer, (uchar*) static_cast<XID*>(xid)->data,
static_cast<XID*>(xid)->gtrid_length +
static_cast<XID*>(xid)->bqual_length) ||
- write_footer();
+ write_footer(writer);
}
@@ -4713,7 +3930,7 @@ void User_var_log_event::pack_info(Protocol* protocol)
MY_CS_COLLATION_NAME_SIZE))
return;
beg= const_cast<char *>(buf.ptr()) + old_len;
- end= str_to_hex(beg, val, val_len);
+ end= str_to_hex(beg, (uchar*)val, val_len);
buf.length(old_len + (end - beg));
if (buf.append(STRING_WITH_LEN(" COLLATE ")) ||
buf.append(cs->coll_name))
@@ -4732,7 +3949,7 @@ void User_var_log_event::pack_info(Protocol* protocol)
#endif // HAVE_REPLICATION
-bool User_var_log_event::write()
+bool User_var_log_event::write(Log_event_writer *writer)
{
char buf[UV_NAME_LEN_SIZE];
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
@@ -4796,17 +4013,19 @@ bool User_var_log_event::write()
uchar unsig= m_is_unsigned ? CHUNK_UNSIGNED : CHUNK_SIGNED;
uchar data_type_name_length= (uchar) m_data_type_name.length;
- return write_header(event_length) ||
- write_data(buf, sizeof(buf)) ||
- write_data(name, name_len) ||
- write_data(buf1, buf1_length) ||
- write_data(pos, val_len) ||
- write_data(&unsig, unsigned_len) ||
- write_data(&data_type_name_chunk_signature,
+ return write_header(writer, event_length) ||
+ write_data(writer, buf, sizeof(buf)) ||
+ write_data(writer, name, name_len) ||
+ write_data(writer, buf1, buf1_length) ||
+ write_data(writer, pos, val_len) ||
+ write_data(writer, &unsig, unsigned_len) ||
+ write_data(writer, &data_type_name_chunk_signature,
data_type_name_chunk_signature_length) ||
- write_data(&data_type_name_length, data_type_name_length_length) ||
- write_data(m_data_type_name.str, (uint) m_data_type_name.length) ||
- write_footer();
+ write_data(writer, &data_type_name_length,
+ data_type_name_length_length) ||
+ write_data(writer, m_data_type_name.str,
+ (uint) m_data_type_name.length) ||
+ write_footer(writer);
}
@@ -4958,10 +4177,9 @@ User_var_log_event::do_shall_skip(rpl_group_info *rgi)
written all DROP TEMPORARY TABLE (prepared statements' deletion is
TODO only when we binlog prep stmts). We used to clean up
slave_load_tmpdir, but this is useless as it has been cleared at the
- end of LOAD DATA INFILE. So we have nothing to do here. The place
- were we must do this cleaning is in
- Start_log_event_v3::do_apply_event(), not here. Because if we come
- here, the master was sane.
+    end of LOAD DATA INFILE. So we have nothing to do here. The place where we
+ must do this cleaning is in Format_description_log_event::do_apply_event(),
+ not here. Because if we come here, the master was sane.
This must only be called from the Slave SQL thread, since it calls
Relay_log_info::flush().
@@ -4995,178 +4213,6 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi)
/**************************************************************************
- Create_file_log_event methods
-**************************************************************************/
-
-Create_file_log_event::
-Create_file_log_event(THD* thd_arg, sql_exchange* ex,
- const char* db_arg, const char* table_name_arg,
- List<Item>& fields_arg,
- bool is_concurrent_arg,
- enum enum_duplicates handle_dup,
- bool ignore,
- uchar* block_arg, uint block_len_arg, bool using_trans)
- :Load_log_event(thd_arg, ex, db_arg, table_name_arg, fields_arg,
- is_concurrent_arg,
- handle_dup, ignore, using_trans),
- fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
- file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
-{
- DBUG_ENTER("Create_file_log_event");
- sql_ex.force_new_format();
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Create_file_log_event::write_data_body()
-*/
-
-bool Create_file_log_event::write_data_body()
-{
- bool res;
- if ((res= Load_log_event::write_data_body()) || fake_base)
- return res;
- return write_data("", 1) ||
- write_data(block, block_len);
-}
-
-
-/*
- Create_file_log_event::write_data_header()
-*/
-
-bool Create_file_log_event::write_data_header()
-{
- bool res;
- uchar buf[CREATE_FILE_HEADER_LEN];
- if ((res= Load_log_event::write_data_header()) || fake_base)
- return res;
- int4store(buf + CF_FILE_ID_OFFSET, file_id);
- return write_data(buf, CREATE_FILE_HEADER_LEN) != 0;
-}
-
-
-/*
- Create_file_log_event::write_base()
-*/
-
-bool Create_file_log_event::write_base()
-{
- bool res;
- fake_base= 1; // pretend we are Load event
- res= write();
- fake_base= 0;
- return res;
-}
-
-
-#if defined(HAVE_REPLICATION)
-void Create_file_log_event::pack_info(Protocol *protocol)
-{
- char buf[SAFE_NAME_LEN*2 + 30 + 21*2], *pos;
- pos= strmov(buf, "db=");
- memcpy(pos, db, db_len);
- pos= strmov(pos + db_len, ";table=");
- memcpy(pos, table_name, table_name_len);
- pos= strmov(pos + table_name_len, ";file_id=");
- pos= int10_to_str((long) file_id, pos, 10);
- pos= strmov(pos, ";block_len=");
- pos= int10_to_str((long) block_len, pos, 10);
- protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
-}
-#endif /* defined(HAVE_REPLICATION) */
-
-
-/**
- Create_file_log_event::do_apply_event()
- Constructor for Create_file_log_event to intantiate an event
- from the relay log on the slave.
-
- @retval
- 0 Success
- @retval
- 1 Failure
-*/
-
-#if defined(HAVE_REPLICATION)
-int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
-{
- char fname_buf[FN_REFLEN];
- char *ext;
- int fd = -1;
- IO_CACHE file;
- Log_event_writer lew(&file, 0);
- int error = 1;
- Relay_log_info const *rli= rgi->rli;
-
- THD_STAGE_INFO(thd, stage_making_temp_file_create_before_load_data);
- bzero((char*)&file, sizeof(file));
- ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info",
- &rli->mi->connection_name);
- /* old copy may exist already */
- mysql_file_delete(key_file_log_event_info, fname_buf, MYF(0));
- if ((fd= mysql_file_create(key_file_log_event_info,
- fname_buf, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0 ||
- init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
- MYF(MY_WME|MY_NABP)))
- {
- rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
- "Error in Create_file event: could not open file '%s'",
- fname_buf);
- goto err;
- }
-
- // a trick to avoid allocating another buffer
- fname= fname_buf;
- fname_len= (uint) (strmov(ext, ".data") - fname);
- writer= &lew;
- if (write_base())
- {
- strmov(ext, ".info"); // to have it right in the error message
- rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
- "Error in Create_file event: could not write to file '%s'",
- fname_buf);
- goto err;
- }
- end_io_cache(&file);
- mysql_file_close(fd, MYF(0));
-
- // fname_buf now already has .data, not .info, because we did our trick
- /* old copy may exist already */
- mysql_file_delete(key_file_log_event_data, fname_buf, MYF(0));
- if ((fd= mysql_file_create(key_file_log_event_data,
- fname_buf, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
- {
- rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
- "Error in Create_file event: could not open file '%s'",
- fname_buf);
- goto err;
- }
- if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
- {
- rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
- "Error in Create_file event: write to '%s' failed",
- fname_buf);
- goto err;
- }
- error=0; // Everything is ok
-
-err:
- if (unlikely(error))
- end_io_cache(&file);
- if (likely(fd >= 0))
- mysql_file_close(fd, MYF(0));
- return error != 0;
-}
-#endif /* defined(HAVE_REPLICATION) */
-
-
-/**************************************************************************
Append_block_log_event methods
**************************************************************************/
@@ -5181,14 +4227,14 @@ Append_block_log_event::Append_block_log_event(THD *thd_arg,
}
-bool Append_block_log_event::write()
+bool Append_block_log_event::write(Log_event_writer *writer)
{
uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
- return write_header(APPEND_BLOCK_HEADER_LEN + block_len) ||
- write_data(buf, APPEND_BLOCK_HEADER_LEN) ||
- write_data(block, block_len) ||
- write_footer();
+ return write_header(writer, APPEND_BLOCK_HEADER_LEN + block_len) ||
+ write_data(writer, buf, APPEND_BLOCK_HEADER_LEN) ||
+ write_data(writer, block, block_len) ||
+ write_footer(writer);
}
@@ -5291,13 +4337,13 @@ Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
}
-bool Delete_file_log_event::write()
+bool Delete_file_log_event::write(Log_event_writer *writer)
{
uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
- return write_header(sizeof(buf)) ||
- write_data(buf, sizeof(buf)) ||
- write_footer();
+ return write_header(writer, sizeof(buf)) ||
+ write_data(writer, buf, sizeof(buf)) ||
+ write_footer(writer);
}
@@ -5328,130 +4374,6 @@ int Delete_file_log_event::do_apply_event(rpl_group_info *rgi)
/**************************************************************************
- Execute_load_log_event methods
-**************************************************************************/
-
-Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
- const char* db_arg,
- bool using_trans)
- :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
-{
-}
-
-
-bool Execute_load_log_event::write()
-{
- uchar buf[EXEC_LOAD_HEADER_LEN];
- int4store(buf + EL_FILE_ID_OFFSET, file_id);
- return write_header(sizeof(buf)) ||
- write_data(buf, sizeof(buf)) ||
- write_footer();
-}
-
-
-#if defined(HAVE_REPLICATION)
-void Execute_load_log_event::pack_info(Protocol *protocol)
-{
- char buf[64];
- uint length;
- length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
- protocol->store(buf, (int32) length, &my_charset_bin);
-}
-
-
-/*
- Execute_load_log_event::do_apply_event()
-*/
-
-int Execute_load_log_event::do_apply_event(rpl_group_info *rgi)
-{
- char fname[FN_REFLEN+10];
- char *ext;
- int fd;
- int error= 1;
- IO_CACHE file;
- Load_log_event *lev= 0;
- Relay_log_info const *rli= rgi->rli;
-
- ext= slave_load_file_stem(fname, file_id, server_id, ".info",
- &rli->mi->cmp_connection_name);
- if ((fd= mysql_file_open(key_file_log_event_info,
- fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
- MYF(MY_WME))) < 0 ||
- init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
- MYF(MY_WME|MY_NABP)))
- {
- rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
- "Error in Exec_load event: could not open file '%s'",
- fname);
- goto err;
- }
- if (!(lev= (Load_log_event*)
- Log_event::read_log_event(&file,
- rli->relay_log.description_event_for_exec,
- opt_slave_sql_verify_checksum)) ||
- lev->get_type_code() != NEW_LOAD_EVENT)
- {
- rli->report(ERROR_LEVEL, 0, rgi->gtid_info(), "Error in Exec_load event: "
- "file '%s' appears corrupted", fname);
- goto err;
- }
- lev->thd = thd;
- /*
- lev->do_apply_event should use rli only for errors i.e. should
- not advance rli's position.
-
- lev->do_apply_event is the place where the table is loaded (it
- calls mysql_load()).
- */
-
- if (lev->do_apply_event(0,rgi,1))
- {
- /*
- We want to indicate the name of the file that could not be loaded
- (SQL_LOADxxx).
- But as we are here we are sure the error is in rli->last_slave_error and
- rli->last_slave_errno (example of error: duplicate entry for key), so we
- don't want to overwrite it with the filename.
- What we want instead is add the filename to the current error message.
- */
- char *tmp= my_strdup(PSI_INSTRUMENT_ME, rli->last_error().message, MYF(MY_WME));
- if (tmp)
- {
- rli->report(ERROR_LEVEL, rli->last_error().number, rgi->gtid_info(),
- "%s. Failed executing load from '%s'", tmp, fname);
- my_free(tmp);
- }
- goto err;
- }
- /*
- We have an open file descriptor to the .info file; we need to close it
- or Windows will refuse to delete the file in mysql_file_delete().
- */
- if (fd >= 0)
- {
- mysql_file_close(fd, MYF(0));
- end_io_cache(&file);
- fd= -1;
- }
- mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME));
- memcpy(ext, ".data", 6);
- mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
- error = 0;
-
-err:
- delete lev;
- if (fd >= 0)
- {
- mysql_file_close(fd, MYF(0));
- end_io_cache(&file);
- }
- return error;
-}
-
-#endif /* defined(HAVE_REPLICATION) */
-
-/**************************************************************************
Begin_load_query_log_event methods
**************************************************************************/
@@ -5504,14 +4426,14 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
bool
-Execute_load_query_log_event::write_post_header_for_derived()
+Execute_load_query_log_event::write_post_header_for_derived(Log_event_writer *writer)
{
uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
int4store(buf, file_id);
int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
- return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
+ return write_data(writer, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
}
@@ -5800,6 +4722,7 @@ inline void restore_empty_query_table_list(LEX *lex)
int Rows_log_event::do_apply_event(rpl_group_info *rgi)
{
+ DBUG_ASSERT(rgi);
Relay_log_info const *rli= rgi->rli;
TABLE* table;
DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
@@ -5840,12 +4763,11 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
thd->set_query_timer();
/*
- If there is no locks taken, this is the first binrow event seen
- after the table map events. We should then lock all the tables
- used in the transaction and proceed with execution of the actual
- event.
+ If there are no tables open, this must be the first row event seen
+ after the table map events. We should then open and lock all tables
+ used in the transaction and proceed with execution of the actual event.
*/
- if (!thd->lock)
+ if (!thd->open_tables)
{
/*
Lock_tables() reads the contents of thd->lex, so they must be
@@ -6086,17 +5008,13 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
}
}
-#ifdef HAVE_QUERY_CACHE
/*
Moved invalidation right before the call to rows_event_stmt_cleanup(),
to avoid query cache being polluted with stale entries,
+ Query cache is not invalidated on wsrep applier here
*/
-# ifdef WITH_WSREP
- /* Query cache is not invalidated on wsrep applier here */
if (!(WSREP(thd) && wsrep_thd_is_applying(thd)))
-# endif /* WITH_WSREP */
query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
-#endif /* HAVE_QUERY_CACHE */
}
table= m_table= rgi->m_table_map.get_table(m_table_id);
@@ -6121,7 +5039,9 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (m_width == table->s->fields && bitmap_is_set_all(&m_cols))
set_flags(COMPLETE_ROWS_F);
- /*
+ Rpl_table_data rpl_data= *(RPL_TABLE_LIST*)table->pos_in_table_list;
+
+ /*
Set tables write and read sets.
Read_set contains all slave columns (in case we are going to fetch
@@ -6134,17 +5054,24 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_PRINT_BITSET("debug", "Setting table's read_set from: %s", &m_cols);
bitmap_set_all(table->read_set);
- if (get_general_type_code() == DELETE_ROWS_EVENT ||
- get_general_type_code() == UPDATE_ROWS_EVENT)
- bitmap_intersect(table->read_set,&m_cols);
-
bitmap_set_all(table->write_set);
table->rpl_write_set= table->write_set;
- /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
- MY_BITMAP *after_image= ((get_general_type_code() == UPDATE_ROWS_EVENT) ?
- &m_cols_ai : &m_cols);
- bitmap_intersect(table->write_set, after_image);
+ if (rpl_data.copy_fields)
+ /* always full rows, all bits set */;
+ else
+ if (get_general_type_code() == WRITE_ROWS_EVENT)
+ bitmap_copy(table->write_set, &m_cols); // for sequences
+ else // If online alter, leave all columns set (i.e. skip intersects)
+ if (!thd->slave_thread || !table->s->online_alter_binlog)
+ {
+ bitmap_intersect(table->read_set,&m_cols);
+ if (get_general_type_code() == UPDATE_ROWS_EVENT)
+ bitmap_intersect(table->write_set, &m_cols_ai);
+ table->mark_columns_per_binlog_row_image();
+ if (table->vfield)
+ table->mark_virtual_columns_for_write(0);
+ }
if (table->versioned())
{
@@ -6155,10 +5082,11 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
}
m_table->mark_columns_per_binlog_row_image();
- this->slave_exec_mode= slave_exec_mode_options; // fix the mode
+ if (!rpl_data.is_online_alter())
+ this->slave_exec_mode= (enum_slave_exec_mode)slave_exec_mode_options;
// Do event specific preparations
- error= do_before_row_operations(rli);
+ error= do_before_row_operations(rgi);
/*
Bug#56662 Assertion failed: next_insert_id == 0, file handler.cc
@@ -6170,7 +5098,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
sql_mode_t saved_sql_mode= thd->variables.sql_mode;
if (!is_auto_inc_in_extra_columns())
- thd->variables.sql_mode= MODE_NO_AUTO_VALUE_ON_ZERO;
+ thd->variables.sql_mode= (rpl_data.copy_fields ? saved_sql_mode : 0)
+ | MODE_NO_AUTO_VALUE_ON_ZERO;
// row processing loop
@@ -6183,10 +5112,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
THD_STAGE_INFO(thd, stage_executing);
do
{
- /* in_use can have been set to NULL in close_tables_for_reopen */
- THD* old_thd= table->in_use;
- if (!table->in_use)
- table->in_use= thd;
+ DBUG_ASSERT(table->in_use);
error= do_exec_row(rgi);
@@ -6194,8 +5120,6 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
- table->in_use = old_thd;
-
if (unlikely(error))
{
int actual_error= convert_handler_error(error, thd, table);
@@ -6205,7 +5129,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
ignored_error_code(actual_error) : 0);
#ifdef WITH_WSREP
- if (WSREP(thd) && thd->wsrep_applier &&
+ if (WSREP(thd) && wsrep_thd_is_applying(thd) &&
wsrep_ignored_error_code(this, actual_error))
{
idempotent_error= true;
@@ -6245,6 +5169,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
thd->transaction->stmt.modified_non_trans_table= TRUE;
if (likely(error == 0))
{
+ m_row_count++;
error= thd->killed_errno();
if (error && !thd->is_error())
my_error(error, MYF(0));
@@ -6252,6 +5177,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
} // row processing loop
while (error == 0 && (m_curr_row != m_rows_end));
+ thd->inc_examined_row_count(m_row_count);
+
/*
Restore the sql_mode after the rows event is processed.
*/
@@ -6268,7 +5195,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
}
- if (unlikely(error= do_after_row_operations(rli, error)) &&
+ if (unlikely(error= do_after_row_operations(error)) &&
ignored_error_code(convert_handler_error(error, thd, table)))
{
@@ -6279,34 +5206,40 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
thd->clear_error(1);
error= 0;
}
- } // if (table)
+ if (unlikely(error))
+ {
+ if (rpl_data.is_online_alter())
+ goto err;
+ slave_rows_error_report(ERROR_LEVEL, error, rgi, thd, table,
+ get_type_str(),
+ RPL_LOG_NAME, log_pos);
+ /*
+ @todo We should probably not call
+ reset_current_stmt_binlog_format_row() from here.
- if (unlikely(error))
- {
- slave_rows_error_report(ERROR_LEVEL, error, rgi, thd, table,
- get_type_str(),
- RPL_LOG_NAME, log_pos);
- /*
- @todo We should probably not call
- reset_current_stmt_binlog_format_row() from here.
+ Note: this applies to log_event_old.cc too.
+ /Sven
+ */
+ thd->reset_current_stmt_binlog_format_row();
+ thd->is_slave_error= 1;
+ /* remove trigger's tables */
+ goto err;
+ }
+ } // if (table)
- Note: this applies to log_event_old.cc too.
- /Sven
- */
- thd->reset_current_stmt_binlog_format_row();
- thd->is_slave_error= 1;
- /* remove trigger's tables */
- goto err;
- }
+ DBUG_ASSERT(error == 0);
- /* remove trigger's tables */
- restore_empty_query_table_list(thd->lex);
+ /*
+ Remove trigger's tables. In case of ONLINE ALTER TABLE, event doesn't own
+ the table (hence, no tables are locked), and therefore no cleanup should be
+ done after each event.
+ */
+ if (rgi->tables_to_lock_count)
+ restore_empty_query_table_list(thd->lex);
-#if defined(WITH_WSREP) && defined(HAVE_QUERY_CACHE)
if (WSREP(thd) && wsrep_thd_is_applying(thd))
- query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
-#endif /* WITH_WSREP && HAVE_QUERY_CACHE */
+ query_cache_invalidate_locked_for_write(thd, rgi->tables_to_lock);
if (get_flags(STMT_END_F))
{
@@ -6322,8 +5255,11 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_RETURN(error);
err:
- restore_empty_query_table_list(thd->lex);
- rgi->slave_close_thread_tables(thd);
+ if (rgi->tables_to_lock_count)
+ {
+ restore_empty_query_table_list(thd->lex);
+ rgi->slave_close_thread_tables(thd);
+ }
thd->reset_query_timer();
DBUG_RETURN(error);
}
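
Earlier in do_apply_event() above, the table's read_set and write_set start with every bit set and are then narrowed, outside the online-alter and write-rows special cases, by the before image (m_cols) for reads and, for updates, the after image (m_cols_ai) for writes. A rough illustration of that narrowing using std::bitset in place of MY_BITMAP; the column count and bit patterns are invented:

#include <bitset>
#include <cstddef>
#include <cstdio>

int main()
{
  constexpr std::size_t ncols= 8;
  std::bitset<ncols> read_set, write_set;
  read_set.set();                          // bitmap_set_all(table->read_set)
  write_set.set();                         // bitmap_set_all(table->write_set)

  std::bitset<ncols> cols("00001111");     // before image sent by the master
  std::bitset<ncols> cols_ai("00000011");  // after image (updates only)

  bool is_update= true;
  read_set &= cols;                        // bitmap_intersect(read_set, &m_cols)
  if (is_update)
    write_set &= cols_ai;                  // bitmap_intersect(write_set, &m_cols_ai)

  std::printf("read=%s write=%s\n", read_set.to_string().c_str(),
              write_set.to_string().c_str());
  return 0;
}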
@@ -6476,7 +5412,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* defined(HAVE_REPLICATION) */
-bool Rows_log_event::write_data_header()
+bool Rows_log_event::write_data_header(Log_event_writer *writer)
{
uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer
DBUG_ASSERT(m_table_id != UINT32_MAX);
@@ -6484,14 +5420,14 @@ bool Rows_log_event::write_data_header()
{
int4store(buf + 0, (ulong) m_table_id);
int2store(buf + 4, m_flags);
- return (write_data(buf, 6));
+ return (write_data(writer, buf, 6));
});
int6store(buf + RW_MAPID_OFFSET, m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return write_data(buf, ROWS_HEADER_LEN);
+ return write_data(writer, buf, ROWS_HEADER_LEN_V1);
}
-bool Rows_log_event::write_data_body()
+bool Rows_log_event::write_data_body(Log_event_writer *writer)
{
/*
Note that this should be the number of *bits*, not the number of
@@ -6506,13 +5442,13 @@ bool Rows_log_event::write_data_body()
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || write_data(writer, sbuf, (size_t) (sbuf_end - sbuf));
bitmap= (uchar*) my_alloca(bitmap_size);
bitmap_export(bitmap, &m_cols);
DBUG_DUMP("m_cols", bitmap, bitmap_size);
- res= res || write_data(bitmap, bitmap_size);
+ res= res || write_data(writer, bitmap, bitmap_size);
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/
@@ -6522,17 +5458,17 @@ bool Rows_log_event::write_data_body()
bitmap_export(bitmap, &m_cols_ai);
DBUG_DUMP("m_cols_ai", bitmap, bitmap_size);
- res= res || write_data(bitmap, bitmap_size);
+ res= res || write_data(writer, bitmap, bitmap_size);
}
DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || write_data(m_rows_buf, (size_t) data_size);
+ res= res || write_data(writer, m_rows_buf, (size_t) data_size);
my_afree(bitmap);
return res;
}
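
For reference, write_data_body() above emits the column count as a net_store_length()-packed integer, then a column bitmap of (m_width + 7) / 8 bytes (twice for update events, which also carry the after-image bitmap), then the raw row images. A rough size calculation under that layout; packed_len_bytes() only mimics the usual packed-integer sizing and is an assumption, not a server function (a fuller encoder sketch follows Table_map_log_event::write_data_body() below):

#include <cstddef>

static size_t packed_len_bytes(unsigned long long v)
{
  if (v < 251ULL)         return 1;   // single byte
  if (v < 0x10000ULL)     return 3;   // prefix byte + 2 bytes
  if (v < 0x1000000ULL)   return 4;   // prefix byte + 3 bytes
  return 9;                           // prefix byte + 8 bytes
}

static size_t rows_body_size(size_t width, bool is_update, size_t row_data_size)
{
  size_t bitmap_size= (width + 7) / 8;
  return packed_len_bytes(width)
         + bitmap_size
         + (is_update ? bitmap_size : 0)
         + row_data_size;
}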
-bool Rows_log_event::write_compressed()
+bool Rows_log_event::write_compressed(Log_event_writer *writer)
{
uchar *m_rows_buf_tmp= m_rows_buf;
uchar *m_rows_cur_tmp= m_rows_cur;
@@ -6546,7 +5482,7 @@ bool Rows_log_event::write_compressed()
(uint32)(m_rows_cur_tmp - m_rows_buf_tmp), &comlen))
{
m_rows_cur= comlen + m_rows_buf;
- ret= Log_event::write();
+ ret= Log_event::write(writer);
}
my_safe_afree(m_rows_buf, alloc_size);
m_rows_buf= m_rows_buf_tmp;
@@ -6588,15 +5524,15 @@ Annotate_rows_log_event::Annotate_rows_log_event(THD *thd,
}
-bool Annotate_rows_log_event::write_data_header()
+bool Annotate_rows_log_event::write_data_header(Log_event_writer *writer)
{
return 0;
}
-bool Annotate_rows_log_event::write_data_body()
+bool Annotate_rows_log_event::write_data_body(Log_event_writer *writer)
{
- return write_data(m_query_txt, m_query_len);
+ return write_data(writer, m_query_txt, m_query_len);
}
@@ -6900,6 +5836,13 @@ check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list)
DBUG_RETURN(res);
}
+table_def Table_map_log_event::get_table_def()
+{
+ return table_def(m_coltype, m_colcnt,
+ m_field_metadata, m_field_metadata_size,
+ m_null_bits, m_flags);
+}
+
int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
{
RPL_TABLE_LIST *table_list;
@@ -6938,16 +5881,26 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
LEX_CSTRING tmp_db_name= {db_mem, db_mem_length };
LEX_CSTRING tmp_tbl_name= {tname_mem, tname_mem_length };
- table_list->init_one_table(&tmp_db_name, &tmp_tbl_name, 0, TL_WRITE);
- table_list->table_id= DBUG_IF("inject_tblmap_same_id_maps_diff_table") ? 0 : m_table_id;
- table_list->updating= 1;
+ /*
+ The memory allocated by the table_def structure (i.e., not the
+ memory allocated *for* the table_def structure) is released
+ inside rpl_group_info::clear_tables_to_lock() by calling the
+ table_def destructor explicitly.
+ */
+ new(table_list) RPL_TABLE_LIST(&tmp_db_name, &tmp_tbl_name, TL_WRITE,
+ get_table_def(),
+ m_flags & TM_BIT_HAS_TRIGGERS_F);
+
+ table_list->table_id= DBUG_IF("inject_tblmap_same_id_maps_diff_table") ?
+ 0: m_table_id;
table_list->required_type= TABLE_TYPE_NORMAL;
+ table_list->open_type= OT_BASE_ONLY;
+ DBUG_ASSERT(table_list->updating);
DBUG_PRINT("debug", ("table: %s is mapped to %llu",
table_list->table_name.str,
table_list->table_id));
- table_list->master_had_triggers= ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? 1 : 0);
- DBUG_PRINT("debug", ("table->master_had_triggers=%d",
+ DBUG_PRINT("debug", ("table->master_had_triggers=%d",
(int)table_list->master_had_triggers));
enum_tbl_map_status tblmap_status= check_table_map(rgi, table_list);
@@ -6956,23 +5909,6 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_ASSERT(thd->lex->query_tables != table_list);
/*
- Use placement new to construct the table_def instance in the
- memory allocated for it inside table_list.
-
- The memory allocated by the table_def structure (i.e., not the
- memory allocated *for* the table_def structure) is released
- inside Relay_log_info::clear_tables_to_lock() by calling the
- table_def destructor explicitly.
- */
- new (&table_list->m_tabledef)
- table_def(m_coltype, m_colcnt,
- m_field_metadata, m_field_metadata_size,
- m_null_bits, m_flags);
- table_list->m_tabledef_valid= TRUE;
- table_list->m_conv_table= NULL;
- table_list->open_type= OT_BASE_ONLY;
-
- /*
We record in the slave's information that the table should be
locked by linking the table into the list of tables to lock.
*/
@@ -7016,8 +5952,9 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
execute in a user session
*/
my_error(ER_SLAVE_FATAL_ERROR, MYF(0), buf);
- }
-
+ }
+
+ table_list->~RPL_TABLE_LIST();
my_free(memory);
}
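
A side note on the construction pattern above: the RPL_TABLE_LIST is built with placement new into memory the caller allocated, and torn down with an explicit destructor call before my_free(). A generic, self-contained illustration of that ownership pattern; Widget is invented for the example:

#include <cstdlib>
#include <new>
#include <string>

struct Widget
{
  std::string name;                     // owns heap storage of its own
  explicit Widget(const char *n) : name(n) {}
};

int main()
{
  void *memory= std::malloc(sizeof(Widget));  // raw allocation, e.g. one slice
  if (!memory)                                // of a larger multi-purpose block
    return 1;

  Widget *w= new (memory) Widget("placement-new example");

  // ... use *w ...

  w->~Widget();        // run the destructor explicitly; releases name's storage
  std::free(memory);   // then free the raw buffer itself
  return 0;
}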
@@ -7042,7 +5979,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* defined(HAVE_REPLICATION) */
-bool Table_map_log_event::write_data_header()
+bool Table_map_log_event::write_data_header(Log_event_writer *writer)
{
DBUG_ASSERT(m_table_id != UINT32_MAX);
uchar buf[TABLE_MAP_HEADER_LEN];
@@ -7050,14 +5987,14 @@ bool Table_map_log_event::write_data_header()
{
int4store(buf + 0, (ulong) m_table_id);
int2store(buf + 4, m_flags);
- return (write_data(buf, 6));
+ return (write_data(writer, buf, 6));
});
int6store(buf + TM_MAPID_OFFSET, m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
- return write_data(buf, TABLE_MAP_HEADER_LEN);
+ return write_data(writer, buf, TABLE_MAP_HEADER_LEN);
}
-bool Table_map_log_event::write_data_body()
+bool Table_map_log_event::write_data_body(Log_event_writer *writer)
{
DBUG_ASSERT(m_dbnam != NULL);
DBUG_ASSERT(m_tblnam != NULL);
@@ -7078,17 +6015,17 @@ bool Table_map_log_event::write_data_body()
uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- return write_data(dbuf, sizeof(dbuf)) ||
- write_data(m_dbnam, m_dblen+1) ||
- write_data(tbuf, sizeof(tbuf)) ||
- write_data(m_tblnam, m_tbllen+1) ||
- write_data(cbuf, (size_t) (cbuf_end - cbuf)) ||
- write_data(m_coltype, m_colcnt) ||
- write_data(mbuf, (size_t) (mbuf_end - mbuf)) ||
- write_data(m_field_metadata, m_field_metadata_size),
- write_data(m_null_bits, (m_colcnt + 7) / 8) ||
- write_data((const uchar*) m_metadata_buf.ptr(),
- m_metadata_buf.length());
+ return write_data(writer, dbuf, sizeof(dbuf)) ||
+ write_data(writer, m_dbnam, m_dblen+1) ||
+ write_data(writer, tbuf, sizeof(tbuf)) ||
+ write_data(writer, m_tblnam, m_tbllen+1) ||
+ write_data(writer, cbuf, (size_t) (cbuf_end - cbuf)) ||
+ write_data(writer, m_coltype, m_colcnt) ||
+ write_data(writer, mbuf, (size_t) (mbuf_end - mbuf)) ||
+         write_data(writer, m_field_metadata, m_field_metadata_size) ||
+ write_data(writer, m_null_bits, (m_colcnt + 7) / 8) ||
+ write_data(writer, (const uchar*) m_metadata_buf.ptr(),
+ m_metadata_buf.length());
}
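
The body above length-prefixes the database name, the table name and the field metadata with net_store_length(). A hedged re-implementation of that packed ("length-encoded") integer format, showing where the 1-, 3-, 4- and 9-byte forms come from; store_packed_length is our name for it, and the byte order is little-endian, as with the intNstore macros:

#include <cstdint>

static unsigned char *store_packed_length(unsigned char *p, uint64_t v)
{
  if (v < 251)
  {
    *p++= (unsigned char) v;
    return p;
  }
  if (v < 0x10000)
  {
    *p++= 252;
    p[0]= (unsigned char) v;
    p[1]= (unsigned char) (v >> 8);
    return p + 2;
  }
  if (v < 0x1000000)
  {
    *p++= 253;
    p[0]= (unsigned char) v;
    p[1]= (unsigned char) (v >> 8);
    p[2]= (unsigned char) (v >> 16);
    return p + 3;
  }
  *p++= 254;
  for (int i= 0; i < 8; i++)
    p[i]= (unsigned char) (v >> (8 * i));
  return p + 8;
}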
/**
@@ -7516,15 +6453,15 @@ Write_rows_compressed_log_event::Write_rows_compressed_log_event(
m_type = WRITE_ROWS_COMPRESSED_EVENT_V1;
}
-bool Write_rows_compressed_log_event::write()
+bool Write_rows_compressed_log_event::write(Log_event_writer *writer)
{
- return Rows_log_event::write_compressed();
+ return Rows_log_event::write_compressed(writer);
}
#if defined(HAVE_REPLICATION)
int
-Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
+Write_rows_log_event::do_before_row_operations(const rpl_group_info *)
{
int error= 0;
@@ -7602,8 +6539,7 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
}
int
-Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
- int error)
+Write_rows_log_event::do_after_row_operations(int error)
{
int local_error= 0;
@@ -7734,12 +6670,11 @@ is_duplicate_key_error(int errcode)
@c ha_update_row() or first deleted and then new record written.
*/
-int
-Rows_log_event::write_row(rpl_group_info *rgi,
- const bool overwrite)
+int Rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite)
{
DBUG_ENTER("write_row");
- DBUG_ASSERT(m_table != NULL && thd != NULL);
+ DBUG_ASSERT(m_table != NULL);
+ DBUG_ASSERT(thd != NULL);
TABLE *table= m_table; // pointer to event's table
int error;
@@ -7821,10 +6756,8 @@ Rows_log_event::write_row(rpl_group_info *rgi,
error= update_sequence();
else while (unlikely(error= table->file->ha_write_row(table->record[0])))
{
- if (error == HA_ERR_LOCK_DEADLOCK ||
- error == HA_ERR_LOCK_WAIT_TIMEOUT ||
- (keynum= table->file->get_dup_key(error)) < 0 ||
- !overwrite)
+ if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT ||
+ (keynum= table->file->get_dup_key(error)) < 0 || !overwrite)
{
DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
/*
@@ -8098,7 +7031,7 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
#if defined(HAVE_REPLICATION)
-uint8 Write_rows_log_event::get_trg_event_map()
+uint8 Write_rows_log_event::get_trg_event_map() const
{
return trg2bit(TRG_EVENT_INSERT) | trg2bit(TRG_EVENT_UPDATE) |
trg2bit(TRG_EVENT_DELETE);
@@ -8110,16 +7043,19 @@ uint8 Write_rows_log_event::get_trg_event_map()
**************************************************************************/
#if defined(HAVE_REPLICATION)
-/*
- Compares table->record[0] and table->record[1]
+/**
+ @brief Compares table->record[0] and table->record[1]
- Returns TRUE if different.
+ @returns true if different.
*/
static bool record_compare(TABLE *table, bool vers_from_plain= false)
{
- bool result= FALSE;
+ bool result= false;
+ bool all_values_set= bitmap_is_set_all(&table->has_value_set);
+
/**
Compare full record only if:
+ - all fields were given values
- there are no blob fields (otherwise we would also need
to compare blobs contents as well);
- there are no varchar fields (otherwise we would also need
@@ -8130,24 +7066,23 @@ static bool record_compare(TABLE *table, bool vers_from_plain= false)
*/
if ((table->s->blob_fields +
table->s->varchar_fields +
- table->s->null_fields) == 0)
+ table->s->null_fields) == 0
+ && all_values_set)
{
- result= cmp_record(table,record[1]);
+ result= cmp_record(table, record[1]);
goto record_compare_exit;
}
/* Compare null bits */
- if (memcmp(table->null_flags,
- table->null_flags+table->s->rec_buff_length,
- table->s->null_bytes))
- {
- result= TRUE; // Diff in NULL value
- goto record_compare_exit;
- }
+ if (all_values_set && memcmp(table->null_flags,
+ table->null_flags + table->s->rec_buff_length,
+ table->s->null_bytes))
+ goto record_compare_differ; // Diff in NULL value
/* Compare fields */
for (Field **ptr=table->field ; *ptr ; ptr++)
{
+ Field *f= *ptr;
/*
If the table is versioned, don't compare using the version if there is a
primary key. If there isn't a primary key, we need the version to
@@ -8157,27 +7092,118 @@ static bool record_compare(TABLE *table, bool vers_from_plain= false)
because the implicit row_end value will be set to the maximum value for
the latest row update (which is what we care about).
*/
- if (table->versioned() && (*ptr)->vers_sys_field() &&
+ if (table->versioned() && f->vers_sys_field() &&
(table->s->primary_key < MAX_KEY ||
- (vers_from_plain && table->vers_start_field() == (*ptr))))
+ (vers_from_plain && table->vers_start_field() == f)))
continue;
- /**
- We only compare field contents that are not null.
- NULL fields (i.e., their null bits) were compared
- earlier.
+
+ /*
+    We only compare fields that exist on the master (or, in the ONLINE
+    ALTER case, that were in the original table).
*/
- if (!(*(ptr))->is_null())
+ if (!all_values_set)
{
- if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
- {
- result= TRUE;
- goto record_compare_exit;
- }
+ if (!f->has_explicit_value() &&
+ /* Don't skip row_end if replicating unversioned -> versioned */
+ !(vers_from_plain && table->vers_end_field() == f))
+ continue;
+ if (f->is_null() != f->is_null(table->s->rec_buff_length))
+ goto record_compare_differ;
}
+
+ if (!f->is_null() && !f->vcol_info &&
+ f->cmp_binary_offset(table->s->rec_buff_length))
+ goto record_compare_differ;
}
record_compare_exit:
return result;
+record_compare_differ:
+ return true;
+}
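
record_compare() above can only memcmp whole records when every column carries a value and there are no blob, varchar or NULLable fields; otherwise it compares null bits and field contents one by one. A simplified, standalone version of that per-field loop over an invented fixed-width row format; ToyField and every other name here are ours:

#include <cstddef>
#include <cstring>

struct ToyField { size_t offset, length; unsigned null_bit; bool compare; };

// Returns true if the two row images differ. Only fields marked for
// comparison are examined, and NULL-ness is treated as part of the value.
static bool toy_record_compare(const unsigned char *rec0,
                               const unsigned char *rec1,
                               const unsigned char *null0,
                               const unsigned char *null1,
                               const ToyField *fields, size_t nfields)
{
  for (size_t i= 0; i < nfields; i++)
  {
    const ToyField &f= fields[i];
    if (!f.compare)                              // e.g. no explicit value
      continue;
    bool is_null0= null0[f.null_bit / 8] & (1U << (f.null_bit % 8));
    bool is_null1= null1[f.null_bit / 8] & (1U << (f.null_bit % 8));
    if (is_null0 != is_null1)
      return true;                               // differ in NULL-ness
    if (!is_null0 &&
        std::memcmp(rec0 + f.offset, rec1 + f.offset, f.length) != 0)
      return true;                               // differ in value
  }
  return false;
}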
+/**
+  Traverses a field's default item expression, and the underlying fields'
+  default values. If an underlying field is an extra field with no replicated
+  value, its default expression must also be checked.
+ */
+class Rpl_key_part_checker: public Field_enumerator
+{
+ bool online_alter;
+ Field *next_number_field;
+ bool field_usable;
+public:
+
+
+ void visit_field(Item_field *item) override
+ {
+ if (!field_usable)
+ return;
+ field_usable= check_field(item->field);
+ }
+
+ bool check_field(Field *f)
+ {
+ if (f->has_explicit_value())
+ return true;
+
+ if ((!f->vcol_info && !online_alter) || f == next_number_field)
+ return false;
+
+ Virtual_column_info *computed= f->vcol_info ? f->vcol_info
+ : f->default_value;
+
+ if (computed == NULL)
+ return true; // No DEFAULT, or constant DEFAULT
+
+ // Deterministic DEFAULT or vcol expression
+ return !(computed->flags & VCOL_NOT_STRICTLY_DETERMINISTIC)
+ && !computed->expr->walk(&Item::enumerate_field_refs_processor,
+ false, this)
+ && field_usable;
+ }
+
+ Rpl_key_part_checker(bool online_alter, Field *next_number_field):
+ online_alter(online_alter), next_number_field(next_number_field),
+ field_usable(true) {}
+};
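
Rpl_key_part_checker above leans on the Item tree walker: walk() visits every field referenced by a DEFAULT or virtual-column expression, and visit_field() clears the usable flag as soon as one dependency has no trustworthy value. A toy model of that visitor-over-expression-tree idea; Expr, UsableChecker and everything else here are invented:

#include <memory>
#include <vector>

struct Expr
{
  int field_id= -1;                         // >= 0 means "reference to field N"
  std::vector<std::unique_ptr<Expr>> args;  // sub-expressions

  template <class Visitor> void walk(Visitor &v) const
  {
    if (field_id >= 0)
      v.visit_field(field_id);
    for (const auto &a : args)
      a->walk(v);
  }
};

struct UsableChecker
{
  const std::vector<bool> &has_explicit_value;
  bool usable;

  explicit UsableChecker(const std::vector<bool> &flags)
    : has_explicit_value(flags), usable(true) {}

  void visit_field(int id)
  {
    if (!has_explicit_value[id])
      usable= false;              // a dependency was filled from a default
  }
};

// Usage: UsableChecker c(flags); expr.walk(c); then inspect c.usable.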
+
+
+/**
+  Newly added fields with non-deterministic defaults (e.g. DEFAULT(RANDOM()),
+  CURRENT_TIMESTAMP, AUTO_INCREMENT) should be excluded from the key search.
+  Essentially, we exclude all default-filled fields based on the
+  has_explicit_value bitmap.
+*/
+uint Rows_log_event::find_key_parts(const KEY *key) const
+{
+ RPL_TABLE_LIST *tl= (RPL_TABLE_LIST*)m_table->pos_in_table_list;
+ const bool online_alter= tl->m_online_alter_copy_fields;
+ uint p;
+
+ if (!m_table->s->keys_in_use.is_set(uint(key - m_table->key_info)))
+ return 0;
+
+ if (!online_alter)
+ {
+ if (m_cols.n_bits >= m_table->s->fields) // replicated more than slave has
+ return key->user_defined_key_parts;
+ if (m_table->s->virtual_fields == 0)
+ {
+ for (p= 0; p < key->user_defined_key_parts; p++)
+ if (key->key_part[p].fieldnr > m_cols.n_bits) // extra
+ break;
+ return p;
+ }
+ }
+
+ Rpl_key_part_checker key_part_checker(online_alter,
+ m_table->found_next_number_field);
+ for (p= 0; p < key->user_defined_key_parts; p++)
+ {
+ if (!key_part_checker.check_field(key->key_part[p].field))
+ break;
+ }
+ return p;
}
@@ -8187,74 +7213,127 @@ record_compare_exit:
A primary key is preferred if it exists; otherwise a unique index is
   preferred. Else we pick the index with the smallest rec_per_key value.
- If a suitable key is found, set @c m_key, @c m_key_nr and @c m_key_info
- member fields appropriately.
+ If a suitable key is found, set @c m_key, @c m_key_nr, @c m_key_info,
+ and @c m_usable_key_parts member fields appropriately.
@returns Error code on failure, 0 on success.
*/
-int Rows_log_event::find_key()
+int Rows_log_event::find_key(const rpl_group_info *rgi)
{
- uint i, best_key_nr, last_part;
- KEY *key, *UNINIT_VAR(best_key);
+ DBUG_ASSERT(m_table);
+ RPL_TABLE_LIST *tl= (RPL_TABLE_LIST*)m_table->pos_in_table_list;
+ uint i, best_key_nr= 0, best_usable_key_parts= 0;
+ KEY *key;
ulong UNINIT_VAR(best_rec_per_key), tmp;
DBUG_ENTER("Rows_log_event::find_key");
- DBUG_ASSERT(m_table);
-
- best_key_nr= MAX_KEY;
- /*
- Keys are sorted so that any primary key is first, followed by unique keys,
- followed by any other. So we will automatically pick the primary key if
- it exists.
- */
- for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++)
+ if ((best_key_nr= tl->cached_key_nr) != ~0U)
{
- if (!m_table->s->keys_in_use.is_set(i))
- continue;
+ DBUG_ASSERT(best_key_nr <= MAX_KEY); // use the cached value
+ best_usable_key_parts= tl->cached_usable_key_parts;
+ }
+ else
+ {
+ best_key_nr= MAX_KEY;
+
/*
- We cannot use a unique key with NULL-able columns to uniquely identify
- a row (but we can still select it for range scan below if nothing better
- is available).
+      If the source (in the row event) and destination (in m_table) records
+      don't have the same structure, some of the keys below might be unusable
+      for find_row().
+
+      If this is replication and the slave table (m_table) has fewer columns
+      than the master's, it is easy: all keys are usable.
+
+      If the slave's table has more columns, but none of them are generated,
+      then any column beyond m_cols.n_bits makes an index unusable.
+
+      If the slave's table has generated columns, or it is the online alter
+      table where arbitrary structure conversion is possible (in the
+      replication case one table must be a prefix of the other, see
+      table_def::compatible_with), we cannot deduce which destination columns
+      will be affected by m_cols, so we have to actually unpack one row and
+      examine has_explicit_value().
*/
- if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)
+
+ if (tl->m_online_alter_copy_fields ||
+ (m_cols.n_bits < m_table->s->fields &&
+ m_table->s->virtual_fields))
{
- best_key_nr= i;
- best_key= key;
- break;
+ const uchar *curr_row_end= m_curr_row_end;
+ Check_level_instant_set clis(m_table->in_use, CHECK_FIELD_IGNORE);
+ if (int err= unpack_row(rgi, m_table, m_width, m_curr_row, &m_cols,
+ &curr_row_end, &m_master_reclength, m_rows_end))
+ DBUG_RETURN(err);
}
+
/*
- We can only use a non-unique key if it allows range scans (ie. skip
- FULLTEXT indexes and such).
+ Keys are sorted so that any primary key is first, followed by unique keys,
+ followed by any other. So we will automatically pick the primary key if
+ it exists.
*/
- last_part= key->user_defined_key_parts - 1;
- DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu",
- key->name.str, last_part, key->rec_per_key[last_part]));
- if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT))
- continue;
-
- tmp= key->rec_per_key[last_part];
- if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key))
+ for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++)
{
- best_key_nr= i;
- best_key= key;
- best_rec_per_key= tmp;
+ uint usable_key_parts= find_key_parts(key);
+ if (usable_key_parts == 0)
+ continue;
+ /*
+ We cannot use a unique key with NULL-able columns to uniquely identify
+ a row (but we can still select it for range scan below if nothing better
+ is available).
+ */
+ if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME &&
+ usable_key_parts == key->user_defined_key_parts)
+ {
+ best_key_nr= i;
+ best_usable_key_parts= usable_key_parts;
+ break;
+ }
+ /*
+ We can only use a non-unique key if it allows range scans (ie. skip
+ FULLTEXT indexes and such).
+ */
+ uint last_part= usable_key_parts - 1;
+ DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu",
+ key->name.str, last_part, key->rec_per_key[last_part]));
+ if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT))
+ continue;
+
+ tmp= key->rec_per_key[last_part];
+ if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key))
+ {
+ best_key_nr= i;
+ best_usable_key_parts= usable_key_parts;
+ best_rec_per_key= tmp;
+ }
}
+ tl->cached_key_nr= best_key_nr;
+ tl->cached_usable_key_parts= best_usable_key_parts;
}
+ m_key_nr= best_key_nr;
+ m_usable_key_parts= best_usable_key_parts;
if (best_key_nr == MAX_KEY)
- {
m_key_info= NULL;
- DBUG_RETURN(0);
+ else
+ {
+ m_key_info= m_table->key_info + best_key_nr;
+
+ if (!use_pk_position())
+ {
+ // Allocate buffer for key searches
+ m_key= (uchar *) my_malloc(PSI_INSTRUMENT_ME, m_key_info->key_length, MYF(MY_WME));
+ if (m_key == NULL)
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ }
}
- // Allocate buffer for key searches
- m_key= (uchar *) my_malloc(PSI_INSTRUMENT_ME, best_key->key_length, MYF(MY_WME));
- if (m_key == NULL)
- DBUG_RETURN(HA_ERR_OUT_OF_MEM);
- m_key_info= best_key;
- m_key_nr= best_key_nr;
+ DBUG_EXECUTE_IF("rpl_report_chosen_key",
+ push_warning_printf(m_table->in_use,
+ Sql_condition::WARN_LEVEL_NOTE,
+ ER_UNKNOWN_ERROR, "Key chosen: %d",
+ m_key_nr == MAX_KEY ?
+ -1 : m_key_nr););
- DBUG_RETURN(0);;
+ DBUG_RETURN(0);
}
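
To summarize the selection above: a fully usable unique key without NULLable parts wins outright (the primary key sorts first in key_info, so it is picked when present); otherwise the best range-scannable key by rec_per_key is kept. A condensed model of that ranking; ToyKey and choose_key are illustrative stand-ins, not server types:

#include <cstddef>
#include <cstdint>
#include <vector>

struct ToyKey
{
  bool unique_not_null;   // (flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME
  bool range_scannable;   // index_flags(...) & HA_READ_NEXT
  unsigned usable_parts;  // as computed by find_key_parts()
  unsigned defined_parts; // user_defined_key_parts
  unsigned long rec_per_key;
};

// Returns the index of the chosen key, or SIZE_MAX if none is usable.
static size_t choose_key(const std::vector<ToyKey> &keys)
{
  size_t best= SIZE_MAX;
  unsigned long best_rpk= 0;
  for (size_t i= 0; i < keys.size(); i++)
  {
    const ToyKey &k= keys[i];
    if (k.usable_parts == 0)
      continue;                              // key cannot be used at all
    if (k.unique_not_null && k.usable_parts == k.defined_parts)
      return i;                              // PK/unique key: take it at once
    if (!k.range_scannable)
      continue;                              // e.g. FULLTEXT
    if (best == SIZE_MAX || (k.rec_per_key > 0 && k.rec_per_key < best_rpk))
    {
      best= i;
      best_rpk= k.rec_per_key;
    }
  }
  return best;
}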
@@ -8315,6 +7394,14 @@ static int row_not_found_error(rpl_group_info *rgi)
? HA_ERR_KEY_NOT_FOUND : HA_ERR_RECORD_CHANGED;
}
+bool Rows_log_event::use_pk_position() const
+{
+ return m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION
+ && m_table->s->primary_key < MAX_KEY
+ && m_key_nr == m_table->s->primary_key
+ && m_usable_key_parts == m_table->key_info->user_defined_key_parts;
+}
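
use_pk_position() above gates the fastest path in find_row(): read the row directly by primary-key "position" when the engine requires it and the whole primary key is usable, otherwise do an index read on the chosen key, and fall back to a full table scan when no key was selected. A compact sketch of that ordering; the enum and function are ours:

enum class RowLookup { ByPkPosition, ByIndex, ByTableScan };

// Mirrors the strategy order in find_row().
static RowLookup pick_lookup(bool pk_position_usable, bool have_key)
{
  if (pk_position_usable)
    return RowLookup::ByPkPosition;   // ha_rnd_pos_by_record()
  if (have_key)
    return RowLookup::ByIndex;        // ha_index_read_map() on m_key_info
  return RowLookup::ByTableScan;      // ha_rnd_next() loop
}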
+
static int end_of_file_error(rpl_group_info *rgi)
{
return rgi->speculation != rpl_group_info::SPECULATE_OPTIMISTIC
@@ -8356,11 +7443,13 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
{
DBUG_ENTER("Rows_log_event::find_row");
- DBUG_ASSERT(m_table && m_table->in_use != NULL);
+ DBUG_ASSERT(m_table);
+ DBUG_ASSERT(m_table->in_use != NULL);
TABLE *table= m_table;
int error= 0;
bool is_table_scan= false, is_index_scan= false;
+ Check_level_instant_set clis(table->in_use, CHECK_FIELD_IGNORE);
/*
rpl_row_tabledefs.test specifies that
@@ -8391,8 +7480,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
DBUG_PRINT("info",("looking for the following record"));
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
- if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
- table->s->primary_key < MAX_KEY)
+ if (use_pk_position())
{
/*
Use a more efficient method to fetch the record given by
@@ -8413,7 +7501,6 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
table->s->reclength) == 0);
*/
- int error;
DBUG_PRINT("info",("locating record using primary key (position)"));
error= table->file->ha_rnd_pos_by_record(table->record[0]);
@@ -8429,12 +7516,6 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
// We can't use position() - try other methods.
- /*
- We need to retrieve all fields
- TODO: Move this out from this function to main loop
- */
- table->use_all_columns();
-
/*
Save copy of the record in table->record[1]. It might be needed
later if linear search is used to find exact match.
@@ -8481,10 +7562,13 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
table->record[0][table->s->null_bytes - 1]|=
256U - (1U << table->s->last_null_bit_pos);
- if (unlikely((error= table->file->ha_index_read_map(table->record[0],
- m_key,
- HA_WHOLE_KEY,
- HA_READ_KEY_EXACT))))
+ const enum ha_rkey_function find_flag=
+ m_usable_key_parts == m_key_info->user_defined_key_parts
+ ? HA_READ_KEY_EXACT : HA_READ_KEY_OR_NEXT;
+ error= table->file->ha_index_read_map(table->record[0], m_key,
+ make_keypart_map(m_usable_key_parts),
+ find_flag);
+ if (unlikely(error))
{
DBUG_PRINT("info",("no record matching the key found in the table"));
if (error == HA_ERR_KEY_NOT_FOUND)
@@ -8516,10 +7600,10 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
found. I can see no scenario where it would be incorrect to
       choose the row to change only using a PK or an UNNI.
*/
- if (table->key_info->flags & HA_NOSAME)
+ if (find_flag == HA_READ_KEY_EXACT && table->key_info->flags & HA_NOSAME)
{
/* Unique does not have non nullable part */
- if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
+ if (!(table->key_info->flags & HA_NULL_PART_KEY))
{
error= 0;
goto end;
@@ -8592,9 +7676,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
/* Continue until we find the right record or have made a full loop */
do
{
- error= table->file->ha_rnd_next(table->record[0]);
-
- if (unlikely(error))
+ if (unlikely((error= table->file->ha_rnd_next(table->record[0]))))
DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
switch (error) {
@@ -8656,16 +7738,16 @@ Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(
m_type= DELETE_ROWS_COMPRESSED_EVENT_V1;
}
-bool Delete_rows_compressed_log_event::write()
+bool Delete_rows_compressed_log_event::write(Log_event_writer *writer)
{
- return Rows_log_event::write_compressed();
+ return Rows_log_event::write_compressed(writer);
}
#if defined(HAVE_REPLICATION)
int
-Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
+Delete_rows_log_event::do_before_row_operations(const rpl_group_info *rgi)
{
/*
Increment the global status delete count variable
@@ -8673,23 +7755,14 @@ Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability
if (get_flags(STMT_END_F))
status_var_increment(thd->status_var.com_stat[SQLCOM_DELETE]);
- if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
- m_table->s->primary_key < MAX_KEY)
- {
- /*
- We don't need to allocate any memory for m_key since it is not used.
- */
- return 0;
- }
if (do_invoke_trigger())
m_table->prepare_triggers_for_delete_stmt_or_event();
- return find_key();
+ return find_key(rgi);
}
int
-Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
- int error)
+Delete_rows_log_event::do_after_row_operations(int error)
{
m_table->file->ha_index_or_rnd_end();
my_free(m_key);
@@ -8748,7 +7821,6 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
error= HA_ERR_GENERIC; // in case if error is not set yet
if (likely(!error))
{
- m_table->mark_columns_per_binlog_row_image();
if (m_vers_from_plain && m_table->versioned(VERS_TIMESTAMP))
{
Field *end= m_table->vers_end_field();
@@ -8761,7 +7833,6 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
error= m_table->file->ha_delete_row(m_table->record[0]);
}
- m_table->default_column_bitmaps();
}
if (invoke_triggers && likely(!error) &&
unlikely(process_triggers(TRG_EVENT_DELETE, TRG_ACTION_AFTER, FALSE)))
@@ -8775,7 +7846,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
#endif /* defined(HAVE_REPLICATION) */
#if defined(HAVE_REPLICATION)
-uint8 Delete_rows_log_event::get_trg_event_map()
+uint8 Delete_rows_log_event::get_trg_event_map() const
{
return trg2bit(TRG_EVENT_DELETE);
}
@@ -8805,9 +7876,9 @@ Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1;
}
-bool Update_rows_compressed_log_event::write()
+bool Update_rows_compressed_log_event::write(Log_event_writer *writer)
{
- return Rows_log_event::write_compressed();
+ return Rows_log_event::write_compressed(writer);
}
void Update_rows_log_event::init(MY_BITMAP const *cols)
@@ -8827,7 +7898,7 @@ void Update_rows_log_event::init(MY_BITMAP const *cols)
#if defined(HAVE_REPLICATION)
int
-Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
+Update_rows_log_event::do_before_row_operations(const rpl_group_info *rgi)
{
/*
Increment the global status update count variable
@@ -8836,7 +7907,7 @@ Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability
status_var_increment(thd->status_var.com_stat[SQLCOM_UPDATE]);
int err;
- if ((err= find_key()))
+ if ((err= find_key(rgi)))
return err;
if (do_invoke_trigger())
@@ -8846,8 +7917,7 @@ Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability
}
int
-Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
- int error)
+Update_rows_log_event::do_after_row_operations(int error)
{
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
m_table->file->ha_index_or_rnd_end();
@@ -9004,7 +8074,7 @@ err:
#if defined(HAVE_REPLICATION)
-uint8 Update_rows_log_event::get_trg_event_map()
+uint8 Update_rows_log_event::get_trg_event_map() const
{
return trg2bit(TRG_EVENT_UPDATE);
}
@@ -9089,23 +8159,23 @@ int Incident_log_event::do_apply_event(rpl_group_info *rgi)
bool
-Incident_log_event::write_data_header()
+Incident_log_event::write_data_header(Log_event_writer *writer)
{
DBUG_ENTER("Incident_log_event::write_data_header");
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident);
- DBUG_RETURN(write_data(buf, sizeof(buf)));
+ DBUG_RETURN(write_data(writer, buf, sizeof(buf)));
}
bool
-Incident_log_event::write_data_body()
+Incident_log_event::write_data_body(Log_event_writer *writer)
{
uchar tmp[1];
DBUG_ENTER("Incident_log_event::write_data_body");
tmp[0]= (uchar) m_message.length;
- DBUG_RETURN(write_data(tmp, sizeof(tmp)) ||
- write_data(m_message.str, m_message.length));
+ DBUG_RETURN(write_data(writer, tmp, sizeof(tmp)) ||
+ write_data(writer, m_message.str, m_message.length));
}
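
The incident event above is tiny on the wire: a 2-byte little-endian incident code in the post-header, then a 1-byte message length and the message bytes in the body. A standalone sketch of that framing; pack_incident is an illustrative helper, not a server function:

#include <cstdint>
#include <string>
#include <vector>

// Layout follows write_data_header()/write_data_body() above.
static std::vector<unsigned char> pack_incident(uint16_t code,
                                                const std::string &message)
{
  std::vector<unsigned char> out;
  out.push_back((unsigned char) (code & 0xFF));      // int2store(buf, m_incident)
  out.push_back((unsigned char) (code >> 8));
  out.push_back((unsigned char) message.size());     // 1-byte length, so the
  out.insert(out.end(), message.begin(), message.end()); // message caps at 255
  return out;
}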