summaryrefslogtreecommitdiffstats
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 13:39:13 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 13:39:13 +0000
commit86fbb58c3ac0865482819c10a3e81f2eea001c36 (patch)
tree28c9e526ea739c6f9b89e36115e1e2698bddf981 /sql/sql_repl.cc
parentReleasing progress-linux version 1:10.11.6-2~progress7.99u1. (diff)
downloadmariadb-86fbb58c3ac0865482819c10a3e81f2eea001c36.tar.xz
mariadb-86fbb58c3ac0865482819c10a3e81f2eea001c36.zip
Merging upstream version 1:10.11.7.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc140
1 files changed, 112 insertions, 28 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 8bde0f3b..ad71bf6f 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -510,7 +510,7 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD *
}
else
{
- DBUG_ASSERT(entry->type == STRING_RESULT);
+ DBUG_ASSERT(entry->type_handler()->result_type() == STRING_RESULT);
String str;
uint dummy_errors;
str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin,
@@ -2060,7 +2060,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
if (need_sync && repl_semisync_master.flush_net(info->thd,
- packet->c_ptr_safe()))
+ packet->c_ptr()))
{
info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
@@ -3011,8 +3011,13 @@ err:
if (info->thd->killed == KILL_SLAVE_SAME_ID)
{
- info->errmsg= "A slave with the same server_uuid/server_id as this slave "
- "has connected to the master";
+ /*
+ Note that the text is limited to 64 characters in errmsg-utf8 in
+ ER_ABORTING_CONNECTION.
+ */
+ info->errmsg=
+ "A slave with the same server_uuid/server_id is already "
+ "connected";
info->error= ER_SLAVE_SAME_ID;
}
@@ -3385,6 +3390,7 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
@retval 0 success
@retval 1 error
*/
+
int reset_slave(THD *thd, Master_info* mi)
{
MY_STAT stat_area;
@@ -3482,8 +3488,6 @@ int reset_slave(THD *thd, Master_info* mi)
else if (global_system_variables.log_warnings > 1)
sql_print_information("Deleted Master_info file '%s'.", fname);
- if (rpl_semi_sync_slave_enabled)
- repl_semisync_slave.reset_slave(mi);
err:
mi->unlock_slave_threads();
if (unlikely(error))
@@ -3511,43 +3515,89 @@ err:
struct kill_callback_arg
{
- kill_callback_arg(uint32 id): slave_server_id(id), thd(0) {}
- uint32 slave_server_id;
+ kill_callback_arg(THD *thd_arg, uint32 id):
+ thd(thd_arg), slave_server_id(id), counter(0) {}
THD *thd;
+ uint32 slave_server_id;
+ uint counter;
};
-static my_bool kill_callback(THD *thd, kill_callback_arg *arg)
+
+/*
+ Collect all active dump threads
+*/
+
+static my_bool kill_callback_collect(THD *thd, kill_callback_arg *arg)
{
if (thd->get_command() == COM_BINLOG_DUMP &&
- thd->variables.server_id == arg->slave_server_id)
+ thd->variables.server_id == arg->slave_server_id &&
+ thd != arg->thd)
{
- arg->thd= thd;
+ arg->counter++;
mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
mysql_mutex_lock(&thd->LOCK_thd_data);
- return 1;
+ thd->awake_no_mutex(KILL_SLAVE_SAME_ID); // Mark killed
+ /*
+ Remover the thread from ack_receiver to ensure it is not
+ sending acks to the master anymore.
+ */
+ ack_receiver.remove_slave(thd);
+
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
return 0;
}
-void kill_zombie_dump_threads(uint32 slave_server_id)
+/*
+ Check if there are any active dump threads
+*/
+
+static my_bool kill_callback_check(THD *thd, kill_callback_arg *arg)
+{
+ return (thd->get_command() == COM_BINLOG_DUMP &&
+ thd->variables.server_id == arg->slave_server_id &&
+ thd != arg->thd);
+}
+
+
+/**
+ Try to kill running dump threads on the master
+
+ @result 0 ok
+ @result 1 old slave thread exists and does not want to die
+
+ There should not be more than one dump thread with the same server id
+ this code has however in the past has several issues. To ensure that
+ things works in all cases (now and in the future), this code is collecting
+ all matching server id's and killing all of them.
+*/
+
+bool kill_zombie_dump_threads(THD *thd, uint32 slave_server_id)
{
- kill_callback_arg arg(slave_server_id);
- server_threads.iterate(kill_callback, &arg);
+ kill_callback_arg arg(thd, slave_server_id);
+ server_threads.iterate(kill_callback_collect, &arg);
+
+ if (!arg.counter)
+ return 0;
- if (arg.thd)
+ /*
+ Wait up to SECONDS_TO_WAIT_FOR_DUMP_THREAD_KILL for kill
+ of all dump thread, trying every 1/10 of second.
+ */
+ for (uint i= 10 * SECONDS_TO_WAIT_FOR_DUMP_THREAD_KILL ;
+ --i > 0 && !thd->killed;
+ i++)
{
- /*
- Here we do not call kill_one_thread() as
- it will be slow because it will iterate through the list
- again. We just to do kill the thread ourselves.
- */
- arg.thd->awake_no_mutex(KILL_SLAVE_SAME_ID);
- mysql_mutex_unlock(&arg.thd->LOCK_thd_kill);
- mysql_mutex_unlock(&arg.thd->LOCK_thd_data);
+ if (!server_threads.iterate(kill_callback_check, &arg))
+ return 0; // All dump thread are killed
+ my_sleep(1000000L / 10); // Wait 1/10 of a second
}
+ return 1;
}
+
/**
Get value for a string parameter with error checking
@@ -4303,11 +4353,17 @@ bool mysql_show_binlog_events(THD* thd)
}
}
+ /*
+ Omit error messages from server log in Log_event::read_log_event. That
+ is, we only need to notify the client to correct their 'from' offset;
+ writing about this in the server log would be confusing as it isn't
+ related to server operational status.
+ */
for (event_count = 0;
(ev = Log_event::read_log_event(&log,
description_event,
(opt_master_verify_checksum ||
- verify_checksum_once))); )
+ verify_checksum_once), false)); )
{
if (!unit->lim.check_offset(event_count) &&
ev->net_send(protocol, linfo.log_file_name, pos))
@@ -4595,6 +4651,10 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
/* buffer contains position where we started last read */
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
uint max_event_size= lf_info->thd->variables.max_allowed_packet;
+ int res;
+#ifndef DBUG_OFF
+ bool did_dbug_inject= false;
+#endif
if (lf_info->thd->is_current_stmt_binlog_format_row())
goto ret;
@@ -4602,6 +4662,19 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
goto ret;
+ DBUG_EXECUTE_IF("load_data_binlog_cache_error",
+ {
+ /*
+ Simulate "disk full" error in the middle of writing to
+ the binlog cache.
+ */
+ if (lf_info->last_pos_in_file >= 2*4096)
+ {
+ DBUG_SET("+d,simulate_file_write_error");
+ did_dbug_inject= true;
+ }
+ };);
+
for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
buffer += MY_MIN(block_len, max_event_size),
block_len -= MY_MIN(block_len, max_event_size))
@@ -4613,7 +4686,10 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
MY_MIN(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write(&a))
- DBUG_RETURN(1);
+ {
+ res= 1;
+ goto err;
+ }
}
else
{
@@ -4622,12 +4698,20 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
MY_MIN(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write(&b))
- DBUG_RETURN(1);
+ {
+ res= 1;
+ goto err;
+ }
lf_info->wrote_create_file= 1;
}
}
ret:
- int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
+ res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
+err:
+#ifndef DBUG_OFF
+ if (did_dbug_inject)
+ DBUG_SET("-d,simulate_file_write_error");
+#endif
DBUG_RETURN(res);
}