diff options
Diffstat (limited to 'storage/innobase/log')
-rw-r--r-- | storage/innobase/log/log0crypt.cc | 429 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 1340 | ||||
-rw-r--r-- | storage/innobase/log/log0recv.cc | 3783 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.cc | 309 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.h | 81 |
5 files changed, 5942 insertions, 0 deletions
diff --git a/storage/innobase/log/log0crypt.cc b/storage/innobase/log/log0crypt.cc new file mode 100644 index 00000000..dbf41c7d --- /dev/null +++ b/storage/innobase/log/log0crypt.cc @@ -0,0 +1,429 @@ +/***************************************************************************** + +Copyright (C) 2013, 2015, Google Inc. All Rights Reserved. +Copyright (C) 2014, 2021, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ +/**************************************************//** +@file log0crypt.cc +Innodb log encrypt/decrypt + +Created 11/25/2013 Minli Zhu Google +Modified Jan Lindström jan.lindstrom@mariadb.com +MDEV-11782: Rewritten for MariaDB 10.2 by Marko Mäkelä, MariaDB Corporation. +*******************************************************/ +#include <my_global.h> +#include "log0crypt.h" +#include <mysql/service_my_crypt.h> +#include "assume_aligned.h" + +#include "log0crypt.h" +#include "log0recv.h" // for recv_sys + +/** innodb_encrypt_log: whether to encrypt the redo log */ +my_bool srv_encrypt_log; + +/** Redo log encryption key ID */ +#define LOG_DEFAULT_ENCRYPTION_KEY 1 + +struct crypt_info_t { + ulint checkpoint_no; /*!< checkpoint no; 32 bits */ + uint key_version; /*!< mysqld key version */ + /** random string for encrypting the key */ + alignas(8) byte crypt_msg[MY_AES_BLOCK_SIZE]; + /** the secret key */ + alignas(8) byte crypt_key[MY_AES_BLOCK_SIZE]; + /** a random string for the per-block initialization vector */ + alignas(4) byte crypt_nonce[4]; +}; + +/** The crypt info */ +static crypt_info_t info; + +/** Initialization vector used for temporary files/tablespace */ +static byte tmp_iv[MY_AES_BLOCK_SIZE]; + +/** Crypt info when upgrading from 10.1 */ +static crypt_info_t infos[5 * 2]; +/** First unused slot in infos[] */ +static size_t infos_used; + +/*********************************************************************//** +Get a log block's start lsn. +@return a log block's start lsn */ +static inline +lsn_t +log_block_get_start_lsn( +/*====================*/ + lsn_t lsn, /*!< in: checkpoint lsn */ + ulint log_block_no) /*!< in: log block number */ +{ + lsn_t start_lsn = + (lsn & (lsn_t)0xffffffff00000000ULL) | + (((log_block_no - 1) & (lsn_t)0x3fffffff) << 9); + return start_lsn; +} + +/** Generate crypt key from crypt msg. +@param[in,out] info encryption key +@param[in] upgrade whether to use the key in MariaDB 10.1 format +@return whether the operation was successful */ +static bool init_crypt_key(crypt_info_t* info, bool upgrade = false) +{ + byte mysqld_key[MY_AES_MAX_KEY_LENGTH]; + uint keylen = sizeof mysqld_key; + + compile_time_assert(16 == sizeof info->crypt_key); + compile_time_assert(16 == MY_AES_BLOCK_SIZE); + + if (uint rc = encryption_key_get(LOG_DEFAULT_ENCRYPTION_KEY, + info->key_version, mysqld_key, + &keylen)) { + ib::error() + << "Obtaining redo log encryption key version " + << info->key_version << " failed (" << rc + << "). Maybe the key or the required encryption " + "key management plugin was not found."; + info->key_version = ENCRYPTION_KEY_VERSION_INVALID; + return false; + } + + if (upgrade) { + while (keylen < sizeof mysqld_key) { + mysqld_key[keylen++] = 0; + } + } + + uint dst_len; + int err= my_aes_crypt(MY_AES_ECB, + ENCRYPTION_FLAG_NOPAD | ENCRYPTION_FLAG_ENCRYPT, + info->crypt_msg, MY_AES_BLOCK_SIZE, + info->crypt_key, &dst_len, + mysqld_key, keylen, NULL, 0); + + if (err != MY_AES_OK || dst_len != MY_AES_BLOCK_SIZE) { + ib::error() << "Getting redo log crypto key failed: err = " + << err << ", len = " << dst_len; + info->key_version = ENCRYPTION_KEY_VERSION_INVALID; + return false; + } + + return true; +} + +/** Encrypt or decrypt log blocks. +@param[in,out] buf log blocks to encrypt or decrypt +@param[in] lsn log sequence number of the start of the buffer +@param[in] size size of the buffer, in bytes +@param[in] op whether to decrypt, encrypt, or rotate key and encrypt +@return whether the operation succeeded (encrypt always does) */ +bool log_crypt(byte* buf, lsn_t lsn, ulint size, log_crypt_t op) +{ + ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad(ulint(buf) % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_a(info.key_version); + + alignas(8) byte aes_ctr_iv[MY_AES_BLOCK_SIZE]; + +#define LOG_CRYPT_HDR_SIZE 4 + lsn &= ~lsn_t(OS_FILE_LOG_BLOCK_SIZE - 1); + + for (const byte* const end = buf + size; buf != end; + buf += OS_FILE_LOG_BLOCK_SIZE, lsn += OS_FILE_LOG_BLOCK_SIZE) { + alignas(4) byte dst[OS_FILE_LOG_BLOCK_SIZE - LOG_CRYPT_HDR_SIZE + - LOG_BLOCK_CHECKSUM]; + + /* The log block number is not encrypted. */ + memcpy_aligned<4>(dst, buf + LOG_BLOCK_HDR_NO, 4); + memcpy_aligned<4>(aes_ctr_iv, buf + LOG_BLOCK_HDR_NO, 4); + *aes_ctr_iv &= byte(~(LOG_BLOCK_FLUSH_BIT_MASK >> 24)); + static_assert(LOG_BLOCK_HDR_NO + 4 == LOG_CRYPT_HDR_SIZE, + "compatibility"); + memcpy_aligned<4>(aes_ctr_iv + 4, info.crypt_nonce, 4); + mach_write_to_8(my_assume_aligned<8>(aes_ctr_iv + 8), lsn); + ut_ad(log_block_get_start_lsn(lsn, + log_block_get_hdr_no(buf)) + == lsn); + byte* key_ver = &buf[OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_KEY + - LOG_BLOCK_CHECKSUM]; + const size_t dst_size + = log_sys.has_encryption_key_rotation() + ? sizeof dst - LOG_BLOCK_KEY + : sizeof dst; + if (log_sys.has_encryption_key_rotation()) { + const uint key_version = info.key_version; + switch (op) { + case LOG_ENCRYPT_ROTATE_KEY: + info.key_version + = encryption_key_get_latest_version( + LOG_DEFAULT_ENCRYPTION_KEY); + if (key_version != info.key_version + && !init_crypt_key(&info)) { + info.key_version = key_version; + } + /* fall through */ + case LOG_ENCRYPT: + mach_write_to_4(key_ver, info.key_version); + break; + case LOG_DECRYPT: + info.key_version = mach_read_from_4(key_ver); + if (key_version != info.key_version + && !init_crypt_key(&info)) { + return false; + } + } +#ifndef DBUG_OFF + if (key_version != info.key_version) { + DBUG_PRINT("ib_log", ("key_version: %x -> %x", + key_version, + info.key_version)); + } +#endif /* !DBUG_OFF */ + } + + ut_ad(LOG_CRYPT_HDR_SIZE + dst_size + == log_sys.trailer_offset()); + + uint dst_len; + int rc = encryption_crypt( + buf + LOG_CRYPT_HDR_SIZE, static_cast<uint>(dst_size), + reinterpret_cast<byte*>(dst), &dst_len, + const_cast<byte*>(info.crypt_key), + MY_AES_BLOCK_SIZE, + aes_ctr_iv, sizeof aes_ctr_iv, + op == LOG_DECRYPT + ? ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD + : ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD, + LOG_DEFAULT_ENCRYPTION_KEY, + info.key_version); + ut_a(rc == MY_AES_OK); + ut_a(dst_len == dst_size); + memcpy(buf + LOG_CRYPT_HDR_SIZE, dst, dst_size); + } + + return true; +} + +/** Initialize the redo log encryption key and random parameters +when creating a new redo log. +The random parameters will be persisted in the log checkpoint pages. +@see log_crypt_write_checkpoint_buf() +@see log_crypt_read_checkpoint_buf() +@return whether the operation succeeded */ +bool log_crypt_init() +{ + info.key_version= + encryption_key_get_latest_version(LOG_DEFAULT_ENCRYPTION_KEY); + + if (info.key_version == ENCRYPTION_KEY_VERSION_INVALID) + ib::error() << "log_crypt_init(): cannot get key version"; + else if (my_random_bytes(tmp_iv, MY_AES_BLOCK_SIZE) != MY_AES_OK || + my_random_bytes(info.crypt_msg, sizeof info.crypt_msg) != + MY_AES_OK || + my_random_bytes(info.crypt_nonce, sizeof info.crypt_nonce) != + MY_AES_OK) + ib::error() << "log_crypt_init(): my_random_bytes() failed"; + else if (init_crypt_key(&info)) + goto func_exit; + + info.key_version= 0; +func_exit: + return info.key_version != 0; +} + +/** Read the MariaDB 10.1 checkpoint crypto (version, msg and iv) info. +@param[in] buf checkpoint buffer +@return whether the operation was successful */ +ATTRIBUTE_COLD bool log_crypt_101_read_checkpoint(const byte* buf) +{ + buf += 20 + 32 * 9; + + const size_t n = *buf++ == 2 ? std::min(unsigned(*buf++), 5U) : 0; + + for (size_t i = 0; i < n; i++) { + struct crypt_info_t& info = infos[infos_used]; + unsigned checkpoint_no = mach_read_from_4(buf); + for (size_t j = 0; j < infos_used; j++) { + if (infos[j].checkpoint_no == checkpoint_no) { + /* Do not overwrite an existing slot. */ + goto next_slot; + } + } + if (infos_used >= UT_ARR_SIZE(infos)) { + ut_ad("too many checkpoint pages" == 0); + goto next_slot; + } + infos_used++; + info.checkpoint_no = checkpoint_no; + info.key_version = mach_read_from_4(buf + 4); + memcpy(info.crypt_msg, buf + 8, MY_AES_BLOCK_SIZE); + memcpy(info.crypt_nonce, buf + 24, sizeof info.crypt_nonce); + + if (!init_crypt_key(&info, true)) { + return false; + } +next_slot: + buf += 4 + 4 + 2 * MY_AES_BLOCK_SIZE; + } + + return true; +} + +/** Decrypt a MariaDB 10.1 redo log block. +@param[in,out] buf log block +@param[in] start_lsn server start LSN +@return whether the decryption was successful */ +ATTRIBUTE_COLD bool log_crypt_101_read_block(byte* buf, lsn_t start_lsn) +{ + const uint32_t checkpoint_no + = uint32_t(log_block_get_checkpoint_no(buf)); + const crypt_info_t* info = infos; + for (const crypt_info_t* const end = info + infos_used; info < end; + info++) { + if (info->key_version + && info->key_version != ENCRYPTION_KEY_VERSION_INVALID + && info->checkpoint_no == checkpoint_no) { + goto found; + } + } + + if (infos_used == 0) { + return false; + } + /* MariaDB Server 10.1 would use the first key if it fails to + find a key for the current checkpoint. */ + info = infos; + if (info->key_version == ENCRYPTION_KEY_VERSION_INVALID) { + return false; + } +found: + byte dst[OS_FILE_LOG_BLOCK_SIZE]; + uint dst_len; + byte aes_ctr_iv[MY_AES_BLOCK_SIZE]; + + const uint src_len = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE; + + ulint log_block_no = log_block_get_hdr_no(buf); + + /* The log block header is not encrypted. */ + memcpy(dst, buf, LOG_BLOCK_HDR_SIZE); + + memcpy(aes_ctr_iv, info->crypt_nonce, 3); + mach_write_to_8(aes_ctr_iv + 3, + log_block_get_start_lsn(start_lsn, log_block_no)); + memcpy(aes_ctr_iv + 11, buf, 4); + aes_ctr_iv[11] &= byte(~(LOG_BLOCK_FLUSH_BIT_MASK >> 24)); + aes_ctr_iv[15] = 0; + + int rc = encryption_crypt(buf + LOG_BLOCK_HDR_SIZE, src_len, + dst + LOG_BLOCK_HDR_SIZE, &dst_len, + const_cast<byte*>(info->crypt_key), + MY_AES_BLOCK_SIZE, + aes_ctr_iv, MY_AES_BLOCK_SIZE, + ENCRYPTION_FLAG_DECRYPT + | ENCRYPTION_FLAG_NOPAD, + LOG_DEFAULT_ENCRYPTION_KEY, + info->key_version); + + if (rc != MY_AES_OK || dst_len != src_len) { + return false; + } + + memcpy(buf, dst, sizeof dst); + return true; +} + +/** Add the encryption information to a redo log checkpoint buffer. +@param[in,out] buf checkpoint buffer */ +UNIV_INTERN +void +log_crypt_write_checkpoint_buf(byte* buf) +{ + ut_ad(info.key_version); + compile_time_assert(16 == sizeof info.crypt_msg); + compile_time_assert(16 == MY_AES_BLOCK_SIZE); + compile_time_assert(LOG_CHECKPOINT_CRYPT_MESSAGE + - LOG_CHECKPOINT_CRYPT_NONCE + == sizeof info.crypt_nonce); + + memcpy(buf + LOG_CHECKPOINT_CRYPT_MESSAGE, info.crypt_msg, + MY_AES_BLOCK_SIZE); + memcpy(buf + LOG_CHECKPOINT_CRYPT_NONCE, info.crypt_nonce, + sizeof info.crypt_nonce); + mach_write_to_4(buf + LOG_CHECKPOINT_CRYPT_KEY, info.key_version); +} + +/** Read the checkpoint crypto (version, msg and iv) info. +@param[in] buf checkpoint buffer +@return whether the operation was successful */ +bool log_crypt_read_checkpoint_buf(const byte* buf) +{ + info.checkpoint_no = mach_read_from_4(buf + (LOG_CHECKPOINT_NO + 4)); + info.key_version = mach_read_from_4(buf + LOG_CHECKPOINT_CRYPT_KEY); + +#if MY_AES_BLOCK_SIZE != 16 +# error "MY_AES_BLOCK_SIZE != 16; redo log checkpoint format affected" +#endif + compile_time_assert(16 == sizeof info.crypt_msg); + compile_time_assert(16 == MY_AES_BLOCK_SIZE); + compile_time_assert(LOG_CHECKPOINT_CRYPT_MESSAGE + - LOG_CHECKPOINT_CRYPT_NONCE + == sizeof info.crypt_nonce); + + memcpy(info.crypt_msg, buf + LOG_CHECKPOINT_CRYPT_MESSAGE, + MY_AES_BLOCK_SIZE); + memcpy(info.crypt_nonce, buf + LOG_CHECKPOINT_CRYPT_NONCE, + sizeof info.crypt_nonce); + + return init_crypt_key(&info); +} + +/** Encrypt or decrypt a temporary file block. +@param[in] src block to encrypt or decrypt +@param[in] size size of the block +@param[out] dst destination block +@param[in] offs offset to block +@param[in] encrypt true=encrypt; false=decrypt +@return whether the operation succeeded */ +UNIV_INTERN +bool +log_tmp_block_encrypt( + const byte* src, + ulint size, + byte* dst, + uint64_t offs, + bool encrypt) +{ + uint dst_len; + uint64_t iv[MY_AES_BLOCK_SIZE / sizeof(uint64_t)]; + iv[0] = offs; + memcpy(iv + 1, tmp_iv, sizeof iv - sizeof *iv); + + int rc = encryption_crypt( + src, uint(size), dst, &dst_len, + const_cast<byte*>(info.crypt_key), MY_AES_BLOCK_SIZE, + reinterpret_cast<byte*>(iv), uint(sizeof iv), + encrypt + ? ENCRYPTION_FLAG_ENCRYPT|ENCRYPTION_FLAG_NOPAD + : ENCRYPTION_FLAG_DECRYPT|ENCRYPTION_FLAG_NOPAD, + LOG_DEFAULT_ENCRYPTION_KEY, info.key_version); + + if (rc != MY_AES_OK) { + ib::error() << (encrypt ? "Encryption" : "Decryption") + << " failed for temporary file: " << rc; + } + + return rc == MY_AES_OK; +} diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc new file mode 100644 index 00000000..a6fa50dd --- /dev/null +++ b/storage/innobase/log/log0log.cc @@ -0,0 +1,1340 @@ +/***************************************************************************** + +Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2009, Google Inc. +Copyright (c) 2014, 2021, MariaDB Corporation. + +Portions of this file contain modifications contributed and copyrighted by +Google, Inc. Those modifications are gratefully acknowledged and are described +briefly in the InnoDB documentation. The contributions by Google are +incorporated with their permission, and subject to the conditions contained in +the file COPYING.Google. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file log/log0log.cc +Database log + +Created 12/9/1995 Heikki Tuuri +*******************************************************/ + +#include "univ.i" +#include <debug_sync.h> +#include <my_service_manager.h> + +#include "log0log.h" +#include "log0crypt.h" +#include "buf0buf.h" +#include "buf0flu.h" +#include "lock0lock.h" +#include "log0recv.h" +#include "fil0fil.h" +#include "dict0stats_bg.h" +#include "btr0defragment.h" +#include "srv0srv.h" +#include "srv0start.h" +#include "trx0sys.h" +#include "trx0trx.h" +#include "trx0roll.h" +#include "srv0mon.h" +#include "sync0sync.h" +#include "buf0dump.h" +#include "log0sync.h" + +/* +General philosophy of InnoDB redo-logs: + +Every change to a contents of a data page must be done +through mtr_t, and mtr_t::commit() will write log records +to the InnoDB redo log. */ + +/** Redo log system */ +log_t log_sys; + +/* A margin for free space in the log buffer before a log entry is catenated */ +#define LOG_BUF_WRITE_MARGIN (4 * OS_FILE_LOG_BLOCK_SIZE) + +/* Margins for free space in the log buffer after a log entry is catenated */ +#define LOG_BUF_FLUSH_RATIO 2 +#define LOG_BUF_FLUSH_MARGIN (LOG_BUF_WRITE_MARGIN \ + + (4U << srv_page_size_shift)) + +/** Extends the log buffer. +@param[in] len requested minimum size in bytes */ +void log_buffer_extend(ulong len) +{ + const size_t new_buf_size = ut_calc_align(len, srv_page_size); + byte* new_buf = static_cast<byte*> + (ut_malloc_dontdump(new_buf_size, PSI_INSTRUMENT_ME)); + TRASH_ALLOC(new_buf, new_buf_size); + byte* new_flush_buf = static_cast<byte*> + (ut_malloc_dontdump(new_buf_size, PSI_INSTRUMENT_ME)); + TRASH_ALLOC(new_flush_buf, new_buf_size); + + mysql_mutex_lock(&log_sys.mutex); + + if (len <= srv_log_buffer_size) { + /* Already extended enough by the others */ + mysql_mutex_unlock(&log_sys.mutex); + ut_free_dodump(new_buf, new_buf_size); + ut_free_dodump(new_flush_buf, new_buf_size); + return; + } + + ib::warn() << "The redo log transaction size " << len << + " exceeds innodb_log_buffer_size=" + << srv_log_buffer_size << " / 2). Trying to extend it."; + + byte* old_buf = log_sys.buf; + byte* old_flush_buf = log_sys.flush_buf; + const ulong old_buf_size = srv_log_buffer_size; + srv_log_buffer_size = static_cast<ulong>(new_buf_size); + log_sys.buf = new_buf; + log_sys.flush_buf = new_flush_buf; + memcpy_aligned<OS_FILE_LOG_BLOCK_SIZE>(new_buf, old_buf, + log_sys.buf_free); + + log_sys.max_buf_free = new_buf_size / LOG_BUF_FLUSH_RATIO + - LOG_BUF_FLUSH_MARGIN; + + mysql_mutex_unlock(&log_sys.mutex); + + ut_free_dodump(old_buf, old_buf_size); + ut_free_dodump(old_flush_buf, old_buf_size); + + ib::info() << "innodb_log_buffer_size was extended to " + << new_buf_size << "."; +} + +/** Calculate the recommended highest values for lsn - last_checkpoint_lsn +and lsn - buf_pool.get_oldest_modification(). +@param[in] file_size requested innodb_log_file_size +@retval true on success +@retval false if the smallest log group is too small to +accommodate the number of OS threads in the database server */ +bool +log_set_capacity(ulonglong file_size) +{ + /* Margin for the free space in the smallest log, before a new query + step which modifies the database, is started */ + const size_t LOG_CHECKPOINT_FREE_PER_THREAD = 4U + << srv_page_size_shift; + const size_t LOG_CHECKPOINT_EXTRA_FREE = 8U << srv_page_size_shift; + + lsn_t margin; + ulint free; + + lsn_t smallest_capacity = file_size - LOG_FILE_HDR_SIZE; + /* Add extra safety */ + smallest_capacity -= smallest_capacity / 10; + + /* For each OS thread we must reserve so much free space in the + smallest log group that it can accommodate the log entries produced + by single query steps: running out of free log space is a serious + system error which requires rebooting the database. */ + + free = LOG_CHECKPOINT_FREE_PER_THREAD * 10 + + LOG_CHECKPOINT_EXTRA_FREE; + if (free >= smallest_capacity / 2) { + ib::error() << "Cannot continue operation because log file is " + "too small. Increase innodb_log_file_size " + "or decrease innodb_thread_concurrency. " + << INNODB_PARAMETERS_MSG; + return false; + } + + margin = smallest_capacity - free; + margin = margin - margin / 10; /* Add still some extra safety */ + + mysql_mutex_lock(&log_sys.mutex); + + log_sys.log_capacity = smallest_capacity; + + log_sys.max_modified_age_async = margin - margin / 8; + log_sys.max_checkpoint_age = margin; + + mysql_mutex_unlock(&log_sys.mutex); + + return(true); +} + +/** Initialize the redo log subsystem. */ +void log_t::create() +{ + ut_ad(this == &log_sys); + ut_ad(!is_initialised()); + m_initialised= true; + + mysql_mutex_init(log_sys_mutex_key, &mutex, nullptr); + mysql_mutex_init(log_flush_order_mutex_key, &flush_order_mutex, nullptr); + + /* Start the lsn from one log block from zero: this way every + log record has a non-zero start lsn, a fact which we will use */ + + set_lsn(LOG_START_LSN + LOG_BLOCK_HDR_SIZE); + set_flushed_lsn(LOG_START_LSN + LOG_BLOCK_HDR_SIZE); + + ut_ad(srv_log_buffer_size >= 16 * OS_FILE_LOG_BLOCK_SIZE); + ut_ad(srv_log_buffer_size >= 4U << srv_page_size_shift); + + buf= static_cast<byte*>(ut_malloc_dontdump(srv_log_buffer_size, + PSI_INSTRUMENT_ME)); + TRASH_ALLOC(buf, srv_log_buffer_size); + flush_buf= static_cast<byte*>(ut_malloc_dontdump(srv_log_buffer_size, + PSI_INSTRUMENT_ME)); + TRASH_ALLOC(flush_buf, srv_log_buffer_size); + + max_buf_free= srv_log_buffer_size / LOG_BUF_FLUSH_RATIO - + LOG_BUF_FLUSH_MARGIN; + set_check_flush_or_checkpoint(); + + n_log_ios_old= n_log_ios; + last_printout_time= time(NULL); + + buf_next_to_write= 0; + last_checkpoint_lsn= write_lsn= LOG_START_LSN; + n_log_ios= 0; + n_log_ios_old= 0; + log_capacity= 0; + max_modified_age_async= 0; + max_checkpoint_age= 0; + next_checkpoint_no= 0; + next_checkpoint_lsn= 0; + n_pending_checkpoint_writes= 0; + + log_block_init(buf, LOG_START_LSN); + log_block_set_first_rec_group(buf, LOG_BLOCK_HDR_SIZE); + + buf_free= LOG_BLOCK_HDR_SIZE; + checkpoint_buf= static_cast<byte*> + (aligned_malloc(OS_FILE_LOG_BLOCK_SIZE, OS_FILE_LOG_BLOCK_SIZE)); +} + +mapped_file_t::~mapped_file_t() noexcept +{ + if (!m_area.empty()) + unmap(); +} + +dberr_t mapped_file_t::map(const char *path, bool read_only, + bool nvme) noexcept +{ + auto fd= mysql_file_open(innodb_log_file_key, path, + read_only ? O_RDONLY : O_RDWR, MYF(MY_WME)); + if (fd == -1) + return DB_ERROR; + + const auto file_size= os_file_get_size(path).m_total_size; + + const int nvme_flag= nvme ? MAP_SYNC : 0; + void *ptr= my_mmap(0, static_cast<size_t>(file_size), + read_only ? PROT_READ : PROT_READ | PROT_WRITE, + MAP_SHARED_VALIDATE | nvme_flag, fd, 0); + mysql_file_close(fd, MYF(MY_WME)); + + if (ptr == MAP_FAILED) + return DB_ERROR; + + m_area= {static_cast<byte *>(ptr), + static_cast<span<byte>::size_type>(file_size)}; + return DB_SUCCESS; +} + +dberr_t mapped_file_t::unmap() noexcept +{ + ut_ad(!m_area.empty()); + + if (my_munmap(m_area.data(), m_area.size())) + return DB_ERROR; + + m_area= {}; + return DB_SUCCESS; +} + +file_os_io::file_os_io(file_os_io &&rhs) : m_fd(rhs.m_fd) +{ + rhs.m_fd= OS_FILE_CLOSED; +} + +file_os_io &file_os_io::operator=(file_os_io &&rhs) +{ + std::swap(m_fd, rhs.m_fd); + return *this; +} + +file_os_io::~file_os_io() noexcept +{ + if (is_opened()) + close(); +} + +dberr_t file_os_io::open(const char *path, bool read_only) noexcept +{ + ut_ad(!is_opened()); + + bool success; + auto tmp_fd= os_file_create( + innodb_log_file_key, path, OS_FILE_OPEN | OS_FILE_ON_ERROR_NO_EXIT, + OS_FILE_NORMAL, OS_LOG_FILE, read_only, &success); + if (!success) + return DB_ERROR; + + m_durable_writes= srv_file_flush_method == SRV_O_DSYNC; + m_fd= tmp_fd; + return success ? DB_SUCCESS : DB_ERROR; +} + +dberr_t file_os_io::rename(const char *old_path, const char *new_path) noexcept +{ + return os_file_rename(innodb_log_file_key, old_path, new_path) ? DB_SUCCESS + : DB_ERROR; +} + +dberr_t file_os_io::close() noexcept +{ + if (!os_file_close(m_fd)) + return DB_ERROR; + + m_fd= OS_FILE_CLOSED; + return DB_SUCCESS; +} + +dberr_t file_os_io::read(os_offset_t offset, span<byte> buf) noexcept +{ + return os_file_read(IORequestRead, m_fd, buf.data(), offset, buf.size()); +} + +dberr_t file_os_io::write(const char *path, os_offset_t offset, + span<const byte> buf) noexcept +{ + return os_file_write(IORequestWrite, path, m_fd, buf.data(), offset, + buf.size()); +} + +dberr_t file_os_io::flush() noexcept +{ + return os_file_flush(m_fd) ? DB_SUCCESS : DB_ERROR; +} + +#ifdef HAVE_PMEM + +#include <libpmem.h> + +static bool is_pmem(const char *path) noexcept +{ + mapped_file_t mf; + return mf.map(path, true, true) == DB_SUCCESS ? true : false; +} + +class file_pmem_io final : public file_io +{ +public: + file_pmem_io() noexcept : file_io(true) {} + + dberr_t open(const char *path, bool read_only) noexcept final + { + return m_file.map(path, read_only, true); + } + dberr_t rename(const char *old_path, const char *new_path) noexcept final + { + return os_file_rename(innodb_log_file_key, old_path, new_path) ? DB_SUCCESS + : DB_ERROR; + } + dberr_t close() noexcept final { return m_file.unmap(); } + dberr_t read(os_offset_t offset, span<byte> buf) noexcept final + { + memcpy(buf.data(), m_file.data() + offset, buf.size()); + return DB_SUCCESS; + } + dberr_t write(const char *, os_offset_t offset, + span<const byte> buf) noexcept final + { + pmem_memcpy_persist(m_file.data() + offset, buf.data(), buf.size()); + return DB_SUCCESS; + } + dberr_t flush() noexcept final + { + ut_ad(0); + return DB_SUCCESS; + } + +private: + mapped_file_t m_file; +}; +#endif + +dberr_t log_file_t::open(bool read_only) noexcept +{ + ut_a(!is_opened()); + +#ifdef HAVE_PMEM + auto ptr= is_pmem(m_path.c_str()) + ? std::unique_ptr<file_io>(new file_pmem_io) + : std::unique_ptr<file_io>(new file_os_io); +#else + auto ptr= std::unique_ptr<file_io>(new file_os_io); +#endif + + if (dberr_t err= ptr->open(m_path.c_str(), read_only)) + return err; + + m_file= std::move(ptr); + return DB_SUCCESS; +} + +bool log_file_t::is_opened() const noexcept +{ + return static_cast<bool>(m_file); +} + +dberr_t log_file_t::rename(std::string new_path) noexcept +{ + if (dberr_t err= m_file->rename(m_path.c_str(), new_path.c_str())) + return err; + + m_path = std::move(new_path); + return DB_SUCCESS; +} + +dberr_t log_file_t::close() noexcept +{ + ut_a(is_opened()); + + if (dberr_t err= m_file->close()) + return err; + + m_file.reset(); + return DB_SUCCESS; +} + +dberr_t log_file_t::read(os_offset_t offset, span<byte> buf) noexcept +{ + ut_ad(is_opened()); + return m_file->read(offset, buf); +} + +bool log_file_t::writes_are_durable() const noexcept +{ + return m_file->writes_are_durable(); +} + +dberr_t log_file_t::write(os_offset_t offset, span<const byte> buf) noexcept +{ + ut_ad(is_opened()); + return m_file->write(m_path.c_str(), offset, buf); +} + +dberr_t log_file_t::flush() noexcept +{ + ut_ad(is_opened()); + return m_file->flush(); +} + +void log_t::file::open_file(std::string path) +{ + fd= log_file_t(std::move(path)); + if (const dberr_t err= fd.open(srv_read_only_mode)) + ib::fatal() << "open(" << fd.get_path() << ") returned " << err; +} + +/** Update the log block checksum. */ +static void log_block_store_checksum(byte* block) +{ + log_block_set_checksum(block, log_block_calc_checksum_crc32(block)); +} + +void log_t::file::write_header_durable(lsn_t lsn) +{ + ut_ad(lsn % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad(!recv_no_log_write); + ut_ad(log_sys.log.format == log_t::FORMAT_10_5 || + log_sys.log.format == log_t::FORMAT_ENC_10_5); + + byte *buf= log_sys.checkpoint_buf; + memset_aligned<OS_FILE_LOG_BLOCK_SIZE>(buf, 0, OS_FILE_LOG_BLOCK_SIZE); + + mach_write_to_4(buf + LOG_HEADER_FORMAT, log_sys.log.format); + mach_write_to_4(buf + LOG_HEADER_SUBFORMAT, log_sys.log.subformat); + mach_write_to_8(buf + LOG_HEADER_START_LSN, lsn); + strcpy(reinterpret_cast<char*>(buf) + LOG_HEADER_CREATOR, + LOG_HEADER_CREATOR_CURRENT); + ut_ad(LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR >= + sizeof LOG_HEADER_CREATOR_CURRENT); + log_block_store_checksum(buf); + + DBUG_PRINT("ib_log", ("write " LSN_PF, lsn)); + + log_sys.log.write(0, {buf, OS_FILE_LOG_BLOCK_SIZE}); + if (!log_sys.log.writes_are_durable()) + log_sys.log.flush(); +} + +void log_t::file::read(os_offset_t offset, span<byte> buf) +{ + if (const dberr_t err= fd.read(offset, buf)) + ib::fatal() << "read(" << fd.get_path() << ") returned "<< err; +} + +bool log_t::file::writes_are_durable() const noexcept +{ + return fd.writes_are_durable(); +} + +void log_t::file::write(os_offset_t offset, span<byte> buf) +{ + srv_stats.os_log_pending_writes.inc(); + if (const dberr_t err= fd.write(offset, buf)) + ib::fatal() << "write(" << fd.get_path() << ") returned " << err; + srv_stats.os_log_pending_writes.dec(); + srv_stats.os_log_written.add(buf.size()); + srv_stats.log_writes.inc(); + log_sys.n_log_ios++; +} + +void log_t::file::flush() +{ + log_sys.pending_flushes.fetch_add(1, std::memory_order_acquire); + if (const dberr_t err= fd.flush()) + ib::fatal() << "flush(" << fd.get_path() << ") returned " << err; + log_sys.pending_flushes.fetch_sub(1, std::memory_order_release); + log_sys.flushes.fetch_add(1, std::memory_order_release); +} + +void log_t::file::close_file() +{ + if (fd.is_opened()) + { + if (const dberr_t err= fd.close()) + ib::fatal() << "close(" << fd.get_path() << ") returned " << err; + } + fd.free(); // Free path +} + +/** Initialize the redo log. */ +void log_t::file::create() +{ + ut_ad(this == &log_sys.log); + ut_ad(log_sys.is_initialised()); + + format= srv_encrypt_log ? log_t::FORMAT_ENC_10_5 : log_t::FORMAT_10_5; + subformat= 2; + file_size= srv_log_file_size; + lsn= LOG_START_LSN; + lsn_offset= LOG_FILE_HDR_SIZE; +} + +/******************************************************//** +Writes a buffer to a log file. */ +static +void +log_write_buf( + byte* buf, /*!< in: buffer */ + ulint len, /*!< in: buffer len; must be divisible + by OS_FILE_LOG_BLOCK_SIZE */ +#ifdef UNIV_DEBUG + ulint pad_len, /*!< in: pad len in the buffer len */ +#endif /* UNIV_DEBUG */ + lsn_t start_lsn, /*!< in: start lsn of the buffer; must + be divisible by + OS_FILE_LOG_BLOCK_SIZE */ + ulint new_data_offset)/*!< in: start offset of new data in + buf: this parameter is used to decide + if we have to write a new log file + header */ +{ + ulint write_len; + lsn_t next_offset; + ulint i; + + ut_ad(log_write_lock_own()); + ut_ad(!recv_no_log_write); + ut_a(len % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_a(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0); + +loop: + if (len == 0) { + + return; + } + + next_offset = log_sys.log.calc_lsn_offset(start_lsn); + + if ((next_offset % log_sys.log.file_size) + len + > log_sys.log.file_size) { + /* if the above condition holds, then the below expression + is < len which is ulint, so the typecast is ok */ + write_len = ulint(log_sys.log.file_size + - (next_offset % log_sys.log.file_size)); + } else { + write_len = len; + } + + DBUG_PRINT("ib_log", + ("write " LSN_PF " to " LSN_PF + ": len " ULINTPF + " blocks " ULINTPF ".." ULINTPF, + start_lsn, next_offset, + write_len, + log_block_get_hdr_no(buf), + log_block_get_hdr_no( + buf + write_len + - OS_FILE_LOG_BLOCK_SIZE))); + + ut_ad(pad_len >= len + || log_block_get_hdr_no(buf) + == log_block_convert_lsn_to_no(start_lsn)); + + /* Calculate the checksums for each log block and write them to + the trailer fields of the log blocks */ + + for (i = 0; i < write_len / OS_FILE_LOG_BLOCK_SIZE; i++) { +#ifdef UNIV_DEBUG + ulint hdr_no_2 = log_block_get_hdr_no(buf) + i; + DBUG_EXECUTE_IF("innodb_small_log_block_no_limit", + hdr_no_2 = ((hdr_no_2 - 1) & 0xFUL) + 1;); +#endif + ut_ad(pad_len >= len + || i * OS_FILE_LOG_BLOCK_SIZE >= len - pad_len + || log_block_get_hdr_no(buf + i * OS_FILE_LOG_BLOCK_SIZE) == hdr_no_2); + log_block_store_checksum(buf + i * OS_FILE_LOG_BLOCK_SIZE); + } + + ut_a((next_offset >> srv_page_size_shift) <= ULINT_MAX); + + log_sys.log.write(static_cast<size_t>(next_offset), {buf, write_len}); + + if (write_len < len) { + start_lsn += write_len; + len -= write_len; + buf += write_len; + goto loop; + } +} + +/** Flush the recently written changes to the log file. +and invoke mysql_mutex_lock(&log_sys.mutex). */ +static void log_write_flush_to_disk_low(lsn_t lsn) +{ + if (!log_sys.log.writes_are_durable()) + log_sys.log.flush(); + ut_a(lsn >= log_sys.get_flushed_lsn()); + log_sys.set_flushed_lsn(lsn); +} + +/** Swap log buffers, and copy the content of last block +from old buf to the head of the new buf. Thus, buf_free and +buf_next_to_write would be changed accordingly */ +static inline +void +log_buffer_switch() +{ + mysql_mutex_assert_owner(&log_sys.mutex); + ut_ad(log_write_lock_own()); + + size_t area_end = ut_calc_align<size_t>( + log_sys.buf_free, OS_FILE_LOG_BLOCK_SIZE); + + /* Copy the last block to new buf */ + memcpy_aligned<OS_FILE_LOG_BLOCK_SIZE>( + log_sys.flush_buf, + log_sys.buf + area_end - OS_FILE_LOG_BLOCK_SIZE, + OS_FILE_LOG_BLOCK_SIZE); + + std::swap(log_sys.buf, log_sys.flush_buf); + + log_sys.buf_free %= OS_FILE_LOG_BLOCK_SIZE; + log_sys.buf_next_to_write = log_sys.buf_free; +} + +/** Invoke commit_checkpoint_notify_ha() to notify that outstanding +log writes have been completed. */ +void log_flush_notify(lsn_t flush_lsn); + +/** +Writes log buffer to disk +which is the "write" part of log_write_up_to(). + +This function does not flush anything. + +Note : the caller must have log_sys.mutex locked, and this +mutex is released in the function. + +*/ +static void log_write(bool rotate_key) +{ + mysql_mutex_assert_owner(&log_sys.mutex); + ut_ad(!recv_no_log_write); + lsn_t write_lsn; + if (log_sys.buf_free == log_sys.buf_next_to_write) { + /* Nothing to write */ + mysql_mutex_unlock(&log_sys.mutex); + return; + } + + ulint start_offset; + ulint end_offset; + ulint area_start; + ulint area_end; + ulong write_ahead_size = srv_log_write_ahead_size; + ulint pad_size; + + DBUG_PRINT("ib_log", ("write " LSN_PF " to " LSN_PF, + log_sys.write_lsn, + log_sys.get_lsn())); + + + start_offset = log_sys.buf_next_to_write; + end_offset = log_sys.buf_free; + + area_start = ut_2pow_round(start_offset, + ulint(OS_FILE_LOG_BLOCK_SIZE)); + area_end = ut_calc_align(end_offset, ulint(OS_FILE_LOG_BLOCK_SIZE)); + + ut_ad(area_end - area_start > 0); + + log_block_set_flush_bit(log_sys.buf + area_start, TRUE); + log_block_set_checkpoint_no( + log_sys.buf + area_end - OS_FILE_LOG_BLOCK_SIZE, + log_sys.next_checkpoint_no); + + write_lsn = log_sys.get_lsn(); + byte *write_buf = log_sys.buf; + + log_buffer_switch(); + + log_sys.log.set_fields(log_sys.write_lsn); + + mysql_mutex_unlock(&log_sys.mutex); + /* Erase the end of the last log block. */ + memset(write_buf + end_offset, 0, + ~end_offset & (OS_FILE_LOG_BLOCK_SIZE - 1)); + + /* Calculate pad_size if needed. */ + pad_size = 0; + if (write_ahead_size > OS_FILE_LOG_BLOCK_SIZE) { + ulint end_offset_in_unit; + lsn_t end_offset = log_sys.log.calc_lsn_offset( + ut_uint64_align_up(write_lsn, OS_FILE_LOG_BLOCK_SIZE)); + end_offset_in_unit = (ulint) (end_offset % write_ahead_size); + + if (end_offset_in_unit > 0 + && (area_end - area_start) > end_offset_in_unit) { + /* The first block in the unit was initialized + after the last writing. + Needs to be written padded data once. */ + pad_size = std::min<ulint>( + ulint(write_ahead_size) - end_offset_in_unit, + srv_log_buffer_size - area_end); + ::memset(write_buf + area_end, 0, pad_size); + } + } + + if (UNIV_UNLIKELY(srv_shutdown_state > SRV_SHUTDOWN_INITIATED)) { + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "InnoDB log write: " + LSN_PF, log_sys.write_lsn); + } + + if (log_sys.is_encrypted()) { + log_crypt(write_buf + area_start, log_sys.write_lsn, + area_end - area_start, + rotate_key ? LOG_ENCRYPT_ROTATE_KEY : LOG_ENCRYPT); + } + + /* Do the write to the log file */ + log_write_buf( + write_buf + area_start, area_end - area_start + pad_size, +#ifdef UNIV_DEBUG + pad_size, +#endif /* UNIV_DEBUG */ + ut_uint64_align_down(log_sys.write_lsn, + OS_FILE_LOG_BLOCK_SIZE), + start_offset - area_start); + srv_stats.log_padded.add(pad_size); + log_sys.write_lsn = write_lsn; + if (log_sys.log.writes_are_durable()) { + log_sys.set_flushed_lsn(write_lsn); + log_flush_notify(write_lsn); + } + return; +} + +static group_commit_lock write_lock; +static group_commit_lock flush_lock; + +#ifdef UNIV_DEBUG +bool log_write_lock_own() +{ + return write_lock.is_owner(); +} +#endif + +/** Ensure that the log has been written to the log file up to a given +log entry (such as that of a transaction commit). Start a new write, or +wait and check if an already running write is covering the request. +@param[in] lsn log sequence number that should be +included in the redo log file write +@param[in] flush_to_disk whether the written log should also +be flushed to the file system +@param[in] rotate_key whether to rotate the encryption key */ +void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key) +{ + ut_ad(!srv_read_only_mode); + ut_ad(!rotate_key || flush_to_disk); + ut_ad(lsn != LSN_MAX); + + if (recv_no_ibuf_operations) + { + /* Recovery is running and no operations on the log files are + allowed yet (the variable name .._no_ibuf_.. is misleading) */ + return; + } + + if (flush_to_disk && + flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED) + { + return; + } + + if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED) + { + mysql_mutex_lock(&log_sys.mutex); + lsn_t write_lsn= log_sys.get_lsn(); + write_lock.set_pending(write_lsn); + + log_write(rotate_key); + + ut_a(log_sys.write_lsn == write_lsn); + write_lock.release(write_lsn); + } + + if (!flush_to_disk) + { + return; + } + + /* Flush the highest written lsn.*/ + auto flush_lsn = write_lock.value(); + flush_lock.set_pending(flush_lsn); + log_write_flush_to_disk_low(flush_lsn); + flush_lock.release(flush_lsn); + + log_flush_notify(flush_lsn); +} + +/** write to the log file up to the last log entry. +@param[in] sync whether we want the written log +also to be flushed to disk. */ +void log_buffer_flush_to_disk(bool sync) +{ + ut_ad(!srv_read_only_mode); + log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), sync); +} + +/******************************************************************** + +Tries to establish a big enough margin of free space in the log buffer, such +that a new log entry can be catenated without an immediate need for a flush. */ +ATTRIBUTE_COLD static void log_flush_margin() +{ + lsn_t lsn = 0; + + mysql_mutex_lock(&log_sys.mutex); + + if (log_sys.buf_free > log_sys.max_buf_free) { + /* We can write during flush */ + lsn = log_sys.get_lsn(); + } + + mysql_mutex_unlock(&log_sys.mutex); + + if (lsn) { + log_write_up_to(lsn, false); + } +} + +/** Write checkpoint info to the log header and release log_sys.mutex. +@param[in] end_lsn start LSN of the FILE_CHECKPOINT mini-transaction */ +ATTRIBUTE_COLD void log_write_checkpoint_info(lsn_t end_lsn) +{ + ut_ad(!srv_read_only_mode); + ut_ad(end_lsn == 0 || end_lsn >= log_sys.next_checkpoint_lsn); + ut_ad(end_lsn <= log_sys.get_lsn()); + ut_ad(end_lsn + SIZE_OF_FILE_CHECKPOINT <= log_sys.get_lsn() + || srv_shutdown_state > SRV_SHUTDOWN_INITIATED); + + DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF + " written", + log_sys.next_checkpoint_no, + log_sys.next_checkpoint_lsn)); + + byte* buf = log_sys.checkpoint_buf; + memset_aligned<OS_FILE_LOG_BLOCK_SIZE>(buf, 0, OS_FILE_LOG_BLOCK_SIZE); + + mach_write_to_8(buf + LOG_CHECKPOINT_NO, log_sys.next_checkpoint_no); + mach_write_to_8(buf + LOG_CHECKPOINT_LSN, log_sys.next_checkpoint_lsn); + + if (log_sys.is_encrypted()) { + log_crypt_write_checkpoint_buf(buf); + } + + lsn_t lsn_offset + = log_sys.log.calc_lsn_offset(log_sys.next_checkpoint_lsn); + mach_write_to_8(buf + LOG_CHECKPOINT_OFFSET, lsn_offset); + mach_write_to_8(buf + LOG_CHECKPOINT_LOG_BUF_SIZE, + srv_log_buffer_size); + mach_write_to_8(buf + LOG_CHECKPOINT_END_LSN, end_lsn); + + log_block_store_checksum(buf); + + ut_ad(LOG_CHECKPOINT_1 < srv_page_size); + ut_ad(LOG_CHECKPOINT_2 < srv_page_size); + + ++log_sys.n_pending_checkpoint_writes; + + mysql_mutex_unlock(&log_sys.mutex); + + /* Note: We alternate the physical place of the checkpoint info. + See the (next_checkpoint_no & 1) below. */ + + log_sys.log.write((log_sys.next_checkpoint_no & 1) ? LOG_CHECKPOINT_2 + : LOG_CHECKPOINT_1, + {buf, OS_FILE_LOG_BLOCK_SIZE}); + + log_sys.log.flush(); + + mysql_mutex_lock(&log_sys.mutex); + + --log_sys.n_pending_checkpoint_writes; + ut_ad(log_sys.n_pending_checkpoint_writes == 0); + + log_sys.next_checkpoint_no++; + + log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn; + + DBUG_PRINT("ib_log", ("checkpoint ended at " LSN_PF + ", flushed to " LSN_PF, + lsn_t{log_sys.last_checkpoint_lsn}, + log_sys.get_flushed_lsn())); + + MONITOR_INC(MONITOR_NUM_CHECKPOINT); + + DBUG_EXECUTE_IF("crash_after_checkpoint", DBUG_SUICIDE();); + + mysql_mutex_unlock(&log_sys.mutex); +} + +/****************************************************************//** +Tries to establish a big enough margin of free space in the log, such +that a new log entry can be catenated without an immediate need for a +checkpoint. NOTE: this function may only be called if the calling thread +owns no synchronization objects! */ +ATTRIBUTE_COLD static void log_checkpoint_margin() +{ + while (log_sys.check_flush_or_checkpoint()) + { + mysql_mutex_lock(&log_sys.mutex); + ut_ad(!recv_no_log_write); + + if (!log_sys.check_flush_or_checkpoint()) + { +func_exit: + mysql_mutex_unlock(&log_sys.mutex); + return; + } + + const lsn_t lsn= log_sys.get_lsn(); + const lsn_t checkpoint= log_sys.last_checkpoint_lsn; + const lsn_t sync_lsn= checkpoint + log_sys.max_checkpoint_age; + if (lsn <= sync_lsn) + { + log_sys.set_check_flush_or_checkpoint(false); + goto func_exit; + } + + mysql_mutex_unlock(&log_sys.mutex); + + /* We must wait to prevent the tail of the log overwriting the head. */ + buf_flush_wait_flushed(std::min(sync_lsn, checkpoint + (1U << 20))); + os_thread_sleep(10000); /* Sleep 10ms to avoid a thundering herd */ + } +} + +/** +Checks that there is enough free space in the log to start a new query step. +Flushes the log buffer or makes a new checkpoint if necessary. NOTE: this +function may only be called if the calling thread owns no synchronization +objects! */ +ATTRIBUTE_COLD void log_check_margins() +{ + do + { + log_flush_margin(); + log_checkpoint_margin(); + ut_ad(!recv_no_log_write); + } + while (log_sys.check_flush_or_checkpoint()); +} + +extern void buf_resize_shutdown(); + +/** Make a checkpoint at the latest lsn on shutdown. */ +ATTRIBUTE_COLD void logs_empty_and_mark_files_at_shutdown() +{ + lsn_t lsn; + ulint count = 0; + + ib::info() << "Starting shutdown..."; + + /* Wait until the master thread and all other operations are idle: our + algorithm only works if the server is idle at shutdown */ + bool do_srv_shutdown = false; + if (srv_master_timer) { + do_srv_shutdown = srv_fast_shutdown < 2; + srv_master_timer.reset(); + } + + /* Wait for the end of the buffer resize task.*/ + buf_resize_shutdown(); + dict_stats_shutdown(); + btr_defragment_shutdown(); + + srv_shutdown_state = SRV_SHUTDOWN_CLEANUP; + + if (srv_buffer_pool_dump_at_shutdown && + !srv_read_only_mode && srv_fast_shutdown < 2) { + buf_dump_start(); + } + srv_monitor_timer.reset(); + lock_sys.timeout_timer.reset(); + if (do_srv_shutdown) { + srv_shutdown(srv_fast_shutdown == 0); + } + + +loop: + ut_ad(lock_sys.is_initialised() || !srv_was_started); + ut_ad(log_sys.is_initialised() || !srv_was_started); + ut_ad(fil_system.is_initialised() || !srv_was_started); + +#define COUNT_INTERVAL 600U +#define CHECK_INTERVAL 100000U + os_thread_sleep(CHECK_INTERVAL); + + count++; + + /* Check that there are no longer transactions, except for + PREPARED ones. We need this wait even for the 'very fast' + shutdown, because the InnoDB layer may have committed or + prepared transactions and we don't want to lose them. */ + + if (ulint total_trx = srv_was_started && !srv_read_only_mode + && srv_force_recovery < SRV_FORCE_NO_TRX_UNDO + ? trx_sys.any_active_transactions() : 0) { + + if (srv_print_verbose_log && count > COUNT_INTERVAL) { + service_manager_extend_timeout( + COUNT_INTERVAL * CHECK_INTERVAL/1000000 * 2, + "Waiting for %lu active transactions to finish", + (ulong) total_trx); + ib::info() << "Waiting for " << total_trx << " active" + << " transactions to finish"; + + count = 0; + } + + goto loop; + } + + /* We need these threads to stop early in shutdown. */ + const char* thread_name; + + if (srv_fast_shutdown != 2 && trx_rollback_is_active) { + thread_name = "rollback of recovered transactions"; + } else { + thread_name = NULL; + } + + if (thread_name) { + ut_ad(!srv_read_only_mode); +wait_suspend_loop: + service_manager_extend_timeout( + COUNT_INTERVAL * CHECK_INTERVAL/1000000 * 2, + "Waiting for %s to exit", thread_name); + if (srv_print_verbose_log && count > COUNT_INTERVAL) { + ib::info() << "Waiting for " << thread_name + << " to exit"; + count = 0; + } + goto loop; + } + + /* Check that the background threads are suspended */ + + ut_ad(!srv_any_background_activity()); + if (srv_n_fil_crypt_threads_started) { + os_event_set(fil_crypt_threads_event); + thread_name = "fil_crypt_thread"; + goto wait_suspend_loop; + } + + if (buf_page_cleaner_is_active) { + thread_name = "page cleaner thread"; + pthread_cond_signal(&buf_pool.do_flush_list); + goto wait_suspend_loop; + } + + buf_load_dump_end(); + + if (!buf_pool.is_initialised()) { + ut_ad(!srv_was_started); + } else if (ulint pending_io = buf_pool.io_pending()) { + if (srv_print_verbose_log && count > 600) { + ib::info() << "Waiting for " << pending_io << " buffer" + " page I/Os to complete"; + count = 0; + } + + goto loop; + } else { + buf_flush_buffer_pool(); + } + + if (log_sys.is_initialised()) { + mysql_mutex_lock(&log_sys.mutex); + const ulint n_write = log_sys.n_pending_checkpoint_writes; + const ulint n_flush = log_sys.pending_flushes; + mysql_mutex_unlock(&log_sys.mutex); + + if (n_write || n_flush) { + if (srv_print_verbose_log && count > 600) { + ib::info() << "Pending checkpoint_writes: " + << n_write + << ". Pending log flush writes: " + << n_flush; + count = 0; + } + goto loop; + } + } + + if (srv_fast_shutdown == 2 || !srv_was_started) { + if (!srv_read_only_mode && srv_was_started) { + ib::info() << "MySQL has requested a very fast" + " shutdown without flushing the InnoDB buffer" + " pool to data files. At the next mysqld" + " startup InnoDB will do a crash recovery!"; + + /* In this fastest shutdown we do not flush the + buffer pool: + + it is essentially a 'crash' of the InnoDB server. + Make sure that the log is all flushed to disk, so + that we can recover all committed transactions in + a crash recovery. We must not write the lsn stamps + to the data files, since at a startup InnoDB deduces + from the stamps if the previous shutdown was clean. */ + + log_buffer_flush_to_disk(); + } + + srv_shutdown_state = SRV_SHUTDOWN_LAST_PHASE; + return; + } + + if (!srv_read_only_mode) { + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "ensuring dirty buffer pool are written to log"); + log_make_checkpoint(); + + mysql_mutex_lock(&log_sys.mutex); + + lsn = log_sys.get_lsn(); + + const bool lsn_changed = lsn != log_sys.last_checkpoint_lsn + && lsn != log_sys.last_checkpoint_lsn + + SIZE_OF_FILE_CHECKPOINT; + ut_ad(lsn >= log_sys.last_checkpoint_lsn); + + mysql_mutex_unlock(&log_sys.mutex); + + if (lsn_changed) { + goto loop; + } + + log_sys.log.flush(); + } else { + lsn = recv_sys.recovered_lsn; + } + + srv_shutdown_state = SRV_SHUTDOWN_LAST_PHASE; + + /* Make some checks that the server really is quiet */ + ut_ad(!srv_any_background_activity()); + + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "Free innodb buffer pool"); + ut_d(buf_pool.assert_all_freed()); + + ut_a(lsn == log_sys.get_lsn() + || srv_force_recovery == SRV_FORCE_NO_LOG_REDO); + + if (UNIV_UNLIKELY(lsn < recv_sys.recovered_lsn)) { + ib::error() << "Shutdown LSN=" << lsn + << " is less than start LSN=" + << recv_sys.recovered_lsn; + } + + srv_shutdown_lsn = lsn; + + if (!srv_read_only_mode) { + dberr_t err = fil_write_flushed_lsn(lsn); + + if (err != DB_SUCCESS) { + ib::error() << "Writing flushed lsn " << lsn + << " failed; error=" << err; + } + } + + /* Make some checks that the server really is quiet */ + ut_ad(!srv_any_background_activity()); + + ut_a(lsn == log_sys.get_lsn() + || srv_force_recovery == SRV_FORCE_NO_LOG_REDO); +} + +/******************************************************//** +Prints info of the log. */ +void +log_print( +/*======*/ + FILE* file) /*!< in: file where to print */ +{ + double time_elapsed; + time_t current_time; + + mysql_mutex_lock(&log_sys.mutex); + + const lsn_t lsn= log_sys.get_lsn(); + mysql_mutex_lock(&buf_pool.flush_list_mutex); + const lsn_t pages_flushed = buf_pool.get_oldest_modification(lsn); + mysql_mutex_unlock(&buf_pool.flush_list_mutex); + + fprintf(file, + "Log sequence number " LSN_PF "\n" + "Log flushed up to " LSN_PF "\n" + "Pages flushed up to " LSN_PF "\n" + "Last checkpoint at " LSN_PF "\n", + lsn, + log_sys.get_flushed_lsn(), + pages_flushed, + lsn_t{log_sys.last_checkpoint_lsn}); + + current_time = time(NULL); + + time_elapsed = difftime(current_time, + log_sys.last_printout_time); + + if (time_elapsed <= 0) { + time_elapsed = 1; + } + + fprintf(file, + ULINTPF " pending log flushes, " + ULINTPF " pending chkp writes\n" + ULINTPF " log i/o's done, %.2f log i/o's/second\n", + log_sys.pending_flushes.load(), + log_sys.n_pending_checkpoint_writes, + log_sys.n_log_ios, + static_cast<double>( + log_sys.n_log_ios - log_sys.n_log_ios_old) + / time_elapsed); + + log_sys.n_log_ios_old = log_sys.n_log_ios; + log_sys.last_printout_time = current_time; + + mysql_mutex_unlock(&log_sys.mutex); +} + +/**********************************************************************//** +Refreshes the statistics used to print per-second averages. */ +void +log_refresh_stats(void) +/*===================*/ +{ + log_sys.n_log_ios_old = log_sys.n_log_ios; + log_sys.last_printout_time = time(NULL); +} + +/** Shut down the redo log subsystem. */ +void log_t::close() +{ + ut_ad(this == &log_sys); + if (!is_initialised()) return; + m_initialised= false; + log.close(); + + ut_free_dodump(buf, srv_log_buffer_size); + buf= nullptr; + ut_free_dodump(flush_buf, srv_log_buffer_size); + flush_buf= nullptr; + + mysql_mutex_destroy(&mutex); + mysql_mutex_destroy(&flush_order_mutex); + + recv_sys.close(); + + aligned_free(checkpoint_buf); + checkpoint_buf= nullptr; +} + +std::string get_log_file_path(const char *filename) +{ + const size_t size= strlen(srv_log_group_home_dir) + /* path separator */ 1 + + strlen(filename) + /* longest suffix */ 3; + std::string path; + path.reserve(size); + path.assign(srv_log_group_home_dir); + + std::replace(path.begin(), path.end(), OS_PATH_SEPARATOR_ALT, + OS_PATH_SEPARATOR); + + if (path.back() != OS_PATH_SEPARATOR) + path.push_back(OS_PATH_SEPARATOR); + path.append(filename); + + return path; +} + +std::vector<std::string> get_existing_log_files_paths() { + std::vector<std::string> result; + + for (int i= 0; i < 101; i++) { + auto path= get_log_file_path(LOG_FILE_NAME_PREFIX) + .append(std::to_string(i)); + os_file_stat_t stat; + dberr_t err= os_file_get_status(path.c_str(), &stat, false, true); + if (err) + break; + + if (stat.type != OS_FILE_TYPE_FILE) + break; + + result.push_back(std::move(path)); + } + + return result; +} diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc new file mode 100644 index 00000000..eb34fd8e --- /dev/null +++ b/storage/innobase/log/log0recv.cc @@ -0,0 +1,3783 @@ +/***************************************************************************** + +Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2012, Facebook Inc. +Copyright (c) 2013, 2021, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file log/log0recv.cc +Recovery + +Created 9/20/1997 Heikki Tuuri +*******************************************************/ + +#include "univ.i" + +#include <map> +#include <string> +#include <my_service_manager.h> + +#include "log0recv.h" + +#ifdef HAVE_MY_AES_H +#include <my_aes.h> +#endif + +#include "log0crypt.h" +#include "mem0mem.h" +#include "buf0buf.h" +#include "buf0dblwr.h" +#include "buf0flu.h" +#include "mtr0mtr.h" +#include "mtr0log.h" +#include "page0page.h" +#include "page0cur.h" +#include "trx0undo.h" +#include "ibuf0ibuf.h" +#include "trx0undo.h" +#include "trx0rec.h" +#include "fil0fil.h" +#include "buf0rea.h" +#include "srv0srv.h" +#include "srv0start.h" +#include "fil0pagecompress.h" + +/** Read-ahead area in applying log records to file pages */ +#define RECV_READ_AHEAD_AREA 32U + +/** The recovery system */ +recv_sys_t recv_sys; +/** TRUE when recv_init_crash_recovery() has been called. */ +bool recv_needed_recovery; +#ifdef UNIV_DEBUG +/** TRUE if writing to the redo log (mtr_commit) is forbidden. +Protected by log_sys.mutex. */ +bool recv_no_log_write = false; +#endif /* UNIV_DEBUG */ + +/** TRUE if buf_page_is_corrupted() should check if the log sequence +number (FIL_PAGE_LSN) is in the future. Initially FALSE, and set by +recv_recovery_from_checkpoint_start(). */ +bool recv_lsn_checks_on; + +/** If the following is TRUE, the buffer pool file pages must be invalidated +after recovery and no ibuf operations are allowed; this becomes TRUE if +the log record hash table becomes too full, and log records must be merged +to file pages already before the recovery is finished: in this case no +ibuf operations are allowed, as they could modify the pages read in the +buffer pool before the pages have been recovered to the up-to-date state. + +true means that recovery is running and no operations on the log file +are allowed yet: the variable name is misleading. */ +bool recv_no_ibuf_operations; + +/** The maximum lsn we see for a page during the recovery process. If this +is bigger than the lsn we are able to scan up to, that is an indication that +the recovery failed and the database may be corrupt. */ +static lsn_t recv_max_page_lsn; + +/** Stored physical log record with logical LSN (@see log_t::FORMAT_10_5) */ +struct log_phys_t : public log_rec_t +{ + /** start LSN of the mini-transaction (not necessarily of this record) */ + const lsn_t start_lsn; +private: + /** @return the start of length and data */ + const byte *start() const + { + return my_assume_aligned<sizeof(size_t)> + (reinterpret_cast<const byte*>(&start_lsn + 1)); + } + /** @return the start of length and data */ + byte *start() + { return const_cast<byte*>(const_cast<const log_phys_t*>(this)->start()); } + /** @return the length of the following record */ + uint16_t len() const { uint16_t i; memcpy(&i, start(), 2); return i; } + + /** @return start of the log records */ + byte *begin() { return start() + 2; } + /** @return end of the log records */ + byte *end() { byte *e= begin() + len(); ut_ad(!*e); return e; } +public: + /** @return start of the log records */ + const byte *begin() const { return const_cast<log_phys_t*>(this)->begin(); } + /** @return end of the log records */ + const byte *end() const { return const_cast<log_phys_t*>(this)->end(); } + + /** Determine the allocated size of the object. + @param len length of recs, excluding terminating NUL byte + @return the total allocation size */ + static inline size_t alloc_size(size_t len); + + /** Constructor. + @param start_lsn start LSN of the mini-transaction + @param lsn mtr_t::commit_lsn() of the mini-transaction + @param recs the first log record for the page in the mini-transaction + @param size length of recs, in bytes, excluding terminating NUL byte */ + log_phys_t(lsn_t start_lsn, lsn_t lsn, const byte *recs, size_t size) : + log_rec_t(lsn), start_lsn(start_lsn) + { + ut_ad(start_lsn); + ut_ad(start_lsn < lsn); + const uint16_t len= static_cast<uint16_t>(size); + ut_ad(len == size); + memcpy(start(), &len, 2); + reinterpret_cast<byte*>(memcpy(begin(), recs, size))[size]= 0; + } + + /** Append a record to the log. + @param recs log to append + @param size size of the log, in bytes */ + void append(const byte *recs, size_t size) + { + ut_ad(start_lsn < lsn); + uint16_t l= len(); + reinterpret_cast<byte*>(memcpy(end(), recs, size))[size]= 0; + l= static_cast<uint16_t>(l + size); + memcpy(start(), &l, 2); + } + + /** Apply an UNDO_APPEND record. + @see mtr_t::undo_append() + @param block undo log page + @param data undo log record + @param len length of the undo log record + @return whether the operation failed (inconcistency was noticed) */ + static bool undo_append(const buf_block_t &block, const byte *data, + size_t len) + { + ut_ad(len > 2); + byte *free_p= my_assume_aligned<2> + (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE + block.frame); + const uint16_t free= mach_read_from_2(free_p); + if (UNIV_UNLIKELY(free < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE || + free + len + 6 >= srv_page_size - FIL_PAGE_DATA_END)) + { + ib::error() << "Not applying UNDO_APPEND due to corruption on " + << block.page.id(); + return true; + } + + byte *p= block.frame + free; + mach_write_to_2(free_p, free + 4 + len); + memcpy(p, free_p, 2); + p+= 2; + memcpy(p, data, len); + p+= len; + mach_write_to_2(p, free); + return false; + } + + /** The status of apply() */ + enum apply_status { + /** The page was not affected */ + APPLIED_NO= 0, + /** The page was modified */ + APPLIED_YES, + /** The page was modified, affecting the encryption parameters */ + APPLIED_TO_ENCRYPTION, + /** The page was modified, affecting the tablespace header */ + APPLIED_TO_FSP_HEADER + }; + + /** Apply log to a page frame. + @param[in,out] block buffer block + @param[in,out] last_offset last byte offset, for same_page records + @return whether any log was applied to the page */ + apply_status apply(const buf_block_t &block, uint16_t &last_offset) const + { + const byte * const recs= begin(); + byte *const frame= block.page.zip.ssize + ? block.page.zip.data : block.frame; + const size_t size= block.physical_size(); + apply_status applied= APPLIED_NO; + + for (const byte *l= recs;;) + { + const byte b= *l++; + if (!b) + return applied; + ut_ad((b & 0x70) != RESERVED); + size_t rlen= b & 0xf; + if (!rlen) + { + const size_t lenlen= mlog_decode_varint_length(*l); + const uint32_t addlen= mlog_decode_varint(l); + ut_ad(addlen != MLOG_DECODE_ERROR); + rlen= addlen + 15 - lenlen; + l+= lenlen; + } + if (!(b & 0x80)) + { + /* Skip the page identifier. It has already been validated. */ + size_t idlen= mlog_decode_varint_length(*l); + ut_ad(idlen <= 5); + ut_ad(idlen < rlen); + ut_ad(mlog_decode_varint(l) == block.page.id().space()); + l+= idlen; + rlen-= idlen; + idlen= mlog_decode_varint_length(*l); + ut_ad(idlen <= 5); + ut_ad(idlen <= rlen); + ut_ad(mlog_decode_varint(l) == block.page.id().page_no()); + l+= idlen; + rlen-= idlen; + last_offset= 0; + } + + switch (b & 0x70) { + case FREE_PAGE: + ut_ad(last_offset == 0); + goto next_not_same_page; + case INIT_PAGE: + if (UNIV_LIKELY(rlen == 0)) + { + memset_aligned<UNIV_ZIP_SIZE_MIN>(frame, 0, size); + mach_write_to_4(frame + FIL_PAGE_OFFSET, block.page.id().page_no()); + memset_aligned<8>(FIL_PAGE_PREV + frame, 0xff, 8); + mach_write_to_4(frame + FIL_PAGE_SPACE_ID, block.page.id().space()); + last_offset= FIL_PAGE_TYPE; + next_after_applying: + if (applied == APPLIED_NO) + applied= APPLIED_YES; + } + else + { + record_corrupted: + if (!srv_force_recovery) + { + recv_sys.found_corrupt_log= true; + return applied; + } + next_not_same_page: + last_offset= 1; /* the next record must not be same_page */ + } + next: + l+= rlen; + continue; + } + + ut_ad(mach_read_from_4(frame + FIL_PAGE_OFFSET) == + block.page.id().page_no()); + ut_ad(mach_read_from_4(frame + FIL_PAGE_SPACE_ID) == + block.page.id().space()); + ut_ad(last_offset <= 1 || last_offset > 8); + ut_ad(last_offset <= size); + + switch (b & 0x70) { + case OPTION: + goto next; + case EXTENDED: + if (UNIV_UNLIKELY(block.page.id().page_no() < 3 || + block.page.zip.ssize)) + goto record_corrupted; + static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity"); + static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility"); + if (UNIV_UNLIKELY(!rlen)) + goto record_corrupted; + switch (const byte subtype= *l) { + uint8_t ll; + size_t prev_rec, hdr_size; + default: + goto record_corrupted; + case INIT_ROW_FORMAT_REDUNDANT: + case INIT_ROW_FORMAT_DYNAMIC: + if (UNIV_UNLIKELY(rlen != 1)) + goto record_corrupted; + page_create_low(&block, *l != INIT_ROW_FORMAT_REDUNDANT); + break; + case UNDO_INIT: + if (UNIV_UNLIKELY(rlen != 1)) + goto record_corrupted; + trx_undo_page_init(block); + break; + case UNDO_APPEND: + if (UNIV_UNLIKELY(rlen <= 3)) + goto record_corrupted; + if (undo_append(block, ++l, --rlen) && !srv_force_recovery) + { +page_corrupted: + ib::error() << "Set innodb_force_recovery=1 to ignore corruption."; + recv_sys.found_corrupt_log= true; + return applied; + } + break; + case INSERT_HEAP_REDUNDANT: + case INSERT_REUSE_REDUNDANT: + case INSERT_HEAP_DYNAMIC: + case INSERT_REUSE_DYNAMIC: + if (UNIV_UNLIKELY(rlen < 2)) + goto record_corrupted; + rlen--; + ll= mlog_decode_varint_length(*++l); + if (UNIV_UNLIKELY(ll > 3 || ll >= rlen)) + goto record_corrupted; + prev_rec= mlog_decode_varint(l); + ut_ad(prev_rec != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + static_assert(INSERT_HEAP_REDUNDANT == 4, "compatibility"); + static_assert(INSERT_REUSE_REDUNDANT == 5, "compatibility"); + static_assert(INSERT_HEAP_DYNAMIC == 6, "compatibility"); + static_assert(INSERT_REUSE_DYNAMIC == 7, "compatibility"); + if (subtype & 2) + { + size_t shift= 0; + if (subtype & 1) + { + if (UNIV_UNLIKELY(ll > 3 || ll >= rlen)) + goto record_corrupted; + shift= mlog_decode_varint(l); + ut_ad(shift != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + } + if (UNIV_UNLIKELY(ll > 3 || ll >= rlen)) + goto record_corrupted; + size_t enc_hdr_l= mlog_decode_varint(l); + ut_ad(enc_hdr_l != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 2 || ll >= rlen)) + goto record_corrupted; + size_t hdr_c= mlog_decode_varint(l); + ut_ad(hdr_c != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 3 || ll >= rlen)) + goto record_corrupted; + size_t data_c= mlog_decode_varint(l); + ut_ad(data_c != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + if (page_apply_insert_dynamic(block, subtype & 1, prev_rec, + shift, enc_hdr_l, hdr_c, data_c, + l, rlen) && !srv_force_recovery) + goto page_corrupted; + } + else + { + if (UNIV_UNLIKELY(ll > 2 || ll >= rlen)) + goto record_corrupted; + size_t header= mlog_decode_varint(l); + ut_ad(header != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 2 || ll >= rlen)) + goto record_corrupted; + size_t hdr_c= mlog_decode_varint(l); + ut_ad(hdr_c != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 2 || ll >= rlen)) + goto record_corrupted; + size_t data_c= mlog_decode_varint(l); + rlen-= ll; + l+= ll; + if (page_apply_insert_redundant(block, subtype & 1, prev_rec, + header, hdr_c, data_c, + l, rlen) && !srv_force_recovery) + goto page_corrupted; + } + break; + case DELETE_ROW_FORMAT_REDUNDANT: + if (UNIV_UNLIKELY(rlen < 2 || rlen > 4)) + goto record_corrupted; + rlen--; + ll= mlog_decode_varint_length(*++l); + if (UNIV_UNLIKELY(ll != rlen)) + goto record_corrupted; + if (page_apply_delete_redundant(block, mlog_decode_varint(l)) && + !srv_force_recovery) + goto page_corrupted; + break; + case DELETE_ROW_FORMAT_DYNAMIC: + if (UNIV_UNLIKELY(rlen < 2)) + goto record_corrupted; + rlen--; + ll= mlog_decode_varint_length(*++l); + if (UNIV_UNLIKELY(ll > 3 || ll >= rlen)) + goto record_corrupted; + prev_rec= mlog_decode_varint(l); + ut_ad(prev_rec != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 2 || ll >= rlen)) + goto record_corrupted; + hdr_size= mlog_decode_varint(l); + ut_ad(hdr_size != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 3 || ll != rlen)) + goto record_corrupted; + if (page_apply_delete_dynamic(block, prev_rec, hdr_size, + mlog_decode_varint(l)) && + !srv_force_recovery) + goto page_corrupted; + break; + } + last_offset= FIL_PAGE_TYPE; + goto next_after_applying; + case WRITE: + case MEMSET: + case MEMMOVE: + if (UNIV_UNLIKELY(last_offset == 1)) + goto record_corrupted; + const size_t olen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3)) + goto record_corrupted; + const uint32_t offset= mlog_decode_varint(l); + ut_ad(offset != MLOG_DECODE_ERROR); + static_assert(FIL_PAGE_OFFSET == 4, "compatibility"); + if (UNIV_UNLIKELY(offset >= size)) + goto record_corrupted; + if (UNIV_UNLIKELY(offset + last_offset < 8 || + offset + last_offset >= size)) + goto record_corrupted; + last_offset= static_cast<uint16_t>(last_offset + offset); + l+= olen; + rlen-= olen; + size_t llen= rlen; + if ((b & 0x70) == WRITE) + { + if (UNIV_UNLIKELY(rlen + last_offset > size)) + goto record_corrupted; + memcpy(frame + last_offset, l, llen); + if (UNIV_LIKELY(block.page.id().page_no())); + else if (llen == 11 + MY_AES_BLOCK_SIZE && + last_offset == FSP_HEADER_OFFSET + MAGIC_SZ + + fsp_header_get_encryption_offset(block.zip_size())) + applied= APPLIED_TO_ENCRYPTION; + else if (last_offset < FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN + 4 && + last_offset + llen >= FSP_HEADER_OFFSET + FSP_SIZE) + applied= APPLIED_TO_FSP_HEADER; + next_after_applying_write: + ut_ad(llen + last_offset <= size); + last_offset= static_cast<uint16_t>(last_offset + llen); + goto next_after_applying; + } + llen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(llen > rlen || llen > 3)) + goto record_corrupted; + const uint32_t len= mlog_decode_varint(l); + ut_ad(len != MLOG_DECODE_ERROR); + if (UNIV_UNLIKELY(len + last_offset > size)) + goto record_corrupted; + l+= llen; + rlen-= llen; + llen= len; + if ((b & 0x70) == MEMSET) + { + ut_ad(rlen <= llen); + if (UNIV_UNLIKELY(rlen != 1)) + { + size_t s; + for (s= 0; s < llen; s+= rlen) + memcpy(frame + last_offset + s, l, rlen); + memcpy(frame + last_offset + s, l, llen - s); + } + else + memset(frame + last_offset, *l, llen); + goto next_after_applying_write; + } + const size_t slen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(slen != rlen || slen > 3)) + goto record_corrupted; + uint32_t s= mlog_decode_varint(l); + ut_ad(slen != MLOG_DECODE_ERROR); + if (s & 1) + s= last_offset - (s >> 1) - 1; + else + s= last_offset + (s >> 1) + 1; + if (UNIV_LIKELY(s >= 8 && s + llen <= size)) + { + memmove(frame + last_offset, frame + s, llen); + goto next_after_applying_write; + } + } + goto record_corrupted; + } + } +}; + + +inline size_t log_phys_t::alloc_size(size_t len) +{ + return len + (1 + 2 + sizeof(log_phys_t)); +} + + +/** Tablespace item during recovery */ +struct file_name_t { + /** Tablespace file name (FILE_MODIFY) */ + std::string name; + /** Tablespace object (NULL if not valid or not found) */ + fil_space_t* space = nullptr; + + /** Tablespace status. */ + enum fil_status { + /** Normal tablespace */ + NORMAL, + /** Deleted tablespace */ + DELETED, + /** Missing tablespace */ + MISSING + }; + + /** Status of the tablespace */ + fil_status status; + + /** FSP_SIZE of tablespace */ + uint32_t size = 0; + + /** Freed pages of tablespace */ + range_set freed_ranges; + + /** Dummy flags before they have been read from the .ibd file */ + static constexpr uint32_t initial_flags = FSP_FLAGS_FCRC32_MASK_MARKER; + /** FSP_SPACE_FLAGS of tablespace */ + uint32_t flags = initial_flags; + + /** Constructor */ + file_name_t(std::string name_, bool deleted) + : name(std::move(name_)), status(deleted ? DELETED: NORMAL) {} + + /** Add the freed pages */ + void add_freed_page(uint32_t page_no) { freed_ranges.add_value(page_no); } + + /** Remove the freed pages */ + void remove_freed_page(uint32_t page_no) + { + if (freed_ranges.empty()) return; + freed_ranges.remove_value(page_no); + } +}; + +/** Map of dirty tablespaces during recovery */ +typedef std::map< + ulint, + file_name_t, + std::less<ulint>, + ut_allocator<std::pair<const ulint, file_name_t> > > recv_spaces_t; + +static recv_spaces_t recv_spaces; + +/** The last parsed FILE_RENAME records */ +static std::map<uint32_t,std::string> renamed_spaces; + +/** Report an operation to create, delete, or rename a file during backup. +@param[in] space_id tablespace identifier +@param[in] create whether the file is being created +@param[in] name file name (not NUL-terminated) +@param[in] len length of name, in bytes +@param[in] new_name new file name (NULL if not rename) +@param[in] new_len length of new_name, in bytes (0 if NULL) */ +void (*log_file_op)(ulint space_id, bool create, + const byte* name, ulint len, + const byte* new_name, ulint new_len); + +/** Information about initializing page contents during redo log processing. +FIXME: Rely on recv_sys.pages! */ +class mlog_init_t +{ +public: + /** A page initialization operation that was parsed from + the redo log */ + struct init { + /** log sequence number of the page initialization */ + lsn_t lsn; + /** Whether btr_page_create() avoided a read of the page. + + At the end of the last recovery batch, mark_ibuf_exist() + will mark pages for which this flag is set. */ + bool created; + }; + +private: + typedef std::map<const page_id_t, init, + std::less<const page_id_t>, + ut_allocator<std::pair<const page_id_t, init> > > + map; + /** Map of page initialization operations. + FIXME: Merge this to recv_sys.pages! */ + map inits; +public: + /** Record that a page will be initialized by the redo log. + @param[in] page_id page identifier + @param[in] lsn log sequence number + @return whether the state was changed */ + bool add(const page_id_t page_id, lsn_t lsn) + { + ut_ad(mutex_own(&recv_sys.mutex)); + const init init = { lsn, false }; + std::pair<map::iterator, bool> p = inits.insert( + map::value_type(page_id, init)); + ut_ad(!p.first->second.created); + if (p.second) return true; + if (p.first->second.lsn >= init.lsn) return false; + p.first->second = init; + return true; + } + + /** Get the last stored lsn of the page id and its respective + init/load operation. + @param[in] page_id page id + @param[in,out] init initialize log or load log + @return the latest page initialization; + not valid after releasing recv_sys.mutex. */ + init& last(page_id_t page_id) + { + ut_ad(mutex_own(&recv_sys.mutex)); + return inits.find(page_id)->second; + } + + /** Determine if a page will be initialized or freed after a time. + @param page_id page identifier + @param lsn log sequence number + @return whether page_id will be freed or initialized after lsn */ + bool will_avoid_read(page_id_t page_id, lsn_t lsn) const + { + ut_ad(mutex_own(&recv_sys.mutex)); + auto i= inits.find(page_id); + return i != inits.end() && i->second.lsn > lsn; + } + + /** At the end of each recovery batch, reset the 'created' flags. */ + void reset() + { + ut_ad(mutex_own(&recv_sys.mutex)); + ut_ad(recv_no_ibuf_operations); + for (map::value_type& i : inits) { + i.second.created = false; + } + } + + /** On the last recovery batch, mark whether there exist + buffered changes for the pages that were initialized + by buf_page_create() and still reside in the buffer pool. + @param[in,out] mtr dummy mini-transaction */ + void mark_ibuf_exist(mtr_t& mtr) + { + ut_ad(mutex_own(&recv_sys.mutex)); + mtr.start(); + + for (const map::value_type& i : inits) { + if (!i.second.created) { + continue; + } + if (buf_block_t* block = buf_page_get_low( + i.first, 0, RW_X_LATCH, nullptr, + BUF_GET_IF_IN_POOL, __FILE__, __LINE__, + &mtr, nullptr, false)) { + if (UNIV_LIKELY_NULL(block->page.zip.data)) { + switch (fil_page_get_type( + block->page.zip.data)) { + case FIL_PAGE_INDEX: + case FIL_PAGE_RTREE: + if (page_zip_decompress( + &block->page.zip, + block->frame, + true)) { + break; + } + ib::error() << "corrupted " + << block->page.id(); + } + } + if (recv_no_ibuf_operations) { + mtr.commit(); + mtr.start(); + continue; + } + mutex_exit(&recv_sys.mutex); + block->page.ibuf_exist = ibuf_page_exists( + block->page.id(), block->zip_size()); + mtr.commit(); + mtr.start(); + mutex_enter(&recv_sys.mutex); + } + } + + mtr.commit(); + } + + /** Clear the data structure */ + void clear() { inits.clear(); } +}; + +static mlog_init_t mlog_init; + +/** Process a record that indicates that a tablespace is +being shrunk in size. +@param page_id first page identifier that is not in the file +@param lsn log sequence number of the shrink operation */ +inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn) +{ + DBUG_ENTER("recv_sys_t::trim"); + DBUG_LOG("ib_log", + "discarding log beyond end of tablespace " + << page_id << " before LSN " << lsn); + ut_ad(mutex_own(&mutex)); + for (recv_sys_t::map::iterator p = pages.lower_bound(page_id); + p != pages.end() && p->first.space() == page_id.space();) { + recv_sys_t::map::iterator r = p++; + if (r->second.trim(lsn)) { + pages.erase(r); + } + } + if (fil_space_t* space = fil_space_get(page_id.space())) { + ut_ad(UT_LIST_GET_LEN(space->chain) == 1); + fil_node_t* file = UT_LIST_GET_FIRST(space->chain); + ut_ad(file->is_open()); + os_file_truncate(file->name, file->handle, + os_offset_t{page_id.page_no()} + << srv_page_size_shift, true); + } + DBUG_VOID_RETURN; +} + +void recv_sys_t::open_log_files_if_needed() +{ + if (!recv_sys.files.empty()) + return; + + for (auto &&path : get_existing_log_files_paths()) + { + recv_sys.files.emplace_back(std::move(path)); + ut_a(recv_sys.files.back().open(true) == DB_SUCCESS); + } +} + +void recv_sys_t::read(os_offset_t total_offset, span<byte> buf) +{ + open_log_files_if_needed(); + + size_t file_idx= static_cast<size_t>(total_offset / log_sys.log.file_size); + os_offset_t offset= total_offset % log_sys.log.file_size; + dberr_t err= recv_sys.files[file_idx].read(offset, buf); + ut_a(err == DB_SUCCESS); +} + +inline size_t recv_sys_t::files_size() +{ + open_log_files_if_needed(); + return files.size(); +} + +/** Process a file name from a FILE_* record. +@param[in,out] name file name +@param[in] len length of the file name +@param[in] space_id the tablespace ID +@param[in] deleted whether this is a FILE_DELETE record */ +static +void +fil_name_process(char* name, ulint len, ulint space_id, bool deleted) +{ + if (srv_operation == SRV_OPERATION_BACKUP) { + return; + } + + ut_ad(srv_operation == SRV_OPERATION_NORMAL + || srv_operation == SRV_OPERATION_RESTORE + || srv_operation == SRV_OPERATION_RESTORE_EXPORT); + + /* We will also insert space=NULL into the map, so that + further checks can ensure that a FILE_MODIFY record was + scanned before applying any page records for the space_id. */ + + os_normalize_path(name); + const file_name_t fname(std::string(name, len), deleted); + std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.emplace( + space_id, fname); + ut_ad(p.first->first == space_id); + + file_name_t& f = p.first->second; + + if (deleted) { + /* Got FILE_DELETE */ + + if (!p.second && f.status != file_name_t::DELETED) { + f.status = file_name_t::DELETED; + if (f.space != NULL) { + fil_space_free(space_id, false); + f.space = NULL; + } + } + + ut_ad(f.space == NULL); + } else if (p.second // the first FILE_MODIFY or FILE_RENAME + || f.name != fname.name) { + fil_space_t* space; + + /* Check if the tablespace file exists and contains + the space_id. If not, ignore the file after displaying + a note. Abort if there are multiple files with the + same space_id. */ + switch (fil_ibd_load(space_id, name, space)) { + case FIL_LOAD_OK: + ut_ad(space != NULL); + + if (!f.space) { + if (f.size + || f.flags != f.initial_flags) { + fil_space_set_recv_size_and_flags( + space->id, f.size, f.flags); + } + + f.space = space; + goto same_space; + } else if (f.space == space) { +same_space: + f.name = fname.name; + f.status = file_name_t::NORMAL; + } else { + ib::error() << "Tablespace " << space_id + << " has been found in two places: '" + << f.name << "' and '" << name << "'." + " You must delete one of them."; + recv_sys.found_corrupt_fs = true; + } + break; + + case FIL_LOAD_ID_CHANGED: + ut_ad(space == NULL); + break; + + case FIL_LOAD_NOT_FOUND: + /* No matching tablespace was found; maybe it + was renamed, and we will find a subsequent + FILE_* record. */ + ut_ad(space == NULL); + + if (srv_force_recovery) { + /* Without innodb_force_recovery, + missing tablespaces will only be + reported in + recv_init_crash_recovery_spaces(). + Enable some more diagnostics when + forcing recovery. */ + + ib::info() + << "At LSN: " << recv_sys.recovered_lsn + << ": unable to open file " << name + << " for tablespace " << space_id; + } + break; + + case FIL_LOAD_INVALID: + ut_ad(space == NULL); + if (srv_force_recovery == 0) { + ib::warn() << "We do not continue the crash" + " recovery, because the table may" + " become corrupt if we cannot apply" + " the log records in the InnoDB log to" + " it. To fix the problem and start" + " mysqld:"; + ib::info() << "1) If there is a permission" + " problem in the file and mysqld" + " cannot open the file, you should" + " modify the permissions."; + ib::info() << "2) If the tablespace is not" + " needed, or you can restore an older" + " version from a backup, then you can" + " remove the .ibd file, and use" + " --innodb_force_recovery=1 to force" + " startup without this file."; + ib::info() << "3) If the file system or the" + " disk is broken, and you cannot" + " remove the .ibd file, you can set" + " --innodb_force_recovery."; + recv_sys.found_corrupt_fs = true; + break; + } + + ib::info() << "innodb_force_recovery was set to " + << srv_force_recovery << ". Continuing crash" + " recovery even though we cannot access the" + " files for tablespace " << space_id << "."; + break; + } + } +} + +/** Clean up after recv_sys_t::create() */ +void recv_sys_t::close() +{ + ut_ad(this == &recv_sys); + + if (is_initialised()) + { + dblwr.pages.clear(); + ut_d(mutex_enter(&mutex)); + clear(); + ut_d(mutex_exit(&mutex)); + + if (buf) + { + ut_free_dodump(buf, RECV_PARSING_BUF_SIZE); + buf= nullptr; + } + + last_stored_lsn= 0; + mutex_free(&mutex); + } + + recv_spaces.clear(); + renamed_spaces.clear(); + mlog_init.clear(); + + close_files(); +} + +/** Initialize the redo log recovery subsystem. */ +void recv_sys_t::create() +{ + ut_ad(this == &recv_sys); + ut_ad(!is_initialised()); + mutex_create(LATCH_ID_RECV_SYS, &mutex); + + apply_log_recs = false; + apply_batch_on = false; + + buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE, + PSI_INSTRUMENT_ME)); + len = 0; + parse_start_lsn = 0; + scanned_lsn = 0; + scanned_checkpoint_no = 0; + recovered_offset = 0; + recovered_lsn = 0; + found_corrupt_log = false; + found_corrupt_fs = false; + mlog_checkpoint_lsn = 0; + + progress_time = time(NULL); + recv_max_page_lsn = 0; + + memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces); + last_stored_lsn = 1; + UT_LIST_INIT(blocks, &buf_block_t::unzip_LRU); +} + +/** Clear a fully processed set of stored redo log records. */ +inline void recv_sys_t::clear() +{ + ut_ad(mutex_own(&mutex)); + apply_log_recs= false; + apply_batch_on= false; + ut_ad(!after_apply || !UT_LIST_GET_LAST(blocks)); + pages.clear(); + + for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; ) + { + buf_block_t *prev_block= UT_LIST_GET_PREV(unzip_LRU, block); + ut_ad(block->page.state() == BUF_BLOCK_MEMORY); + UT_LIST_REMOVE(blocks, block); + MEM_MAKE_ADDRESSABLE(block->frame, srv_page_size); + buf_block_free(block); + block= prev_block; + } +} + +/** Free most recovery data structures. */ +void recv_sys_t::debug_free() +{ + ut_ad(this == &recv_sys); + ut_ad(is_initialised()); + mutex_enter(&mutex); + + recovery_on= false; + pages.clear(); + ut_free_dodump(buf, RECV_PARSING_BUF_SIZE); + + buf= nullptr; + + mutex_exit(&mutex); +} + +inline void *recv_sys_t::alloc(size_t len) +{ + ut_ad(mutex_own(&mutex)); + ut_ad(len); + ut_ad(len <= srv_page_size); + + buf_block_t *block= UT_LIST_GET_FIRST(blocks); + if (UNIV_UNLIKELY(!block)) + { +create_block: + block= buf_block_alloc(); + block->page.access_time= 1U << 16 | + ut_calc_align<uint16_t>(static_cast<uint16_t>(len), ALIGNMENT); + static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2"); + UT_LIST_ADD_FIRST(blocks, block); + MEM_MAKE_ADDRESSABLE(block->frame, len); + MEM_NOACCESS(block->frame + len, srv_page_size - len); + return my_assume_aligned<ALIGNMENT>(block->frame); + } + + size_t free_offset= static_cast<uint16_t>(block->page.access_time); + ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT)); + if (UNIV_UNLIKELY(!free_offset)) + { + ut_ad(srv_page_size == 65536); + goto create_block; + } + ut_ad(free_offset <= srv_page_size); + free_offset+= len; + + if (free_offset > srv_page_size) + goto create_block; + + block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 | + ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT); + MEM_MAKE_ADDRESSABLE(block->frame + free_offset - len, len); + return my_assume_aligned<ALIGNMENT>(block->frame + free_offset - len); +} + + +/** Free a redo log snippet. +@param data buffer returned by alloc() */ +inline void recv_sys_t::free(const void *data) +{ + ut_ad(!ut_align_offset(data, ALIGNMENT)); + data= page_align(data); + ut_ad(mutex_own(&mutex)); + + /* MDEV-14481 FIXME: To prevent race condition with buf_pool.resize(), + we must acquire and hold the buffer pool mutex here. */ + ut_ad(!buf_pool.resize_in_progress()); + + auto *chunk= buf_pool.chunks; + for (auto i= buf_pool.n_chunks; i--; chunk++) + { + if (data < chunk->blocks->frame) + continue; + const size_t offs= (reinterpret_cast<const byte*>(data) - + chunk->blocks->frame) >> srv_page_size_shift; + if (offs >= chunk->size) + continue; + buf_block_t *block= &chunk->blocks[offs]; + ut_ad(block->frame == data); + ut_ad(block->page.state() == BUF_BLOCK_MEMORY); + ut_ad(static_cast<uint16_t>(block->page.access_time - 1) < + srv_page_size); + ut_ad(block->page.access_time >= 1U << 16); + if (!((block->page.access_time -= 1U << 16) >> 16)) + { + UT_LIST_REMOVE(blocks, block); + MEM_MAKE_ADDRESSABLE(block->frame, srv_page_size); + buf_block_free(block); + } + return; + } + ut_ad(0); +} + + +/** Read a log segment to log_sys.buf. +@param[in,out] start_lsn in: read area start, +out: the last read valid lsn +@param[in] end_lsn read area end +@return whether no invalid blocks (e.g checksum mismatch) were found */ +bool log_t::file::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn) +{ + ulint len; + bool success = true; + mysql_mutex_assert_owner(&log_sys.mutex); + ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE)); + ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE)); + byte* buf = log_sys.buf; +loop: + lsn_t source_offset = calc_lsn_offset_old(*start_lsn); + + ut_a(end_lsn - *start_lsn <= ULINT_MAX); + len = (ulint) (end_lsn - *start_lsn); + + ut_ad(len != 0); + + const bool at_eof = (source_offset % file_size) + len > file_size; + if (at_eof) { + /* If the above condition is true then len (which is ulint) + is > the expression below, so the typecast is ok */ + len = ulint(file_size - (source_offset % file_size)); + } + + log_sys.n_log_ios++; + + ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX); + + recv_sys.read(source_offset, {buf, len}); + + for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE, + buf += OS_FILE_LOG_BLOCK_SIZE, + (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) { + const ulint block_number = log_block_get_hdr_no(buf); + + if (block_number != log_block_convert_lsn_to_no(*start_lsn)) { + /* Garbage or an incompletely written log block. + We will not report any error, because this can + happen when InnoDB was killed while it was + writing redo log. We simply treat this as an + abrupt end of the redo log. */ +fail: + end_lsn = *start_lsn; + success = false; + break; + } + + ulint crc = log_block_calc_checksum_crc32(buf); + ulint cksum = log_block_get_checksum(buf); + + DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", { + static int block_counter; + if (block_counter++ == 0) { + cksum = crc + 1; + } + }); + + DBUG_EXECUTE_IF("log_checksum_mismatch", { cksum = crc + 1; }); + + if (UNIV_UNLIKELY(crc != cksum)) { + ib::error_or_warn(srv_operation!=SRV_OPERATION_BACKUP) + << "Invalid log block checksum. block: " + << block_number + << " checkpoint no: " + << log_block_get_checkpoint_no(buf) + << " expected: " << crc + << " found: " << cksum; + goto fail; + } + + if (is_encrypted() + && !log_crypt(buf, *start_lsn, + OS_FILE_LOG_BLOCK_SIZE, + LOG_DECRYPT)) { + goto fail; + } + + ulint dl = log_block_get_data_len(buf); + if (dl < LOG_BLOCK_HDR_SIZE + || (dl != OS_FILE_LOG_BLOCK_SIZE + && dl > log_sys.trailer_offset())) { + recv_sys.found_corrupt_log = true; + goto fail; + } + } + + if (recv_sys.report(time(NULL))) { + ib::info() << "Read redo log up to LSN=" << *start_lsn; + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "Read redo log up to LSN=" LSN_PF, + *start_lsn); + } + + if (*start_lsn != end_lsn) { + goto loop; + } + + return(success); +} + + + +/********************************************************//** +Copies a log segment from the most up-to-date log group to the other log +groups, so that they all contain the latest log data. Also writes the info +about the latest checkpoint to the groups, and inits the fields in the group +memory structs to up-to-date values. */ +static +void +recv_synchronize_groups() +{ + const lsn_t recovered_lsn = recv_sys.recovered_lsn; + + /* Read the last recovered log block to the recovery system buffer: + the block is always incomplete */ + + lsn_t start_lsn = ut_uint64_align_down(recovered_lsn, + OS_FILE_LOG_BLOCK_SIZE); + log_sys.log.read_log_seg(&start_lsn, + start_lsn + OS_FILE_LOG_BLOCK_SIZE); + log_sys.log.set_fields(recovered_lsn); + + /* Copy the checkpoint info to the log; remember that we have + incremented checkpoint_no by one, and the info will not be written + over the max checkpoint info, thus making the preservation of max + checkpoint info on disk certain */ + + if (!srv_read_only_mode) { + log_write_checkpoint_info(0); + mysql_mutex_lock(&log_sys.mutex); + } +} + +/** Check the consistency of a log header block. +@param[in] log header block +@return true if ok */ +static +bool +recv_check_log_header_checksum( + const byte* buf) +{ + return(log_block_get_checksum(buf) + == log_block_calc_checksum_crc32(buf)); +} + +static bool redo_file_sizes_are_correct() +{ + auto paths= get_existing_log_files_paths(); + auto get_size= [](const std::string &path) { + return os_file_get_size(path.c_str()).m_total_size; + }; + os_offset_t size= get_size(paths[0]); + + auto it= + std::find_if(paths.begin(), paths.end(), [&](const std::string &path) { + return get_size(path) != size; + }); + + if (it == paths.end()) + return true; + + ib::error() << "Log file " << *it << " is of different size " + << get_size(*it) << " bytes than other log files " << size + << " bytes!"; + return false; +} + +/** Calculate the checksum for a log block using the pre-10.2.2 algorithm. */ +inline uint32_t log_block_calc_checksum_format_0(const byte *b) +{ + uint32_t sum= 1; + const byte *const end= &b[512 - 4]; + + for (uint32_t sh= 0; b < end; ) + { + sum&= 0x7FFFFFFFUL; + sum+= uint32_t{*b} << sh++; + sum+= *b++; + if (sh > 24) + sh= 0; + } + + return sum; +} + +/** Determine if a redo log from before MariaDB 10.2.2 is clean. +@return error code +@retval DB_SUCCESS if the redo log is clean +@retval DB_CORRUPTION if the redo log is corrupted +@retval DB_ERROR if the redo log is not empty */ +ATTRIBUTE_COLD static dberr_t recv_log_recover_pre_10_2() +{ + uint64_t max_no= 0; + byte *buf= log_sys.buf; + + ut_ad(log_sys.log.format == 0); + + if (!redo_file_sizes_are_correct()) + return DB_CORRUPTION; + + /** Offset of the first checkpoint checksum */ + constexpr uint CHECKSUM_1= 288; + /** Offset of the second checkpoint checksum */ + constexpr uint CHECKSUM_2= CHECKSUM_1 + 4; + /** the checkpoint LSN field */ + constexpr uint CHECKPOINT_LSN= 8; + /** Most significant bits of the checkpoint offset */ + constexpr uint OFFS_HI= CHECKSUM_2 + 12; + /** Least significant bits of the checkpoint offset */ + constexpr uint OFFS_LO= 16; + + lsn_t lsn= 0; + + for (ulint field= LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2; + field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) + { + log_sys.log.read(field, {buf, OS_FILE_LOG_BLOCK_SIZE}); + + if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1)) != + mach_read_from_4(buf + CHECKSUM_1) || + static_cast<uint32_t>(ut_fold_binary(buf + CHECKPOINT_LSN, + CHECKSUM_2 - CHECKPOINT_LSN)) != + mach_read_from_4(buf + CHECKSUM_2)) + { + DBUG_LOG("ib_log", "invalid pre-10.2.2 checkpoint " << field); + continue; + } + + if (!log_crypt_101_read_checkpoint(buf)) + { + ib::error() << "Decrypting checkpoint failed"; + continue; + } + + const uint64_t checkpoint_no= mach_read_from_8(buf); + + DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF " found", + checkpoint_no, + mach_read_from_8(buf + CHECKPOINT_LSN))); + + if (checkpoint_no >= max_no) + { + max_no= checkpoint_no; + lsn= mach_read_from_8(buf + CHECKPOINT_LSN); + log_sys.log.set_lsn(lsn); + log_sys.log.set_lsn_offset(lsn_t{mach_read_from_4(buf + OFFS_HI)} << 32 | + mach_read_from_4(buf + OFFS_LO)); + } + } + + if (!lsn) + { + ib::error() << "Upgrade after a crash is not supported." + " This redo log was created before MariaDB 10.2.2," + " and we did not find a valid checkpoint." + " Please follow the instructions at" + " https://mariadb.com/kb/en/library/upgrading/"; + return DB_ERROR; + } + + log_sys.set_lsn(lsn); + log_sys.set_flushed_lsn(lsn); + const lsn_t source_offset= log_sys.log.calc_lsn_offset_old(lsn); + + static constexpr char NO_UPGRADE_RECOVERY_MSG[]= + "Upgrade after a crash is not supported." + " This redo log was created before MariaDB 10.2.2"; + + recv_sys.read(source_offset & ~511, {buf, 512}); + + if (log_block_calc_checksum_format_0(buf) != log_block_get_checksum(buf) && + !log_crypt_101_read_block(buf, lsn)) + { + ib::error() << NO_UPGRADE_RECOVERY_MSG << ", and it appears corrupted."; + return DB_CORRUPTION; + } + + if (mach_read_from_2(buf + 4) == (source_offset & 511)) + { + /* Mark the redo log for upgrading. */ + srv_log_file_size= 0; + recv_sys.parse_start_lsn= recv_sys.recovered_lsn= recv_sys.scanned_lsn= + recv_sys.mlog_checkpoint_lsn = lsn; + log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn= + log_sys.write_lsn= log_sys.current_flush_lsn= lsn; + log_sys.next_checkpoint_no= 0; + return DB_SUCCESS; + } + + if (buf[20 + 32 * 9] == 2) + ib::error() << "Cannot decrypt log for upgrading." + " The encrypted log was created before MariaDB 10.2.2."; + else + ib::error() << NO_UPGRADE_RECOVERY_MSG << "."; + + return DB_ERROR; +} + +/** Calculate the offset of a log sequence number +in an old redo log file (during upgrade check). +@param[in] lsn log sequence number +@return byte offset within the log */ +inline lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const +{ + const lsn_t size= capacity() * recv_sys.files_size(); + lsn_t l= lsn - this->lsn; + if (longlong(l) < 0) + { + l= lsn_t(-longlong(l)) % size; + l= size - l; + } + + l+= lsn_offset - LOG_FILE_HDR_SIZE * (1 + lsn_offset / file_size); + l%= size; + return l + LOG_FILE_HDR_SIZE * (1 + l / (file_size - LOG_FILE_HDR_SIZE)); +} + +/** Determine if a redo log from MariaDB 10.2.2+, 10.3, or 10.4 is clean. +@return error code +@retval DB_SUCCESS if the redo log is clean +@retval DB_CORRUPTION if the redo log is corrupted +@retval DB_ERROR if the redo log is not empty */ +static dberr_t recv_log_recover_10_4() +{ + const lsn_t lsn = log_sys.log.get_lsn(); + const lsn_t source_offset = log_sys.log.calc_lsn_offset_old(lsn); + byte* buf = log_sys.buf; + + if (!redo_file_sizes_are_correct()) { + return DB_CORRUPTION; + } + + recv_sys.read(source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1), + {buf, OS_FILE_LOG_BLOCK_SIZE}); + + ulint crc = log_block_calc_checksum_crc32(buf); + ulint cksum = log_block_get_checksum(buf); + + if (UNIV_UNLIKELY(crc != cksum)) { + ib::error() << "Invalid log block checksum." + << " block: " + << log_block_get_hdr_no(buf) + << " checkpoint no: " + << log_block_get_checkpoint_no(buf) + << " expected: " << crc + << " found: " << cksum; + return DB_CORRUPTION; + } + + if (log_sys.log.is_encrypted() + && !log_crypt(buf, lsn & ~511, 512, LOG_DECRYPT)) { + return DB_ERROR; + } + + /* On a clean shutdown, the redo log will be logically empty + after the checkpoint lsn. */ + + if (log_block_get_data_len(buf) + != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) { + return DB_ERROR; + } + + /* Mark the redo log for upgrading. */ + srv_log_file_size = 0; + recv_sys.parse_start_lsn = recv_sys.recovered_lsn + = recv_sys.scanned_lsn + = recv_sys.mlog_checkpoint_lsn = lsn; + log_sys.set_lsn(lsn); + log_sys.set_flushed_lsn(lsn); + log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn + = log_sys.write_lsn = log_sys.current_flush_lsn = lsn; + log_sys.next_checkpoint_no = 0; + return DB_SUCCESS; +} + +/** Find the latest checkpoint in the log header. +@param[out] max_field LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2 +@return error code or DB_SUCCESS */ +dberr_t +recv_find_max_checkpoint(ulint* max_field) +{ + ib_uint64_t max_no; + ib_uint64_t checkpoint_no; + ulint field; + byte* buf; + + max_no = 0; + *max_field = 0; + + buf = log_sys.checkpoint_buf; + + log_sys.log.read(0, {buf, OS_FILE_LOG_BLOCK_SIZE}); + /* Check the header page checksum. There was no + checksum in the first redo log format (version 0). */ + log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT); + log_sys.log.subformat = log_sys.log.format != log_t::FORMAT_3_23 + ? mach_read_from_4(buf + LOG_HEADER_SUBFORMAT) + : 0; + if (log_sys.log.format != log_t::FORMAT_3_23 + && !recv_check_log_header_checksum(buf)) { + ib::error() << "Invalid redo log header checksum."; + return(DB_CORRUPTION); + } + + char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1]; + + memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator); + /* Ensure that the string is NUL-terminated. */ + creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0; + + switch (log_sys.log.format) { + case log_t::FORMAT_3_23: + return recv_log_recover_pre_10_2(); + case log_t::FORMAT_10_2: + case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED: + case log_t::FORMAT_10_3: + case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED: + case log_t::FORMAT_10_4: + case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED: + case log_t::FORMAT_10_5: + case log_t::FORMAT_10_5 | log_t::FORMAT_ENCRYPTED: + break; + default: + ib::error() << "Unsupported redo log format." + " The redo log was created with " << creator << "."; + return(DB_ERROR); + } + + for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2; + field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) { + log_sys.log.read(field, {buf, OS_FILE_LOG_BLOCK_SIZE}); + + const ulint crc32 = log_block_calc_checksum_crc32(buf); + const ulint cksum = log_block_get_checksum(buf); + + if (crc32 != cksum) { + DBUG_PRINT("ib_log", + ("invalid checkpoint," + " at " ULINTPF + ", checksum " ULINTPFx + " expected " ULINTPFx, + field, cksum, crc32)); + continue; + } + + if (log_sys.is_encrypted() + && !log_crypt_read_checkpoint_buf(buf)) { + ib::error() << "Reading checkpoint" + " encryption info failed."; + continue; + } + + checkpoint_no = mach_read_from_8( + buf + LOG_CHECKPOINT_NO); + + DBUG_PRINT("ib_log", + ("checkpoint " UINT64PF " at " LSN_PF " found", + checkpoint_no, mach_read_from_8( + buf + LOG_CHECKPOINT_LSN))); + + if (checkpoint_no >= max_no) { + *max_field = field; + max_no = checkpoint_no; + log_sys.log.set_lsn(mach_read_from_8( + buf + LOG_CHECKPOINT_LSN)); + log_sys.log.set_lsn_offset(mach_read_from_8( + buf + LOG_CHECKPOINT_OFFSET)); + log_sys.next_checkpoint_no = checkpoint_no; + } + } + + if (*max_field == 0) { + /* Before 10.2.2, we could get here during database + initialization if we created an LOG_FILE_NAME file that + was filled with zeroes, and were killed. After + 10.2.2, we would reject such a file already earlier, + when checking the file header. */ + ib::error() << "No valid checkpoint found" + " (corrupted redo log)." + " You can try --innodb-force-recovery=6" + " as a last resort."; + return(DB_ERROR); + } + + switch (log_sys.log.format) { + case log_t::FORMAT_10_5: + case log_t::FORMAT_10_5 | log_t::FORMAT_ENCRYPTED: + break; + default: + if (dberr_t err = recv_log_recover_10_4()) { + ib::error() + << "Upgrade after a crash is not supported." + " The redo log was created with " << creator + << (err == DB_ERROR + ? "." : ", and it appears corrupted."); + return err; + } + } + + return(DB_SUCCESS); +} + +/*******************************************************//** +Calculates the new value for lsn when more data is added to the log. */ +static +lsn_t +recv_calc_lsn_on_data_add( +/*======================*/ + lsn_t lsn, /*!< in: old lsn */ + ib_uint64_t len) /*!< in: this many bytes of data is + added, log block headers not included */ +{ + unsigned frag_len = static_cast<unsigned>(lsn % OS_FILE_LOG_BLOCK_SIZE) + - LOG_BLOCK_HDR_SIZE; + unsigned payload_size = log_sys.payload_size(); + ut_ad(frag_len < payload_size); + lsn_t lsn_len = len; + lsn_len += (lsn_len + frag_len) / payload_size + * (OS_FILE_LOG_BLOCK_SIZE - payload_size); + + return(lsn + lsn_len); +} + +/** Trim old log records for a page. +@param start_lsn oldest log sequence number to preserve +@return whether all the log for the page was trimmed */ +inline bool page_recv_t::trim(lsn_t start_lsn) +{ + while (log.head) + { + if (log.head->lsn >= start_lsn) return false; + last_offset= 1; /* the next record must not be same_page */ + log_rec_t *next= log.head->next; + recv_sys.free(log.head); + log.head= next; + } + log.tail= nullptr; + return true; +} + + +inline void page_recv_t::recs_t::clear() +{ + ut_ad(mutex_own(&recv_sys.mutex)); + for (const log_rec_t *l= head; l; ) + { + const log_rec_t *next= l->next; + recv_sys.free(l); + l= next; + } + head= tail= nullptr; +} + + +/** Ignore any earlier redo log records for this page. */ +inline void page_recv_t::will_not_read() +{ + ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ); + state= RECV_WILL_NOT_READ; + log.clear(); +} + + +/** Register a redo log snippet for a page. +@param page_id page identifier +@param start_lsn start LSN of the mini-transaction +@param lsn @see mtr_t::commit_lsn() +@param recs redo log snippet @see log_t::FORMAT_10_5 +@param len length of l, in bytes */ +inline void recv_sys_t::add(const page_id_t page_id, + lsn_t start_lsn, lsn_t lsn, const byte *l, + size_t len) +{ + ut_ad(mutex_own(&mutex)); + std::pair<map::iterator, bool> p= pages.emplace(map::value_type + (page_id, page_recv_t())); + page_recv_t& recs= p.first->second; + ut_ad(p.second == recs.log.empty()); + + switch (*l & 0x70) { + case FREE_PAGE: case INIT_PAGE: + recs.will_not_read(); + mlog_init.add(page_id, start_lsn); /* FIXME: remove this! */ + /* fall through */ + default: + log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last()); + if (!tail) + break; + if (tail->start_lsn != start_lsn) + break; + ut_ad(tail->lsn == lsn); + buf_block_t *block= UT_LIST_GET_LAST(blocks); + ut_ad(block); + const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1; + ut_ad(used >= ALIGNMENT); + const byte *end= const_cast<const log_phys_t*>(tail)->end(); + if (!((reinterpret_cast<size_t>(end + len) ^ + reinterpret_cast<size_t>(end)) & ~(ALIGNMENT - 1))) + { + /* Use already allocated 'padding' bytes */ +append: + MEM_MAKE_ADDRESSABLE(end + 1, len); + /* Append to the preceding record for the page */ + tail->append(l, len); + return; + } + if (end <= &block->frame[used - ALIGNMENT] || &block->frame[used] >= end) + break; /* Not the last allocated record in the page */ + const size_t new_used= static_cast<size_t>(end - block->frame + len + 1); + ut_ad(new_used > used); + if (new_used > srv_page_size) + break; + block->page.access_time= (block->page.access_time & ~0U << 16) | + ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT); + goto append; + } + recs.log.append(new (alloc(log_phys_t::alloc_size(len))) + log_phys_t(start_lsn, lsn, l, len)); +} + +/** Store/remove the freed pages in fil_name_t of recv_spaces. +@param[in] page_id freed or init page_id +@param[in] freed TRUE if page is freed */ +static void store_freed_or_init_rec(page_id_t page_id, bool freed) +{ + uint32_t space_id= page_id.space(); + uint32_t page_no= page_id.page_no(); + if (is_predefined_tablespace(space_id)) + { + if (!srv_immediate_scrub_data_uncompressed) + return; + fil_space_t *space; + if (space_id == TRX_SYS_SPACE) + space= fil_system.sys_space; + else + space= fil_space_get(space_id); + + space->free_page(page_no, freed); + return; + } + + recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id); + if (i != recv_spaces.end() && i->first == space_id) + { + if (freed) + i->second.add_freed_page(page_no); + else + i->second.remove_freed_page(page_no); + } +} + +/** Parse and register one mini-transaction in log_t::FORMAT_10_5. +@param checkpoint_lsn the log sequence number of the latest checkpoint +@param store whether to store the records +@param apply whether to apply file-level log records +@return whether FILE_CHECKPOINT record was seen the first time, +or corruption was noticed */ +bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply) +{ + mysql_mutex_assert_owner(&log_sys.mutex); + ut_ad(mutex_own(&mutex)); + ut_ad(parse_start_lsn); + ut_ad(log_sys.is_physical()); + + bool last_phase= (*store == STORE_IF_EXISTS); + const byte *const end= buf + len; +loop: + const byte *const log= buf + recovered_offset; + const lsn_t start_lsn= recovered_lsn; + + /* Check that the entire mini-transaction is included within the buffer */ + const byte *l; + uint32_t rlen; + for (l= log; l < end; l+= rlen) + { + if (!*l) + goto eom_found; + if (UNIV_LIKELY((*l & 0x70) != RESERVED)); + else if (srv_force_recovery) + ib::warn() << "Ignoring unknown log record at LSN " << recovered_lsn; + else + { +malformed: + ib::error() << "Malformed log record;" + " set innodb_force_recovery=1 to ignore."; +corrupted: + const size_t trailing_bytes= std::min<size_t>(100, size_t(end - l)); + ib::info() << "Dump from the start of the mini-transaction (LSN=" + << start_lsn << ") to " + << trailing_bytes << " bytes after the record:"; + ut_print_buf(stderr, log, l - log + trailing_bytes); + putc('\n', stderr); + found_corrupt_log= true; + return true; + } + rlen= *l++ & 0xf; + if (l + (rlen ? rlen : 16) >= end) + break; + if (!rlen) + { + rlen= mlog_decode_varint_length(*l); + if (l + rlen >= end) + break; + const uint32_t addlen= mlog_decode_varint(l); + if (UNIV_UNLIKELY(addlen == MLOG_DECODE_ERROR)) + { + ib::error() << "Corrupted record length"; + goto corrupted; + } + rlen= addlen + 15; + } + } + + /* Not the entire mini-transaction was present. */ + return false; + +eom_found: + ut_ad(!*l); + ut_d(const byte *const el= l + 1); + + const lsn_t end_lsn= recv_calc_lsn_on_data_add(start_lsn, l + 1 - log); + if (UNIV_UNLIKELY(end_lsn > scanned_lsn)) + /* The log record filled a log block, and we require that also the + next log block should have been scanned in */ + return false; + + ut_d(std::set<page_id_t> freed); +#if 0 && defined UNIV_DEBUG /* MDEV-21727 FIXME: enable this */ + /* Pages that have been modified in this mini-transaction. + If a mini-transaction writes INIT_PAGE for a page, it should not have + written any log records for the page. Unfortunately, this does not + hold for ROW_FORMAT=COMPRESSED pages, because page_zip_compress() + can be invoked in a pessimistic operation, even after log has + been written for other pages. */ + ut_d(std::set<page_id_t> modified); +#endif + + uint32_t space_id= 0, page_no= 0, last_offset= 0; +#if 1 /* MDEV-14425 FIXME: remove this */ + bool got_page_op= false; +#endif + for (l= log; l < end; l+= rlen) + { + const byte *const recs= l; + const byte b= *l++; + + if (!b) + break; + ut_ad(UNIV_LIKELY(b & 0x70) != RESERVED || srv_force_recovery); + rlen= b & 0xf; + ut_ad(l + rlen < end); + ut_ad(rlen || l + 16 < end); + if (!rlen) + { + const uint32_t lenlen= mlog_decode_varint_length(*l); + ut_ad(l + lenlen < end); + const uint32_t addlen= mlog_decode_varint(l); + ut_ad(addlen != MLOG_DECODE_ERROR); + rlen= addlen + 15 - lenlen; + l+= lenlen; + } + ut_ad(l + rlen < end); + uint32_t idlen; + if ((b & 0x80) && got_page_op) + { + /* This record is for the same page as the previous one. */ + if (UNIV_UNLIKELY((b & 0x70) <= INIT_PAGE)) + { +record_corrupted: + /* FREE_PAGE,INIT_PAGE cannot be with same_page flag */ + if (!srv_force_recovery) + goto malformed; + ib::warn() << "Ignoring malformed log record at LSN " << recovered_lsn; + last_offset= 1; /* the next record must not be same_page */ + continue; + } + goto same_page; + } + last_offset= 0; + idlen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(idlen > 5 || idlen >= rlen)) + { +page_id_corrupted: + if (!srv_force_recovery) + { + ib::error() << "Corrupted page identifier at " << recovered_lsn + << "; set innodb_force_recovery=1 to ignore the record."; + goto corrupted; + } + ib::warn() << "Ignoring corrupted page identifier at LSN " + << recovered_lsn; + continue; + } + space_id= mlog_decode_varint(l); + if (UNIV_UNLIKELY(space_id == MLOG_DECODE_ERROR)) + goto page_id_corrupted; + l+= idlen; + rlen-= idlen; + idlen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(idlen > 5 || idlen > rlen)) + goto page_id_corrupted; + page_no= mlog_decode_varint(l); + if (UNIV_UNLIKELY(page_no == MLOG_DECODE_ERROR)) + goto page_id_corrupted; + l+= idlen; + rlen-= idlen; + got_page_op = !(b & 0x80); + if (got_page_op && apply && !is_predefined_tablespace(space_id)) + { + recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id); + if (i != recv_spaces.end() && i->first == space_id); + else if (recovered_lsn < mlog_checkpoint_lsn) + /* We have not seen all records between the checkpoint and + FILE_CHECKPOINT. There should be a FILE_DELETE for this + tablespace later. */ + recv_spaces.emplace_hint(i, space_id, file_name_t("", false)); + else + { + const page_id_t id(space_id, page_no); + if (!srv_force_recovery) + { + ib::error() << "Missing FILE_DELETE or FILE_MODIFY for " << id + << " at " << recovered_lsn + << "; set innodb_force_recovery=1 to ignore the record."; + goto corrupted; + } + ib::warn() << "Ignoring record for " << id << " at " << recovered_lsn; + continue; + } + } +same_page: + DBUG_PRINT("ib_log", + ("scan " LSN_PF ": rec %x len %zu page %u:%u", + recovered_lsn, b, static_cast<size_t>(l + rlen - recs), + space_id, page_no)); + + if (got_page_op) + { + const page_id_t id(space_id, page_no); + ut_d(if ((b & 0x70) == INIT_PAGE) freed.erase(id)); + ut_ad(freed.find(id) == freed.end()); + switch (b & 0x70) { + case FREE_PAGE: + ut_ad(freed.emplace(id).second); + last_offset= 1; /* the next record must not be same_page */ + goto free_or_init_page; + case INIT_PAGE: + last_offset= FIL_PAGE_TYPE; + free_or_init_page: + store_freed_or_init_rec(id, (b & 0x70) == FREE_PAGE); + if (UNIV_UNLIKELY(rlen != 0)) + goto record_corrupted; + break; + case EXTENDED: + if (UNIV_UNLIKELY(!rlen)) + goto record_corrupted; + if (rlen == 1 && *l == TRIM_PAGES) + { +#if 0 /* For now, we can only truncate an undo log tablespace */ + if (UNIV_UNLIKELY(!space_id || !page_no)) + goto record_corrupted; +#else + if (!srv_is_undo_tablespace(space_id) || + page_no != SRV_UNDO_TABLESPACE_SIZE_IN_PAGES) + goto record_corrupted; + static_assert(UT_ARR_SIZE(truncated_undo_spaces) == + TRX_SYS_MAX_UNDO_SPACES, "compatibility"); + truncated_undo_spaces[space_id - srv_undo_space_id_start]= + { recovered_lsn, page_no }; +#endif + last_offset= 1; /* the next record must not be same_page */ + continue; + } + last_offset= FIL_PAGE_TYPE; + break; + case RESERVED: + case OPTION: + continue; + case WRITE: + case MEMMOVE: + case MEMSET: + if (UNIV_UNLIKELY(rlen == 0 || last_offset == 1)) + goto record_corrupted; + const uint32_t olen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3)) + goto record_corrupted; + const uint32_t offset= mlog_decode_varint(l); + ut_ad(offset != MLOG_DECODE_ERROR); + static_assert(FIL_PAGE_OFFSET == 4, "compatibility"); + if (UNIV_UNLIKELY(offset >= srv_page_size)) + goto record_corrupted; + last_offset+= offset; + if (UNIV_UNLIKELY(last_offset < 8 || last_offset >= srv_page_size)) + goto record_corrupted; + l+= olen; + rlen-= olen; + if ((b & 0x70) == WRITE) + { + if (UNIV_UNLIKELY(rlen + last_offset > srv_page_size)) + goto record_corrupted; + if (UNIV_UNLIKELY(!page_no) && apply) + { + const bool has_size= last_offset <= FSP_HEADER_OFFSET + FSP_SIZE && + last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SIZE + 4; + const bool has_flags= last_offset <= + FSP_HEADER_OFFSET + FSP_SPACE_FLAGS && + last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + 4; + if (has_size || has_flags) + { + recv_spaces_t::iterator it= recv_spaces.find(space_id); + const uint32_t size= has_size + ? mach_read_from_4(FSP_HEADER_OFFSET + FSP_SIZE + l - + last_offset) + : 0; + const uint32_t flags= has_flags + ? mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + l - + last_offset) + : file_name_t::initial_flags; + if (it == recv_spaces.end()) + ut_ad(!mlog_checkpoint_lsn || space_id == TRX_SYS_SPACE || + srv_is_undo_tablespace(space_id)); + else if (!it->second.space) + { + if (has_size) + it->second.size= size; + if (has_flags) + it->second.flags= flags; + } + fil_space_set_recv_size_and_flags(space_id, size, flags); + } + } + last_offset+= rlen; + break; + } + uint32_t llen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(llen > rlen || llen > 3)) + goto record_corrupted; + const uint32_t len= mlog_decode_varint(l); + ut_ad(len != MLOG_DECODE_ERROR); + if (UNIV_UNLIKELY(last_offset + len > srv_page_size)) + goto record_corrupted; + l+= llen; + rlen-= llen; + llen= len; + if ((b & 0x70) == MEMSET) + { + if (UNIV_UNLIKELY(rlen > llen)) + goto record_corrupted; + last_offset+= llen; + break; + } + const uint32_t slen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(slen != rlen || slen > 3)) + goto record_corrupted; + uint32_t s= mlog_decode_varint(l); + ut_ad(slen != MLOG_DECODE_ERROR); + if (s & 1) + s= last_offset - (s >> 1) - 1; + else + s= last_offset + (s >> 1) + 1; + if (UNIV_UNLIKELY(s < 8 || s + llen > srv_page_size)) + goto record_corrupted; + last_offset+= llen; + break; + } +#if 0 && defined UNIV_DEBUG + switch (b & 0x70) { + case RESERVED: + case OPTION: + ut_ad(0); /* we did "continue" earlier */ + break; + case FREE_PAGE: + break; + default: + ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE); + } +#endif + const bool is_init= (b & 0x70) <= INIT_PAGE; + switch (*store) { + case STORE_IF_EXISTS: + if (fil_space_t *space= fil_space_t::get(space_id)) + { + const auto size= space->get_size(); + space->release(); + if (!size) + continue; + } + else + continue; + /* fall through */ + case STORE_YES: + if (!mlog_init.will_avoid_read(id, start_lsn)) + add(id, start_lsn, end_lsn, recs, + static_cast<size_t>(l + rlen - recs)); + continue; + case STORE_NO: + if (!is_init) + continue; + mlog_init.add(id, start_lsn); + map::iterator i= pages.find(id); + if (i == pages.end()) + continue; + i->second.log.clear(); + pages.erase(i); + } + } +#if 1 /* MDEV-14425 FIXME: this must be in the checkpoint file only! */ + else if (rlen) + { + switch (b & 0xf0) { +# if 1 /* MDEV-14425 FIXME: Remove this! */ + case FILE_CHECKPOINT: + if (space_id == 0 && page_no == 0 && rlen == 8) + { + const lsn_t lsn= mach_read_from_8(l); + + if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) + fprintf(stderr, "FILE_CHECKPOINT(" LSN_PF ") %s at " LSN_PF "\n", + lsn, lsn != checkpoint_lsn + ? "ignored" + : mlog_checkpoint_lsn ? "reread" : "read", + recovered_lsn); + + DBUG_PRINT("ib_log", ("FILE_CHECKPOINT(" LSN_PF ") %s at " LSN_PF, + lsn, lsn != checkpoint_lsn + ? "ignored" + : mlog_checkpoint_lsn ? "reread" : "read", + recovered_lsn)); + + if (lsn == checkpoint_lsn) + { + /* There can be multiple FILE_CHECKPOINT for the same LSN. */ + if (mlog_checkpoint_lsn) + continue; + mlog_checkpoint_lsn= recovered_lsn; + l+= 8; + recovered_offset= l - buf; + return true; + } + continue; + } +# endif + /* fall through */ + default: + if (!srv_force_recovery) + goto malformed; + ib::warn() << "Ignoring malformed log record at LSN " << recovered_lsn; + continue; + case FILE_DELETE: + case FILE_MODIFY: + case FILE_RENAME: + if (UNIV_UNLIKELY(page_no != 0)) + { + file_rec_error: + if (!srv_force_recovery) + { + ib::error() << "Corrupted file-level record;" + " set innodb_force_recovery=1 to ignore."; + goto corrupted; + } + + ib::warn() << "Ignoring corrupted file-level record at LSN " + << recovered_lsn; + continue; + } + /* fall through */ + case FILE_CREATE: + if (UNIV_UNLIKELY(!space_id || page_no)) + goto file_rec_error; + /* There is no terminating NUL character. Names must end in .ibd. + For FILE_RENAME, there is a NUL between the two file names. */ + const char * const fn= reinterpret_cast<const char*>(l); + const char *fn2= static_cast<const char*>(memchr(fn, 0, rlen)); + + if (UNIV_UNLIKELY((fn2 == nullptr) == ((b & 0xf0) == FILE_RENAME))) + goto file_rec_error; + + const char * const fnend= fn2 ? fn2 : fn + rlen; + const char * const fn2end= fn2 ? fn + rlen : nullptr; + + if (fn2) + { + fn2++; + if (memchr(fn2, 0, fn2end - fn2)) + goto file_rec_error; + if (fn2end - fn2 < 4 || memcmp(fn2end - 4, DOT_IBD, 4)) + goto file_rec_error; + } + + if (is_predefined_tablespace(space_id)) + goto file_rec_error; + if (fnend - fn < 4 || memcmp(fnend - 4, DOT_IBD, 4)) + goto file_rec_error; + + const char saved_end= fn[rlen]; + const_cast<char&>(fn[rlen])= '\0'; + fil_name_process(const_cast<char*>(fn), fnend - fn, space_id, + (b & 0xf0) == FILE_DELETE); + if (fn2) + fil_name_process(const_cast<char*>(fn2), fn2end - fn2, space_id, + false); + if ((b & 0xf0) < FILE_MODIFY && log_file_op) + log_file_op(space_id, (b & 0xf0) == FILE_CREATE, + l, static_cast<ulint>(fnend - fn), + reinterpret_cast<const byte*>(fn2), + fn2 ? static_cast<ulint>(fn2end - fn2) : 0); + const_cast<char&>(fn[rlen])= saved_end; + + if (fn2 && apply) + { + const size_t len= fn2end - fn2; + auto r= renamed_spaces.emplace(space_id, std::string{fn2, len}); + if (!r.second) + r.first->second= std::string{fn2, len}; + } + if (UNIV_UNLIKELY(found_corrupt_fs)) + return true; + } + } +#endif + else + goto malformed; + } + + ut_ad(l == el); + recovered_offset= l - buf; + recovered_lsn= end_lsn; + if (is_memory_exhausted(store) && last_phase) + return false; + goto loop; +} + +/** Apply the hashed log records to the page, if the page lsn is less than the +lsn of a log record. +@param[in,out] block buffer pool page +@param[in,out] mtr mini-transaction +@param[in,out] p recovery address +@param[in,out] space tablespace, or NULL if not looked up yet +@param[in,out] init page initialization operation, or NULL */ +static void recv_recover_page(buf_block_t* block, mtr_t& mtr, + const recv_sys_t::map::iterator& p, + fil_space_t* space = NULL, + mlog_init_t::init* init = NULL) +{ + ut_ad(mutex_own(&recv_sys.mutex)); + ut_ad(recv_sys.apply_log_recs); + ut_ad(recv_needed_recovery); + ut_ad(!init || init->created); + ut_ad(!init || init->lsn); + ut_ad(block->page.id() == p->first); + ut_ad(!p->second.is_being_processed()); + ut_ad(!space || space->id == block->page.id().space()); + ut_ad(log_sys.is_physical()); + + if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) { + ib::info() << "Applying log to page " << block->page.id(); + } + + DBUG_PRINT("ib_log", ("Applying log to page %u:%u", + block->page.id().space(), + block->page.id().page_no())); + + p->second.state = page_recv_t::RECV_BEING_PROCESSED; + + mutex_exit(&recv_sys.mutex); + + byte *frame = UNIV_LIKELY_NULL(block->page.zip.data) + ? block->page.zip.data + : block->frame; + const lsn_t page_lsn = init + ? 0 + : mach_read_from_8(frame + FIL_PAGE_LSN); + bool free_page = false; + lsn_t start_lsn = 0, end_lsn = 0; + ut_d(lsn_t recv_start_lsn = 0); + const lsn_t init_lsn = init ? init->lsn : 0; + + bool skipped_after_init = false; + + for (const log_rec_t* recv : p->second.log) { + const log_phys_t* l = static_cast<const log_phys_t*>(recv); + ut_ad(l->lsn); + ut_ad(end_lsn <= l->lsn); + ut_ad(l->lsn <= log_sys.log.scanned_lsn); + + ut_ad(l->start_lsn); + ut_ad(recv_start_lsn <= l->start_lsn); + ut_d(recv_start_lsn = l->start_lsn); + + if (l->start_lsn < page_lsn) { + /* This record has already been applied. */ + DBUG_PRINT("ib_log", ("apply skip %u:%u LSN " LSN_PF + " < " LSN_PF, + block->page.id().space(), + block->page.id().page_no(), + l->start_lsn, page_lsn)); + skipped_after_init = true; + end_lsn = l->lsn; + continue; + } + + if (l->start_lsn < init_lsn) { + DBUG_PRINT("ib_log", ("init skip %u:%u LSN " LSN_PF + " < " LSN_PF, + block->page.id().space(), + block->page.id().page_no(), + l->start_lsn, init_lsn)); + skipped_after_init = false; + end_lsn = l->lsn; + continue; + } + + /* There is no need to check LSN for just initialized pages. */ + if (skipped_after_init) { + skipped_after_init = false; + ut_ad(end_lsn == page_lsn); + if (end_lsn != page_lsn) + ib::warn() + << "The last skipped log record LSN " + << end_lsn + << " is not equal to page LSN " + << page_lsn; + } + + end_lsn = l->lsn; + + if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) { + ib::info() << "apply " << l->start_lsn + << ": " << block->page.id(); + } + + DBUG_PRINT("ib_log", ("apply " LSN_PF ": %u:%u", + l->start_lsn, + block->page.id().space(), + block->page.id().page_no())); + + log_phys_t::apply_status a= l->apply(*block, + p->second.last_offset); + + switch (a) { + case log_phys_t::APPLIED_NO: + ut_ad(!mtr.has_modifications()); + free_page = true; + start_lsn = 0; + continue; + case log_phys_t::APPLIED_YES: + goto set_start_lsn; + case log_phys_t::APPLIED_TO_FSP_HEADER: + case log_phys_t::APPLIED_TO_ENCRYPTION: + break; + } + + if (fil_space_t* s = space + ? space + : fil_space_t::get(block->page.id().space())) { + switch (a) { + case log_phys_t::APPLIED_TO_FSP_HEADER: + s->flags = mach_read_from_4( + FSP_HEADER_OFFSET + + FSP_SPACE_FLAGS + frame); + s->size_in_header = mach_read_from_4( + FSP_HEADER_OFFSET + FSP_SIZE + + frame); + s->free_limit = mach_read_from_4( + FSP_HEADER_OFFSET + + FSP_FREE_LIMIT + frame); + s->free_len = mach_read_from_4( + FSP_HEADER_OFFSET + FSP_FREE + + FLST_LEN + frame); + break; + default: + byte* b= frame + + fsp_header_get_encryption_offset( + block->zip_size()) + + FSP_HEADER_OFFSET; + if (memcmp(b, CRYPT_MAGIC, MAGIC_SZ)) { + break; + } + b += MAGIC_SZ; + if (*b != CRYPT_SCHEME_UNENCRYPTED + && *b != CRYPT_SCHEME_1) { + break; + } + if (b[1] != MY_AES_BLOCK_SIZE) { + break; + } + if (b[2 + MY_AES_BLOCK_SIZE + 4 + 4] + > FIL_ENCRYPTION_OFF) { + break; + } + fil_crypt_parse(s, b); + } + + if (!space) { + s->release(); + } + } + +set_start_lsn: + if (recv_sys.found_corrupt_log && !srv_force_recovery) { + break; + } + + if (!start_lsn) { + start_lsn = l->start_lsn; + } + } + + if (start_lsn) { + ut_ad(end_lsn >= start_lsn); + mach_write_to_8(FIL_PAGE_LSN + frame, end_lsn); + if (UNIV_LIKELY(frame == block->frame)) { + mach_write_to_8(srv_page_size + - FIL_PAGE_END_LSN_OLD_CHKSUM + + frame, end_lsn); + } else { + buf_zip_decompress(block, false); + } + + buf_block_modify_clock_inc(block); + mysql_mutex_lock(&log_sys.flush_order_mutex); + buf_flush_note_modification(block, start_lsn, end_lsn); + mysql_mutex_unlock(&log_sys.flush_order_mutex); + } else if (free_page && init) { + /* There have been no operations that modify the page. + Any buffered changes must not be merged. A subsequent + buf_page_create() from a user thread should discard + any buffered changes. */ + init->created = false; + ut_ad(!mtr.has_modifications()); + block->page.status = buf_page_t::FREED; + } + + /* Make sure that committing mtr does not change the modification + lsn values of page */ + + mtr.discard_modifications(); + mtr.commit(); + + time_t now = time(NULL); + + mutex_enter(&recv_sys.mutex); + + if (recv_max_page_lsn < page_lsn) { + recv_max_page_lsn = page_lsn; + } + + ut_ad(p->second.is_being_processed()); + ut_ad(!recv_sys.pages.empty()); + + if (recv_sys.report(now)) { + const ulint n = recv_sys.pages.size(); + ib::info() << "To recover: " << n << " pages from log"; + service_manager_extend_timeout( + INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n); + } +} + +/** Remove records for a corrupted page. +This function should only be called when innodb_force_recovery is set. +@param page_id corrupted page identifier */ +ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id) +{ + mutex_enter(&mutex); + map::iterator p= pages.find(page_id); + if (p != pages.end()) + { + p->second.log.clear(); + pages.erase(p); + } + mutex_exit(&mutex); +} + +/** Apply any buffered redo log to a page that was just read from a data file. +@param[in,out] space tablespace +@param[in,out] bpage buffer pool page */ +void recv_recover_page(fil_space_t* space, buf_page_t* bpage) +{ + mtr_t mtr; + mtr.start(); + mtr.set_log_mode(MTR_LOG_NO_REDO); + + ut_ad(bpage->state() == BUF_BLOCK_FILE_PAGE); + buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage); + + /* Move the ownership of the x-latch on the page to + this OS thread, so that we can acquire a second + x-latch on it. This is needed for the operations to + the page to pass the debug checks. */ + rw_lock_x_lock_move_ownership(&block->lock); + buf_block_buf_fix_inc(block, __FILE__, __LINE__); + rw_lock_x_lock(&block->lock); + mtr.memo_push(block, MTR_MEMO_PAGE_X_FIX); + + mutex_enter(&recv_sys.mutex); + if (recv_sys.apply_log_recs) { + recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id()); + if (p != recv_sys.pages.end() + && !p->second.is_being_processed()) { + recv_recover_page(block, mtr, p, space); + p->second.log.clear(); + recv_sys.pages.erase(p); + goto func_exit; + } + } + + mtr.commit(); +func_exit: + mutex_exit(&recv_sys.mutex); + ut_ad(mtr.has_committed()); +} + +/** Reads in pages which have hashed log records, from an area around a given +page number. +@param[in] page_id page id */ +static void recv_read_in_area(page_id_t page_id) +{ + uint32_t page_nos[RECV_READ_AHEAD_AREA]; + compile_time_assert(ut_is_2pow(RECV_READ_AHEAD_AREA)); + page_id.set_page_no(ut_2pow_round(page_id.page_no(), + RECV_READ_AHEAD_AREA)); + const ulint up_limit = page_id.page_no() + RECV_READ_AHEAD_AREA; + uint32_t* p = page_nos; + + for (recv_sys_t::map::iterator i= recv_sys.pages.lower_bound(page_id); + i != recv_sys.pages.end() + && i->first.space() == page_id.space() + && i->first.page_no() < up_limit; i++) { + if (i->second.state == page_recv_t::RECV_NOT_PROCESSED + && !buf_pool.page_hash_contains(i->first)) { + i->second.state = page_recv_t::RECV_BEING_READ; + *p++ = i->first.page_no(); + } + } + + if (p != page_nos) { + mutex_exit(&recv_sys.mutex); + buf_read_recv_pages(page_id.space(), page_nos, + ulint(p - page_nos)); + mutex_enter(&recv_sys.mutex); + } +} + +/** Attempt to initialize a page based on redo log records. +@param page_id page identifier +@param p iterator pointing to page_id +@param mtr mini-transaction +@param b pre-allocated buffer pool block +@return whether the page was successfully initialized */ +inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id, + map::iterator &p, mtr_t &mtr, + buf_block_t *b) +{ + ut_ad(mutex_own(&mutex)); + ut_ad(p->first == page_id); + page_recv_t &recs= p->second; + ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ); + buf_block_t* block= nullptr; + mlog_init_t::init &i= mlog_init.last(page_id); + const lsn_t end_lsn = recs.log.last()->lsn; + if (end_lsn < i.lsn) + DBUG_LOG("ib_log", "skip log for page " << page_id + << " LSN " << end_lsn << " < " << i.lsn); + else if (fil_space_t *space= fil_space_t::get(page_id.space())) + { + mtr.start(); + mtr.set_log_mode(MTR_LOG_NO_REDO); + block= buf_page_create(space, page_id.page_no(), space->zip_size(), &mtr, + b); + if (UNIV_UNLIKELY(block != b)) + { + /* The page happened to exist in the buffer pool, or it was just + being read in. Before buf_page_get_with_no_latch() returned to + buf_page_create(), all changes must have been applied to the + page already. */ + ut_ad(recv_sys.pages.find(page_id) == recv_sys.pages.end()); + mtr.commit(); + block= nullptr; + } + else + { + ut_ad(&recs == &recv_sys.pages.find(page_id)->second); + i.created= true; + buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK); + recv_recover_page(block, mtr, p, space, &i); + ut_ad(mtr.has_committed()); + recs.log.clear(); + map::iterator r= p++; + recv_sys.pages.erase(r); + } + space->release(); + } + + return block; +} + +/** Attempt to initialize a page based on redo log records. +@param page_id page identifier +@return whether the page was successfully initialized */ +buf_block_t *recv_sys_t::recover_low(const page_id_t page_id) +{ + buf_block_t *free_block= buf_LRU_get_free_block(false); + buf_block_t *block= nullptr; + + mutex_enter(&mutex); + map::iterator p= pages.find(page_id); + + if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ) + { + mtr_t mtr; + block= recover_low(page_id, p, mtr, free_block); + ut_ad(!block || block == free_block); + } + + mutex_exit(&mutex); + if (UNIV_UNLIKELY(!block)) + buf_pool.free_block(free_block); + return block; +} + +/** Apply buffered log to persistent data pages. +@param last_batch whether it is possible to write more redo log */ +void recv_sys_t::apply(bool last_batch) +{ + ut_ad(srv_operation == SRV_OPERATION_NORMAL || + srv_operation == SRV_OPERATION_RESTORE || + srv_operation == SRV_OPERATION_RESTORE_EXPORT); + + mutex_enter(&mutex); + + while (apply_batch_on) + { + bool abort= found_corrupt_log; + mutex_exit(&mutex); + + if (abort) + return; + + os_thread_sleep(500000); + mutex_enter(&mutex); + } + +#ifdef SAFE_MUTEX + DBUG_ASSERT(!last_batch == mysql_mutex_is_owner(&log_sys.mutex)); +#endif /* SAFE_MUTEX */ + + recv_no_ibuf_operations = !last_batch || + srv_operation == SRV_OPERATION_RESTORE || + srv_operation == SRV_OPERATION_RESTORE_EXPORT; + + mtr_t mtr; + + if (!pages.empty()) + { + const char *msg= last_batch + ? "Starting final batch to recover " + : "Starting a batch to recover "; + const ulint n= pages.size(); + ib::info() << msg << n << " pages from redo log."; + sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log", msg, n); + + apply_log_recs= true; + apply_batch_on= true; + + for (auto id= srv_undo_tablespaces_open; id--;) + { + const trunc& t= truncated_undo_spaces[id]; + if (t.lsn) + trim(page_id_t(id + srv_undo_space_id_start, t.pages), t.lsn); + } + + fil_system.extend_to_recv_size(); + + buf_block_t *free_block= buf_LRU_get_free_block(false); + + for (map::iterator p= pages.begin(); p != pages.end(); ) + { + const page_id_t page_id= p->first; + page_recv_t &recs= p->second; + ut_ad(!recs.log.empty()); + + switch (recs.state) { + case page_recv_t::RECV_BEING_READ: + case page_recv_t::RECV_BEING_PROCESSED: + p++; + continue; + case page_recv_t::RECV_WILL_NOT_READ: + if (UNIV_LIKELY(!!recover_low(page_id, p, mtr, free_block))) + { + mutex_exit(&mutex); + free_block= buf_LRU_get_free_block(false); + mutex_enter(&mutex); +next_page: + p= pages.lower_bound(page_id); + } + continue; + case page_recv_t::RECV_NOT_PROCESSED: + mtr.start(); + mtr.set_log_mode(MTR_LOG_NO_REDO); + if (buf_block_t *block= buf_page_get_low(page_id, 0, RW_X_LATCH, + nullptr, BUF_GET_IF_IN_POOL, + __FILE__, __LINE__, + &mtr, nullptr, false)) + { + buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK); + recv_recover_page(block, mtr, p); + ut_ad(mtr.has_committed()); + } + else + { + mtr.commit(); + recv_read_in_area(page_id); + break; + } + map::iterator r= p++; + r->second.log.clear(); + pages.erase(r); + continue; + } + + goto next_page; + } + + buf_pool.free_block(free_block); + + /* Wait until all the pages have been processed */ + while (!pages.empty() || buf_pool.n_pend_reads) + { + const bool abort= found_corrupt_log || found_corrupt_fs; + + if (found_corrupt_fs && !srv_force_recovery) + ib::info() << "Set innodb_force_recovery=1 to ignore corrupted pages."; + + mutex_exit(&mutex); + + if (abort) + return; + os_thread_sleep(500000); + mutex_enter(&mutex); + } + } + + if (last_batch) + /* We skipped this in buf_page_create(). */ + mlog_init.mark_ibuf_exist(mtr); + else + { + mlog_init.reset(); + mysql_mutex_unlock(&log_sys.mutex); + } + + mysql_mutex_assert_not_owner(&log_sys.mutex); + mutex_exit(&mutex); + + /* Instead of flushing, last_batch could sort the buf_pool.flush_list + in ascending order of buf_page_t::oldest_modification. */ + buf_flush_sync(); + + if (!last_batch) + { + buf_pool_invalidate(); + mysql_mutex_lock(&log_sys.mutex); + } +#if 1 /* Mariabackup FIXME: Remove or adjust rename_table_in_prepare() */ + else if (srv_operation != SRV_OPERATION_NORMAL); +#endif + else + { + /* In the last batch, we will apply any rename operations. */ + for (auto r : renamed_spaces) + { + const uint32_t id= r.first; + fil_space_t *space= fil_space_t::get(id); + if (!space) + continue; + ut_ad(UT_LIST_GET_LEN(space->chain) == 1); + const char *old= space->chain.start->name; + if (r.second != old) + { + bool exists; + os_file_type_t ftype; + const char *new_name= r.second.c_str(); + if (!os_file_status(new_name, &exists, &ftype) || exists) + { + ib::error() << "Cannot replay rename of tablespace " << id + << " from '" << old << "' to '" << r.second << + (exists ? "' because the target file exists" : "'"); + found_corrupt_fs= true; + } + else + { + size_t base= r.second.rfind(OS_PATH_SEPARATOR); + ut_ad(base != std::string::npos); + size_t start= r.second.rfind(OS_PATH_SEPARATOR, base - 1); + if (start == std::string::npos) + start= 0; + else + ++start; + /* Keep only databasename/tablename without .ibd suffix */ + std::string space_name(r.second, start, r.second.size() - start - 4); + ut_ad(space_name[base - start] == OS_PATH_SEPARATOR); +#if OS_PATH_SEPARATOR != '/' + space_name[base - start]= '/'; +#endif + mysql_mutex_lock(&log_sys.mutex); + if (dberr_t err= space->rename(space_name.c_str(), r.second.c_str(), + false)) + { + ib::error() << "Cannot replay rename of tablespace " << id + << " to '" << r.second << "': " << err; + found_corrupt_fs= true; + } + mysql_mutex_unlock(&log_sys.mutex); + } + } + space->release(); + } + renamed_spaces.clear(); + } + + mutex_enter(&mutex); + + ut_d(after_apply= true); + clear(); + mutex_exit(&mutex); +} + +/** Check whether the number of read redo log blocks exceeds the maximum. +Store last_stored_lsn if the recovery is not in the last phase. +@param[in,out] store whether to store page operations +@return whether the memory is exhausted */ +inline bool recv_sys_t::is_memory_exhausted(store_t *store) +{ + if (*store == STORE_NO || + UT_LIST_GET_LEN(blocks) * 3 < buf_pool.get_n_pages()) + return false; + if (*store == STORE_YES) + last_stored_lsn= recovered_lsn; + *store= STORE_NO; + DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF + " last stored offset " ULINTPF "\n", + recovered_lsn, recovered_offset)); + return true; +} + +/** Adds data from a new log block to the parsing buffer of recv_sys if +recv_sys.parse_start_lsn is non-zero. +@param[in] log_block log block to add +@param[in] scanned_lsn lsn of how far we were able to find + data in this log block +@return true if more data added */ +bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn) +{ + ulint more_len; + ulint data_len; + ulint start_offset; + ulint end_offset; + + ut_ad(scanned_lsn >= recv_sys.scanned_lsn); + + if (!recv_sys.parse_start_lsn) { + /* Cannot start parsing yet because no start point for + it found */ + return(false); + } + + data_len = log_block_get_data_len(log_block); + + if (recv_sys.parse_start_lsn >= scanned_lsn) { + + return(false); + + } else if (recv_sys.scanned_lsn >= scanned_lsn) { + + return(false); + + } else if (recv_sys.parse_start_lsn > recv_sys.scanned_lsn) { + more_len = (ulint) (scanned_lsn - recv_sys.parse_start_lsn); + } else { + more_len = (ulint) (scanned_lsn - recv_sys.scanned_lsn); + } + + if (more_len == 0) { + return(false); + } + + ut_ad(data_len >= more_len); + + start_offset = data_len - more_len; + + if (start_offset < LOG_BLOCK_HDR_SIZE) { + start_offset = LOG_BLOCK_HDR_SIZE; + } + + end_offset = std::min<ulint>(data_len, log_sys.trailer_offset()); + + ut_ad(start_offset <= end_offset); + + if (start_offset < end_offset) { + memcpy(recv_sys.buf + recv_sys.len, + log_block + start_offset, end_offset - start_offset); + + recv_sys.len += end_offset - start_offset; + + ut_a(recv_sys.len <= RECV_PARSING_BUF_SIZE); + } + + return(true); +} + +/** Moves the parsing buffer data left to the buffer start. */ +void recv_sys_justify_left_parsing_buf() +{ + memmove(recv_sys.buf, recv_sys.buf + recv_sys.recovered_offset, + recv_sys.len - recv_sys.recovered_offset); + + recv_sys.len -= recv_sys.recovered_offset; + + recv_sys.recovered_offset = 0; +} + +/** Scan redo log from a buffer and stores new log data to the parsing buffer. +Parse and hash the log records if new data found. +Apply log records automatically when the hash table becomes full. +@param[in,out] store whether the records should be + stored into recv_sys.pages; this is + reset if just debug checking is + needed, or when the num_max_blocks in + recv_sys runs out +@param[in] log_block log segment +@param[in] checkpoint_lsn latest checkpoint LSN +@param[in] start_lsn buffer start LSN +@param[in] end_lsn buffer end LSN +@param[in,out] contiguous_lsn it is known that all groups contain + contiguous log data upto this lsn +@param[out] group_scanned_lsn scanning succeeded upto this lsn +@return true if not able to scan any more in this log group */ +static bool recv_scan_log_recs( + store_t* store, + const byte* log_block, + lsn_t checkpoint_lsn, + lsn_t start_lsn, + lsn_t end_lsn, + lsn_t* contiguous_lsn, + lsn_t* group_scanned_lsn) +{ + lsn_t scanned_lsn = start_lsn; + bool finished = false; + ulint data_len; + bool more_data = false; + bool apply = recv_sys.mlog_checkpoint_lsn != 0; + ulint recv_parsing_buf_size = RECV_PARSING_BUF_SIZE; + const bool last_phase = (*store == STORE_IF_EXISTS); + ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE); + ut_ad(log_sys.is_physical()); + + const byte* const log_end = log_block + + ulint(end_lsn - start_lsn); + constexpr ulint sizeof_checkpoint= SIZE_OF_FILE_CHECKPOINT; + + do { + ut_ad(!finished); + + if (log_block_get_flush_bit(log_block)) { + /* This block was a start of a log flush operation: + we know that the previous flush operation must have + been completed for all log groups before this block + can have been flushed to any of the groups. Therefore, + we know that log data is contiguous up to scanned_lsn + in all non-corrupt log groups. */ + + if (scanned_lsn > *contiguous_lsn) { + *contiguous_lsn = scanned_lsn; + } + } + + data_len = log_block_get_data_len(log_block); + + if (scanned_lsn + data_len > recv_sys.scanned_lsn + && log_block_get_checkpoint_no(log_block) + < recv_sys.scanned_checkpoint_no + && (recv_sys.scanned_checkpoint_no + - log_block_get_checkpoint_no(log_block) + > 0x80000000UL)) { + + /* Garbage from a log buffer flush which was made + before the most recent database recovery */ + finished = true; + break; + } + + if (!recv_sys.parse_start_lsn + && (log_block_get_first_rec_group(log_block) > 0)) { + + /* We found a point from which to start the parsing + of log records */ + + recv_sys.parse_start_lsn = scanned_lsn + + log_block_get_first_rec_group(log_block); + recv_sys.scanned_lsn = recv_sys.parse_start_lsn; + recv_sys.recovered_lsn = recv_sys.parse_start_lsn; + } + + scanned_lsn += data_len; + + if (data_len == LOG_BLOCK_HDR_SIZE + sizeof_checkpoint + && scanned_lsn == checkpoint_lsn + sizeof_checkpoint + && log_block[LOG_BLOCK_HDR_SIZE] + == (FILE_CHECKPOINT | (SIZE_OF_FILE_CHECKPOINT - 2)) + && checkpoint_lsn == mach_read_from_8( + (LOG_BLOCK_HDR_SIZE + 1 + 2) + + log_block)) { + /* The redo log is logically empty. */ + ut_ad(recv_sys.mlog_checkpoint_lsn == 0 + || recv_sys.mlog_checkpoint_lsn + == checkpoint_lsn); + recv_sys.mlog_checkpoint_lsn = checkpoint_lsn; + DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF, + scanned_lsn)); + finished = true; + break; + } + + if (scanned_lsn > recv_sys.scanned_lsn) { + ut_ad(!srv_log_file_created); + if (!recv_needed_recovery) { + recv_needed_recovery = true; + + if (srv_read_only_mode) { + ib::warn() << "innodb_read_only" + " prevents crash recovery"; + return(true); + } + + ib::info() << "Starting crash recovery from" + " checkpoint LSN=" << checkpoint_lsn + << "," << recv_sys.scanned_lsn; + } + + /* We were able to find more log data: add it to the + parsing buffer if parse_start_lsn is already + non-zero */ + + DBUG_EXECUTE_IF( + "reduce_recv_parsing_buf", + recv_parsing_buf_size = RECV_SCAN_SIZE * 2; + ); + + if (recv_sys.len + 4 * OS_FILE_LOG_BLOCK_SIZE + >= recv_parsing_buf_size) { + ib::error() << "Log parsing buffer overflow." + " Recovery may have failed!"; + + recv_sys.found_corrupt_log = true; + + if (!srv_force_recovery) { + ib::error() + << "Set innodb_force_recovery" + " to ignore this error."; + return(true); + } + } else if (!recv_sys.found_corrupt_log) { + more_data = recv_sys_add_to_parsing_buf( + log_block, scanned_lsn); + } + + recv_sys.scanned_lsn = scanned_lsn; + recv_sys.scanned_checkpoint_no + = log_block_get_checkpoint_no(log_block); + } + + /* During last phase of scanning, there can be redo logs + left in recv_sys.buf to parse & store it in recv_sys.heap */ + if (last_phase + && recv_sys.recovered_lsn < recv_sys.scanned_lsn) { + more_data = true; + } + + if (data_len < OS_FILE_LOG_BLOCK_SIZE) { + /* Log data for this group ends here */ + finished = true; + break; + } else { + log_block += OS_FILE_LOG_BLOCK_SIZE; + } + } while (log_block < log_end); + + *group_scanned_lsn = scanned_lsn; + + mutex_enter(&recv_sys.mutex); + + if (more_data && !recv_sys.found_corrupt_log) { + /* Try to parse more log records */ + if (recv_sys.parse(checkpoint_lsn, store, apply)) { + ut_ad(recv_sys.found_corrupt_log + || recv_sys.found_corrupt_fs + || recv_sys.mlog_checkpoint_lsn + == recv_sys.recovered_lsn); + finished = true; + goto func_exit; + } + + recv_sys.is_memory_exhausted(store); + + if (recv_sys.recovered_offset > recv_parsing_buf_size / 4 + || (recv_sys.recovered_offset + && recv_sys.len + >= recv_parsing_buf_size - RECV_SCAN_SIZE)) { + /* Move parsing buffer data to the buffer start */ + recv_sys_justify_left_parsing_buf(); + } + + /* Need to re-parse the redo log which're stored + in recv_sys.buf */ + if (last_phase && *store == STORE_NO) { + finished = false; + } + } + +func_exit: + mutex_exit(&recv_sys.mutex); + return(finished); +} + +/** Scans log from a buffer and stores new log data to the parsing buffer. +Parses and hashes the log records if new data found. +@param[in] checkpoint_lsn latest checkpoint log sequence number +@param[in,out] contiguous_lsn log sequence number +until which all redo log has been scanned +@param[in] last_phase whether changes +can be applied to the tablespaces +@return whether rescan is needed (not everything was stored) */ +static +bool +recv_group_scan_log_recs( + lsn_t checkpoint_lsn, + lsn_t* contiguous_lsn, + bool last_phase) +{ + DBUG_ENTER("recv_group_scan_log_recs"); + DBUG_ASSERT(!last_phase || recv_sys.mlog_checkpoint_lsn > 0); + + mutex_enter(&recv_sys.mutex); + recv_sys.len = 0; + recv_sys.recovered_offset = 0; + recv_sys.clear(); + recv_sys.parse_start_lsn = *contiguous_lsn; + recv_sys.scanned_lsn = *contiguous_lsn; + recv_sys.recovered_lsn = *contiguous_lsn; + recv_sys.scanned_checkpoint_no = 0; + ut_ad(recv_max_page_lsn == 0); + mutex_exit(&recv_sys.mutex); + + lsn_t start_lsn; + lsn_t end_lsn; + store_t store = recv_sys.mlog_checkpoint_lsn == 0 + ? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES); + + log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn = + ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE); + ut_d(recv_sys.after_apply = last_phase); + + do { + if (last_phase && store == STORE_NO) { + store = STORE_IF_EXISTS; + recv_sys.apply(false); + /* Rescan the redo logs from last stored lsn */ + end_lsn = recv_sys.recovered_lsn; + } + + start_lsn = ut_uint64_align_down(end_lsn, + OS_FILE_LOG_BLOCK_SIZE); + end_lsn = start_lsn; + log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE); + } while (end_lsn != start_lsn + && !recv_scan_log_recs(&store, log_sys.buf, checkpoint_lsn, + start_lsn, end_lsn, contiguous_lsn, + &log_sys.log.scanned_lsn)); + + if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) { + DBUG_RETURN(false); + } + + DBUG_PRINT("ib_log", ("%s " LSN_PF " completed", + last_phase ? "rescan" : "scan", + log_sys.log.scanned_lsn)); + + DBUG_RETURN(store == STORE_NO); +} + +/** Report a missing tablespace for which page-redo log exists. +@param[in] err previous error code +@param[in] i tablespace descriptor +@return new error code */ +static +dberr_t +recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i) +{ + if (srv_operation == SRV_OPERATION_RESTORE + || srv_operation == SRV_OPERATION_RESTORE_EXPORT) { + if (i->second.name.find(TEMP_TABLE_PATH_PREFIX) + != std::string::npos) { + ib::warn() << "Tablespace " << i->first << " was not" + " found at " << i->second.name << " when" + " restoring a (partial?) backup. All redo log" + " for this file will be ignored!"; + } + return(err); + } + + if (srv_force_recovery == 0) { + ib::error() << "Tablespace " << i->first << " was not" + " found at " << i->second.name << "."; + + if (err == DB_SUCCESS) { + ib::error() << "Set innodb_force_recovery=1 to" + " ignore this and to permanently lose" + " all changes to the tablespace."; + err = DB_TABLESPACE_NOT_FOUND; + } + } else { + ib::warn() << "Tablespace " << i->first << " was not" + " found at " << i->second.name << ", and" + " innodb_force_recovery was set. All redo log" + " for this tablespace will be ignored!"; + } + + return(err); +} + +/** Report the missing tablespace and discard the redo logs for the deleted +tablespace. +@param[in] rescan rescan of redo logs is needed + if hash table ran out of memory +@param[out] missing_tablespace missing tablespace exists or not +@return error code or DB_SUCCESS. */ +static MY_ATTRIBUTE((warn_unused_result)) +dberr_t +recv_validate_tablespace(bool rescan, bool& missing_tablespace) +{ + dberr_t err = DB_SUCCESS; + + mutex_enter(&recv_sys.mutex); + + for (recv_sys_t::map::iterator p = recv_sys.pages.begin(); + p != recv_sys.pages.end();) { + ut_ad(!p->second.log.empty()); + const ulint space = p->first.space(); + if (is_predefined_tablespace(space)) { +next: + p++; + continue; + } + + recv_spaces_t::iterator i = recv_spaces.find(space); + ut_ad(i != recv_spaces.end()); + + switch (i->second.status) { + case file_name_t::NORMAL: + goto next; + case file_name_t::MISSING: + err = recv_init_missing_space(err, i); + i->second.status = file_name_t::DELETED; + /* fall through */ + case file_name_t::DELETED: + recv_sys_t::map::iterator r = p++; + r->second.log.clear(); + recv_sys.pages.erase(r); + continue; + } + ut_ad(0); + } + + if (err != DB_SUCCESS) { +func_exit: + mutex_exit(&recv_sys.mutex); + return(err); + } + + /* When rescan is not needed, recv_sys.pages will contain the + entire redo log. If rescan is needed or innodb_force_recovery + is set, we can ignore missing tablespaces. */ + for (const recv_spaces_t::value_type& rs : recv_spaces) { + if (UNIV_LIKELY(rs.second.status != file_name_t::MISSING)) { + continue; + } + + missing_tablespace = true; + + if (srv_force_recovery > 0) { + ib::warn() << "Tablespace " << rs.first + <<" was not found at " << rs.second.name + <<", and innodb_force_recovery was set." + <<" All redo log for this tablespace" + <<" will be ignored!"; + continue; + } + + if (!rescan) { + ib::info() << "Tablespace " << rs.first + << " was not found at '" + << rs.second.name << "', but there" + <<" were no modifications either."; + } + } + + if (!rescan || srv_force_recovery > 0) { + missing_tablespace = false; + } + + err = DB_SUCCESS; + goto func_exit; +} + +/** Check if all tablespaces were found for crash recovery. +@param[in] rescan rescan of redo logs is needed +@param[out] missing_tablespace missing table exists +@return error code or DB_SUCCESS */ +static MY_ATTRIBUTE((warn_unused_result)) +dberr_t +recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace) +{ + bool flag_deleted = false; + + ut_ad(!srv_read_only_mode); + ut_ad(recv_needed_recovery); + + for (recv_spaces_t::value_type& rs : recv_spaces) { + ut_ad(!is_predefined_tablespace(rs.first)); + ut_ad(rs.second.status != file_name_t::DELETED + || !rs.second.space); + + if (rs.second.status == file_name_t::DELETED) { + /* The tablespace was deleted, + so we can ignore any redo log for it. */ + flag_deleted = true; + } else if (rs.second.space != NULL) { + /* The tablespace was found, and there + are some redo log records for it. */ + fil_names_dirty(rs.second.space); + + /* Add the freed page ranges in the respective + tablespace */ + if (!rs.second.freed_ranges.empty() + && (srv_immediate_scrub_data_uncompressed + || rs.second.space->is_compressed())) { + + rs.second.space->add_free_ranges( + std::move(rs.second.freed_ranges)); + } + } else if (rs.second.name == "") { + ib::error() << "Missing FILE_CREATE, FILE_DELETE" + " or FILE_MODIFY before FILE_CHECKPOINT" + " for tablespace " << rs.first; + recv_sys.found_corrupt_log = true; + return(DB_CORRUPTION); + } else { + rs.second.status = file_name_t::MISSING; + flag_deleted = true; + } + + ut_ad(rs.second.status == file_name_t::DELETED + || rs.second.name != ""); + } + + if (flag_deleted) { + return recv_validate_tablespace(rescan, missing_tablespace); + } + + return DB_SUCCESS; +} + +/** Start recovering from a redo log checkpoint. +@param[in] flush_lsn FIL_PAGE_FILE_FLUSH_LSN +of first system tablespace page +@return error code or DB_SUCCESS */ +dberr_t +recv_recovery_from_checkpoint_start(lsn_t flush_lsn) +{ + ulint max_cp_field; + lsn_t checkpoint_lsn; + bool rescan = false; + ib_uint64_t checkpoint_no; + lsn_t contiguous_lsn; + byte* buf; + dberr_t err = DB_SUCCESS; + + ut_ad(srv_operation == SRV_OPERATION_NORMAL + || srv_operation == SRV_OPERATION_RESTORE + || srv_operation == SRV_OPERATION_RESTORE_EXPORT); + ut_d(mysql_mutex_lock(&buf_pool.flush_list_mutex)); + ut_ad(UT_LIST_GET_LEN(buf_pool.LRU) == 0); + ut_ad(UT_LIST_GET_LEN(buf_pool.unzip_LRU) == 0); + ut_d(mysql_mutex_unlock(&buf_pool.flush_list_mutex)); + + if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) { + + ib::info() << "innodb_force_recovery=6 skips redo log apply"; + + return(DB_SUCCESS); + } + + recv_sys.recovery_on = true; + + mysql_mutex_lock(&log_sys.mutex); + + err = recv_find_max_checkpoint(&max_cp_field); + + if (err != DB_SUCCESS) { + + recv_sys.recovered_lsn = log_sys.get_lsn(); + mysql_mutex_unlock(&log_sys.mutex); + return(err); + } + + buf = log_sys.checkpoint_buf; + log_sys.log.read(max_cp_field, {buf, OS_FILE_LOG_BLOCK_SIZE}); + + checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN); + checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO); + + /* Start reading the log from the checkpoint lsn. The variable + contiguous_lsn contains an lsn up to which the log is known to + be contiguously written. */ + recv_sys.mlog_checkpoint_lsn = 0; + + ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size); + + const lsn_t end_lsn = mach_read_from_8( + buf + LOG_CHECKPOINT_END_LSN); + + ut_ad(recv_sys.pages.empty()); + contiguous_lsn = checkpoint_lsn; + switch (log_sys.log.format) { + case 0: + mysql_mutex_unlock(&log_sys.mutex); + return DB_SUCCESS; + default: + if (end_lsn == 0) { + break; + } + if (end_lsn >= checkpoint_lsn) { + contiguous_lsn = end_lsn; + break; + } + recv_sys.found_corrupt_log = true; + mysql_mutex_unlock(&log_sys.mutex); + return(DB_ERROR); + } + + size_t sizeof_checkpoint; + + if (!log_sys.is_physical()) { + sizeof_checkpoint = 9/* size of MLOG_CHECKPOINT */; + goto completed; + } + + /* Look for FILE_CHECKPOINT. */ + recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false); + /* The first scan should not have stored or applied any records. */ + ut_ad(recv_sys.pages.empty()); + ut_ad(!recv_sys.found_corrupt_fs); + + if (srv_read_only_mode && recv_needed_recovery) { + mysql_mutex_unlock(&log_sys.mutex); + return(DB_READ_ONLY); + } + + if (recv_sys.found_corrupt_log && !srv_force_recovery) { + mysql_mutex_unlock(&log_sys.mutex); + ib::warn() << "Log scan aborted at LSN " << contiguous_lsn; + return(DB_ERROR); + } + + if (recv_sys.mlog_checkpoint_lsn == 0) { + lsn_t scan_lsn = log_sys.log.scanned_lsn; + if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) { + mysql_mutex_unlock(&log_sys.mutex); + ib::error err; + err << "Missing FILE_CHECKPOINT"; + if (end_lsn) { + err << " at " << end_lsn; + } + err << " between the checkpoint " << checkpoint_lsn + << " and the end " << scan_lsn << "."; + return(DB_ERROR); + } + + log_sys.log.scanned_lsn = checkpoint_lsn; + } else { + contiguous_lsn = checkpoint_lsn; + rescan = recv_group_scan_log_recs( + checkpoint_lsn, &contiguous_lsn, false); + + if ((recv_sys.found_corrupt_log && !srv_force_recovery) + || recv_sys.found_corrupt_fs) { + mysql_mutex_unlock(&log_sys.mutex); + return(DB_ERROR); + } + } + + /* NOTE: we always do a 'recovery' at startup, but only if + there is something wrong we will print a message to the + user about recovery: */ + sizeof_checkpoint= SIZE_OF_FILE_CHECKPOINT; + +completed: + if (flush_lsn == checkpoint_lsn + sizeof_checkpoint + && recv_sys.mlog_checkpoint_lsn == checkpoint_lsn) { + /* The redo log is logically empty. */ + } else if (checkpoint_lsn != flush_lsn) { + ut_ad(!srv_log_file_created); + + if (checkpoint_lsn + sizeof_checkpoint < flush_lsn) { + ib::warn() + << "Are you sure you are using the right " + << LOG_FILE_NAME + << " to start up the database? Log sequence " + "number in the " + << LOG_FILE_NAME << " is " << checkpoint_lsn + << ", less than the log sequence number in " + "the first system tablespace file header, " + << flush_lsn << "."; + } + + if (!recv_needed_recovery) { + + ib::info() + << "The log sequence number " << flush_lsn + << " in the system tablespace does not match" + " the log sequence number " + << checkpoint_lsn << " in the " + << LOG_FILE_NAME << "!"; + + if (srv_read_only_mode) { + ib::error() << "innodb_read_only" + " prevents crash recovery"; + mysql_mutex_unlock(&log_sys.mutex); + return(DB_READ_ONLY); + } + + recv_needed_recovery = true; + } + } + + log_sys.set_lsn(recv_sys.recovered_lsn); + if (UNIV_LIKELY(log_sys.get_flushed_lsn() < recv_sys.recovered_lsn)) { + /* This may already have been set by create_log_file() + if no logs existed when the server started up. */ + log_sys.set_flushed_lsn(recv_sys.recovered_lsn); + } + + if (recv_needed_recovery) { + bool missing_tablespace = false; + + err = recv_init_crash_recovery_spaces( + rescan, missing_tablespace); + + if (err != DB_SUCCESS) { + mysql_mutex_unlock(&log_sys.mutex); + return(err); + } + + /* If there is any missing tablespace and rescan is needed + then there is a possiblity that hash table will not contain + all space ids redo logs. Rescan the remaining unstored + redo logs for the validation of missing tablespace. */ + ut_ad(rescan || !missing_tablespace); + + while (missing_tablespace) { + DBUG_PRINT("ib_log", ("Rescan of redo log to validate " + "the missing tablespace. Scan " + "from last stored LSN " LSN_PF, + recv_sys.last_stored_lsn)); + + lsn_t recent_stored_lsn = recv_sys.last_stored_lsn; + rescan = recv_group_scan_log_recs( + checkpoint_lsn, &recent_stored_lsn, false); + + ut_ad(!recv_sys.found_corrupt_fs); + + missing_tablespace = false; + + err = recv_sys.found_corrupt_log + ? DB_ERROR + : recv_validate_tablespace( + rescan, missing_tablespace); + + if (err != DB_SUCCESS) { + mysql_mutex_unlock(&log_sys.mutex); + return err; + } + + rescan = true; + } + + recv_sys.parse_start_lsn = checkpoint_lsn; + + if (srv_operation == SRV_OPERATION_NORMAL) { + buf_dblwr.recover(); + } + + ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN); + + if (rescan) { + contiguous_lsn = checkpoint_lsn; + + recv_group_scan_log_recs( + checkpoint_lsn, &contiguous_lsn, true); + + if ((recv_sys.found_corrupt_log + && !srv_force_recovery) + || recv_sys.found_corrupt_fs) { + mysql_mutex_unlock(&log_sys.mutex); + return(DB_ERROR); + } + } + } else { + ut_ad(!rescan || recv_sys.pages.empty()); + } + + if (log_sys.is_physical() + && (log_sys.log.scanned_lsn < checkpoint_lsn + || log_sys.log.scanned_lsn < recv_max_page_lsn)) { + + ib::error() << "We scanned the log up to " + << log_sys.log.scanned_lsn + << ". A checkpoint was at " << checkpoint_lsn << " and" + " the maximum LSN on a database page was " + << recv_max_page_lsn << ". It is possible that the" + " database is now corrupt!"; + } + + if (recv_sys.recovered_lsn < checkpoint_lsn) { + mysql_mutex_unlock(&log_sys.mutex); + + ib::error() << "Recovered only to lsn:" + << recv_sys.recovered_lsn + << " checkpoint_lsn: " << checkpoint_lsn; + + return(DB_ERROR); + } + + log_sys.next_checkpoint_lsn = checkpoint_lsn; + log_sys.next_checkpoint_no = checkpoint_no + 1; + + recv_synchronize_groups(); + + ut_ad(recv_needed_recovery + || checkpoint_lsn == recv_sys.recovered_lsn); + + log_sys.write_lsn = log_sys.get_lsn(); + log_sys.buf_free = log_sys.write_lsn % OS_FILE_LOG_BLOCK_SIZE; + log_sys.buf_next_to_write = log_sys.buf_free; + + log_sys.last_checkpoint_lsn = checkpoint_lsn; + + if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL) { + /* Write a FILE_CHECKPOINT marker as the first thing, + before generating any other redo log. This ensures + that subsequent crash recovery will be possible even + if the server were killed soon after this. */ + fil_names_clear(log_sys.last_checkpoint_lsn, true); + } + + log_sys.next_checkpoint_no = ++checkpoint_no; + + mutex_enter(&recv_sys.mutex); + + recv_sys.apply_log_recs = true; + recv_no_ibuf_operations = false; + ut_d(recv_no_log_write = srv_operation == SRV_OPERATION_RESTORE + || srv_operation == SRV_OPERATION_RESTORE_EXPORT); + + mutex_exit(&recv_sys.mutex); + + mysql_mutex_unlock(&log_sys.mutex); + + recv_lsn_checks_on = true; + + /* The database is now ready to start almost normal processing of user + transactions: transaction rollbacks and the application of the log + records in the hash table can be run in background. */ + + return(DB_SUCCESS); +} + +bool recv_dblwr_t::validate_page(const page_id_t page_id, + const byte *page, + const fil_space_t *space, + byte *tmp_buf) +{ + if (page_id.page_no() == 0) + { + ulint flags= fsp_header_get_flags(page); + if (!fil_space_t::is_valid_flags(flags, page_id.space())) + { + ulint cflags= fsp_flags_convert_from_101(flags); + if (cflags == ULINT_UNDEFINED) + { + ib::warn() << "Ignoring a doublewrite copy of page " << page_id + << "due to invalid flags " << ib::hex(flags); + return false; + } + + flags= cflags; + } + + /* Page 0 is never page_compressed or encrypted. */ + return !buf_page_is_corrupted(true, page, flags); + } + + ut_ad(tmp_buf); + byte *tmp_frame= tmp_buf; + byte *tmp_page= tmp_buf + srv_page_size; + const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE); + const bool expect_encrypted= space->crypt_data && + space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED; + + if (space->full_crc32()) + return !buf_page_is_corrupted(true, page, space->flags); + + if (expect_encrypted && + mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)) + { + if (!fil_space_verify_crypt_checksum(page, space->zip_size())) + return false; + if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) + return true; + if (space->zip_size()) + return false; + memcpy(tmp_page, page, space->physical_size()); + if (!fil_space_decrypt(space, tmp_frame, tmp_page)) + return false; + } + + switch (page_type) { + case FIL_PAGE_PAGE_COMPRESSED: + memcpy(tmp_page, page, space->physical_size()); + /* fall through */ + case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED: + if (space->zip_size()) + return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */ + ulint decomp= fil_page_decompress(tmp_frame, tmp_page, space->flags); + if (!decomp) + return false; /* decompression failed */ + if (decomp == srv_page_size) + return false; /* the page was not compressed (invalid page type) */ + return !buf_page_is_corrupted(true, tmp_page, space->flags); + } + + return !buf_page_is_corrupted(true, page, space->flags); +} + +byte *recv_dblwr_t::find_page(const page_id_t page_id, + const fil_space_t *space, byte *tmp_buf) +{ + byte *result= NULL; + lsn_t max_lsn= 0; + + for (byte *page : pages) + { + if (page_get_page_no(page) != page_id.page_no() || + page_get_space_id(page) != page_id.space()) + continue; + const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN); + if (lsn <= max_lsn || + !validate_page(page_id, page, space, tmp_buf)) + { + /* Mark processed for subsequent iterations in buf_dblwr_t::recover() */ + memset(page + FIL_PAGE_LSN, 0, 8); + continue; + } + max_lsn= lsn; + result= page; + } + + return result; +} diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc new file mode 100644 index 00000000..2a6e1b8b --- /dev/null +++ b/storage/innobase/log/log0sync.cc @@ -0,0 +1,309 @@ +/***************************************************************************** +Copyright (c) 2020 MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/* +The group commit synchronization used in log_write_up_to() +works as follows + +For simplicity, lets consider only write operation,synchronozation of +flush operation works the same. + +Rules of the game + +A thread enters log_write_up_to() with lsn of the current transaction +1. If last written lsn is greater than wait lsn (another thread already + wrote the log buffer),then there is no need to do anything. +2. If no other thread is currently writing, write the log buffer, + and update last written lsn. +3. Otherwise, wait, and go to step 1. + +Synchronization can be done in different ways, e.g + +a) Simple mutex locking the entire check and write operation +Disadvantage that threads that could continue after updating +last written lsn, still wait. + +b) Spinlock, with periodic checks for last written lsn. +Fixes a) but burns CPU unnecessary. + +c) Mutex / condition variable combo. + +Condtion variable notifies (broadcast) all waiters, whenever +last written lsn is changed. + +Has a disadvantage of many suprious wakeups, stress on OS scheduler, +and mutex contention. + +d) Something else. +Make use of the waiter's lsn parameter, and only wakeup "right" waiting +threads. + +We chose d). Even if implementation is more complicated than alternatves +due to the need to maintain list of waiters, it provides the best performance. + +See group_commit_lock implementation for details. + +Note that if write operation is very fast, a) or b) can be fine as alternative. +*/ +#ifdef _WIN32 +#include <windows.h> +#endif + +#ifdef __linux__ +#include <linux/futex.h> +#include <sys/syscall.h> +#endif + +#include <atomic> +#include <thread> +#include <mutex> +#include <condition_variable> +#include <my_cpu.h> + +#include <log0types.h> +#include "log0sync.h" +#include <mysql/service_thd_wait.h> +/** + Helper class , used in group commit lock. + + Binary semaphore, or (same thing), an auto-reset event + Has state (signalled or not), and provides 2 operations. + wait() and wake() + + The implementation uses efficient locking primitives on Linux and Windows. + Or, mutex/condition combo elsewhere. +*/ + +class binary_semaphore +{ +public: + /**Wait until semaphore becomes signalled, and atomically reset the state + to non-signalled*/ + void wait(); + /** signals the semaphore */ + void wake(); + +private: +#if defined(__linux__) || defined (_WIN32) + std::atomic<int> m_signalled; + static constexpr std::memory_order mem_order= std::memory_order_acq_rel; +public: + binary_semaphore() :m_signalled(0) {} +#else + std::mutex m_mtx{}; + std::condition_variable m_cv{}; + bool m_signalled = false; +#endif +}; + +#if defined (__linux__) || defined (_WIN32) +void binary_semaphore::wait() +{ + for (;;) + { + if (m_signalled.exchange(0, mem_order) == 1) + { + break; + } +#ifdef _WIN32 + int zero = 0; + WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE); +#else + syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0); +#endif + } +} + +void binary_semaphore::wake() +{ + if (m_signalled.exchange(1, mem_order) == 0) + { +#ifdef _WIN32 + WakeByAddressSingle(&m_signalled); +#else + syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0); +#endif + } +} +#else +void binary_semaphore::wait() +{ + std::unique_lock<std::mutex> lk(m_mtx); + while (!m_signalled) + m_cv.wait(lk); + m_signalled = false; +} +void binary_semaphore::wake() +{ + std::unique_lock<std::mutex> lk(m_mtx); + m_signalled = true; + m_cv.notify_one(); +} +#endif + +/* A thread helper structure, used in group commit lock below*/ +struct group_commit_waiter_t +{ + lsn_t m_value; + binary_semaphore m_sema; + group_commit_waiter_t* m_next; + group_commit_waiter_t() :m_value(), m_sema(), m_next() {} +}; + +group_commit_lock::group_commit_lock() : + m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list() +{ +} + +group_commit_lock::value_type group_commit_lock::value() const +{ + return m_value.load(std::memory_order::memory_order_relaxed); +} + +group_commit_lock::value_type group_commit_lock::pending() const +{ + return m_pending_value.load(std::memory_order::memory_order_relaxed); +} + +void group_commit_lock::set_pending(group_commit_lock::value_type num) +{ + ut_a(num >= value()); + m_pending_value.store(num, std::memory_order::memory_order_relaxed); +} + +const unsigned int MAX_SPINS = 1; /** max spins in acquire */ +thread_local group_commit_waiter_t thread_local_waiter; + +group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) +{ + unsigned int spins = MAX_SPINS; + + for(;;) + { + if (num <= value()) + { + /* No need to wait.*/ + return lock_return_code::EXPIRED; + } + + if(spins-- == 0) + break; + if (num > pending()) + { + /* Longer wait expected (longer than currently running operation), + don't spin.*/ + break; + } + ut_delay(1); + } + + thread_local_waiter.m_value = num; + std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); + while (num > value()) + { + lk.lock(); + + /* Re-read current value after acquiring the lock*/ + if (num <= value()) + { + return lock_return_code::EXPIRED; + } + + if (!m_lock) + { + /* Take the lock, become group commit leader.*/ + m_lock = true; +#ifndef DBUG_OFF + m_owner_id = std::this_thread::get_id(); +#endif + return lock_return_code::ACQUIRED; + } + + /* Add yourself to waiters list.*/ + thread_local_waiter.m_next = m_waiters_list; + m_waiters_list = &thread_local_waiter; + lk.unlock(); + + /* Sleep until woken in release().*/ + thd_wait_begin(0,THD_WAIT_GROUP_COMMIT); + thread_local_waiter.m_sema.wait(); + thd_wait_end(0); + + } + return lock_return_code::EXPIRED; +} + +void group_commit_lock::release(value_type num) +{ + std::unique_lock<std::mutex> lk(m_mtx); + m_lock = false; + + /* Update current value. */ + ut_a(num >= value()); + m_value.store(num, std::memory_order_relaxed); + + /* + Wake waiters for value <= current value. + Wake one more waiter, who will become the group commit lead. + */ + group_commit_waiter_t* cur, * prev, * next; + group_commit_waiter_t* wakeup_list = nullptr; + int extra_wake = 0; + + for (prev= nullptr, cur= m_waiters_list; cur; cur= next) + { + next= cur->m_next; + if (cur->m_value <= num || extra_wake++ == 0) + { + /* Move current waiter to wakeup_list*/ + + if (!prev) + { + /* Remove from the start of the list.*/ + m_waiters_list = next; + } + else + { + /* Remove from the middle of the list.*/ + prev->m_next= cur->m_next; + } + + /* Append entry to the wakeup list.*/ + cur->m_next = wakeup_list; + wakeup_list = cur; + } + else + { + prev= cur; + } + } + lk.unlock(); + + for (cur= wakeup_list; cur; cur= next) + { + next= cur->m_next; + cur->m_sema.wake(); + } +} + +#ifndef DBUG_OFF +bool group_commit_lock::is_owner() +{ + return m_lock && std::this_thread::get_id() == m_owner_id; +} +#endif + diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h new file mode 100644 index 00000000..40afbf74 --- /dev/null +++ b/storage/innobase/log/log0sync.h @@ -0,0 +1,81 @@ +/***************************************************************************** +Copyright (c) 2020 MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +#include <atomic> +#include <thread> +#include <log0types.h> + +struct group_commit_waiter_t; + +/** +Special synchronization primitive, which is helpful for +performing group commit. + +It has a state consisting of + - locked (bool) + - current value (number). This value is always increasing. + - pending value (number). current value can soon become this number + This is only used for optimization, does not have to be exact + +Operations supported on this semaphore + +1.acquire(num): +- waits until current value exceeds num, or until lock is granted. + +- returns EXPIRED if current_value >= num, + or ACQUIRED, if current_value < num and lock is granted. + +2.release(num) +- releases lock +- sets new current value to max(num,current_value) +- releases some threads waiting in acquire() + +3. value() +- read current value + +4. pending_value() +- read pending value + +5. set_pending_value() +*/ +class group_commit_lock +{ + using value_type = lsn_t; +#ifndef DBUG_OFF + std::thread::id m_owner_id{}; +#endif + std::mutex m_mtx; + std::atomic<value_type> m_value; + std::atomic<value_type> m_pending_value; + bool m_lock; + group_commit_waiter_t* m_waiters_list; +public: + group_commit_lock(); + enum lock_return_code + { + ACQUIRED, + EXPIRED + }; + lock_return_code acquire(value_type num); + void release(value_type num); + value_type value() const; + value_type pending() const; + void set_pending(value_type num); +#ifndef DBUG_OFF + bool is_owner(); +#endif +}; |