summaryrefslogtreecommitdiffstats
path: root/sql/wsrep_binlog.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
commita175314c3e5827eb193872241446f2f8f5c9d33c (patch)
treecd3d60ca99ae00829c52a6ca79150a5b6e62528b /sql/wsrep_binlog.cc
parentInitial commit. (diff)
downloadmariadb-10.5-upstream/1%10.5.12.tar.xz
mariadb-10.5-upstream/1%10.5.12.zip
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/wsrep_binlog.cc')
-rw-r--r--sql/wsrep_binlog.cc400
1 files changed, 400 insertions, 0 deletions
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
new file mode 100644
index 00000000..3fe0792d
--- /dev/null
+++ b/sql/wsrep_binlog.cc
@@ -0,0 +1,400 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
+
+#include "mariadb.h"
+#include "mysql/service_wsrep.h"
+#include "wsrep_binlog.h"
+#include "wsrep_priv.h"
+#include "log.h"
+#include "slave.h"
+#include "log_event.h"
+#include "wsrep_applier.h"
+
+#include "transaction.h"
+
+extern handlerton *binlog_hton;
+/*
+ Write the contents of a cache to a memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
+{
+ *buf= NULL;
+ *buf_len= 0;
+ my_off_t const saved_pos(my_b_tell(cache));
+ DBUG_ENTER("wsrep_write_cache_buf");
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ }
+
+ uint length= my_b_bytes_in_cache(cache);
+ if (unlikely(0 == length)) length= my_b_fill(cache);
+
+ size_t total_length= 0;
+
+ if (likely(length > 0)) do
+ {
+ total_length += length;
+ /*
+ Bail out if buffer grows too large.
+ A temporary fix to avoid allocating indefinitely large buffer,
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (total_length > wsrep_max_ws_size)
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ goto error;
+ }
+ uchar* tmp= (uchar *)my_realloc(PSI_INSTRUMENT_ME, *buf, total_length,
+ MYF(MY_ALLOW_ZERO_PTR));
+ if (!tmp)
+ {
+ WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
+ *buf_len, length);
+ goto error;
+ }
+ *buf= tmp;
+
+ memcpy(*buf + *buf_len, cache->read_pos, length);
+ *buf_len= total_length;
+
+ if (cache->file < 0)
+ {
+ cache->read_pos= cache->read_end;
+ break;
+ }
+ } while ((length= my_b_fill(cache)));
+
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ goto cleanup;
+ }
+
+ DBUG_RETURN(0);
+
+error:
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ }
+cleanup:
+ my_free(*buf);
+ *buf= NULL;
+ *buf_len= 0;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+}
+
+#define STACK_SIZE 4096 /* 4K - for buffer preallocated on the stack:
+ * many transactions would fit in there
+ * so there is no need to reach for the heap */
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+
+ This version uses incremental data appending as it reads it from cache.
+ */
+static int wsrep_write_cache_inc(THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ DBUG_ENTER("wsrep_write_cache_inc");
+ my_off_t const saved_pos(my_b_tell(cache));
+
+ if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ DBUG_RETURN(1);;
+ }
+
+ int ret= 0;
+ size_t total_length(0);
+
+ uint length(my_b_bytes_in_cache(cache));
+ if (unlikely(0 == length)) length= my_b_fill(cache);
+
+ if (likely(length > 0))
+ {
+ do
+ {
+ total_length += length;
+ /* bail out if buffer grows too large
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (unlikely(total_length > wsrep_max_ws_size))
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ ret= 1;
+ goto cleanup;
+ }
+ if (thd->wsrep_cs().append_data(wsrep::const_buffer(cache->read_pos, length)))
+ goto cleanup;
+ cache->read_pos= cache->read_end;
+ } while ((cache->file >= 0) && (length= my_b_fill(cache)));
+ }
+ if (ret == 0)
+ {
+ assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
+ }
+
+cleanup:
+ *len= total_length;
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to reinitialize io-cache");
+ }
+ DBUG_RETURN(ret);
+}
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache(THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ return wsrep_write_cache_inc(thd, cache, len);
+}
+
+void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
+{
+ int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld.log",
+ wsrep_data_home_dir, (longlong) thd->thread_id,
+ (longlong) wsrep_thd_trx_seqno(thd));
+ if (len < 0)
+ {
+ WSREP_ERROR("snprintf error: %d, skipping dump.", len);
+ return;
+ }
+ /*
+ len doesn't count the \0 end-of-string. Use len+1 below
+ to alloc and pass as an argument to snprintf.
+ */
+
+ char *filename= (char *)malloc(len+1);
+ int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld.log",
+ wsrep_data_home_dir, (longlong) thd->thread_id,
+ (long long)wsrep_thd_trx_seqno(thd));
+
+ if (len > len1)
+ {
+ WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
+ free(filename);
+ return;
+ }
+
+ FILE *of= fopen(filename, "wb");
+
+ if (of)
+ {
+ if (fwrite(rbr_buf, buf_len, 1, of) == 0)
+ WSREP_ERROR("Failed to write buffer of length %llu to '%s'",
+ (unsigned long long)buf_len, filename);
+
+ fclose(of);
+ }
+ else
+ {
+ WSREP_ERROR("Failed to open file '%s': %d (%s)",
+ filename, errno, strerror(errno));
+ }
+ free(filename);
+}
+
+/* Dump replication buffer along with header to a file. */
+void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
+ size_t buf_len)
+{
+ DBUG_ENTER("wsrep_dump_rbr_buf_with_header");
+
+ File file;
+ IO_CACHE cache;
+ Log_event_writer writer(&cache, 0);
+ Format_description_log_event *ev= 0;
+
+ longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd);
+ int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld_v2.log",
+ wsrep_data_home_dir, (longlong)thd->thread_id,
+ thd_trx_seqno);
+ /*
+ len doesn't count the \0 end-of-string. Use len+1 below
+ to alloc and pass as an argument to snprintf.
+ */
+ char *filename;
+ if (len < 0 || !(filename= (char*)malloc(len+1)))
+ {
+ WSREP_ERROR("snprintf error: %d, skipping dump.", len);
+ DBUG_VOID_RETURN;
+ }
+
+ int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld_v2.log",
+ wsrep_data_home_dir, (longlong) thd->thread_id,
+ thd_trx_seqno);
+
+ if (len > len1)
+ {
+ WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
+ free(filename);
+ DBUG_VOID_RETURN;
+ }
+
+ if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
+ O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
+ {
+ WSREP_ERROR("Failed to open file '%s' : %d (%s)",
+ filename, errno, strerror(errno));
+ goto cleanup1;
+ }
+
+ if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
+ {
+ goto cleanup2;
+ }
+
+ if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
+ {
+ goto cleanup2;
+ }
+
+ /*
+ Instantiate an FDLE object for non-wsrep threads (to be written
+ to the dump file).
+ */
+ ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
+ (new Format_description_log_event(4));
+
+ if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
+ flush_io_cache(&cache))
+ {
+ WSREP_ERROR("Failed to write to '%s'.", filename);
+ goto cleanup2;
+ }
+
+cleanup2:
+ end_io_cache(&cache);
+
+cleanup1:
+ free(filename);
+ mysql_file_close(file, MYF(MY_WME));
+
+ if (!thd->wsrep_applier) delete ev;
+
+ DBUG_VOID_RETURN;
+}
+
+int wsrep_write_skip_event(THD* thd)
+{
+ DBUG_ENTER("wsrep_write_skip_event");
+ Ignorable_log_event skip_event(thd);
+ int ret= mysql_bin_log.write_event(&skip_event);
+ if (ret)
+ {
+ WSREP_WARN("wsrep_write_skip_event: write to binlog failed: %d", ret);
+ }
+ if (!ret && (ret= trans_commit_stmt(thd)))
+ {
+ WSREP_WARN("wsrep_write_skip_event: statt commit failed");
+ }
+ DBUG_RETURN(ret);
+}
+
+int wsrep_write_dummy_event_low(THD *thd, const char *msg)
+{
+ ::abort();
+ return 0;
+}
+
+int wsrep_write_dummy_event(THD *orig_thd, const char *msg)
+{
+ return 0;
+}
+
+bool wsrep_commit_will_write_binlog(THD *thd)
+{
+ return (!wsrep_emulate_bin_log && /* binlog enabled*/
+ (wsrep_thd_is_local(thd) || /* local thd*/
+ (thd->wsrep_applier_service && /* applier and log-slave-updates */
+ opt_log_slave_updates)));
+}
+
+/*
+ The last THD/commit_for_wait registered for group commit.
+*/
+static wait_for_commit *commit_order_tail= NULL;
+
+void wsrep_register_for_group_commit(THD *thd)
+{
+ DBUG_ENTER("wsrep_register_for_group_commit");
+ if (wsrep_emulate_bin_log)
+ {
+ /* Binlog is off, no need to maintain group commit queue */
+ DBUG_VOID_RETURN;
+ }
+
+ DBUG_ASSERT(thd->wsrep_trx().ordered());
+
+ wait_for_commit *wfc= thd->wait_for_commit_ptr= &thd->wsrep_wfc;
+
+ mysql_mutex_lock(&LOCK_wsrep_group_commit);
+ if (commit_order_tail)
+ {
+ wfc->register_wait_for_prior_commit(commit_order_tail);
+ }
+ commit_order_tail= thd->wait_for_commit_ptr;
+ mysql_mutex_unlock(&LOCK_wsrep_group_commit);
+
+ /*
+ Now we have queued for group commit. If the commit will go
+ through TC log_and_order(), the commit ordering is done
+ by TC group commit. Otherwise the wait for prior
+ commits to complete is done in ha_commit_one_phase().
+ */
+ DBUG_VOID_RETURN;
+}
+
+void wsrep_unregister_from_group_commit(THD *thd)
+{
+ DBUG_ASSERT(thd->wsrep_trx().ordered());
+ wait_for_commit *wfc= thd->wait_for_commit_ptr;
+
+ if (wfc)
+ {
+ mysql_mutex_lock(&LOCK_wsrep_group_commit);
+ wfc->unregister_wait_for_prior_commit();
+ thd->wakeup_subsequent_commits(0);
+
+ /* The last one queued for group commit has completed commit, it is
+ safe to set tail to NULL. */
+ if (wfc == commit_order_tail)
+ commit_order_tail= NULL;
+ mysql_mutex_unlock(&LOCK_wsrep_group_commit);
+ thd->wait_for_commit_ptr= NULL;
+ }
+}