From 06eaf7232e9a920468c0f8d74dcf2fe8b555501c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 13 Apr 2024 14:24:36 +0200 Subject: Adding upstream version 1:10.11.6. Signed-off-by: Daniel Baumann --- sql/wsrep_sst.cc | 2161 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 2161 insertions(+) create mode 100644 sql/wsrep_sst.cc (limited to 'sql/wsrep_sst.cc') diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc new file mode 100644 index 00000000..db138f25 --- /dev/null +++ b/sql/wsrep_sst.cc @@ -0,0 +1,2161 @@ +/* Copyright 2008-2022 Codership Oy + Copyright (c) 2008, 2022, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +#include "mariadb.h" +#include "wsrep_sst.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "wsrep_priv.h" +#include "wsrep_utils.h" +#include "wsrep_xid.h" +#include "wsrep_thd.h" +#include "wsrep_server_state.h" + +#include +#include +#include "debug_sync.h" + +#include + +static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 + + sizeof(WSREP_SST_OPT_CONF) + + sizeof(WSREP_SST_OPT_CONF_SUFFIX) + + sizeof(WSREP_SST_OPT_CONF_EXTRA)]= {0}; + +const char* wsrep_sst_method = WSREP_SST_DEFAULT; +const char* wsrep_sst_receive_address= WSREP_SST_ADDRESS_AUTO; +const char* wsrep_sst_donor = ""; +const char* wsrep_sst_auth = NULL; + +// container for real auth string +static const char* sst_auth_real = NULL; +my_bool wsrep_sst_donor_rejects_queries= FALSE; + +#define WSREP_EXTEND_TIMEOUT_INTERVAL 60 +#define WSREP_TIMEDWAIT_SECONDS 30 + +bool sst_joiner_completed = false; +bool sst_donor_completed = false; + +struct sst_thread_arg +{ + const char* cmd; + char** env; + char* ret_str; + int err; + mysql_mutex_t lock; + mysql_cond_t cond; + + sst_thread_arg (const char* c, char** e) + : cmd(c), env(e), ret_str(0), err(-1) + { + mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL); + } + + ~sst_thread_arg() + { + mysql_cond_destroy (&cond); + mysql_mutex_unlock (&lock); + mysql_mutex_destroy (&lock); + } +}; + +static void wsrep_donor_monitor_end(void) +{ + mysql_mutex_lock(&LOCK_wsrep_donor_monitor); + sst_donor_completed= true; + mysql_cond_signal(&COND_wsrep_donor_monitor); + mysql_mutex_unlock(&LOCK_wsrep_donor_monitor); +} + +static void wsrep_joiner_monitor_end(void) +{ + mysql_mutex_lock(&LOCK_wsrep_joiner_monitor); + sst_joiner_completed= true; + mysql_cond_signal(&COND_wsrep_joiner_monitor); + mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor); +} + +static void* wsrep_sst_donor_monitor_thread(void *arg __attribute__((unused))) +{ + int ret= 0; + unsigned long time_waited= 0; + + mysql_mutex_lock(&LOCK_wsrep_donor_monitor); + + WSREP_INFO("Donor monitor thread started to monitor"); + + wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can + // operate with wsrep_ready == OFF + + while (!sst_donor_completed) + { + timespec ts; + set_timespec(ts, WSREP_TIMEDWAIT_SECONDS); + time_t start_time= time(NULL); + ret= mysql_cond_timedwait(&COND_wsrep_donor_monitor, &LOCK_wsrep_donor_monitor, &ts); + time_t end_time= time(NULL); + time_waited+= difftime(end_time, start_time); + + if (ret == ETIMEDOUT && !sst_donor_completed) + { + WSREP_DEBUG("Donor waited %lu sec, extending systemd startup timeout as SST" + "is not completed", + time_waited); + service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL, + "WSREP state transfer ongoing..."); + } + } + + WSREP_INFO("Donor monitor thread ended with total time %lu sec", time_waited); + mysql_mutex_unlock(&LOCK_wsrep_donor_monitor); + + return NULL; +} + +static void* wsrep_sst_joiner_monitor_thread(void *arg __attribute__((unused))) +{ + int ret= 0; + unsigned long time_waited= 0; + + mysql_mutex_lock(&LOCK_wsrep_joiner_monitor); + + WSREP_INFO("Joiner monitor thread started to monitor"); + + wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can + // operate with wsrep_ready == OFF + + while (!sst_joiner_completed) + { + timespec ts; + set_timespec(ts, WSREP_TIMEDWAIT_SECONDS); + time_t start_time= time(NULL); + ret= mysql_cond_timedwait(&COND_wsrep_joiner_monitor, &LOCK_wsrep_joiner_monitor, &ts); + time_t end_time= time(NULL); + time_waited+= difftime(end_time, start_time); + + if (ret == ETIMEDOUT && !sst_joiner_completed) + { + WSREP_DEBUG("Joiner waited %lu sec, extending systemd startup timeout as SST" + "is not completed", + time_waited); + service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL, + "WSREP state transfer ongoing..."); + } + } + + WSREP_INFO("Joiner monitor thread ended with total time %lu sec", time_waited); + mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor); + + return NULL; +} + +/* return true if character can be a part of a filename */ +static bool filename_char(int const c) +{ + return isalnum(c) || (c == '-') || (c == '_') || (c == '.'); +} + +/* return true if character can be a part of an address string */ +static bool address_char(int const c) +{ + return filename_char(c) || + (c == ':') || (c == '[') || (c == ']') || (c == '/'); +} + +static bool check_request_str(const char* const str, + bool (*check) (int c), + bool log_warn = true) +{ + for (size_t i(0); str[i] != '\0'; ++i) + { + if (!check(str[i])) + { + if (log_warn) WSREP_WARN("Illegal character in state transfer request: %i (%c).", + str[i], str[i]); + return true; + } + } + + return false; +} + +bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) +{ + if ((! var->save_result.string_value.str) || + (var->save_result.string_value.length == 0 )) + { + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + var->save_result.string_value.str ? + var->save_result.string_value.str : "NULL"); + return 1; + } + + /* check also that method name is alphanumeric string */ + if (check_request_str(var->save_result.string_value.str, + filename_char, false)) + { + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + var->save_result.string_value.str ? + var->save_result.string_value.str : "NULL"); + return 1; + } + + return 0; +} + +static const char* data_home_dir; + +void wsrep_set_data_home_dir(const char *data_dir) +{ + data_home_dir= (data_dir && *data_dir) ? data_dir : NULL; +} + +static void make_wsrep_defaults_file() +{ + if (!wsrep_defaults_file[0]) + { + char *ptr= wsrep_defaults_file; + char *end= ptr + sizeof(wsrep_defaults_file); + if (my_defaults_file) + ptr= strxnmov(ptr, end - ptr, + WSREP_SST_OPT_CONF, " '", my_defaults_file, "' ", NULL); + + if (my_defaults_extra_file) + ptr= strxnmov(ptr, end - ptr, + WSREP_SST_OPT_CONF_EXTRA, " '", my_defaults_extra_file, "' ", NULL); + + if (my_defaults_group_suffix) + ptr= strxnmov(ptr, end - ptr, + WSREP_SST_OPT_CONF_SUFFIX, " '", my_defaults_group_suffix, "' ", NULL); + } +} + + +bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var) +{ + if ((! var->save_result.string_value.str) || + (var->save_result.string_value.length > (FN_REFLEN - 1))) // safety + { + goto err; + } + + return 0; + +err: + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + var->save_result.string_value.str ? + var->save_result.string_value.str : "NULL"); + return 1; +} + +bool wsrep_sst_receive_address_update (sys_var *self, THD* thd, + enum_var_type type) +{ + return 0; +} + +bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var) +{ + return 0; +} + +static bool sst_auth_real_set (const char* value) +{ + const char* v= NULL; + + if (value) + { + v= my_strdup(PSI_INSTRUMENT_ME, value, MYF(0)); + } + else // its NULL + { + wsrep_sst_auth_free(); + return 0; + } + + if (v) + { + // set sst_auth_real + if (sst_auth_real) { my_free((void *) sst_auth_real); } + sst_auth_real= v; + + // mask wsrep_sst_auth + if (strlen(sst_auth_real)) + { + if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); } + wsrep_sst_auth= my_strdup(PSI_INSTRUMENT_ME, WSREP_SST_AUTH_MASK, MYF(0)); + } + else + { + if (wsrep_sst_auth) { my_free((void*) wsrep_sst_auth); } + wsrep_sst_auth= NULL; + } + + return 0; + } + return 1; +} + +void wsrep_sst_auth_free() +{ + if (wsrep_sst_auth) { my_free((void *) wsrep_sst_auth); } + if (sst_auth_real) { my_free((void *) sst_auth_real); } + wsrep_sst_auth= NULL; + sst_auth_real= NULL; +} + +bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type) +{ + return sst_auth_real_set (wsrep_sst_auth); +} + +void wsrep_sst_auth_init () +{ + sst_auth_real_set(wsrep_sst_auth); +} + +bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var) +{ + if ((! var->save_result.string_value.str) || + (var->save_result.string_value.length > (FN_REFLEN -1))) // safety + { + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + var->save_result.string_value.str ? + var->save_result.string_value.str : "NULL"); + return 1; + } + + return 0; +} + +bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type) +{ + return 0; +} + + +bool wsrep_before_SE() +{ + return (wsrep_provider != NULL + && strcmp (wsrep_provider, WSREP_NONE) + && strcmp (wsrep_sst_method, WSREP_SST_SKIP) + && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP)); +} + +// Signal end of SST +static bool wsrep_sst_complete (THD* thd, + int const rcode, + wsrep::gtid const sst_gtid) +{ + Wsrep_client_service client_service(thd, thd->wsrep_cs()); + Wsrep_server_state& server_state= Wsrep_server_state::instance(); + enum wsrep::server_state::state state= server_state.state(); + bool failed= false; + char start_pos_buf[FN_REFLEN]; + ssize_t len= wsrep::print_to_c_str(sst_gtid, start_pos_buf, FN_REFLEN-1); + start_pos_buf[len]='\0'; + + // Do not call sst_received if we are not in joiner or + // initialized state on server. This is because it + // assumes we are on those states. Give error if we are + // in incorrect state. + if ((state == Wsrep_server_state::s_joiner || + state == Wsrep_server_state::s_initialized)) + { + if (Wsrep_server_state::instance().sst_received(client_service, rcode)) + { + failed= true; + } + else + { + WSREP_INFO("SST succeeded for position %s", start_pos_buf); + } + } + else + { + WSREP_ERROR("SST failed for position %s initialized %d server_state %s", + start_pos_buf, + server_state.is_initialized(), + wsrep::to_c_string(state)); + failed= true; + } + + wsrep_joiner_monitor_end(); + return failed; +} + + /* + If wsrep provider is loaded, inform that the new state snapshot + has been received. Also update the local checkpoint. + + @param thd [IN] + @param uuid [IN] Initial state UUID + @param seqno [IN] Initial state sequence number + @param state [IN] Always NULL, also ignored by wsrep provider (?) + @param state_len [IN] Always 0, also ignored by wsrep provider (?) + @return true when successful, false if error +*/ +bool wsrep_sst_received (THD* thd, + const wsrep_uuid_t& uuid, + wsrep_seqno_t const seqno, + const void* const state, + size_t const state_len) +{ + bool error= false; + /* + To keep track of whether the local uuid:seqno should be updated. Also, note + that local state (uuid:seqno) is updated/checkpointed only after we get an + OK from wsrep provider. By doing so, the values remain consistent across + the server & wsrep provider. + */ + /* + TODO: Handle backwards compatibility. WSREP API v25 does not have + wsrep schema. + */ + /* + Logical SST methods (mysqldump etc) don't update InnoDB sys header. + Reset the SE checkpoint before recovering view in order to avoid + sanity check failure. + */ + wsrep::gtid const sst_gtid(wsrep::id(uuid.data, sizeof(uuid.data)), + wsrep::seqno(seqno)); + + if (!wsrep_before_SE()) { + wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined()); + wsrep_set_SE_checkpoint(sst_gtid, wsrep_gtid_server.gtid()); + } + wsrep_verify_SE_checkpoint(uuid, seqno); + + /* + Both wsrep_init_SR() and wsrep_recover_view() may use + wsrep thread pool. Restore original thd context before returning. + */ + if (thd) { + wsrep_store_threadvars(thd); + } + else { + set_current_thd(nullptr); + } + + /* During sst WSREP(thd) is not yet set for joiner. */ + if (WSREP_ON) + { + int const rcode(seqno < 0 ? seqno : 0); + error= wsrep_sst_complete(thd,rcode, sst_gtid); + } + + return error; +} + +static int sst_scan_uuid_seqno (const char* str, + wsrep_uuid_t* uuid, wsrep_seqno_t* seqno) +{ + int offt= wsrep_uuid_scan (str, strlen(str), uuid); + errno= 0; /* Reset the errno */ + if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt]) + { + *seqno= strtoll (str + offt + 1, NULL, 10); + if (*seqno != LLONG_MAX || errno != ERANGE) + { + return 0; + } + } + + WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str); + return -EINVAL; +} + +// get rid of trailing \n +static char* my_fgets (char* buf, size_t buf_len, FILE* stream) +{ + char* ret= fgets (buf, buf_len, stream); + + if (ret) + { + size_t len= strlen(ret); + if (len > 0 && ret[len - 1] == '\n') ret[len - 1]= '\0'; + } + + return ret; +} + +/* + Generate "name 'value'" string. +*/ +static char* generate_name_value(const char* name, const char* value) +{ + size_t name_len= strlen(name); + size_t value_len= strlen(value); + char* buf= + (char*) my_malloc(PSI_INSTRUMENT_ME, (name_len + value_len + 5), MYF(0)); + if (buf) + { + char* ref= buf; + *ref++ = ' '; + memcpy(ref, name, name_len * sizeof(char)); + ref += name_len; + *ref++ = ' '; + *ref++ = '\''; + memcpy(ref, value, value_len * sizeof(char)); + ref += value_len; + *ref++ = '\''; + *ref = 0; + } + return buf; +} +/* + Generate binlog option string for sst_donate_other(), sst_prepare_other(). + + Returns zero on success, negative error code otherwise. + + String containing binlog name is stored in param ret if binlog is enabled + and GTID mode is on, otherwise empty string. Returned string should be + freed with my_free(). + */ +static int generate_binlog_opt_val(char** ret) +{ + DBUG_ASSERT(ret); + *ret= NULL; + if (opt_bin_log) + { + assert(opt_bin_logname); + *ret= strcmp(opt_bin_logname, "0") ? + generate_name_value(WSREP_SST_OPT_BINLOG, + opt_bin_logname) : + my_strdup(PSI_INSTRUMENT_ME, "", MYF(0)); + } + else + { + *ret= my_strdup(PSI_INSTRUMENT_ME, "", MYF(0)); + } + if (!*ret) return -ENOMEM; + return 0; +} + +static int generate_binlog_index_opt_val(char** ret) +{ + DBUG_ASSERT(ret); + *ret= NULL; + if (opt_binlog_index_name) + { + *ret= strcmp(opt_binlog_index_name, "0") ? + generate_name_value(WSREP_SST_OPT_BINLOG_INDEX, + opt_binlog_index_name) : + my_strdup(PSI_INSTRUMENT_ME, "", MYF(0)); + } + else + { + *ret= my_strdup(PSI_INSTRUMENT_ME, "", MYF(0)); + } + if (!*ret) return -ENOMEM; + return 0; +} + +// report progress event +static void sst_report_progress(int const from, + long long const total_prev, + long long const total, + long long const complete) +{ + static char buf[128] = { '\0', }; + static size_t const buf_len= sizeof(buf) - 1; + snprintf(buf, buf_len, + "{ \"from\": %d, \"to\": %d, \"total\": %lld, \"done\": %lld, " + "\"indefinite\": -1 }", + from, WSREP_MEMBER_JOINED, total_prev + total, total_prev +complete); + WSREP_DEBUG("REPORTING SST PROGRESS: '%s'", buf); +} + +// process "complete" event from SST script feedback +static void sst_handle_complete(const char* const input, + long long const total_prev, + long long* total, + long long* complete, + int const from) +{ + long long x; + int n= sscanf(input, " %lld", &x); + if (n > 0 && x > *complete) + { + *complete= x; + if (*complete > *total) *total= *complete; + sst_report_progress(from, total_prev, *total, *complete); + } +} + +// process "total" event from SST script feedback +static void sst_handle_total(const char* const input, + long long* total_prev, + long long* total, + long long* complete, + int const from) +{ + long long x; + int n= sscanf(input, " %lld", &x); + if (n > 0) + { + // new stage starts, update total_prev + *total_prev+= *total; + *total= x; + *complete= 0; + sst_report_progress(from, *total_prev, *total, *complete); + } +} + +static void* sst_joiner_thread (void* a) +{ + sst_thread_arg* arg= (sst_thread_arg*) a; + int err= 1; + + { + THD* thd; + static const char magic[]= "ready"; + static const size_t magic_len= sizeof(magic) - 1; + const size_t out_len= 512; + char out[out_len]; + + WSREP_INFO("Running: '%s'", arg->cmd); + + wsp::process proc (arg->cmd, "r", arg->env); + + if (proc.pipe() && !proc.error()) + { + const char* tmp= my_fgets (out, out_len, proc.pipe()); + + if (!tmp || strlen(tmp) < (magic_len + 2) || + strncasecmp (tmp, magic, magic_len)) + { + WSREP_ERROR("Failed to read '%s ' from: %s\n\tRead: '%s'", + magic, arg->cmd, tmp); + proc.wait(); + if (proc.error()) err= proc.error(); + } + else + { + err= 0; + } + } + else + { + err= proc.error(); + WSREP_ERROR("Failed to execute: %s : %d (%s)", + arg->cmd, err, strerror(err)); + } + + /* + signal sst_prepare thread with ret code, + it will go on sending SST request + */ + mysql_mutex_lock (&arg->lock); + if (!err) + { + arg->ret_str= strdup (out + magic_len + 1); + if (!arg->ret_str) err= ENOMEM; + } + arg->err= -err; + mysql_cond_signal (&arg->cond); + mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. + + if (err) return NULL; /* lp:808417 - return immediately, don't signal + * initializer thread to ensure single thread of + * shutdown. */ + + wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; + wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; + + // current stage progress + long long total= 0; + long long complete= 0; + // previous stages cumulative progress + long long total_prev= 0; + + // in case of successful receiver start, wait for SST completion/end + const char* tmp= NULL; + err= EINVAL; + + wait_signal: + tmp= my_fgets (out, out_len, proc.pipe()); + + if (tmp) + { + static const char magic_total[]= "total"; + static const size_t total_len=strlen(magic_total); + static const char magic_complete[]= "complete"; + static const size_t complete_len=strlen(magic_complete); + static const int from= WSREP_MEMBER_JOINER; + + if (!strncasecmp (tmp, magic_complete, complete_len)) + { + sst_handle_complete(tmp + complete_len, total_prev, &total, &complete, + from); + goto wait_signal; + } + else if (!strncasecmp (tmp, magic_total, total_len)) + { + sst_handle_total(tmp + total_len, &total_prev, &total, &complete, from); + goto wait_signal; + } + } + else + { + WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from " + "joiner script."); + proc.wait(); + if (proc.error()) err= proc.error(); + } + + // this should be the final script output with GTID + if (tmp) + { + proc.wait(); + // Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any). + const char *pos= strchr(out, ' '); + + if (!pos) { + + if (wsrep_gtid_mode) + { + // There is no wsrep_gtid_domain_id (some older version SST script?). + WSREP_WARN("Did not find domain ID from SST script output '%s'. " + "Domain ID must be set manually to keep binlog consistent", + out); + } + err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno); + + } else { + // Scan state ID first followed by wsrep_gtid_domain_id. + unsigned long int domain_id; + + // Null-terminate the state-id. + out[pos - out]= 0; + err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno); + + if (err) + { + goto err; + } + else if (wsrep_gtid_mode) + { + errno= 0; /* Reset the errno */ + domain_id= strtoul(pos + 1, NULL, 10); + err= errno; + + /* Check if we received a valid gtid_domain_id. */ + if (err == EINVAL || err == ERANGE) + { + WSREP_ERROR("Failed to get donor wsrep_gtid_domain_id."); + err= EINVAL; + goto err; + } else { + wsrep_gtid_server.domain_id= (uint32) domain_id; + wsrep_gtid_domain_id= (uint32)domain_id; + } + } + } + } + +err: + + wsrep::gtid ret_gtid; + + if (err) + { + ret_gtid= wsrep::gtid::undefined(); + } + else + { + ret_gtid= wsrep::gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)), + wsrep::seqno(ret_seqno)); + } + + /* + Tell initializer thread that SST is complete + For that initialize a THD + */ + if (my_thread_init()) + { + WSREP_ERROR("my_thread_init() failed, can't signal end of SST. " + "Aborting."); + unireg_abort(1); + } + + thd= new THD(next_thread_id()); + + if (!thd) + { + WSREP_ERROR("Failed to allocate THD to restore view from local state, " + "can't signal end of SST. Aborting."); + unireg_abort(1); + } + + thd->thread_stack= (char*) &thd; + thd->security_ctx->skip_grants(); + thd->system_thread= SYSTEM_THREAD_GENERIC; + thd->real_id= pthread_self(); + + wsrep_assign_from_threadvars(thd); + wsrep_store_threadvars(thd); + + /* */ + thd->variables.wsrep_on = 0; + /* No binlogging */ + thd->variables.sql_log_bin = 0; + thd->variables.option_bits &= ~OPTION_BIN_LOG; + /* No general log */ + thd->variables.option_bits |= OPTION_LOG_OFF; + /* Read committed isolation to avoid gap locking */ + thd->variables.tx_isolation= ISO_READ_COMMITTED; + + wsrep_sst_complete (thd, -err, ret_gtid); + + delete thd; + my_thread_end(); + } + + return NULL; +} + +#define WSREP_SST_AUTH_ENV "WSREP_SST_OPT_AUTH" +#define WSREP_SST_REMOTE_AUTH_ENV "WSREP_SST_OPT_REMOTE_AUTH" +#define DATA_HOME_DIR_ENV "INNODB_DATA_HOME_DIR" + +static int sst_append_env_var(wsp::env& env, + const char* const var, + const char* const val) +{ + int const env_str_size= strlen(var) + 1 /* = */ + + (val ? strlen(val) : 0) + 1 /* \0 */; + + wsp::string env_str(env_str_size); // for automatic cleanup on return + if (!env_str()) return -ENOMEM; + + int ret= snprintf(env_str(), env_str_size, "%s=%s", var, val ? val : ""); + + if (ret < 0 || ret >= env_str_size) + { + WSREP_ERROR("sst_append_env_var(): snprintf(%s=%s) failed: %d", + var, val, ret); + return (ret < 0 ? ret : -EMSGSIZE); + } + + env.append(env_str()); + return -env.error(); +} + +#ifdef _WIN32 +/* + Space, single quote, ampersand, backquote, I/O redirection + characters, caret, all brackets, plus, exclamation and comma + characters require text to be enclosed in double quotes: +*/ +#define IS_SPECIAL(c) \ + (isspace(c) || c == '\'' || c == '&' || c == '`' || c == '|' || \ + c == '>' || c == '<' || c == ';' || c == '^' || \ + c == '[' || c == ']' || c == '{' || c == '}' || \ + c == '(' || c == ')' || c == '+' || c == '!' || \ + c == ',') +/* + Inside values, equals character are interpreted as special + character and requires quotation: +*/ +#define IS_SPECIAL_V(c) (IS_SPECIAL(c) || c == '=') +/* + Double quotation mark and percent characters require escaping: +*/ +#define IS_REQ_ESCAPING(c) (c == '""' || c == '%') +#else +/* + Space, single quote, ampersand, backquote, and I/O redirection + characters require text to be enclosed in double quotes. The + semicolon is used to separate shell commands, so it must be + enclosed in double quotes as well: +*/ +#define IS_SPECIAL(c) \ + (isspace(c) || c == '\'' || c == '&' || c == '`' || c == '|' || \ + c == '>' || c == '<' || c == ';') +/* + Inside values, characters are interpreted as in parameter names: +*/ +#define IS_SPECIAL_V(c) IS_SPECIAL(c) +/* + Double quotation mark and backslash characters require + backslash prefixing, the dollar symbol is used to substitute + a variable value, therefore it also requires escaping: +*/ +#define IS_REQ_ESCAPING(c) (c == '"' || c == '\\' || c == '$') +#endif + +static size_t estimate_cmd_len (bool* extra_args) +{ + /* + The length of the area reserved for the control parameters + of the SST script (excluding the copying of the original + mysqld arguments): + */ + size_t cmd_len= 4096; + bool extra= false; + /* + If mysqld was started with arguments, add them all: + */ + if (orig_argc > 1) + { + for (int i = 1; i < orig_argc; i++) + { + const char* arg= orig_argv[i]; + size_t n= strlen(arg); + if (n == 0) continue; + cmd_len += n; + bool quotation= false; + char c; + while ((c = *arg++) != 0) + { + if (IS_SPECIAL(c)) + { + quotation= true; + } + else if (IS_REQ_ESCAPING(c)) + { + cmd_len++; +#ifdef _WIN32 + quotation= true; +#endif + } + /* + If the equals symbol is encountered, then we need to separately + process the right side: + */ + else if (c == '=') + { + /* Perhaps we need to quote the left part of the argument: */ + if (quotation) + { + cmd_len += 2; + /* + Reset the quotation flag, since now the status for + the right side of the expression will be saved here: + */ + quotation= false; + } + while ((c = *arg++) != 0) + { + if (IS_SPECIAL_V(c)) + { + quotation= true; + } + else if (IS_REQ_ESCAPING(c)) + { + cmd_len++; +#ifdef _WIN32 + quotation= true; +#endif + } + } + break; + } + } + /* Perhaps we need to quote the entire argument or its right part: */ + if (quotation) + { + cmd_len += 2; + } + } + extra = true; + cmd_len += strlen(WSREP_SST_OPT_MYSQLD); + /* + Add the separating spaces between arguments, + and one additional space before "--mysqld-args": + */ + cmd_len += orig_argc; + } + *extra_args= extra; + return cmd_len; +} + +static void copy_orig_argv (char* cmd_str) +{ + /* + If mysqld was started with arguments, copy them all: + */ + if (orig_argc > 1) + { + size_t n = strlen(WSREP_SST_OPT_MYSQLD); + *cmd_str++ = ' '; + memcpy(cmd_str, WSREP_SST_OPT_MYSQLD, n * sizeof(char)); + cmd_str += n; + for (int i = 1; i < orig_argc; i++) + { + char* arg= orig_argv[i]; + n = strlen(arg); + if (n == 0) continue; + *cmd_str++ = ' '; + bool quotation= false; + bool plain= true; + char *arg_scan= arg; + char c; + while ((c = *arg_scan++) != 0) + { + if (IS_SPECIAL(c)) + { + quotation= true; + } + else if (IS_REQ_ESCAPING(c)) + { + plain= false; +#ifdef _WIN32 + quotation= true; +#endif + } + /* + If the equals symbol is encountered, then we need to separately + process the right side: + */ + else if (c == '=') + { + /* Calculate length of the Left part of the argument: */ + size_t m = (size_t) (arg_scan - arg) - 1; + if (m) + { + /* Perhaps we need to quote the left part of the argument: */ + if (quotation) + { + *cmd_str++ = '"'; + } + /* + If there were special characters inside, then we can use + the fast memcpy function: + */ + if (plain) + { + memcpy(cmd_str, arg, m * sizeof(char)); + cmd_str += m; + /* Left part of the argument has already been processed: */ + n -= m; + arg += m; + } + /* Otherwise we need to prefix individual characters: */ + else + { + n -= m; + while (m) + { + c = *arg++; + if (IS_REQ_ESCAPING(c)) + { +#ifdef _WIN32 + *cmd_str++ = c; +#else + *cmd_str++ = '\\'; +#endif + } + *cmd_str++ = c; + m--; + } + /* + Reset the plain string flag, since now the status for + the right side of the expression will be saved here: + */ + plain= true; + } + /* Perhaps we need to quote the left part of the argument: */ + if (quotation) + { + *cmd_str++ = '"'; + /* + Reset the quotation flag, since now the status for + the right side of the expression will be saved here: + */ + quotation= false; + } + } + /* Copy equals symbol: */ + *cmd_str++ = '='; + arg++; + n--; + /* Let's deal with the left side of the expression: */ + while ((c = *arg_scan++) != 0) + { + if (IS_SPECIAL_V(c)) + { + quotation= true; + } + else if (IS_REQ_ESCAPING(c)) + { + plain= false; +#ifdef _WIN32 + quotation= true; +#endif + } + } + break; + } + } + if (n) + { + /* Perhaps we need to quote the entire argument or its right part: */ + if (quotation) + { + *cmd_str++ = '"'; + } + /* + If there were no special characters inside, then we can use + the fast memcpy function: + */ + if (plain) + { + memcpy(cmd_str, arg, n * sizeof(char)); + cmd_str += n; + } + /* Otherwise we need to prefix individual characters: */ + else + { + while ((c = *arg++) != 0) + { + if (IS_REQ_ESCAPING(c)) + { +#ifdef _WIN32 + *cmd_str++ = c; +#else + *cmd_str++ = '\\'; +#endif + } + *cmd_str++ = c; + } + } + /* Perhaps we need to quote the entire argument or its right part: */ + if (quotation) + { + *cmd_str++ = '"'; + } + } + } + /* + Add a terminating null character (not counted in the length, + since we've overwritten the original null character which + was previously added by snprintf: + */ + *cmd_str = 0; + } +} + +static ssize_t sst_prepare_other (const char* method, + const char* sst_auth, + const char* addr_in, + const char** addr_out) +{ + bool extra_args; + size_t const cmd_len= estimate_cmd_len(&extra_args); + wsp::string cmd_str(cmd_len); + + if (!cmd_str()) + { + WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %zd bytes", + cmd_len); + return -ENOMEM; + } + + char* binlog_opt_val= NULL; + char* binlog_index_opt_val= NULL; + + int ret; + if ((ret= generate_binlog_opt_val(&binlog_opt_val))) + { + WSREP_ERROR("sst_prepare_other(): generate_binlog_opt_val() failed: %d", + ret); + return ret; + } + + if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val))) + { + WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d", + ret); + if (binlog_opt_val) my_free(binlog_opt_val); + return ret; + } + + make_wsrep_defaults_file(); + + ret= snprintf (cmd_str(), cmd_len, + "wsrep_sst_%s " + WSREP_SST_OPT_ROLE " 'joiner' " + WSREP_SST_OPT_ADDR " '%s' " + WSREP_SST_OPT_DATA " '%s' " + "%s" + WSREP_SST_OPT_PARENT " %d " + WSREP_SST_OPT_PROGRESS " %d" + "%s" + "%s", + method, addr_in, mysql_real_data_home, + wsrep_defaults_file, + (int)getpid(), + wsrep_debug ? 1 : 0, + binlog_opt_val, binlog_index_opt_val); + + my_free(binlog_opt_val); + my_free(binlog_index_opt_val); + + if (ret < 0 || size_t(ret) >= cmd_len) + { + WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret); + return (ret < 0 ? ret : -EMSGSIZE); + } + + if (extra_args) + copy_orig_argv(cmd_str() + ret); + + wsp::env env(NULL); + if (env.error()) + { + WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error()); + return -env.error(); + } + + if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth))) + { + WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret); + return ret; + } + + if (data_home_dir) + { + if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir))) + { + WSREP_ERROR("sst_prepare_other(): appending data " + "directory failed: %d", ret); + return ret; + } + } + + pthread_t tmp, monitor; + sst_thread_arg arg(cmd_str(), env()); + + mysql_mutex_lock (&arg.lock); + + ret = mysql_thread_create (key_wsrep_sst_joiner_monitor, &monitor, NULL, wsrep_sst_joiner_monitor_thread, NULL); + + if (ret) + { + WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)", + ret, strerror(ret)); + return -ret; + } + + sst_joiner_completed= false; + + ret= mysql_thread_create (key_wsrep_sst_joiner, &tmp, NULL, sst_joiner_thread, &arg); + + if (ret) + { + WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)", + ret, strerror(ret)); + + pthread_detach(monitor); + return -ret; + } + + mysql_cond_wait (&arg.cond, &arg.lock); + + *addr_out= arg.ret_str; + + if (!arg.err) + ret= strlen(*addr_out); + else + { + assert (arg.err < 0); + ret= arg.err; + } + + pthread_detach (tmp); + pthread_detach (monitor); + + return ret; +} + +extern uint mysqld_port; + +/*! Just tells donor where to send mysqldump */ +static ssize_t sst_prepare_mysqldump (const char* addr_in, + const char** addr_out) +{ + ssize_t ret= strlen (addr_in); + + if (!strrchr(addr_in, ':')) + { + ssize_t s= ret + 7; + char* tmp= (char*) malloc (s); + + if (tmp) + { + ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port); + + if (ret > 0 && ret < s) + { + *addr_out= tmp; + return ret; + } + if (ret > 0) /* buffer too short */ ret= -EMSGSIZE; + free (tmp); + } + else { + ret= -ENOMEM; + } + + WSREP_ERROR ("Could not prepare state transfer request: " + "adding default port failed: %zd.", ret); + } + else { + *addr_out= addr_in; + } + + pthread_t monitor; + ret = mysql_thread_create (key_wsrep_sst_joiner_monitor, &monitor, NULL, wsrep_sst_joiner_monitor_thread, NULL); + + if (ret) + { + WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)", + ret, strerror(ret)); + return -ret; + } + + sst_joiner_completed= false; + pthread_detach (monitor); + + return ret; +} + +std::string wsrep_sst_prepare() +{ + const ssize_t ip_max= 256; + char ip_buf[ip_max]; + const char* addr_in= NULL; + const char* addr_out= NULL; + const char* method; + + if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP)) + { + return WSREP_STATE_TRANSFER_TRIVIAL; + } + + /* + Figure out SST receive address. Common for all SST methods. + */ + wsp::Address* addr_in_parser= NULL; + + // Attempt 1: wsrep_sst_receive_address + if (wsrep_sst_receive_address && + strcmp(wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) + { + addr_in_parser = new wsp::Address(wsrep_sst_receive_address); + + if (!addr_in_parser->is_valid()) + { + WSREP_ERROR("Could not parse wsrep_sst_receive_address : %s", + wsrep_sst_receive_address); + unireg_abort(1); + } + } + //Attempt 2: wsrep_node_address + else if (wsrep_node_address && *wsrep_node_address) + { + addr_in_parser = new wsp::Address(wsrep_node_address); + + if (addr_in_parser->is_valid()) + { + // we must not inherit the port number from this address: + addr_in_parser->set_port(0); + } + else + { + WSREP_ERROR("Could not parse wsrep_node_address : %s", + wsrep_node_address); + throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable"); + } + } + // Attempt 3: Try to get the IP from the list of available interfaces. + else + { + ssize_t ret= wsrep_guess_ip(ip_buf, ip_max); + + if (ret && ret < ip_max) + { + addr_in_parser = new wsp::Address(ip_buf); + } + else + { + WSREP_ERROR("Failed to guess address to accept state transfer. " + "wsrep_sst_receive_address must be set manually."); + throw wsrep::runtime_error("Could not prepare state transfer request"); + } + } + + assert(addr_in_parser); + + size_t len= addr_in_parser->get_address_len(); + bool is_ipv6= addr_in_parser->is_ipv6(); + const char* address= addr_in_parser->get_address(); + + if (len > (is_ipv6 ? ip_max - 2 : ip_max)) + { + WSREP_ERROR("Address to accept state transfer is too long: '%s'", + address); + unireg_abort(1); + } + + if (is_ipv6) + { + /* wsrep_sst_*.sh scripts requite ipv6 addreses to be in square breackets */ + ip_buf[0] = '['; + /* the length (len) already includes the null byte: */ + memcpy(ip_buf + 1, address, len - 1); + ip_buf[len] = ']'; + ip_buf[len + 1] = 0; + len += 2; + } + else + { + memcpy(ip_buf, address, len); + } + + int port= addr_in_parser->get_port(); + if (port) + { + size_t space= ip_max - len; + ip_buf[len - 1] = ':'; + int ret= snprintf(ip_buf + len, ip_max - len, "%d", port); + if (ret <= 0 || (size_t) ret > space) + { + WSREP_ERROR("Address to accept state transfer is too long: '%s:%d'", + address, port); + unireg_abort(1); + } + } + + delete addr_in_parser; + addr_in = ip_buf; + + ssize_t addr_len= -ENOSYS; + method = wsrep_sst_method; + if (!strcmp(method, WSREP_SST_MYSQLDUMP)) + { + addr_len= sst_prepare_mysqldump (addr_in, &addr_out); + if (addr_len < 0) + { + throw wsrep::runtime_error("Could not prepare mysqldimp address"); + } + } + else + { + /*! A heuristic workaround until we learn how to stop and start engines */ + if (Wsrep_server_state::instance().is_initialized() && + Wsrep_server_state::instance().state() == Wsrep_server_state::s_joiner) + { + if (!strcmp(method, WSREP_SST_XTRABACKUP) || + !strcmp(method, WSREP_SST_XTRABACKUPV2)) + { + WSREP_WARN("The %s SST method is deprecated, so it is automatically " + "replaced by %s", method, WSREP_SST_MARIABACKUP); + method = WSREP_SST_MARIABACKUP; + } + // we already did SST at initializaiton, now engines are running + // sql_print_information() is here because the message is too long + // for WSREP_INFO. + sql_print_information ("WSREP: " + "You have configured '%s' state snapshot transfer method " + "which cannot be performed on a running server. " + "Wsrep provider won't be able to fall back to it " + "if other means of state transfer are unavailable. " + "In that case you will need to restart the server.", + method); + return ""; + } + + addr_len = sst_prepare_other (method, sst_auth_real, + addr_in, &addr_out); + if (addr_len < 0) + { + WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.", + method); + throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable"); + } + } + + std::string ret; + ret += method; + ret.push_back('\0'); + ret += addr_out; + + const char* method_ptr(ret.data()); + const char* addr_ptr(ret.data() + strlen(method_ptr) + 1); + WSREP_DEBUG("Prepared SST request: %s|%s", method_ptr, addr_ptr); + + if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out); + + return ret; +} + +// helper method for donors +static int sst_run_shell (const char* cmd_str, char** env, int max_tries) +{ + int ret= 0; + + for (int tries=1; tries <= max_tries; tries++) + { + wsp::process proc (cmd_str, "r", env); + + if (NULL != proc.pipe()) + { + proc.wait(); + } + + if ((ret= proc.error())) + { + WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)", + tries, max_tries, proc.cmd(), ret, strerror(ret)); + sleep (1); + } + else + { + WSREP_DEBUG("SST script successfully completed."); + break; + } + } + + return -ret; +} + +static void sst_reject_queries(my_bool close_conn) +{ + WSREP_INFO("Rejecting client queries for the duration of SST."); + if (TRUE == close_conn) wsrep_close_client_connections(FALSE); +} + +static int sst_donate_mysqldump (const char* addr, + const wsrep::gtid& gtid, + bool bypass, + char** env) // carries auth info +{ + char host[256]; + wsp::Address address(addr); + if (!address.is_valid()) + { + WSREP_ERROR("Could not parse SST address : %s", addr); + return 0; + } + memcpy(host, address.get_address(), address.get_address_len()); + int port= address.get_port(); + bool extra_args; + size_t const cmd_len= estimate_cmd_len(&extra_args); + wsp::string cmd_str(cmd_len); + + if (!cmd_str()) + { + WSREP_ERROR("sst_donate_mysqldump(): " + "could not allocate cmd buffer of %zd bytes", cmd_len); + return -ENOMEM; + } + + /* + we enable new client connections so that mysqldump donation can connect in, + but we reject local connections from modifyingcdata during SST, to keep + data intact + */ + if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE); + + make_wsrep_defaults_file(); + + std::ostringstream uuid_oss; + uuid_oss << gtid.id(); + int ret= snprintf (cmd_str(), cmd_len, + "wsrep_sst_mysqldump " + WSREP_SST_OPT_ADDR " '%s' " + WSREP_SST_OPT_PORT " '%u' " + WSREP_SST_OPT_LPORT " '%u' " + WSREP_SST_OPT_SOCKET " '%s' " + "%s" + WSREP_SST_OPT_GTID " '%s:%lld,%d-%d-%llu' " + WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'" + "%s", + addr, port, mysqld_port, mysqld_unix_port, + wsrep_defaults_file, + uuid_oss.str().c_str(), gtid.seqno().get(), + wsrep_gtid_server.domain_id, wsrep_gtid_server.server_id, + wsrep_gtid_server.seqno(), + wsrep_gtid_server.domain_id, + bypass ? " " WSREP_SST_OPT_BYPASS : ""); + + if (ret < 0 || size_t(ret) >= cmd_len) + { + WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret); + return (ret < 0 ? ret : -EMSGSIZE); + } + + if (extra_args) + copy_orig_argv(cmd_str() + ret); + + WSREP_DEBUG("Running: '%s'", cmd_str()); + + ret= sst_run_shell (cmd_str(), env, 3); + + wsrep::gtid sst_sent_gtid(ret == 0 ? + gtid : + wsrep::gtid(gtid.id(), + wsrep::seqno::undefined())); + Wsrep_server_state::instance().sst_sent(sst_sent_gtid, ret); + + wsrep_donor_monitor_end(); + + return ret; +} + +wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; + +/* + Create a file under data directory. +*/ +static int sst_create_file(const char *name, const char *content) +{ + int err= 0; + char *real_name; + char *tmp_name; + ssize_t len; + FILE *file; + + len= strlen(mysql_real_data_home) + strlen(name) + 2; + real_name= (char *) alloca(len); + + snprintf(real_name, (size_t) len, "%s/%s", mysql_real_data_home, name); + + tmp_name= (char *) alloca(len + 4); + snprintf(tmp_name, (size_t) len + 4, "%s.tmp", real_name); + + file= fopen(tmp_name, "w+"); + + if (0 == file) + { + err= errno; + WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err, strerror(err)); + } + else + { + // Write the specified content into the file. + if (content != NULL) + { + fprintf(file, "%s\n", content); + fsync(fileno(file)); + } + + fclose(file); + + if (rename(tmp_name, real_name) == -1) + { + err= errno; + WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", tmp_name, + real_name, err, strerror(err)); + } + } + + return err; +} + +static int run_sql_command(THD *thd, const char *query) +{ + thd->set_query((char *)query, strlen(query)); + + Parser_state ps; + if (ps.init(thd, thd->query(), thd->query_length())) + { + WSREP_ERROR("SST query: %s failed", query); + return -1; + } + + mysql_parse(thd, thd->query(), thd->query_length(), &ps); + if (thd->is_error()) + { + int const err= thd->get_stmt_da()->sql_errno(); + WSREP_WARN ("Error executing '%s': %d (%s)", + query, err, thd->get_stmt_da()->message()); + thd->clear_error(); + return -1; + } + return 0; +} + +static int sst_flush_tables(THD* thd) +{ + WSREP_INFO("Flushing tables for SST..."); + + int err= 0; + int not_used; + /* + Files created to notify the SST script about the outcome of table flush + operation. + */ + const char *flush_success= "tables_flushed"; + const char *flush_error= "sst_error"; + + CHARSET_INFO *current_charset= thd->variables.character_set_client; + + if (!is_supported_parser_charset(current_charset)) + { + /* Do not use non-supported parser character sets */ + WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->cs_name.str); + thd->variables.character_set_client= &my_charset_latin1; + WSREP_WARN("For SST temporally setting character set to : %s", + my_charset_latin1.cs_name.str); + } + + if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK")) + { + err= -1; + } + else + { + /* + Make sure logs are flushed after global read lock acquired. In case + reload fails, we must also release the acquired FTWRL. + */ + if (reload_acl_and_cache(thd, REFRESH_ENGINE_LOG | REFRESH_BINARY_LOG, + (TABLE_LIST*) 0, ¬_used)) + { + thd->global_read_lock.unlock_global_read_lock(thd); + err= -1; + } + } + + thd->variables.character_set_client= current_charset; + + if (err) + { + WSREP_ERROR("Failed to flush and lock tables"); + + /* + The SST must be aborted as the flush tables failed. Notify this to SST + script by creating the error file. + */ + int tmp; + if ((tmp= sst_create_file(flush_error, NULL))) { + err= tmp; + } + } + else + { + ha_disable_internal_writes(true); + + WSREP_INFO("Tables flushed."); + + // Create a file with cluster state ID and wsrep_gtid_domain_id. + char content[100]; + snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid, + (long long)wsrep_locked_seqno, wsrep_gtid_server.domain_id); + err= sst_create_file(flush_success, content); + + if (err) + WSREP_INFO("Creating file for flush_success failed %d",err); + + const char base_name[]= "tables_flushed"; + ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; + char *real_name= (char*) my_malloc(key_memory_WSREP, full_len, 0); + sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); + char *tmp_name= (char*) my_malloc(key_memory_WSREP, full_len + 4, 0); + sprintf(tmp_name, "%s.tmp", real_name); + + FILE* file= fopen(tmp_name, "w+"); + if (0 == file) + { + err= errno; + WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err)); + } + else + { + Wsrep_server_state& server_state= Wsrep_server_state::instance(); + std::ostringstream uuid_oss; + + uuid_oss << server_state.current_view().state_id().id(); + + fprintf(file, "%s:%lld %u\n", + uuid_oss.str().c_str(), server_state.pause_seqno().get(), + wsrep_gtid_server.domain_id); + fsync(fileno(file)); + fclose(file); + if (rename(tmp_name, real_name) == -1) + { + err= errno; + WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", + tmp_name, real_name, err,strerror(err)); + } + } + my_free(real_name); + my_free(tmp_name); + if (err) + ha_disable_internal_writes(false); + } + + return err; +} + +static void* sst_donor_thread (void* a) +{ + sst_thread_arg* arg= (sst_thread_arg*)a; + + WSREP_INFO("Running: '%s'", arg->cmd); + + int err= 1; + bool locked= false; + + const char* out= NULL; + const size_t out_len= 128; + char out_buf[out_len]; + + wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED; + // seqno of complete SST + wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; + + // We turn off wsrep_on for this THD so that it can + // operate with wsrep_ready == OFF + // We also set this SST thread THD as system thread + wsp::thd thd(FALSE, true); + wsp::process proc(arg->cmd, "r", arg->env); + + err= -proc.error(); + +/* Inform server about SST script startup and release TO isolation */ + mysql_mutex_lock (&arg->lock); + arg->err= -err; + mysql_cond_signal (&arg->cond); + mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. + + if (proc.pipe() && !err) + { + long long total= 0; + long long complete= 0; + // total form previous stages + long long total_prev= 0; + +wait_signal: + out= my_fgets (out_buf, out_len, proc.pipe()); + + if (out) + { + static const char magic_flush[]= "flush tables"; + static const char magic_cont[]= "continue"; + static const char magic_done[]= "done"; + static const size_t done_len=strlen(magic_done); + static const char magic_total[]= "total"; + static const size_t total_len=strlen(magic_total); + static const char magic_complete[]= "complete"; + static const size_t complete_len=strlen(magic_complete); + static const int from= WSREP_MEMBER_DONOR; + + if (!strncasecmp (out, magic_complete, complete_len)) + { + sst_handle_complete(out + complete_len, total_prev, &total, &complete, + from); + goto wait_signal; + } + else if (!strncasecmp (out, magic_total, total_len)) + { + sst_handle_total(out + total_len, &total_prev, &total, &complete, from); + goto wait_signal; + } + else if (!strcasecmp (out, magic_flush)) + { + err= sst_flush_tables (thd.ptr); + + if (!err) + { + locked= true; + /* + Lets also keep statements that modify binary logs (like RESET LOGS, + RESET MASTER) from proceeding until the files have been transferred + to the joiner node. + */ + if (mysql_bin_log.is_open()) + mysql_mutex_lock(mysql_bin_log.get_log_lock()); + + WSREP_INFO("Donor state reached"); + +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("sync.wsrep_donor_state", + { + const char act[]= + "now " + "SIGNAL sync.wsrep_donor_state_reached " + "WAIT_FOR signal.wsrep_donor_state"; + assert(!debug_sync_set_action(thd.ptr, + STRING_WITH_LEN(act))); + };); +#endif + + goto wait_signal; + } + } + else if (!strcasecmp (out, magic_cont)) + { + if (locked) + { + locked= false; + ha_disable_internal_writes(false); + if (mysql_bin_log.is_open()) + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); + thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr); + } + err= 0; + goto wait_signal; + } + else if (!strncasecmp (out, magic_done, done_len)) + { + err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1, + &ret_uuid, &ret_seqno); + } + else + { + WSREP_WARN("Received unknown signal: '%s'", out); + err = -EINVAL; + proc.wait(); + } + } + else + { + WSREP_ERROR("Failed to read from: %s", proc.cmd()); + proc.wait(); + } + if (!err && proc.error()) err= -proc.error(); + } + else + { + WSREP_ERROR("Failed to execute: %s : %d (%s)", + proc.cmd(), err, strerror(err)); + } + + if (locked) // don't forget to unlock server before return + { + ha_disable_internal_writes(false); + if (mysql_bin_log.is_open()) + { + mysql_mutex_assert_owner(mysql_bin_log.get_log_lock()); + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); + } + thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr); + } + + wsrep::gtid gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)), + wsrep::seqno(err ? wsrep::seqno::undefined() : + wsrep::seqno(ret_seqno))); + + Wsrep_server_state::instance().sst_sent(gtid, err); + + proc.wait(); + + wsrep_donor_monitor_end(); + return nullptr; +} + +static int sst_donate_other (const char* method, + const char* addr, + const wsrep::gtid& gtid, + bool bypass, + char** env) // carries auth info +{ + bool extra_args; + size_t const cmd_len= estimate_cmd_len(&extra_args); + wsp::string cmd_str(cmd_len); + + if (!cmd_str()) + { + WSREP_ERROR("sst_donate_other(): " + "could not allocate cmd buffer of %zd bytes", cmd_len); + return -ENOMEM; + } + + char* binlog_opt_val= NULL; + char* binlog_index_opt_val= NULL; + + int ret; + if ((ret= generate_binlog_opt_val(&binlog_opt_val))) + { + WSREP_ERROR("sst_donate_other(): generate_binlog_opt_val() failed: %d",ret); + return ret; + } + + if ((ret= generate_binlog_index_opt_val(&binlog_index_opt_val))) + { + WSREP_ERROR("sst_prepare_other(): generate_binlog_index_opt_val() failed %d", + ret); + if (binlog_opt_val) my_free(binlog_opt_val); + return ret; + } + + make_wsrep_defaults_file(); + + std::ostringstream uuid_oss; + uuid_oss << gtid.id(); + ret= snprintf (cmd_str(), cmd_len, + "wsrep_sst_%s " + WSREP_SST_OPT_ROLE " 'donor' " + WSREP_SST_OPT_ADDR " '%s' " + WSREP_SST_OPT_LPORT " %u " + WSREP_SST_OPT_SOCKET " '%s' " + WSREP_SST_OPT_PROGRESS " %d " + WSREP_SST_OPT_DATA " '%s' " + "%s" + WSREP_SST_OPT_GTID " '%s:%lld' " + WSREP_SST_OPT_GTID_DOMAIN_ID " %d" + "%s" + "%s" + "%s", + method, addr, mysqld_port, mysqld_unix_port, + wsrep_debug ? 1 : 0, + mysql_real_data_home, + wsrep_defaults_file, + uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_server.domain_id, + binlog_opt_val, binlog_index_opt_val, + bypass ? " " WSREP_SST_OPT_BYPASS : ""); + + my_free(binlog_opt_val); + my_free(binlog_index_opt_val); + + if (ret < 0 || size_t(ret) >= cmd_len) + { + WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); + return (ret < 0 ? ret : -EMSGSIZE); + } + + if (extra_args) + copy_orig_argv(cmd_str() + ret); + + if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE); + + pthread_t tmp; + sst_thread_arg arg(cmd_str(), env); + + mysql_mutex_lock (&arg.lock); + + ret= mysql_thread_create (key_wsrep_sst_donor, &tmp, NULL, sst_donor_thread, &arg); + + if (ret) + { + WSREP_ERROR("sst_donate_other(): mysql_thread_create() failed: %d (%s)", + ret, strerror(ret)); + return ret; + } + + mysql_cond_wait (&arg.cond, &arg.lock); + + WSREP_INFO("sst_donor_thread signaled with %d", arg.err); + return arg.err; +} + +int wsrep_sst_donate(const std::string& msg, + const wsrep::gtid& current_gtid, + const bool bypass) +{ + const char* method= msg.data(); + size_t method_len= strlen (method); + + if (check_request_str(method, filename_char, true)) + { + WSREP_ERROR("Bad SST method name. SST canceled."); + return WSREP_CB_FAILURE; + } + + const char* data= method + method_len + 1; + + /* check for auth@addr separator */ + const char* addr= strrchr(data, '@'); + wsp::string remote_auth; + if (addr) + { + remote_auth.set(strndup(data, addr - data)); + addr++; + } + else + { + // no auth part + addr= data; + } + + if (check_request_str(addr, address_char, true)) + { + WSREP_ERROR("Bad SST address string. SST canceled."); + return WSREP_CB_FAILURE; + } + + wsp::env env(NULL); + if (env.error()) + { + WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error()); + return WSREP_CB_FAILURE; + } + + int ret; + if ((ret= sst_append_env_var(env, WSREP_SST_AUTH_ENV, sst_auth_real))) + { + WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret); + return WSREP_CB_FAILURE; + } + + if (remote_auth()) + { + if ((ret= sst_append_env_var(env, WSREP_SST_REMOTE_AUTH_ENV,remote_auth()))) + { + WSREP_ERROR("wsrep_sst_donate_cb(): appending remote auth env failed: " + "%d", ret); + return WSREP_CB_FAILURE; + } + } + + if (data_home_dir) + { + if ((ret= sst_append_env_var(env, DATA_HOME_DIR_ENV, data_home_dir))) + { + WSREP_ERROR("wsrep_sst_donate_cb(): appending data " + "directory failed: %d", ret); + return WSREP_CB_FAILURE; + } + } + + sst_donor_completed= false; + pthread_t monitor; + + ret= mysql_thread_create (key_wsrep_sst_donor_monitor, &monitor, NULL, wsrep_sst_donor_monitor_thread, NULL); + + if (ret) + { + WSREP_ERROR("sst_donate: mysql_thread_create() failed: %d (%s)", + ret, strerror(ret)); + return WSREP_CB_FAILURE; + } + + if (!strcmp (WSREP_SST_MYSQLDUMP, method)) + { + ret= sst_donate_mysqldump(addr, current_gtid, bypass, env()); + } + else + { + ret= sst_donate_other(method, addr, current_gtid, bypass, env()); + } + + return (ret >= 0 ? 0 : 1); +} -- cgit v1.2.3