diff options
Diffstat (limited to 'sql/log_event_server.cc')
-rw-r--r-- | sql/log_event_server.cc | 123 |
1 files changed, 75 insertions, 48 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 5cb15c1c..84a00b5d 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -918,6 +918,10 @@ int Log_event_writer::write_header(uchar *pos, size_t len) int Log_event_writer::write_data(const uchar *pos, size_t len) { DBUG_ENTER("Log_event_writer::write_data"); + + if (!len) + DBUG_RETURN(0); + if (checksum_len) crc= my_checksum(crc, pos, len); @@ -4561,11 +4565,16 @@ bool XA_prepare_log_event::write() #if defined(HAVE_REPLICATION) static bool user_var_append_name_part(THD *thd, String *buf, - const char *name, size_t name_len) + const char *name, size_t name_len, + const LEX_CSTRING &data_type_name) { return buf->append('@') || append_identifier(thd, buf, name, name_len) || - buf->append('='); + buf->append('=') || + (data_type_name.length && + (buf->append(STRING_WITH_LEN("/*")) || + buf->append(data_type_name.str, data_type_name.length) || + buf->append(STRING_WITH_LEN("*/")))); } void User_var_log_event::pack_info(Protocol* protocol) @@ -4575,14 +4584,15 @@ void User_var_log_event::pack_info(Protocol* protocol) char buf_mem[FN_REFLEN+7]; String buf(buf_mem, sizeof(buf_mem), system_charset_info); buf.length(0); - if (user_var_append_name_part(protocol->thd, &buf, name, name_len) || + if (user_var_append_name_part(protocol->thd, &buf, name, name_len, + m_data_type_name) || buf.append(NULL_clex_str)) return; protocol->store(buf.ptr(), buf.length(), &my_charset_bin); } else { - switch (type) { + switch (m_type) { case REAL_RESULT: { double real_val; @@ -4591,7 +4601,8 @@ void User_var_log_event::pack_info(Protocol* protocol) String buf(buf_mem, sizeof(buf_mem), system_charset_info); float8get(real_val, val); buf.length(0); - if (user_var_append_name_part(protocol->thd, &buf, name, name_len) || + if (user_var_append_name_part(protocol->thd, &buf, name, name_len, + m_data_type_name) || buf.append(buf2, my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH, buf2, NULL))) return; @@ -4604,10 +4615,11 @@ void User_var_log_event::pack_info(Protocol* protocol) char buf_mem[FN_REFLEN + 22]; String buf(buf_mem, sizeof(buf_mem), system_charset_info); buf.length(0); - if (user_var_append_name_part(protocol->thd, &buf, name, name_len) || + if (user_var_append_name_part(protocol->thd, &buf, name, name_len, + m_data_type_name) || buf.append(buf2, longlong10_to_str(uint8korr(val), buf2, - ((flags & User_var_log_event::UNSIGNED_F) ? 10 : -10))-buf2)) + (is_unsigned() ? 10 : -10))-buf2)) return; protocol->store(buf.ptr(), buf.length(), &my_charset_bin); break; @@ -4620,7 +4632,8 @@ void User_var_log_event::pack_info(Protocol* protocol) String str(buf2, sizeof(buf2), &my_charset_bin); buf.length(0); my_decimal((const uchar *) (val + 2), val[0], val[1]).to_string(&str); - if (user_var_append_name_part(protocol->thd, &buf, name, name_len) || + if (user_var_append_name_part(protocol->thd, &buf, name, name_len, + m_data_type_name) || buf.append(str)) return; protocol->store(buf.ptr(), buf.length(), &my_charset_bin); @@ -4636,7 +4649,7 @@ void User_var_log_event::pack_info(Protocol* protocol) String buf(buf_mem, sizeof(buf_mem), system_charset_info); CHARSET_INFO *cs; buf.length(0); - if (!(cs= get_charset(charset_number, MYF(0)))) + if (!(cs= get_charset(m_charset_number, MYF(0)))) { if (buf.append(STRING_WITH_LEN("???"))) return; @@ -4645,7 +4658,8 @@ void User_var_log_event::pack_info(Protocol* protocol) { size_t old_len; char *beg, *end; - if (user_var_append_name_part(protocol->thd, &buf, name, name_len) || + if (user_var_append_name_part(protocol->thd, &buf, name, name_len, + m_data_type_name) || buf.append('_') || buf.append(cs->cs_name) || buf.append(' ')) @@ -4693,10 +4707,10 @@ bool User_var_log_event::write() } else { - buf1[1]= type; - int4store(buf1 + 2, charset_number); + buf1[1]= m_type; + int4store(buf1 + 2, m_charset_number); - switch (type) { + switch (m_type) { case REAL_RESULT: float8store(buf2, *(double*) val); break; @@ -4726,15 +4740,28 @@ bool User_var_log_event::write() buf1_length= 10; } + uchar data_type_name_chunk_signature= (uchar) CHUNK_DATA_TYPE_NAME; + uint data_type_name_chunk_signature_length= m_data_type_name.length ? 1 : 0; + uchar data_type_name_length_length= m_data_type_name.length ? 1 : 0; + /* Length of the whole event */ - event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len; + event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len + + data_type_name_chunk_signature_length + + data_type_name_length_length + + (uint) m_data_type_name.length; + uchar unsig= m_is_unsigned ? CHUNK_UNSIGNED : CHUNK_SIGNED; + uchar data_type_name_length= (uchar) m_data_type_name.length; return write_header(event_length) || write_data(buf, sizeof(buf)) || write_data(name, name_len) || write_data(buf1, buf1_length) || write_data(pos, val_len) || - write_data(&flags, unsigned_len) || + write_data(&unsig, unsigned_len) || + write_data(&data_type_name_chunk_signature, + data_type_name_chunk_signature_length) || + write_data(&data_type_name_length, data_type_name_length_length) || + write_data(m_data_type_name.str, (uint) m_data_type_name.length) || write_footer(); } @@ -4758,7 +4785,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi) current_thd->query_id= query_id; /* recreating original time context */ } - if (!(charset= get_charset(charset_number, MYF(MY_WME)))) + if (!(charset= get_charset(m_charset_number, MYF(MY_WME)))) { rgi->rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4777,7 +4804,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi) } else { - switch (type) { + switch (m_type) { case REAL_RESULT: if (val_len != 8) { @@ -4841,13 +4868,10 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi) if (e->fix_fields(thd, 0)) DBUG_RETURN(1); - /* - A variable can just be considered as a table with - a single record and with a single column. Thus, like - a column value, it could always have IMPLICIT derivation. - */ - e->update_hash((void*) val, val_len, type, charset, - (flags & User_var_log_event::UNSIGNED_F)); + const Type_handler *th= Type_handler::handler_by_log_event_data_type(thd, + *this); + e->update_hash((void*) val, val_len, th, charset); + if (!is_deferred()) free_root(thd->mem_root, 0); else @@ -5575,13 +5599,14 @@ bool sql_ex_info::write_data(Log_event_writer *writer) Rows_log_event member functions **************************************************************************/ -Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, +Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, + ulonglong table_id, MY_BITMAP const *cols, bool is_transactional, Log_event_type event_type) : Log_event(thd_arg, 0, is_transactional), m_row_count(0), m_table(tbl_arg), - m_table_id(tid), + m_table_id(table_id), m_width(tbl_arg ? tbl_arg->s->fields : 1), m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0), m_type(event_type), m_extra_row_data(0) @@ -5593,12 +5618,13 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, { /* We allow a special form of dummy event when the table, and cols - are null and the table id is ~0UL. This is a temporary + are null and the table id is UINT32_MAX. This is a temporary solution, to be able to terminate a started statement in the binary log: the extraneous events will be removed in the future. */ - DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) || - (!tbl_arg && !cols && tid == ~0UL)); + DBUG_ASSERT((tbl_arg && tbl_arg->s && + (table_id & MAX_TABLE_MAP_ID) != UINT32_MAX) || + (!tbl_arg && !cols && (table_id & MAX_TABLE_MAP_ID) == UINT32_MAX)); if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS) set_flags(NO_FOREIGN_KEY_CHECKS_F); @@ -5745,12 +5771,12 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) LEX *lex= thd->lex; uint8 new_trg_event_map= get_trg_event_map(); /* - If m_table_id == ~0ULL, then we have a dummy event that does not + If m_table_id == UINT32_MAX, then we have a dummy event that does not contain any data. In that case, we just remove all tables in the tables_to_lock list, close the thread tables, and return with success. */ - if (m_table_id == ~0ULL) + if (m_table_id == UINT32_MAX) { /* This one is supposed to be set: just an extra check so that @@ -6412,10 +6438,10 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) bool Rows_log_event::write_data_header() { uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer - DBUG_ASSERT(m_table_id != ~0ULL); + DBUG_ASSERT(m_table_id != UINT32_MAX); DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", { - int4store(buf + 0, m_table_id); + int4store(buf + 0, (ulong) m_table_id); int2store(buf + 4, m_flags); return (write_data(buf, 6)); }); @@ -6620,7 +6646,7 @@ int Table_map_log_event::save_field_metadata() Mats says tbl->s lives longer than this event so it's ok to copy pointers (tbl->s->db etc) and not pointer content. */ -Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, +Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulonglong tid, bool is_transactional) : Log_event(thd, 0, is_transactional), m_table(tbl), @@ -6643,7 +6669,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, uchar cbuf[MAX_INT_WIDTH]; uchar *cbuf_end; DBUG_ENTER("Table_map_log_event::Table_map_log_event(TABLE)"); - DBUG_ASSERT(m_table_id != ~0ULL); + DBUG_ASSERT(m_table_id != UINT32_MAX); /* In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in table.cc / alloc_table_share(): @@ -6929,7 +6955,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) char buf[256]; my_snprintf(buf, sizeof(buf), - "Found table map event mapping table id %u which " + "Found table map event mapping table id %llu which " "was already mapped but with different settings.", table_list->table_id); @@ -6970,11 +6996,11 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi) bool Table_map_log_event::write_data_header() { - DBUG_ASSERT(m_table_id != ~0ULL); + DBUG_ASSERT(m_table_id != UINT32_MAX); uchar buf[TABLE_MAP_HEADER_LEN]; DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", { - int4store(buf + 0, m_table_id); + int4store(buf + 0, (ulong) m_table_id); int2store(buf + 4, m_flags); return (write_data(buf, 6)); }); @@ -7410,7 +7436,7 @@ void Table_map_log_event::pack_info(Protocol *protocol) { char buf[256]; size_t bytes= my_snprintf(buf, sizeof(buf), - "table_id: %llu (%s.%s)", + "table_id: %llu (%s.%s)", m_table_id, m_dbnam, m_tblnam); protocol->store(buf, bytes, &my_charset_bin); } @@ -7425,7 +7451,7 @@ void Table_map_log_event::pack_info(Protocol *protocol) Constructor used to build an event for writing to the binary log. */ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg, - ulong tid_arg, + ulonglong tid_arg, bool is_transactional) :Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->rpl_write_set, is_transactional, WRITE_ROWS_EVENT_V1) @@ -7435,7 +7461,7 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg, Write_rows_compressed_log_event::Write_rows_compressed_log_event( THD *thd_arg, TABLE *tbl_arg, - ulong tid_arg, + ulonglong tid_arg, bool is_transactional) : Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional) { @@ -7521,7 +7547,7 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability indexed and it cannot have a DEFAULT value). */ m_table->auto_increment_field_not_null= FALSE; - m_table->mark_auto_increment_column(); + m_table->mark_auto_increment_column(true); } return error; @@ -8554,7 +8580,8 @@ end: */ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, - ulong tid, bool is_transactional) + ulonglong tid, + bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional, DELETE_ROWS_EVENT_V1) { @@ -8562,7 +8589,7 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, Delete_rows_compressed_log_event::Delete_rows_compressed_log_event( THD *thd_arg, TABLE *tbl_arg, - ulong tid_arg, + ulonglong tid_arg, bool is_transactional) : Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional) { @@ -8702,7 +8729,7 @@ uint8 Delete_rows_log_event::get_trg_event_map() Constructor used to build an event for writing to the binary log. */ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, - ulong tid, + ulonglong tid, bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional, UPDATE_ROWS_EVENT_V1) @@ -8710,9 +8737,9 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, init(tbl_arg->rpl_write_set); } -Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg, - ulong tid, - bool is_transactional) +Update_rows_compressed_log_event:: +Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg, + ulonglong tid, bool is_transactional) : Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional) { m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1; |