summaryrefslogtreecommitdiffstats
path: root/storage/innobase/buf/buf0dblwr.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/buf/buf0dblwr.cc')
-rw-r--r--storage/innobase/buf/buf0dblwr.cc779
1 files changed, 779 insertions, 0 deletions
diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc
new file mode 100644
index 00000000..e9aea355
--- /dev/null
+++ b/storage/innobase/buf/buf0dblwr.cc
@@ -0,0 +1,779 @@
+/*****************************************************************************
+
+Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, 2022, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file buf/buf0dblwr.cc
+Doublwrite buffer module
+
+Created 2011/12/19
+*******************************************************/
+
+#include "buf0dblwr.h"
+#include "buf0flu.h"
+#include "buf0checksum.h"
+#include "srv0start.h"
+#include "srv0srv.h"
+#include "page0zip.h"
+#include "trx0sys.h"
+#include "fil0crypt.h"
+#include "fil0pagecompress.h"
+
+using st_::span;
+
+/** The doublewrite buffer */
+buf_dblwr_t buf_dblwr;
+
+/** @return the TRX_SYS page */
+inline buf_block_t *buf_dblwr_trx_sys_get(mtr_t *mtr)
+{
+ return buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
+ 0, RW_X_LATCH, mtr);
+}
+
+void buf_dblwr_t::init()
+{
+ if (!active_slot)
+ {
+ active_slot= &slots[0];
+ mysql_mutex_init(buf_dblwr_mutex_key, &mutex, nullptr);
+ pthread_cond_init(&cond, nullptr);
+ }
+}
+
+/** Initialise the persistent storage of the doublewrite buffer.
+@param header doublewrite page header in the TRX_SYS page */
+inline void buf_dblwr_t::init(const byte *header)
+{
+ ut_ad(!active_slot->first_free);
+ ut_ad(!active_slot->reserved);
+ ut_ad(!batch_running);
+
+ block1= page_id_t(0, mach_read_from_4(header + TRX_SYS_DOUBLEWRITE_BLOCK1));
+ block2= page_id_t(0, mach_read_from_4(header + TRX_SYS_DOUBLEWRITE_BLOCK2));
+
+ const uint32_t buf_size= 2 * block_size();
+ for (int i= 0; i < 2; i++)
+ {
+ slots[i].write_buf= static_cast<byte*>
+ (aligned_malloc(buf_size << srv_page_size_shift, srv_page_size));
+ slots[i].buf_block_arr= static_cast<element*>
+ (ut_zalloc_nokey(buf_size * sizeof(element)));
+ }
+ active_slot= &slots[0];
+}
+
+/** Create or restore the doublewrite buffer in the TRX_SYS page.
+@return whether the operation succeeded */
+bool buf_dblwr_t::create()
+{
+ if (is_created())
+ return true;
+
+ mtr_t mtr;
+ const ulint size= block_size();
+
+start_again:
+ mtr.start();
+
+ dberr_t err;
+ buf_block_t *trx_sys_block= buf_dblwr_trx_sys_get(&mtr);
+ if (!trx_sys_block)
+ {
+ mtr.commit();
+ return false;
+ }
+
+ if (mach_read_from_4(TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_MAGIC +
+ trx_sys_block->page.frame) ==
+ TRX_SYS_DOUBLEWRITE_MAGIC_N)
+ {
+ /* The doublewrite buffer has already been created: just read in
+ some numbers */
+ init(TRX_SYS_DOUBLEWRITE + trx_sys_block->page.frame);
+ mtr.commit();
+ return true;
+ }
+
+ if (UT_LIST_GET_FIRST(fil_system.sys_space->chain)->size < 3 * size)
+ {
+ ib::error() << "Cannot create doublewrite buffer: "
+ "the first file in innodb_data_file_path must be at least "
+ << (3 * (size >> (20U - srv_page_size_shift))) << "M.";
+fail:
+ mtr.commit();
+ return false;
+ }
+ else
+ {
+ buf_block_t *b= fseg_create(fil_system.sys_space,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_FSEG,
+ &mtr, &err, false, trx_sys_block);
+ if (!b)
+ {
+ ib::error() << "Cannot create doublewrite buffer: " << err;
+ goto fail;
+ }
+
+ ib::info() << "Doublewrite buffer not found: creating new";
+
+ /* FIXME: After this point, the doublewrite buffer creation
+ is not atomic. The doublewrite buffer should not exist in
+ the InnoDB system tablespace file in the first place.
+ It could be located in separate optional file(s) in a
+ user-specified location. */
+ }
+
+ byte *fseg_header= TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_FSEG +
+ trx_sys_block->page.frame;
+ for (uint32_t prev_page_no= 0, i= 0, extent_size= FSP_EXTENT_SIZE;
+ i < 2 * size + extent_size / 2; i++)
+ {
+ buf_block_t *new_block=
+ fseg_alloc_free_page_general(fseg_header, prev_page_no + 1, FSP_UP,
+ false, &mtr, &mtr, &err);
+ if (!new_block)
+ {
+ ib::error() << "Cannot create doublewrite buffer: "
+ " you must increase your tablespace size."
+ " Cannot continue operation.";
+ /* This may essentially corrupt the doublewrite
+ buffer. However, usually the doublewrite buffer
+ is created at database initialization, and it
+ should not matter (just remove all newly created
+ InnoDB files and restart). */
+ mtr.commit();
+ return false;
+ }
+
+ /* We read the allocated pages to the buffer pool; when they are
+ written to disk in a flush, the space id and page number fields
+ are also written to the pages. When we at database startup read
+ pages from the doublewrite buffer, we know that if the space id
+ and page number in them are the same as the page position in the
+ tablespace, then the page has not been written to in
+ doublewrite. */
+
+ ut_ad(new_block->page.lock.not_recursive());
+ const page_id_t id= new_block->page.id();
+ /* We only do this in the debug build, to ensure that the check in
+ buf_flush_init_for_writing() will see a valid page type. The
+ flushes of new_block are actually unnecessary here. */
+ ut_d(mtr.write<2>(*new_block, FIL_PAGE_TYPE + new_block->page.frame,
+ FIL_PAGE_TYPE_SYS));
+
+ if (i == size / 2)
+ {
+ ut_a(id.page_no() == size);
+ mtr.write<4>(*trx_sys_block,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_BLOCK1 +
+ trx_sys_block->page.frame, id.page_no());
+ mtr.write<4>(*trx_sys_block,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_REPEAT +
+ TRX_SYS_DOUBLEWRITE_BLOCK1 + trx_sys_block->page.frame,
+ id.page_no());
+ }
+ else if (i == size / 2 + size)
+ {
+ ut_a(id.page_no() == 2 * size);
+ mtr.write<4>(*trx_sys_block,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_BLOCK2 +
+ trx_sys_block->page.frame, id.page_no());
+ mtr.write<4>(*trx_sys_block,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_REPEAT +
+ TRX_SYS_DOUBLEWRITE_BLOCK2 + trx_sys_block->page.frame,
+ id.page_no());
+ }
+ else if (i > size / 2)
+ ut_a(id.page_no() == prev_page_no + 1);
+
+ if (((i + 1) & 15) == 0) {
+ /* rw_locks can only be recursively x-locked 2048 times. (on 32
+ bit platforms, (lint) 0 - (X_LOCK_DECR * 2049) is no longer a
+ negative number, and thus lock_word becomes like a shared lock).
+ For 4k page size this loop will lock the fseg header too many
+ times. Since this code is not done while any other threads are
+ active, restart the MTR occasionally. */
+ mtr.commit();
+ mtr.start();
+ trx_sys_block= buf_dblwr_trx_sys_get(&mtr);
+ fseg_header= TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_FSEG +
+ trx_sys_block->page.frame;
+ }
+
+ prev_page_no= id.page_no();
+ }
+
+ mtr.write<4>(*trx_sys_block,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_MAGIC +
+ trx_sys_block->page.frame, TRX_SYS_DOUBLEWRITE_MAGIC_N);
+ mtr.write<4>(*trx_sys_block,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_MAGIC +
+ TRX_SYS_DOUBLEWRITE_REPEAT + trx_sys_block->page.frame,
+ TRX_SYS_DOUBLEWRITE_MAGIC_N);
+
+ mtr.write<4>(*trx_sys_block,
+ TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED +
+ trx_sys_block->page.frame,
+ TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N);
+ mtr.commit();
+
+ buf_flush_wait_flushed(mtr.commit_lsn());
+
+ /* Remove doublewrite pages from LRU */
+ buf_pool_invalidate();
+ goto start_again;
+}
+
+/** Initialize the doublewrite buffer memory structure on recovery.
+If we are upgrading from a version before MySQL 4.1, then this
+function performs the necessary update operations to support
+innodb_file_per_table. If we are in a crash recovery, this function
+loads the pages from double write buffer into memory.
+@param file File handle
+@param path Path name of file
+@return DB_SUCCESS or error code */
+dberr_t buf_dblwr_t::init_or_load_pages(pfs_os_file_t file, const char *path)
+{
+ ut_ad(this == &buf_dblwr);
+ const uint32_t size= block_size();
+
+ /* We do the file i/o past the buffer pool */
+ byte *read_buf= static_cast<byte*>(aligned_malloc(srv_page_size,
+ srv_page_size));
+ /* Read the TRX_SYS header to check if we are using the doublewrite buffer */
+ dberr_t err= os_file_read(IORequestRead, file, read_buf,
+ TRX_SYS_PAGE_NO << srv_page_size_shift,
+ srv_page_size, nullptr);
+
+ if (err != DB_SUCCESS)
+ {
+ ib::error() << "Failed to read the system tablespace header page";
+func_exit:
+ aligned_free(read_buf);
+ return err;
+ }
+
+ /* TRX_SYS_PAGE_NO is not encrypted see fil_crypt_rotate_page() */
+ if (mach_read_from_4(TRX_SYS_DOUBLEWRITE_MAGIC + TRX_SYS_DOUBLEWRITE +
+ read_buf) != TRX_SYS_DOUBLEWRITE_MAGIC_N)
+ {
+ /* There is no doublewrite buffer initialized in the TRX_SYS page.
+ This should normally not be possible; the doublewrite buffer should
+ be initialized when creating the database. */
+ err= DB_SUCCESS;
+ goto func_exit;
+ }
+
+ init(TRX_SYS_DOUBLEWRITE + read_buf);
+
+ const bool upgrade_to_innodb_file_per_table=
+ mach_read_from_4(TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED +
+ TRX_SYS_DOUBLEWRITE + read_buf) !=
+ TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N;
+
+ auto write_buf= active_slot->write_buf;
+ /* Read the pages from the doublewrite buffer to memory */
+ err= os_file_read(IORequestRead, file, write_buf,
+ block1.page_no() << srv_page_size_shift,
+ size << srv_page_size_shift, nullptr);
+
+ if (err != DB_SUCCESS)
+ {
+ ib::error() << "Failed to read the first double write buffer extent";
+ goto func_exit;
+ }
+
+ err= os_file_read(IORequestRead, file,
+ write_buf + (size << srv_page_size_shift),
+ block2.page_no() << srv_page_size_shift,
+ size << srv_page_size_shift, nullptr);
+ if (err != DB_SUCCESS)
+ {
+ ib::error() << "Failed to read the second double write buffer extent";
+ goto func_exit;
+ }
+
+ byte *page= write_buf;
+
+ if (UNIV_UNLIKELY(upgrade_to_innodb_file_per_table))
+ {
+ ib::info() << "Resetting space id's in the doublewrite buffer";
+
+ for (ulint i= 0; i < size * 2; i++, page += srv_page_size)
+ {
+ memset(page + FIL_PAGE_SPACE_ID, 0, 4);
+ /* For pre-MySQL-4.1 innodb_checksum_algorithm=innodb, we do not need to
+ calculate new checksums for the pages because the field
+ .._SPACE_ID does not affect them. Write the page back to where
+ we read it from. */
+ const ulint source_page_no= i < size
+ ? block1.page_no() + i
+ : block2.page_no() + i - size;
+ err= os_file_write(IORequestWrite, path, file, page,
+ source_page_no << srv_page_size_shift, srv_page_size);
+ if (err != DB_SUCCESS)
+ {
+ ib::error() << "Failed to upgrade the double write buffer";
+ goto func_exit;
+ }
+ }
+ os_file_flush(file);
+ }
+ else
+ for (ulint i= 0; i < size * 2; i++, page += srv_page_size)
+ if (mach_read_from_8(my_assume_aligned<8>(page + FIL_PAGE_LSN)))
+ /* Each valid page header must contain a nonzero FIL_PAGE_LSN field. */
+ recv_sys.dblwr.add(page);
+
+ err= DB_SUCCESS;
+ goto func_exit;
+}
+
+/** Process and remove the double write buffer pages for all tablespaces. */
+void buf_dblwr_t::recover()
+{
+ ut_ad(log_sys.last_checkpoint_lsn);
+ if (!is_created())
+ return;
+
+ uint32_t page_no_dblwr= 0;
+ byte *read_buf= static_cast<byte*>(aligned_malloc(3 * srv_page_size,
+ srv_page_size));
+ byte *const buf= read_buf + srv_page_size;
+
+ for (recv_dblwr_t::list::iterator i= recv_sys.dblwr.pages.begin();
+ i != recv_sys.dblwr.pages.end(); ++i, ++page_no_dblwr)
+ {
+ byte *page= *i;
+ const uint32_t page_no= page_get_page_no(page);
+ if (!page_no) /* recovered via recv_dblwr_t::restore_first_page() */
+ continue;
+
+ const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
+ if (log_sys.last_checkpoint_lsn > lsn)
+ /* Pages written before the checkpoint are not useful for recovery. */
+ continue;
+ const uint32_t space_id= page_get_space_id(page);
+ const page_id_t page_id(space_id, page_no);
+
+ if (recv_sys.scanned_lsn < lsn)
+ {
+ ib::info() << "Ignoring a doublewrite copy of page " << page_id
+ << " with future log sequence number " << lsn;
+ continue;
+ }
+
+ fil_space_t *space= fil_space_t::get(space_id);
+
+ if (!space)
+ /* The tablespace that this page once belonged to does not exist */
+ continue;
+
+ if (UNIV_UNLIKELY(page_no >= space->get_size()))
+ {
+ /* Do not report the warning for undo tablespaces, because they
+ can be truncated in place. */
+ if (!srv_is_undo_tablespace(space_id))
+ ib::warn() << "A copy of page " << page_no
+ << " in the doublewrite buffer slot " << page_no_dblwr
+ << " is beyond the end of " << space->chain.start->name
+ << " (" << space->size << " pages)";
+next_page:
+ space->release();
+ continue;
+ }
+
+ const ulint physical_size= space->physical_size();
+ ut_ad(!buf_is_zeroes(span<const byte>(page, physical_size)));
+
+ /* We want to ensure that for partial reads the unread portion of
+ the page is NUL. */
+ memset(read_buf, 0x0, physical_size);
+
+ /* Read in the actual page from the file */
+ fil_io_t fio= space->io(IORequest(IORequest::DBLWR_RECOVER),
+ os_offset_t{page_no} * physical_size,
+ physical_size, read_buf);
+
+ if (UNIV_UNLIKELY(fio.err != DB_SUCCESS))
+ {
+ ib::warn() << "Double write buffer recovery: " << page_id
+ << " ('" << space->chain.start->name
+ << "') read failed with error: " << fio.err;
+ continue;
+ }
+
+ if (buf_is_zeroes(span<const byte>(read_buf, physical_size)))
+ {
+ /* We will check if the copy in the doublewrite buffer is
+ valid. If not, we will ignore this page (there should be redo
+ log records to initialize it). */
+ }
+ else if (recv_sys.dblwr.validate_page(page_id, read_buf, space, buf))
+ goto next_page;
+ else
+ /* We intentionally skip this message for all-zero pages. */
+ ib::info() << "Trying to recover page " << page_id
+ << " from the doublewrite buffer.";
+
+ page= recv_sys.dblwr.find_page(page_id, space, buf);
+
+ if (!page)
+ goto next_page;
+
+ /* Write the good page from the doublewrite buffer to the intended
+ position. */
+ space->reacquire();
+ fio= space->io(IORequestWrite,
+ os_offset_t{page_id.page_no()} * physical_size,
+ physical_size, page);
+
+ if (fio.err == DB_SUCCESS)
+ ib::info() << "Recovered page " << page_id << " to '" << fio.node->name
+ << "' from the doublewrite buffer.";
+ goto next_page;
+ }
+
+ recv_sys.dblwr.pages.clear();
+ fil_flush_file_spaces();
+ aligned_free(read_buf);
+}
+
+/** Free the doublewrite buffer. */
+void buf_dblwr_t::close()
+{
+ if (!active_slot)
+ return;
+
+ ut_ad(!active_slot->reserved);
+ ut_ad(!active_slot->first_free);
+ ut_ad(!batch_running);
+
+ pthread_cond_destroy(&cond);
+ for (int i= 0; i < 2; i++)
+ {
+ aligned_free(slots[i].write_buf);
+ ut_free(slots[i].buf_block_arr);
+ }
+ mysql_mutex_destroy(&mutex);
+
+ memset((void*) this, 0, sizeof *this);
+}
+
+/** Update the doublewrite buffer on write completion. */
+void buf_dblwr_t::write_completed()
+{
+ ut_ad(this == &buf_dblwr);
+ ut_ad(!srv_read_only_mode);
+
+ mysql_mutex_lock(&mutex);
+
+ ut_ad(is_created());
+ ut_ad(srv_use_doublewrite_buf);
+ ut_ad(batch_running);
+ slot *flush_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
+ ut_ad(flush_slot->reserved);
+ ut_ad(flush_slot->reserved <= flush_slot->first_free);
+
+ if (!--flush_slot->reserved)
+ {
+ mysql_mutex_unlock(&mutex);
+ /* This will finish the batch. Sync data files to the disk. */
+ fil_flush_file_spaces();
+ mysql_mutex_lock(&mutex);
+
+ /* We can now reuse the doublewrite memory buffer: */
+ flush_slot->first_free= 0;
+ batch_running= false;
+ pthread_cond_broadcast(&cond);
+ }
+
+ mysql_mutex_unlock(&mutex);
+}
+
+#ifdef UNIV_DEBUG
+/** Check the LSN values on the page.
+@param[in] page page to check
+@param[in] s tablespace */
+static void buf_dblwr_check_page_lsn(const page_t* page, const fil_space_t& s)
+{
+ /* Ignore page_compressed or encrypted pages */
+ if (s.is_compressed() || buf_page_get_key_version(page, s.flags))
+ return;
+ const byte* lsn_start= FIL_PAGE_LSN + 4 + page;
+ const byte* lsn_end= page + srv_page_size -
+ (s.full_crc32()
+ ? FIL_PAGE_FCRC32_END_LSN
+ : FIL_PAGE_END_LSN_OLD_CHKSUM - 4);
+ static_assert(FIL_PAGE_FCRC32_END_LSN % 4 == 0, "alignment");
+ static_assert(FIL_PAGE_LSN % 4 == 0, "alignment");
+ ut_ad(!memcmp_aligned<4>(lsn_start, lsn_end, 4));
+}
+
+static void buf_dblwr_check_page_lsn(const buf_page_t &b, const byte *page)
+{
+ if (fil_space_t *space= fil_space_t::get_for_write(b.id().space()))
+ {
+ buf_dblwr_check_page_lsn(page, *space);
+ space->release();
+ }
+}
+
+/** Check the LSN values on the page with which this block is associated. */
+static void buf_dblwr_check_block(const buf_page_t *bpage)
+{
+ ut_ad(bpage->in_file());
+ const page_t *page= bpage->frame;
+ ut_ad(page);
+
+ switch (fil_page_get_type(page)) {
+ case FIL_PAGE_INDEX:
+ case FIL_PAGE_TYPE_INSTANT:
+ case FIL_PAGE_RTREE:
+ if (page_is_comp(page))
+ {
+ if (page_simple_validate_new(page))
+ return;
+ }
+ else if (page_simple_validate_old(page))
+ return;
+ /* While it is possible that this is not an index page but just
+ happens to have wrongly set FIL_PAGE_TYPE, such pages should never
+ be modified to without also adjusting the page type during page
+ allocation or buf_flush_init_for_writing() or
+ fil_block_reset_type(). */
+ buf_page_print(page);
+
+ ib::fatal() << "Apparent corruption of an index page " << bpage->id()
+ << " to be written to data file. We intentionally crash"
+ " the server to prevent corrupt data from ending up in"
+ " data files.";
+ }
+}
+#endif /* UNIV_DEBUG */
+
+bool buf_dblwr_t::flush_buffered_writes(const ulint size)
+{
+ mysql_mutex_assert_owner(&mutex);
+ ut_ad(size == block_size());
+
+ for (;;)
+ {
+ if (!active_slot->first_free)
+ return false;
+ if (!batch_running)
+ break;
+ my_cond_wait(&cond, &mutex.m_mutex);
+ }
+
+ ut_ad(active_slot->reserved == active_slot->first_free);
+ ut_ad(!flushing_buffered_writes);
+
+ /* Disallow anyone else to start another batch of flushing. */
+ slot *flush_slot= active_slot;
+ /* Switch the active slot */
+ active_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
+ ut_a(active_slot->first_free == 0);
+ batch_running= true;
+ const ulint old_first_free= flush_slot->first_free;
+ auto write_buf= flush_slot->write_buf;
+ const bool multi_batch= block1 + static_cast<uint32_t>(size) != block2 &&
+ old_first_free > size;
+ flushing_buffered_writes= 1 + multi_batch;
+ /* Now safe to release the mutex. */
+ mysql_mutex_unlock(&mutex);
+#ifdef UNIV_DEBUG
+ for (ulint len2= 0, i= 0; i < old_first_free; len2 += srv_page_size, i++)
+ {
+ buf_page_t *bpage= flush_slot->buf_block_arr[i].request.bpage;
+
+ if (bpage->zip.data)
+ /* No simple validate for ROW_FORMAT=COMPRESSED pages exists. */
+ continue;
+
+ /* Check that the actual page in the buffer pool is not corrupt
+ and the LSN values are sane. */
+ buf_dblwr_check_block(bpage);
+ ut_d(buf_dblwr_check_page_lsn(*bpage, write_buf + len2));
+ }
+#endif /* UNIV_DEBUG */
+ const IORequest request{nullptr, nullptr, fil_system.sys_space->chain.start,
+ IORequest::DBLWR_BATCH};
+ ut_a(fil_system.sys_space->acquire());
+ if (multi_batch)
+ {
+ fil_system.sys_space->reacquire();
+ os_aio(request, write_buf,
+ os_offset_t{block1.page_no()} << srv_page_size_shift,
+ size << srv_page_size_shift);
+ os_aio(request, write_buf + (size << srv_page_size_shift),
+ os_offset_t{block2.page_no()} << srv_page_size_shift,
+ (old_first_free - size) << srv_page_size_shift);
+ }
+ else
+ os_aio(request, write_buf,
+ os_offset_t{block1.page_no()} << srv_page_size_shift,
+ old_first_free << srv_page_size_shift);
+ return true;
+}
+
+static void *get_frame(const IORequest &request)
+{
+ if (request.slot)
+ return request.slot->out_buf;
+ const buf_page_t *bpage= request.bpage;
+ return bpage->zip.data ? bpage->zip.data : bpage->frame;
+}
+
+void buf_dblwr_t::flush_buffered_writes_completed(const IORequest &request)
+{
+ ut_ad(this == &buf_dblwr);
+ ut_ad(srv_use_doublewrite_buf);
+ ut_ad(is_created());
+ ut_ad(!srv_read_only_mode);
+ ut_ad(!request.bpage);
+ ut_ad(request.node == fil_system.sys_space->chain.start);
+ ut_ad(request.type == IORequest::DBLWR_BATCH);
+ mysql_mutex_lock(&mutex);
+ ut_ad(batch_running);
+ ut_ad(flushing_buffered_writes);
+ ut_ad(flushing_buffered_writes <= 2);
+ writes_completed++;
+ if (UNIV_UNLIKELY(--flushing_buffered_writes))
+ {
+ mysql_mutex_unlock(&mutex);
+ return;
+ }
+
+ slot *const flush_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
+ ut_ad(flush_slot->reserved == flush_slot->first_free);
+ /* increment the doublewrite flushed pages counter */
+ pages_written+= flush_slot->first_free;
+ mysql_mutex_unlock(&mutex);
+
+ /* Now flush the doublewrite buffer data to disk */
+ fil_system.sys_space->flush<false>();
+
+ /* The writes have been flushed to disk now and in recovery we will
+ find them in the doublewrite buffer blocks. Next, write the data pages. */
+ for (ulint i= 0, first_free= flush_slot->first_free; i < first_free; i++)
+ {
+ auto e= flush_slot->buf_block_arr[i];
+ buf_page_t* bpage= e.request.bpage;
+ ut_ad(bpage->in_file());
+
+ void *frame= get_frame(e.request);
+ ut_ad(frame);
+
+ auto e_size= e.size;
+
+ if (UNIV_LIKELY_NULL(bpage->zip.data))
+ {
+ e_size= bpage->zip_size();
+ ut_ad(e_size);
+ }
+ else
+ {
+ ut_ad(!bpage->zip_size());
+ ut_d(buf_dblwr_check_page_lsn(*bpage, static_cast<const byte*>(frame)));
+ }
+
+ const lsn_t lsn= mach_read_from_8(my_assume_aligned<8>
+ (FIL_PAGE_LSN +
+ static_cast<const byte*>(frame)));
+ ut_ad(lsn);
+ ut_ad(lsn >= bpage->oldest_modification());
+ log_write_up_to(lsn, true);
+ e.request.node->space->io(e.request, bpage->physical_offset(), e_size,
+ frame, bpage);
+ }
+}
+
+/** Flush possible buffered writes to persistent storage.
+It is very important to call this function after a batch of writes has been
+posted, and also when we may have to wait for a page latch!
+Otherwise a deadlock of threads can occur. */
+void buf_dblwr_t::flush_buffered_writes()
+{
+ if (!is_created() || !srv_use_doublewrite_buf)
+ {
+ fil_flush_file_spaces();
+ return;
+ }
+
+ ut_ad(!srv_read_only_mode);
+ const ulint size= block_size();
+
+ mysql_mutex_lock(&mutex);
+ if (!flush_buffered_writes(size))
+ mysql_mutex_unlock(&mutex);
+}
+
+/** Schedule a page write. If the doublewrite memory buffer is full,
+flush_buffered_writes() will be invoked to make space.
+@param request asynchronous write request
+@param size payload size in bytes */
+void buf_dblwr_t::add_to_batch(const IORequest &request, size_t size)
+{
+ ut_ad(request.is_async());
+ ut_ad(request.is_write());
+ ut_ad(request.bpage);
+ ut_ad(request.bpage->in_file());
+ ut_ad(request.node);
+ ut_ad(request.node->space->purpose == FIL_TYPE_TABLESPACE);
+ ut_ad(request.node->space->id == request.bpage->id().space());
+ ut_ad(request.node->space->referenced());
+ ut_ad(!srv_read_only_mode);
+
+ const ulint buf_size= 2 * block_size();
+
+ mysql_mutex_lock(&mutex);
+
+ for (;;)
+ {
+ ut_ad(active_slot->first_free <= buf_size);
+ if (active_slot->first_free != buf_size)
+ break;
+
+ if (flush_buffered_writes(buf_size / 2))
+ mysql_mutex_lock(&mutex);
+ }
+
+ byte *p= active_slot->write_buf + srv_page_size * active_slot->first_free;
+
+ /* "frame" is at least 1024-byte aligned for ROW_FORMAT=COMPRESSED pages,
+ and at least srv_page_size (4096-byte) for everything else. */
+ memcpy_aligned<UNIV_ZIP_SIZE_MIN>(p, get_frame(request), size);
+ /* fil_page_compress() for page_compressed guarantees 256-byte alignment */
+ memset_aligned<256>(p + size, 0, srv_page_size - size);
+ /* FIXME: Inform the compiler that "size" and "srv_page_size - size"
+ are integer multiples of 256, so the above can translate into simple
+ SIMD instructions. Currently, we make no such assumptions about the
+ non-pointer parameters that are passed to the _aligned templates. */
+ ut_ad(!request.bpage->zip_size() || request.bpage->zip_size() == size);
+ ut_ad(active_slot->reserved == active_slot->first_free);
+ ut_ad(active_slot->reserved < buf_size);
+ new (active_slot->buf_block_arr + active_slot->first_free++)
+ element{request, size};
+ active_slot->reserved= active_slot->first_free;
+
+ if (active_slot->first_free != buf_size ||
+ !flush_buffered_writes(buf_size / 2))
+ mysql_mutex_unlock(&mutex);
+}