summaryrefslogtreecommitdiffstats
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc131
1 files changed, 77 insertions, 54 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 68649a95..5c2e0cb9 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -1128,10 +1128,8 @@ void wsrep_recover()
}
}
-
-void wsrep_stop_replication(THD *thd)
+static void wsrep_stop_replication_common(THD *thd)
{
- WSREP_INFO("Stop replication by %llu", (thd) ? thd->thread_id : 0);
if (Wsrep_server_state::instance().state() !=
Wsrep_server_state::s_disconnected)
{
@@ -1144,10 +1142,10 @@ void wsrep_stop_replication(THD *thd)
}
}
- /* my connection, should not terminate with wsrep_close_client_connection(),
- make transaction to rollback
- */
- if (thd && !thd->wsrep_applier) trans_rollback(thd);
+ /* my connection, should not terminate with
+ wsrep_close_client_connections(), make transaction to rollback */
+ if (thd && !thd->wsrep_applier)
+ trans_rollback(thd);
wsrep_close_client_connections(TRUE, thd);
/* wait until appliers have stopped */
@@ -1156,26 +1154,16 @@ void wsrep_stop_replication(THD *thd)
node_uuid= WSREP_UUID_UNDEFINED;
}
+void wsrep_stop_replication(THD *thd)
+{
+ WSREP_INFO("Stop replication by %llu", (thd) ? thd->thread_id : 0);
+ wsrep_stop_replication_common(thd);
+}
+
void wsrep_shutdown_replication()
{
WSREP_INFO("Shutdown replication");
- if (Wsrep_server_state::instance().state() != wsrep::server_state::s_disconnected)
- {
- WSREP_DEBUG("Disconnect provider");
- Wsrep_server_state::instance().disconnect();
- if (Wsrep_server_state::instance().wait_until_state(
- Wsrep_server_state::s_disconnected))
- {
- WSREP_WARN("Wsrep interrupted while waiting for disconnected state");
- }
- }
-
- wsrep_close_client_connections(TRUE);
-
- /* wait until appliers have stopped */
- wsrep_wait_appliers_close(NULL);
- node_uuid= WSREP_UUID_UNDEFINED;
-
+ wsrep_stop_replication_common(nullptr);
/* Undocking the thread specific data. */
set_current_thd(nullptr);
}
@@ -3243,7 +3231,9 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
}
}
else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
- granted_thd->mdl_context.has_explicit_locks())
+ /* System transactions with explicit locks are BACKUP. */
+ (granted_thd->system_thread != NON_SYSTEM_THREAD &&
+ granted_thd->mdl_context.has_explicit_locks()))
{
WSREP_DEBUG("BF thread waiting for FLUSH for %s",
wsrep_thd_query(request_thd));
@@ -3348,14 +3338,20 @@ static my_bool have_client_connections(THD *thd, void*)
{
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) thd->thread_id));
- if (is_client_connection(thd) &&
- (thd->killed == KILL_CONNECTION ||
- thd->killed == KILL_CONNECTION_HARD))
+ if (is_client_connection(thd))
{
- (void)abort_replicated(thd);
- return 1;
+ if (thd->killed == KILL_CONNECTION ||
+ thd->killed == KILL_CONNECTION_HARD)
+ {
+ (void)abort_replicated(thd);
+ return true;
+ }
+ if (thd->get_stmt_da()->is_eof())
+ {
+ return true;
+ }
}
- return 0;
+ return false;
}
static void wsrep_close_thread(THD *thd)
@@ -3393,14 +3389,24 @@ static my_bool kill_all_threads(THD *thd, THD *caller_thd)
/* We skip slave threads & scheduler on this first loop through. */
if (is_client_connection(thd) && thd != caller_thd)
{
+ if (thd->get_stmt_da()->is_eof())
+ {
+ return 0;
+ }
+
if (is_replaying_connection(thd))
+ {
thd->set_killed(KILL_CONNECTION_HARD);
- else if (!abort_replicated(thd))
+ return 0;
+ }
+
+ if (!abort_replicated(thd))
{
/* replicated transactions must be skipped */
WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id);
/* instead of wsrep_close_thread() we do now soft kill by THD::awake */
thd->awake(KILL_CONNECTION_HARD);
+ return 0;
}
}
return 0;
@@ -3412,6 +3418,7 @@ static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)
if (is_client_connection(thd) &&
!abort_replicated(thd) &&
!is_replaying_connection(thd) &&
+ !thd->get_stmt_da()->is_eof() &&
thd_is_connection_alive(thd) &&
thd != caller_thd)
{
@@ -3906,7 +3913,7 @@ void wsrep_ready_set(bool ready_value)
mysql_mutex_lock(&LOCK_wsrep_ready);
wsrep_ready= ready_value;
// Signal if we have reached ready state
- if (wsrep_ready)
+ if (ready_value)
mysql_cond_signal(&COND_wsrep_ready);
mysql_mutex_unlock(&LOCK_wsrep_ready);
}
@@ -3921,21 +3928,11 @@ void wsrep_ready_set(bool ready_value)
step is performed to leave the wsrep transaction in the state as it
never existed.
- This should not be an inline functions as it requires a lot of stack space
- because of WSREP_DBUG() usage. It's also not a function that is
- frequently called.
*/
void wsrep_commit_empty(THD* thd, bool all)
{
DBUG_ENTER("wsrep_commit_empty");
- WSREP_DEBUG("wsrep_commit_empty for %llu client_state %s client_mode"
- " %s trans_state %s sql %s",
- thd_get_thread_id(thd),
- wsrep::to_c_string(thd->wsrep_cs().state()),
- wsrep::to_c_string(thd->wsrep_cs().mode()),
- wsrep::to_c_string(thd->wsrep_cs().transaction().state()),
- wsrep_thd_query(thd));
if (wsrep_is_real(thd, all) &&
wsrep_thd_is_local(thd) &&
@@ -3943,14 +3940,40 @@ void wsrep_commit_empty(THD* thd, bool all)
!thd->internal_transaction() &&
thd->wsrep_trx().state() != wsrep::transaction::s_committed)
{
- /* Here transaction is either empty (i.e. no changes) or
- it was CREATE TABLE with no row binlog format or
- we have already aborted transaction e.g. because max writeset size
- has been reached. */
- DBUG_ASSERT(!wsrep_has_changes(thd) ||
- (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
- !thd->is_current_stmt_binlog_format_row()) ||
- thd->wsrep_cs().transaction().state() == wsrep::transaction::s_aborted);
+#ifndef DBUG_OFF
+ const bool empty= !wsrep_has_changes(thd);
+ const bool create= thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
+ !thd->is_current_stmt_binlog_format_row();
+ const bool aborted= thd->wsrep_cs().transaction().state() == wsrep::transaction::s_aborted;
+ const bool ddl_replay= ((sql_command_flags[thd->lex->sql_command] &
+ (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) &&
+ thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay);
+ /* Here transaction is either
+ (1) empty (i.e. no changes) or
+ (2) it was CREATE TABLE with no row binlog format or
+ (3) we have already aborted transaction e.g. because max writeset size
+ has been reached or
+ (4) it was DDL and got BF aborted and must replay.
+ */
+ if(!(empty || create || aborted || ddl_replay))
+ {
+ WSREP_DEBUG("wsrep_commit_empty: thread: %llu client_state: %s client_mode:"
+ " %s trans_state: %s error: %s empty: %d create: %d aborted:"
+ " %d ddl_replay: %d sql: %s",
+ thd_get_thread_id(thd),
+ wsrep::to_c_string(thd->wsrep_cs().state()),
+ wsrep::to_c_string(thd->wsrep_cs().mode()),
+ wsrep::to_c_string(thd->wsrep_cs().transaction().state()),
+ wsrep::to_c_string(thd->wsrep_cs().current_error()),
+ empty, create, aborted, ddl_replay,
+ wsrep_thd_query(thd));
+
+ DBUG_ASSERT(empty || // 1
+ create || // 2
+ aborted || // 3
+ ddl_replay); // 4
+ }
+#endif /* DBUG_OFF */
bool have_error= wsrep_current_error(thd);
int ret= wsrep_before_rollback(thd, all) ||
wsrep_after_rollback(thd, all) ||
@@ -3964,10 +3987,10 @@ void wsrep_commit_empty(THD* thd, bool all)
DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error);
thd->wsrep_cs().reset_error();
}
+
if (ret)
- {
- WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd));
- }
+ WSREP_DEBUG("wsrep_commit_empty failed: %s",
+ wsrep::to_c_string(thd->wsrep_cs().current_error()));
}
DBUG_VOID_RETURN;
}