diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
commit | 3f619478f796eddbba6e39502fe941b285dd97b1 (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /sql/sql_binlog.cc | |
parent | Initial commit. (diff) | |
download | mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.tar.xz mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/sql_binlog.cc')
-rw-r--r-- | sql/sql_binlog.cc | 471 |
1 files changed, 471 insertions, 0 deletions
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc new file mode 100644 index 00000000..e71c7015 --- /dev/null +++ b/sql/sql_binlog.cc @@ -0,0 +1,471 @@ +/* + Copyright (c) 2005, 2013, Oracle and/or its affiliates. + Copyright (c) 2022, MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */ + +#include "mariadb.h" +#include "sql_priv.h" +#include "sql_binlog.h" +#include "sql_parse.h" +#include "sql_acl.h" +#include "rpl_rli.h" +#include "rpl_mi.h" +#include "slave.h" +#include "log_event.h" + + +/** + Check if the event type is allowed in a BINLOG statement. + + @retval 0 if the event type is ok. + @retval 1 if the event type is not ok. +*/ +static int check_event_type(int type, Relay_log_info *rli) +{ + Format_description_log_event *fd_event= + rli->relay_log.description_event_for_exec; + + /* + Convert event type id of certain old versions (see comment in + Format_description_log_event::Format_description_log_event(char*,...)). + */ + if (fd_event && fd_event->event_type_permutation) + { +#ifdef DBUG_TRACE + int new_type= fd_event->event_type_permutation[type]; + DBUG_PRINT("info", + ("converting event type %d to %d (%s)", + type, new_type, + Log_event::get_type_str((Log_event_type)new_type))); +#endif + type= fd_event->event_type_permutation[type]; + } + + switch (type) + { + case START_EVENT_V3: + case FORMAT_DESCRIPTION_EVENT: + /* + We need a preliminary FD event in order to parse the FD event, + if we don't already have one. + */ + if (!fd_event) + if (!(rli->relay_log.description_event_for_exec= + new Format_description_log_event(4))) + { + my_error(ER_OUTOFMEMORY, MYF(0), 1); + return 1; + } + + /* It is always allowed to execute FD events. */ + return 0; + + case QUERY_EVENT: + case TABLE_MAP_EVENT: + case WRITE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + case PRE_GA_WRITE_ROWS_EVENT: + case PRE_GA_UPDATE_ROWS_EVENT: + case PRE_GA_DELETE_ROWS_EVENT: + /* + Row events are only allowed if a Format_description_event has + already been seen. + */ + if (fd_event) + return 0; + else + { + my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT, + MYF(0), Log_event::get_type_str((Log_event_type)type)); + return 1; + } + break; + + default: + /* + It is not meaningful to execute other events than row-events and + FD events. It would even be dangerous to execute Stop_log_event + and Rotate_log_event since they call Relay_log_info::flush(), which + is not allowed to call by other threads than the slave SQL + thread when the slave SQL thread is running. + */ + my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT, + MYF(0), Log_event::get_type_str((Log_event_type)type)); + return 1; + } +} + +/** + Copy fragments into the standard placeholder thd->lex->comment.str. + + Compute the size of the (still) encoded total, + allocate and then copy fragments one after another. + The size can exceed max(max_allowed_packet) which is not a + problem as no String instance is created off this char array. + + @param thd THD handle + @return + 0 at success, + -1 otherwise. +*/ +int binlog_defragment(THD *thd) +{ + user_var_entry *entry[2]; + LEX_CSTRING name[2]= { thd->lex->comment, thd->lex->ident }; + + /* compute the total size */ + thd->lex->comment.str= NULL; + thd->lex->comment.length= 0; + for (uint k= 0; k < 2; k++) + { + entry[k]= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str, + name[k].length); + if (!entry[k] || entry[k]->type != STRING_RESULT) + { + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str); + return -1; + } + thd->lex->comment.length += entry[k]->length; + } + + thd->lex->comment.str= // to be freed by the caller + (char *) my_malloc(PSI_INSTRUMENT_ME, thd->lex->comment.length, MYF(MY_WME)); + if (!thd->lex->comment.str) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1); + return -1; + } + + /* fragments are merged into allocated buf while the user var:s get reset */ + size_t gathered_length= 0; + for (uint k=0; k < 2; k++) + { + memcpy(const_cast<char*>(thd->lex->comment.str) + gathered_length, entry[k]->value, + entry[k]->length); + gathered_length += entry[k]->length; + } + for (uint k=0; k < 2; k++) + update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0); + + DBUG_ASSERT(gathered_length == thd->lex->comment.length); + + return 0; +} + +/** + Wraps Log_event::apply_event to save and restore + session context in case of Query_log_event. + + @param ev replication event + @param rgi execution context for the event + + @return + 0 on success, + non-zero otherwise. +*/ +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int save_restore_context_apply_event(Log_event *ev, rpl_group_info *rgi) +{ + if (ev->get_type_code() != QUERY_EVENT) + return ev->apply_event(rgi); + + THD *thd= rgi->thd; + Relay_log_info *rli= thd->rli_fake; + DBUG_ASSERT(!rli->mi); + LEX_CSTRING connection_name= { STRING_WITH_LEN("BINLOG_BASE64_EVENT") }; + + if (!(rli->mi= new Master_info(&connection_name, false))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return -1; + } + + sql_digest_state *m_digest= thd->m_digest; + PSI_statement_locker *m_statement_psi= thd->m_statement_psi;; + LEX_CSTRING save_db= thd->db; + my_thread_id m_thread_id= thd->variables.pseudo_thread_id; + + thd->system_thread_info.rpl_sql_info= NULL; + thd->reset_db(&null_clex_str); + + thd->m_digest= NULL; + thd->m_statement_psi= NULL; + + int err= ev->apply_event(rgi); + + thd->m_digest= m_digest; + thd->m_statement_psi= m_statement_psi; + thd->variables.pseudo_thread_id= m_thread_id; + thd->reset_db(&save_db); + delete rli->mi; + rli->mi= NULL; + + return err; +} +#endif + +/** + Execute a BINLOG statement. + + To execute the BINLOG command properly the server needs to know + which format the BINLOG command's event is in. Therefore, the first + BINLOG statement seen must be a base64 encoding of the + Format_description_log_event, as outputted by mysqlbinlog. This + Format_description_log_event is cached in + rli->description_event_for_exec. + + @param thd Pointer to THD object for the client thread executing the + statement. +*/ + +void mysql_client_binlog_statement(THD* thd) +{ + DBUG_ENTER("mysql_client_binlog_statement"); + DBUG_PRINT("info",("binlog base64: '%*s'", + (int) (thd->lex->comment.length < 2048 ? + thd->lex->comment.length : 2048), + thd->lex->comment.str)); + + if (check_global_access(thd, PRIV_STMT_BINLOG)) + DBUG_VOID_RETURN; + + /* + option_bits will be changed when applying the event. But we don't expect + it be changed permanently after BINLOG statement, so backup it first. + It will be restored at the end of this function. + */ + ulonglong thd_options= thd->variables.option_bits; + + /* + Allocation + */ + + int err; + Relay_log_info *rli; + rpl_group_info *rgi; + uchar *buf= NULL; + size_t coded_len= 0, decoded_len= 0; + + rli= thd->rli_fake; + if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE, "BINLOG_BASE64_EVENT"))) + rli->sql_driver_thd= thd; + if (!(rgi= thd->rgi_fake)) + rgi= thd->rgi_fake= new rpl_group_info(rli); + rgi->thd= thd; + const char *error= 0; + Log_event *ev = 0; + my_bool is_fragmented= FALSE; + /* + Out of memory check + */ + if (!(rli)) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1); /* needed 1 bytes */ + goto end; + } + + DBUG_ASSERT(rli->belongs_to_client()); + + if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str)) + if (binlog_defragment(thd)) + goto end; + + if (!(coded_len= thd->lex->comment.length)) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + decoded_len= my_base64_needed_decoded_length((int)coded_len); + if (!(buf= (uchar *) my_malloc(key_memory_binlog_statement_buffer, + decoded_len, MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1); + goto end; + } + + for (char const *strptr= thd->lex->comment.str ; + strptr < thd->lex->comment.str + thd->lex->comment.length ; ) + { + char const *endptr= 0; + int bytes_decoded= my_base64_decode(strptr, coded_len, buf, &endptr, + MY_BASE64_DECODE_ALLOW_MULTIPLE_CHUNKS); + +#ifndef HAVE_valgrind + /* + This debug printout should not be used for valgrind builds + since it will read from unassigned memory. + */ + DBUG_PRINT("info", + ("bytes_decoded: %d strptr: %p endptr: %p ('%c':%d)", + bytes_decoded, strptr, endptr, *endptr, + *endptr)); +#endif + + if (bytes_decoded < 0) + { + my_error(ER_BASE64_DECODE_ERROR, MYF(0)); + goto end; + } + else if (bytes_decoded == 0) + break; // If no bytes where read, the string contained only whitespace + + DBUG_ASSERT(bytes_decoded > 0); + DBUG_ASSERT(endptr > strptr); + coded_len-= endptr - strptr; + strptr= endptr; + + /* + Now we have one or more events stored in the buffer. The size of + the buffer is computed based on how much base64-encoded data + there were, so there should be ample space for the data (maybe + even too much, since a statement can consist of a considerable + number of events). + + TODO: Switch to use a stream-based base64 encoder/decoder in + order to be able to read exactly what is necessary. + */ + + DBUG_PRINT("info",("binlog base64 decoded_len: %lu bytes_decoded: %d", + (ulong) decoded_len, bytes_decoded)); + + /* + Now we start to read events of the buffer, until there are no + more. + */ + for (uchar *bufptr= buf ; bytes_decoded > 0 ; ) + { + /* + Checking that the first event in the buffer is not truncated. + */ + ulong event_len; + if (bytes_decoded < EVENT_LEN_OFFSET + 4 || + (event_len= uint4korr(bufptr + EVENT_LEN_OFFSET)) > + (uint) bytes_decoded) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d", + event_len, bytes_decoded)); + + if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli)) + goto end; + + ev= Log_event::read_log_event(bufptr, event_len, &error, + rli->relay_log.description_event_for_exec, + 0); + + DBUG_PRINT("info",("binlog base64 err=%s", error)); + if (!ev) + { + /* + This could actually be an out-of-memory, but it is more likely + caused by a bad statement + */ + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + bytes_decoded -= event_len; + bufptr += event_len; + + DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); + ev->thd= thd; + /* + We go directly to the application phase, since we don't need + to check if the event shall be skipped or not. + + Neither do we have to update the log positions, since that is + not used at all: the rli_fake instance is used only for error + reporting. + */ +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + ulonglong save_skip_replication= + thd->variables.option_bits & OPTION_SKIP_REPLICATION; + thd->variables.option_bits= + (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | + (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? + OPTION_SKIP_REPLICATION : 0); + + { + /* + For conventional statements thd->lex points to thd->main_lex, that is + thd->lex == &thd->main_lex. On the other hand, for prepared statement + thd->lex points to the LEX object explicitly allocated for execution + of the prepared statement and in this case thd->lex != &thd->main_lex. + On handling the BINLOG statement, invocation of ev->apply_event(rgi) + initiates the following sequence of calls + Rows_log_event::do_apply_event -> THD::reset_for_next_command + Since the method THD::reset_for_next_command() contains assert + DBUG_ASSERT(lex == &main_lex) + this sequence of calls results in crash when a binlog event is + applied in PS mode. So, reset the current lex temporary to point to + thd->main_lex before running ev->apply_event() and restore its + original value on return. + */ + LEX *backup_lex; + + thd->backup_and_reset_current_lex(&backup_lex); + err= save_restore_context_apply_event(ev, rgi); + thd->restore_current_lex(backup_lex); + } + thd->variables.option_bits= + (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | + save_skip_replication; +#else + err= 0; +#endif + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; it + will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + delete ev; + ev= 0; + if (err) + { + /* + TODO: Maybe a better error message since the BINLOG statement + now contains several events. + */ + if (!thd->is_error()) + my_error(ER_UNKNOWN_ERROR, MYF(0)); + goto end; + } + } + } + + + DBUG_PRINT("info",("binlog base64 execution finished successfully")); + my_ok(thd); + +end: + if (unlikely(is_fragmented)) + my_free(const_cast<char*>(thd->lex->comment.str)); + thd->variables.option_bits= thd_options; + rgi->slave_close_thread_tables(thd); + my_free(buf); + delete rgi; + rgi= thd->rgi_fake= NULL; + DBUG_VOID_RETURN; +} |