summaryrefslogtreecommitdiffstats
path: root/sql/log_event_server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event_server.cc')
-rw-r--r--sql/log_event_server.cc123
1 files changed, 75 insertions, 48 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 5cb15c1c..84a00b5d 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -918,6 +918,10 @@ int Log_event_writer::write_header(uchar *pos, size_t len)
int Log_event_writer::write_data(const uchar *pos, size_t len)
{
DBUG_ENTER("Log_event_writer::write_data");
+
+ if (!len)
+ DBUG_RETURN(0);
+
if (checksum_len)
crc= my_checksum(crc, pos, len);
@@ -4561,11 +4565,16 @@ bool XA_prepare_log_event::write()
#if defined(HAVE_REPLICATION)
static bool
user_var_append_name_part(THD *thd, String *buf,
- const char *name, size_t name_len)
+ const char *name, size_t name_len,
+ const LEX_CSTRING &data_type_name)
{
return buf->append('@') ||
append_identifier(thd, buf, name, name_len) ||
- buf->append('=');
+ buf->append('=') ||
+ (data_type_name.length &&
+ (buf->append(STRING_WITH_LEN("/*")) ||
+ buf->append(data_type_name.str, data_type_name.length) ||
+ buf->append(STRING_WITH_LEN("*/"))));
}
void User_var_log_event::pack_info(Protocol* protocol)
@@ -4575,14 +4584,15 @@ void User_var_log_event::pack_info(Protocol* protocol)
char buf_mem[FN_REFLEN+7];
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
buf.length(0);
- if (user_var_append_name_part(protocol->thd, &buf, name, name_len) ||
+ if (user_var_append_name_part(protocol->thd, &buf, name, name_len,
+ m_data_type_name) ||
buf.append(NULL_clex_str))
return;
protocol->store(buf.ptr(), buf.length(), &my_charset_bin);
}
else
{
- switch (type) {
+ switch (m_type) {
case REAL_RESULT:
{
double real_val;
@@ -4591,7 +4601,8 @@ void User_var_log_event::pack_info(Protocol* protocol)
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
float8get(real_val, val);
buf.length(0);
- if (user_var_append_name_part(protocol->thd, &buf, name, name_len) ||
+ if (user_var_append_name_part(protocol->thd, &buf, name, name_len,
+ m_data_type_name) ||
buf.append(buf2, my_gcvt(real_val, MY_GCVT_ARG_DOUBLE,
MY_GCVT_MAX_FIELD_WIDTH, buf2, NULL)))
return;
@@ -4604,10 +4615,11 @@ void User_var_log_event::pack_info(Protocol* protocol)
char buf_mem[FN_REFLEN + 22];
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
buf.length(0);
- if (user_var_append_name_part(protocol->thd, &buf, name, name_len) ||
+ if (user_var_append_name_part(protocol->thd, &buf, name, name_len,
+ m_data_type_name) ||
buf.append(buf2,
longlong10_to_str(uint8korr(val), buf2,
- ((flags & User_var_log_event::UNSIGNED_F) ? 10 : -10))-buf2))
+ (is_unsigned() ? 10 : -10))-buf2))
return;
protocol->store(buf.ptr(), buf.length(), &my_charset_bin);
break;
@@ -4620,7 +4632,8 @@ void User_var_log_event::pack_info(Protocol* protocol)
String str(buf2, sizeof(buf2), &my_charset_bin);
buf.length(0);
my_decimal((const uchar *) (val + 2), val[0], val[1]).to_string(&str);
- if (user_var_append_name_part(protocol->thd, &buf, name, name_len) ||
+ if (user_var_append_name_part(protocol->thd, &buf, name, name_len,
+ m_data_type_name) ||
buf.append(str))
return;
protocol->store(buf.ptr(), buf.length(), &my_charset_bin);
@@ -4636,7 +4649,7 @@ void User_var_log_event::pack_info(Protocol* protocol)
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
CHARSET_INFO *cs;
buf.length(0);
- if (!(cs= get_charset(charset_number, MYF(0))))
+ if (!(cs= get_charset(m_charset_number, MYF(0))))
{
if (buf.append(STRING_WITH_LEN("???")))
return;
@@ -4645,7 +4658,8 @@ void User_var_log_event::pack_info(Protocol* protocol)
{
size_t old_len;
char *beg, *end;
- if (user_var_append_name_part(protocol->thd, &buf, name, name_len) ||
+ if (user_var_append_name_part(protocol->thd, &buf, name, name_len,
+ m_data_type_name) ||
buf.append('_') ||
buf.append(cs->cs_name) ||
buf.append(' '))
@@ -4693,10 +4707,10 @@ bool User_var_log_event::write()
}
else
{
- buf1[1]= type;
- int4store(buf1 + 2, charset_number);
+ buf1[1]= m_type;
+ int4store(buf1 + 2, m_charset_number);
- switch (type) {
+ switch (m_type) {
case REAL_RESULT:
float8store(buf2, *(double*) val);
break;
@@ -4726,15 +4740,28 @@ bool User_var_log_event::write()
buf1_length= 10;
}
+ uchar data_type_name_chunk_signature= (uchar) CHUNK_DATA_TYPE_NAME;
+ uint data_type_name_chunk_signature_length= m_data_type_name.length ? 1 : 0;
+ uchar data_type_name_length_length= m_data_type_name.length ? 1 : 0;
+
/* Length of the whole event */
- event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
+ event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len +
+ data_type_name_chunk_signature_length +
+ data_type_name_length_length +
+ (uint) m_data_type_name.length;
+ uchar unsig= m_is_unsigned ? CHUNK_UNSIGNED : CHUNK_SIGNED;
+ uchar data_type_name_length= (uchar) m_data_type_name.length;
return write_header(event_length) ||
write_data(buf, sizeof(buf)) ||
write_data(name, name_len) ||
write_data(buf1, buf1_length) ||
write_data(pos, val_len) ||
- write_data(&flags, unsigned_len) ||
+ write_data(&unsig, unsigned_len) ||
+ write_data(&data_type_name_chunk_signature,
+ data_type_name_chunk_signature_length) ||
+ write_data(&data_type_name_length, data_type_name_length_length) ||
+ write_data(m_data_type_name.str, (uint) m_data_type_name.length) ||
write_footer();
}
@@ -4758,7 +4785,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
current_thd->query_id= query_id; /* recreating original time context */
}
- if (!(charset= get_charset(charset_number, MYF(MY_WME))))
+ if (!(charset= get_charset(m_charset_number, MYF(MY_WME))))
{
rgi->rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4777,7 +4804,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
}
else
{
- switch (type) {
+ switch (m_type) {
case REAL_RESULT:
if (val_len != 8)
{
@@ -4841,13 +4868,10 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
if (e->fix_fields(thd, 0))
DBUG_RETURN(1);
- /*
- A variable can just be considered as a table with
- a single record and with a single column. Thus, like
- a column value, it could always have IMPLICIT derivation.
- */
- e->update_hash((void*) val, val_len, type, charset,
- (flags & User_var_log_event::UNSIGNED_F));
+ const Type_handler *th= Type_handler::handler_by_log_event_data_type(thd,
+ *this);
+ e->update_hash((void*) val, val_len, th, charset);
+
if (!is_deferred())
free_root(thd->mem_root, 0);
else
@@ -5575,13 +5599,14 @@ bool sql_ex_info::write_data(Log_event_writer *writer)
Rows_log_event member functions
**************************************************************************/
-Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
+Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulonglong table_id,
MY_BITMAP const *cols, bool is_transactional,
Log_event_type event_type)
: Log_event(thd_arg, 0, is_transactional),
m_row_count(0),
m_table(tbl_arg),
- m_table_id(tid),
+ m_table_id(table_id),
m_width(tbl_arg ? tbl_arg->s->fields : 1),
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0),
m_type(event_type), m_extra_row_data(0)
@@ -5593,12 +5618,13 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
{
/*
We allow a special form of dummy event when the table, and cols
- are null and the table id is ~0UL. This is a temporary
+ are null and the table id is UINT32_MAX. This is a temporary
solution, to be able to terminate a started statement in the
binary log: the extraneous events will be removed in the future.
*/
- DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
- (!tbl_arg && !cols && tid == ~0UL));
+ DBUG_ASSERT((tbl_arg && tbl_arg->s &&
+ (table_id & MAX_TABLE_MAP_ID) != UINT32_MAX) ||
+ (!tbl_arg && !cols && (table_id & MAX_TABLE_MAP_ID) == UINT32_MAX));
if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
set_flags(NO_FOREIGN_KEY_CHECKS_F);
@@ -5745,12 +5771,12 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
LEX *lex= thd->lex;
uint8 new_trg_event_map= get_trg_event_map();
/*
- If m_table_id == ~0ULL, then we have a dummy event that does not
+ If m_table_id == UINT32_MAX, then we have a dummy event that does not
contain any data. In that case, we just remove all tables in the
tables_to_lock list, close the thread tables, and return with
success.
*/
- if (m_table_id == ~0ULL)
+ if (m_table_id == UINT32_MAX)
{
/*
This one is supposed to be set: just an extra check so that
@@ -6412,10 +6438,10 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
bool Rows_log_event::write_data_header()
{
uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer
- DBUG_ASSERT(m_table_id != ~0ULL);
+ DBUG_ASSERT(m_table_id != UINT32_MAX);
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
{
- int4store(buf + 0, m_table_id);
+ int4store(buf + 0, (ulong) m_table_id);
int2store(buf + 4, m_flags);
return (write_data(buf, 6));
});
@@ -6620,7 +6646,7 @@ int Table_map_log_event::save_field_metadata()
Mats says tbl->s lives longer than this event so it's ok to copy pointers
(tbl->s->db etc) and not pointer content.
*/
-Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
+Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulonglong tid,
bool is_transactional)
: Log_event(thd, 0, is_transactional),
m_table(tbl),
@@ -6643,7 +6669,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
uchar cbuf[MAX_INT_WIDTH];
uchar *cbuf_end;
DBUG_ENTER("Table_map_log_event::Table_map_log_event(TABLE)");
- DBUG_ASSERT(m_table_id != ~0ULL);
+ DBUG_ASSERT(m_table_id != UINT32_MAX);
/*
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
table.cc / alloc_table_share():
@@ -6929,7 +6955,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
char buf[256];
my_snprintf(buf, sizeof(buf),
- "Found table map event mapping table id %u which "
+ "Found table map event mapping table id %llu which "
"was already mapped but with different settings.",
table_list->table_id);
@@ -6970,11 +6996,11 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
bool Table_map_log_event::write_data_header()
{
- DBUG_ASSERT(m_table_id != ~0ULL);
+ DBUG_ASSERT(m_table_id != UINT32_MAX);
uchar buf[TABLE_MAP_HEADER_LEN];
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
{
- int4store(buf + 0, m_table_id);
+ int4store(buf + 0, (ulong) m_table_id);
int2store(buf + 4, m_flags);
return (write_data(buf, 6));
});
@@ -7410,7 +7436,7 @@ void Table_map_log_event::pack_info(Protocol *protocol)
{
char buf[256];
size_t bytes= my_snprintf(buf, sizeof(buf),
- "table_id: %llu (%s.%s)",
+ "table_id: %llu (%s.%s)",
m_table_id, m_dbnam, m_tblnam);
protocol->store(buf, bytes, &my_charset_bin);
}
@@ -7425,7 +7451,7 @@ void Table_map_log_event::pack_info(Protocol *protocol)
Constructor used to build an event for writing to the binary log.
*/
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
- ulong tid_arg,
+ ulonglong tid_arg,
bool is_transactional)
:Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->rpl_write_set,
is_transactional, WRITE_ROWS_EVENT_V1)
@@ -7435,7 +7461,7 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
Write_rows_compressed_log_event::Write_rows_compressed_log_event(
THD *thd_arg,
TABLE *tbl_arg,
- ulong tid_arg,
+ ulonglong tid_arg,
bool is_transactional)
: Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
{
@@ -7521,7 +7547,7 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
indexed and it cannot have a DEFAULT value).
*/
m_table->auto_increment_field_not_null= FALSE;
- m_table->mark_auto_increment_column();
+ m_table->mark_auto_increment_column(true);
}
return error;
@@ -8554,7 +8580,8 @@ end:
*/
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
- ulong tid, bool is_transactional)
+ ulonglong tid,
+ bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional,
DELETE_ROWS_EVENT_V1)
{
@@ -8562,7 +8589,7 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(
THD *thd_arg, TABLE *tbl_arg,
- ulong tid_arg,
+ ulonglong tid_arg,
bool is_transactional)
: Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
{
@@ -8702,7 +8729,7 @@ uint8 Delete_rows_log_event::get_trg_event_map()
Constructor used to build an event for writing to the binary log.
*/
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
- ulong tid,
+ ulonglong tid,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional,
UPDATE_ROWS_EVENT_V1)
@@ -8710,9 +8737,9 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
init(tbl_arg->rpl_write_set);
}
-Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
- ulong tid,
- bool is_transactional)
+Update_rows_compressed_log_event::
+Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulonglong tid, bool is_transactional)
: Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional)
{
m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1;