summaryrefslogtreecommitdiffstats
path: root/sql/log_event.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
commita175314c3e5827eb193872241446f2f8f5c9d33c (patch)
treecd3d60ca99ae00829c52a6ca79150a5b6e62528b /sql/log_event.cc
parentInitial commit. (diff)
downloadmariadb-10.5-upstream.tar.xz
mariadb-10.5-upstream.zip
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--sql/log_event.cc4077
1 files changed, 4077 insertions, 0 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
new file mode 100644
index 00000000..b35ed188
--- /dev/null
+++ b/sql/log_event.cc
@@ -0,0 +1,4077 @@
+/*
+ Copyright (c) 2000, 2018, Oracle and/or its affiliates.
+ Copyright (c) 2009, 2021, MariaDB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
+
+
+#include "mariadb.h"
+#include "sql_priv.h"
+#include "handler.h"
+#ifndef MYSQL_CLIENT
+#include "unireg.h"
+#include "log_event.h"
+#include "sql_base.h" // close_thread_tables
+#include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
+#include "sql_locale.h" // MY_LOCALE, my_locale_by_number, my_locale_en_US
+#include "key.h" // key_copy
+#include "lock.h" // mysql_unlock_tables
+#include "sql_parse.h" // mysql_test_parse_for_slave
+#include "tztime.h" // struct Time_zone
+#include "sql_load.h" // mysql_load
+#include "sql_db.h" // load_db_opt_by_name
+#include "slave.h"
+#include "rpl_rli.h"
+#include "rpl_mi.h"
+#include "rpl_filter.h"
+#include "rpl_record.h"
+#include "transaction.h"
+#include <my_dir.h>
+#include "sql_show.h" // append_identifier
+#include <mysql/psi/mysql_statement.h>
+#include <strfunc.h>
+#include "compat56.h"
+#include "sql_insert.h"
+#else
+#include "mysqld_error.h"
+#endif /* MYSQL_CLIENT */
+
+#include <my_bitmap.h>
+#include "rpl_utility.h"
+#include "rpl_constants.h"
+#include "sql_digest.h"
+#include "zlib.h"
+
+#define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
+
+#ifndef _AIX
+PSI_memory_key key_memory_log_event;
+#endif
+PSI_memory_key key_memory_Incident_log_event_message;
+PSI_memory_key key_memory_Rows_query_log_event_rows_query;
+
+/**
+ BINLOG_CHECKSUM variable.
+*/
+const char *binlog_checksum_type_names[]= {
+ "NONE",
+ "CRC32",
+ NullS
+};
+
+unsigned int binlog_checksum_type_length[]= {
+ sizeof("NONE") - 1,
+ sizeof("CRC32") - 1,
+ 0
+};
+
+TYPELIB binlog_checksum_typelib=
+{
+ array_elements(binlog_checksum_type_names) - 1, "",
+ binlog_checksum_type_names,
+ binlog_checksum_type_length
+};
+
+
+#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
+
+/*
+ Size of buffer for printing a double in format %.<PREC>g
+
+ optional '-' + optional zero + '.' + PREC digits + 'e' + sign +
+ exponent digits + '\0'
+*/
+#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
+
+/*
+ replication event checksum is introduced in the following "checksum-home" version.
+ The checksum-aware servers extract FD's version to decide whether the FD event
+ carries checksum info.
+
+ TODO: correct the constant when it has been determined
+ (which main tree to push and when)
+*/
+const Version checksum_version_split_mysql(5, 6, 1);
+const Version checksum_version_split_mariadb(5, 3, 0);
+
+// First MySQL version with fraction seconds
+const Version fsp_version_split_mysql(5, 6, 0);
+
+/*
+ Cache that will automatically be written to a dedicated file on
+ destruction.
+
+ DESCRIPTION
+
+ */
+class Write_on_release_cache
+{
+public:
+ enum flag
+ {
+ FLUSH_F
+ };
+
+ typedef unsigned short flag_set;
+
+ /*
+ Constructor.
+
+ SYNOPSIS
+ Write_on_release_cache
+ cache Pointer to cache to use
+ file File to write cache to upon destruction
+ flags Flags for the cache
+
+ DESCRIPTION
+ Cache common parameters and ensure common flush_data() code
+ on successful copy of the cache, the cache will be reinited as a
+ WRITE_CACHE.
+
+ Currently, a pointer to the cache is provided in the
+ constructor, but it would be possible to create a subclass
+ holding the IO_CACHE itself.
+ */
+ Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0, Log_event *ev = NULL)
+ : m_cache(cache), m_file(file), m_flags(flags), m_ev(ev)
+ {
+ reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE);
+ }
+
+ ~Write_on_release_cache() {}
+
+ bool flush_data()
+ {
+#ifdef MYSQL_CLIENT
+ if (m_ev == NULL)
+ {
+ if (copy_event_cache_to_file_and_reinit(m_cache, m_file))
+ return 1;
+ if ((m_flags & FLUSH_F) && fflush(m_file))
+ return 1;
+ }
+ else // if m_ev<>NULL, then storing the output in output_buf
+ {
+ LEX_STRING tmp_str;
+ bool res;
+ if (copy_event_cache_to_string_and_reinit(m_cache, &tmp_str))
+ return 1;
+ /* use 2 argument append as tmp_str is not \0 terminated */
+ res= m_ev->output_buf.append(tmp_str.str, tmp_str.length);
+ my_free(tmp_str.str);
+ return res ? res : 0;
+ }
+#else /* MySQL_SERVER */
+ if (copy_event_cache_to_file_and_reinit(m_cache, m_file))
+ return 1;
+ if ((m_flags & FLUSH_F) && fflush(m_file))
+ return 1;
+#endif
+ return 0;
+ }
+
+ /*
+ Return a pointer to the internal IO_CACHE.
+
+ SYNOPSIS
+ operator&()
+
+ DESCRIPTION
+
+ Function to return a pointer to the internal cache, so that the
+ object can be treated as a IO_CACHE and used with the my_b_*
+ IO_CACHE functions
+
+ RETURN VALUE
+ A pointer to the internal IO_CACHE.
+ */
+ IO_CACHE *operator&()
+ {
+ return m_cache;
+ }
+
+private:
+ // Hidden, to prevent usage.
+ Write_on_release_cache(Write_on_release_cache const&);
+
+ IO_CACHE *m_cache;
+ FILE *m_file;
+ flag_set m_flags;
+ Log_event *m_ev; // Used for Flashback
+};
+
+#ifndef DBUG_OFF
+#define DBUG_DUMP_EVENT_BUF(B,L) \
+ do { \
+ const uchar *_buf=(uchar*)(B); \
+ size_t _len=(L); \
+ if (_len >= LOG_EVENT_MINIMAL_HEADER_LEN) \
+ { \
+ DBUG_PRINT("data", ("header: timestamp:%u type:%u server_id:%u len:%u log_pos:%u flags:%u", \
+ uint4korr(_buf), _buf[EVENT_TYPE_OFFSET], \
+ uint4korr(_buf+SERVER_ID_OFFSET), \
+ uint4korr(_buf+EVENT_LEN_OFFSET), \
+ uint4korr(_buf+LOG_POS_OFFSET), \
+ uint4korr(_buf+FLAGS_OFFSET))); \
+ DBUG_DUMP("data", _buf+LOG_EVENT_MINIMAL_HEADER_LEN, \
+ _len-LOG_EVENT_MINIMAL_HEADER_LEN); \
+ } \
+ else \
+ DBUG_DUMP("data", _buf, _len); \
+ } while(0)
+#else
+#define DBUG_DUMP_EVENT_BUF(B,L) do { } while(0)
+#endif
+
+/*
+ read_str()
+*/
+
+static inline int read_str(const char **buf, const char *buf_end,
+ const char **str, uint8 *len)
+{
+ if (*buf + ((uint) (uchar) **buf) >= buf_end)
+ return 1;
+ *len= (uint8) **buf;
+ *str= (*buf)+1;
+ (*buf)+= (uint) *len+1;
+ return 0;
+}
+
+
+/**
+ Transforms a string into "" or its expression in X'HHHH' form.
+*/
+
+char *str_to_hex(char *to, const char *from, size_t len)
+{
+ if (len)
+ {
+ *to++= 'X';
+ *to++= '\'';
+ to= octet2hex(to, from, len);
+ *to++= '\'';
+ *to= '\0';
+ }
+ else
+ to= strmov(to, "\"\"");
+ return to; // pointer to end 0 of 'to'
+}
+
+#define BINLOG_COMPRESSED_HEADER_LEN 1
+#define BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES 4
+/**
+ Compressed Record
+ Record Header: 1 Byte
+ 7 Bit: Always 1, mean compressed;
+ 4-6 Bit: Compressed algorithm - Always 0, means zlib
+ It maybe support other compression algorithm in the future.
+ 0-3 Bit: Bytes of "Record Original Length"
+ Record Original Length: 1-4 Bytes
+ Compressed Buf:
+*/
+
+/**
+ Get the length of compress content.
+*/
+
+uint32 binlog_get_compress_len(uint32 len)
+{
+ /* 5 for the begin content, 1 reserved for a '\0'*/
+ return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES)
+ + compressBound(len) + 1);
+}
+
+/**
+ Compress buf from 'src' to 'dst'.
+
+ Note: 1) Then the caller should guarantee the length of 'dst', which
+ can be got by binlog_get_uncompress_len, is enough to hold
+ the content uncompressed.
+ 2) The 'comlen' should stored the length of 'dst', and it will
+ be set as the size of compressed content after return.
+
+ return zero if successful, others otherwise.
+*/
+int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
+{
+ uchar lenlen;
+ if (len & 0xFF000000)
+ {
+ dst[1] = uchar(len >> 24);
+ dst[2] = uchar(len >> 16);
+ dst[3] = uchar(len >> 8);
+ dst[4] = uchar(len);
+ lenlen = 4;
+ }
+ else if (len & 0x00FF0000)
+ {
+ dst[1] = uchar(len >> 16);
+ dst[2] = uchar(len >> 8);
+ dst[3] = uchar(len);
+ lenlen = 3;
+ }
+ else if (len & 0x0000FF00)
+ {
+ dst[1] = uchar(len >> 8);
+ dst[2] = uchar(len);
+ lenlen = 2;
+ }
+ else
+ {
+ dst[1] = uchar(len);
+ lenlen = 1;
+ }
+ dst[0] = 0x80 | (lenlen & 0x07);
+
+ uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1;
+ if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen,
+ (const Bytef *)src, (uLongf)len) != Z_OK)
+ {
+ return 1;
+ }
+ *comlen = (uint32)tmplen + BINLOG_COMPRESSED_HEADER_LEN + lenlen;
+ return 0;
+}
+
+/**
+ Convert a query_compressed_log_event to query_log_event
+ from 'src' to 'dst', the size after compression stored in 'newlen'.
+
+ @Note:
+ 1) The caller should call my_free to release 'dst' if *is_malloc is
+ returned as true.
+ 2) If *is_malloc is retuened as false, then 'dst' reuses the passed-in
+ 'buf'.
+
+ return zero if successful, non-zero otherwise.
+*/
+
+int
+query_event_uncompress(const Format_description_log_event *description_event,
+ bool contain_checksum, const char *src, ulong src_len,
+ char* buf, ulong buf_size, bool* is_malloc, char **dst,
+ ulong *newlen)
+{
+ ulong len = uint4korr(src + EVENT_LEN_OFFSET);
+ const char *tmp = src;
+ const char *end = src + len;
+
+ // bad event
+ if (src_len < len )
+ return 1;
+
+ DBUG_ASSERT((uchar)src[EVENT_TYPE_OFFSET] == QUERY_COMPRESSED_EVENT);
+
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 post_header_len=
+ description_event->post_header_len[QUERY_COMPRESSED_EVENT-1];
+
+ *is_malloc = false;
+
+ tmp += common_header_len;
+ // bad event
+ if (end <= tmp)
+ return 1;
+
+ uint db_len = (uint)tmp[Q_DB_LEN_OFFSET];
+ uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET);
+
+ tmp += post_header_len + status_vars_len + db_len + 1;
+ // bad event
+ if (end <= tmp)
+ return 1;
+
+ int32 comp_len = (int32)(len - (tmp - src) -
+ (contain_checksum ? BINLOG_CHECKSUM_LEN : 0));
+ uint32 un_len = binlog_get_uncompress_len(tmp);
+
+ // bad event
+ if (comp_len < 0 || un_len == 0)
+ return 1;
+
+ *newlen = (ulong)(tmp - src) + un_len;
+ if(contain_checksum)
+ *newlen += BINLOG_CHECKSUM_LEN;
+
+ uint32 alloc_size = (uint32)ALIGN_SIZE(*newlen);
+ char *new_dst = NULL;
+
+
+ if (alloc_size <= buf_size)
+ {
+ new_dst = buf;
+ }
+ else
+ {
+ new_dst = (char *)my_malloc(PSI_INSTRUMENT_ME, alloc_size, MYF(MY_WME));
+ if (!new_dst)
+ return 1;
+
+ *is_malloc = true;
+ }
+
+ /* copy the head*/
+ memcpy(new_dst, src , tmp - src);
+ if (binlog_buf_uncompress(tmp, new_dst + (tmp - src),
+ comp_len, &un_len))
+ {
+ if (*is_malloc)
+ my_free(new_dst);
+
+ *is_malloc = false;
+
+ return 1;
+ }
+
+ new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT;
+ int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
+ if(contain_checksum)
+ {
+ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
+ int4store(new_dst + clear_len,
+ my_checksum(0L, (uchar *)new_dst, clear_len));
+ }
+ *dst = new_dst;
+ return 0;
+}
+
+int
+row_log_event_uncompress(const Format_description_log_event *description_event,
+ bool contain_checksum, const char *src, ulong src_len,
+ char* buf, ulong buf_size, bool* is_malloc, char **dst,
+ ulong *newlen)
+{
+ Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
+ ulong len = uint4korr(src + EVENT_LEN_OFFSET);
+ const char *tmp = src;
+ char *new_dst = NULL;
+ const char *end = tmp + len;
+
+ // bad event
+ if (src_len < len)
+ return 1;
+
+ DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));
+
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[type-1];
+
+ tmp += common_header_len + ROWS_HEADER_LEN_V1;
+ if (post_header_len == ROWS_HEADER_LEN_V2)
+ {
+ /*
+ Have variable length header, check length,
+ which includes length bytes
+ */
+
+ // bad event
+ if (end - tmp <= 2)
+ return 1;
+
+ uint16 var_header_len= uint2korr(tmp);
+ DBUG_ASSERT(var_header_len >= 2);
+
+ /* skip over var-len header, extracting 'chunks' */
+ tmp += var_header_len;
+
+ /* get the uncompressed event type */
+ type=
+ (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT);
+ }
+ else
+ {
+ /* get the uncompressed event type */
+ type= (Log_event_type)
+ (type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1);
+ }
+
+ //bad event
+ if (end <= tmp)
+ return 1;
+
+ ulong m_width = net_field_length((uchar **)&tmp);
+ tmp += (m_width + 7) / 8;
+
+ if (type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_EVENT)
+ {
+ tmp += (m_width + 7) / 8;
+ }
+
+ //bad event
+ if (end <= tmp)
+ return 1;
+
+ uint32 un_len = binlog_get_uncompress_len(tmp);
+ //bad event
+ if (un_len == 0)
+ return 1;
+
+ int32 comp_len = (int32)(len - (tmp - src) -
+ (contain_checksum ? BINLOG_CHECKSUM_LEN : 0));
+ //bad event
+ if (comp_len <=0)
+ return 1;
+
+ *newlen = ulong(tmp - src) + un_len;
+ if(contain_checksum)
+ *newlen += BINLOG_CHECKSUM_LEN;
+
+ size_t alloc_size = ALIGN_SIZE(*newlen);
+
+ *is_malloc = false;
+ if (alloc_size <= buf_size)
+ {
+ new_dst = buf;
+ }
+ else
+ {
+ new_dst = (char *)my_malloc(PSI_INSTRUMENT_ME, alloc_size, MYF(MY_WME));
+ if (!new_dst)
+ return 1;
+
+ *is_malloc = true;
+ }
+
+ /* Copy the head. */
+ memcpy(new_dst, src , tmp - src);
+ /* Uncompress the body. */
+ if (binlog_buf_uncompress(tmp, new_dst + (tmp - src),
+ comp_len, &un_len))
+ {
+ if (*is_malloc)
+ my_free(new_dst);
+
+ return 1;
+ }
+
+ new_dst[EVENT_TYPE_OFFSET] = type;
+ int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
+ if(contain_checksum){
+ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
+ int4store(new_dst + clear_len,
+ my_checksum(0L, (uchar *)new_dst, clear_len));
+ }
+ *dst = new_dst;
+ return 0;
+}
+
+/**
+ Get the length of uncompress content.
+ return 0 means error.
+*/
+
+uint32 binlog_get_uncompress_len(const char *buf)
+{
+ uint32 len = 0;
+ uint32 lenlen = 0;
+
+ if ((buf == NULL) || ((buf[0] & 0xe0) != 0x80))
+ return len;
+
+ lenlen = buf[0] & 0x07;
+
+ switch(lenlen)
+ {
+ case 1:
+ len = uchar(buf[1]);
+ break;
+ case 2:
+ len = uchar(buf[1]) << 8 | uchar(buf[2]);
+ break;
+ case 3:
+ len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]);
+ break;
+ case 4:
+ len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 |
+ uchar(buf[3]) << 8 | uchar(buf[4]);
+ break;
+ default:
+ DBUG_ASSERT(lenlen >= 1 && lenlen <= 4);
+ break;
+ }
+ return len;
+}
+
+/**
+ Uncompress the content in 'src' with length of 'len' to 'dst'.
+
+ Note: 1) Then the caller should guarantee the length of 'dst' (which
+ can be got by statement_get_uncompress_len) is enough to hold
+ the content uncompressed.
+ 2) The 'newlen' should stored the length of 'dst', and it will
+ be set as the size of uncompressed content after return.
+
+ return zero if successful, others otherwise.
+*/
+int binlog_buf_uncompress(const char *src, char *dst, uint32 len,
+ uint32 *newlen)
+{
+ if((src[0] & 0x80) == 0)
+ {
+ return 1;
+ }
+
+ uint32 lenlen= src[0] & 0x07;
+ uLongf buflen= *newlen;
+
+ uint32 alg = (src[0] & 0x70) >> 4;
+ switch(alg)
+ {
+ case 0:
+ // zlib
+ if(uncompress((Bytef *)dst, &buflen,
+ (const Bytef*)src + 1 + lenlen, len - 1 - lenlen) != Z_OK)
+ {
+ return 1;
+ }
+ break;
+ default:
+ //TODO
+ //bad algorithm
+ return 1;
+ }
+
+ DBUG_ASSERT(*newlen == (uint32)buflen);
+ *newlen = (uint32)buflen;
+ return 0;
+}
+
+
+/**************************************************************************
+ Log_event methods (= the parent class of all events)
+**************************************************************************/
+
+/**
+ @return
+ returns the human readable name of the event's type
+*/
+
+const char* Log_event::get_type_str(Log_event_type type)
+{
+ switch(type) {
+ case START_EVENT_V3: return "Start_v3";
+ case STOP_EVENT: return "Stop";
+ case QUERY_EVENT: return "Query";
+ case ROTATE_EVENT: return "Rotate";
+ case INTVAR_EVENT: return "Intvar";
+ case LOAD_EVENT: return "Load";
+ case NEW_LOAD_EVENT: return "New_load";
+ case SLAVE_EVENT: return "Slave";
+ case CREATE_FILE_EVENT: return "Create_file";
+ case APPEND_BLOCK_EVENT: return "Append_block";
+ case DELETE_FILE_EVENT: return "Delete_file";
+ case EXEC_LOAD_EVENT: return "Exec_load";
+ case RAND_EVENT: return "RAND";
+ case XID_EVENT: return "Xid";
+ case USER_VAR_EVENT: return "User var";
+ case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
+ case TABLE_MAP_EVENT: return "Table_map";
+ case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
+ case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
+ case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
+ case WRITE_ROWS_EVENT_V1: return "Write_rows_v1";
+ case UPDATE_ROWS_EVENT_V1: return "Update_rows_v1";
+ case DELETE_ROWS_EVENT_V1: return "Delete_rows_v1";
+ case WRITE_ROWS_EVENT: return "Write_rows";
+ case UPDATE_ROWS_EVENT: return "Update_rows";
+ case DELETE_ROWS_EVENT: return "Delete_rows";
+ case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
+ case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
+ case INCIDENT_EVENT: return "Incident";
+ case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
+ case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
+ case GTID_EVENT: return "Gtid";
+ case GTID_LIST_EVENT: return "Gtid_list";
+ case START_ENCRYPTION_EVENT: return "Start_encryption";
+
+ /* The following is only for mysqlbinlog */
+ case IGNORABLE_LOG_EVENT: return "Ignorable log event";
+ case ROWS_QUERY_LOG_EVENT: return "MySQL Rows_query";
+ case GTID_LOG_EVENT: return "MySQL Gtid";
+ case ANONYMOUS_GTID_LOG_EVENT: return "MySQL Anonymous_Gtid";
+ case PREVIOUS_GTIDS_LOG_EVENT: return "MySQL Previous_gtids";
+ case HEARTBEAT_LOG_EVENT: return "Heartbeat";
+ case TRANSACTION_CONTEXT_EVENT: return "Transaction_context";
+ case VIEW_CHANGE_EVENT: return "View_change";
+ case XA_PREPARE_LOG_EVENT: return "XA_prepare";
+ case QUERY_COMPRESSED_EVENT: return "Query_compressed";
+ case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed";
+ case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed";
+ case DELETE_ROWS_COMPRESSED_EVENT: return "Delete_rows_compressed";
+ case WRITE_ROWS_COMPRESSED_EVENT_V1: return "Write_rows_compressed_v1";
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1: return "Update_rows_compressed_v1";
+ case DELETE_ROWS_COMPRESSED_EVENT_V1: return "Delete_rows_compressed_v1";
+
+ default: return "Unknown"; /* impossible */
+ }
+}
+
+const char* Log_event::get_type_str()
+{
+ return get_type_str(get_type_code());
+}
+
+
+/*
+ Log_event::Log_event()
+*/
+
+Log_event::Log_event(const char* buf,
+ const Format_description_log_event* description_event)
+ :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE),
+ checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+{
+#ifndef MYSQL_CLIENT
+ thd = 0;
+#endif
+ when = uint4korr(buf);
+ when_sec_part= ~0UL;
+ server_id = uint4korr(buf + SERVER_ID_OFFSET);
+ data_written= uint4korr(buf + EVENT_LEN_OFFSET);
+ if (description_event->binlog_version==1)
+ {
+ log_pos= 0;
+ flags= 0;
+ return;
+ }
+ /* 4.0 or newer */
+ log_pos= uint4korr(buf + LOG_POS_OFFSET);
+ /*
+ If the log is 4.0 (so here it can only be a 4.0 relay log read by
+ the SQL thread or a 4.0 master binlog read by the I/O thread),
+ log_pos is the beginning of the event: we transform it into the end
+ of the event, which is more useful.
+ But how do you know that the log is 4.0: you know it if
+ description_event is version 3 *and* you are not reading a
+ Format_desc (remember that mysqlbinlog starts by assuming that 5.0
+ logs are in 4.0 format, until it finds a Format_desc).
+ */
+ if (description_event->binlog_version==3 &&
+ (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
+ {
+ /*
+ If log_pos=0, don't change it. log_pos==0 is a marker to mean
+ "don't change rli->group_master_log_pos" (see
+ inc_group_relay_log_pos()). As it is unreal log_pos, adding the
+ event len's is nonsense. For example, a fake Rotate event should
+ not have its log_pos (which is 0) changed or it will modify
+ Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
+ value of (a non-zero offset which does not exist in the master's
+ binlog, so which will cause problems if the user uses this value
+ in CHANGE MASTER).
+ */
+ log_pos+= data_written; /* purecov: inspected */
+ }
+ DBUG_PRINT("info", ("log_pos: %llu", log_pos));
+
+ flags= uint2korr(buf + FLAGS_OFFSET);
+ if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
+ {
+ /*
+ These events always have a header which stops here (i.e. their
+ header is FROZEN).
+ */
+ /*
+ Initialization to zero of all other Log_event members as they're
+ not specified. Currently there are no such members; in the future
+ there will be an event UID (but Format_description and Rotate
+ don't need this UID, as they are not propagated through
+ --log-slave-updates (remember the UID is used to not play a query
+ twice when you have two masters which are slaves of a 3rd master).
+ Then we are done.
+ */
+ return;
+ }
+ /* otherwise, go on with reading the header from buf (nothing now) */
+}
+
+
+/**
+ This needn't be format-tolerant, because we only parse the first
+ LOG_EVENT_MINIMAL_HEADER_LEN bytes (just need the event's length).
+*/
+
+int Log_event::read_log_event(IO_CACHE* file, String* packet,
+ const Format_description_log_event *fdle,
+ enum enum_binlog_checksum_alg checksum_alg_arg)
+{
+ ulong data_len;
+ char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
+ uchar ev_offset= packet->length();
+#if !defined(MYSQL_CLIENT)
+ THD *thd=current_thd;
+ ulong max_allowed_packet= thd ? thd->slave_thread ? slave_max_allowed_packet
+ : thd->variables.max_allowed_packet
+ : ~(uint)0;
+#endif
+ DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,String*...)");
+
+ if (my_b_read(file, (uchar*) buf, sizeof(buf)))
+ {
+ /*
+ If the read hits eof, we must report it as eof so the caller
+ will know it can go into cond_wait to be woken up on the next
+ update to the log.
+ */
+ DBUG_PRINT("error",("file->error: %d", file->error));
+ DBUG_RETURN(file->error == 0 ? LOG_READ_EOF :
+ file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
+ }
+ data_len= uint4korr(buf + EVENT_LEN_OFFSET);
+
+ /* Append the log event header to packet */
+ if (packet->append(buf, sizeof(buf)))
+ DBUG_RETURN(LOG_READ_MEM);
+
+ if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN)
+ DBUG_RETURN(LOG_READ_BOGUS);
+
+ if (data_len > MY_MAX(max_allowed_packet,
+ opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
+ DBUG_RETURN(LOG_READ_TOO_LARGE);
+
+ if (likely(data_len > LOG_EVENT_MINIMAL_HEADER_LEN))
+ {
+ /* Append rest of event, read directly from file into packet */
+ if (packet->append(file, data_len - LOG_EVENT_MINIMAL_HEADER_LEN))
+ {
+ /*
+ Fatal error occurred when appending rest of the event
+ to packet, possible failures:
+ 1. EOF occurred when reading from file, it's really an error
+ as there's supposed to be more bytes available.
+ file->error will have been set to number of bytes left to read
+ 2. Read was interrupted, file->error would normally be set to -1
+ 3. Failed to allocate memory for packet, my_errno
+ will be ENOMEM(file->error should be 0, but since the
+ memory allocation occurs before the call to read it might
+ be uninitialized)
+ */
+ DBUG_RETURN(my_errno == ENOMEM ? LOG_READ_MEM :
+ (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
+ }
+ }
+
+ if (fdle->crypto_data.scheme)
+ {
+ uchar iv[BINLOG_IV_LENGTH];
+ fdle->crypto_data.set_iv(iv, (uint32) (my_b_tell(file) - data_len));
+ size_t sz= data_len + ev_offset + 1;
+#ifdef HAVE_WOLFSSL
+ /*
+ Workaround for MDEV-19582.
+ WolfSSL reads memory out of bounds with decryption/NOPAD)
+ We allocate a little more memory therefore.
+ */
+ sz += MY_AES_BLOCK_SIZE;
+#endif
+ char *newpkt= (char*)my_malloc(PSI_INSTRUMENT_ME, sz, MYF(MY_WME));
+ if (!newpkt)
+ DBUG_RETURN(LOG_READ_MEM);
+ memcpy(newpkt, packet->ptr(), ev_offset);
+
+ uint dstlen;
+ uchar *src= (uchar*)packet->ptr() + ev_offset;
+ uchar *dst= (uchar*)newpkt + ev_offset;
+ memcpy(src + EVENT_LEN_OFFSET, src, 4);
+ if (encryption_crypt(src + 4, data_len - 4, dst + 4, &dstlen,
+ fdle->crypto_data.key, fdle->crypto_data.key_length, iv,
+ sizeof(iv), ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD,
+ ENCRYPTION_KEY_SYSTEM_DATA, fdle->crypto_data.key_version))
+ {
+ my_free(newpkt);
+ DBUG_RETURN(LOG_READ_DECRYPT);
+ }
+ DBUG_ASSERT(dstlen == data_len - 4);
+ memcpy(dst, dst + EVENT_LEN_OFFSET, 4);
+ int4store(dst + EVENT_LEN_OFFSET, data_len);
+ packet->reset(newpkt, data_len + ev_offset, data_len + ev_offset + 1,
+ &my_charset_bin);
+ }
+
+ /*
+ CRC verification of the Dump thread
+ */
+ if (data_len > LOG_EVENT_MINIMAL_HEADER_LEN)
+ {
+ /* Corrupt the event for Dump thread*/
+ DBUG_EXECUTE_IF("corrupt_read_log_event2",
+ uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
+ if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ {
+ int debug_cor_pos = rand() % (data_len - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_read_log_event2");
+ }
+ );
+ if (event_checksum_test((uchar*) packet->ptr() + ev_offset,
+ data_len, checksum_alg_arg))
+ DBUG_RETURN(LOG_READ_CHECKSUM_FAILURE);
+ }
+ DBUG_RETURN(0);
+}
+
+Log_event* Log_event::read_log_event(IO_CACHE* file,
+ const Format_description_log_event *fdle,
+ my_bool crc_check)
+{
+ DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,Format_description_log_event*...)");
+ DBUG_ASSERT(fdle != 0);
+ String event;
+ const char *error= 0;
+ Log_event *res= 0;
+
+ switch (read_log_event(file, &event, fdle, BINLOG_CHECKSUM_ALG_OFF))
+ {
+ case 0:
+ break;
+ case LOG_READ_EOF: // no error here; we are at the file's end
+ goto err;
+ case LOG_READ_BOGUS:
+ error= "Event invalid";
+ goto err;
+ case LOG_READ_IO:
+ error= "read error";
+ goto err;
+ case LOG_READ_MEM:
+ error= "Out of memory";
+ goto err;
+ case LOG_READ_TRUNC:
+ error= "Event truncated";
+ goto err;
+ case LOG_READ_TOO_LARGE:
+ error= "Event too big";
+ goto err;
+ case LOG_READ_DECRYPT:
+ error= "Event decryption failure";
+ goto err;
+ case LOG_READ_CHECKSUM_FAILURE:
+ default:
+ DBUG_ASSERT(0);
+ error= "internal error";
+ goto err;
+ }
+
+ if ((res= read_log_event(event.ptr(), event.length(),
+ &error, fdle, crc_check)))
+ res->register_temp_buf(event.release(), true);
+
+err:
+ if (unlikely(error))
+ {
+ DBUG_ASSERT(!res);
+#ifdef MYSQL_CLIENT
+ if (force_opt)
+ DBUG_RETURN(new Unknown_log_event());
+#endif
+ if (event.length() >= OLD_HEADER_LEN)
+ sql_print_error("Error in Log_event::read_log_event(): '%s',"
+ " data_len: %lu, event_type: %u", error,
+ (ulong) uint4korr(&event[EVENT_LEN_OFFSET]),
+ (uint) (uchar)event[EVENT_TYPE_OFFSET]);
+ else
+ sql_print_error("Error in Log_event::read_log_event(): '%s'", error);
+ /*
+ The SQL slave thread will check if file->error<0 to know
+ if there was an I/O error. Even if there is no "low-level" I/O errors
+ with 'file', any of the high-level above errors is worrying
+ enough to stop the SQL thread now ; as we are skipping the current event,
+ going on with reading and successfully executing other events can
+ only corrupt the slave's databases. So stop.
+ */
+ file->error= -1;
+ }
+ DBUG_RETURN(res);
+}
+
+
+/**
+ Binlog format tolerance is in (buf, event_len, fdle)
+ constructors.
+*/
+
+Log_event* Log_event::read_log_event(const char* buf, uint event_len,
+ const char **error,
+ const Format_description_log_event *fdle,
+ my_bool crc_check)
+{
+ Log_event* ev;
+ enum enum_binlog_checksum_alg alg;
+ DBUG_ENTER("Log_event::read_log_event(char*,...)");
+ DBUG_ASSERT(fdle != 0);
+ DBUG_PRINT("info", ("binlog_version: %d", fdle->binlog_version));
+ DBUG_DUMP_EVENT_BUF(buf, event_len);
+
+ /*
+ Check the integrity; This is needed because handle_slave_io() doesn't
+ check if packet is of proper length.
+ */
+ if (event_len < EVENT_LEN_OFFSET)
+ {
+ *error="Sanity check failed"; // Needed to free buffer
+ DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
+ }
+
+ uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
+ // all following START events in the current file are without checksum
+ if (event_type == START_EVENT_V3)
+ (const_cast< Format_description_log_event *>(fdle))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+ /*
+ CRC verification by SQL and Show-Binlog-Events master side.
+ The caller has to provide @fdle->checksum_alg to
+ be the last seen FD's (A) descriptor.
+ If event is FD the descriptor is in it.
+ Notice, FD of the binlog can be only in one instance and therefore
+ Show-Binlog-Events executing master side thread needs just to know
+ the only FD's (A) value - whereas RL can contain more.
+ In the RL case, the alg is kept in FD_e (@fdle) which is reset
+ to the newer read-out event after its execution with possibly new alg descriptor.
+ Therefore in a typical sequence of RL:
+ {FD_s^0, FD_m, E_m^1} E_m^1
+ will be verified with (A) of FD_m.
+
+ See legends definition on MYSQL_BIN_LOG::relay_log_checksum_alg docs
+ lines (log.h).
+
+ Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF.
+ */
+ alg= (event_type != FORMAT_DESCRIPTION_EVENT) ?
+ fdle->checksum_alg : get_checksum_alg(buf, event_len);
+ // Emulate the corruption during reading an event
+ DBUG_EXECUTE_IF("corrupt_read_log_event_char",
+ if (event_type != FORMAT_DESCRIPTION_EVENT)
+ {
+ char *debug_event_buf_c = (char *)buf;
+ int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event(char*,...): byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_read_log_event_char");
+ }
+ );
+ if (crc_check &&
+ event_checksum_test((uchar *) buf, event_len, alg))
+ {
+#ifdef MYSQL_CLIENT
+ *error= "Event crc check failed! Most likely there is event corruption.";
+ if (force_opt)
+ {
+ ev= new Unknown_log_event(buf, fdle);
+ DBUG_RETURN(ev);
+ }
+ else
+ DBUG_RETURN(NULL);
+#else
+ *error= ER_THD_OR_DEFAULT(current_thd, ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE);
+ sql_print_error("%s", *error);
+ DBUG_RETURN(NULL);
+#endif
+ }
+
+ if (event_type > fdle->number_of_event_types &&
+ event_type != FORMAT_DESCRIPTION_EVENT)
+ {
+ /*
+ It is unsafe to use the fdle if its post_header_len
+ array does not include the event type.
+ */
+ DBUG_PRINT("error", ("event type %d found, but the current "
+ "Format_description_log_event supports only %d event "
+ "types", event_type,
+ fdle->number_of_event_types));
+ ev= NULL;
+ }
+ else
+ {
+ /*
+ In some previuos versions (see comment in
+ Format_description_log_event::Format_description_log_event(char*,...)),
+ event types were assigned different id numbers than in the
+ present version. In order to replicate from such versions to the
+ present version, we must map those event type id's to our event
+ type id's. The mapping is done with the event_type_permutation
+ array, which was set up when the Format_description_log_event
+ was read.
+ */
+ if (fdle->event_type_permutation)
+ {
+ int new_event_type= fdle->event_type_permutation[event_type];
+ DBUG_PRINT("info", ("converting event type %d to %d (%s)",
+ event_type, new_event_type,
+ get_type_str((Log_event_type)new_event_type)));
+ event_type= new_event_type;
+ }
+
+ if (alg != BINLOG_CHECKSUM_ALG_UNDEF &&
+ (event_type == FORMAT_DESCRIPTION_EVENT ||
+ alg != BINLOG_CHECKSUM_ALG_OFF))
+ event_len= event_len - BINLOG_CHECKSUM_LEN;
+
+ /*
+ Create an object of Ignorable_log_event for unrecognized sub-class.
+ So that SLAVE SQL THREAD will only update the position and continue.
+ We should look for this flag first instead of judging by event_type
+ Any event can be Ignorable_log_event if it has this flag on.
+ look into @note of Ignorable_log_event
+ */
+ if (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)
+ {
+ ev= new Ignorable_log_event(buf, fdle,
+ get_type_str((Log_event_type) event_type));
+ goto exit;
+ }
+ switch(event_type) {
+ case QUERY_EVENT:
+ ev = new Query_log_event(buf, event_len, fdle, QUERY_EVENT);
+ break;
+ case QUERY_COMPRESSED_EVENT:
+ ev = new Query_compressed_log_event(buf, event_len, fdle,
+ QUERY_COMPRESSED_EVENT);
+ break;
+ case LOAD_EVENT:
+ ev = new Load_log_event(buf, event_len, fdle);
+ break;
+ case NEW_LOAD_EVENT:
+ ev = new Load_log_event(buf, event_len, fdle);
+ break;
+ case ROTATE_EVENT:
+ ev = new Rotate_log_event(buf, event_len, fdle);
+ break;
+ case BINLOG_CHECKPOINT_EVENT:
+ ev = new Binlog_checkpoint_log_event(buf, event_len, fdle);
+ break;
+ case GTID_EVENT:
+ ev = new Gtid_log_event(buf, event_len, fdle);
+ break;
+ case GTID_LIST_EVENT:
+ ev = new Gtid_list_log_event(buf, event_len, fdle);
+ break;
+ case CREATE_FILE_EVENT:
+ ev = new Create_file_log_event(buf, event_len, fdle);
+ break;
+ case APPEND_BLOCK_EVENT:
+ ev = new Append_block_log_event(buf, event_len, fdle);
+ break;
+ case DELETE_FILE_EVENT:
+ ev = new Delete_file_log_event(buf, event_len, fdle);
+ break;
+ case EXEC_LOAD_EVENT:
+ ev = new Execute_load_log_event(buf, event_len, fdle);
+ break;
+ case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
+ ev = new Start_log_event_v3(buf, event_len, fdle);
+ break;
+ case STOP_EVENT:
+ ev = new Stop_log_event(buf, fdle);
+ break;
+ case INTVAR_EVENT:
+ ev = new Intvar_log_event(buf, fdle);
+ break;
+ case XID_EVENT:
+ ev = new Xid_log_event(buf, fdle);
+ break;
+ case XA_PREPARE_LOG_EVENT:
+ ev = new XA_prepare_log_event(buf, fdle);
+ break;
+ case RAND_EVENT:
+ ev = new Rand_log_event(buf, fdle);
+ break;
+ case USER_VAR_EVENT:
+ ev = new User_var_log_event(buf, event_len, fdle);
+ break;
+ case FORMAT_DESCRIPTION_EVENT:
+ ev = new Format_description_log_event(buf, event_len, fdle);
+ break;
+#if defined(HAVE_REPLICATION)
+ case PRE_GA_WRITE_ROWS_EVENT:
+ ev = new Write_rows_log_event_old(buf, event_len, fdle);
+ break;
+ case PRE_GA_UPDATE_ROWS_EVENT:
+ ev = new Update_rows_log_event_old(buf, event_len, fdle);
+ break;
+ case PRE_GA_DELETE_ROWS_EVENT:
+ ev = new Delete_rows_log_event_old(buf, event_len, fdle);
+ break;
+ case WRITE_ROWS_EVENT_V1:
+ case WRITE_ROWS_EVENT:
+ ev = new Write_rows_log_event(buf, event_len, fdle);
+ break;
+ case UPDATE_ROWS_EVENT_V1:
+ case UPDATE_ROWS_EVENT:
+ ev = new Update_rows_log_event(buf, event_len, fdle);
+ break;
+ case DELETE_ROWS_EVENT_V1:
+ case DELETE_ROWS_EVENT:
+ ev = new Delete_rows_log_event(buf, event_len, fdle);
+ break;
+
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Write_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Update_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Delete_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+
+ /* MySQL GTID events are ignored */
+ case GTID_LOG_EVENT:
+ case ANONYMOUS_GTID_LOG_EVENT:
+ case PREVIOUS_GTIDS_LOG_EVENT:
+ case TRANSACTION_CONTEXT_EVENT:
+ case VIEW_CHANGE_EVENT:
+ ev= new Ignorable_log_event(buf, fdle,
+ get_type_str((Log_event_type) event_type));
+ break;
+
+ case TABLE_MAP_EVENT:
+ ev = new Table_map_log_event(buf, event_len, fdle);
+ break;
+#endif
+ case BEGIN_LOAD_QUERY_EVENT:
+ ev = new Begin_load_query_log_event(buf, event_len, fdle);
+ break;
+ case EXECUTE_LOAD_QUERY_EVENT:
+ ev= new Execute_load_query_log_event(buf, event_len, fdle);
+ break;
+ case INCIDENT_EVENT:
+ ev = new Incident_log_event(buf, event_len, fdle);
+ break;
+ case ANNOTATE_ROWS_EVENT:
+ ev = new Annotate_rows_log_event(buf, event_len, fdle);
+ break;
+ case START_ENCRYPTION_EVENT:
+ ev = new Start_encryption_log_event(buf, event_len, fdle);
+ break;
+ default:
+ DBUG_PRINT("error",("Unknown event code: %d",
+ (uchar) buf[EVENT_TYPE_OFFSET]));
+ ev= NULL;
+ break;
+ }
+ }
+exit:
+
+ if (ev)
+ {
+ ev->checksum_alg= alg;
+#ifdef MYSQL_CLIENT
+ if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ ev->crc= uint4korr(buf + (event_len));
+#endif
+ }
+
+ DBUG_PRINT("read_event", ("%s(type_code: %u; event_len: %u)",
+ ev ? ev->get_type_str() : "<unknown>",
+ (uchar)buf[EVENT_TYPE_OFFSET],
+ event_len));
+ /*
+ is_valid() are small event-specific sanity tests which are
+ important; for example there are some my_malloc() in constructors
+ (e.g. Query_log_event::Query_log_event(char*...)); when these
+ my_malloc() fail we can't return an error out of the constructor
+ (because constructor is "void") ; so instead we leave the pointer we
+ wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
+ Same for Format_description_log_event, member 'post_header_len'.
+
+ SLAVE_EVENT is never used, so it should not be read ever.
+ */
+ if (!ev || !ev->is_valid() || (event_type == SLAVE_EVENT))
+ {
+ DBUG_PRINT("error",("Found invalid event in binary log"));
+
+ delete ev;
+#ifdef MYSQL_CLIENT
+ if (!force_opt) /* then mysqlbinlog dies */
+ {
+ *error= "Found invalid event in binary log";
+ DBUG_RETURN(0);
+ }
+ ev= new Unknown_log_event(buf, fdle);
+#else
+ *error= "Found invalid event in binary log";
+ DBUG_RETURN(0);
+#endif
+ }
+ DBUG_RETURN(ev);
+}
+
+
+
+/* 2 utility functions for the next method */
+
+/**
+ Read a string with length from memory.
+
+ This function reads the string-with-length stored at
+ <code>src</code> and extract the length into <code>*len</code> and
+ a pointer to the start of the string into <code>*dst</code>. The
+ string can then be copied using <code>memcpy()</code> with the
+ number of bytes given in <code>*len</code>.
+
+ @param src Pointer to variable holding a pointer to the memory to
+ read the string from.
+ @param dst Pointer to variable holding a pointer where the actual
+ string starts. Starting from this position, the string
+ can be copied using @c memcpy().
+ @param len Pointer to variable where the length will be stored.
+ @param end One-past-the-end of the memory where the string is
+ stored.
+
+ @return Zero if the entire string can be copied successfully,
+ @c UINT_MAX if the length could not be read from memory
+ (that is, if <code>*src >= end</code>), otherwise the
+ number of bytes that are missing to read the full
+ string, which happends <code>*dst + *len >= end</code>.
+*/
+static int
+get_str_len_and_pointer(const Log_event::Byte **src,
+ const char **dst,
+ uint *len,
+ const Log_event::Byte *end)
+{
+ if (*src >= end)
+ return -1; // Will be UINT_MAX in two-complement arithmetics
+ uint length= **src;
+ if (length > 0)
+ {
+ if (*src + length >= end)
+ return (int)(*src + length - end + 1); // Number of bytes missing
+ *dst= (char *)*src + 1; // Will be copied later
+ }
+ *len= length;
+ *src+= length + 1;
+ return 0;
+}
+
+static void copy_str_and_move(const char **src,
+ Log_event::Byte **dst,
+ size_t len)
+{
+ memcpy(*dst, *src, len);
+ *src= (const char *)*dst;
+ (*dst)+= len;
+ *(*dst)++= 0;
+}
+
+
+#ifndef DBUG_OFF
+static char const *
+code_name(int code)
+{
+ static char buf[255];
+ switch (code) {
+ case Q_FLAGS2_CODE: return "Q_FLAGS2_CODE";
+ case Q_SQL_MODE_CODE: return "Q_SQL_MODE_CODE";
+ case Q_CATALOG_CODE: return "Q_CATALOG_CODE";
+ case Q_AUTO_INCREMENT: return "Q_AUTO_INCREMENT";
+ case Q_CHARSET_CODE: return "Q_CHARSET_CODE";
+ case Q_TIME_ZONE_CODE: return "Q_TIME_ZONE_CODE";
+ case Q_CATALOG_NZ_CODE: return "Q_CATALOG_NZ_CODE";
+ case Q_LC_TIME_NAMES_CODE: return "Q_LC_TIME_NAMES_CODE";
+ case Q_CHARSET_DATABASE_CODE: return "Q_CHARSET_DATABASE_CODE";
+ case Q_TABLE_MAP_FOR_UPDATE_CODE: return "Q_TABLE_MAP_FOR_UPDATE_CODE";
+ case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
+ case Q_HRNOW: return "Q_HRNOW";
+ }
+ sprintf(buf, "CODE#%d", code);
+ return buf;
+}
+#endif
+
+#define VALIDATE_BYTES_READ(CUR_POS, START, EVENT_LEN) \
+ do { \
+ uchar *cur_pos= (uchar *)CUR_POS; \
+ uchar *start= (uchar *)START; \
+ uint len= EVENT_LEN; \
+ uint bytes_read= (uint)(cur_pos - start); \
+ DBUG_PRINT("info", ("Bytes read: %u event_len:%u.\n",\
+ bytes_read, len)); \
+ if (bytes_read >= len) \
+ DBUG_VOID_RETURN; \
+ } while (0)
+
+/**
+ Macro to check that there is enough space to read from memory.
+
+ @param PTR Pointer to memory
+ @param END End of memory
+ @param CNT Number of bytes that should be read.
+ */
+#define CHECK_SPACE(PTR,END,CNT) \
+ do { \
+ DBUG_PRINT("info", ("Read %s", code_name(pos[-1]))); \
+ if ((PTR) + (CNT) > (END)) { \
+ DBUG_PRINT("info", ("query= 0")); \
+ query= 0; \
+ DBUG_VOID_RETURN; \
+ } \
+ } while (0)
+
+
+/**
+ This is used by the SQL slave thread to prepare the event before execution.
+*/
+Query_log_event::Query_log_event(const char* buf, uint event_len,
+ const Format_description_log_event
+ *description_event,
+ Log_event_type event_type)
+ :Log_event(buf, description_event), data_buf(0), query(NullS),
+ db(NullS), catalog_len(0), status_vars_len(0),
+ flags2_inited(0), sql_mode_inited(0), charset_inited(0), flags2(0),
+ auto_increment_increment(1), auto_increment_offset(1),
+ time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
+ table_map_for_update(0), master_data_written(0)
+{
+ ulong data_len;
+ uint32 tmp;
+ uint8 common_header_len, post_header_len;
+ Log_event::Byte *start;
+ const Log_event::Byte *end;
+ bool catalog_nz= 1;
+ DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
+
+ memset(&user, 0, sizeof(user));
+ memset(&host, 0, sizeof(host));
+ common_header_len= description_event->common_header_len;
+ post_header_len= description_event->post_header_len[event_type-1];
+ DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
+ event_len, common_header_len, post_header_len));
+
+ /*
+ We test if the event's length is sensible, and if so we compute data_len.
+ We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
+ We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
+ */
+ if (event_len < (uint)(common_header_len + post_header_len))
+ DBUG_VOID_RETURN;
+ data_len = event_len - (common_header_len + post_header_len);
+ buf+= common_header_len;
+
+ thread_id = slave_proxy_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
+ exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
+ db_len = (uchar)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
+ error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
+
+ /*
+ 5.0 format starts here.
+ Depending on the format, we may or not have affected/warnings etc
+ The remnent post-header to be parsed has length:
+ */
+ tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
+ if (tmp)
+ {
+ status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
+ /*
+ Check if status variable length is corrupt and will lead to very
+ wrong data. We could be even more strict and require data_len to
+ be even bigger, but this will suffice to catch most corruption
+ errors that can lead to a crash.
+ */
+ if (status_vars_len > MY_MIN(data_len, MAX_SIZE_LOG_EVENT_STATUS))
+ {
+ DBUG_PRINT("info", ("status_vars_len (%u) > data_len (%lu); query= 0",
+ status_vars_len, data_len));
+ query= 0;
+ DBUG_VOID_RETURN;
+ }
+ data_len-= status_vars_len;
+ DBUG_PRINT("info", ("Query_log_event has status_vars_len: %u",
+ (uint) status_vars_len));
+ tmp-= 2;
+ }
+ else
+ {
+ /*
+ server version < 5.0 / binlog_version < 4 master's event is
+ relay-logged with storing the original size of the event in
+ Q_MASTER_DATA_WRITTEN_CODE status variable.
+ The size is to be restored at reading Q_MASTER_DATA_WRITTEN_CODE-marked
+ event from the relay log.
+ */
+ DBUG_ASSERT(description_event->binlog_version < 4);
+ master_data_written= (uint32)data_written;
+ }
+ /*
+ We have parsed everything we know in the post header for QUERY_EVENT,
+ the rest of post header is either comes from older version MySQL or
+ dedicated to derived events (e.g. Execute_load_query...)
+ */
+
+ /* variable-part: the status vars; only in MySQL 5.0 */
+
+ start= (Log_event::Byte*) (buf+post_header_len);
+ end= (const Log_event::Byte*) (start+status_vars_len);
+ for (const Log_event::Byte* pos= start; pos < end;)
+ {
+ switch (*pos++) {
+ case Q_FLAGS2_CODE:
+ CHECK_SPACE(pos, end, 4);
+ flags2_inited= 1;
+ flags2= uint4korr(pos);
+ DBUG_PRINT("info",("In Query_log_event, read flags2: %lu", (ulong) flags2));
+ pos+= 4;
+ break;
+ case Q_SQL_MODE_CODE:
+ {
+ CHECK_SPACE(pos, end, 8);
+ sql_mode_inited= 1;
+ sql_mode= (sql_mode_t) uint8korr(pos);
+ DBUG_PRINT("info",("In Query_log_event, read sql_mode: %llu", sql_mode));
+ pos+= 8;
+ break;
+ }
+ case Q_CATALOG_NZ_CODE:
+ DBUG_PRINT("info", ("case Q_CATALOG_NZ_CODE; pos:%p; end:%p",
+ pos, end));
+ if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
+ {
+ DBUG_PRINT("info", ("query= 0"));
+ query= 0;
+ DBUG_VOID_RETURN;
+ }
+ break;
+ case Q_AUTO_INCREMENT:
+ CHECK_SPACE(pos, end, 4);
+ auto_increment_increment= uint2korr(pos);
+ auto_increment_offset= uint2korr(pos+2);
+ pos+= 4;
+ break;
+ case Q_CHARSET_CODE:
+ {
+ CHECK_SPACE(pos, end, 6);
+ charset_inited= 1;
+ memcpy(charset, pos, 6);
+ pos+= 6;
+ break;
+ }
+ case Q_TIME_ZONE_CODE:
+ {
+ if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
+ {
+ DBUG_PRINT("info", ("Q_TIME_ZONE_CODE: query= 0"));
+ query= 0;
+ DBUG_VOID_RETURN;
+ }
+ break;
+ }
+ case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
+ CHECK_SPACE(pos, end, 1);
+ if ((catalog_len= *pos))
+ catalog= (char*) pos+1; // Will be copied later
+ CHECK_SPACE(pos, end, catalog_len + 2);
+ pos+= catalog_len+2; // leap over end 0
+ catalog_nz= 0; // catalog has end 0 in event
+ break;
+ case Q_LC_TIME_NAMES_CODE:
+ CHECK_SPACE(pos, end, 2);
+ lc_time_names_number= uint2korr(pos);
+ pos+= 2;
+ break;
+ case Q_CHARSET_DATABASE_CODE:
+ CHECK_SPACE(pos, end, 2);
+ charset_database_number= uint2korr(pos);
+ pos+= 2;
+ break;
+ case Q_TABLE_MAP_FOR_UPDATE_CODE:
+ CHECK_SPACE(pos, end, 8);
+ table_map_for_update= uint8korr(pos);
+ pos+= 8;
+ break;
+ case Q_MASTER_DATA_WRITTEN_CODE:
+ CHECK_SPACE(pos, end, 4);
+ data_written= master_data_written= uint4korr(pos);
+ pos+= 4;
+ break;
+ case Q_INVOKER:
+ {
+ CHECK_SPACE(pos, end, 1);
+ user.length= *pos++;
+ CHECK_SPACE(pos, end, user.length);
+ user.str= (char *)pos;
+ pos+= user.length;
+
+ CHECK_SPACE(pos, end, 1);
+ host.length= *pos++;
+ CHECK_SPACE(pos, end, host.length);
+ host.str= (char *)pos;
+ pos+= host.length;
+ break;
+ }
+ case Q_HRNOW:
+ {
+ CHECK_SPACE(pos, end, 3);
+ when_sec_part= uint3korr(pos);
+ pos+= 3;
+ break;
+ }
+ default:
+ /* That's why you must write status vars in growing order of code */
+ DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\
+ code: %u), skipping the rest of them", (uint) *(pos-1)));
+ pos= (const uchar*) end; // Break loop
+ }
+ }
+
+#if !defined(MYSQL_CLIENT)
+ if (description_event->server_version_split.kind ==
+ Format_description_log_event::master_version_split::KIND_MYSQL)
+ {
+ // Handle MariaDB/MySQL incompatible sql_mode bits
+ sql_mode_t mysql_sql_mode= sql_mode;
+ sql_mode&= MODE_MASK_MYSQL_COMPATIBLE; // Unset MySQL specific bits
+
+ /*
+ sql_mode flags related to fraction second rounding/truncation
+ have opposite meaning in MySQL vs MariaDB.
+ MySQL:
+ - rounds fractional seconds by default
+ - truncates if TIME_TRUNCATE_FRACTIONAL is set
+ MariaDB:
+ - truncates fractional seconds by default
+ - rounds if TIME_ROUND_FRACTIONAL is set
+ */
+ if (description_event->server_version_split >= fsp_version_split_mysql &&
+ !(mysql_sql_mode & MODE_MYSQL80_TIME_TRUNCATE_FRACTIONAL))
+ sql_mode|= MODE_TIME_ROUND_FRACTIONAL;
+ }
+#endif
+
+ /**
+ Layout for the data buffer is as follows
+ +--------+-----------+------+------+---------+----+-------+
+ | catlog | time_zone | user | host | db name | \0 | Query |
+ +--------+-----------+------+------+---------+----+-------+
+
+ To support the query cache we append the following buffer to the above
+ +-------+----------------------------------------+-------+
+ |db len | uninitiatlized space of size of db len | FLAGS |
+ +-------+----------------------------------------+-------+
+
+ The area of buffer starting from Query field all the way to the end belongs
+ to the Query buffer and its structure is described in alloc_query() in
+ sql_parse.cc
+ */
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
+ if (!(start= data_buf = (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
+ catalog_len + 1
+ + time_zone_len + 1
+ + user.length + 1
+ + host.length + 1
+ + data_len + 1
+ + sizeof(size_t)//for db_len
+ + db_len + 1
+ + QUERY_CACHE_DB_LENGTH_SIZE
+ + QUERY_CACHE_FLAGS_SIZE,
+ MYF(MY_WME))))
+#else
+ if (!(start= data_buf = (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
+ catalog_len + 1
+ + time_zone_len + 1
+ + user.length + 1
+ + host.length + 1
+ + data_len + 1,
+ MYF(MY_WME))))
+#endif
+ DBUG_VOID_RETURN;
+ if (catalog_len) // If catalog is given
+ {
+ /**
+ @todo we should clean up and do only copy_str_and_move; it
+ works for both cases. Then we can remove the catalog_nz
+ flag. /sven
+ */
+ if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
+ copy_str_and_move(&catalog, &start, catalog_len);
+ else
+ {
+ memcpy(start, catalog, catalog_len+1); // copy end 0
+ catalog= (const char *)start;
+ start+= catalog_len+1;
+ }
+ }
+ if (time_zone_len)
+ copy_str_and_move(&time_zone_str, &start, time_zone_len);
+
+ if (user.length)
+ {
+ copy_str_and_move(&user.str, &start, user.length);
+ }
+ else
+ {
+ user.str= (char*) start;
+ *(start++)= 0;
+ }
+
+ if (host.length)
+ copy_str_and_move(&host.str, &start, host.length);
+ else
+ {
+ host.str= (char*) start;
+ *(start++)= 0;
+ }
+
+ /**
+ if time_zone_len or catalog_len are 0, then time_zone and catalog
+ are uninitialized at this point. shouldn't they point to the
+ zero-length null-terminated strings we allocated space for in the
+ my_alloc call above? /sven
+ */
+
+ /* A 2nd variable part; this is common to all versions */
+ memcpy((char*) start, end, data_len); // Copy db and query
+ start[data_len]= '\0'; // End query with \0 (For safetly)
+ db= (char *)start;
+ query= (char *)(start + db_len + 1);
+ q_len= data_len - db_len -1;
+
+ if (data_len && (data_len < db_len ||
+ data_len < q_len ||
+ data_len != (db_len + q_len + 1)))
+ {
+ q_len= 0;
+ query= NULL;
+ DBUG_VOID_RETURN;
+ }
+
+ uint32 max_length= uint32(event_len - ((const char*)(end + db_len + 1) -
+ (buf - common_header_len)));
+ if (q_len != max_length ||
+ (event_len < uint((const char*)(end + db_len + 1) -
+ (buf - common_header_len))))
+ {
+ q_len= 0;
+ query= NULL;
+ DBUG_VOID_RETURN;
+ }
+ /**
+ Append the db length at the end of the buffer. This will be used by
+ Query_cache::send_result_to_client() in case the query cache is On.
+ */
+#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
+ size_t db_length= (size_t)db_len;
+ memcpy(start + data_len + 1, &db_length, sizeof(size_t));
+#endif
+ DBUG_VOID_RETURN;
+}
+
+Query_compressed_log_event::Query_compressed_log_event(const char *buf,
+ uint event_len,
+ const Format_description_log_event
+ *description_event,
+ Log_event_type event_type)
+ :Query_log_event(buf, event_len, description_event, event_type),
+ query_buf(NULL)
+{
+ if(query)
+ {
+ uint32 un_len=binlog_get_uncompress_len(query);
+ if (!un_len)
+ {
+ query = 0;
+ return;
+ }
+
+ /* Reserve one byte for '\0' */
+ query_buf = (Log_event::Byte*)my_malloc(PSI_INSTRUMENT_ME,
+ ALIGN_SIZE(un_len + 1), MYF(MY_WME));
+ if(query_buf &&
+ !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len))
+ {
+ query_buf[un_len] = 0;
+ query = (const char *)query_buf;
+ q_len = un_len;
+ }
+ else
+ {
+ query= 0;
+ }
+ }
+}
+
+/*
+ Replace a binlog event read into a packet with a dummy event. Either a
+ Query_log_event that has just a comment, or if that will not fit in the
+ space used for the event to be replaced, then a NULL user_var event.
+
+ This is used when sending binlog data to a slave which does not understand
+ this particular event and which is too old to support informational events
+ or holes in the event stream.
+
+ This allows to write such events into the binlog on the master and still be
+ able to replicate against old slaves without them breaking.
+
+ Clears the flag LOG_EVENT_THREAD_SPECIFIC_F and set LOG_EVENT_SUPPRESS_USE_F.
+ Overwrites the type with QUERY_EVENT (or USER_VAR_EVENT), and replaces the
+ body with a minimal query / NULL user var.
+
+ Returns zero on success, -1 if error due to too little space in original
+ event. A minimum of 25 bytes (19 bytes fixed header + 6 bytes in the body)
+ is needed in any event to be replaced with a dummy event.
+*/
+int
+Query_log_event::dummy_event(String *packet, ulong ev_offset,
+ enum enum_binlog_checksum_alg checksum_alg)
+{
+ uchar *p= (uchar *)packet->ptr() + ev_offset;
+ size_t data_len= packet->length() - ev_offset;
+ uint16 flags;
+ static const size_t min_user_var_event_len=
+ LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + 1 + UV_VAL_IS_NULL; // 25
+ static const size_t min_query_event_len=
+ LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN + 1 + 1; // 34
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ data_len-= BINLOG_CHECKSUM_LEN;
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+ if (data_len < min_user_var_event_len)
+ /* Cannot replace with dummy, event too short. */
+ return -1;
+
+ flags= uint2korr(p + FLAGS_OFFSET);
+ flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
+ flags|= LOG_EVENT_SUPPRESS_USE_F;
+ int2store(p + FLAGS_OFFSET, flags);
+
+ if (data_len < min_query_event_len)
+ {
+ /*
+ Have to use dummy user_var event for such a short packet.
+
+ This works, but the event will be considered part of an event group with
+ the following event. So for example @@global.sql_slave_skip_counter=1
+ will skip not only the dummy event, but also the immediately following
+ event.
+
+ We write a NULL user var with the name @`!dummyvar` (or as much
+ as that as will fit within the size of the original event - so
+ possibly just @`!`).
+ */
+ static const char var_name[]= "!dummyvar";
+ size_t name_len= data_len - (min_user_var_event_len - 1);
+
+ p[EVENT_TYPE_OFFSET]= USER_VAR_EVENT;
+ int4store(p + LOG_EVENT_HEADER_LEN, name_len);
+ memcpy(p + LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE, var_name, name_len);
+ p[LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + name_len]= 1; // indicates NULL
+ }
+ else
+ {
+ /*
+ Use a dummy query event, just a comment.
+ */
+ static const char message[]=
+ "# Dummy event replacing event type %u that slave cannot handle.";
+ char buf[sizeof(message)+1]; /* +1, as %u can expand to 3 digits. */
+ uchar old_type= p[EVENT_TYPE_OFFSET];
+ uchar *q= p + LOG_EVENT_HEADER_LEN;
+ size_t comment_len, len;
+
+ p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
+ int4store(q + Q_THREAD_ID_OFFSET, 0);
+ int4store(q + Q_EXEC_TIME_OFFSET, 0);
+ q[Q_DB_LEN_OFFSET]= 0;
+ int2store(q + Q_ERR_CODE_OFFSET, 0);
+ int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
+ q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
+ q+= Q_DATA_OFFSET + 1;
+ len= my_snprintf(buf, sizeof(buf), message, old_type);
+ comment_len= data_len - (min_query_event_len - 1);
+ if (comment_len <= len)
+ memcpy(q, buf, comment_len);
+ else
+ {
+ memcpy(q, buf, len);
+ memset(q+len, ' ', comment_len - len);
+ }
+ }
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ ha_checksum crc= my_checksum(0, p, data_len);
+ int4store(p + data_len, crc);
+ }
+ return 0;
+}
+
+/*
+ Replace an event (GTID event) with a BEGIN query event, to be compatible
+ with an old slave.
+*/
+int
+Query_log_event::begin_event(String *packet, ulong ev_offset,
+ enum enum_binlog_checksum_alg checksum_alg)
+{
+ uchar *p= (uchar *)packet->ptr() + ev_offset;
+ uchar *q= p + LOG_EVENT_HEADER_LEN;
+ size_t data_len= packet->length() - ev_offset;
+ uint16 flags;
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ data_len-= BINLOG_CHECKSUM_LEN;
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+ /*
+ Currently we only need to replace GTID event.
+ The length of GTID differs depending on whether it contains commit id.
+ */
+ DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN ||
+ data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
+ if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN &&
+ data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2)
+ return 1;
+
+ flags= uint2korr(p + FLAGS_OFFSET);
+ flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
+ flags|= LOG_EVENT_SUPPRESS_USE_F;
+ int2store(p + FLAGS_OFFSET, flags);
+
+ p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
+ int4store(q + Q_THREAD_ID_OFFSET, 0);
+ int4store(q + Q_EXEC_TIME_OFFSET, 0);
+ q[Q_DB_LEN_OFFSET]= 0;
+ int2store(q + Q_ERR_CODE_OFFSET, 0);
+ if (data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ {
+ int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
+ q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
+ q+= Q_DATA_OFFSET + 1;
+ }
+ else
+ {
+ DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
+ /* Put in an empty time_zone_str to take up the extra 2 bytes. */
+ int2store(q + Q_STATUS_VARS_LEN_OFFSET, 2);
+ q[Q_DATA_OFFSET]= Q_TIME_ZONE_CODE;
+ q[Q_DATA_OFFSET+1]= 0; /* Zero length for empty time_zone_str */
+ q[Q_DATA_OFFSET+2]= 0; /* Zero terminator for empty db */
+ q+= Q_DATA_OFFSET + 3;
+ }
+ memcpy(q, "BEGIN", 5);
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ ha_checksum crc= my_checksum(0, p, data_len);
+ int4store(p + data_len, crc);
+ }
+ return 0;
+}
+
+
+/**************************************************************************
+ Start_log_event_v3 methods
+**************************************************************************/
+
+
+Start_log_event_v3::Start_log_event_v3(const char* buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ :Log_event(buf, description_event), binlog_version(BINLOG_VERSION)
+{
+ if (event_len < LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET)
+ {
+ server_version[0]= 0;
+ return;
+ }
+ buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
+ binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
+ memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
+ ST_SERVER_VER_LEN);
+ // prevent overrun if log is corrupted on disk
+ server_version[ST_SERVER_VER_LEN-1]= 0;
+ created= uint4korr(buf+ST_CREATED_OFFSET);
+ dont_set_created= 1;
+}
+
+
+/***************************************************************************
+ Format_description_log_event methods
+****************************************************************************/
+
+/**
+ Format_description_log_event 1st ctor.
+
+ Ctor. Can be used to create the event to write to the binary log (when the
+ server starts or when FLUSH LOGS), or to create artificial events to parse
+ binlogs from MySQL 3.23 or 4.x.
+ When in a client, only the 2nd use is possible.
+
+ @param binlog_version the binlog version for which we want to build
+ an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
+ x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
+ old 4.0 (binlog version 2) is not supported;
+ it should not be used for replication with
+ 5.0.
+ @param server_ver a string containing the server version.
+*/
+
+Format_description_log_event::
+Format_description_log_event(uint8 binlog_ver, const char* server_ver)
+ :Start_log_event_v3(), event_type_permutation(0)
+{
+ binlog_version= binlog_ver;
+ switch (binlog_ver) {
+ case 4: /* MySQL 5.0 */
+ memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
+ DBUG_EXECUTE_IF("pretend_version_50034_in_binlog",
+ strmov(server_version, "5.0.34"););
+ common_header_len= LOG_EVENT_HEADER_LEN;
+ number_of_event_types= LOG_EVENT_TYPES;
+ /* we'll catch my_malloc() error in is_valid() */
+ post_header_len=(uint8*) my_malloc(PSI_INSTRUMENT_ME,
+ number_of_event_types*sizeof(uint8)
+ + BINLOG_CHECKSUM_ALG_DESC_LEN,
+ MYF(0));
+ /*
+ This long list of assignments is not beautiful, but I see no way to
+ make it nicer, as the right members are #defines, not array members, so
+ it's impossible to write a loop.
+ */
+ if (post_header_len)
+ {
+#ifndef DBUG_OFF
+ // Allows us to sanity-check that all events initialized their
+ // events (see the end of this 'if' block).
+ memset(post_header_len, 255, number_of_event_types*sizeof(uint8));
+#endif
+
+ /* Note: all event types must explicitly fill in their lengths here. */
+ post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
+ post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
+ post_header_len[STOP_EVENT-1]= STOP_HEADER_LEN;
+ post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
+ post_header_len[INTVAR_EVENT-1]= INTVAR_HEADER_LEN;
+ post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
+ post_header_len[SLAVE_EVENT-1]= SLAVE_HEADER_LEN;
+ post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
+ post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
+ post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
+ post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
+ post_header_len[NEW_LOAD_EVENT-1]= NEW_LOAD_HEADER_LEN;
+ post_header_len[RAND_EVENT-1]= RAND_HEADER_LEN;
+ post_header_len[USER_VAR_EVENT-1]= USER_VAR_HEADER_LEN;
+ post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
+ post_header_len[XID_EVENT-1]= XID_HEADER_LEN;
+ post_header_len[XA_PREPARE_LOG_EVENT-1]= XA_PREPARE_HEADER_LEN;
+ post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= BEGIN_LOAD_QUERY_HEADER_LEN;
+ post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
+ /*
+ The PRE_GA events are never be written to any binlog, but
+ their lengths are included in Format_description_log_event.
+ Hence, we need to be assign some value here, to avoid reading
+ uninitialized memory when the array is written to disk.
+ */
+ post_header_len[PRE_GA_WRITE_ROWS_EVENT-1] = 0;
+ post_header_len[PRE_GA_UPDATE_ROWS_EVENT-1] = 0;
+ post_header_len[PRE_GA_DELETE_ROWS_EVENT-1] = 0;
+
+ post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
+ post_header_len[WRITE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[UPDATE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[DELETE_ROWS_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ /*
+ We here have the possibility to simulate a master of before we changed
+ the table map id to be stored in 6 bytes: when it was stored in 4
+ bytes (=> post_header_len was 6). This is used to test backward
+ compatibility.
+ This code can be removed after a few months (today is Dec 21st 2005),
+ when we know that the 4-byte masters are not deployed anymore (check
+ with Tomas Ulin first!), and the accompanying test (rpl_row_4_bytes)
+ too.
+ */
+ DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+ post_header_len[TABLE_MAP_EVENT-1]=
+ post_header_len[WRITE_ROWS_EVENT_V1-1]=
+ post_header_len[UPDATE_ROWS_EVENT_V1-1]=
+ post_header_len[DELETE_ROWS_EVENT_V1-1]= 6;);
+ post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
+ post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
+ post_header_len[IGNORABLE_LOG_EVENT-1]= 0;
+ post_header_len[ROWS_QUERY_LOG_EVENT-1]= 0;
+ post_header_len[GTID_LOG_EVENT-1]= 0;
+ post_header_len[ANONYMOUS_GTID_LOG_EVENT-1]= 0;
+ post_header_len[PREVIOUS_GTIDS_LOG_EVENT-1]= 0;
+ post_header_len[TRANSACTION_CONTEXT_EVENT-1]= 0;
+ post_header_len[VIEW_CHANGE_EVENT-1]= 0;
+ post_header_len[XA_PREPARE_LOG_EVENT-1]= 0;
+ post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
+
+ // Set header length of the reserved events to 0
+ memset(post_header_len + MYSQL_EVENTS_END - 1, 0,
+ (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8));
+
+ // Set header lengths of Maria events
+ post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
+ post_header_len[BINLOG_CHECKPOINT_EVENT-1]=
+ BINLOG_CHECKPOINT_HEADER_LEN;
+ post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN;
+ post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
+ post_header_len[START_ENCRYPTION_EVENT-1]= START_ENCRYPTION_HEADER_LEN;
+
+ //compressed event
+ post_header_len[QUERY_COMPRESSED_EVENT-1]= QUERY_HEADER_LEN;
+ post_header_len[WRITE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[UPDATE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[DELETE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[WRITE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[UPDATE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[DELETE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+
+ // Sanity-check that all post header lengths are initialized.
+ int i;
+ for (i=0; i<number_of_event_types; i++)
+ DBUG_ASSERT(post_header_len[i] != 255);
+ }
+ break;
+
+ case 1: /* 3.23 */
+ case 3: /* 4.0.x x>=2 */
+ /*
+ We build an artificial (i.e. not sent by the master) event, which
+ describes what those old master versions send.
+ */
+ if (binlog_ver==1)
+ strmov(server_version, server_ver ? server_ver : "3.23");
+ else
+ strmov(server_version, server_ver ? server_ver : "4.0");
+ common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
+ LOG_EVENT_MINIMAL_HEADER_LEN;
+ /*
+ The first new event in binlog version 4 is Format_desc. So any event type
+ after that does not exist in older versions. We use the events known by
+ version 3, even if version 1 had only a subset of them (this is not a
+ problem: it uses a few bytes for nothing but unifies code; it does not
+ make the slave detect less corruptions).
+ */
+ number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
+ post_header_len=(uint8*) my_malloc(PSI_INSTRUMENT_ME,
+ number_of_event_types*sizeof(uint8), MYF(0));
+ if (post_header_len)
+ {
+ post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
+ post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
+ post_header_len[STOP_EVENT-1]= 0;
+ post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
+ post_header_len[INTVAR_EVENT-1]= 0;
+ post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
+ post_header_len[SLAVE_EVENT-1]= 0;
+ post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
+ post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
+ post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
+ post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
+ post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
+ post_header_len[RAND_EVENT-1]= 0;
+ post_header_len[USER_VAR_EVENT-1]= 0;
+ }
+ break;
+ default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
+ post_header_len= 0; /* will make is_valid() fail */
+ break;
+ }
+ calc_server_version_split();
+ checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+ reset_crypto();
+}
+
+
+/**
+ The problem with this constructor is that the fixed header may have a
+ length different from this version, but we don't know this length as we
+ have not read the Format_description_log_event which says it, yet. This
+ length is in the post-header of the event, but we don't know where the
+ post-header starts.
+
+ So this type of event HAS to:
+ - either have the header's length at the beginning (in the header, at a
+ fixed position which will never be changed), not in the post-header. That
+ would make the header be "shifted" compared to other events.
+ - or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
+ versions, so that we know for sure.
+
+ I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
+ it is sent before Format_description_log_event).
+*/
+
+Format_description_log_event::
+Format_description_log_event(const char* buf,
+ uint event_len,
+ const
+ Format_description_log_event*
+ description_event)
+ :Start_log_event_v3(buf, event_len, description_event),
+ common_header_len(0), post_header_len(NULL), event_type_permutation(0)
+{
+ DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)");
+ if (!Start_log_event_v3::is_valid())
+ DBUG_VOID_RETURN; /* sanity check */
+ buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
+ if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
+ DBUG_VOID_RETURN; /* sanity check */
+ number_of_event_types=
+ event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET + 1);
+ DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d",
+ common_header_len, number_of_event_types));
+ /* If alloc fails, we'll detect it in is_valid() */
+
+ post_header_len= (uint8*) my_memdup(PSI_INSTRUMENT_ME,
+ buf+ST_COMMON_HEADER_LEN_OFFSET+1,
+ number_of_event_types*
+ sizeof(*post_header_len),
+ MYF(0));
+ calc_server_version_split();
+ if (!is_version_before_checksum(&server_version_split))
+ {
+ /* the last bytes are the checksum alg desc and value (or value's room) */
+ number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN;
+ checksum_alg= (enum_binlog_checksum_alg)post_header_len[number_of_event_types];
+ }
+ else
+ {
+ checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+ }
+ reset_crypto();
+
+ DBUG_VOID_RETURN;
+}
+
+bool Format_description_log_event::start_decryption(Start_encryption_log_event* sele)
+{
+ DBUG_ASSERT(crypto_data.scheme == 0);
+
+ if (!sele->is_valid())
+ return 1;
+
+ memcpy(crypto_data.nonce, sele->nonce, BINLOG_NONCE_LENGTH);
+ return crypto_data.init(sele->crypto_scheme, sele->key_version);
+}
+
+
+Version::Version(const char *version, const char **endptr)
+{
+ const char *p= version;
+ ulong number;
+ for (uint i= 0; i<=2; i++)
+ {
+ char *r;
+ number= strtoul(p, &r, 10);
+ /*
+ It is an invalid version if any version number greater than 255 or
+ first number is not followed by '.'.
+ */
+ if (number < 256 && (*r == '.' || i != 0))
+ m_ver[i]= (uchar) number;
+ else
+ {
+ *this= Version();
+ break;
+ }
+
+ p= r;
+ if (*r == '.')
+ p++; // skip the dot
+ }
+ endptr[0]= p;
+}
+
+
+Format_description_log_event::
+ master_version_split::master_version_split(const char *version)
+{
+ const char *p;
+ static_cast<Version*>(this)[0]= Version(version, &p);
+ if (strstr(p, "MariaDB") != 0 || strstr(p, "-maria-") != 0)
+ kind= KIND_MARIADB;
+ else
+ kind= KIND_MYSQL;
+}
+
+
+/**
+ Splits the event's 'server_version' string into three numeric pieces stored
+ into 'server_version_split':
+ X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
+ X.Yabc -> {X,Y,0}
+ 'server_version_split' is then used for lookups to find if the server which
+ created this event has some known bug.
+*/
+void Format_description_log_event::calc_server_version_split()
+{
+ server_version_split= master_version_split(server_version);
+
+ DBUG_PRINT("info",("Format_description_log_event::server_version_split:"
+ " '%s' %d %d %d", server_version,
+ server_version_split[0],
+ server_version_split[1], server_version_split[2]));
+}
+
+
+/**
+ @return TRUE is the event's version is earlier than one that introduced
+ the replication event checksum. FALSE otherwise.
+*/
+bool
+Format_description_log_event::is_version_before_checksum(const master_version_split
+ *version_split)
+{
+ return *version_split <
+ (version_split->kind == master_version_split::KIND_MARIADB ?
+ checksum_version_split_mariadb : checksum_version_split_mysql);
+}
+
+/**
+ @param buf buffer holding serialized FD event
+ @param len netto (possible checksum is stripped off) length of the event buf
+
+ @return the version-safe checksum alg descriptor where zero
+ designates no checksum, 255 - the orginator is
+ checksum-unaware (effectively no checksum) and the actuall
+ [1-254] range alg descriptor.
+*/
+enum enum_binlog_checksum_alg get_checksum_alg(const char* buf, ulong len)
+{
+ enum enum_binlog_checksum_alg ret;
+ char version[ST_SERVER_VER_LEN];
+
+ DBUG_ENTER("get_checksum_alg");
+ DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
+
+ memcpy(version,
+ buf + LOG_EVENT_MINIMAL_HEADER_LEN + ST_SERVER_VER_OFFSET,
+ ST_SERVER_VER_LEN);
+ version[ST_SERVER_VER_LEN - 1]= 0;
+
+ Format_description_log_event::master_version_split version_split(version);
+ ret= Format_description_log_event::is_version_before_checksum(&version_split)
+ ? BINLOG_CHECKSUM_ALG_UNDEF
+ : (enum_binlog_checksum_alg)buf[len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN];
+ DBUG_ASSERT(ret == BINLOG_CHECKSUM_ALG_OFF ||
+ ret == BINLOG_CHECKSUM_ALG_UNDEF ||
+ ret == BINLOG_CHECKSUM_ALG_CRC32);
+ DBUG_RETURN(ret);
+}
+
+Start_encryption_log_event::Start_encryption_log_event(
+ const char* buf, uint event_len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
+{
+ if ((int)event_len ==
+ LOG_EVENT_MINIMAL_HEADER_LEN + Start_encryption_log_event::get_data_size())
+ {
+ buf += LOG_EVENT_MINIMAL_HEADER_LEN;
+ crypto_scheme = *(uchar*)buf;
+ key_version = uint4korr(buf + BINLOG_CRYPTO_SCHEME_LENGTH);
+ memcpy(nonce,
+ buf + BINLOG_CRYPTO_SCHEME_LENGTH + BINLOG_KEY_VERSION_LENGTH,
+ BINLOG_NONCE_LENGTH);
+ }
+ else
+ crypto_scheme= ~0; // invalid
+}
+
+
+ /**************************************************************************
+ Load_log_event methods
+ General note about Load_log_event: the binlogging of LOAD DATA INFILE is
+ going to be changed in 5.0 (or maybe in 5.1; not decided yet).
+ However, the 5.0 slave could still have to read such events (from a 4.x
+ master), convert them (which just means maybe expand the header, when 5.0
+ servers have a UID in events) (remember that whatever is after the header
+ will be like in 4.x, as this event's format is not modified in 5.0 as we
+ will use new types of events to log the new LOAD DATA INFILE features).
+ To be able to read/convert, we just need to not assume that the common
+ header is of length LOG_EVENT_HEADER_LEN (we must use the description
+ event).
+ Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
+ between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
+ positions displayed in SHOW SLAVE STATUS then are fine too).
+ **************************************************************************/
+
+
+/**
+ @note
+ The caller must do buf[event_len] = 0 before he starts using the
+ constructed event.
+*/
+Load_log_event::Load_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ :Log_event(buf, description_event), num_fields(0), fields(0),
+ field_lens(0),field_block_len(0),
+ table_name(0), db(0), fname(0), local_fname(FALSE),
+ /*
+ Load_log_event which comes from the binary log does not contain
+ information about the type of insert which was used on the master.
+ Assume that it was an ordinary, non-concurrent LOAD DATA.
+ */
+ is_concurrent(FALSE)
+{
+ DBUG_ENTER("Load_log_event");
+ /*
+ I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
+ 4.0->5.0 and 5.0->5.0 and it works.
+ */
+ if (event_len)
+ copy_log_event(buf, event_len,
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ LOAD_HEADER_LEN +
+ description_event->common_header_len :
+ LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
+ description_event);
+ /* otherwise it's a derived class, will call copy_log_event() itself */
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Load_log_event::copy_log_event()
+*/
+
+int Load_log_event::copy_log_event(const char *buf, ulong event_len,
+ int body_offset,
+ const Format_description_log_event *description_event)
+{
+ DBUG_ENTER("Load_log_event::copy_log_event");
+ uint data_len;
+ if ((int) event_len <= body_offset)
+ DBUG_RETURN(1);
+ char* buf_end = (char*)buf + event_len;
+ /* this is the beginning of the post-header */
+ const char* data_head = buf + description_event->common_header_len;
+ thread_id= slave_proxy_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
+ exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
+ skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
+ table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
+ db_len = (uint)data_head[L_DB_LEN_OFFSET];
+ num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
+
+ /*
+ Sql_ex.init() on success returns the pointer to the first byte after
+ the sql_ex structure, which is the start of field lengths array.
+ */
+ if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset,
+ buf_end,
+ (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
+ DBUG_RETURN(1);
+
+ data_len = event_len - body_offset;
+ if (num_fields > data_len) // simple sanity check against corruption
+ DBUG_RETURN(1);
+ for (uint i = 0; i < num_fields; i++)
+ field_block_len += (uint)field_lens[i] + 1;
+
+ fields = (char*)field_lens + num_fields;
+ table_name = fields + field_block_len;
+ if (strlen(table_name) > NAME_LEN)
+ goto err;
+
+ db = table_name + table_name_len + 1;
+ DBUG_EXECUTE_IF ("simulate_invalid_address",
+ db_len = data_len;);
+ fname = db + db_len + 1;
+ if ((db_len > data_len) || (fname > buf_end))
+ goto err;
+ fname_len = (uint) strlen(fname);
+ if ((fname_len > data_len) || (fname + fname_len > buf_end))
+ goto err;
+ // null termination is accomplished by the caller doing buf[event_len]=0
+
+ DBUG_RETURN(0);
+
+err:
+ // Invalid event.
+ table_name = 0;
+ DBUG_RETURN(1);
+}
+
+
+/**************************************************************************
+ Rotate_log_event methods
+**************************************************************************/
+
+Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
+{
+ DBUG_ENTER("Rotate_log_event::Rotate_log_event(char*,...)");
+ // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
+ uint8 post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
+ uint ident_offset;
+ if (event_len < (uint)(LOG_EVENT_MINIMAL_HEADER_LEN + post_header_len))
+ DBUG_VOID_RETURN;
+ buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
+ pos= post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
+ ident_len= (uint)(event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + post_header_len));
+ ident_offset= post_header_len;
+ set_if_smaller(ident_len,FN_REFLEN-1);
+ new_log_ident= my_strndup(PSI_INSTRUMENT_ME, buf + ident_offset, (uint) ident_len, MYF(MY_WME));
+ DBUG_PRINT("debug", ("new_log_ident: '%s'", new_log_ident));
+ DBUG_VOID_RETURN;
+}
+
+
+/**************************************************************************
+ Binlog_checkpoint_log_event methods
+**************************************************************************/
+
+Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
+ const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ :Log_event(buf, description_event), binlog_file_name(0)
+{
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len=
+ description_event->post_header_len[BINLOG_CHECKPOINT_EVENT-1];
+ if (event_len < (uint) header_size + (uint) post_header_len ||
+ post_header_len < BINLOG_CHECKPOINT_HEADER_LEN)
+ return;
+ buf+= header_size;
+ /* See uint4korr and int4store below */
+ compile_time_assert(BINLOG_CHECKPOINT_HEADER_LEN == 4);
+ binlog_file_len= uint4korr(buf);
+ if (event_len - (header_size + post_header_len) < binlog_file_len)
+ return;
+ binlog_file_name= my_strndup(PSI_INSTRUMENT_ME, buf + post_header_len, binlog_file_len,
+ MYF(MY_WME));
+ return;
+}
+
+
+/**************************************************************************
+ Global transaction ID stuff
+**************************************************************************/
+
+Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ : Log_event(buf, description_event), seq_no(0), commit_id(0)
+{
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
+ if (event_len < (uint) header_size + (uint) post_header_len ||
+ post_header_len < GTID_HEADER_LEN)
+ return;
+
+ buf+= header_size;
+ seq_no= uint8korr(buf);
+ buf+= 8;
+ domain_id= uint4korr(buf);
+ buf+= 4;
+ flags2= *(buf++);
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ {
+ if (event_len < (uint)header_size + GTID_HEADER_LEN + 2)
+ {
+ seq_no= 0; // So is_valid() returns false
+ return;
+ }
+ commit_id= uint8korr(buf);
+ buf+= 8;
+ }
+ if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA))
+ {
+ xid.formatID= uint4korr(buf);
+ buf+= 4;
+
+ xid.gtrid_length= (long) buf[0];
+ xid.bqual_length= (long) buf[1];
+ buf+= 2;
+
+ long data_length= xid.bqual_length + xid.gtrid_length;
+ memcpy(xid.data, buf, data_length);
+ buf+= data_length;
+ }
+}
+
+
+/* GTID list. */
+
+Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ : Log_event(buf, description_event), count(0), list(0), sub_id_list(0)
+{
+ uint32 i;
+ uint32 val;
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1];
+ if (event_len < (uint) header_size + (uint) post_header_len ||
+ post_header_len < GTID_LIST_HEADER_LEN)
+ return;
+
+ buf+= header_size;
+ val= uint4korr(buf);
+ count= val & ((1<<28)-1);
+ gl_flags= val & ((uint32)0xf << 28);
+ buf+= 4;
+ if (event_len - (header_size + post_header_len) < count*element_size ||
+ (!(list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
+ count*sizeof(*list) + (count == 0), MYF(MY_WME)))))
+ return;
+
+ for (i= 0; i < count; ++i)
+ {
+ list[i].domain_id= uint4korr(buf);
+ buf+= 4;
+ list[i].server_id= uint4korr(buf);
+ buf+= 4;
+ list[i].seq_no= uint8korr(buf);
+ buf+= 8;
+ }
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+ if ((gl_flags & FLAG_IGN_GTIDS))
+ {
+ uint32 i;
+ if (!(sub_id_list= (uint64 *)my_malloc(PSI_INSTRUMENT_ME,
+ count*sizeof(uint64), MYF(MY_WME))))
+ {
+ my_free(list);
+ list= NULL;
+ return;
+ }
+ for (i= 0; i < count; ++i)
+ {
+ if (!(sub_id_list[i]=
+ rpl_global_gtid_slave_state->next_sub_id(list[i].domain_id)))
+ {
+ my_free(list);
+ my_free(sub_id_list);
+ list= NULL;
+ sub_id_list= NULL;
+ return;
+ }
+ }
+ }
+#endif
+}
+
+
+/*
+ Used to record gtid_list event while sending binlog to slave, without having to
+ fully contruct the event object.
+*/
+bool
+Gtid_list_log_event::peek(const char *event_start, size_t event_len,
+ enum enum_binlog_checksum_alg checksum_alg,
+ rpl_gtid **out_gtid_list, uint32 *out_list_len,
+ const Format_description_log_event *fdev)
+{
+ const char *p;
+ uint32 count_field, count;
+ rpl_gtid *gtid_list;
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ if (event_len > BINLOG_CHECKSUM_LEN)
+ event_len-= BINLOG_CHECKSUM_LEN;
+ else
+ event_len= 0;
+ }
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+ if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN)
+ return true;
+ p= event_start + fdev->common_header_len;
+ count_field= uint4korr(p);
+ p+= 4;
+ count= count_field & ((1<<28)-1);
+ if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN +
+ 16 * count)
+ return true;
+ if (!(gtid_list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
+ sizeof(rpl_gtid)*count + (count == 0), MYF(MY_WME))))
+ return true;
+ *out_gtid_list= gtid_list;
+ *out_list_len= count;
+ while (count--)
+ {
+ gtid_list->domain_id= uint4korr(p);
+ p+= 4;
+ gtid_list->server_id= uint4korr(p);
+ p+= 4;
+ gtid_list->seq_no= uint8korr(p);
+ p+= 8;
+ ++gtid_list;
+ }
+
+ return false;
+}
+
+
+/**************************************************************************
+ Intvar_log_event methods
+**************************************************************************/
+
+/*
+ Intvar_log_event::Intvar_log_event()
+*/
+
+Intvar_log_event::Intvar_log_event(const char* buf,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
+{
+ /* The Post-Header is empty. The Variable Data part begins immediately. */
+ buf+= description_event->common_header_len +
+ description_event->post_header_len[INTVAR_EVENT-1];
+ type= buf[I_TYPE_OFFSET];
+ val= uint8korr(buf+I_VAL_OFFSET);
+}
+
+
+/*
+ Intvar_log_event::get_var_type_name()
+*/
+
+const char* Intvar_log_event::get_var_type_name()
+{
+ switch(type) {
+ case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
+ case INSERT_ID_EVENT: return "INSERT_ID";
+ default: /* impossible */ return "UNKNOWN";
+ }
+}
+
+
+/**************************************************************************
+ Rand_log_event methods
+**************************************************************************/
+
+Rand_log_event::Rand_log_event(const char* buf,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
+{
+ /* The Post-Header is empty. The Variable Data part begins immediately. */
+ buf+= description_event->common_header_len +
+ description_event->post_header_len[RAND_EVENT-1];
+ seed1= uint8korr(buf+RAND_SEED1_OFFSET);
+ seed2= uint8korr(buf+RAND_SEED2_OFFSET);
+}
+
+
+/**************************************************************************
+ Xid_log_event methods
+**************************************************************************/
+
+/**
+ @note
+ It's ok not to use int8store here,
+ as long as xid_t::set(ulonglong) and
+ xid_t::get_my_xid doesn't do it either.
+ We don't care about actual values of xids as long as
+ identical numbers compare identically
+*/
+
+Xid_log_event::
+Xid_log_event(const char* buf,
+ const Format_description_log_event *description_event)
+ :Xid_apply_log_event(buf, description_event)
+{
+ /* The Post-Header is empty. The Variable Data part begins immediately. */
+ buf+= description_event->common_header_len +
+ description_event->post_header_len[XID_EVENT-1];
+ memcpy((char*) &xid, buf, sizeof(xid));
+}
+
+/**************************************************************************
+ XA_prepare_log_event methods
+**************************************************************************/
+XA_prepare_log_event::
+XA_prepare_log_event(const char* buf,
+ const Format_description_log_event *description_event)
+ :Xid_apply_log_event(buf, description_event)
+{
+ buf+= description_event->common_header_len +
+ description_event->post_header_len[XA_PREPARE_LOG_EVENT-1];
+ one_phase= * (bool *) buf;
+ buf+= 1;
+
+ m_xid.formatID= uint4korr(buf);
+ buf+= 4;
+ m_xid.gtrid_length= uint4korr(buf);
+ buf+= 4;
+ // Todo: validity here and elsewhere checks to be replaced by MDEV-21839 fixes
+ if (m_xid.gtrid_length <= 0 || m_xid.gtrid_length > MAXGTRIDSIZE)
+ {
+ m_xid.formatID= -1;
+ return;
+ }
+ m_xid.bqual_length= uint4korr(buf);
+ buf+= 4;
+ if (m_xid.bqual_length < 0 || m_xid.bqual_length > MAXBQUALSIZE)
+ {
+ m_xid.formatID= -1;
+ return;
+ }
+ DBUG_ASSERT(m_xid.gtrid_length + m_xid.bqual_length <= XIDDATASIZE);
+
+ memcpy(m_xid.data, buf, m_xid.gtrid_length + m_xid.bqual_length);
+
+ xid= NULL;
+}
+
+
+/**************************************************************************
+ User_var_log_event methods
+**************************************************************************/
+
+User_var_log_event::
+User_var_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
+#ifndef MYSQL_CLIENT
+ , deferred(false), query_id(0)
+#endif
+{
+ bool error= false;
+ const char* buf_start= buf, *buf_end= buf + event_len;
+
+ /* The Post-Header is empty. The Variable Data part begins immediately. */
+ buf+= description_event->common_header_len +
+ description_event->post_header_len[USER_VAR_EVENT-1];
+ name_len= uint4korr(buf);
+ /* Avoid reading out of buffer */
+ if ((buf - buf_start) + UV_NAME_LEN_SIZE + name_len > event_len)
+ {
+ error= true;
+ goto err;
+ }
+
+ name= (char *) buf + UV_NAME_LEN_SIZE;
+
+ /*
+ We don't know yet is_null value, so we must assume that name_len
+ may have the bigger value possible, is_null= True and there is no
+ payload for val, or even that name_len is 0.
+ */
+ if (name + name_len + UV_VAL_IS_NULL > buf_end)
+ {
+ error= true;
+ goto err;
+ }
+
+ buf+= UV_NAME_LEN_SIZE + name_len;
+ is_null= (bool) *buf;
+ flags= User_var_log_event::UNDEF_F; // defaults to UNDEF_F
+ if (is_null)
+ {
+ type= STRING_RESULT;
+ charset_number= my_charset_bin.number;
+ val_len= 0;
+ val= 0;
+ }
+ else
+ {
+ val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
+
+ if (val > buf_end)
+ {
+ error= true;
+ goto err;
+ }
+
+ type= (Item_result) buf[UV_VAL_IS_NULL];
+ charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
+ val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE);
+
+ /**
+ We need to check if this is from an old server
+ that did not pack information for flags.
+ We do this by checking if there are extra bytes
+ after the packed value. If there are we take the
+ extra byte and it's value is assumed to contain
+ the flags value.
+
+ Old events will not have this extra byte, thence,
+ we keep the flags set to UNDEF_F.
+ */
+ size_t bytes_read= (val + val_len) - buf_start;
+ if (bytes_read > event_len)
+ {
+ error= true;
+ goto err;
+ }
+ if ((data_written - bytes_read) > 0)
+ {
+ flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE +
+ val_len);
+ }
+ }
+
+err:
+ if (unlikely(error))
+ name= 0;
+}
+
+
+/**************************************************************************
+ Create_file_log_event methods
+**************************************************************************/
+
+/*
+ Create_file_log_event ctor
+*/
+
+Create_file_log_event::Create_file_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
+{
+ DBUG_ENTER("Create_file_log_event::Create_file_log_event(char*,...)");
+ uint block_offset;
+ uint header_len= description_event->common_header_len;
+ uint8 load_header_len= description_event->post_header_len[LOAD_EVENT-1];
+ uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
+ if (!(event_buf= (char*) my_memdup(PSI_INSTRUMENT_ME, buf, len, MYF(MY_WME))) ||
+ copy_log_event(event_buf,len,
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ load_header_len + header_len :
+ (fake_base ? (header_len+load_header_len) :
+ (header_len+load_header_len) +
+ create_file_header_len)),
+ description_event))
+ DBUG_VOID_RETURN;
+ if (description_event->binlog_version!=1)
+ {
+ file_id= uint4korr(buf +
+ header_len +
+ load_header_len + CF_FILE_ID_OFFSET);
+ /*
+ Note that it's ok to use get_data_size() below, because it is computed
+ with values we have already read from this event (because we called
+ copy_log_event()); we are not using slave's format info to decode
+ master's format, we are really using master's format info.
+ Anyway, both formats should be identical (except the common_header_len)
+ as these Load events are not changed between 4.0 and 5.0 (as logging of
+ LOAD DATA INFILE does not use Load_log_event in 5.0).
+
+ The + 1 is for \0 terminating fname
+ */
+ block_offset= (description_event->common_header_len +
+ Load_log_event::get_data_size() +
+ create_file_header_len + 1);
+ if (len < block_offset)
+ DBUG_VOID_RETURN;
+ block = (uchar*)buf + block_offset;
+ block_len = len - block_offset;
+ }
+ else
+ {
+ sql_ex.force_new_format();
+ inited_from_old = 1;
+ }
+ DBUG_VOID_RETURN;
+}
+
+
+/**************************************************************************
+ Append_block_log_event methods
+**************************************************************************/
+
+/*
+ Append_block_log_event ctor
+*/
+
+Append_block_log_event::Append_block_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event),block(0)
+{
+ DBUG_ENTER("Append_block_log_event::Append_block_log_event(char*,...)");
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 append_block_header_len=
+ description_event->post_header_len[APPEND_BLOCK_EVENT-1];
+ uint total_header_len= common_header_len+append_block_header_len;
+ if (len < total_header_len)
+ DBUG_VOID_RETURN;
+ file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
+ block= (uchar*)buf + total_header_len;
+ block_len= len - total_header_len;
+ DBUG_VOID_RETURN;
+}
+
+
+/**************************************************************************
+ Delete_file_log_event methods
+**************************************************************************/
+
+/*
+ Delete_file_log_event ctor
+*/
+
+Delete_file_log_event::Delete_file_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event),file_id(0)
+{
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
+ if (len < (uint)(common_header_len + delete_file_header_len))
+ return;
+ file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
+}
+
+
+/**************************************************************************
+ Execute_load_log_event methods
+**************************************************************************/
+
+/*
+ Execute_load_log_event ctor
+*/
+
+Execute_load_log_event::Execute_load_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event), file_id(0)
+{
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
+ if (len < (uint)(common_header_len+exec_load_header_len))
+ return;
+ file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
+}
+
+
+/**************************************************************************
+ Begin_load_query_log_event methods
+**************************************************************************/
+
+Begin_load_query_log_event::
+Begin_load_query_log_event(const char* buf, uint len,
+ const Format_description_log_event* desc_event)
+ :Append_block_log_event(buf, len, desc_event)
+{
+}
+
+
+/**************************************************************************
+ Execute_load_query_log_event methods
+**************************************************************************/
+
+
+Execute_load_query_log_event::
+Execute_load_query_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* desc_event):
+ Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
+ file_id(0), fn_pos_start(0), fn_pos_end(0)
+{
+ if (!Query_log_event::is_valid())
+ return;
+
+ buf+= desc_event->common_header_len;
+
+ fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
+ fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
+ dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
+
+ if (fn_pos_start > q_len || fn_pos_end > q_len ||
+ dup_handling > LOAD_DUP_REPLACE)
+ return;
+
+ file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
+}
+
+
+ulong Execute_load_query_log_event::get_post_header_size_for_derived()
+{
+ return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
+}
+
+
+/**************************************************************************
+ sql_ex_info methods
+**************************************************************************/
+
+/*
+ sql_ex_info::init()
+*/
+
+const char *sql_ex_info::init(const char *buf, const char *buf_end,
+ bool use_new_format)
+{
+ cached_new_format = use_new_format;
+ if (use_new_format)
+ {
+ empty_flags=0;
+ /*
+ The code below assumes that buf will not disappear from
+ under our feet during the lifetime of the event. This assumption
+ holds true in the slave thread if the log is in new format, but is not
+ the case when we have old format because we will be reusing net buffer
+ to read the actual file before we write out the Create_file event.
+ */
+ if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
+ read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
+ read_str(&buf, buf_end, &line_term, &line_term_len) ||
+ read_str(&buf, buf_end, &line_start, &line_start_len) ||
+ read_str(&buf, buf_end, &escaped, &escaped_len))
+ return 0;
+ opt_flags = *buf++;
+ }
+ else
+ {
+ if (buf_end - buf < 7)
+ return 0; // Wrong data
+ field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
+ field_term = buf++; // Use first byte in string
+ enclosed= buf++;
+ line_term= buf++;
+ line_start= buf++;
+ escaped= buf++;
+ opt_flags = *buf++;
+ empty_flags= *buf++;
+ if (empty_flags & FIELD_TERM_EMPTY)
+ field_term_len=0;
+ if (empty_flags & ENCLOSED_EMPTY)
+ enclosed_len=0;
+ if (empty_flags & LINE_TERM_EMPTY)
+ line_term_len=0;
+ if (empty_flags & LINE_START_EMPTY)
+ line_start_len=0;
+ if (empty_flags & ESCAPED_EMPTY)
+ escaped_len=0;
+ }
+ return buf;
+}
+
+
+
+/**************************************************************************
+ Rows_log_event member functions
+**************************************************************************/
+
+
+Rows_log_event::Rows_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Log_event(buf, description_event),
+ m_row_count(0),
+#ifndef MYSQL_CLIENT
+ m_table(NULL),
+#endif
+ m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
+ m_extra_row_data(0)
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ , m_curr_row(NULL), m_curr_row_end(NULL),
+ m_key(NULL), m_key_info(NULL), m_key_nr(0),
+ master_had_triggers(0)
+#endif
+{
+ DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
+ uint8 const common_header_len= description_event->common_header_len;
+ Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
+ m_type= event_type;
+ m_cols_ai.bitmap= 0;
+
+ uint8 const post_header_len= description_event->post_header_len[event_type-1];
+
+ if (event_len < (uint)(common_header_len + post_header_len))
+ {
+ m_cols.bitmap= 0;
+ DBUG_VOID_RETURN;
+ }
+
+ DBUG_PRINT("enter",("event_len: %u common_header_len: %d "
+ "post_header_len: %d",
+ event_len, common_header_len,
+ post_header_len));
+
+ const char *post_start= buf + common_header_len;
+ post_start+= RW_MAPID_OFFSET;
+ if (post_header_len == 6)
+ {
+ /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
+ m_table_id= uint4korr(post_start);
+ post_start+= 4;
+ }
+ else
+ {
+ m_table_id= (ulong) uint6korr(post_start);
+ post_start+= RW_FLAGS_OFFSET;
+ }
+
+ m_flags_pos= post_start - buf;
+ m_flags= uint2korr(post_start);
+ post_start+= 2;
+
+ uint16 var_header_len= 0;
+ if (post_header_len == ROWS_HEADER_LEN_V2)
+ {
+ /*
+ Have variable length header, check length,
+ which includes length bytes
+ */
+ var_header_len= uint2korr(post_start);
+ /* Check length and also avoid out of buffer read */
+ if (var_header_len < 2 ||
+ event_len < static_cast<unsigned int>(var_header_len +
+ (post_start - buf)))
+ {
+ m_cols.bitmap= 0;
+ DBUG_VOID_RETURN;
+ }
+ var_header_len-= 2;
+
+ /* Iterate over var-len header, extracting 'chunks' */
+ const char* start= post_start + 2;
+ const char* end= start + var_header_len;
+ for (const char* pos= start; pos < end;)
+ {
+ switch(*pos++)
+ {
+ case RW_V_EXTRAINFO_TAG:
+ {
+ /* Have an 'extra info' section, read it in */
+ assert((end - pos) >= EXTRA_ROW_INFO_HDR_BYTES);
+ uint8 infoLen= pos[EXTRA_ROW_INFO_LEN_OFFSET];
+ assert((end - pos) >= infoLen);
+ /* Just store/use the first tag of this type, skip others */
+ if (likely(!m_extra_row_data))
+ {
+ m_extra_row_data= (uchar*) my_malloc(PSI_INSTRUMENT_ME, infoLen,
+ MYF(MY_WME));
+ if (likely(m_extra_row_data != NULL))
+ {
+ memcpy(m_extra_row_data, pos, infoLen);
+ }
+ }
+ pos+= infoLen;
+ break;
+ }
+ default:
+ /* Unknown code, we will not understand anything further here */
+ pos= end; /* Break loop */
+ }
+ }
+ }
+
+ uchar const *const var_start=
+ (const uchar *)buf + common_header_len + post_header_len + var_header_len;
+ uchar const *const ptr_width= var_start;
+ uchar *ptr_after_width= (uchar*) ptr_width;
+ DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
+ m_width = net_field_length(&ptr_after_width);
+ DBUG_PRINT("debug", ("m_width=%lu", m_width));
+
+ /* Avoid reading out of buffer */
+ if (ptr_after_width + (m_width + 7) / 8 > (uchar*)buf + event_len)
+ {
+ m_cols.bitmap= NULL;
+ DBUG_VOID_RETURN;
+ }
+
+ /* if my_bitmap_init fails, caught in is_valid() */
+ if (likely(!my_bitmap_init(&m_cols,
+ m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
+ m_width,
+ false)))
+ {
+ DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
+ memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
+ create_last_word_mask(&m_cols);
+ ptr_after_width+= (m_width + 7) / 8;
+ DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
+ }
+ else
+ {
+ // Needed because my_bitmap_init() does not set it to null on failure
+ m_cols.bitmap= NULL;
+ DBUG_VOID_RETURN;
+ }
+
+ m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
+
+ if (LOG_EVENT_IS_UPDATE_ROW(event_type))
+ {
+ DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
+
+ /* if my_bitmap_init fails, caught in is_valid() */
+ if (likely(!my_bitmap_init(&m_cols_ai,
+ m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
+ m_width,
+ false)))
+ {
+ DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
+ memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
+ create_last_word_mask(&m_cols_ai);
+ ptr_after_width+= (m_width + 7) / 8;
+ DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
+ no_bytes_in_map(&m_cols_ai));
+ }
+ else
+ {
+ // Needed because my_bitmap_init() does not set it to null on failure
+ m_cols_ai.bitmap= 0;
+ DBUG_VOID_RETURN;
+ }
+ }
+
+ const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
+
+ size_t const read_size= ptr_rows_data - (const unsigned char *) buf;
+ if (read_size > event_len)
+ {
+ DBUG_VOID_RETURN;
+ }
+ size_t const data_size= event_len - read_size;
+ DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
+ m_table_id, m_flags, m_width, (ulong) data_size));
+
+ m_rows_buf= (uchar*) my_malloc(PSI_INSTRUMENT_ME, data_size, MYF(MY_WME));
+ if (likely((bool)m_rows_buf))
+ {
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ m_curr_row= m_rows_buf;
+#endif
+ m_rows_end= m_rows_buf + data_size;
+ m_rows_cur= m_rows_end;
+ memcpy(m_rows_buf, ptr_rows_data, data_size);
+ m_rows_before_size= ptr_rows_data - (const uchar *) buf; // Get the size that before SET part
+ }
+ else
+ m_cols.bitmap= 0; // to not free it
+
+ DBUG_VOID_RETURN;
+}
+
+void Rows_log_event::uncompress_buf()
+{
+ uint32 un_len = binlog_get_uncompress_len((char *)m_rows_buf);
+ if (!un_len)
+ return;
+
+ uchar *new_buf= (uchar*) my_malloc(PSI_INSTRUMENT_ME, ALIGN_SIZE(un_len), MYF(MY_WME));
+ if (new_buf)
+ {
+ if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf,
+ (uint32)(m_rows_cur - m_rows_buf), &un_len))
+ {
+ my_free(m_rows_buf);
+ m_rows_buf = new_buf;
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ m_curr_row= m_rows_buf;
+#endif
+ m_rows_end= m_rows_buf + un_len;
+ m_rows_cur= m_rows_end;
+ return;
+ }
+ else
+ {
+ my_free(new_buf);
+ }
+ }
+ m_cols.bitmap= 0; // catch it in is_valid
+}
+
+Rows_log_event::~Rows_log_event()
+{
+ if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
+ m_cols.bitmap= 0; // so no my_free in my_bitmap_free
+ my_bitmap_free(&m_cols); // To pair with my_bitmap_init().
+ my_free(m_rows_buf);
+ my_free(m_extra_row_data);
+}
+
+int Rows_log_event::get_data_size()
+{
+ int const general_type_code= get_general_type_code();
+
+ uchar buf[MAX_INT_WIDTH];
+ uchar *end= net_store_length(buf, m_width);
+
+ DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+ return (int)(6 + no_bytes_in_map(&m_cols) + (end - buf) +
+ (general_type_code == UPDATE_ROWS_EVENT ? no_bytes_in_map(&m_cols_ai) : 0) +
+ m_rows_cur - m_rows_buf););
+ int data_size= 0;
+ Log_event_type type = get_type_code();
+ bool is_v2_event= LOG_EVENT_IS_ROW_V2(type);
+ if (is_v2_event)
+ {
+ data_size= ROWS_HEADER_LEN_V2 +
+ (m_extra_row_data ?
+ RW_V_TAG_LEN + m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET]:
+ 0);
+ }
+ else
+ {
+ data_size= ROWS_HEADER_LEN_V1;
+ }
+ data_size+= no_bytes_in_map(&m_cols);
+ data_size+= (uint) (end - buf);
+
+ if (general_type_code == UPDATE_ROWS_EVENT)
+ data_size+= no_bytes_in_map(&m_cols_ai);
+
+ data_size+= (uint) (m_rows_cur - m_rows_buf);
+ return data_size;
+}
+
+
+/**************************************************************************
+ Annotate_rows_log_event member functions
+**************************************************************************/
+
+Annotate_rows_log_event::Annotate_rows_log_event(const char *buf,
+ uint event_len,
+ const Format_description_log_event *desc)
+ : Log_event(buf, desc),
+ m_save_thd_query_txt(0),
+ m_save_thd_query_len(0),
+ m_saved_thd_query(false),
+ m_used_query_txt(0)
+{
+ m_query_len= event_len - desc->common_header_len;
+ m_query_txt= (char*) buf + desc->common_header_len;
+}
+
+Annotate_rows_log_event::~Annotate_rows_log_event()
+{
+ DBUG_ENTER("Annotate_rows_log_event::~Annotate_rows_log_event");
+#ifndef MYSQL_CLIENT
+ if (m_saved_thd_query)
+ thd->set_query(m_save_thd_query_txt, m_save_thd_query_len);
+ else if (m_used_query_txt)
+ thd->reset_query();
+#endif
+ DBUG_VOID_RETURN;
+}
+
+int Annotate_rows_log_event::get_data_size()
+{
+ return m_query_len;
+}
+
+Log_event_type Annotate_rows_log_event::get_type_code()
+{
+ return ANNOTATE_ROWS_EVENT;
+}
+
+bool Annotate_rows_log_event::is_valid() const
+{
+ return (m_query_txt != NULL && m_query_len != 0);
+}
+
+
+/**************************************************************************
+ Table_map_log_event member functions and support functions
+**************************************************************************/
+
+/**
+ @page How replication of field metadata works.
+
+ When a table map is created, the master first calls
+ Table_map_log_event::save_field_metadata() which calculates how many
+ values will be in the field metadata. Only those fields that require the
+ extra data are added. The method also loops through all of the fields in
+ the table calling the method Field::save_field_metadata() which returns the
+ values for the field that will be saved in the metadata and replicated to
+ the slave. Once all fields have been processed, the table map is written to
+ the binlog adding the size of the field metadata and the field metadata to
+ the end of the body of the table map.
+
+ When a table map is read on the slave, the field metadata is read from the
+ table map and passed to the table_def class constructor which saves the
+ field metadata from the table map into an array based on the type of the
+ field. Field metadata values not present (those fields that do not use extra
+ data) in the table map are initialized as zero (0). The array size is the
+ same as the columns for the table on the slave.
+
+ Additionally, values saved for field metadata on the master are saved as a
+ string of bytes (uchar) in the binlog. A field may require 1 or more bytes
+ to store the information. In cases where values require multiple bytes
+ (e.g. values > 255), the endian-safe methods are used to properly encode
+ the values on the master and decode them on the slave. When the field
+ metadata values are captured on the slave, they are stored in an array of
+ type uint16. This allows the least number of casts to prevent casting bugs
+ when the field metadata is used in comparisons of field attributes. When
+ the field metadata is used for calculating addresses in pointer math, the
+ type used is uint32.
+*/
+
+/*
+ Constructor used by slave to read the event from the binary log.
+ */
+#if defined(HAVE_REPLICATION)
+Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+
+ : Log_event(buf, description_event),
+#ifndef MYSQL_CLIENT
+ m_table(NULL),
+#endif
+ m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
+ m_colcnt(0), m_coltype(0),
+ m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
+ m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
+ m_null_bits(0), m_meta_memory(NULL),
+ m_optional_metadata_len(0), m_optional_metadata(NULL)
+{
+ unsigned int bytes_read= 0;
+ DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
+
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
+ DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
+ event_len, common_header_len, post_header_len));
+
+ /*
+ Don't print debug messages when running valgrind since they can
+ trigger false warnings.
+ */
+#ifndef HAVE_valgrind
+ DBUG_DUMP("event buffer", (uchar*) buf, event_len);
+#endif
+
+ if (event_len < (uint)(common_header_len + post_header_len))
+ DBUG_VOID_RETURN;
+
+ /* Read the post-header */
+ const char *post_start= buf + common_header_len;
+
+ post_start+= TM_MAPID_OFFSET;
+ VALIDATE_BYTES_READ(post_start, buf, event_len);
+ if (post_header_len == 6)
+ {
+ /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
+ m_table_id= uint4korr(post_start);
+ post_start+= 4;
+ }
+ else
+ {
+ DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
+ m_table_id= (ulong) uint6korr(post_start);
+ post_start+= TM_FLAGS_OFFSET;
+ }
+
+ DBUG_ASSERT(m_table_id != ~0ULL);
+
+ m_flags= uint2korr(post_start);
+
+ /* Read the variable part of the event */
+ const char *const vpart= buf + common_header_len + post_header_len;
+
+ /* Extract the length of the various parts from the buffer */
+ uchar const *const ptr_dblen= (uchar const*)vpart + 0;
+ VALIDATE_BYTES_READ(ptr_dblen, buf, event_len);
+ m_dblen= *(uchar*) ptr_dblen;
+
+ /* Length of database name + counter + terminating null */
+ uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
+ VALIDATE_BYTES_READ(ptr_tbllen, buf, event_len);
+ m_tbllen= *(uchar*) ptr_tbllen;
+
+ /* Length of table name + counter + terminating null */
+ uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
+ uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
+ VALIDATE_BYTES_READ(ptr_after_colcnt, buf, event_len);
+ m_colcnt= net_field_length(&ptr_after_colcnt);
+
+ DBUG_PRINT("info",("m_dblen: %lu off: %ld m_tbllen: %lu off: %ld m_colcnt: %lu off: %ld",
+ (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart),
+ (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
+ m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
+
+ /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
+ m_memory= (uchar*) my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
+ &m_dbnam, (uint) m_dblen + 1,
+ &m_tblnam, (uint) m_tbllen + 1,
+ &m_coltype, (uint) m_colcnt,
+ NullS);
+
+ if (m_memory)
+ {
+ /* Copy the different parts into their memory */
+ strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
+ strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
+ memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
+
+ ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
+ VALIDATE_BYTES_READ(ptr_after_colcnt, buf, event_len);
+ m_field_metadata_size= net_field_length(&ptr_after_colcnt);
+ if (m_field_metadata_size <= (m_colcnt * 2))
+ {
+ uint num_null_bytes= (m_colcnt + 7) / 8;
+ m_meta_memory= (uchar *)my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
+ &m_null_bits, num_null_bytes,
+ &m_field_metadata, m_field_metadata_size,
+ NULL);
+ memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
+ ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
+ memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
+ ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + num_null_bytes;
+ }
+ else
+ {
+ m_coltype= NULL;
+ my_free(m_memory);
+ m_memory= NULL;
+ DBUG_VOID_RETURN;
+ }
+
+ bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
+
+ /* After null_bits field, there are some new fields for extra metadata. */
+ if (bytes_read < event_len)
+ {
+ m_optional_metadata_len= event_len - bytes_read;
+ m_optional_metadata=
+ static_cast<unsigned char*>(my_malloc(PSI_INSTRUMENT_ME, m_optional_metadata_len, MYF(MY_WME)));
+ memcpy(m_optional_metadata, ptr_after_colcnt, m_optional_metadata_len);
+ }
+ }
+#ifdef MYSQL_SERVER
+ if (!m_table)
+ DBUG_VOID_RETURN;
+ binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields *
+ sizeof(Binlog_type_info));
+ for (uint i= 0; i < m_table->s->fields; i++)
+ binlog_type_info_array[i]= m_table->field[i]->binlog_type_info();
+#endif
+
+ DBUG_VOID_RETURN;
+}
+#endif
+
+Table_map_log_event::~Table_map_log_event()
+{
+ my_free(m_meta_memory);
+ my_free(m_memory);
+ my_free(m_optional_metadata);
+ m_optional_metadata= NULL;
+}
+
+/**
+ Parses SIGNEDNESS field.
+
+ @param[out] vec stores the signedness flags extracted from field.
+ @param[in] field SIGNEDNESS field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_signedness(std::vector<bool> &vec,
+ unsigned char *field, unsigned int length)
+{
+ for (unsigned int i= 0; i < length; i++)
+ {
+ for (unsigned char c= 0x80; c != 0; c>>= 1)
+ vec.push_back(field[i] & c);
+ }
+}
+
+/**
+ Parses DEFAULT_CHARSET field.
+
+ @param[out] default_charset stores collation numbers extracted from field.
+ @param[in] field DEFAULT_CHARSET field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_default_charset(Table_map_log_event::Optional_metadata_fields::
+ Default_charset &default_charset,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ default_charset.default_charset= net_field_length(&p);
+ while (p < field + length)
+ {
+ unsigned int col_index= net_field_length(&p);
+ unsigned int col_charset= net_field_length(&p);
+
+ default_charset.charset_pairs.push_back(std::make_pair(col_index,
+ col_charset));
+ }
+}
+
+/**
+ Parses COLUMN_CHARSET field.
+
+ @param[out] vec stores collation numbers extracted from field.
+ @param[in] field COLUMN_CHARSET field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_column_charset(std::vector<unsigned int> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ vec.push_back(net_field_length(&p));
+}
+
+/**
+ Parses COLUMN_NAME field.
+
+ @param[out] vec stores column names extracted from field.
+ @param[in] field COLUMN_NAME field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_column_name(std::vector<std::string> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ {
+ unsigned len= net_field_length(&p);
+ vec.push_back(std::string(reinterpret_cast<char *>(p), len));
+ p+= len;
+ }
+}
+
+/**
+ Parses SET_STR_VALUE/ENUM_STR_VALUE field.
+
+ @param[out] vec stores SET/ENUM column's string values extracted from
+ field. Each SET/ENUM column's string values are stored
+ into a string separate vector. All of them are stored
+ in 'vec'.
+ @param[in] field COLUMN_NAME field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_set_str_value(std::vector<Table_map_log_event::
+ Optional_metadata_fields::str_vector> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ {
+ unsigned int count= net_field_length(&p);
+
+ vec.push_back(std::vector<std::string>());
+ for (unsigned int i= 0; i < count; i++)
+ {
+ unsigned len1= net_field_length(&p);
+ vec.back().push_back(std::string(reinterpret_cast<char *>(p), len1));
+ p+= len1;
+ }
+ }
+}
+
+/**
+ Parses GEOMETRY_TYPE field.
+
+ @param[out] vec stores geometry column's types extracted from field.
+ @param[in] field GEOMETRY_TYPE field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_geometry_type(std::vector<unsigned int> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ vec.push_back(net_field_length(&p));
+}
+
+/**
+ Parses SIMPLE_PRIMARY_KEY field.
+
+ @param[out] vec stores primary key's column information extracted from
+ field. Each column has an index and a prefix which are
+ stored as a unit_pair. prefix is always 0 for
+ SIMPLE_PRIMARY_KEY field.
+ @param[in] field SIMPLE_PRIMARY_KEY field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_simple_pk(std::vector<Table_map_log_event::
+ Optional_metadata_fields::uint_pair> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ vec.push_back(std::make_pair(net_field_length(&p), 0));
+}
+
+/**
+ Parses PRIMARY_KEY_WITH_PREFIX field.
+
+ @param[out] vec stores primary key's column information extracted from
+ field. Each column has an index and a prefix which are
+ stored as a unit_pair.
+ @param[in] field PRIMARY_KEY_WITH_PREFIX field in table_map_event.
+ @param[in] length length of the field
+ */
+
+static void parse_pk_with_prefix(std::vector<Table_map_log_event::
+ Optional_metadata_fields::uint_pair> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ {
+ unsigned int col_index= net_field_length(&p);
+ unsigned int col_prefix= net_field_length(&p);
+ vec.push_back(std::make_pair(col_index, col_prefix));
+ }
+}
+
+Table_map_log_event::Optional_metadata_fields::
+Optional_metadata_fields(unsigned char* optional_metadata,
+ unsigned int optional_metadata_len)
+{
+ unsigned char* field= optional_metadata;
+
+ if (optional_metadata == NULL)
+ return;
+
+ while (field < optional_metadata + optional_metadata_len)
+ {
+ unsigned int len;
+ Optional_metadata_field_type type=
+ static_cast<Optional_metadata_field_type>(field[0]);
+
+ // Get length and move field to the value.
+ field++;
+ len= net_field_length(&field);
+
+ switch(type)
+ {
+ case SIGNEDNESS:
+ parse_signedness(m_signedness, field, len);
+ break;
+ case DEFAULT_CHARSET:
+ parse_default_charset(m_default_charset, field, len);
+ break;
+ case COLUMN_CHARSET:
+ parse_column_charset(m_column_charset, field, len);
+ break;
+ case COLUMN_NAME:
+ parse_column_name(m_column_name, field, len);
+ break;
+ case SET_STR_VALUE:
+ parse_set_str_value(m_set_str_value, field, len);
+ break;
+ case ENUM_STR_VALUE:
+ parse_set_str_value(m_enum_str_value, field, len);
+ break;
+ case GEOMETRY_TYPE:
+ parse_geometry_type(m_geometry_type, field, len);
+ break;
+ case SIMPLE_PRIMARY_KEY:
+ parse_simple_pk(m_primary_key, field, len);
+ break;
+ case PRIMARY_KEY_WITH_PREFIX:
+ parse_pk_with_prefix(m_primary_key, field, len);
+ break;
+ case ENUM_AND_SET_DEFAULT_CHARSET:
+ parse_default_charset(m_enum_and_set_default_charset, field, len);
+ break;
+ case ENUM_AND_SET_COLUMN_CHARSET:
+ parse_column_charset(m_enum_and_set_column_charset, field, len);
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ // next field
+ field+= len;
+ }
+}
+
+
+/**************************************************************************
+ Write_rows_log_event member functions
+**************************************************************************/
+
+
+/*
+ Constructor used by slave to read the event from the binary log.
+ */
+#ifdef HAVE_REPLICATION
+Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+: Rows_log_event(buf, event_len, description_event)
+{
+}
+
+Write_rows_compressed_log_event::Write_rows_compressed_log_event(
+ const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+: Write_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
+#endif
+
+
+/**************************************************************************
+ Delete_rows_log_event member functions
+**************************************************************************/
+
+/*
+ Constructor used by slave to read the event from the binary log.
+ */
+#ifdef HAVE_REPLICATION
+Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Rows_log_event(buf, event_len, description_event)
+{
+}
+
+Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(
+ const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Delete_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
+#endif
+
+/**************************************************************************
+ Update_rows_log_event member functions
+**************************************************************************/
+
+Update_rows_log_event::~Update_rows_log_event()
+{
+ if (m_cols_ai.bitmap)
+ {
+ if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
+ m_cols_ai.bitmap= 0; // so no my_free in my_bitmap_free
+ my_bitmap_free(&m_cols_ai); // To pair with my_bitmap_init().
+ }
+}
+
+
+/*
+ Constructor used by slave to read the event from the binary log.
+ */
+#ifdef HAVE_REPLICATION
+Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
+ const
+ Format_description_log_event
+ *description_event)
+ : Rows_log_event(buf, event_len, description_event)
+{
+}
+
+Update_rows_compressed_log_event::Update_rows_compressed_log_event(
+ const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Update_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
+#endif
+
+Incident_log_event::Incident_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *descr_event)
+ : Log_event(buf, descr_event)
+{
+ DBUG_ENTER("Incident_log_event::Incident_log_event");
+ uint8 const common_header_len=
+ descr_event->common_header_len;
+ uint8 const post_header_len=
+ descr_event->post_header_len[INCIDENT_EVENT-1];
+
+ DBUG_PRINT("info",("event_len: %u; common_header_len: %d; post_header_len: %d",
+ event_len, common_header_len, post_header_len));
+
+ m_message.str= NULL;
+ m_message.length= 0;
+ int incident_number= uint2korr(buf + common_header_len);
+ if (incident_number >= INCIDENT_COUNT ||
+ incident_number <= INCIDENT_NONE)
+ {
+ // If the incident is not recognized, this binlog event is
+ // invalid. If we set incident_number to INCIDENT_NONE, the
+ // invalidity will be detected by is_valid().
+ m_incident= INCIDENT_NONE;
+ DBUG_VOID_RETURN;
+ }
+ m_incident= static_cast<Incident>(incident_number);
+ char const *ptr= buf + common_header_len + post_header_len;
+ char const *const str_end= buf + event_len;
+ uint8 len= 0; // Assignment to keep compiler happy
+ const char *str= NULL; // Assignment to keep compiler happy
+ if (read_str(&ptr, str_end, &str, &len))
+ {
+ /* Mark this event invalid */
+ m_incident= INCIDENT_NONE;
+ DBUG_VOID_RETURN;
+ }
+ if (!(m_message.str= (char*) my_malloc(key_memory_log_event, len+1, MYF(MY_WME))))
+ {
+ /* Mark this event invalid */
+ m_incident= INCIDENT_NONE;
+ DBUG_VOID_RETURN;
+ }
+ strmake(m_message.str, str, len);
+ m_message.length= len;
+ DBUG_PRINT("info", ("m_incident: %d", m_incident));
+ DBUG_VOID_RETURN;
+}
+
+
+Incident_log_event::~Incident_log_event()
+{
+ if (m_message.str)
+ my_free(m_message.str);
+}
+
+
+const char *
+Incident_log_event::description() const
+{
+ static const char *const description[]= {
+ "NOTHING", // Not used
+ "LOST_EVENTS"
+ };
+
+ DBUG_PRINT("info", ("m_incident: %d", m_incident));
+ return description[m_incident];
+}
+
+
+Ignorable_log_event::Ignorable_log_event(const char *buf,
+ const Format_description_log_event
+ *descr_event,
+ const char *event_name)
+ :Log_event(buf, descr_event), number((int) (uchar) buf[EVENT_TYPE_OFFSET]),
+ description(event_name)
+{
+ DBUG_ENTER("Ignorable_log_event::Ignorable_log_event");
+ DBUG_VOID_RETURN;
+}
+
+Ignorable_log_event::~Ignorable_log_event()
+{
+}
+
+bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file)
+{
+ return (my_b_copy_all_to_file(cache, file) ||
+ reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE));
+}