diff options
Diffstat (limited to 'sql/log_event_client.cc')
-rw-r--r-- | sql/log_event_client.cc | 469 |
1 files changed, 189 insertions, 280 deletions
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc index d9f7d52a..94ae2559 100644 --- a/sql/log_event_client.cc +++ b/sql/log_event_client.cc @@ -346,14 +346,14 @@ bool Log_event::print_header(IO_CACHE* file, /* print the checksum */ - if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + if (read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { char checksum_buf[BINLOG_CHECKSUM_LEN * 2 + 4]; // to fit to "%p " size_t const bytes_written= - my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08x ", crc); + my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08x ", read_checksum_value); if (my_b_printf(file, "%s ", get_type(&binlog_checksum_typelib, - checksum_alg)) || + read_checksum_alg)) || my_b_printf(file, checksum_buf, bytes_written)) goto err; } @@ -951,8 +951,7 @@ size_t Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td, PRINT_EVENT_INFO *print_event_info, MY_BITMAP *cols_bitmap, - const uchar *value, const uchar *prefix, - const my_bool no_fill_output) + const uchar *value, const uchar *prefix) { const uchar *value0= value; const uchar *null_bits= value; @@ -972,8 +971,7 @@ Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td, value+= (bitmap_bits_set(cols_bitmap) + 7) / 8; - if (!no_fill_output) - if (my_b_printf(file, "%s", prefix)) + if (my_b_printf(file, "%s", prefix)) goto err; for (uint i= 0; i < (uint)td->size(); i ++) @@ -985,25 +983,22 @@ Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td, if (bitmap_is_set(cols_bitmap, i) == 0) continue; - if (!no_fill_output) - if (my_b_printf(file, "### @%d=", static_cast<int>(i + 1))) - goto err; + if (my_b_printf(file, "### @%d=", static_cast<int>(i + 1))) + goto err; if (!is_null) { size_t fsize= td->calc_field_size((uint)i, (uchar*) value); if (value + fsize > m_rows_end) { - if (!no_fill_output) - if (my_b_printf(file, "***Corrupted replication event was detected." - " Not printing the value***\n")) - goto err; + if (my_b_printf(file, "***Corrupted replication event was detected." + " Not printing the value***\n")) + goto err; value+= fsize; return 0; } } - if (!no_fill_output) { size= log_event_print_value(file, print_event_info, is_null? NULL: value, td->type(i), td->field_metadata(i), @@ -1055,16 +1050,6 @@ Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td, } #endif } - else - { - IO_CACHE tmp_cache; - open_cached_file(&tmp_cache, NULL, NULL, 0, MYF(MY_WME | MY_NABP)); - size= log_event_print_value(&tmp_cache, print_event_info, - is_null ? NULL: value, - td->type(i), td->field_metadata(i), - typestr, sizeof(typestr)); - close_cached_file(&tmp_cache); - } if (!size) goto err; @@ -1072,7 +1057,7 @@ Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td, if (!is_null) value+= size; - if (print_event_info->verbose > 1 && !no_fill_output) + if (print_event_info->verbose > 1) { if (my_b_write(file, (uchar*)" /* ", 4) || my_b_printf(file, "%s ", typestr) || @@ -1083,9 +1068,8 @@ Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td, goto err; } - if (!no_fill_output) - if (my_b_write_byte(file, '\n')) - goto err; + if (my_b_write_byte(file, '\n')) + goto err; null_bit_index++; } @@ -1095,6 +1079,42 @@ err: return 0; } +/** + Construct a row image according to field information. + + @param[in] cols_bitmap The bitmap of row columns + @param[in] fields Values of columns used to construct a row image. + @buf[out] buf Where the row image stored. + */ +static uchar *fill_row_image(const MY_BITMAP *cols_bitmap, + const Rows_log_event::Field_info *fields, + uchar *buf) +{ + uchar *null_bits= buf; + uint nulls_bit_index= 0; + size_t null_bytes= (bitmap_bits_set(cols_bitmap) + 7) / 8; + + memset(null_bits, 0, null_bytes); + buf+= null_bytes;; + + for (uint i= 0; i < cols_bitmap->n_bits; i ++) + { + if (!bitmap_is_set(cols_bitmap, i)) continue; + + if (fields[i].pos) // Field is not null + { + memcpy(buf, fields[i].pos, fields[i].length); + buf+= fields[i].length; + } + else + { + // set the null bit + null_bits[nulls_bit_index / 8]|= (1 << (nulls_bit_index % 8)); + } + nulls_bit_index++; + } + return buf; +} /** Exchange the SET part and WHERE part for the Update events. @@ -1111,8 +1131,9 @@ void Rows_log_event::change_to_flashback_event(PRINT_EVENT_INFO *print_event_inf Table_map_log_event *map; table_def *td; DYNAMIC_ARRAY rows_arr; - uchar *swap_buff1; uchar *rows_pos= rows_buff + m_rows_before_size; + Field_info *ai_fields= nullptr; + Field_info *bi_fields= nullptr; if (!(map= print_event_info->m_table_map.get_table(m_table_id)) || !(td= map->create_table_def())) @@ -1124,66 +1145,96 @@ void Rows_log_event::change_to_flashback_event(PRINT_EVENT_INFO *print_event_inf (void) my_init_dynamic_array(PSI_NOT_INSTRUMENTED, &rows_arr, sizeof(LEX_STRING), 8, 8, MYF(0)); + if (get_general_type_code() == UPDATE_ROWS_EVENT || + get_general_type_code() == UPDATE_ROWS_EVENT_V1) + { + if (!bitmap_is_set_all(&m_cols_ai)) + { + ai_fields= (Field_info *) my_malloc(PSI_NOT_INSTRUMENTED, + sizeof(Field_info) * td->size(), + MYF(MY_ZEROFILL)); + bi_fields= (Field_info *) my_malloc(PSI_NOT_INSTRUMENTED, + sizeof(Field_info) * td->size(), + MYF(MY_ZEROFILL)); + if (!ai_fields || !bi_fields) + { + fprintf(stderr, "\nError: Out of memory. " + "Could not exchange to flashback event.\n"); + exit(1); + } + } + } + for (uchar *value= m_rows_buf; value < m_rows_end; ) { uchar *start_pos= value; size_t length1= 0; - if (!(length1= print_verbose_one_row(NULL, td, print_event_info, - &m_cols, value, - (const uchar*) "", TRUE))) + if (!(length1= calc_row_event_length(td, &m_cols, value, bi_fields))) { fprintf(stderr, "\nError row length: %zu\n", length1); exit(1); } value+= length1; - swap_buff1= (uchar *) my_malloc(PSI_NOT_INSTRUMENTED, length1, MYF(0)); - if (!swap_buff1) - { - fprintf(stderr, "\nError: Out of memory. " - "Could not exchange to flashback event.\n"); - exit(1); - } - memcpy(swap_buff1, start_pos, length1); - // For Update_event, we have the second part size_t length2= 0; if (ev_type == UPDATE_ROWS_EVENT || ev_type == UPDATE_ROWS_EVENT_V1) { - if (!(length2= print_verbose_one_row(NULL, td, print_event_info, - &m_cols, value, - (const uchar*) "", TRUE))) + if (!(length2= calc_row_event_length(td, &m_cols_ai, value, ai_fields))) { fprintf(stderr, "\nError row length: %zu\n", length2); exit(1); } value+= length2; - - void *swap_buff2= my_malloc(PSI_NOT_INSTRUMENTED, length2, MYF(0)); - if (!swap_buff2) - { - fprintf(stderr, "\nError: Out of memory. " - "Could not exchange to flashback event.\n"); - exit(1); - } - memcpy(swap_buff2, start_pos + length1, length2); // WHERE part - - /* Swap SET and WHERE part */ - memcpy(start_pos, swap_buff2, length2); - memcpy(start_pos + length2, swap_buff1, length1); - my_free(swap_buff2); } - my_free(swap_buff1); - /* Copying one row into a buff, and pushing into the array */ LEX_STRING one_row; one_row.length= length1 + length2; - one_row.str= (char *) my_malloc(PSI_NOT_INSTRUMENTED, one_row.length, MYF(0)); - memcpy(one_row.str, start_pos, one_row.length); - if (one_row.str == NULL || push_dynamic(&rows_arr, (uchar *) &one_row)) + one_row.str= (char *) my_malloc(PSI_NOT_INSTRUMENTED, one_row.length, MYF(0)); + if (!one_row.str) + { + fprintf(stderr, "\nError: Out of memory. " + "Could not exchange to flashback event.\n"); + exit(1); + } + + if (length2 != 0) // It has before and after image + { + if (!bi_fields) + { + // Both bi and ai inclues all columns, Swap WHERE and SET Part + memcpy(one_row.str, start_pos + length1, length2); + memcpy(one_row.str+length2, start_pos, length1); + } + else + { + for (uint i= 0; i < (uint)td->size(); i ++) + { + // swap after and before image columns + if (bitmap_is_set(&m_cols, i) && bitmap_is_set(&m_cols_ai, i)) + { + Field_info tmp_field= bi_fields[i]; + bi_fields[i]= ai_fields[i]; + ai_fields[i]= tmp_field; + } + } + + // Recreate the before and after image + uchar *pos= (uchar*)one_row.str; + pos= fill_row_image(&m_cols, bi_fields, pos); + pos= fill_row_image(&m_cols_ai, ai_fields, pos); + assert(pos == (uchar*)one_row.str + one_row.length); + } + } + else + { + memcpy(one_row.str, start_pos, one_row.length); + } + + if (push_dynamic(&rows_arr, (uchar *) &one_row)) { fprintf(stderr, "\nError: Out of memory. " "Could not push flashback event into array.\n"); @@ -1203,6 +1254,11 @@ void Rows_log_event::change_to_flashback_event(PRINT_EVENT_INFO *print_event_inf delete_dynamic(&rows_arr); end: + if (bi_fields) + { + my_free(bi_fields); + my_free(ai_fields); + } delete td; } @@ -1323,12 +1379,22 @@ static size_t calc_field_event_length(const uchar *ptr, uint type, uint meta) return 0; } +/** + It parses a row image and returns its length and information of + columns if 'fields' is not null. + + @param[in] td Table definition of the row image + @param[in] cols_bitmap It marks the columns present in the row image + @param[in] value The row image which will be parsed + @param[out] fields Returns field information of the parsed row image. + @return length of the parsed row image if succeeds, otherwise 0 is returned. + */ size_t Rows_log_event::calc_row_event_length(table_def *td, - PRINT_EVENT_INFO *print_event_info, MY_BITMAP *cols_bitmap, - const uchar *value) + const uchar *value, + Field_info *fields) { const uchar *value0= value; const uchar *null_bits= value; @@ -1361,8 +1427,19 @@ Rows_log_event::calc_row_event_length(table_def *td, if (!(size= calc_field_event_length(value, td->type(i), td->field_metadata(i)))) return 0; + + if (fields) + { + fields[i].pos= value; + fields[i].length= size; + } + value+= size; } + else if (fields) + { + fields[i].pos= NULL; + } null_bit_index++; } return value - value0; @@ -1409,8 +1486,7 @@ void Rows_log_event::count_row_events(PRINT_EVENT_INFO *print_event_info) print_event_info->row_events++; /* Print the first image */ - if (!(length= calc_row_event_length(td, print_event_info, - &m_cols, value))) + if (!(length= calc_row_event_length(td, &m_cols, value, NULL))) break; value+= length; DBUG_ASSERT(value <= m_rows_end); @@ -1418,8 +1494,7 @@ void Rows_log_event::count_row_events(PRINT_EVENT_INFO *print_event_info) /* Print the second image (for UPDATE only) */ if (row_events == 2) { - if (!(length= calc_row_event_length(td, print_event_info, - &m_cols_ai, value))) + if (!(length= calc_row_event_length(td, &m_cols_ai, value, NULL))) break; value+= length; DBUG_ASSERT(value <= m_rows_end); @@ -1466,7 +1541,7 @@ bool Rows_log_event::print_verbose(IO_CACHE *file, */ const int buff_len= 2 + (256 * 2) + 1; char buff[buff_len]; - str_to_hex(buff, (const char*) &m_extra_row_data[EXTRA_ROW_INFO_HDR_BYTES], + str_to_hex(buff, &m_extra_row_data[EXTRA_ROW_INFO_HDR_BYTES], extra_payload_len); if (my_b_printf(file, "%s", buff)) goto err; @@ -1605,8 +1680,8 @@ bool Log_event::print_base64(IO_CACHE* file, uint tmp_size= size; Rows_log_event *ev= NULL; Log_event_type ev_type = (enum Log_event_type) ptr[EVENT_TYPE_OFFSET]; - if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && - checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + if (read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && + read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF) tmp_size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header switch (ev_type) { case WRITE_ROWS_EVENT: @@ -1673,8 +1748,8 @@ bool Log_event::print_base64(IO_CACHE* file, Rows_log_event *ev= NULL; Log_event_type et= (Log_event_type) ptr[EVENT_TYPE_OFFSET]; - if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && - checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + if (read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && + read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF) size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header switch (et) @@ -1983,6 +2058,38 @@ bool Query_log_event::print_query_header(IO_CACHE* file, memcpy(print_event_info->charset, charset, 6); print_event_info->charset_inited= 1; } + + if (character_set_collations.length) + { + Charset_collation_map_st map; + size_t length= map.from_binary(character_set_collations.str, + character_set_collations.length); + if (length == character_set_collations.length) + { + Binary_string str; + size_t nbytes= map.text_format_nbytes_needed(); + if (str.alloc(nbytes)) + goto err; + size_t text_length= map.print((char*) str.ptr(), nbytes); + str.length(text_length); + /* + my_b_printf() does not seem to support '%.*s' + so append a \0 terminator. + */ + str.append_char('\0'); + if (my_b_printf(file, "SET @@session.character_set_collations='%s'%s\n", + str.ptr(), print_event_info->delimiter)) + goto err; + } + else + { + if (my_b_printf(file, + "/* SET @@session.character_set_collations='%s' */\n", + "<format not recognized>")) + goto err; + } + } + if (time_zone_len) { if (memcmp(print_event_info->time_zone_str, @@ -2105,9 +2212,9 @@ err: } -bool Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) +bool Format_description_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { - DBUG_ENTER("Start_log_event_v3::print"); + DBUG_ENTER("Format_description_log_event::print"); Write_on_release_cache cache(&print_event_info->head_cache, file, Write_on_release_cache::FLUSH_F); @@ -2189,122 +2296,6 @@ bool Start_encryption_log_event::print(FILE* file, } -bool Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) -{ - return print(file, print_event_info, 0); -} - - -bool Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info, - bool commented) -{ - Write_on_release_cache cache(&print_event_info->head_cache, file_arg); - bool different_db= 1; - DBUG_ENTER("Load_log_event::print"); - - if (!print_event_info->short_form) - { - if (print_header(&cache, print_event_info, FALSE) || - my_b_printf(&cache, "\tQuery\tthread_id=%ld\texec_time=%ld\n", - thread_id, exec_time)) - goto err; - } - - if (db) - { - /* - If the database is different from the one of the previous statement, we - need to print the "use" command, and we update the last_db. - But if commented, the "use" is going to be commented so we should not - update the last_db. - */ - if ((different_db= memcmp(print_event_info->db, db, db_len + 1)) && - !commented) - memcpy(print_event_info->db, db, db_len + 1); - } - - if (db && db[0] && different_db) - if (my_b_printf(&cache, "%suse %`s%s\n", - commented ? "# " : "", - db, print_event_info->delimiter)) - goto err; - - if (flags & LOG_EVENT_THREAD_SPECIFIC_F) - if (my_b_printf(&cache,"%sSET @@session.pseudo_thread_id=%lu%s\n", - commented ? "# " : "", (ulong)thread_id, - print_event_info->delimiter)) - goto err; - if (my_b_printf(&cache, "%sLOAD DATA ", - commented ? "# " : "")) - goto err; - if (check_fname_outside_temp_buf()) - if (my_b_write_string(&cache, "LOCAL ")) - goto err; - if (my_b_printf(&cache, "INFILE '%-*s' ", fname_len, fname)) - goto err; - - if (sql_ex.opt_flags & REPLACE_FLAG) - { - if (my_b_write_string(&cache, "REPLACE ")) - goto err; - } - else if (sql_ex.opt_flags & IGNORE_FLAG) - if (my_b_write_string(&cache, "IGNORE ")) - goto err; - - if (my_b_printf(&cache, "INTO TABLE `%s`", table_name) || - my_b_write_string(&cache, " FIELDS TERMINATED BY ") || - pretty_print_str(&cache, sql_ex.field_term, sql_ex.field_term_len)) - goto err; - - if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) - if (my_b_write_string(&cache, " OPTIONALLY ")) - goto err; - if (my_b_write_string(&cache, " ENCLOSED BY ") || - pretty_print_str(&cache, sql_ex.enclosed, sql_ex.enclosed_len) || - my_b_write_string(&cache, " ESCAPED BY ") || - pretty_print_str(&cache, sql_ex.escaped, sql_ex.escaped_len) || - my_b_write_string(&cache, " LINES TERMINATED BY ") || - pretty_print_str(&cache, sql_ex.line_term, sql_ex.line_term_len)) - goto err; - - if (sql_ex.line_start) - { - if (my_b_write_string(&cache," STARTING BY ") || - pretty_print_str(&cache, sql_ex.line_start, sql_ex.line_start_len)) - goto err; - } - if ((long) skip_lines > 0) - if (my_b_printf(&cache, " IGNORE %ld LINES", (long) skip_lines)) - goto err; - - if (num_fields) - { - uint i; - const char* field = fields; - if (my_b_write_string(&cache, " (")) - goto err; - for (i = 0; i < num_fields; i++) - { - if (i) - if (my_b_write_byte(&cache, ',')) - goto err; - if (my_b_printf(&cache, "%`s", field)) - goto err; - field += field_lens[i] + 1; - } - if (my_b_write_byte(&cache, ')')) - goto err; - } - - if (my_b_printf(&cache, "%s\n", print_event_info->delimiter)) - goto err; - DBUG_RETURN(cache.flush_data()); -err: - DBUG_RETURN(1); -} - - bool Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { if (print_event_info->short_form) @@ -2549,7 +2540,7 @@ bool User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) hex_str= (char *) my_malloc(PSI_NOT_INSTRUMENTED, 2 * val_len + 1 + 3, MYF(MY_WME)); if (!hex_str) goto err; - str_to_hex(hex_str, val, val_len); + str_to_hex(hex_str, (uchar*)val, val_len); /* For proper behaviour when mysqlbinlog|mysql, we need to explicitly specify the variable's collation. It will however cause problems when @@ -2626,61 +2617,6 @@ bool Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #endif -bool Create_file_log_event::print(FILE* file, - PRINT_EVENT_INFO* print_event_info, - bool enable_local) -{ - if (print_event_info->short_form) - { - if (enable_local && check_fname_outside_temp_buf()) - return Load_log_event::print(file, print_event_info); - return 0; - } - - Write_on_release_cache cache(&print_event_info->head_cache, file); - - if (enable_local) - { - if (Load_log_event::print(file, print_event_info, - !check_fname_outside_temp_buf())) - goto err; - - /** - reduce the size of io cache so that the write function is called - for every call to my_b_printf(). - */ - DBUG_EXECUTE_IF ("simulate_create_event_write_error", - {(&cache)->write_pos= (&cache)->write_end; - DBUG_SET("+d,simulate_file_write_error");}); - /* - That one is for "file_id: etc" below: in mysqlbinlog we want the #, in - SHOW BINLOG EVENTS we don't. - */ - if (my_b_write_byte(&cache, '#')) - goto err; - } - - if (my_b_printf(&cache, " file_id: %d block_len: %d\n", file_id, block_len)) - goto err; - - return cache.flush_data(); -err: - return 1; - -} - - -bool Create_file_log_event::print(FILE* file, - PRINT_EVENT_INFO* print_event_info) -{ - return print(file, print_event_info, 0); -} - - -/* - Append_block_log_event::print() -*/ - bool Append_block_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { @@ -2700,10 +2636,6 @@ err: } -/* - Delete_file_log_event::print() -*/ - bool Delete_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { @@ -2719,25 +2651,6 @@ bool Delete_file_log_event::print(FILE* file, return cache.flush_data(); } -/* - Execute_load_log_event::print() -*/ - -bool Execute_load_log_event::print(FILE* file, - PRINT_EVENT_INFO* print_event_info) -{ - if (print_event_info->short_form) - return 0; - - Write_on_release_cache cache(&print_event_info->head_cache, file); - - if (print_header(&cache, print_event_info, FALSE) || - my_b_printf(&cache, "\n#Exec_load: file_id=%d\n", - file_id)) - return 1; - - return cache.flush_data(); -} bool Execute_load_query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) @@ -2995,10 +2908,6 @@ err: where fragments are represented by a pair of indexed user "one shot" variables. - @note - If any changes made don't forget to duplicate them to - Old_rows_log_event as long as it's supported. - @param file pointer to IO_CACHE @param print_event_info pointer to print_event_info specializing what out of and how to print the event @@ -3673,7 +3582,7 @@ bool Write_rows_compressed_log_event::print(FILE *file, ulong len; bool is_malloc = false; if(!row_log_event_uncompress(glob_description_event, - checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) { @@ -3710,7 +3619,7 @@ bool Delete_rows_compressed_log_event::print(FILE *file, ulong len; bool is_malloc = false; if(!row_log_event_uncompress(glob_description_event, - checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) { @@ -3747,7 +3656,7 @@ Update_rows_compressed_log_event::print(FILE *file, ulong len; bool is_malloc= false; if(!row_log_event_uncompress(glob_description_event, - checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) { |