summaryrefslogtreecommitdiffstats
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-01 18:15:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-01 18:15:00 +0000
commita2a2e32c02643a0cec111511220227703fda1cd5 (patch)
tree69cc2b631234c2a8e026b9cd4d72676c61c594df /sql/sql_repl.cc
parentReleasing progress-linux version 1:10.11.8-1~progress7.99u1. (diff)
downloadmariadb-a2a2e32c02643a0cec111511220227703fda1cd5.tar.xz
mariadb-a2a2e32c02643a0cec111511220227703fda1cd5.zip
Merging upstream version 1:11.4.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc418
1 files changed, 336 insertions, 82 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 114c36a5..8ca252c3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -31,6 +31,7 @@
#include "semisync_master.h"
#include "semisync_slave.h"
#include "mysys_err.h"
+#include "gtid_index.h"
enum enum_gtid_until_state {
@@ -52,7 +53,7 @@ extern TYPELIB binlog_checksum_typelib;
static int
fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
- enum enum_binlog_checksum_alg checksum_alg_arg, uint32 end_pos)
+ enum_binlog_checksum_alg checksum_alg_arg, uint32 end_pos)
{
char header[LOG_EVENT_HEADER_LEN];
ulong event_len;
@@ -133,7 +134,7 @@ struct binlog_send_info {
enum_gtid_skip_type gtid_skip_group;
enum_gtid_until_state gtid_until_group;
ushort flags;
- enum enum_binlog_checksum_alg current_checksum_alg;
+ enum_binlog_checksum_alg current_checksum_alg;
bool slave_gtid_strict_mode;
bool send_fake_gtid_list;
bool slave_gtid_ignore_duplicates;
@@ -162,6 +163,7 @@ struct binlog_send_info {
bool clear_initial_log_pos;
bool should_stop;
size_t dirlen;
+ bool is_until_before_gtids;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
@@ -180,7 +182,7 @@ struct binlog_send_info {
hb_info_counter(0),
#endif
clear_initial_log_pos(false),
- should_stop(false)
+ should_stop(false), is_until_before_gtids(false)
{
error_text[0] = 0;
bzero(&error_gtid, sizeof(error_gtid));
@@ -211,7 +213,7 @@ static int reset_transmit_packet(struct binlog_send_info *info, ushort flags,
*/
static int fake_rotate_event(binlog_send_info *info, ulonglong position,
- const char** errmsg, enum enum_binlog_checksum_alg checksum_alg_arg)
+ const char** errmsg, enum_binlog_checksum_alg checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
ulong ev_offset;
@@ -495,12 +497,12 @@ static bool is_slave_checksum_aware(THD * thd)
@param[in] thd THD to access a user variable
@return value of @@binlog_checksum alg according to
- @c enum enum_binlog_checksum_alg
+ @c enum_binlog_checksum_alg
*/
-static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * thd)
+static enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * thd)
{
- enum enum_binlog_checksum_alg ret;
+ enum_binlog_checksum_alg ret;
DBUG_ENTER("get_binlog_checksum_value_at_connect");
user_var_entry *entry= get_binlog_checksum_uservar(thd);
@@ -585,22 +587,42 @@ void adjust_linfo_offsets(my_off_t purge_offset)
}
-static my_bool log_in_use_callback(THD *thd, const char *log_name)
+struct st_log_in_use
+{
+ const char *log_name;
+ uint connected_slaves;
+};
+
+static my_bool log_in_use_callback(THD *thd, st_log_in_use *arg)
{
my_bool result= 0;
- mysql_mutex_lock(&thd->LOCK_thd_data);
- if (auto linfo= thd->current_linfo)
- result= !strcmp(log_name, linfo->log_file_name);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ const char *log_name= arg->log_name;
+ if (thd->current_linfo)
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (LOG_INFO *linfo= thd->current_linfo)
+ {
+ result= !strcmp(log_name, linfo->log_file_name);
+ if (linfo->log_file_name[0]) // If slave is active
+ arg->connected_slaves++;
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
return result;
}
-bool log_in_use(const char* log_name)
+bool log_in_use(const char* log_name, uint min_connected)
{
- return server_threads.iterate(log_in_use_callback, log_name);
+ st_log_in_use arg;
+ arg.log_name= log_name;
+ arg.connected_slaves= 0;
+
+ return ((server_threads.iterate(log_in_use_callback, &arg) ||
+ arg.connected_slaves < min_connected));
}
+
bool purge_error_message(THD* thd, int res)
{
uint errcode;
@@ -808,6 +830,18 @@ get_slave_until_gtid(THD *thd, String *out_str)
return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
}
+static bool
+get_slave_gtid_until_before_gtids(THD *thd)
+{
+ bool null_value;
+
+ const LEX_CSTRING name= { STRING_WITH_LEN("slave_gtid_until_before_gtids") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_int(&null_value) && !null_value;
+}
+
/*
Function prepares and sends repliation heartbeat event.
@@ -826,7 +860,7 @@ get_slave_until_gtid(THD *thd, String *out_str)
static int send_heartbeat_event(binlog_send_info *info,
NET* net, String* packet,
const struct event_coordinates *coord,
- enum enum_binlog_checksum_alg checksum_alg_arg)
+ enum_binlog_checksum_alg checksum_alg_arg)
{
DBUG_ENTER("send_heartbeat_event");
@@ -1273,6 +1307,100 @@ end:
return err;
}
+
+/*
+ Helper function for gtid_find_binlog_pos() below.
+ Check a binlog file against a slave position. Use a GTID index if present.
+ Returns:
+ 0 This is the binlog file that contains the position. If *out_start_seek
+ is non-zero, it is the offset found in the GTID index at which to start
+ scanning the binlog file for events to send to the slave.
+ 1 This binlog file is too new to contain the given slave position.
+ -1 Error, *out_errormsg contains error string.
+
+ The *out_glev event must be deleted by the caller if set non-null.
+ */
+static int
+gtid_check_binlog_file(slave_connection_state *state,
+ Gtid_index_reader_hot *reader,
+ const binlog_file_entry *list,
+ bool *found_in_index, uint32 *out_start_seek,
+ uint32 *found_count,
+ char *out_name, Gtid_list_log_event **out_glev,
+ const char **out_errormsg)
+{
+ Gtid_list_log_event *glev= nullptr;
+ char buf[FN_REFLEN];
+ File file;
+ IO_CACHE cache;
+ int res= -1;
+
+ *found_in_index= false;
+ *out_glev= nullptr;
+ *out_errormsg= nullptr;
+ /*
+ Try to lookup the GTID position in the gtid index.
+ If that doesn't work, read the Gtid_list_log_event at the start of the
+ binlog file to get the binlog state.
+ */
+ if (normalize_binlog_name(buf, list->name.str, false))
+ {
+ *out_errormsg= "Failed to determine binlog file name while looking for "
+ "GTID position in binlog";
+ goto end;
+ }
+
+ if (likely(reader && !reader->open_index_file(buf)))
+ {
+ int lookup= reader->search_gtid_pos(state, out_start_seek, found_count);
+ reader->close_index_file();
+ if (lookup >= 0)
+ {
+ statistic_increment(binlog_gtid_index_hit, &LOCK_status);
+ if (lookup == 0)
+ res= 1;
+ else
+ {
+ strmake(out_name, buf, FN_REFLEN);
+ *found_in_index= true;
+ res= 0;
+ }
+ goto end;
+ }
+ /*
+ Error in the index lookup; fall back to reading the GTID_LIST event from
+ the binlog file and scan it from the beginning.
+ */
+ }
+ statistic_increment(binlog_gtid_index_miss, &LOCK_status);
+
+ bzero((char*) &cache, sizeof(cache));
+ if (unlikely((file= open_binlog(&cache, buf, out_errormsg)) == (File)-1))
+ goto end;
+ *out_errormsg= get_gtid_list_event(&cache, &glev);
+ end_io_cache(&cache);
+ mysql_file_close(file, MYF(MY_WME));
+ if (unlikely(*out_errormsg))
+ goto end;
+
+ if (!glev || contains_all_slave_gtid(state, glev))
+ {
+ strmake(out_name, buf, FN_REFLEN);
+ *out_glev= glev;
+ *out_errormsg= nullptr;
+ res= 0;
+ }
+ else
+ {
+ delete glev;
+ res= 1;
+ }
+
+end:
+ return res;
+}
+
+
/*
Find the name of the binlog file to start reading for a slave that connects
using GTID state.
@@ -1301,14 +1429,17 @@ end:
the requested GTID that was already purged.
*/
static const char *
-gtid_find_binlog_file(slave_connection_state *state, char *out_name,
- slave_connection_state *until_gtid_state)
+gtid_find_binlog_pos(slave_connection_state *state, char *out_name,
+ slave_connection_state *until_gtid_state,
+ rpl_binlog_state *until_binlog_state,
+ bool *found_in_index, uint32 *out_start_seek)
{
MEM_ROOT memroot;
binlog_file_entry *list;
Gtid_list_log_event *glev= NULL;
const char *errormsg= NULL;
- char buf[FN_REFLEN];
+ Gtid_index_reader_hot *reader= NULL;
+ *found_in_index= false;
init_alloc_root(PSI_INSTRUMENT_ME, &memroot,
10*(FN_REFLEN+sizeof(binlog_file_entry)), 0,
@@ -1319,48 +1450,41 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
goto end;
}
+ if (opt_binlog_gtid_index)
+ reader= new Gtid_index_reader_hot();
+
while (list)
{
- File file;
- IO_CACHE cache;
-
- if (!list->next)
- {
- /*
- It should be safe to read the currently used binlog, as we will only
- read the header part that is already written.
-
- But if that does not work on windows, then we will need to cache the
- event somewhere in memory I suppose - that could work too.
- */
- }
- /*
- Read the Gtid_list_log_event at the start of the binlog file to
- get the binlog state.
- */
- if (normalize_binlog_name(buf, list->name.str, false))
- {
- errormsg= "Failed to determine binlog file name while looking for "
- "GTID position in binlog";
- goto end;
- }
- bzero((char*) &cache, sizeof(cache));
- if (unlikely((file= open_binlog(&cache, buf, &errormsg)) == (File)-1))
- goto end;
- errormsg= get_gtid_list_event(&cache, &glev);
- end_io_cache(&cache);
- mysql_file_close(file, MYF(MY_WME));
- if (unlikely(errormsg))
+ uint32 found_count;
+ int res= gtid_check_binlog_file(state, reader, list, found_in_index,
+ out_start_seek, &found_count,
+ out_name, &glev, &errormsg);
+ if (res < 0)
goto end;
-
- if (!glev || contains_all_slave_gtid(state, glev))
+ if (res == 0)
{
- strmake(out_name, buf, FN_REFLEN);
-
- if (glev)
+ if (*found_in_index || glev)
{
uint32 i;
+ uint32 count;
+ rpl_gtid *gtids;
+ if (*found_in_index)
+ {
+ count= found_count;
+ gtids= reader->search_gtid_list();
+ /*
+ Load the initial GTID state corresponding to the position found in
+ the GTID index, as we will not have a GTID_LIST event to load it
+ from.
+ */
+ until_binlog_state->load(gtids, count);
+ }
+ else
+ {
+ count= glev->count;
+ gtids= glev->list;
+ }
/*
As a special case, we allow to start from binlog file N if the
requested GTID is the last event (in the corresponding domain) in
@@ -1372,9 +1496,9 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
from the UNTIL hash, to mark that such domains have already reached
their UNTIL condition.
*/
- for (i= 0; i < glev->count; ++i)
+ for (i= 0; i < count; ++i)
{
- const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+ const rpl_gtid *gtid= state->find(gtids[i].domain_id);
if (!gtid)
{
/*
@@ -1387,8 +1511,8 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
further GTIDs in the Gtid_list.
*/
DBUG_ASSERT(0);
- } else if (gtid->server_id == glev->list[i].server_id &&
- gtid->seq_no == glev->list[i].seq_no)
+ } else if (gtid->server_id == gtids[i].server_id &&
+ gtid->seq_no == gtids[i].seq_no)
{
/*
The slave requested to start from the very beginning of this
@@ -1399,9 +1523,9 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
}
if (until_gtid_state &&
- (gtid= until_gtid_state->find(glev->list[i].domain_id)) &&
- gtid->server_id == glev->list[i].server_id &&
- gtid->seq_no <= glev->list[i].seq_no)
+ (gtid= until_gtid_state->find(gtids[i].domain_id)) &&
+ gtid->server_id == gtids[i].server_id &&
+ gtid->seq_no <= gtids[i].seq_no)
{
/*
We've already reached the stop position in UNTIL for this domain,
@@ -1414,8 +1538,6 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
goto end;
}
- delete glev;
- glev= NULL;
list= list->next;
}
@@ -1428,11 +1550,56 @@ end:
if (glev)
delete glev;
+ if (reader)
+ delete reader;
+
free_root(&memroot, MYF(0));
return errormsg;
}
+static bool
+gtid_index_lookup_pos(const char *name, uint32 offset, uint32 *out_start_seek,
+ slave_connection_state *out_gtid_state)
+{
+ Gtid_index_reader_hot *reader= nullptr;
+ bool opened= false;
+ bool found= false;
+ uint32 found_offset, found_gtid_count;
+ rpl_gtid *found_gtids;
+ int res;
+
+ if (!(reader= new Gtid_index_reader_hot()) ||
+ reader->open_index_file(name))
+ {
+ statistic_increment(binlog_gtid_index_miss, &LOCK_status);
+ goto err;
+ }
+ opened= true;
+ res= reader->search_offset(offset, &found_offset, &found_gtid_count);
+ if (res <= 0)
+ {
+ statistic_increment(binlog_gtid_index_miss, &LOCK_status);
+ goto err;
+ }
+ statistic_increment(binlog_gtid_index_hit, &LOCK_status);
+
+ /* We found the position, initialize the state from the index. */
+ found_gtids= reader->search_gtid_list();
+ if (out_gtid_state->load(found_gtids, found_gtid_count))
+ goto err;
+ *out_start_seek= found_offset;
+ found= true;
+
+err:
+ if (opened)
+ reader->close_index_file();
+ if (reader)
+ delete reader;
+ return found;
+}
+
+
/*
Given an old-style binlog position with file name and file offset, find the
corresponding gtid position. If the offset is not at an event boundary, give
@@ -1452,12 +1619,26 @@ gtid_state_from_pos(const char *name, uint32 offset,
bool found_gtid_list_event= false;
bool found_format_description_event= false;
bool valid_pos= false;
- enum enum_binlog_checksum_alg current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+ enum_binlog_checksum_alg current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int err;
String packet;
Format_description_log_event *fdev= NULL;
+ bool found_in_index;
+ uint32 UNINIT_VAR(start_seek);
+ bool seek_done= false;
- if (unlikely(gtid_state->load((const rpl_gtid *)NULL, 0)))
+ /*
+ Try to lookup the position in the binlog gtid index. If found (as it will
+ usually be unless the index is corrupted somehow), we can seek directly to
+ a point at or just before the desired location, saving an expensive scan
+ of the binlog file from the start.
+ */
+ found_in_index= opt_binlog_gtid_index ?
+ gtid_index_lookup_pos(name, offset, &start_seek, gtid_state) :
+ false;
+ if (found_in_index)
+ found_gtid_list_event= true;
+ else if (unlikely(gtid_state->load((const rpl_gtid *)NULL, 0)))
{
errormsg= "Internal error (out of memory?) initializing slave state "
"while scanning binlog to find start position";
@@ -1467,7 +1648,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
if (unlikely((file= open_binlog(&cache, name, &errormsg)) == (File)-1))
return errormsg;
- if (!(fdev= new Format_description_log_event(3)))
+ if (!(fdev= new Format_description_log_event(4)))
{
errormsg= "Out of memory initializing format_description event "
"while scanning binlog to find start position";
@@ -1546,6 +1727,25 @@ gtid_state_from_pos(const char *name, uint32 offset,
errormsg= "Could not start decryption of binlog.";
goto end;
}
+ if (found_in_index && !seek_done)
+ {
+ /*
+ Just to avoid a redundant event read before hitting the next branch.
+ ToDo: share this code with the below somehow.
+ */
+ my_b_seek(&cache, start_seek);
+ seek_done= true;
+ }
+ }
+ else if (found_in_index && !seek_done)
+ {
+ /*
+ After reading the format_description event and possibly
+ start_encryption, we can seek forward to avoid most or all of the scan
+ (depending on the sparseness of the index).
+ */
+ my_b_seek(&cache, start_seek);
+ seek_done= true;
}
else if (unlikely(typ != FORMAT_DESCRIPTION_EVENT &&
!found_format_description_event))
@@ -1557,7 +1757,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
typ == BINLOG_CHECKPOINT_EVENT)
continue; /* Continue looking */
- else if (typ == GTID_LIST_EVENT)
+ else if (typ == GTID_LIST_EVENT && !found_in_index)
{
rpl_gtid *gtid_list;
bool status;
@@ -1722,7 +1922,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
String* const packet= info->packet;
size_t len= packet->length();
int mariadb_slave_capability= info->mariadb_slave_capability;
- enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
+ enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
slave_connection_state *gtid_state= &info->gtid_state;
slave_connection_state *until_gtid_state= info->until_gtid_state;
bool need_sync= false;
@@ -1785,7 +1985,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
});
- if (info->until_binlog_state.update_nolock(&event_gtid, false))
+ if (info->until_binlog_state.update_nolock(&event_gtid))
{
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory";
@@ -1867,8 +2067,9 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
This domain already reached the START SLAVE UNTIL stop condition,
so skip this event group.
*/
- info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
- GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE
+ ? GTID_SKIP_STANDALONE
+ : GTID_SKIP_TRANSACTION);
}
else if (event_gtid.server_id == gtid->server_id &&
event_gtid.seq_no >= gtid->seq_no)
@@ -1885,14 +2086,19 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_UNTIL_STOP_AFTER_STANDALONE :
GTID_UNTIL_STOP_AFTER_TRANSACTION);
- if (event_gtid.seq_no > until_seq_no)
+ if (event_gtid.seq_no > until_seq_no ||
+ info->is_until_before_gtids)
{
/*
+ Stop processing events now and skip the current event group
+ because either:
+
The GTID in START SLAVE UNTIL condition is missing in our binlog.
This should normally not happen (user error), but since we can be
sure that we are now beyond the position that the UNTIL condition
- should be in, we can just stop now. And we also need to skip this
- event group (as it is beyond the UNTIL condition).
+ should be in, we can just stop now.
+
+ Or the until condition is specified as SQL_BEFORE_GTIDS
*/
info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
@@ -2139,7 +2345,10 @@ static int init_binlog_sender(binlog_send_info *info,
info->slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
if (get_slave_until_gtid(thd, &slave_until_gtid_str))
+ {
info->until_gtid_state= &info->until_gtid_state_obj;
+ info->is_until_before_gtids= get_slave_gtid_until_before_gtids(thd);
+ }
}
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
@@ -2176,6 +2385,8 @@ static int init_binlog_sender(binlog_send_info *info,
char search_file_name[FN_REFLEN];
const char *name=search_file_name;
+ bool found_in_index= false;
+ uint32 start_seek= 0;
if (info->using_gtid_state)
{
if (info->gtid_state.load(connect_gtid_state.ptr(),
@@ -2201,16 +2412,26 @@ static int init_binlog_sender(binlog_send_info *info,
info->error= error;
return 1;
}
- if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state,
- search_file_name,
- info->until_gtid_state)))
+ if ((info->errmsg= gtid_find_binlog_pos(&info->gtid_state,
+ search_file_name,
+ info->until_gtid_state,
+ &info->until_binlog_state,
+ &found_in_index, &start_seek)))
{
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return 1;
}
- /* start from beginning of binlog file */
- *pos = 4;
+ if (found_in_index)
+ {
+ /* Start from a position looked up in the binlog gtid index. */
+ *pos = start_seek;
+ }
+ else
+ {
+ /* start from beginning of binlog file */
+ *pos = 4;
+ }
}
else
{
@@ -2273,7 +2494,7 @@ static int send_format_descriptor_event(binlog_send_info *info, IO_CACHE *log,
if (info->fdev != NULL)
delete info->fdev;
- if (!(info->fdev= new Format_description_log_event(3)))
+ if (!(info->fdev= new Format_description_log_event(4)))
{
info->errmsg= "Out of memory initializing format_description event";
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
@@ -2796,6 +3017,7 @@ static int send_events(binlog_send_info *info, IO_CACHE* log, LOG_INFO* linfo,
* return 0 - OK
* 1 - NOK
*/
+
static int send_one_binlog_file(binlog_send_info *info,
IO_CACHE* log,
LOG_INFO* linfo,
@@ -2810,6 +3032,8 @@ static int send_one_binlog_file(binlog_send_info *info,
linfo->pos= start_pos;
}
+ /* Counter used by can_purge_log() */
+ sending_new_binlog_file++;
while (!should_stop(info))
{
/**
@@ -2837,6 +3061,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
ushort flags)
{
LOG_INFO linfo;
+ ulong ev_offset;
IO_CACHE log;
File file = -1;
@@ -2962,6 +3187,34 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
if (info->until_gtid_state && info->until_gtid_state->count() == 0)
info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+ if (info->using_gtid_state && pos > BIN_LOG_HEADER_SIZE &&
+ ( info->gtid_state.is_pos_reached() ||
+ info->gtid_until_group == GTID_UNTIL_STOP_AFTER_STANDALONE ) )
+ {
+ /*
+ We are starting a GTID connect from a point not at the start of the
+ binlog file (from a GTID index lookup). Send a fake GTID_LIST event
+ in place of the real GTID_LIST that would normally be sent from the
+ start of the binlog file.
+
+ If we already reached the gtid UNTIL position, then set the
+ FLAG_UNTIL_REACHED in the GTID_LIST event and stop immediately.
+ */
+ uint32 flag= 0;
+ if (info->gtid_until_group == GTID_UNTIL_STOP_AFTER_STANDALONE)
+ {
+ flag= Gtid_list_log_event::FLAG_UNTIL_REACHED;
+ info->should_stop= true;
+ }
+ Gtid_list_log_event glev(&info->until_binlog_state, flag);
+ if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
+ fake_gtid_list_event(info, &glev, &info->errmsg, (int32)pos))
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
+ }
+
THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
if (send_one_binlog_file(info, &log, &linfo, pos))
break;
@@ -3227,6 +3480,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
goto err;
}
mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
+ mi->rli.is_until_before_gtids= thd->lex->mi.is_until_before_gtids;
}
else
mi->rli.clear_until_condition();
@@ -4169,7 +4423,7 @@ bool mysql_show_binlog_events(THD* thd)
Master_info *mi= 0;
LOG_INFO linfo;
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
- enum enum_binlog_checksum_alg checksum_alg;
+ enum_binlog_checksum_alg checksum_alg;
my_off_t binlog_size;
MY_STAT s;
@@ -4201,7 +4455,7 @@ bool mysql_show_binlog_events(THD* thd)
}
Format_description_log_event *description_event= new
- Format_description_log_event(3); /* MySQL 4.0 by default */
+ Format_description_log_event(4);
if (binary_log->is_open())
{
@@ -4305,7 +4559,7 @@ bool mysql_show_binlog_events(THD* thd)
if (lex_mi->pos > BIN_LOG_HEADER_SIZE)
{
- checksum_alg= description_event->checksum_alg;
+ checksum_alg= description_event->used_checksum_alg;
/* Validate user given position using checksum */
if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)