summaryrefslogtreecommitdiffstats
path: root/storage/innobase/fil
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--storage/innobase/fil/fil0crypt.cc2642
-rw-r--r--storage/innobase/fil/fil0fil.cc3757
-rw-r--r--storage/innobase/fil/fil0pagecompress.cc613
3 files changed, 7012 insertions, 0 deletions
diff --git a/storage/innobase/fil/fil0crypt.cc b/storage/innobase/fil/fil0crypt.cc
new file mode 100644
index 00000000..240a2682
--- /dev/null
+++ b/storage/innobase/fil/fil0crypt.cc
@@ -0,0 +1,2642 @@
+/*****************************************************************************
+Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
+Copyright (c) 2014, 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+/**************************************************//**
+@file fil0crypt.cc
+Innodb file space encrypt/decrypt
+
+Created Jonas Oreland Google
+Modified Jan Lindström jan.lindstrom@mariadb.com
+*******************************************************/
+
+#include "fil0crypt.h"
+#include "mtr0types.h"
+#include "mach0data.h"
+#include "page0zip.h"
+#include "buf0checksum.h"
+#ifdef UNIV_INNOCHECKSUM
+# include "buf0buf.h"
+#else
+#include "buf0dblwr.h"
+#include "srv0srv.h"
+#include "srv0start.h"
+#include "mtr0mtr.h"
+#include "mtr0log.h"
+#include "ut0ut.h"
+#include "fsp0fsp.h"
+#include "fil0pagecompress.h"
+#include <my_crypt.h>
+
+static bool fil_crypt_threads_inited = false;
+
+/** Is encryption enabled/disabled */
+UNIV_INTERN ulong srv_encrypt_tables = 0;
+
+/** No of key rotation threads requested */
+UNIV_INTERN uint srv_n_fil_crypt_threads = 0;
+
+/** No of key rotation threads started */
+UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;
+
+/** At this age or older a space/page will be rotated */
+UNIV_INTERN uint srv_fil_crypt_rotate_key_age;
+
+/** Whether the encryption plugin does key rotation */
+static bool srv_encrypt_rotate;
+
+/** Event to signal FROM the key rotation threads. */
+static os_event_t fil_crypt_event;
+
+/** Event to signal TO the key rotation threads. */
+UNIV_INTERN os_event_t fil_crypt_threads_event;
+
+/** Event for waking up threads throttle. */
+static os_event_t fil_crypt_throttle_sleep_event;
+
+/** Mutex for key rotation threads. */
+UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;
+
+/** Variable ensuring only 1 thread at time does initial conversion */
+static bool fil_crypt_start_converting = false;
+
+/** Variables for throttling */
+UNIV_INTERN uint srv_n_fil_crypt_iops = 100; // 10ms per iop
+static uint srv_alloc_time = 3; // allocate iops for 3s at a time
+static uint n_fil_crypt_iops_allocated = 0;
+
+#define DEBUG_KEYROTATION_THROTTLING 0
+
+/** Statistics variables */
+static fil_crypt_stat_t crypt_stat;
+static ib_mutex_t crypt_stat_mutex;
+
+/***********************************************************************
+Check if a key needs rotation given a key_state
+@param[in] crypt_data Encryption information
+@param[in] key_version Current key version
+@param[in] latest_key_version Latest key version
+@param[in] rotate_key_age when to rotate
+@return true if key needs rotation, false if not */
+static bool
+fil_crypt_needs_rotation(
+ const fil_space_crypt_t* crypt_data,
+ uint key_version,
+ uint latest_key_version,
+ uint rotate_key_age)
+ MY_ATTRIBUTE((warn_unused_result));
+
+/*********************************************************************
+Init space crypt */
+UNIV_INTERN
+void
+fil_space_crypt_init()
+{
+ fil_crypt_throttle_sleep_event = os_event_create(0);
+
+ mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
+ memset(&crypt_stat, 0, sizeof(crypt_stat));
+}
+
+/*********************************************************************
+Cleanup space crypt */
+UNIV_INTERN
+void
+fil_space_crypt_cleanup()
+{
+ os_event_destroy(fil_crypt_throttle_sleep_event);
+ mutex_free(&crypt_stat_mutex);
+}
+
+/**
+Get latest key version from encryption plugin.
+@return key version or ENCRYPTION_KEY_VERSION_INVALID */
+uint
+fil_space_crypt_t::key_get_latest_version(void)
+{
+ uint key_version = key_found;
+
+ if (is_key_found()) {
+ key_version = encryption_key_get_latest_version(key_id);
+ /* InnoDB does dirty read of srv_fil_crypt_rotate_key_age.
+ It doesn't matter because srv_encrypt_rotate
+ can be set to true only once */
+ if (!srv_encrypt_rotate
+ && key_version > srv_fil_crypt_rotate_key_age) {
+ srv_encrypt_rotate = true;
+ }
+
+ srv_stats.n_key_requests.inc();
+ key_found = key_version;
+ }
+
+ return key_version;
+}
+
+/******************************************************************
+Get the latest(key-version), waking the encrypt thread, if needed
+@param[in,out] crypt_data Crypt data */
+static inline
+uint
+fil_crypt_get_latest_key_version(
+ fil_space_crypt_t* crypt_data)
+{
+ ut_ad(crypt_data != NULL);
+
+ uint key_version = crypt_data->key_get_latest_version();
+
+ if (crypt_data->is_key_found()) {
+
+ if (fil_crypt_needs_rotation(
+ crypt_data,
+ crypt_data->min_key_version,
+ key_version,
+ srv_fil_crypt_rotate_key_age)) {
+ /* Below event seen as NULL-pointer at startup
+ when new database was created and we create a
+ checkpoint. Only seen when debugging. */
+ if (fil_crypt_threads_inited) {
+ os_event_set(fil_crypt_threads_event);
+ }
+ }
+ }
+
+ return key_version;
+}
+
+/******************************************************************
+Mutex helper for crypt_data->scheme */
+void
+crypt_data_scheme_locker(
+/*=====================*/
+ st_encryption_scheme* scheme,
+ int exit)
+{
+ fil_space_crypt_t* crypt_data =
+ static_cast<fil_space_crypt_t*>(scheme);
+
+ if (exit) {
+ mutex_exit(&crypt_data->mutex);
+ } else {
+ mutex_enter(&crypt_data->mutex);
+ }
+}
+
+/******************************************************************
+Create a fil_space_crypt_t object
+@param[in] type CRYPT_SCHEME_UNENCRYPTE or
+ CRYPT_SCHEME_1
+@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
+ FIL_ENCRYPTION_ON or
+ FIL_ENCRYPTION_OFF
+@param[in] min_key_version key_version or 0
+@param[in] key_id Used key id
+@return crypt object */
+static
+fil_space_crypt_t*
+fil_space_create_crypt_data(
+ uint type,
+ fil_encryption_t encrypt_mode,
+ uint min_key_version,
+ uint key_id)
+{
+ fil_space_crypt_t* crypt_data = NULL;
+ if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
+ crypt_data = new(buf)
+ fil_space_crypt_t(
+ type,
+ min_key_version,
+ key_id,
+ encrypt_mode);
+ }
+
+ return crypt_data;
+}
+
+/******************************************************************
+Create a fil_space_crypt_t object
+@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
+ FIL_ENCRYPTION_ON or
+ FIL_ENCRYPTION_OFF
+
+@param[in] key_id Encryption key id
+@return crypt object */
+UNIV_INTERN
+fil_space_crypt_t*
+fil_space_create_crypt_data(
+ fil_encryption_t encrypt_mode,
+ uint key_id)
+{
+ return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
+}
+
+/******************************************************************
+Merge fil_space_crypt_t object
+@param[in,out] dst Destination cryp data
+@param[in] src Source crypt data */
+UNIV_INTERN
+void
+fil_space_merge_crypt_data(
+ fil_space_crypt_t* dst,
+ const fil_space_crypt_t* src)
+{
+ mutex_enter(&dst->mutex);
+
+ /* validate that they are mergeable */
+ ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
+ src->type == CRYPT_SCHEME_1);
+
+ ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
+ dst->type == CRYPT_SCHEME_1);
+
+ dst->encryption = src->encryption;
+ dst->type = src->type;
+ dst->min_key_version = src->min_key_version;
+ dst->keyserver_requests += src->keyserver_requests;
+
+ mutex_exit(&dst->mutex);
+}
+
+/** Initialize encryption parameters from a tablespace header page.
+@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
+@param[in] page first page of the tablespace
+@return crypt data from page 0
+@retval NULL if not present or not valid */
+fil_space_crypt_t* fil_space_read_crypt_data(ulint zip_size, const byte* page)
+{
+ const ulint offset = FSP_HEADER_OFFSET
+ + fsp_header_get_encryption_offset(zip_size);
+
+ if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
+ /* Crypt data is not stored. */
+ return NULL;
+ }
+
+ uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
+ uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
+ fil_space_crypt_t* crypt_data;
+
+ if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
+ type == CRYPT_SCHEME_1)
+ || iv_length != sizeof crypt_data->iv) {
+ ib::error() << "Found non sensible crypt scheme: "
+ << type << "," << iv_length
+ << " for space: "
+ << page_get_space_id(page);
+ return NULL;
+ }
+
+ uint min_key_version = mach_read_from_4
+ (page + offset + MAGIC_SZ + 2 + iv_length);
+
+ uint key_id = mach_read_from_4
+ (page + offset + MAGIC_SZ + 2 + iv_length + 4);
+
+ fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
+ page + offset + MAGIC_SZ + 2 + iv_length + 8);
+
+ crypt_data = fil_space_create_crypt_data(encryption, key_id);
+ /* We need to overwrite these as above function will initialize
+ members */
+ crypt_data->type = type;
+ crypt_data->min_key_version = min_key_version;
+ memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
+
+ return crypt_data;
+}
+
+/******************************************************************
+Free a crypt data object
+@param[in,out] crypt_data crypt data to be freed */
+UNIV_INTERN
+void
+fil_space_destroy_crypt_data(
+ fil_space_crypt_t **crypt_data)
+{
+ if (crypt_data != NULL && (*crypt_data) != NULL) {
+ fil_space_crypt_t* c;
+ if (UNIV_LIKELY(fil_crypt_threads_inited)) {
+ mutex_enter(&fil_crypt_threads_mutex);
+ c = *crypt_data;
+ *crypt_data = NULL;
+ mutex_exit(&fil_crypt_threads_mutex);
+ } else {
+ ut_ad(srv_read_only_mode || !srv_was_started);
+ c = *crypt_data;
+ *crypt_data = NULL;
+ }
+ if (c) {
+ c->~fil_space_crypt_t();
+ ut_free(c);
+ }
+ }
+}
+
+/** Amend encryption information from redo log.
+@param[in] space tablespace
+@param[in] data encryption metadata */
+void fil_crypt_parse(fil_space_t* space, const byte* data)
+{
+ ut_ad(data[1] == MY_AES_BLOCK_SIZE);
+ if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
+ fil_space_crypt_t* crypt_data = new(buf)
+ fil_space_crypt_t(
+ data[0],
+ mach_read_from_4(&data[2 + MY_AES_BLOCK_SIZE]),
+ mach_read_from_4(&data[6 + MY_AES_BLOCK_SIZE]),
+ static_cast<fil_encryption_t>
+ (data[10 + MY_AES_BLOCK_SIZE]));
+ memcpy(crypt_data->iv, data + 2, MY_AES_BLOCK_SIZE);
+ mutex_enter(&fil_system.mutex);
+ if (space->crypt_data) {
+ fil_space_merge_crypt_data(space->crypt_data,
+ crypt_data);
+ fil_space_destroy_crypt_data(&crypt_data);
+ crypt_data = space->crypt_data;
+ } else {
+ space->crypt_data = crypt_data;
+ }
+ mutex_exit(&fil_system.mutex);
+ }
+}
+
+/** Fill crypt data information to the give page.
+It should be called during ibd file creation.
+@param[in] flags tablespace flags
+@param[in,out] page first page of the tablespace */
+void
+fil_space_crypt_t::fill_page0(
+ ulint flags,
+ byte* page)
+{
+ const uint len = sizeof(iv);
+ const ulint offset = FSP_HEADER_OFFSET
+ + fsp_header_get_encryption_offset(
+ fil_space_t::zip_size(flags));
+
+ memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ);
+ mach_write_to_1(page + offset + MAGIC_SZ, type);
+ mach_write_to_1(page + offset + MAGIC_SZ + 1, len);
+ memcpy(page + offset + MAGIC_SZ + 2, &iv, len);
+
+ mach_write_to_4(page + offset + MAGIC_SZ + 2 + len,
+ min_key_version);
+ mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4,
+ key_id);
+ mach_write_to_1(page + offset + MAGIC_SZ + 2 + len + 8,
+ encryption);
+}
+
+/** Write encryption metadata to the first page.
+@param[in,out] block first page of the tablespace
+@param[in,out] mtr mini-transaction */
+void fil_space_crypt_t::write_page0(buf_block_t* block, mtr_t* mtr)
+{
+ const ulint offset = FSP_HEADER_OFFSET
+ + fsp_header_get_encryption_offset(block->zip_size());
+ byte* b = block->frame + offset;
+
+ mtr->memcpy<mtr_t::MAYBE_NOP>(*block, b, CRYPT_MAGIC, MAGIC_SZ);
+
+ b += MAGIC_SZ;
+ byte* const start = b;
+ *b++ = static_cast<byte>(type);
+ compile_time_assert(sizeof iv == MY_AES_BLOCK_SIZE);
+ compile_time_assert(sizeof iv == CRYPT_SCHEME_1_IV_LEN);
+ *b++ = sizeof iv;
+ memcpy(b, iv, sizeof iv);
+ b += sizeof iv;
+ mach_write_to_4(b, min_key_version);
+ b += 4;
+ mach_write_to_4(b, key_id);
+ b += 4;
+ *b++ = byte(encryption);
+ ut_ad(b - start == 11 + MY_AES_BLOCK_SIZE);
+ /* We must log also any unchanged bytes, because recovery will
+ invoke fil_crypt_parse() based on this log record. */
+ mtr->memcpy(*block, offset + MAGIC_SZ, b - start);
+}
+
+/** Encrypt a buffer for non full checksum.
+@param[in,out] crypt_data Crypt data
+@param[in] space space_id
+@param[in] offset Page offset
+@param[in] lsn Log sequence number
+@param[in] src_frame Page to encrypt
+@param[in] zip_size ROW_FORMAT=COMPRESSED
+ page size, or 0
+@param[in,out] dst_frame Output buffer
+@return encrypted buffer or NULL */
+static byte* fil_encrypt_buf_for_non_full_checksum(
+ fil_space_crypt_t* crypt_data,
+ ulint space,
+ ulint offset,
+ lsn_t lsn,
+ const byte* src_frame,
+ ulint zip_size,
+ byte* dst_frame)
+{
+ uint size = uint(zip_size ? zip_size : srv_page_size);
+ uint key_version = fil_crypt_get_latest_key_version(crypt_data);
+ ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
+ ut_ad(!ut_align_offset(src_frame, 8));
+ ut_ad(!ut_align_offset(dst_frame, 8));
+
+ const bool page_compressed = fil_page_get_type(src_frame)
+ == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED;
+ uint header_len = FIL_PAGE_DATA;
+
+ if (page_compressed) {
+ header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
+ }
+
+ /* FIL page header is not encrypted */
+ memcpy(dst_frame, src_frame, header_len);
+ mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
+ key_version);
+
+ /* Calculate the start offset in a page */
+ uint unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
+ uint srclen = size - unencrypted_bytes;
+ const byte* src = src_frame + header_len;
+ byte* dst = dst_frame + header_len;
+ uint32 dstlen = 0;
+ ib_uint32_t checksum = 0;
+
+ if (page_compressed) {
+ srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
+ }
+
+ int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
+ crypt_data, key_version,
+ (uint32)space, (uint32)offset, lsn);
+ ut_a(rc == MY_AES_OK);
+ ut_a(dstlen == srclen);
+
+ /* For compressed tables we do not store the FIL header because
+ the whole page is not stored to the disk. In compressed tables only
+ the FIL header + compressed (and now encrypted) payload alligned
+ to sector boundary is written. */
+ if (!page_compressed) {
+ /* FIL page trailer is also not encrypted */
+ static_assert(FIL_PAGE_DATA_END == 8, "alignment");
+ memcpy_aligned<8>(dst_frame + size - FIL_PAGE_DATA_END,
+ src_frame + size - FIL_PAGE_DATA_END, 8);
+ } else {
+ /* Clean up rest of buffer */
+ memset(dst_frame+header_len+srclen, 0,
+ size - (header_len + srclen));
+ }
+
+ checksum = fil_crypt_calculate_checksum(zip_size, dst_frame);
+
+ /* store the post-encryption checksum after the key-version */
+ mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4,
+ checksum);
+
+ ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size));
+
+ srv_stats.pages_encrypted.inc();
+
+ return dst_frame;
+}
+
+/** Encrypt a buffer for full checksum format.
+@param[in,out] crypt_data Crypt data
+@param[in] space space_id
+@param[in] offset Page offset
+@param[in] lsn Log sequence number
+@param[in] src_frame Page to encrypt
+@param[in,out] dst_frame Output buffer
+@return encrypted buffer or NULL */
+static byte* fil_encrypt_buf_for_full_crc32(
+ fil_space_crypt_t* crypt_data,
+ ulint space,
+ ulint offset,
+ lsn_t lsn,
+ const byte* src_frame,
+ byte* dst_frame)
+{
+ uint key_version = fil_crypt_get_latest_key_version(crypt_data);
+ ut_d(bool corrupted = false);
+ const uint size = buf_page_full_crc32_size(src_frame, NULL,
+#ifdef UNIV_DEBUG
+ &corrupted
+#else
+ NULL
+#endif
+ );
+ ut_ad(!corrupted);
+ uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ + FIL_PAGE_FCRC32_CHECKSUM);
+ const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
+ byte* dst = dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
+ uint dstlen = 0;
+
+ ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
+
+ /* Till FIL_PAGE_LSN, page is not encrypted */
+ memcpy(dst_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
+
+ /* Write key version to the page. */
+ mach_write_to_4(dst_frame + FIL_PAGE_FCRC32_KEY_VERSION, key_version);
+
+ int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
+ crypt_data, key_version,
+ uint(space), uint(offset), lsn);
+ ut_a(rc == MY_AES_OK);
+ ut_a(dstlen == srclen);
+
+ const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM;
+ mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload));
+ /* Clean the rest of the buffer. FIXME: Punch holes when writing! */
+ memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4));
+
+ srv_stats.pages_encrypted.inc();
+
+ return dst_frame;
+}
+
+/** Encrypt a buffer.
+@param[in,out] crypt_data Crypt data
+@param[in] space space_id
+@param[in] offset Page offset
+@param[in] src_frame Page to encrypt
+@param[in] zip_size ROW_FORMAT=COMPRESSED
+ page size, or 0
+@param[in,out] dst_frame Output buffer
+@param[in] use_full_checksum full crc32 algo is used
+@return encrypted buffer or NULL */
+byte* fil_encrypt_buf(
+ fil_space_crypt_t* crypt_data,
+ ulint space,
+ ulint offset,
+ const byte* src_frame,
+ ulint zip_size,
+ byte* dst_frame,
+ bool use_full_checksum)
+{
+ const lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
+
+ if (use_full_checksum) {
+ ut_ad(!zip_size);
+ return fil_encrypt_buf_for_full_crc32(
+ crypt_data, space, offset,
+ lsn, src_frame, dst_frame);
+ }
+
+ return fil_encrypt_buf_for_non_full_checksum(
+ crypt_data, space, offset, lsn,
+ src_frame, zip_size, dst_frame);
+}
+
+/** Check whether these page types are allowed to encrypt.
+@param[in] space tablespace object
+@param[in] src_frame source page
+@return true if it is valid page type */
+static bool fil_space_encrypt_valid_page_type(
+ const fil_space_t* space,
+ const byte* src_frame)
+{
+ switch (fil_page_get_type(src_frame)) {
+ case FIL_PAGE_RTREE:
+ return space->full_crc32();
+ case FIL_PAGE_TYPE_FSP_HDR:
+ case FIL_PAGE_TYPE_XDES:
+ return false;
+ }
+
+ return true;
+}
+
+/******************************************************************
+Encrypt a page
+
+@param[in] space Tablespace
+@param[in] offset Page offset
+@param[in] src_frame Page to encrypt
+@param[in,out] dst_frame Output buffer
+@return encrypted buffer or NULL */
+byte* fil_space_encrypt(
+ const fil_space_t* space,
+ ulint offset,
+ byte* src_frame,
+ byte* dst_frame)
+{
+ if (!fil_space_encrypt_valid_page_type(space, src_frame)) {
+ return src_frame;
+ }
+
+ if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
+ return (src_frame);
+ }
+
+ ut_ad(space->referenced());
+
+ return fil_encrypt_buf(space->crypt_data, space->id, offset,
+ src_frame, space->zip_size(),
+ dst_frame, space->full_crc32());
+}
+
+/** Decrypt a page for full checksum format.
+@param[in] space space id
+@param[in] crypt_data crypt_data
+@param[in] tmp_frame Temporary buffer
+@param[in,out] src_frame Page to decrypt
+@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
+@return true if page decrypted, false if not.*/
+static bool fil_space_decrypt_full_crc32(
+ ulint space,
+ fil_space_crypt_t* crypt_data,
+ byte* tmp_frame,
+ byte* src_frame,
+ dberr_t* err)
+{
+ uint key_version = mach_read_from_4(
+ src_frame + FIL_PAGE_FCRC32_KEY_VERSION);
+ lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
+ uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
+ *err = DB_SUCCESS;
+
+ if (key_version == ENCRYPTION_KEY_NOT_ENCRYPTED) {
+ return false;
+ }
+
+ ut_ad(crypt_data);
+ ut_ad(crypt_data->is_encrypted());
+
+ memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
+
+ /* Calculate the offset where decryption starts */
+ const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
+ byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
+ uint dstlen = 0;
+ bool corrupted = false;
+ uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted);
+ if (UNIV_UNLIKELY(corrupted)) {
+fail:
+ *err = DB_DECRYPTION_FAILED;
+ return false;
+ }
+
+ uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ + FIL_PAGE_FCRC32_CHECKSUM);
+
+ int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
+ crypt_data, key_version,
+ (uint) space, offset, lsn);
+
+ if (rc != MY_AES_OK || dstlen != srclen) {
+ if (rc == -1) {
+ goto fail;
+ }
+
+ ib::fatal() << "Unable to decrypt data-block "
+ << " src: " << src << "srclen: "
+ << srclen << " buf: " << dst << "buflen: "
+ << dstlen << " return-code: " << rc
+ << " Can't continue!";
+ }
+
+ /* Copy only checksum part in the trailer */
+ memcpy(tmp_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
+ src_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
+ FIL_PAGE_FCRC32_CHECKSUM);
+
+ srv_stats.pages_decrypted.inc();
+
+ return true; /* page was decrypted */
+}
+
+/** Decrypt a page for non full checksum format.
+@param[in] crypt_data crypt_data
+@param[in] tmp_frame Temporary buffer
+@param[in] physical_size page size
+@param[in,out] src_frame Page to decrypt
+@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
+@return true if page decrypted, false if not.*/
+static bool fil_space_decrypt_for_non_full_checksum(
+ fil_space_crypt_t* crypt_data,
+ byte* tmp_frame,
+ ulint physical_size,
+ byte* src_frame,
+ dberr_t* err)
+{
+ uint key_version = mach_read_from_4(
+ src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
+ bool page_compressed = (fil_page_get_type(src_frame)
+ == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
+ uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
+ uint space = mach_read_from_4(
+ src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
+ ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
+
+ *err = DB_SUCCESS;
+
+ if (key_version == ENCRYPTION_KEY_NOT_ENCRYPTED) {
+ return false;
+ }
+
+ ut_a(crypt_data != NULL && crypt_data->is_encrypted());
+
+ /* read space & lsn */
+ uint header_len = FIL_PAGE_DATA;
+
+ if (page_compressed) {
+ header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
+ }
+
+ /* Copy FIL page header, it is not encrypted */
+ memcpy(tmp_frame, src_frame, header_len);
+
+ /* Calculate the offset where decryption starts */
+ const byte* src = src_frame + header_len;
+ byte* dst = tmp_frame + header_len;
+ uint32 dstlen = 0;
+ uint srclen = uint(physical_size) - header_len - FIL_PAGE_DATA_END;
+
+ if (page_compressed) {
+ srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
+ }
+
+ int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
+ crypt_data, key_version,
+ space, offset, lsn);
+
+ if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
+
+ if (rc == -1) {
+ *err = DB_DECRYPTION_FAILED;
+ return false;
+ }
+
+ ib::fatal() << "Unable to decrypt data-block "
+ << " src: " << static_cast<const void*>(src)
+ << "srclen: "
+ << srclen << " buf: "
+ << static_cast<const void*>(dst) << "buflen: "
+ << dstlen << " return-code: " << rc
+ << " Can't continue!";
+ }
+
+ /* For compressed tables we do not store the FIL header because
+ the whole page is not stored to the disk. In compressed tables only
+ the FIL header + compressed (and now encrypted) payload alligned
+ to sector boundary is written. */
+ if (!page_compressed) {
+ /* Copy FIL trailer */
+ memcpy(tmp_frame + physical_size - FIL_PAGE_DATA_END,
+ src_frame + physical_size - FIL_PAGE_DATA_END,
+ FIL_PAGE_DATA_END);
+ }
+
+ srv_stats.pages_decrypted.inc();
+
+ return true; /* page was decrypted */
+}
+
+/** Decrypt a page.
+@param[in] space_id tablespace id
+@param[in] crypt_data crypt_data
+@param[in] tmp_frame Temporary buffer
+@param[in] physical_size page size
+@param[in] fsp_flags Tablespace flags
+@param[in,out] src_frame Page to decrypt
+@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
+@return true if page decrypted, false if not.*/
+UNIV_INTERN
+bool
+fil_space_decrypt(
+ ulint space_id,
+ fil_space_crypt_t* crypt_data,
+ byte* tmp_frame,
+ ulint physical_size,
+ ulint fsp_flags,
+ byte* src_frame,
+ dberr_t* err)
+{
+ if (fil_space_t::full_crc32(fsp_flags)) {
+ return fil_space_decrypt_full_crc32(
+ space_id, crypt_data, tmp_frame, src_frame, err);
+ }
+
+ return fil_space_decrypt_for_non_full_checksum(crypt_data, tmp_frame,
+ physical_size, src_frame,
+ err);
+}
+
+/**
+Decrypt a page.
+@param[in] space Tablespace
+@param[in] tmp_frame Temporary buffer used for decrypting
+@param[in,out] src_frame Page to decrypt
+@return decrypted page, or original not encrypted page if decryption is
+not needed.*/
+UNIV_INTERN
+byte*
+fil_space_decrypt(
+ const fil_space_t* space,
+ byte* tmp_frame,
+ byte* src_frame)
+{
+ dberr_t err = DB_SUCCESS;
+ byte* res = NULL;
+ const ulint physical_size = space->physical_size();
+
+ ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
+ ut_ad(space->referenced());
+
+ bool encrypted = fil_space_decrypt(space->id, space->crypt_data,
+ tmp_frame, physical_size,
+ space->flags,
+ src_frame, &err);
+
+ if (err == DB_SUCCESS) {
+ if (encrypted) {
+ /* Copy the decrypted page back to page buffer, not
+ really any other options. */
+ memcpy(src_frame, tmp_frame, physical_size);
+ }
+
+ res = src_frame;
+ }
+
+ return res;
+}
+
+/**
+Calculate post encryption checksum
+@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
+@param[in] dst_frame Block where checksum is calculated
+@return page checksum
+not needed. */
+uint32_t
+fil_crypt_calculate_checksum(ulint zip_size, const byte* dst_frame)
+{
+ /* For encrypted tables we use only crc32 and strict_crc32 */
+ return zip_size
+ ? page_zip_calc_checksum(dst_frame, zip_size,
+ SRV_CHECKSUM_ALGORITHM_CRC32)
+ : buf_calc_page_crc32(dst_frame);
+}
+
+/***********************************************************************/
+
+/** A copy of global key state */
+struct key_state_t {
+ key_state_t() : key_id(0), key_version(0),
+ rotate_key_age(srv_fil_crypt_rotate_key_age) {}
+ bool operator==(const key_state_t& other) const {
+ return key_version == other.key_version &&
+ rotate_key_age == other.rotate_key_age;
+ }
+ uint key_id;
+ uint key_version;
+ uint rotate_key_age;
+};
+
+/***********************************************************************
+Copy global key state
+@param[in,out] new_state key state
+@param[in] crypt_data crypt data */
+static void
+fil_crypt_get_key_state(
+ key_state_t* new_state,
+ fil_space_crypt_t* crypt_data)
+{
+ if (srv_encrypt_tables) {
+ new_state->key_version = crypt_data->key_get_latest_version();
+ new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
+
+ ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
+ } else {
+ new_state->key_version = 0;
+ new_state->rotate_key_age = 0;
+ }
+}
+
+/***********************************************************************
+Check if a key needs rotation given a key_state
+@param[in] crypt_data Encryption information
+@param[in] key_version Current key version
+@param[in] latest_key_version Latest key version
+@param[in] rotate_key_age when to rotate
+@return true if key needs rotation, false if not */
+static bool
+fil_crypt_needs_rotation(
+ const fil_space_crypt_t* crypt_data,
+ uint key_version,
+ uint latest_key_version,
+ uint rotate_key_age)
+{
+ if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
+ return false;
+ }
+
+ if (key_version == 0 && latest_key_version != 0) {
+ /* this is rotation unencrypted => encrypted
+ * ignore rotate_key_age */
+ return true;
+ }
+
+ if (latest_key_version == 0 && key_version != 0) {
+ if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
+ /* this is rotation encrypted => unencrypted */
+ return true;
+ }
+ return false;
+ }
+
+ if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
+ && crypt_data->type == CRYPT_SCHEME_1
+ && !srv_encrypt_tables) {
+ /* This is rotation encrypted => unencrypted */
+ return true;
+ }
+
+ if (rotate_key_age == 0) {
+ return false;
+ }
+
+ /* this is rotation encrypted => encrypted,
+ * only reencrypt if key is sufficiently old */
+ if (key_version + rotate_key_age < latest_key_version) {
+ return true;
+ }
+
+ return false;
+}
+
+/** Read page 0 and possible crypt data from there.
+@param[in,out] space Tablespace */
+static inline
+void
+fil_crypt_read_crypt_data(fil_space_t* space)
+{
+ if (space->crypt_data || space->size || !space->get_size()) {
+ /* The encryption metadata has already been read, or
+ the tablespace is not encrypted and the file has been
+ opened already, or the file cannot be accessed,
+ likely due to a concurrent DROP
+ (possibly as part of TRUNCATE or ALTER TABLE).
+ FIXME: The file can become unaccessible any time
+ after this check! We should really remove this
+ function and instead make crypt_data an integral
+ part of fil_space_t. */
+ return;
+ }
+
+ const ulint zip_size = space->zip_size();
+ mtr_t mtr;
+ mtr.start();
+ if (buf_block_t* block = buf_page_get_gen(page_id_t(space->id, 0),
+ zip_size, RW_S_LATCH,
+ nullptr,
+ BUF_GET_POSSIBLY_FREED,
+ __FILE__, __LINE__, &mtr)) {
+ if (block->page.status == buf_page_t::FREED) {
+ goto func_exit;
+ }
+ mutex_enter(&fil_system.mutex);
+ if (!space->crypt_data && !space->is_stopping()) {
+ space->crypt_data = fil_space_read_crypt_data(
+ zip_size, block->frame);
+ }
+ mutex_exit(&fil_system.mutex);
+ }
+func_exit:
+ mtr.commit();
+}
+
+/** Start encrypting a space
+@param[in,out] space Tablespace
+@return true if a recheck of tablespace is needed by encryption thread. */
+static bool fil_crypt_start_encrypting_space(fil_space_t* space)
+{
+ mutex_enter(&fil_crypt_threads_mutex);
+
+ fil_space_crypt_t *crypt_data = space->crypt_data;
+
+ /* If space is not encrypted and encryption is not enabled, then
+ do not continue encrypting the space. */
+ if (!crypt_data && !srv_encrypt_tables) {
+ mutex_exit(&fil_crypt_threads_mutex);
+ return false;
+ }
+
+ const bool recheck = fil_crypt_start_converting;
+
+ if (recheck || crypt_data || space->is_stopping()) {
+ mutex_exit(&fil_crypt_threads_mutex);
+ return recheck;
+ }
+
+ /* NOTE: we need to write and flush page 0 before publishing
+ * the crypt data. This so that after restart there is no
+ * risk of finding encrypted pages without having
+ * crypt data in page 0 */
+
+ /* 1 - create crypt data */
+ crypt_data = fil_space_create_crypt_data(
+ FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
+
+ if (crypt_data == NULL) {
+ mutex_exit(&fil_crypt_threads_mutex);
+ return false;
+ }
+
+ fil_crypt_start_converting = true;
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ mtr_t mtr;
+ mtr.start();
+
+ /* 2 - get page 0 */
+ dberr_t err = DB_SUCCESS;
+ if (buf_block_t* block = buf_page_get_gen(
+ page_id_t(space->id, 0), space->zip_size(),
+ RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED,
+ __FILE__, __LINE__, &mtr, &err)) {
+ if (block->page.status == buf_page_t::FREED) {
+ goto abort;
+ }
+
+ crypt_data->type = CRYPT_SCHEME_1;
+ crypt_data->min_key_version = 0; // all pages are unencrypted
+ crypt_data->rotate_state.start_time = time(0);
+ crypt_data->rotate_state.starting = true;
+ crypt_data->rotate_state.active_threads = 1;
+
+ mutex_enter(&fil_system.mutex);
+ const bool stopping = space->is_stopping();
+ if (!stopping) {
+ space->crypt_data = crypt_data;
+ }
+ mutex_exit(&fil_system.mutex);
+
+ if (stopping) {
+ goto abort;
+ }
+
+ /* 3 - write crypt data to page 0 */
+ mtr.set_named_space(space);
+ crypt_data->write_page0(block, &mtr);
+
+ mtr.commit();
+
+ /* 4 - sync tablespace before publishing crypt data */
+ while (buf_flush_list_space(space));
+
+ /* 5 - publish crypt data */
+ mutex_enter(&fil_crypt_threads_mutex);
+ mutex_enter(&crypt_data->mutex);
+ crypt_data->type = CRYPT_SCHEME_1;
+ ut_a(crypt_data->rotate_state.active_threads == 1);
+ crypt_data->rotate_state.active_threads = 0;
+ crypt_data->rotate_state.starting = false;
+
+ fil_crypt_start_converting = false;
+ mutex_exit(&crypt_data->mutex);
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ return false;
+ }
+
+abort:
+ mtr.commit();
+ mutex_enter(&fil_crypt_threads_mutex);
+ fil_crypt_start_converting = false;
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ crypt_data->~fil_space_crypt_t();
+ ut_free(crypt_data);
+ return false;
+}
+
+/** State of a rotation thread */
+struct rotate_thread_t {
+ explicit rotate_thread_t(uint no) {
+ memset(this, 0, sizeof(* this));
+ thread_no = no;
+ first = true;
+ estimated_max_iops = 20;
+ }
+
+ uint thread_no;
+ bool first; /*!< is position before first space */
+ fil_space_t* space; /*!< current space or NULL */
+ uint32_t offset; /*!< current page number */
+ ulint batch; /*!< #pages to rotate */
+ uint min_key_version_found;/*!< min key version found but not rotated */
+ lsn_t end_lsn; /*!< max lsn when rotating this space */
+
+ uint estimated_max_iops; /*!< estimation of max iops */
+ uint allocated_iops; /*!< allocated iops */
+ ulint cnt_waited; /*!< #times waited during this slot */
+ uintmax_t sum_waited_us; /*!< wait time during this slot */
+
+ fil_crypt_stat_t crypt_stat; // statistics
+
+ /** @return whether this thread should terminate */
+ bool should_shutdown() const {
+ switch (srv_shutdown_state) {
+ case SRV_SHUTDOWN_NONE:
+ return thread_no >= srv_n_fil_crypt_threads;
+ case SRV_SHUTDOWN_EXIT_THREADS:
+ /* srv_init_abort() must have been invoked */
+ case SRV_SHUTDOWN_CLEANUP:
+ case SRV_SHUTDOWN_INITIATED:
+ return true;
+ case SRV_SHUTDOWN_LAST_PHASE:
+ break;
+ }
+ ut_ad(0);
+ return true;
+ }
+};
+
+/** Avoid the removal of the tablespace from
+default_encrypt_list only when
+1) Another active encryption thread working on tablespace
+2) Eligible for tablespace key rotation
+3) Tablespace is in flushing phase
+@return true if tablespace should be removed from
+default encrypt */
+static bool fil_crypt_must_remove(const fil_space_t &space)
+{
+ ut_ad(space.purpose == FIL_TYPE_TABLESPACE);
+ fil_space_crypt_t *crypt_data = space.crypt_data;
+ ut_ad(mutex_own(&fil_system.mutex));
+ const ulong encrypt_tables= srv_encrypt_tables;
+ if (!crypt_data)
+ return !encrypt_tables;
+ if (!crypt_data->is_key_found())
+ return true;
+
+ mutex_enter(&crypt_data->mutex);
+ const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) &&
+ (!crypt_data->rotate_state.flushing &&
+ !encrypt_tables == !!crypt_data->min_key_version &&
+ !crypt_data->rotate_state.active_threads);
+ mutex_exit(&crypt_data->mutex);
+ return remove;
+}
+
+/***********************************************************************
+Check if space needs rotation given a key_state
+@param[in,out] state Key rotation state
+@param[in,out] key_state Key state
+@param[in,out] recheck needs recheck ?
+@return true if space needs key rotation */
+static
+bool
+fil_crypt_space_needs_rotation(
+ rotate_thread_t* state,
+ key_state_t* key_state,
+ bool* recheck)
+{
+ fil_space_t* space = state->space;
+
+ /* Make sure that tablespace is normal tablespace */
+ if (space->purpose != FIL_TYPE_TABLESPACE) {
+ return false;
+ }
+
+ ut_ad(space->referenced());
+
+ fil_space_crypt_t *crypt_data = space->crypt_data;
+
+ if (crypt_data == NULL) {
+ /**
+ * space has no crypt data
+ * start encrypting it...
+ */
+ *recheck = fil_crypt_start_encrypting_space(space);
+ crypt_data = space->crypt_data;
+
+ if (crypt_data == NULL) {
+ return false;
+ }
+
+ crypt_data->key_get_latest_version();
+ }
+
+ /* If used key_id is not found from encryption plugin we can't
+ continue to rotate the tablespace */
+ if (!crypt_data->is_key_found()) {
+ return false;
+ }
+
+ bool need_key_rotation = false;
+ mutex_enter(&crypt_data->mutex);
+
+ do {
+ /* prevent threads from starting to rotate space */
+ if (crypt_data->rotate_state.starting) {
+ /* recheck this space later */
+ *recheck = true;
+ break;
+ }
+
+ /* prevent threads from starting to rotate space */
+ if (space->is_stopping()) {
+ break;
+ }
+
+ if (crypt_data->rotate_state.flushing) {
+ break;
+ }
+
+ /* No need to rotate space if encryption is disabled */
+ if (crypt_data->not_encrypted()) {
+ break;
+ }
+
+ if (crypt_data->key_id != key_state->key_id) {
+ key_state->key_id= crypt_data->key_id;
+ fil_crypt_get_key_state(key_state, crypt_data);
+ }
+
+ need_key_rotation = fil_crypt_needs_rotation(
+ crypt_data,
+ crypt_data->min_key_version,
+ key_state->key_version,
+ key_state->rotate_key_age);
+ } while (0);
+
+ mutex_exit(&crypt_data->mutex);
+ return need_key_rotation;
+}
+
+/***********************************************************************
+Update global statistics with thread statistics
+@param[in,out] state key rotation statistics */
+static void
+fil_crypt_update_total_stat(
+ rotate_thread_t *state)
+{
+ mutex_enter(&crypt_stat_mutex);
+ crypt_stat.pages_read_from_cache +=
+ state->crypt_stat.pages_read_from_cache;
+ crypt_stat.pages_read_from_disk +=
+ state->crypt_stat.pages_read_from_disk;
+ crypt_stat.pages_modified += state->crypt_stat.pages_modified;
+ crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
+ // remote old estimate
+ crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
+ // add new estimate
+ crypt_stat.estimated_iops += state->estimated_max_iops;
+ mutex_exit(&crypt_stat_mutex);
+
+ // make new estimate "current" estimate
+ memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
+ // record our old (current) estimate
+ state->crypt_stat.estimated_iops = state->estimated_max_iops;
+}
+
+/***********************************************************************
+Allocate iops to thread from global setting,
+used before starting to rotate a space.
+@param[in,out] state Rotation state
+@return true if allocation succeeded, false if failed */
+static
+bool
+fil_crypt_alloc_iops(
+ rotate_thread_t *state)
+{
+ ut_ad(state->allocated_iops == 0);
+
+ /* We have not yet selected the space to rotate, thus
+ state might not contain space and we can't check
+ its status yet. */
+
+ uint max_iops = state->estimated_max_iops;
+ mutex_enter(&fil_crypt_threads_mutex);
+
+ if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
+ /* this can happen when user decreases srv_fil_crypt_iops */
+ mutex_exit(&fil_crypt_threads_mutex);
+ return false;
+ }
+
+ uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
+
+ if (alloc > max_iops) {
+ alloc = max_iops;
+ }
+
+ n_fil_crypt_iops_allocated += alloc;
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ state->allocated_iops = alloc;
+
+ return alloc > 0;
+}
+
+/***********************************************************************
+Reallocate iops to thread,
+used when inside a space
+@param[in,out] state Rotation state */
+static
+void
+fil_crypt_realloc_iops(
+ rotate_thread_t *state)
+{
+ ut_a(state->allocated_iops > 0);
+
+ if (10 * state->cnt_waited > state->batch) {
+ /* if we waited more than 10% re-estimate max_iops */
+ ulint avg_wait_time_us =
+ ulint(state->sum_waited_us / state->cnt_waited);
+
+ if (avg_wait_time_us == 0) {
+ avg_wait_time_us = 1; // prevent division by zero
+ }
+
+ DBUG_PRINT("ib_crypt",
+ ("thr_no: %u - update estimated_max_iops from %u to "
+ ULINTPF ".",
+ state->thread_no,
+ state->estimated_max_iops,
+ 1000000 / avg_wait_time_us));
+
+ state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
+ state->cnt_waited = 0;
+ state->sum_waited_us = 0;
+ } else {
+ DBUG_PRINT("ib_crypt",
+ ("thr_no: %u only waited " ULINTPF
+ "%% skip re-estimate.",
+ state->thread_no,
+ (100 * state->cnt_waited)
+ / (state->batch ? state->batch : 1)));
+ }
+
+ if (state->estimated_max_iops <= state->allocated_iops) {
+ /* return extra iops */
+ uint extra = state->allocated_iops - state->estimated_max_iops;
+
+ if (extra > 0) {
+ mutex_enter(&fil_crypt_threads_mutex);
+ if (n_fil_crypt_iops_allocated < extra) {
+ /* unknown bug!
+ * crash in debug
+ * keep n_fil_crypt_iops_allocated unchanged
+ * in release */
+ ut_ad(0);
+ extra = 0;
+ }
+ n_fil_crypt_iops_allocated -= extra;
+ state->allocated_iops -= extra;
+
+ if (state->allocated_iops == 0) {
+ /* no matter how slow io system seems to be
+ * never decrease allocated_iops to 0... */
+ state->allocated_iops ++;
+ n_fil_crypt_iops_allocated ++;
+ }
+
+ os_event_set(fil_crypt_threads_event);
+ mutex_exit(&fil_crypt_threads_mutex);
+ }
+ } else {
+ /* see if there are more to get */
+ mutex_enter(&fil_crypt_threads_mutex);
+ if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
+ /* there are extra iops free */
+ uint extra = srv_n_fil_crypt_iops -
+ n_fil_crypt_iops_allocated;
+ if (state->allocated_iops + extra >
+ state->estimated_max_iops) {
+ /* but don't alloc more than our max */
+ extra = state->estimated_max_iops -
+ state->allocated_iops;
+ }
+ n_fil_crypt_iops_allocated += extra;
+ state->allocated_iops += extra;
+
+ DBUG_PRINT("ib_crypt",
+ ("thr_no: %u increased iops from %u to %u.",
+ state->thread_no,
+ state->allocated_iops - extra,
+ state->allocated_iops));
+
+ }
+ mutex_exit(&fil_crypt_threads_mutex);
+ }
+
+ fil_crypt_update_total_stat(state);
+}
+
+/** Release excess allocated iops
+@param state rotation state
+@param wake whether to wake up other threads */
+static void fil_crypt_return_iops(rotate_thread_t *state, bool wake= true)
+{
+ if (state->allocated_iops > 0) {
+ uint iops = state->allocated_iops;
+ mutex_enter(&fil_crypt_threads_mutex);
+ if (n_fil_crypt_iops_allocated < iops) {
+ /* unknown bug!
+ * crash in debug
+ * keep n_fil_crypt_iops_allocated unchanged
+ * in release */
+ ut_ad(0);
+ iops = 0;
+ }
+
+ n_fil_crypt_iops_allocated -= iops;
+ state->allocated_iops = 0;
+ if (wake) {
+ os_event_set(fil_crypt_threads_event);
+ }
+ mutex_exit(&fil_crypt_threads_mutex);
+ }
+
+ fil_crypt_update_total_stat(state);
+}
+
+/** Acquire a tablespace reference.
+@return whether a tablespace reference was successfully acquired */
+inline bool fil_space_t::acquire_if_not_stopped()
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+ const uint32_t n= acquire_low();
+ if (UNIV_LIKELY(!(n & (STOPPING | CLOSING))))
+ return true;
+ if (UNIV_UNLIKELY(n & STOPPING))
+ return false;
+ return UNIV_LIKELY(!(n & CLOSING)) || prepare(true);
+}
+
+bool fil_crypt_must_default_encrypt()
+{
+ return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate;
+}
+
+/** Return the next tablespace from default_encrypt_tables list.
+@param space previous tablespace (nullptr to start from the start)
+@param recheck whether the removal condition needs to be rechecked after
+the encryption parameters were changed
+@param encrypt expected state of innodb_encrypt_tables
+@return the next tablespace to process (n_pending_ops incremented)
+@retval fil_system.temp_space if there is no work to do
+@retval nullptr upon reaching the end of the iteration */
+inline fil_space_t *fil_system_t::default_encrypt_next(fil_space_t *space,
+ bool recheck,
+ bool encrypt)
+{
+ ut_ad(mutex_own(&mutex));
+
+ sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it=
+ space && space->is_in_default_encrypt
+ ? space
+ : default_encrypt_tables.begin();
+ const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end=
+ default_encrypt_tables.end();
+
+ if (space)
+ {
+ const bool released= !space->release();
+
+ if (space->is_in_default_encrypt)
+ {
+ while (++it != end &&
+ (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
+
+ /* If one of the encryption threads already started
+ the encryption of the table then don't remove the
+ unencrypted spaces from default encrypt list.
+
+ If there is a change in innodb_encrypt_tables variables
+ value then don't remove the last processed tablespace
+ from the default encrypt list. */
+ if (released && !recheck && fil_crypt_must_remove(*space))
+ {
+ ut_a(!default_encrypt_tables.empty());
+ default_encrypt_tables.remove(*space);
+ space->is_in_default_encrypt= false;
+ }
+ }
+ }
+ else while (it != end &&
+ (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()))
+ {
+ /* Find the next suitable default encrypt table if
+ beginning of default_encrypt_tables list has been scheduled
+ to be deleted */
+ it++;
+ }
+
+ if (it == end)
+ return temp_space;
+
+ do
+ {
+ space= &*it;
+ if (space->acquire_if_not_stopped())
+ return space;
+ if (++it == end)
+ return nullptr;
+ }
+ while (!UT_LIST_GET_LEN(it->chain) || it->is_stopping());
+
+ return nullptr;
+}
+
+/** Determine the next tablespace for encryption key rotation.
+@param space current tablespace (nullptr to start from the beginning)
+@param recheck whether the removal condition needs to be rechecked after
+encryption parameters were changed
+@param encrypt expected state of innodb_encrypt_tables
+@return the next tablespace
+@retval fil_system.temp_space if there is no work to do
+@retval nullptr upon reaching the end of the iteration */
+inline fil_space_t *fil_space_t::next(fil_space_t *space, bool recheck,
+ bool encrypt)
+{
+ mutex_enter(&fil_system.mutex);
+
+ if (fil_crypt_must_default_encrypt())
+ space= fil_system.default_encrypt_next(space, recheck, encrypt);
+ else
+ {
+ if (!space)
+ space= UT_LIST_GET_FIRST(fil_system.space_list);
+ else
+ {
+ /* Move on to the next fil_space_t */
+ space->release();
+ space= UT_LIST_GET_NEXT(space_list, space);
+ }
+
+ for (; space; space= UT_LIST_GET_NEXT(space_list, space))
+ {
+ if (space->purpose != FIL_TYPE_TABLESPACE)
+ continue;
+ const uint32_t n= space->acquire_low();
+ if (UNIV_LIKELY(!(n & (STOPPING | CLOSING))))
+ break;
+ if (!(n & STOPPING) && space->prepare(true))
+ break;
+ }
+ }
+
+ mutex_exit(&fil_system.mutex);
+ return space;
+}
+
+/** Search for a space needing rotation
+@param[in,out] key_state Key state
+@param[in,out] state Rotation state
+@param[in,out] recheck recheck of the tablespace is needed or
+ still encryption thread does write page 0 */
+static bool fil_crypt_find_space_to_rotate(
+ key_state_t* key_state,
+ rotate_thread_t* state,
+ bool* recheck)
+{
+ /* we need iops to start rotating */
+ while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
+ if (state->space && state->space->is_stopping()) {
+ state->space->release();
+ state->space = NULL;
+ }
+
+ os_event_reset(fil_crypt_threads_event);
+ os_event_wait_time(fil_crypt_threads_event, 100000);
+ }
+
+ if (state->should_shutdown()) {
+ if (state->space) {
+ state->space->release();
+ state->space = NULL;
+ }
+ return false;
+ }
+
+ if (state->first) {
+ state->first = false;
+ if (state->space) {
+ state->space->release();
+ }
+ state->space = NULL;
+ }
+
+ bool wake;
+ for (;;) {
+ state->space = fil_space_t::next(state->space, *recheck,
+ key_state->key_version != 0);
+ wake = state->should_shutdown();
+
+ if (state->space == fil_system.temp_space) {
+ goto done;
+ } else if (wake) {
+ break;
+ } else {
+ wake = true;
+ }
+
+ if (!state->space) {
+ break;
+ }
+
+ /* If there is no crypt data and we have not yet read
+ page 0 for this tablespace, we need to read it before
+ we can continue. */
+ if (!state->space->crypt_data) {
+ fil_crypt_read_crypt_data(state->space);
+ }
+
+ if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
+ ut_ad(key_state->key_id);
+ /* init state->min_key_version_found before
+ * starting on a space */
+ state->min_key_version_found = key_state->key_version;
+ return true;
+ }
+ }
+
+ if (state->space) {
+ state->space->release();
+done:
+ state->space = NULL;
+ }
+
+ /* no work to do; release our allocation of I/O capacity */
+ fil_crypt_return_iops(state, wake);
+
+ return false;
+
+}
+
+/***********************************************************************
+Start rotating a space
+@param[in] key_state Key state
+@param[in,out] state Rotation state */
+static
+void
+fil_crypt_start_rotate_space(
+ const key_state_t* key_state,
+ rotate_thread_t* state)
+{
+ fil_space_crypt_t *crypt_data = state->space->crypt_data;
+
+ ut_ad(crypt_data);
+ mutex_enter(&crypt_data->mutex);
+ ut_ad(key_state->key_id == crypt_data->key_id);
+
+ if (crypt_data->rotate_state.active_threads == 0) {
+ /* only first thread needs to init */
+ crypt_data->rotate_state.next_offset = 1; // skip page 0
+ /* no need to rotate beyond current max
+ * if space extends, it will be encrypted with newer version */
+ /* FIXME: max_offset could be removed and instead
+ space->size consulted.*/
+ crypt_data->rotate_state.max_offset = state->space->size;
+ crypt_data->rotate_state.end_lsn = 0;
+ crypt_data->rotate_state.min_key_version_found =
+ key_state->key_version;
+
+ crypt_data->rotate_state.start_time = time(0);
+
+ if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
+ crypt_data->is_encrypted() &&
+ key_state->key_version != 0) {
+ /* this is rotation unencrypted => encrypted */
+ crypt_data->type = CRYPT_SCHEME_1;
+ }
+ }
+
+ /* count active threads in space */
+ crypt_data->rotate_state.active_threads++;
+
+ /* Initialize thread local state */
+ state->end_lsn = crypt_data->rotate_state.end_lsn;
+ state->min_key_version_found =
+ crypt_data->rotate_state.min_key_version_found;
+
+ mutex_exit(&crypt_data->mutex);
+}
+
+/***********************************************************************
+Search for batch of pages needing rotation
+@param[in] key_state Key state
+@param[in,out] state Rotation state
+@return true if page needing key rotation found, false if not found */
+static
+bool
+fil_crypt_find_page_to_rotate(
+ const key_state_t* key_state,
+ rotate_thread_t* state)
+{
+ ulint batch = srv_alloc_time * state->allocated_iops;
+ fil_space_t* space = state->space;
+
+ ut_ad(!space || space->referenced());
+
+ /* If space is marked to be dropped stop rotation. */
+ if (!space || space->is_stopping()) {
+ return false;
+ }
+
+ fil_space_crypt_t *crypt_data = space->crypt_data;
+
+ mutex_enter(&crypt_data->mutex);
+ ut_ad(key_state->key_id == crypt_data->key_id);
+
+ bool found = crypt_data->rotate_state.max_offset >=
+ crypt_data->rotate_state.next_offset;
+
+ if (found) {
+ state->offset = crypt_data->rotate_state.next_offset;
+ ulint remaining = crypt_data->rotate_state.max_offset -
+ crypt_data->rotate_state.next_offset;
+
+ if (batch <= remaining) {
+ state->batch = batch;
+ } else {
+ state->batch = remaining;
+ }
+ }
+
+ crypt_data->rotate_state.next_offset += uint32_t(batch);
+ mutex_exit(&crypt_data->mutex);
+ return found;
+}
+
+#define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
+ fil_crypt_get_page_throttle_func(state, offset, mtr, \
+ sleeptime_ms, __FILE__, __LINE__)
+
+/***********************************************************************
+Get a page and compute sleep time
+@param[in,out] state Rotation state
+@param[in] offset Page offset
+@param[in,out] mtr Minitransaction
+@param[out] sleeptime_ms Sleep time
+@param[in] file File where called
+@param[in] line Line where called
+@return page or NULL*/
+static
+buf_block_t*
+fil_crypt_get_page_throttle_func(
+ rotate_thread_t* state,
+ uint32_t offset,
+ mtr_t* mtr,
+ ulint* sleeptime_ms,
+ const char* file,
+ unsigned line)
+{
+ fil_space_t* space = state->space;
+ const ulint zip_size = space->zip_size();
+ const page_id_t page_id(space->id, offset);
+ ut_ad(space->referenced());
+
+ /* Before reading from tablespace we need to make sure that
+ the tablespace is not about to be dropped. */
+ if (space->is_stopping()) {
+ return NULL;
+ }
+
+ dberr_t err = DB_SUCCESS;
+ buf_block_t* block = buf_page_get_gen(page_id, zip_size, RW_X_LATCH,
+ NULL,
+ BUF_PEEK_IF_IN_POOL, file, line,
+ mtr, &err);
+ if (block != NULL) {
+ /* page was in buffer pool */
+ state->crypt_stat.pages_read_from_cache++;
+ return block;
+ }
+
+ if (space->is_stopping()) {
+ return NULL;
+ }
+
+ if (fseg_page_is_free(space, state->offset)) {
+ /* page is already freed */
+ return NULL;
+ }
+
+ state->crypt_stat.pages_read_from_disk++;
+
+ const ulonglong start = my_interval_timer();
+ block = buf_page_get_gen(page_id, zip_size,
+ RW_X_LATCH,
+ NULL, BUF_GET_POSSIBLY_FREED,
+ file, line, mtr, &err);
+ const ulonglong end = my_interval_timer();
+
+ state->cnt_waited++;
+
+ if (end > start) {
+ state->sum_waited_us += (end - start) / 1000;
+ }
+
+ /* average page load */
+ ulint add_sleeptime_ms = 0;
+ ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
+ ulint alloc_wait_us = 1000000 / state->allocated_iops;
+
+ if (avg_wait_time_us < alloc_wait_us) {
+ /* we reading faster than we allocated */
+ add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
+ } else {
+ /* if page load time is longer than we want, skip sleeping */
+ }
+
+ *sleeptime_ms += add_sleeptime_ms;
+
+ return block;
+}
+
+/***********************************************************************
+Rotate one page
+@param[in,out] key_state Key state
+@param[in,out] state Rotation state */
+static
+void
+fil_crypt_rotate_page(
+ const key_state_t* key_state,
+ rotate_thread_t* state)
+{
+ fil_space_t*space = state->space;
+ ulint space_id = space->id;
+ uint32_t offset = state->offset;
+ ulint sleeptime_ms = 0;
+ fil_space_crypt_t *crypt_data = space->crypt_data;
+
+ ut_ad(space->referenced());
+ ut_ad(offset > 0);
+
+ /* In fil_crypt_thread where key rotation is done we have
+ acquired space and checked that this space is not yet
+ marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
+ Check here also to give DROP TABLE or similar a change. */
+ if (space->is_stopping()) {
+ return;
+ }
+
+ if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
+ /* don't encrypt this as it contains address to dblwr buffer */
+ return;
+ }
+
+ mtr_t mtr;
+ mtr.start();
+ if (buf_block_t* block = fil_crypt_get_page_throttle(state,
+ offset, &mtr,
+ &sleeptime_ms)) {
+ bool modified = false;
+ byte* frame = buf_block_get_frame(block);
+ const lsn_t block_lsn = mach_read_from_8(FIL_PAGE_LSN + frame);
+ uint kv = buf_page_get_key_version(frame, space->flags);
+
+ if (block->page.status == buf_page_t::FREED) {
+ /* Do not modify freed pages to avoid an assertion
+ failure on recovery.*/
+ } else if (block->page.oldest_modification() > 1) {
+ /* Do not unnecessarily touch pages that are
+ already dirty. */
+ } else if (space->is_stopping()) {
+ /* The tablespace is closing (in DROP TABLE or
+ TRUNCATE TABLE or similar): avoid further access */
+ } else if (!kv && !*reinterpret_cast<uint16_t*>
+ (&frame[FIL_PAGE_TYPE])) {
+ /* It looks like this page is not
+ allocated. Because key rotation is accessing
+ pages in a pattern that is unlike the normal
+ B-tree and undo log access pattern, we cannot
+ invoke fseg_page_is_free() here, because that
+ could result in a deadlock. If we invoked
+ fseg_page_is_free() and released the
+ tablespace latch before acquiring block->lock,
+ then the fseg_page_is_free() information
+ could be stale already. */
+
+ /* If the data file was originally created
+ before MariaDB 10.0 or MySQL 5.6, some
+ allocated data pages could carry 0 in
+ FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those
+ pages will be updated in
+ buf_flush_init_for_writing() when the page
+ is modified the next time.
+
+ Also, when the doublewrite buffer pages are
+ allocated on bootstrap in a non-debug build,
+ some dummy pages will be allocated, with 0 in
+ the FIL_PAGE_TYPE. Those pages should be
+ skipped from key rotation forever. */
+ } else if (fil_crypt_needs_rotation(
+ crypt_data,
+ kv,
+ key_state->key_version,
+ key_state->rotate_key_age)) {
+
+ mtr.set_named_space(space);
+ modified = true;
+
+ /* force rotation by dummy updating page */
+ mtr.write<1,mtr_t::FORCED>(*block,
+ &frame[FIL_PAGE_SPACE_ID],
+ frame[FIL_PAGE_SPACE_ID]);
+
+ /* statistics */
+ state->crypt_stat.pages_modified++;
+ } else {
+ if (crypt_data->is_encrypted()) {
+ if (kv < state->min_key_version_found) {
+ state->min_key_version_found = kv;
+ }
+ }
+ }
+
+ mtr.commit();
+ lsn_t end_lsn = mtr.commit_lsn();
+
+
+ if (modified) {
+ /* if we modified page, we take lsn from mtr */
+ ut_a(end_lsn > state->end_lsn);
+ ut_a(end_lsn > block_lsn);
+ state->end_lsn = end_lsn;
+ } else {
+ /* if we did not modify page, check for max lsn */
+ if (block_lsn > state->end_lsn) {
+ state->end_lsn = block_lsn;
+ }
+ }
+ } else {
+ /* If block read failed mtr memo and log should be empty. */
+ ut_ad(!mtr.has_modifications());
+ ut_ad(!mtr.is_dirty());
+ ut_ad(mtr.get_memo()->size() == 0);
+ ut_ad(mtr.get_log()->size() == 0);
+ mtr.commit();
+ }
+
+ if (sleeptime_ms) {
+ os_event_reset(fil_crypt_throttle_sleep_event);
+ os_event_wait_time(fil_crypt_throttle_sleep_event,
+ 1000 * sleeptime_ms);
+ }
+}
+
+/***********************************************************************
+Rotate a batch of pages
+@param[in,out] key_state Key state
+@param[in,out] state Rotation state */
+static
+void
+fil_crypt_rotate_pages(
+ const key_state_t* key_state,
+ rotate_thread_t* state)
+{
+ ulint space_id = state->space->id;
+ uint32_t end = std::min(state->offset + uint32_t(state->batch),
+ state->space->free_limit);
+
+ ut_ad(state->space->referenced());
+
+ for (; state->offset < end; state->offset++) {
+
+ /* we can't rotate pages in dblwr buffer as
+ * it's not possible to read those due to lots of asserts
+ * in buffer pool.
+ *
+ * However since these are only (short-lived) copies of
+ * real pages, they will be updated anyway when the
+ * real page is updated
+ */
+ if (buf_dblwr.is_inside(page_id_t(space_id, state->offset))) {
+ continue;
+ }
+
+ /* If space is marked as stopping, stop rotating
+ pages. */
+ if (state->space->is_stopping()) {
+ break;
+ }
+
+ fil_crypt_rotate_page(key_state, state);
+ }
+}
+
+/***********************************************************************
+Flush rotated pages and then update page 0
+
+@param[in,out] state rotation state */
+static
+void
+fil_crypt_flush_space(
+ rotate_thread_t* state)
+{
+ fil_space_t* space = state->space;
+ fil_space_crypt_t *crypt_data = space->crypt_data;
+
+ ut_ad(space->referenced());
+
+ /* flush tablespace pages so that there are no pages left with old key */
+ lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
+
+ if (end_lsn > 0 && !space->is_stopping()) {
+ ulint sum_pages = 0;
+ const ulonglong start = my_interval_timer();
+ while (buf_flush_list_space(space, &sum_pages));
+ if (sum_pages) {
+ const ulonglong end = my_interval_timer();
+
+ state->cnt_waited += sum_pages;
+ state->sum_waited_us += (end - start) / 1000;
+
+ /* statistics */
+ state->crypt_stat.pages_flushed += sum_pages;
+ }
+ }
+
+ if (crypt_data->min_key_version == 0) {
+ crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
+ }
+
+ if (space->is_stopping()) {
+ return;
+ }
+
+ /* update page 0 */
+ mtr_t mtr;
+ mtr.start();
+
+ if (buf_block_t* block = buf_page_get_gen(
+ page_id_t(space->id, 0), space->zip_size(),
+ RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED,
+ __FILE__, __LINE__, &mtr)) {
+ if (block->page.status != buf_page_t::FREED) {
+ mtr.set_named_space(space);
+ crypt_data->write_page0(block, &mtr);
+ }
+ }
+
+ mtr.commit();
+}
+
+/***********************************************************************
+Complete rotating a space
+@param[in,out] state Rotation state */
+static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
+{
+ fil_space_crypt_t *crypt_data = state->space->crypt_data;
+
+ ut_ad(crypt_data);
+ ut_ad(state->space->referenced());
+
+ /* Space might already be dropped */
+ if (!state->space->is_stopping()) {
+ mutex_enter(&crypt_data->mutex);
+
+ /**
+ * Update crypt data state with state from thread
+ */
+ if (state->min_key_version_found <
+ crypt_data->rotate_state.min_key_version_found) {
+ crypt_data->rotate_state.min_key_version_found =
+ state->min_key_version_found;
+ }
+
+ if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
+ crypt_data->rotate_state.end_lsn = state->end_lsn;
+ }
+
+ ut_a(crypt_data->rotate_state.active_threads > 0);
+ crypt_data->rotate_state.active_threads--;
+ bool last = crypt_data->rotate_state.active_threads == 0;
+
+ /**
+ * check if space is fully done
+ * this as when threads shutdown, it could be that we "complete"
+ * iterating before we have scanned the full space.
+ */
+ bool done = crypt_data->rotate_state.next_offset >=
+ crypt_data->rotate_state.max_offset;
+
+ /**
+ * we should flush space if we're last thread AND
+ * the iteration is done
+ */
+ bool should_flush = last && done;
+
+ if (should_flush) {
+ /* we're the last active thread */
+ crypt_data->rotate_state.flushing = true;
+ crypt_data->min_key_version =
+ crypt_data->rotate_state.min_key_version_found;
+ mutex_exit(&crypt_data->mutex);
+ fil_crypt_flush_space(state);
+
+ mutex_enter(&crypt_data->mutex);
+ crypt_data->rotate_state.flushing = false;
+ mutex_exit(&crypt_data->mutex);
+ } else {
+ mutex_exit(&crypt_data->mutex);
+ }
+ } else {
+ mutex_enter(&crypt_data->mutex);
+ ut_a(crypt_data->rotate_state.active_threads > 0);
+ crypt_data->rotate_state.active_threads--;
+ mutex_exit(&crypt_data->mutex);
+ }
+}
+
+/*********************************************************************//**
+A thread which monitors global key state and rotates tablespaces accordingly
+@return a dummy parameter */
+extern "C" UNIV_INTERN
+os_thread_ret_t
+DECLARE_THREAD(fil_crypt_thread)(void*)
+{
+ mutex_enter(&fil_crypt_threads_mutex);
+ uint thread_no = srv_n_fil_crypt_threads_started;
+ srv_n_fil_crypt_threads_started++;
+ os_event_set(fil_crypt_event); /* signal that we started */
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ /* state of this thread */
+ rotate_thread_t thr(thread_no);
+
+ /* if we find a space that is starting, skip over it and recheck it later */
+ bool recheck = false;
+
+ while (!thr.should_shutdown()) {
+
+ key_state_t new_state;
+
+ while (!thr.should_shutdown()) {
+
+ /* wait for key state changes
+ * i.e either new key version of change or
+ * new rotate_key_age */
+ os_event_reset(fil_crypt_threads_event);
+
+ if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
+ break;
+ }
+
+ if (recheck) {
+ /* check recheck here, after sleep, so
+ * that we don't busy loop while when one thread is starting
+ * a space*/
+ break;
+ }
+ }
+
+ recheck = false;
+ thr.first = true; // restart from first tablespace
+
+ /* iterate all spaces searching for those needing rotation */
+ while (!thr.should_shutdown() &&
+ fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
+
+ /* we found a space to rotate */
+ fil_crypt_start_rotate_space(&new_state, &thr);
+
+ /* iterate all pages (cooperativly with other threads) */
+ while (!thr.should_shutdown() &&
+ fil_crypt_find_page_to_rotate(&new_state, &thr)) {
+
+ if (!thr.space->is_stopping()) {
+ /* rotate a (set) of pages */
+ fil_crypt_rotate_pages(&new_state, &thr);
+ }
+
+ /* If space is marked as stopping, release
+ space and stop rotation. */
+ if (thr.space->is_stopping()) {
+ fil_crypt_complete_rotate_space(&thr);
+ thr.space->release();
+ thr.space = NULL;
+ break;
+ }
+
+ /* realloc iops */
+ fil_crypt_realloc_iops(&thr);
+ }
+
+ /* complete rotation */
+ if (thr.space) {
+ fil_crypt_complete_rotate_space(&thr);
+ }
+
+ /* force key state refresh */
+ new_state.key_id = 0;
+
+ /* return iops */
+ fil_crypt_return_iops(&thr);
+ }
+ }
+
+ /* return iops if shutting down */
+ fil_crypt_return_iops(&thr);
+
+ /* release current space if shutting down */
+ if (thr.space) {
+ thr.space->release();
+ thr.space = NULL;
+ }
+
+ mutex_enter(&fil_crypt_threads_mutex);
+ srv_n_fil_crypt_threads_started--;
+ os_event_set(fil_crypt_event); /* signal that we stopped */
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ /* We count the number of threads in os_thread_exit(). A created
+ thread should always use that to exit and not use return() to exit. */
+
+ os_thread_exit();
+
+ OS_THREAD_DUMMY_RETURN;
+}
+
+/*********************************************************************
+Adjust thread count for key rotation
+@param[in] enw_cnt Number of threads to be used */
+UNIV_INTERN
+void
+fil_crypt_set_thread_cnt(
+ const uint new_cnt)
+{
+ if (!fil_crypt_threads_inited) {
+ if (srv_shutdown_state != SRV_SHUTDOWN_NONE)
+ return;
+ fil_crypt_threads_init();
+ }
+
+ mutex_enter(&fil_crypt_threads_mutex);
+
+ if (new_cnt > srv_n_fil_crypt_threads) {
+ uint add = new_cnt - srv_n_fil_crypt_threads;
+ srv_n_fil_crypt_threads = new_cnt;
+ for (uint i = 0; i < add; i++) {
+ ib::info() << "Creating #"
+ << i+1 << " encryption thread id "
+ << os_thread_create(fil_crypt_thread)
+ << " total threads " << new_cnt << ".";
+ }
+ } else if (new_cnt < srv_n_fil_crypt_threads) {
+ srv_n_fil_crypt_threads = new_cnt;
+ os_event_set(fil_crypt_threads_event);
+ }
+
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
+ os_event_reset(fil_crypt_event);
+ os_event_wait_time(fil_crypt_event, 100000);
+ }
+
+ /* Send a message to encryption threads that there could be
+ something to do. */
+ if (srv_n_fil_crypt_threads) {
+ os_event_set(fil_crypt_threads_event);
+ }
+}
+
+/** Initialize the tablespace default_encrypt_tables
+if innodb_encryption_rotate_key_age=0. */
+static void fil_crypt_default_encrypt_tables_fill()
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+
+ for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
+ space != NULL;
+ space = UT_LIST_GET_NEXT(space_list, space)) {
+ if (space->purpose != FIL_TYPE_TABLESPACE
+ || space->is_in_default_encrypt
+ || UT_LIST_GET_LEN(space->chain) == 0
+ || !space->acquire_if_not_stopped()) {
+ continue;
+ }
+
+ /* Ensure that crypt_data has been initialized. */
+ ut_ad(space->size);
+
+ /* Skip ENCRYPTION!=DEFAULT tablespaces. */
+ if (space->crypt_data
+ && !space->crypt_data->is_default_encryption()) {
+ goto next;
+ }
+
+ if (srv_encrypt_tables) {
+ /* Skip encrypted tablespaces if
+ innodb_encrypt_tables!=OFF */
+ if (space->crypt_data
+ && space->crypt_data->min_key_version) {
+ goto next;
+ }
+ } else {
+ /* Skip unencrypted tablespaces if
+ innodb_encrypt_tables=OFF */
+ if (!space->crypt_data
+ || !space->crypt_data->min_key_version) {
+ goto next;
+ }
+ }
+
+ fil_system.default_encrypt_tables.push_back(*space);
+ space->is_in_default_encrypt = true;
+next:
+ space->release();
+ }
+}
+
+/*********************************************************************
+Adjust max key age
+@param[in] val New max key age */
+UNIV_INTERN
+void
+fil_crypt_set_rotate_key_age(
+ uint val)
+{
+ mutex_enter(&fil_system.mutex);
+ srv_fil_crypt_rotate_key_age = val;
+ if (val == 0) {
+ fil_crypt_default_encrypt_tables_fill();
+ }
+ mutex_exit(&fil_system.mutex);
+ os_event_set(fil_crypt_threads_event);
+}
+
+/*********************************************************************
+Adjust rotation iops
+@param[in] val New max roation iops */
+UNIV_INTERN
+void
+fil_crypt_set_rotation_iops(
+ uint val)
+{
+ srv_n_fil_crypt_iops = val;
+ os_event_set(fil_crypt_threads_event);
+}
+
+/*********************************************************************
+Adjust encrypt tables
+@param[in] val New setting for innodb-encrypt-tables */
+void fil_crypt_set_encrypt_tables(ulong val)
+{
+ mutex_enter(&fil_system.mutex);
+
+ srv_encrypt_tables = val;
+
+ if (fil_crypt_must_default_encrypt()) {
+ fil_crypt_default_encrypt_tables_fill();
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ os_event_set(fil_crypt_threads_event);
+}
+
+/*********************************************************************
+Init threads for key rotation */
+UNIV_INTERN
+void
+fil_crypt_threads_init()
+{
+ if (!fil_crypt_threads_inited) {
+ fil_crypt_event = os_event_create(0);
+ fil_crypt_threads_event = os_event_create(0);
+ mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
+ &fil_crypt_threads_mutex);
+
+ uint cnt = srv_n_fil_crypt_threads;
+ srv_n_fil_crypt_threads = 0;
+ fil_crypt_threads_inited = true;
+ fil_crypt_set_thread_cnt(cnt);
+ }
+}
+
+/*********************************************************************
+Clean up key rotation threads resources */
+UNIV_INTERN
+void
+fil_crypt_threads_cleanup()
+{
+ if (!fil_crypt_threads_inited) {
+ return;
+ }
+ ut_a(!srv_n_fil_crypt_threads_started);
+ os_event_destroy(fil_crypt_event);
+ os_event_destroy(fil_crypt_threads_event);
+ mutex_free(&fil_crypt_threads_mutex);
+ fil_crypt_threads_inited = false;
+}
+
+/*********************************************************************
+Wait for crypt threads to stop accessing space
+@param[in] space Tablespace */
+UNIV_INTERN
+void
+fil_space_crypt_close_tablespace(
+ const fil_space_t* space)
+{
+ fil_space_crypt_t* crypt_data = space->crypt_data;
+
+ if (!crypt_data || srv_n_fil_crypt_threads == 0
+ || !fil_crypt_threads_inited) {
+ return;
+ }
+
+ mutex_enter(&fil_crypt_threads_mutex);
+
+ time_t start = time(0);
+ time_t last = start;
+
+ mutex_enter(&crypt_data->mutex);
+ mutex_exit(&fil_crypt_threads_mutex);
+
+ ulint cnt = crypt_data->rotate_state.active_threads;
+ bool flushing = crypt_data->rotate_state.flushing;
+
+ while (cnt > 0 || flushing) {
+ mutex_exit(&crypt_data->mutex);
+ /* release dict mutex so that scrub threads can release their
+ * table references */
+ dict_mutex_exit_for_mysql();
+
+ /* wakeup throttle (all) sleepers */
+ os_event_set(fil_crypt_throttle_sleep_event);
+ os_event_set(fil_crypt_threads_event);
+
+ os_thread_sleep(20000);
+ dict_mutex_enter_for_mysql();
+ mutex_enter(&crypt_data->mutex);
+ cnt = crypt_data->rotate_state.active_threads;
+ flushing = crypt_data->rotate_state.flushing;
+
+ time_t now = time(0);
+
+ if (now >= last + 30) {
+ ib::warn() << "Waited "
+ << now - start
+ << " seconds to drop space: "
+ << space->name << " ("
+ << space->id << ") active threads "
+ << cnt << "flushing="
+ << flushing << ".";
+ last = now;
+ }
+ }
+
+ mutex_exit(&crypt_data->mutex);
+}
+
+/*********************************************************************
+Get crypt status for a space (used by information_schema)
+@param[in] space Tablespace
+@param[out] status Crypt status */
+UNIV_INTERN
+void
+fil_space_crypt_get_status(
+ const fil_space_t* space,
+ struct fil_space_crypt_status_t* status)
+{
+ memset(status, 0, sizeof(*status));
+
+ ut_ad(space->referenced());
+
+ /* If there is no crypt data and we have not yet read
+ page 0 for this tablespace, we need to read it before
+ we can continue. */
+ if (!space->crypt_data) {
+ fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
+ }
+
+ status->space = ULINT_UNDEFINED;
+
+ if (fil_space_crypt_t* crypt_data = space->crypt_data) {
+ status->space = space->id;
+ mutex_enter(&crypt_data->mutex);
+ status->scheme = crypt_data->type;
+ status->keyserver_requests = crypt_data->keyserver_requests;
+ status->min_key_version = crypt_data->min_key_version;
+ status->key_id = crypt_data->key_id;
+
+ if (crypt_data->rotate_state.active_threads > 0 ||
+ crypt_data->rotate_state.flushing) {
+ status->rotating = true;
+ status->flushing =
+ crypt_data->rotate_state.flushing;
+ status->rotate_next_page_number =
+ crypt_data->rotate_state.next_offset;
+ status->rotate_max_page_number =
+ crypt_data->rotate_state.max_offset;
+ }
+
+ mutex_exit(&crypt_data->mutex);
+
+ if (srv_encrypt_tables || crypt_data->min_key_version) {
+ status->current_key_version =
+ fil_crypt_get_latest_key_version(crypt_data);
+ }
+ }
+}
+
+/*********************************************************************
+Return crypt statistics
+@param[out] stat Crypt statistics */
+UNIV_INTERN
+void
+fil_crypt_total_stat(
+ fil_crypt_stat_t *stat)
+{
+ mutex_enter(&crypt_stat_mutex);
+ *stat = crypt_stat;
+ mutex_exit(&crypt_stat_mutex);
+}
+
+#endif /* UNIV_INNOCHECKSUM */
+
+/**
+Verify that post encryption checksum match calculated checksum.
+This function should be called only if tablespace contains crypt_data
+metadata (this is strong indication that tablespace is encrypted).
+Function also verifies that traditional checksum does not match
+calculated checksum as if it does page could be valid unencrypted,
+encrypted, or corrupted.
+
+@param[in,out] page page frame (checksum is temporarily modified)
+@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
+@return true if page is encrypted AND OK, false otherwise */
+bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
+{
+ ut_ad(mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION));
+
+ /* Compressed and encrypted pages do not have checksum. Assume not
+ corrupted. Page verification happens after decompression in
+ buf_page_read_complete() using buf_page_is_corrupted(). */
+ if (fil_page_get_type(page) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
+ return true;
+ }
+
+ /* Read stored post encryption checksum. */
+ const ib_uint32_t checksum = mach_read_from_4(
+ page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
+
+ /* If stored checksum matches one of the calculated checksums
+ page is not corrupted. */
+
+ switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) {
+ case SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32:
+ case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
+ if (zip_size) {
+ return checksum == page_zip_calc_checksum(
+ page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
+ }
+
+ return checksum == buf_calc_page_crc32(page);
+ case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
+ /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
+ due to MDEV-12114, fil_crypt_calculate_checksum()
+ is only using CRC32 for the encrypted pages.
+ Due to this, we must treat "strict_none" as "none". */
+ case SRV_CHECKSUM_ALGORITHM_NONE:
+ return true;
+ case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
+ /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
+ due to MDEV-12114, fil_crypt_calculate_checksum()
+ is only using CRC32 for the encrypted pages.
+ Due to this, we must treat "strict_innodb" as "innodb". */
+ case SRV_CHECKSUM_ALGORITHM_INNODB:
+ case SRV_CHECKSUM_ALGORITHM_CRC32:
+ case SRV_CHECKSUM_ALGORITHM_FULL_CRC32:
+ if (checksum == BUF_NO_CHECKSUM_MAGIC) {
+ return true;
+ }
+ if (zip_size) {
+ return checksum == page_zip_calc_checksum(
+ page, zip_size,
+ SRV_CHECKSUM_ALGORITHM_CRC32)
+ || checksum == page_zip_calc_checksum(
+ page, zip_size,
+ SRV_CHECKSUM_ALGORITHM_INNODB);
+ }
+
+ return checksum == buf_calc_page_crc32(page)
+ || checksum == buf_calc_page_new_checksum(page);
+ }
+
+ ut_ad("unhandled innodb_checksum_algorithm" == 0);
+ return false;
+}
diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc
new file mode 100644
index 00000000..a2591dd9
--- /dev/null
+++ b/storage/innobase/fil/fil0fil.cc
@@ -0,0 +1,3757 @@
+/*****************************************************************************
+
+Copyright (c) 1995, 2021, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2014, 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file fil/fil0fil.cc
+The tablespace memory cache
+
+Created 10/25/1995 Heikki Tuuri
+*******************************************************/
+
+#include "fil0fil.h"
+#include "fil0crypt.h"
+
+#include "btr0btr.h"
+#include "buf0buf.h"
+#include "dict0boot.h"
+#include "dict0dict.h"
+#include "dict0load.h"
+#include "fsp0file.h"
+#include "fsp0fsp.h"
+#include "hash0hash.h"
+#include "log0log.h"
+#include "log0recv.h"
+#include "mach0data.h"
+#include "mtr0log.h"
+#include "os0file.h"
+#include "page0zip.h"
+#include "row0mysql.h"
+#include "srv0start.h"
+#include "trx0purge.h"
+#include "buf0lru.h"
+#include "ibuf0ibuf.h"
+#include "os0event.h"
+#include "sync0sync.h"
+#include "buf0flu.h"
+#ifdef UNIV_LINUX
+# include <sys/types.h>
+# include <sys/sysmacros.h>
+# include <dirent.h>
+#endif
+
+/** Determine if the space id is a user tablespace id or not.
+@param space_id tablespace identifier
+@return true if it is a user tablespace ID */
+inline bool fil_is_user_tablespace_id(ulint space_id)
+{
+ return space_id != TRX_SYS_SPACE && space_id != SRV_TMP_SPACE_ID &&
+ !srv_is_undo_tablespace(space_id);
+}
+
+/** Try to close a file to adhere to the innodb_open_files limit.
+@param print_info whether to diagnose why a file cannot be closed
+@return whether a file was closed */
+bool fil_space_t::try_to_close(bool print_info)
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+ for (fil_space_t *space= UT_LIST_GET_FIRST(fil_system.space_list); space;
+ space= UT_LIST_GET_NEXT(space_list, space))
+ {
+ switch (space->purpose) {
+ case FIL_TYPE_TEMPORARY:
+ continue;
+ case FIL_TYPE_IMPORT:
+ break;
+ case FIL_TYPE_TABLESPACE:
+ if (!fil_is_user_tablespace_id(space->id))
+ continue;
+ }
+
+ /* We are using an approximation of LRU replacement policy. In
+ fil_node_open_file_low(), newly opened files are moved to the end
+ of fil_system.space_list, so that they would be less likely to be
+ closed here. */
+ fil_node_t *node= UT_LIST_GET_FIRST(space->chain);
+ ut_ad(node);
+ ut_ad(!UT_LIST_GET_NEXT(chain, node));
+
+ if (!node->is_open())
+ continue;
+
+ if (const auto n= space->set_closing())
+ {
+ if (print_info)
+ ib::info() << "Cannot close file " << node->name
+ << " because of "
+ << (n & PENDING)
+ << ((n & NEEDS_FSYNC)
+ ? " pending operations and pending fsync"
+ : " pending operations");
+ continue;
+ }
+
+ node->close();
+ return true;
+ }
+
+ return false;
+}
+
+/** Rename a single-table tablespace.
+The tablespace must exist in the memory cache.
+@param[in] id tablespace identifier
+@param[in] old_path old file name
+@param[in] new_name new table name in the
+databasename/tablename format
+@param[in] new_path_in new file name,
+or NULL if it is located in the normal data directory
+@return true if success */
+static bool
+fil_rename_tablespace(
+ ulint id,
+ const char* old_path,
+ const char* new_name,
+ const char* new_path_in);
+
+/*
+ IMPLEMENTATION OF THE TABLESPACE MEMORY CACHE
+ =============================================
+
+The tablespace cache is responsible for providing fast read/write access to
+tablespaces and logs of the database. File creation and deletion is done
+in other modules which know more of the logic of the operation, however.
+
+A tablespace consists of a chain of files. The size of the files does not
+have to be divisible by the database block size, because we may just leave
+the last incomplete block unused. When a new file is appended to the
+tablespace, the maximum size of the file is also specified. At the moment,
+we think that it is best to extend the file to its maximum size already at
+the creation of the file, because then we can avoid dynamically extending
+the file when more space is needed for the tablespace.
+
+A block's position in the tablespace is specified with a 32-bit unsigned
+integer. The files in the chain are thought to be catenated, and the block
+corresponding to an address n is the nth block in the catenated file (where
+the first block is named the 0th block, and the incomplete block fragments
+at the end of files are not taken into account). A tablespace can be extended
+by appending a new file at the end of the chain.
+
+Our tablespace concept is similar to the one of Oracle.
+
+To acquire more speed in disk transfers, a technique called disk striping is
+sometimes used. This means that logical block addresses are divided in a
+round-robin fashion across several disks. Windows NT supports disk striping,
+so there we do not need to support it in the database. Disk striping is
+implemented in hardware in RAID disks. We conclude that it is not necessary
+to implement it in the database. Oracle 7 does not support disk striping,
+either.
+
+Another trick used at some database sites is replacing tablespace files by
+raw disks, that is, the whole physical disk drive, or a partition of it, is
+opened as a single file, and it is accessed through byte offsets calculated
+from the start of the disk or the partition. This is recommended in some
+books on database tuning to achieve more speed in i/o. Using raw disk
+certainly prevents the OS from fragmenting disk space, but it is not clear
+if it really adds speed. We measured on the Pentium 100 MHz + NT + NTFS file
+system + EIDE Conner disk only a negligible difference in speed when reading
+from a file, versus reading from a raw disk.
+
+To have fast access to a tablespace or a log file, we put the data structures
+to a hash table. Each tablespace and log file is given an unique 32-bit
+identifier. */
+
+/** Reference to the server data directory. Usually it is the
+current working directory ".", but in the MySQL Embedded Server Library
+it is an absolute path. */
+const char* fil_path_to_mysql_datadir;
+
+/** Common InnoDB file extensions */
+const char* dot_ext[] = { "", ".ibd", ".isl", ".cfg" };
+
+/** Number of pending tablespace flushes */
+Atomic_counter<ulint> fil_n_pending_tablespace_flushes;
+
+/** The tablespace memory cache. This variable is NULL before the module is
+initialized. */
+fil_system_t fil_system;
+
+/** At this age or older a space/page will be rotated */
+UNIV_INTERN extern uint srv_fil_crypt_rotate_key_age;
+
+#ifdef UNIV_DEBUG
+/** Try fil_validate() every this many times */
+# define FIL_VALIDATE_SKIP 17
+
+/******************************************************************//**
+Checks the consistency of the tablespace cache some of the time.
+@return true if ok or the check was skipped */
+static
+bool
+fil_validate_skip(void)
+/*===================*/
+{
+ /** The fil_validate() call skip counter. */
+ static Atomic_counter<uint32_t> fil_validate_count;
+
+ /* We want to reduce the call frequency of the costly fil_validate()
+ check in debug builds. */
+ return (fil_validate_count++ % FIL_VALIDATE_SKIP) || fil_validate();
+}
+#endif /* UNIV_DEBUG */
+
+/*******************************************************************//**
+Returns the table space by a given id, NULL if not found.
+It is unsafe to dereference the returned pointer. It is fine to check
+for NULL. */
+fil_space_t*
+fil_space_get_by_id(
+/*================*/
+ ulint id) /*!< in: space id */
+{
+ fil_space_t* space;
+
+ ut_ad(fil_system.is_initialised());
+ ut_ad(mutex_own(&fil_system.mutex));
+
+ HASH_SEARCH(hash, &fil_system.spaces, id,
+ fil_space_t*, space,
+ ut_ad(space->magic_n == FIL_SPACE_MAGIC_N),
+ space->id == id);
+
+ return(space);
+}
+
+/** Look up a tablespace.
+The caller should hold an InnoDB table lock or a MDL that prevents
+the tablespace from being dropped during the operation,
+or the caller should be in single-threaded crash recovery mode
+(no user connections that could drop tablespaces).
+Normally, fil_space_t::get() should be used instead.
+@param[in] id tablespace ID
+@return tablespace, or NULL if not found */
+fil_space_t*
+fil_space_get(
+ ulint id)
+{
+ mutex_enter(&fil_system.mutex);
+ fil_space_t* space = fil_space_get_by_id(id);
+ mutex_exit(&fil_system.mutex);
+ return(space);
+}
+
+/** Validate the compression algorithm for full crc32 format.
+@param[in] space tablespace object
+@return whether the compression algorithm support */
+static bool fil_comp_algo_validate(const fil_space_t* space)
+{
+ if (!space->full_crc32()) {
+ return true;
+ }
+
+ DBUG_EXECUTE_IF("fil_comp_algo_validate_fail",
+ return false;);
+
+ ulint comp_algo = space->get_compression_algo();
+ switch (comp_algo) {
+ case PAGE_UNCOMPRESSED:
+ case PAGE_ZLIB_ALGORITHM:
+#ifdef HAVE_LZ4
+ case PAGE_LZ4_ALGORITHM:
+#endif /* HAVE_LZ4 */
+#ifdef HAVE_LZO
+ case PAGE_LZO_ALGORITHM:
+#endif /* HAVE_LZO */
+#ifdef HAVE_LZMA
+ case PAGE_LZMA_ALGORITHM:
+#endif /* HAVE_LZMA */
+#ifdef HAVE_BZIP2
+ case PAGE_BZIP2_ALGORITHM:
+#endif /* HAVE_BZIP2 */
+#ifdef HAVE_SNAPPY
+ case PAGE_SNAPPY_ALGORITHM:
+#endif /* HAVE_SNAPPY */
+ return true;
+ }
+
+ return false;
+}
+
+/** Append a file to the chain of files of a space.
+@param[in] name file name of a file that is not open
+@param[in] handle file handle, or OS_FILE_CLOSED
+@param[in] size file size in entire database pages
+@param[in] is_raw whether this is a raw device
+@param[in] atomic_write true if atomic write could be enabled
+@param[in] max_pages maximum number of pages in file,
+or UINT32_MAX for unlimited
+@return file object */
+fil_node_t* fil_space_t::add(const char* name, pfs_os_file_t handle,
+ uint32_t size, bool is_raw, bool atomic_write,
+ uint32_t max_pages)
+{
+ fil_node_t* node;
+
+ ut_ad(name != NULL);
+ ut_ad(fil_system.is_initialised());
+
+ node = reinterpret_cast<fil_node_t*>(ut_zalloc_nokey(sizeof(*node)));
+
+ node->handle = handle;
+
+ node->name = mem_strdup(name);
+
+ ut_a(!is_raw || srv_start_raw_disk_in_use);
+
+ node->is_raw_disk = is_raw;
+
+ node->size = size;
+
+ node->magic_n = FIL_NODE_MAGIC_N;
+
+ node->init_size = size;
+ node->max_size = max_pages;
+
+ node->space = this;
+
+ node->atomic_write = atomic_write;
+
+ mutex_enter(&fil_system.mutex);
+ this->size += size;
+ UT_LIST_ADD_LAST(chain, node);
+ if (node->is_open()) {
+ n_pending.fetch_and(~CLOSING, std::memory_order_relaxed);
+ if (++fil_system.n_open >= srv_max_n_open_files) {
+ reacquire();
+ try_to_close(true);
+ release();
+ }
+ }
+ mutex_exit(&fil_system.mutex);
+
+ return node;
+}
+
+/** Open a tablespace file.
+@param node data file
+@return whether the file was successfully opened */
+static bool fil_node_open_file_low(fil_node_t *node)
+{
+ ut_ad(!node->is_open());
+ ut_ad(node->space->is_closing());
+ ut_ad(mutex_own(&fil_system.mutex));
+ ulint type;
+ static_assert(((UNIV_ZIP_SIZE_MIN >> 1) << 3) == 4096, "compatibility");
+ switch (FSP_FLAGS_GET_ZIP_SSIZE(node->space->flags)) {
+ case 1:
+ case 2:
+ type= OS_DATA_FILE_NO_O_DIRECT;
+ break;
+ default:
+ type= OS_DATA_FILE;
+ }
+
+ for (;;)
+ {
+ bool success;
+ node->handle= os_file_create(innodb_data_file_key, node->name,
+ node->is_raw_disk
+ ? OS_FILE_OPEN_RAW | OS_FILE_ON_ERROR_NO_EXIT
+ : OS_FILE_OPEN | OS_FILE_ON_ERROR_NO_EXIT,
+ OS_FILE_AIO, type,
+ srv_read_only_mode, &success);
+ if (success)
+ break;
+
+ /* The following call prints an error message */
+ if (os_file_get_last_error(true) == EMFILE + 100 &&
+ fil_space_t::try_to_close(true))
+ continue;
+
+ ib::warn() << "Cannot open '" << node->name << "'.";
+ return false;
+ }
+
+ if (node->size);
+ else if (!node->read_page0() || !fil_comp_algo_validate(node->space))
+ {
+ os_file_close(node->handle);
+ node->handle= OS_FILE_CLOSED;
+ return false;
+ }
+
+ ut_ad(node->is_open());
+
+ if (UNIV_LIKELY(!fil_system.freeze_space_list))
+ {
+ /* Move the file last in fil_system.space_list, so that
+ fil_space_t::try_to_close() should close it as a last resort. */
+ UT_LIST_REMOVE(fil_system.space_list, node->space);
+ UT_LIST_ADD_LAST(fil_system.space_list, node->space);
+ }
+
+ fil_system.n_open++;
+ return true;
+}
+
+/** Open a tablespace file.
+@param node data file
+@return whether the file was successfully opened */
+static bool fil_node_open_file(fil_node_t *node)
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+ ut_ad(!node->is_open());
+ ut_ad(fil_is_user_tablespace_id(node->space->id) ||
+ srv_operation == SRV_OPERATION_BACKUP ||
+ srv_operation == SRV_OPERATION_RESTORE ||
+ srv_operation == SRV_OPERATION_RESTORE_DELTA);
+ ut_ad(node->space->purpose != FIL_TYPE_TEMPORARY);
+ ut_ad(node->space->referenced());
+
+ for (ulint count= 0; fil_system.n_open >= srv_max_n_open_files; count++)
+ {
+ if (fil_space_t::try_to_close(count > 1))
+ count= 0;
+ else if (count >= 2)
+ {
+ ib::warn() << "innodb_open_files=" << srv_max_n_open_files
+ << " is exceeded (" << fil_system.n_open
+ << ") files stay open)";
+ break;
+ }
+ else
+ {
+ mutex_exit(&fil_system.mutex);
+ os_thread_sleep(20000);
+ /* Flush tablespaces so that we can close modified files. */
+ fil_flush_file_spaces();
+ mutex_enter(&fil_system.mutex);
+ }
+ }
+
+ return fil_node_open_file_low(node);
+}
+
+/** Close the file handle. */
+void fil_node_t::close()
+{
+ prepare_to_close_or_detach();
+
+ /* printf("Closing file %s\n", name); */
+ int ret= os_file_close(handle);
+ ut_a(ret);
+ handle= OS_FILE_CLOSED;
+}
+
+pfs_os_file_t fil_node_t::detach()
+{
+ prepare_to_close_or_detach();
+
+ pfs_os_file_t result= handle;
+ handle= OS_FILE_CLOSED;
+ return result;
+}
+
+void fil_node_t::prepare_to_close_or_detach()
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+ ut_ad(space->is_ready_to_close() || srv_operation == SRV_OPERATION_BACKUP ||
+ srv_operation == SRV_OPERATION_RESTORE_DELTA);
+ ut_a(is_open());
+ ut_a(!being_extended);
+ ut_a(space->is_ready_to_close() || space->purpose == FIL_TYPE_TEMPORARY ||
+ srv_fast_shutdown == 2 || !srv_was_started);
+
+ ut_a(fil_system.n_open > 0);
+ fil_system.n_open--;
+}
+
+/** Flush any writes cached by the file system. */
+void fil_space_t::flush_low()
+{
+ ut_ad(!mutex_own(&fil_system.mutex));
+
+ uint32_t n= 1;
+ while (!n_pending.compare_exchange_strong(n, n | NEEDS_FSYNC,
+ std::memory_order_acquire,
+ std::memory_order_relaxed))
+ {
+ ut_ad(n & PENDING);
+ if (n & STOPPING)
+ return;
+ if (n & NEEDS_FSYNC)
+ break;
+ }
+
+ fil_n_pending_tablespace_flushes++;
+ for (fil_node_t *node= UT_LIST_GET_FIRST(chain); node;
+ node= UT_LIST_GET_NEXT(chain, node))
+ {
+ if (!node->is_open())
+ {
+ ut_ad(!is_in_unflushed_spaces);
+ continue;
+ }
+ IF_WIN(if (node->is_raw_disk) continue,);
+ os_file_flush(node->handle);
+ }
+
+ if (is_in_unflushed_spaces)
+ {
+ mutex_enter(&fil_system.mutex);
+ if (is_in_unflushed_spaces)
+ {
+ is_in_unflushed_spaces= false;
+ fil_system.unflushed_spaces.remove(*this);
+ }
+ mutex_exit(&fil_system.mutex);
+ }
+
+ clear_flush();
+ fil_n_pending_tablespace_flushes--;
+}
+
+/** Try to extend a tablespace.
+@param[in,out] space tablespace to be extended
+@param[in,out] node last file of the tablespace
+@param[in] size desired size in number of pages
+@param[out] success whether the operation succeeded
+@return whether the operation should be retried */
+static ATTRIBUTE_COLD __attribute__((warn_unused_result, nonnull))
+bool
+fil_space_extend_must_retry(
+ fil_space_t* space,
+ fil_node_t* node,
+ uint32_t size,
+ bool* success)
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+ ut_ad(UT_LIST_GET_LAST(space->chain) == node);
+ ut_ad(size >= FIL_IBD_FILE_INITIAL_SIZE);
+ ut_ad(node->space == space);
+ ut_ad(space->referenced() || space->is_being_truncated);
+
+ *success = space->size >= size;
+
+ if (*success) {
+ /* Space already big enough */
+ return(false);
+ }
+
+ if (node->being_extended) {
+ /* Another thread is currently extending the file. Wait
+ for it to finish.
+ It'd have been better to use event driven mechanism but
+ the entire module is peppered with polling stuff. */
+ mutex_exit(&fil_system.mutex);
+ os_thread_sleep(100000);
+ return(true);
+ }
+
+ node->being_extended = true;
+
+ /* At this point it is safe to release fil_system.mutex. No
+ other thread can rename, delete, close or extend the file because
+ we have set the node->being_extended flag. */
+ mutex_exit(&fil_system.mutex);
+
+ ut_ad(size >= space->size);
+
+ uint32_t last_page_no = space->size;
+ const uint32_t file_start_page_no = last_page_no - node->size;
+
+ const unsigned page_size = space->physical_size();
+
+ /* Datafile::read_first_page() expects srv_page_size bytes.
+ fil_node_t::read_page0() expects at least 4 * srv_page_size bytes.*/
+ os_offset_t new_size = std::max(
+ os_offset_t(size - file_start_page_no) * page_size,
+ os_offset_t(FIL_IBD_FILE_INITIAL_SIZE << srv_page_size_shift));
+
+ *success = os_file_set_size(node->name, node->handle, new_size,
+ space->is_compressed());
+
+ os_has_said_disk_full = *success;
+ if (*success) {
+ os_file_flush(node->handle);
+ last_page_no = size;
+ } else {
+ /* Let us measure the size of the file
+ to determine how much we were able to
+ extend it */
+ os_offset_t fsize = os_file_get_size(node->handle);
+ ut_a(fsize != os_offset_t(-1));
+
+ last_page_no = uint32_t(fsize / page_size)
+ + file_start_page_no;
+ }
+ mutex_enter(&fil_system.mutex);
+
+ ut_a(node->being_extended);
+ node->being_extended = false;
+ ut_a(last_page_no - file_start_page_no >= node->size);
+
+ uint32_t file_size = last_page_no - file_start_page_no;
+ space->size += file_size - node->size;
+ node->size = file_size;
+ const uint32_t pages_in_MiB = node->size
+ & ~uint32_t((1U << (20U - srv_page_size_shift)) - 1);
+
+ /* Keep the last data file size info up to date, rounded to
+ full megabytes */
+
+ switch (space->id) {
+ case TRX_SYS_SPACE:
+ srv_sys_space.set_last_file_size(pages_in_MiB);
+ do_flush:
+ space->reacquire();
+ mutex_exit(&fil_system.mutex);
+ space->flush_low();
+ space->release();
+ mutex_enter(&fil_system.mutex);
+ break;
+ default:
+ ut_ad(space->purpose == FIL_TYPE_TABLESPACE
+ || space->purpose == FIL_TYPE_IMPORT);
+ if (space->purpose == FIL_TYPE_TABLESPACE
+ && !space->is_being_truncated) {
+ goto do_flush;
+ }
+ break;
+ case SRV_TMP_SPACE_ID:
+ ut_ad(space->purpose == FIL_TYPE_TEMPORARY);
+ srv_tmp_space.set_last_file_size(pages_in_MiB);
+ break;
+ }
+
+ return false;
+}
+
+/** @return whether the file is usable for io() */
+ATTRIBUTE_COLD bool fil_space_t::prepare(bool have_mutex)
+{
+ ut_ad(referenced());
+ if (!have_mutex)
+ mutex_enter(&fil_system.mutex);
+ ut_ad(mutex_own(&fil_system.mutex));
+ fil_node_t *node= UT_LIST_GET_LAST(chain);
+ ut_ad(!id || purpose == FIL_TYPE_TEMPORARY ||
+ node == UT_LIST_GET_FIRST(chain));
+
+ const bool is_open= node && (node->is_open() || fil_node_open_file(node));
+
+ if (!is_open)
+ release();
+ else if (auto desired_size= recv_size)
+ {
+ bool success;
+ while (fil_space_extend_must_retry(this, node, desired_size, &success))
+ mutex_enter(&fil_system.mutex);
+
+ ut_ad(mutex_own(&fil_system.mutex));
+ /* Crash recovery requires the file extension to succeed. */
+ ut_a(success);
+ /* InnoDB data files cannot shrink. */
+ ut_a(size >= desired_size);
+ if (desired_size > committed_size)
+ committed_size= desired_size;
+
+ /* There could be multiple concurrent I/O requests for this
+ tablespace (multiple threads trying to extend this tablespace).
+
+ Also, fil_space_set_recv_size_and_flags() may have been invoked
+ again during the file extension while fil_system.mutex was not
+ being held by us.
+
+ Only if recv_size matches what we read originally, reset the
+ field. In this way, a subsequent I/O request will handle any
+ pending fil_space_set_recv_size_and_flags(). */
+
+ if (desired_size == recv_size)
+ {
+ recv_size= 0;
+ goto clear;
+ }
+ }
+ else
+clear:
+ n_pending.fetch_and(~CLOSING, std::memory_order_relaxed);
+
+ if (!have_mutex)
+ mutex_exit(&fil_system.mutex);
+ return is_open;
+}
+
+/** Try to extend a tablespace if it is smaller than the specified size.
+@param[in,out] space tablespace
+@param[in] size desired size in pages
+@return whether the tablespace is at least as big as requested */
+bool fil_space_extend(fil_space_t *space, uint32_t size)
+{
+ ut_ad(!srv_read_only_mode || space->purpose == FIL_TYPE_TEMPORARY);
+ bool success= false;
+ const bool acquired= space->acquire();
+ mutex_enter(&fil_system.mutex);
+ if (acquired || space->is_being_truncated)
+ {
+ while (fil_space_extend_must_retry(space, UT_LIST_GET_LAST(space->chain),
+ size, &success))
+ mutex_enter(&fil_system.mutex);
+ }
+ mutex_exit(&fil_system.mutex);
+ if (acquired)
+ space->release();
+ return success;
+}
+
+/** Prepare to free a file from fil_system. */
+inline pfs_os_file_t fil_node_t::close_to_free(bool detach_handle)
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+ ut_a(magic_n == FIL_NODE_MAGIC_N);
+ ut_a(!being_extended);
+
+ if (is_open() &&
+ (space->n_pending.fetch_or(fil_space_t::CLOSING,
+ std::memory_order_acquire) &
+ fil_space_t::PENDING))
+ {
+ mutex_exit(&fil_system.mutex);
+ while (space->referenced())
+ os_thread_sleep(100);
+ mutex_enter(&fil_system.mutex);
+ }
+
+ while (is_open())
+ {
+ if (space->is_in_unflushed_spaces)
+ {
+ ut_ad(srv_file_flush_method != SRV_O_DIRECT_NO_FSYNC);
+ space->is_in_unflushed_spaces= false;
+ fil_system.unflushed_spaces.remove(*space);
+ }
+
+ ut_a(!being_extended);
+ if (detach_handle)
+ {
+ auto result= handle;
+ handle= OS_FILE_CLOSED;
+ return result;
+ }
+ bool ret= os_file_close(handle);
+ ut_a(ret);
+ handle= OS_FILE_CLOSED;
+ break;
+ }
+
+ return OS_FILE_CLOSED;
+}
+
+/** Detach a tablespace from the cache and close the files. */
+std::vector<pfs_os_file_t> fil_system_t::detach(fil_space_t *space,
+ bool detach_handle)
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+ HASH_DELETE(fil_space_t, hash, &spaces, space->id, space);
+
+ if (space->is_in_unflushed_spaces)
+ {
+ ut_ad(srv_file_flush_method != SRV_O_DIRECT_NO_FSYNC);
+ space->is_in_unflushed_spaces= false;
+ unflushed_spaces.remove(*space);
+ }
+
+ if (space->is_in_default_encrypt)
+ {
+ space->is_in_default_encrypt= false;
+ default_encrypt_tables.remove(*space);
+ }
+ UT_LIST_REMOVE(space_list, space);
+ if (space == sys_space)
+ sys_space= nullptr;
+ else if (space == temp_space)
+ temp_space= nullptr;
+
+ ut_a(space->magic_n == FIL_SPACE_MAGIC_N);
+
+ for (fil_node_t* node= UT_LIST_GET_FIRST(space->chain); node;
+ node= UT_LIST_GET_NEXT(chain, node))
+ if (node->is_open())
+ {
+ ut_ad(n_open > 0);
+ n_open--;
+ }
+
+ std::vector<pfs_os_file_t> handles;
+ handles.reserve(UT_LIST_GET_LEN(space->chain));
+
+ for (fil_node_t* node= UT_LIST_GET_FIRST(space->chain); node;
+ node= UT_LIST_GET_NEXT(chain, node))
+ {
+ auto handle= node->close_to_free(detach_handle);
+ if (handle != OS_FILE_CLOSED)
+ handles.push_back(handle);
+ }
+
+ ut_ad(!space->referenced());
+ return handles;
+}
+
+/** Free a tablespace object on which fil_system_t::detach() was invoked.
+There must not be any pending i/o's or flushes on the files.
+@param[in,out] space tablespace */
+static
+void
+fil_space_free_low(
+ fil_space_t* space)
+{
+ /* The tablespace must not be in fil_system.named_spaces. */
+ ut_ad(srv_fast_shutdown == 2 || !srv_was_started
+ || space->max_lsn == 0);
+
+ /* Wait for fil_space_t::release() after
+ fil_system_t::detach(), the tablespace cannot be found, so
+ fil_space_t::get() would return NULL */
+ while (space->referenced()) {
+ os_thread_sleep(100);
+ }
+
+ for (fil_node_t* node = UT_LIST_GET_FIRST(space->chain);
+ node != NULL; ) {
+ ut_d(space->size -= node->size);
+ ut_free(node->name);
+ fil_node_t* old_node = node;
+ node = UT_LIST_GET_NEXT(chain, node);
+ ut_free(old_node);
+ }
+
+ ut_ad(space->size == 0);
+
+ rw_lock_free(&space->latch);
+ fil_space_destroy_crypt_data(&space->crypt_data);
+
+ space->~fil_space_t();
+ ut_free(space->name);
+ ut_free(space);
+}
+
+/** Frees a space object from the tablespace memory cache.
+Closes the files in the chain but does not delete them.
+There must not be any pending i/o's or flushes on the files.
+@param[in] id tablespace identifier
+@param[in] x_latched whether the caller holds X-mode space->latch
+@return true if success */
+bool
+fil_space_free(
+ ulint id,
+ bool x_latched)
+{
+ ut_ad(id != TRX_SYS_SPACE);
+
+ mutex_enter(&fil_system.mutex);
+ fil_space_t* space = fil_space_get_by_id(id);
+
+ if (space != NULL) {
+ fil_system.detach(space);
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ if (space != NULL) {
+ if (x_latched) {
+ rw_lock_x_unlock(&space->latch);
+ }
+
+ if (!recv_recovery_is_on()) {
+ mysql_mutex_lock(&log_sys.mutex);
+ }
+
+ mysql_mutex_assert_owner(&log_sys.mutex);
+
+ if (space->max_lsn != 0) {
+ ut_d(space->max_lsn = 0);
+ UT_LIST_REMOVE(fil_system.named_spaces, space);
+ }
+
+ if (!recv_recovery_is_on()) {
+ mysql_mutex_unlock(&log_sys.mutex);
+ }
+
+ fil_space_free_low(space);
+ }
+
+ return(space != NULL);
+}
+
+/** Create a tablespace in fil_system.
+@param name tablespace name
+@param id tablespace identifier
+@param flags tablespace flags
+@param purpose tablespace purpose
+@param crypt_data encryption information
+@param mode encryption mode
+@return pointer to created tablespace, to be filled in with add()
+@retval nullptr on failure (such as when the same tablespace exists) */
+fil_space_t *fil_space_t::create(const char *name, ulint id, ulint flags,
+ fil_type_t purpose,
+ fil_space_crypt_t *crypt_data,
+ fil_encryption_t mode)
+{
+ fil_space_t* space;
+
+ ut_ad(fil_system.is_initialised());
+ ut_ad(fil_space_t::is_valid_flags(flags & ~FSP_FLAGS_MEM_MASK, id));
+ ut_ad(srv_page_size == UNIV_PAGE_SIZE_ORIG || flags != 0);
+
+ DBUG_EXECUTE_IF("fil_space_create_failure", return(NULL););
+
+ /* FIXME: if calloc() is defined as an inline function that calls
+ memset() or bzero(), then GCC 6 -flifetime-dse can optimize it away */
+ space= new (ut_zalloc_nokey(sizeof(*space))) fil_space_t;
+
+ space->id = id;
+ space->name = mem_strdup(name);
+
+ UT_LIST_INIT(space->chain, &fil_node_t::chain);
+
+ space->purpose = purpose;
+ space->flags = flags;
+
+ space->magic_n = FIL_SPACE_MAGIC_N;
+ space->crypt_data = crypt_data;
+ space->n_pending.store(CLOSING, std::memory_order_relaxed);
+
+ DBUG_LOG("tablespace",
+ "Created metadata for " << id << " name " << name);
+ if (crypt_data) {
+ DBUG_LOG("crypt",
+ "Tablespace " << id << " name " << name
+ << " encryption " << crypt_data->encryption
+ << " key id " << crypt_data->key_id
+ << ":" << fil_crypt_get_mode(crypt_data)
+ << " " << fil_crypt_get_type(crypt_data));
+ }
+
+ rw_lock_create(fil_space_latch_key, &space->latch, SYNC_FSP);
+
+ if (space->purpose == FIL_TYPE_TEMPORARY) {
+ /* SysTablespace::open_or_create() would pass
+ size!=0 to fil_space_t::add(), so first_time_open
+ would not hold in fil_node_open_file(), and we
+ must assign this manually. We do not care about
+ the durability or atomicity of writes to the
+ temporary tablespace files. */
+ space->atomic_write_supported = true;
+ }
+
+ mutex_enter(&fil_system.mutex);
+
+ if (const fil_space_t *old_space = fil_space_get_by_id(id)) {
+ ib::error() << "Trying to add tablespace '" << name
+ << "' with id " << id
+ << " to the tablespace memory cache, but tablespace '"
+ << old_space->name << "' already exists in the cache!";
+ mutex_exit(&fil_system.mutex);
+ rw_lock_free(&space->latch);
+ space->~fil_space_t();
+ ut_free(space->name);
+ ut_free(space);
+ return(NULL);
+ }
+
+ HASH_INSERT(fil_space_t, hash, &fil_system.spaces, id, space);
+
+ UT_LIST_ADD_LAST(fil_system.space_list, space);
+
+ switch (id) {
+ case 0:
+ ut_ad(!fil_system.sys_space);
+ fil_system.sys_space = space;
+ break;
+ case SRV_TMP_SPACE_ID:
+ ut_ad(!fil_system.temp_space);
+ fil_system.temp_space = space;
+ break;
+ default:
+ ut_ad(purpose != FIL_TYPE_TEMPORARY);
+ if (UNIV_LIKELY(id <= fil_system.max_assigned_id)) {
+ break;
+ }
+ if (!fil_system.space_id_reuse_warned) {
+ ib::warn() << "Allocated tablespace ID " << id
+ << " for " << name << ", old maximum was "
+ << fil_system.max_assigned_id;
+ }
+
+ fil_system.max_assigned_id = id;
+ }
+
+ const bool rotate =
+ (purpose == FIL_TYPE_TABLESPACE
+ && (mode == FIL_ENCRYPTION_ON
+ || mode == FIL_ENCRYPTION_OFF || srv_encrypt_tables)
+ && fil_crypt_must_default_encrypt());
+
+ /* Inform key rotation that there could be something
+ to do */
+ if (rotate) {
+ /* Key rotation is not enabled, need to inform background
+ encryption threads. */
+ fil_system.default_encrypt_tables.push_back(*space);
+ space->is_in_default_encrypt = true;
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ if (rotate && srv_n_fil_crypt_threads_started) {
+ os_event_set(fil_crypt_threads_event);
+ }
+
+ return(space);
+}
+
+/*******************************************************************//**
+Assigns a new space id for a new single-table tablespace. This works simply by
+incrementing the global counter. If 4 billion id's is not enough, we may need
+to recycle id's.
+@return true if assigned, false if not */
+bool
+fil_assign_new_space_id(
+/*====================*/
+ ulint* space_id) /*!< in/out: space id */
+{
+ ulint id;
+ bool success;
+
+ mutex_enter(&fil_system.mutex);
+
+ id = *space_id;
+
+ if (id < fil_system.max_assigned_id) {
+ id = fil_system.max_assigned_id;
+ }
+
+ id++;
+
+ if (id > (SRV_SPACE_ID_UPPER_BOUND / 2) && (id % 1000000UL == 0)) {
+ ib::warn() << "You are running out of new single-table"
+ " tablespace id's. Current counter is " << id
+ << " and it must not exceed" <<SRV_SPACE_ID_UPPER_BOUND
+ << "! To reset the counter to zero you have to dump"
+ " all your tables and recreate the whole InnoDB"
+ " installation.";
+ }
+
+ success = (id < SRV_SPACE_ID_UPPER_BOUND);
+
+ if (success) {
+ *space_id = fil_system.max_assigned_id = id;
+ } else {
+ ib::warn() << "You have run out of single-table tablespace"
+ " id's! Current counter is " << id
+ << ". To reset the counter to zero"
+ " you have to dump all your tables and"
+ " recreate the whole InnoDB installation.";
+ *space_id = ULINT_UNDEFINED;
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ return(success);
+}
+
+/** Read the first page of a data file.
+@return whether the page was found valid */
+bool fil_space_t::read_page0()
+{
+ ut_ad(fil_system.is_initialised());
+ ut_ad(mutex_own(&fil_system.mutex));
+ if (size)
+ return true;
+
+ fil_node_t *node= UT_LIST_GET_FIRST(chain);
+ if (!node)
+ return false;
+ ut_ad(!UT_LIST_GET_NEXT(chain, node));
+
+ if (UNIV_UNLIKELY(acquire_low() & STOPPING))
+ {
+ ut_ad("this should not happen" == 0);
+ return false;
+ }
+ const bool ok= node->is_open() || fil_node_open_file(node);
+ release();
+ return ok;
+}
+
+/** Look up a tablespace and ensure that its first page has been validated. */
+static fil_space_t *fil_space_get_space(ulint id)
+{
+ if (fil_space_t *space= fil_space_get_by_id(id))
+ if (space->read_page0())
+ return space;
+ return nullptr;
+}
+
+void fil_space_set_recv_size_and_flags(ulint id, uint32_t size, uint32_t flags)
+{
+ ut_ad(id < SRV_SPACE_ID_UPPER_BOUND);
+ mutex_enter(&fil_system.mutex);
+ if (fil_space_t *space= fil_space_get_space(id))
+ {
+ if (size)
+ space->recv_size= size;
+ if (flags != FSP_FLAGS_FCRC32_MASK_MARKER)
+ space->flags= flags;
+ }
+ mutex_exit(&fil_system.mutex);
+}
+
+/** Open each file. Never invoked on .ibd files.
+@param create_new_db whether to skip the call to fil_node_t::read_page0()
+@return whether all files were opened */
+bool fil_space_t::open(bool create_new_db)
+{
+ ut_ad(fil_system.is_initialised());
+ ut_ad(!id || create_new_db);
+
+ bool success= true;
+ bool skip_read= create_new_db;
+
+ mutex_enter(&fil_system.mutex);
+
+ for (fil_node_t *node= UT_LIST_GET_FIRST(chain); node;
+ node= UT_LIST_GET_NEXT(chain, node))
+ {
+ if (!node->is_open() && !fil_node_open_file_low(node))
+ {
+err_exit:
+ success= false;
+ break;
+ }
+
+ if (create_new_db)
+ continue;
+ if (skip_read)
+ {
+ size+= node->size;
+ continue;
+ }
+
+ if (!node->read_page0())
+ {
+ fil_system.n_open--;
+ os_file_close(node->handle);
+ node->handle= OS_FILE_CLOSED;
+ goto err_exit;
+ }
+
+ skip_read= true;
+ }
+
+ if (!create_new_db)
+ committed_size= size;
+ mutex_exit(&fil_system.mutex);
+ return success;
+}
+
+/** Close each file. Only invoked on fil_system.temp_space. */
+void fil_space_t::close()
+{
+ if (!fil_system.is_initialised()) {
+ return;
+ }
+
+ mutex_enter(&fil_system.mutex);
+ ut_ad(this == fil_system.temp_space
+ || srv_operation == SRV_OPERATION_BACKUP
+ || srv_operation == SRV_OPERATION_RESTORE
+ || srv_operation == SRV_OPERATION_RESTORE_DELTA);
+
+ for (fil_node_t* node = UT_LIST_GET_FIRST(chain);
+ node != NULL;
+ node = UT_LIST_GET_NEXT(chain, node)) {
+ if (node->is_open()) {
+ node->close();
+ }
+ }
+
+ mutex_exit(&fil_system.mutex);
+}
+
+void fil_system_t::create(ulint hash_size)
+{
+ ut_ad(this == &fil_system);
+ ut_ad(!is_initialised());
+ ut_ad(!(srv_page_size % FSP_EXTENT_SIZE));
+ ut_ad(srv_page_size);
+ ut_ad(!spaces.array);
+
+ m_initialised = true;
+
+ compile_time_assert(!(UNIV_PAGE_SIZE_MAX % FSP_EXTENT_SIZE_MAX));
+ compile_time_assert(!(UNIV_PAGE_SIZE_MIN % FSP_EXTENT_SIZE_MIN));
+
+ ut_ad(hash_size > 0);
+
+ mutex_create(LATCH_ID_FIL_SYSTEM, &mutex);
+
+ spaces.create(hash_size);
+
+ fil_space_crypt_init();
+#ifdef UNIV_LINUX
+ ssd.clear();
+ char fn[sizeof(dirent::d_name)
+ + sizeof "/sys/block/" "/queue/rotational"];
+ const size_t sizeof_fnp = (sizeof fn) - sizeof "/sys/block";
+ memcpy(fn, "/sys/block/", sizeof "/sys/block");
+ char* fnp = &fn[sizeof "/sys/block"];
+
+ std::set<std::string> ssd_devices;
+ if (DIR* d = opendir("/sys/block")) {
+ while (struct dirent* e = readdir(d)) {
+ if (e->d_name[0] == '.') {
+ continue;
+ }
+ snprintf(fnp, sizeof_fnp, "%s/queue/rotational",
+ e->d_name);
+ int f = open(fn, O_RDONLY);
+ if (f == -1) {
+ continue;
+ }
+ char b[sizeof "4294967295:4294967295\n"];
+ ssize_t l = read(f, b, sizeof b);
+ ::close(f);
+ if (l != 2 || memcmp("0\n", b, 2)) {
+ continue;
+ }
+ snprintf(fnp, sizeof_fnp, "%s/dev", e->d_name);
+ f = open(fn, O_RDONLY);
+ if (f == -1) {
+ continue;
+ }
+ l = read(f, b, sizeof b);
+ ::close(f);
+ if (l <= 0 || b[l - 1] != '\n') {
+ continue;
+ }
+ b[l - 1] = '\0';
+ char* end = b;
+ unsigned long dev_major = strtoul(b, &end, 10);
+ if (b == end || *end != ':'
+ || dev_major != unsigned(dev_major)) {
+ continue;
+ }
+ char* c = end + 1;
+ unsigned long dev_minor = strtoul(c, &end, 10);
+ if (c == end || *end
+ || dev_minor != unsigned(dev_minor)) {
+ continue;
+ }
+ ssd.push_back(makedev(unsigned(dev_major),
+ unsigned(dev_minor)));
+ }
+ closedir(d);
+ }
+ /* fil_system_t::is_ssd() assumes the following */
+ ut_ad(makedev(0, 8) == 8);
+ ut_ad(makedev(0, 4) == 4);
+ ut_ad(makedev(0, 2) == 2);
+ ut_ad(makedev(0, 1) == 1);
+#endif
+}
+
+void fil_system_t::close()
+{
+ ut_ad(this == &fil_system);
+ ut_a(unflushed_spaces.empty());
+ ut_a(!UT_LIST_GET_LEN(space_list));
+ ut_ad(!sys_space);
+ ut_ad(!temp_space);
+
+ if (is_initialised())
+ {
+ m_initialised= false;
+ spaces.free();
+ mutex_free(&mutex);
+ fil_space_crypt_cleanup();
+ }
+
+ ut_ad(!spaces.array);
+
+#ifdef UNIV_LINUX
+ ssd.clear();
+ ssd.shrink_to_fit();
+#endif /* UNIV_LINUX */
+}
+
+/** Extend all open data files to the recovered size */
+ATTRIBUTE_COLD void fil_system_t::extend_to_recv_size()
+{
+ ut_ad(is_initialised());
+ mutex_enter(&mutex);
+ for (fil_space_t *space= UT_LIST_GET_FIRST(fil_system.space_list); space;
+ space= UT_LIST_GET_NEXT(space_list, space))
+ {
+ const uint32_t size= space->recv_size;
+
+ if (size > space->size)
+ {
+ if (space->is_closing())
+ continue;
+ space->reacquire();
+ bool success;
+ while (fil_space_extend_must_retry(space, UT_LIST_GET_LAST(space->chain),
+ size, &success))
+ mutex_enter(&mutex);
+ /* Crash recovery requires the file extension to succeed. */
+ ut_a(success);
+ space->release();
+ }
+ }
+ mutex_exit(&mutex);
+}
+
+/** Close all tablespace files at shutdown */
+void fil_space_t::close_all()
+{
+ if (!fil_system.is_initialised()) {
+ return;
+ }
+
+ fil_space_t* space;
+
+ /* At shutdown, we should not have any files in this list. */
+ ut_ad(srv_fast_shutdown == 2
+ || !srv_was_started
+ || UT_LIST_GET_LEN(fil_system.named_spaces) == 0);
+ fil_flush_file_spaces();
+
+ mutex_enter(&fil_system.mutex);
+
+ for (space = UT_LIST_GET_FIRST(fil_system.space_list); space; ) {
+ fil_node_t* node;
+ fil_space_t* prev_space = space;
+
+ for (node = UT_LIST_GET_FIRST(space->chain);
+ node != NULL;
+ node = UT_LIST_GET_NEXT(chain, node)) {
+
+ if (!node->is_open()) {
+next:
+ continue;
+ }
+
+ for (ulint count = 10000; count--; ) {
+ if (!space->set_closing()) {
+ node->close();
+ goto next;
+ }
+ mutex_exit(&fil_system.mutex);
+ os_thread_sleep(100);
+ mutex_enter(&fil_system.mutex);
+ if (!node->is_open()) {
+ goto next;
+ }
+ }
+
+ ib::error() << "File '" << node->name
+ << "' has " << space->referenced()
+ << " operations";
+ }
+
+ space = UT_LIST_GET_NEXT(space_list, space);
+ fil_system.detach(prev_space);
+ fil_space_free_low(prev_space);
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ ut_ad(srv_fast_shutdown == 2
+ || !srv_was_started
+ || UT_LIST_GET_LEN(fil_system.named_spaces) == 0);
+}
+
+/*******************************************************************//**
+Sets the max tablespace id counter if the given number is bigger than the
+previous value. */
+void
+fil_set_max_space_id_if_bigger(
+/*===========================*/
+ ulint max_id) /*!< in: maximum known id */
+{
+ if (max_id >= SRV_SPACE_ID_UPPER_BOUND) {
+ ib::fatal() << "Max tablespace id is too high, " << max_id;
+ }
+
+ mutex_enter(&fil_system.mutex);
+
+ if (fil_system.max_assigned_id < max_id) {
+
+ fil_system.max_assigned_id = max_id;
+ }
+
+ mutex_exit(&fil_system.mutex);
+}
+
+/** Write the flushed LSN to the page header of the first page in the
+system tablespace.
+@param[in] lsn flushed LSN
+@return DB_SUCCESS or error number */
+dberr_t
+fil_write_flushed_lsn(
+ lsn_t lsn)
+{
+ byte* buf;
+ ut_ad(!srv_read_only_mode);
+
+ if (!fil_system.sys_space->acquire()) {
+ return DB_ERROR;
+ }
+
+ buf = static_cast<byte*>(aligned_malloc(srv_page_size, srv_page_size));
+
+ auto fio = fil_system.sys_space->io(IORequestRead, 0, srv_page_size,
+ buf);
+
+ if (fio.err == DB_SUCCESS) {
+ mach_write_to_8(buf + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
+ lsn);
+
+ ulint fsp_flags = mach_read_from_4(
+ buf + FSP_HEADER_OFFSET + FSP_SPACE_FLAGS);
+
+ if (fil_space_t::full_crc32(fsp_flags)) {
+ buf_flush_assign_full_crc32_checksum(buf);
+ }
+
+ fio = fil_system.sys_space->io(IORequestWrite,
+ 0, srv_page_size, buf);
+ fil_flush_file_spaces();
+ } else {
+ fil_system.sys_space->release();
+ }
+
+ aligned_free(buf);
+ return fio.err;
+}
+
+/** Acquire a tablespace reference.
+@param id tablespace identifier
+@return tablespace
+@retval nullptr if the tablespace is missing or inaccessible */
+fil_space_t *fil_space_t::get(ulint id)
+{
+ mutex_enter(&fil_system.mutex);
+ fil_space_t *space= fil_space_get_by_id(id);
+ const uint32_t n= space ? space->acquire_low() : 0;
+ mutex_exit(&fil_system.mutex);
+
+ if (n & STOPPING)
+ space= nullptr;
+
+ if ((n & CLOSING) && !space->prepare())
+ space= nullptr;
+
+ return space;
+}
+
+/** Write a log record about a file operation.
+@param type file operation
+@param first_page_no first page number in the file
+@param path file path
+@param new_path new file path for type=FILE_RENAME */
+inline void mtr_t::log_file_op(mfile_type_t type, ulint space_id,
+ const char *path, const char *new_path)
+{
+ ut_ad((new_path != nullptr) == (type == FILE_RENAME));
+ ut_ad(!(byte(type) & 15));
+
+ /* fil_name_parse() requires that there be at least one path
+ separator and that the file path end with ".ibd". */
+ ut_ad(strchr(path, OS_PATH_SEPARATOR) != NULL);
+ ut_ad(!strcmp(&path[strlen(path) - strlen(DOT_IBD)], DOT_IBD));
+
+ flag_modified();
+ if (m_log_mode != MTR_LOG_ALL)
+ return;
+ m_last= nullptr;
+
+ const size_t len= strlen(path);
+ const size_t new_len= type == FILE_RENAME ? 1 + strlen(new_path) : 0;
+ ut_ad(len > 0);
+ byte *const log_ptr= m_log.open(1 + 3/*length*/ + 5/*space_id*/ +
+ 1/*page_no=0*/);
+ byte *end= log_ptr + 1;
+ end= mlog_encode_varint(end, space_id);
+ *end++= 0;
+ if (UNIV_LIKELY(end + len + new_len >= &log_ptr[16]))
+ {
+ *log_ptr= type;
+ size_t total_len= len + new_len + end - log_ptr - 15;
+ if (total_len >= MIN_3BYTE)
+ total_len+= 2;
+ else if (total_len >= MIN_2BYTE)
+ total_len++;
+ end= mlog_encode_varint(log_ptr + 1, total_len);
+ end= mlog_encode_varint(end, space_id);
+ *end++= 0;
+ }
+ else
+ {
+ *log_ptr= static_cast<byte>(type | (end + len + new_len - &log_ptr[1]));
+ ut_ad(*log_ptr & 15);
+ }
+
+ m_log.close(end);
+
+ if (type == FILE_RENAME)
+ {
+ ut_ad(strchr(new_path, OS_PATH_SEPARATOR));
+ m_log.push(reinterpret_cast<const byte*>(path), uint32_t(len + 1));
+ m_log.push(reinterpret_cast<const byte*>(new_path), uint32_t(new_len));
+ }
+ else
+ m_log.push(reinterpret_cast<const byte*>(path), uint32_t(len));
+}
+
+/** Write redo log for renaming a file.
+@param[in] space_id tablespace id
+@param[in] old_name tablespace file name
+@param[in] new_name tablespace file name after renaming
+@param[in,out] mtr mini-transaction */
+static
+void
+fil_name_write_rename_low(
+ ulint space_id,
+ const char* old_name,
+ const char* new_name,
+ mtr_t* mtr)
+{
+ ut_ad(!is_predefined_tablespace(space_id));
+ mtr->log_file_op(FILE_RENAME, space_id, old_name, new_name);
+}
+
+/** Write redo log for renaming a file.
+@param[in] space_id tablespace id
+@param[in] old_name tablespace file name
+@param[in] new_name tablespace file name after renaming */
+static void
+fil_name_write_rename(
+ ulint space_id,
+ const char* old_name,
+ const char* new_name)
+{
+ mtr_t mtr;
+ mtr.start();
+ fil_name_write_rename_low(space_id, old_name, new_name, &mtr);
+ mtr.commit();
+ log_write_up_to(mtr.commit_lsn(), true);
+}
+
+/** Write FILE_MODIFY for a file.
+@param[in] space_id tablespace id
+@param[in] name tablespace file name
+@param[in,out] mtr mini-transaction */
+static
+void
+fil_name_write(
+ ulint space_id,
+ const char* name,
+ mtr_t* mtr)
+{
+ ut_ad(!is_predefined_tablespace(space_id));
+ mtr->log_file_op(FILE_MODIFY, space_id, name);
+}
+
+/** Check for pending operations.
+@param[in] space tablespace
+@param[in] count number of attempts so far
+@return 0 if no operations else count + 1. */
+static ulint fil_check_pending_ops(const fil_space_t* space, ulint count)
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+
+ if (!space) {
+ return 0;
+ }
+
+ if (auto n_pending_ops = space->referenced()) {
+
+ /* Give a warning every 10 second, starting after 1 second */
+ if ((count % 500) == 50) {
+ ib::warn() << "Trying to delete"
+ " tablespace '" << space->name
+ << "' but there are " << n_pending_ops
+ << " pending operations on it.";
+ }
+
+ return(count + 1);
+ }
+
+ return(0);
+}
+
+/*******************************************************************//**
+Check for pending IO.
+@return 0 if no pending else count + 1. */
+static
+ulint
+fil_check_pending_io(
+/*=================*/
+ fil_space_t* space, /*!< in/out: Tablespace to check */
+ fil_node_t** node, /*!< out: Node in space list */
+ ulint count) /*!< in: number of attempts so far */
+{
+ ut_ad(mutex_own(&fil_system.mutex));
+
+ /* The following code must change when InnoDB supports
+ multiple datafiles per tablespace. */
+ ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
+
+ *node = UT_LIST_GET_FIRST(space->chain);
+
+ if (const uint32_t p = space->referenced()) {
+ ut_a(!(*node)->being_extended);
+
+ /* Give a warning every 10 second, starting after 1 second */
+ if ((count % 500) == 50) {
+ ib::info() << "Trying to delete"
+ " tablespace '" << space->name
+ << "' but there are " << p
+ << " pending i/o's on it.";
+ }
+
+ return(count + 1);
+ }
+
+ return(0);
+}
+
+/*******************************************************************//**
+Check pending operations on a tablespace.
+@return tablespace */
+static
+fil_space_t*
+fil_check_pending_operations(
+/*=========================*/
+ ulint id, /*!< in: space id */
+ bool truncate, /*!< in: whether to truncate a file */
+ char** path) /*!< out/own: tablespace path */
+{
+ ulint count = 0;
+
+ ut_a(!is_system_tablespace(id));
+ mutex_enter(&fil_system.mutex);
+ fil_space_t* sp = fil_space_get_by_id(id);
+
+ if (sp) {
+ sp->set_stopping(true);
+ if (sp->crypt_data) {
+ sp->reacquire();
+ mutex_exit(&fil_system.mutex);
+ fil_space_crypt_close_tablespace(sp);
+ mutex_enter(&fil_system.mutex);
+ sp->release();
+ }
+ }
+
+ /* Check for pending operations. */
+
+ do {
+ count = fil_check_pending_ops(sp, count);
+
+ mutex_exit(&fil_system.mutex);
+
+ if (count) {
+ os_thread_sleep(20000); // Wait 0.02 seconds
+ } else if (!sp) {
+ return nullptr;
+ }
+
+ mutex_enter(&fil_system.mutex);
+
+ sp = fil_space_get_by_id(id);
+ } while (count);
+
+ /* Check for pending IO. */
+
+ for (;;) {
+ if (truncate) {
+ sp->is_being_truncated = true;
+ }
+
+ fil_node_t* node;
+
+ count = fil_check_pending_io(sp, &node, count);
+
+ if (count == 0 && path) {
+ *path = mem_strdup(node->name);
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ if (count == 0) {
+ break;
+ }
+
+ os_thread_sleep(20000); // Wait 0.02 seconds
+ mutex_enter(&fil_system.mutex);
+ sp = fil_space_get_by_id(id);
+
+ if (!sp) {
+ mutex_exit(&fil_system.mutex);
+ break;
+ }
+ }
+
+ return sp;
+}
+
+/** Close a single-table tablespace on failed IMPORT TABLESPACE.
+The tablespace must be cached in the memory cache.
+Free all pages used by the tablespace. */
+void fil_close_tablespace(ulint id)
+{
+ ut_ad(!is_system_tablespace(id));
+ char* path = nullptr;
+ fil_space_t* space = fil_check_pending_operations(id, false, &path);
+ if (!space) {
+ return;
+ }
+
+ rw_lock_x_lock(&space->latch);
+
+ /* Invalidate in the buffer pool all pages belonging to the
+ tablespace. Since we have invoked space->set_stopping(), readahead
+ can no longer read more pages of this tablespace to buf_pool.
+ Thus we can clean the tablespace out of buf_pool
+ completely and permanently. */
+ while (buf_flush_list_space(space));
+ ut_ad(space->is_stopping());
+
+ /* If the free is successful, the X lock will be released before
+ the space memory data structure is freed. */
+
+ if (!fil_space_free(id, true)) {
+ rw_lock_x_unlock(&space->latch);
+ }
+
+ /* If it is a delete then also delete any generated files, otherwise
+ when we drop the database the remove directory will fail. */
+
+ if (char* cfg_name = fil_make_filepath(path, NULL, CFG, false)) {
+ os_file_delete_if_exists(innodb_data_file_key, cfg_name, NULL);
+ ut_free(cfg_name);
+ }
+
+ ut_free(path);
+}
+
+/** Delete a tablespace and associated .ibd file.
+@param[in] id tablespace identifier
+@param[in] if_exists whether to ignore missing tablespace
+@param[in,out] detached_handles return detached handles if not nullptr
+@return DB_SUCCESS or error */
+dberr_t fil_delete_tablespace(ulint id, bool if_exists,
+ std::vector<pfs_os_file_t>* detached_handles)
+{
+ char* path = NULL;
+ ut_ad(!is_system_tablespace(id));
+ ut_ad(!detached_handles || detached_handles->empty());
+
+ dberr_t err;
+ fil_space_t *space = fil_check_pending_operations(id, false, &path);
+
+ if (!space) {
+ err = DB_TABLESPACE_NOT_FOUND;
+ if (!if_exists) {
+ ib::error() << "Cannot delete tablespace " << id
+ << " because it is not found"
+ " in the tablespace memory cache.";
+ }
+
+ goto func_exit;
+ }
+
+ /* IMPORTANT: Because we have set space::stop_new_ops there
+ can't be any new reads or flushes. We are here
+ because node::n_pending was zero above. However, it is still
+ possible to have pending read and write requests:
+
+ A read request can happen because the reader thread has
+ gone through the ::stop_new_ops check in buf_page_init_for_read()
+ before the flag was set and has not yet incremented ::n_pending
+ when we checked it above.
+
+ A write request can be issued any time because we don't check
+ fil_space_t::is_stopping() when queueing a block for write.
+
+ We deal with pending write requests in the following function
+ where we'd minimally evict all dirty pages belonging to this
+ space from the flush_list. Note that if a block is IO-fixed
+ we'll wait for IO to complete.
+
+ To deal with potential read requests, we will check the
+ is_stopping() in fil_space_t::io(). */
+
+ err = DB_SUCCESS;
+ buf_flush_remove_pages(id);
+
+ /* If it is a delete then also delete any generated files, otherwise
+ when we drop the database the remove directory will fail. */
+ {
+ /* Before deleting the file, write a log record about
+ it, so that InnoDB crash recovery will expect the file
+ to be gone. */
+ mtr_t mtr;
+
+ mtr.start();
+ mtr.log_file_op(FILE_DELETE, id, path);
+ mtr.commit();
+ /* Even if we got killed shortly after deleting the
+ tablespace file, the record must have already been
+ written to the redo log. */
+ log_write_up_to(mtr.commit_lsn(), true);
+
+ char* cfg_name = fil_make_filepath(path, NULL, CFG, false);
+ if (cfg_name != NULL) {
+ os_file_delete_if_exists(innodb_data_file_key, cfg_name, NULL);
+ ut_free(cfg_name);
+ }
+ }
+
+ /* Delete the link file pointing to the ibd file we are deleting. */
+ if (FSP_FLAGS_HAS_DATA_DIR(space->flags)) {
+ RemoteDatafile::delete_link_file(space->name);
+ }
+
+ mutex_enter(&fil_system.mutex);
+
+ /* Double check the sanity of pending ops after reacquiring
+ the fil_system::mutex. */
+ if (const fil_space_t* s = fil_space_get_by_id(id)) {
+ ut_a(s == space);
+ ut_a(!space->referenced());
+ ut_a(UT_LIST_GET_LEN(space->chain) == 1);
+ auto handles = fil_system.detach(space,
+ detached_handles != nullptr);
+ if (detached_handles) {
+ *detached_handles = std::move(handles);
+ }
+ mutex_exit(&fil_system.mutex);
+
+ mysql_mutex_lock(&log_sys.mutex);
+
+ if (space->max_lsn != 0) {
+ ut_d(space->max_lsn = 0);
+ UT_LIST_REMOVE(fil_system.named_spaces, space);
+ }
+
+ mysql_mutex_unlock(&log_sys.mutex);
+ fil_space_free_low(space);
+
+ if (!os_file_delete(innodb_data_file_key, path)
+ && !os_file_delete_if_exists(
+ innodb_data_file_key, path, NULL)) {
+
+ /* Note: This is because we have removed the
+ tablespace instance from the cache. */
+
+ err = DB_IO_ERROR;
+ }
+ } else {
+ mutex_exit(&fil_system.mutex);
+ err = DB_TABLESPACE_NOT_FOUND;
+ }
+
+func_exit:
+ ut_free(path);
+ ibuf_delete_for_discarded_space(id);
+ return(err);
+}
+
+/** Prepare to truncate an undo tablespace.
+@param[in] space_id undo tablespace id
+@return the tablespace
+@retval NULL if tablespace not found */
+fil_space_t *fil_truncate_prepare(ulint space_id)
+{
+ return fil_check_pending_operations(space_id, true, nullptr);
+}
+
+/*******************************************************************//**
+Allocates and builds a file name from a path, a table or tablespace name
+and a suffix. The string must be freed by caller with ut_free().
+@param[in] path NULL or the directory path or the full path and filename.
+@param[in] name NULL if path is full, or Table/Tablespace name
+@param[in] suffix NULL or the file extention to use.
+@param[in] trim_name true if the last name on the path should be trimmed.
+@return own: file name */
+char*
+fil_make_filepath(
+ const char* path,
+ const char* name,
+ ib_extention ext,
+ bool trim_name)
+{
+ /* The path may contain the basename of the file, if so we do not
+ need the name. If the path is NULL, we can use the default path,
+ but there needs to be a name. */
+ ut_ad(path != NULL || name != NULL);
+
+ /* If we are going to strip a name off the path, there better be a
+ path and a new name to put back on. */
+ ut_ad(!trim_name || (path != NULL && name != NULL));
+
+ if (path == NULL) {
+ path = fil_path_to_mysql_datadir;
+ }
+
+ ulint len = 0; /* current length */
+ ulint path_len = strlen(path);
+ ulint name_len = (name ? strlen(name) : 0);
+ const char* suffix = dot_ext[ext];
+ ulint suffix_len = strlen(suffix);
+ ulint full_len = path_len + 1 + name_len + suffix_len + 1;
+
+ char* full_name = static_cast<char*>(ut_malloc_nokey(full_len));
+ if (full_name == NULL) {
+ return NULL;
+ }
+
+ /* If the name is a relative path, do not prepend "./". */
+ if (path[0] == '.'
+ && (path[1] == '\0' || path[1] == OS_PATH_SEPARATOR)
+ && name != NULL && name[0] == '.') {
+ path = NULL;
+ path_len = 0;
+ }
+
+ if (path != NULL) {
+ memcpy(full_name, path, path_len);
+ len = path_len;
+ full_name[len] = '\0';
+ os_normalize_path(full_name);
+ }
+
+ if (trim_name) {
+ /* Find the offset of the last DIR separator and set it to
+ null in order to strip off the old basename from this path. */
+ char* last_dir_sep = strrchr(full_name, OS_PATH_SEPARATOR);
+ if (last_dir_sep) {
+ last_dir_sep[0] = '\0';
+ len = strlen(full_name);
+ }
+ }
+
+ if (name != NULL) {
+ if (len && full_name[len - 1] != OS_PATH_SEPARATOR) {
+ /* Add a DIR separator */
+ full_name[len] = OS_PATH_SEPARATOR;
+ full_name[++len] = '\0';
+ }
+
+ char* ptr = &full_name[len];
+ memcpy(ptr, name, name_len);
+ len += name_len;
+ full_name[len] = '\0';
+ os_normalize_path(ptr);
+ }
+
+ /* Make sure that the specified suffix is at the end of the filepath
+ string provided. This assumes that the suffix starts with '.'.
+ If the first char of the suffix is found in the filepath at the same
+ length as the suffix from the end, then we will assume that there is
+ a previous suffix that needs to be replaced. */
+ if (suffix != NULL) {
+ /* Need room for the trailing null byte. */
+ ut_ad(len < full_len);
+
+ if ((len > suffix_len)
+ && (full_name[len - suffix_len] == suffix[0])) {
+ /* Another suffix exists, make it the one requested. */
+ memcpy(&full_name[len - suffix_len], suffix, suffix_len);
+
+ } else {
+ /* No previous suffix, add it. */
+ ut_ad(len + suffix_len < full_len);
+ memcpy(&full_name[len], suffix, suffix_len);
+ full_name[len + suffix_len] = '\0';
+ }
+ }
+
+ return(full_name);
+}
+
+/** Test if a tablespace file can be renamed to a new filepath by checking
+if that the old filepath exists and the new filepath does not exist.
+@param[in] old_path old filepath
+@param[in] new_path new filepath
+@param[in] replace_new whether to ignore the existence of new_path
+@return innodb error code */
+static dberr_t
+fil_rename_tablespace_check(
+ const char* old_path,
+ const char* new_path,
+ bool replace_new)
+{
+ bool exists = false;
+ os_file_type_t ftype;
+
+ if (os_file_status(old_path, &exists, &ftype) && !exists) {
+ ib::error() << "Cannot rename '" << old_path
+ << "' to '" << new_path
+ << "' because the source file"
+ << " does not exist.";
+ return(DB_TABLESPACE_NOT_FOUND);
+ }
+
+ exists = false;
+ if (os_file_status(new_path, &exists, &ftype) && !exists) {
+ return DB_SUCCESS;
+ }
+
+ if (!replace_new) {
+ ib::error() << "Cannot rename '" << old_path
+ << "' to '" << new_path
+ << "' because the target file exists."
+ " Remove the target file and try again.";
+ return(DB_TABLESPACE_EXISTS);
+ }
+
+ /* This must be during the ROLLBACK of TRUNCATE TABLE.
+ Because InnoDB only allows at most one data dictionary
+ transaction at a time, and because this incomplete TRUNCATE
+ would have created a new tablespace file, we must remove
+ a possibly existing tablespace that is associated with the
+ new tablespace file. */
+retry:
+ mutex_enter(&fil_system.mutex);
+ for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
+ space; space = UT_LIST_GET_NEXT(space_list, space)) {
+ ulint id = space->id;
+ if (id
+ && space->purpose == FIL_TYPE_TABLESPACE
+ && !strcmp(new_path,
+ UT_LIST_GET_FIRST(space->chain)->name)) {
+ ib::info() << "TRUNCATE rollback: " << id
+ << "," << new_path;
+ mutex_exit(&fil_system.mutex);
+ dberr_t err = fil_delete_tablespace(id);
+ if (err != DB_SUCCESS) {
+ return err;
+ }
+ goto retry;
+ }
+ }
+ mutex_exit(&fil_system.mutex);
+ fil_delete_file(new_path);
+
+ return(DB_SUCCESS);
+}
+
+dberr_t fil_space_t::rename(const char* name, const char* path, bool log,
+ bool replace)
+{
+ ut_ad(UT_LIST_GET_LEN(chain) == 1);
+ ut_ad(!is_system_tablespace(id));
+
+ if (log) {
+ dberr_t err = fil_rename_tablespace_check(
+ chain.start->name, path, replace);
+ if (err != DB_SUCCESS) {
+ return(err);
+ }
+ fil_name_write_rename(id, chain.start->name, path);
+ }
+
+ return fil_rename_tablespace(id, chain.start->name, name, path)
+ ? DB_SUCCESS : DB_ERROR;
+}
+
+/** Rename a single-table tablespace.
+The tablespace must exist in the memory cache.
+@param[in] id tablespace identifier
+@param[in] old_path old file name
+@param[in] new_name new table name in the
+databasename/tablename format
+@param[in] new_path_in new file name,
+or NULL if it is located in the normal data directory
+@return true if success */
+static bool
+fil_rename_tablespace(
+ ulint id,
+ const char* old_path,
+ const char* new_name,
+ const char* new_path_in)
+{
+ fil_space_t* space;
+ fil_node_t* node;
+ ut_a(id != 0);
+
+ ut_ad(strchr(new_name, '/') != NULL);
+
+ mutex_enter(&fil_system.mutex);
+
+ space = fil_space_get_by_id(id);
+
+ if (space == NULL) {
+ ib::error() << "Cannot find space id " << id
+ << " in the tablespace memory cache, though the file '"
+ << old_path
+ << "' in a rename operation should have that id.";
+ mutex_exit(&fil_system.mutex);
+ return(false);
+ }
+
+ /* The following code must change when InnoDB supports
+ multiple datafiles per tablespace. */
+ ut_a(UT_LIST_GET_LEN(space->chain) == 1);
+ node = UT_LIST_GET_FIRST(space->chain);
+ space->reacquire();
+
+ mutex_exit(&fil_system.mutex);
+
+ char* new_file_name = new_path_in == NULL
+ ? fil_make_filepath(NULL, new_name, IBD, false)
+ : mem_strdup(new_path_in);
+ char* old_file_name = node->name;
+ char* new_space_name = mem_strdup(new_name);
+ char* old_space_name = space->name;
+
+ ut_ad(strchr(old_file_name, OS_PATH_SEPARATOR) != NULL);
+ ut_ad(strchr(new_file_name, OS_PATH_SEPARATOR) != NULL);
+
+ if (!recv_recovery_is_on()) {
+ mysql_mutex_lock(&log_sys.mutex);
+ }
+
+ /* log_sys.mutex is above fil_system.mutex in the latching order */
+ mysql_mutex_assert_owner(&log_sys.mutex);
+ mutex_enter(&fil_system.mutex);
+ space->release();
+ ut_ad(space->name == old_space_name);
+ ut_ad(node->name == old_file_name);
+ bool success;
+ DBUG_EXECUTE_IF("fil_rename_tablespace_failure_2",
+ goto skip_second_rename; );
+ success = os_file_rename(innodb_data_file_key,
+ old_file_name,
+ new_file_name);
+ DBUG_EXECUTE_IF("fil_rename_tablespace_failure_2",
+skip_second_rename:
+ success = false; );
+
+ ut_ad(node->name == old_file_name);
+
+ if (success) {
+ node->name = new_file_name;
+ }
+
+ if (!recv_recovery_is_on()) {
+ mysql_mutex_unlock(&log_sys.mutex);
+ }
+
+ ut_ad(space->name == old_space_name);
+ if (success) {
+ space->name = new_space_name;
+ } else {
+ /* Because nothing was renamed, we must free the new
+ names, not the old ones. */
+ old_file_name = new_file_name;
+ old_space_name = new_space_name;
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ ut_free(old_file_name);
+ ut_free(old_space_name);
+
+ return(success);
+}
+
+/* FIXME: remove this! */
+IF_WIN(, bool os_is_sparse_file_supported(os_file_t fh));
+
+/** Create a tablespace file.
+@param[in] space_id Tablespace ID
+@param[in] name Tablespace name in dbname/tablename format.
+@param[in] path Path and filename of the datafile to create.
+@param[in] flags Tablespace flags
+@param[in] size Initial size of the tablespace file in pages,
+must be >= FIL_IBD_FILE_INITIAL_SIZE
+@param[in] mode MariaDB encryption mode
+@param[in] key_id MariaDB encryption key_id
+@param[out] err DB_SUCCESS or error code
+@return the created tablespace
+@retval NULL on error */
+fil_space_t*
+fil_ibd_create(
+ ulint space_id,
+ const char* name,
+ const char* path,
+ ulint flags,
+ uint32_t size,
+ fil_encryption_t mode,
+ uint32_t key_id,
+ dberr_t* err)
+{
+ pfs_os_file_t file;
+ byte* page;
+ bool success;
+ bool has_data_dir = FSP_FLAGS_HAS_DATA_DIR(flags) != 0;
+
+ ut_ad(!is_system_tablespace(space_id));
+ ut_ad(!srv_read_only_mode);
+ ut_a(space_id < SRV_SPACE_ID_UPPER_BOUND);
+ ut_a(size >= FIL_IBD_FILE_INITIAL_SIZE);
+ ut_a(fil_space_t::is_valid_flags(flags & ~FSP_FLAGS_MEM_MASK, space_id));
+
+ /* Create the subdirectories in the path, if they are
+ not there already. */
+ *err = os_file_create_subdirs_if_needed(path);
+ if (*err != DB_SUCCESS) {
+ return NULL;
+ }
+
+ ulint type;
+ static_assert(((UNIV_ZIP_SIZE_MIN >> 1) << 3) == 4096,
+ "compatibility");
+ switch (FSP_FLAGS_GET_ZIP_SSIZE(flags)) {
+ case 1:
+ case 2:
+ type = OS_DATA_FILE_NO_O_DIRECT;
+ break;
+ default:
+ type = OS_DATA_FILE;
+ }
+
+ file = os_file_create(
+ innodb_data_file_key, path,
+ OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT,
+ OS_FILE_AIO, type, srv_read_only_mode, &success);
+
+ if (!success) {
+ /* The following call will print an error message */
+ switch (os_file_get_last_error(true)) {
+ case OS_FILE_ALREADY_EXISTS:
+ ib::info() << "The file '" << path << "'"
+ " already exists though the"
+ " corresponding table did not exist"
+ " in the InnoDB data dictionary."
+ " You can resolve the problem by removing"
+ " the file.";
+ *err = DB_TABLESPACE_EXISTS;
+ break;
+ case OS_FILE_DISK_FULL:
+ *err = DB_OUT_OF_FILE_SPACE;
+ break;
+ default:
+ *err = DB_ERROR;
+ }
+ ib::error() << "Cannot create file '" << path << "'";
+ return NULL;
+ }
+
+ const bool is_compressed = fil_space_t::is_compressed(flags);
+ bool punch_hole = is_compressed;
+ fil_space_crypt_t* crypt_data = nullptr;
+#ifdef _WIN32
+ if (is_compressed) {
+ os_file_set_sparse_win32(file);
+ }
+#endif
+
+ if (!os_file_set_size(
+ path, file,
+ os_offset_t(size) << srv_page_size_shift, is_compressed)) {
+ *err = DB_OUT_OF_FILE_SPACE;
+err_exit:
+ os_file_close(file);
+ os_file_delete(innodb_data_file_key, path);
+ free(crypt_data);
+ return NULL;
+ }
+
+ /* FIXME: remove this */
+ IF_WIN(, punch_hole = punch_hole && os_is_sparse_file_supported(file));
+
+ /* We have to write the space id to the file immediately and flush the
+ file to disk. This is because in crash recovery we must be aware what
+ tablespaces exist and what are their space id's, so that we can apply
+ the log records to the right file. It may take quite a while until
+ buffer pool flush algorithms write anything to the file and flush it to
+ disk. If we would not write here anything, the file would be filled
+ with zeros from the call of os_file_set_size(), until a buffer pool
+ flush would write to it. */
+
+ /* Align the memory for file i/o if we might have O_DIRECT set */
+ page = static_cast<byte*>(aligned_malloc(2 * srv_page_size,
+ srv_page_size));
+
+ memset(page, '\0', srv_page_size);
+
+ if (fil_space_t::full_crc32(flags)) {
+ flags |= FSP_FLAGS_FCRC32_PAGE_SSIZE();
+ } else {
+ flags |= FSP_FLAGS_PAGE_SSIZE();
+ }
+
+ fsp_header_init_fields(page, space_id, flags);
+ mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, space_id);
+
+ /* Create crypt data if the tablespace is either encrypted or user has
+ requested it to remain unencrypted. */
+ crypt_data = (mode != FIL_ENCRYPTION_DEFAULT || srv_encrypt_tables)
+ ? fil_space_create_crypt_data(mode, key_id)
+ : NULL;
+
+ if (crypt_data) {
+ /* Write crypt data information in page0 while creating
+ ibd file. */
+ crypt_data->fill_page0(flags, page);
+ }
+
+ if (ulint zip_size = fil_space_t::zip_size(flags)) {
+ page_zip_des_t page_zip;
+ page_zip_set_size(&page_zip, zip_size);
+ page_zip.data = page + srv_page_size;
+#ifdef UNIV_DEBUG
+ page_zip.m_start = 0;
+#endif /* UNIV_DEBUG */
+ page_zip.m_end = 0;
+ page_zip.m_nonempty = 0;
+ page_zip.n_blobs = 0;
+
+ buf_flush_init_for_writing(NULL, page, &page_zip, false);
+
+ *err = os_file_write(IORequestWrite, path, file,
+ page_zip.data, 0, zip_size);
+ } else {
+ buf_flush_init_for_writing(NULL, page, NULL,
+ fil_space_t::full_crc32(flags));
+
+ *err = os_file_write(IORequestWrite, path, file,
+ page, 0, srv_page_size);
+ }
+
+ aligned_free(page);
+
+ if (*err != DB_SUCCESS) {
+ ib::error()
+ << "Could not write the first page to"
+ << " tablespace '" << path << "'";
+ goto err_exit;
+ }
+
+ if (!os_file_flush(file)) {
+ ib::error() << "File flush of tablespace '"
+ << path << "' failed";
+ *err = DB_ERROR;
+ goto err_exit;
+ }
+
+ if (has_data_dir) {
+ /* Make the ISL file if the IBD file is not
+ in the default location. */
+ *err = RemoteDatafile::create_link_file(name, path);
+ if (*err != DB_SUCCESS) {
+ goto err_exit;
+ }
+ }
+
+ if (fil_space_t* space = fil_space_t::create(name, space_id, flags,
+ FIL_TYPE_TABLESPACE,
+ crypt_data, mode)) {
+ space->punch_hole = punch_hole;
+ fil_node_t* node = space->add(path, file, size, false, true);
+ mtr_t mtr;
+ mtr.start();
+ mtr.log_file_op(FILE_CREATE, space_id, node->name);
+ mtr.commit();
+
+ node->find_metadata(file);
+ *err = DB_SUCCESS;
+ return space;
+ }
+
+ if (has_data_dir) {
+ RemoteDatafile::delete_link_file(name);
+ }
+
+ *err = DB_ERROR;
+ goto err_exit;
+}
+
+/** Try to open a single-table tablespace and optionally check that the
+space id in it is correct. If this does not succeed, print an error message
+to the .err log. This function is used to open a tablespace when we start
+mysqld after the dictionary has been booted, and also in IMPORT TABLESPACE.
+
+NOTE that we assume this operation is used either at the database startup
+or under the protection of the dictionary mutex, so that two users cannot
+race here. This operation does not leave the file associated with the
+tablespace open, but closes it after we have looked at the space id in it.
+
+If the validate boolean is set, we read the first page of the file and
+check that the space id in the file is what we expect. We assume that
+this function runs much faster if no check is made, since accessing the
+file inode probably is much faster (the OS caches them) than accessing
+the first page of the file. This boolean may be initially false, but if
+a remote tablespace is found it will be changed to true.
+
+If the fix_dict boolean is set, then it is safe to use an internal SQL
+statement to update the dictionary tables if they are incorrect.
+
+@param[in] validate true if we should validate the tablespace
+@param[in] fix_dict true if the dictionary is available to be fixed
+@param[in] purpose FIL_TYPE_TABLESPACE or FIL_TYPE_TEMPORARY
+@param[in] id tablespace ID
+@param[in] flags expected FSP_SPACE_FLAGS
+@param[in] space_name tablespace name of the datafile
+If file-per-table, it is the table name in the databasename/tablename format
+@param[in] path_in expected filepath, usually read from dictionary
+@param[out] err DB_SUCCESS or error code
+@return tablespace
+@retval NULL if the tablespace could not be opened */
+fil_space_t*
+fil_ibd_open(
+ bool validate,
+ bool fix_dict,
+ fil_type_t purpose,
+ ulint id,
+ ulint flags,
+ const table_name_t& tablename,
+ const char* path_in,
+ dberr_t* err)
+{
+ mutex_enter(&fil_system.mutex);
+ if (fil_space_t* space = fil_space_get_by_id(id)) {
+ if (strcmp(space->name, tablename.m_name)) {
+ table_name_t space_name;
+ space_name.m_name = space->name;
+ ib::error()
+ << "Trying to open table " << tablename
+ << " with id " << id
+ << ", conflicting with " << space_name;
+ space = NULL;
+ if (err) *err = DB_TABLESPACE_EXISTS;
+ } else if (err) *err = DB_SUCCESS;
+
+ mutex_exit(&fil_system.mutex);
+
+ if (space && validate && !srv_read_only_mode) {
+ fsp_flags_try_adjust(space,
+ flags & ~FSP_FLAGS_MEM_MASK);
+ }
+
+ return space;
+ }
+ mutex_exit(&fil_system.mutex);
+
+ bool dict_filepath_same_as_default = false;
+ bool link_file_found = false;
+ bool link_file_is_bad = false;
+ Datafile df_default; /* default location */
+ Datafile df_dict; /* dictionary location */
+ RemoteDatafile df_remote; /* remote location */
+ ulint tablespaces_found = 0;
+ ulint valid_tablespaces_found = 0;
+
+ if (fix_dict) {
+ ut_d(dict_sys.assert_locked());
+ ut_ad(!srv_read_only_mode);
+ ut_ad(srv_log_file_size != 0);
+ }
+
+ /* Table flags can be ULINT_UNDEFINED if
+ dict_tf_to_fsp_flags_failure is set. */
+ if (flags == ULINT_UNDEFINED) {
+corrupted:
+ if (err) *err = DB_CORRUPTION;
+ return NULL;
+ }
+
+ ut_ad(fil_space_t::is_valid_flags(flags & ~FSP_FLAGS_MEM_MASK, id));
+ df_default.init(tablename.m_name, flags);
+ df_dict.init(tablename.m_name, flags);
+ df_remote.init(tablename.m_name, flags);
+
+ /* Discover the correct file by looking in three possible locations
+ while avoiding unecessary effort. */
+
+ /* We will always look for an ibd in the default location. */
+ df_default.make_filepath(NULL, tablename.m_name, IBD);
+
+ /* Look for a filepath embedded in an ISL where the default file
+ would be. */
+ if (df_remote.open_read_only(true) == DB_SUCCESS) {
+ ut_ad(df_remote.is_open());
+
+ /* Always validate a file opened from an ISL pointer */
+ validate = true;
+ ++tablespaces_found;
+ link_file_found = true;
+ } else if (df_remote.filepath() != NULL) {
+ /* An ISL file was found but contained a bad filepath in it.
+ Better validate anything we do find. */
+ validate = true;
+ }
+
+ /* Attempt to open the tablespace at the dictionary filepath. */
+ if (path_in) {
+ if (df_default.same_filepath_as(path_in)) {
+ dict_filepath_same_as_default = true;
+ } else {
+ /* Dict path is not the default path. Always validate
+ remote files. If default is opened, it was moved. */
+ validate = true;
+ df_dict.set_filepath(path_in);
+ if (df_dict.open_read_only(true) == DB_SUCCESS) {
+ ut_ad(df_dict.is_open());
+ ++tablespaces_found;
+ }
+ }
+ }
+
+ /* Always look for a file at the default location. But don't log
+ an error if the tablespace is already open in remote or dict. */
+ ut_a(df_default.filepath());
+ const bool strict = (tablespaces_found == 0);
+ if (df_default.open_read_only(strict) == DB_SUCCESS) {
+ ut_ad(df_default.is_open());
+ ++tablespaces_found;
+ }
+
+ /* Check if multiple locations point to the same file. */
+ if (tablespaces_found > 1 && df_default.same_as(df_remote)) {
+ /* A link file was found with the default path in it.
+ Use the default path and delete the link file. */
+ --tablespaces_found;
+ df_remote.delete_link_file();
+ df_remote.close();
+ }
+ if (tablespaces_found > 1 && df_default.same_as(df_dict)) {
+ --tablespaces_found;
+ df_dict.close();
+ }
+ if (tablespaces_found > 1 && df_remote.same_as(df_dict)) {
+ --tablespaces_found;
+ df_dict.close();
+ }
+
+ /* We have now checked all possible tablespace locations and
+ have a count of how many unique files we found. If things are
+ normal, we only found 1. */
+ /* For encrypted tablespace, we need to check the
+ encryption in header of first page. */
+ if (!validate && tablespaces_found == 1) {
+ goto skip_validate;
+ }
+
+ /* Read and validate the first page of these three tablespace
+ locations, if found. */
+ valid_tablespaces_found +=
+ (df_remote.validate_to_dd(id, flags) == DB_SUCCESS);
+
+ valid_tablespaces_found +=
+ (df_default.validate_to_dd(id, flags) == DB_SUCCESS);
+
+ valid_tablespaces_found +=
+ (df_dict.validate_to_dd(id, flags) == DB_SUCCESS);
+
+ /* Make sense of these three possible locations.
+ First, bail out if no tablespace files were found. */
+ if (valid_tablespaces_found == 0) {
+ os_file_get_last_error(true);
+ ib::error() << "Could not find a valid tablespace file for `"
+ << tablename << "`. " << TROUBLESHOOT_DATADICT_MSG;
+ goto corrupted;
+ }
+ if (!validate) {
+ goto skip_validate;
+ }
+
+ /* Do not open any tablespaces if more than one tablespace with
+ the correct space ID and flags were found. */
+ if (tablespaces_found > 1) {
+ ib::error() << "A tablespace for `" << tablename
+ << "` has been found in multiple places;";
+
+ if (df_default.is_open()) {
+ ib::error() << "Default location: "
+ << df_default.filepath()
+ << ", Space ID=" << df_default.space_id()
+ << ", Flags=" << df_default.flags();
+ }
+ if (df_remote.is_open()) {
+ ib::error() << "Remote location: "
+ << df_remote.filepath()
+ << ", Space ID=" << df_remote.space_id()
+ << ", Flags=" << df_remote.flags();
+ }
+ if (df_dict.is_open()) {
+ ib::error() << "Dictionary location: "
+ << df_dict.filepath()
+ << ", Space ID=" << df_dict.space_id()
+ << ", Flags=" << df_dict.flags();
+ }
+
+ /* Force-recovery will allow some tablespaces to be
+ skipped by REDO if there was more than one file found.
+ Unlike during the REDO phase of recovery, we now know
+ if the tablespace is valid according to the dictionary,
+ which was not available then. So if we did not force
+ recovery and there is only one good tablespace, ignore
+ any bad tablespaces. */
+ if (valid_tablespaces_found > 1 || srv_force_recovery > 0) {
+ ib::error() << "Will not open tablespace `"
+ << tablename << "`";
+
+ /* If the file is not open it cannot be valid. */
+ ut_ad(df_default.is_open() || !df_default.is_valid());
+ ut_ad(df_dict.is_open() || !df_dict.is_valid());
+ ut_ad(df_remote.is_open() || !df_remote.is_valid());
+
+ /* Having established that, this is an easy way to
+ look for corrupted data files. */
+ if (df_default.is_open() != df_default.is_valid()
+ || df_dict.is_open() != df_dict.is_valid()
+ || df_remote.is_open() != df_remote.is_valid()) {
+ goto corrupted;
+ }
+error:
+ if (err) *err = DB_ERROR;
+ return NULL;
+ }
+
+ /* There is only one valid tablespace found and we did
+ not use srv_force_recovery during REDO. Use this one
+ tablespace and clean up invalid tablespace pointers */
+ if (df_default.is_open() && !df_default.is_valid()) {
+ df_default.close();
+ tablespaces_found--;
+ }
+
+ if (df_dict.is_open() && !df_dict.is_valid()) {
+ df_dict.close();
+ /* Leave dict.filepath so that SYS_DATAFILES
+ can be corrected below. */
+ tablespaces_found--;
+ }
+
+ if (df_remote.is_open() && !df_remote.is_valid()) {
+ df_remote.close();
+ tablespaces_found--;
+ link_file_is_bad = true;
+ }
+ }
+
+ /* At this point, there should be only one filepath. */
+ ut_a(tablespaces_found == 1);
+ ut_a(valid_tablespaces_found == 1);
+
+ /* Only fix the dictionary at startup when there is only one thread.
+ Calls to dict_load_table() can be done while holding other latches. */
+ if (!fix_dict) {
+ goto skip_validate;
+ }
+
+ /* We may need to update what is stored in SYS_DATAFILES or
+ SYS_TABLESPACES or adjust the link file. Since a failure to
+ update SYS_TABLESPACES or SYS_DATAFILES does not prevent opening
+ and using the tablespace either this time or the next, we do not
+ check the return code or fail to open the tablespace. But if it
+ fails, dict_update_filepath() will issue a warning to the log. */
+ if (df_dict.filepath()) {
+ ut_ad(path_in != NULL);
+ ut_ad(df_dict.same_filepath_as(path_in));
+
+ if (df_remote.is_open()) {
+ if (!df_remote.same_filepath_as(path_in)) {
+ dict_update_filepath(id, df_remote.filepath());
+ }
+
+ } else if (df_default.is_open()) {
+ ut_ad(!dict_filepath_same_as_default);
+ dict_update_filepath(id, df_default.filepath());
+ if (link_file_is_bad) {
+ RemoteDatafile::delete_link_file(
+ tablename.m_name);
+ }
+
+ } else if (!link_file_found || link_file_is_bad) {
+ ut_ad(df_dict.is_open());
+ /* Fix the link file if we got our filepath
+ from the dictionary but a link file did not
+ exist or it did not point to a valid file. */
+ RemoteDatafile::delete_link_file(tablename.m_name);
+ RemoteDatafile::create_link_file(
+ tablename.m_name, df_dict.filepath());
+ }
+
+ } else if (df_remote.is_open()) {
+ if (dict_filepath_same_as_default) {
+ dict_update_filepath(id, df_remote.filepath());
+
+ } else if (path_in == NULL) {
+ /* SYS_DATAFILES record for this space ID
+ was not found. */
+ dict_replace_tablespace_and_filepath(
+ id, tablename.m_name,
+ df_remote.filepath(), flags);
+ }
+
+ } else if (df_default.is_open()) {
+ /* We opened the tablespace in the default location.
+ SYS_DATAFILES.PATH needs to be updated if it is different
+ from this default path or if the SYS_DATAFILES.PATH was not
+ supplied and it should have been. Also update the dictionary
+ if we found an ISL file (since !df_remote.is_open). Since
+ path_in is not suppled for file-per-table, we must assume
+ that it matched the ISL. */
+ if ((path_in != NULL && !dict_filepath_same_as_default)
+ || (path_in == NULL && DICT_TF_HAS_DATA_DIR(flags))
+ || df_remote.filepath() != NULL) {
+ dict_replace_tablespace_and_filepath(
+ id, tablename.m_name, df_default.filepath(),
+ flags);
+ }
+ }
+
+skip_validate:
+ const byte* first_page =
+ df_default.is_open() ? df_default.get_first_page() :
+ df_dict.is_open() ? df_dict.get_first_page() :
+ df_remote.get_first_page();
+
+ fil_space_crypt_t* crypt_data = first_page
+ ? fil_space_read_crypt_data(fil_space_t::zip_size(flags),
+ first_page)
+ : NULL;
+
+ fil_space_t* space = fil_space_t::create(
+ tablename.m_name, id, flags, purpose, crypt_data);
+ if (!space) {
+ goto error;
+ }
+
+ /* We do not measure the size of the file, that is why
+ we pass the 0 below */
+
+ space->add(
+ df_remote.is_open() ? df_remote.filepath() :
+ df_dict.is_open() ? df_dict.filepath() :
+ df_default.filepath(), OS_FILE_CLOSED, 0, false, true);
+
+ if (validate && !srv_read_only_mode) {
+ df_remote.close();
+ df_dict.close();
+ df_default.close();
+ if (space->acquire()) {
+ if (purpose != FIL_TYPE_IMPORT) {
+ fsp_flags_try_adjust(space, flags
+ & ~FSP_FLAGS_MEM_MASK);
+ }
+ space->release();
+ }
+ }
+
+ if (err) *err = DB_SUCCESS;
+ return space;
+}
+
+/** Looks for a pre-existing fil_space_t with the given tablespace ID
+and, if found, returns the name and filepath in newly allocated buffers
+that the caller must free.
+@param[in] space_id The tablespace ID to search for.
+@param[out] name Name of the tablespace found.
+@param[out] filepath The filepath of the first datafile for the
+tablespace.
+@return true if tablespace is found, false if not. */
+bool
+fil_space_read_name_and_filepath(
+ ulint space_id,
+ char** name,
+ char** filepath)
+{
+ bool success = false;
+ *name = NULL;
+ *filepath = NULL;
+
+ mutex_enter(&fil_system.mutex);
+
+ fil_space_t* space = fil_space_get_by_id(space_id);
+
+ if (space != NULL) {
+ *name = mem_strdup(space->name);
+
+ fil_node_t* node = UT_LIST_GET_FIRST(space->chain);
+ *filepath = mem_strdup(node->name);
+
+ success = true;
+ }
+
+ mutex_exit(&fil_system.mutex);
+
+ return(success);
+}
+
+/** Convert a file name to a tablespace name.
+@param[in] filename directory/databasename/tablename.ibd
+@return database/tablename string, to be freed with ut_free() */
+char*
+fil_path_to_space_name(
+ const char* filename)
+{
+ /* Strip the file name prefix and suffix, leaving
+ only databasename/tablename. */
+ ulint filename_len = strlen(filename);
+ const char* end = filename + filename_len;
+#ifdef HAVE_MEMRCHR
+ const char* tablename = 1 + static_cast<const char*>(
+ memrchr(filename, OS_PATH_SEPARATOR,
+ filename_len));
+ const char* dbname = 1 + static_cast<const char*>(
+ memrchr(filename, OS_PATH_SEPARATOR,
+ tablename - filename - 1));
+#else /* HAVE_MEMRCHR */
+ const char* tablename = filename;
+ const char* dbname = NULL;
+
+ while (const char* t = static_cast<const char*>(
+ memchr(tablename, OS_PATH_SEPARATOR,
+ ulint(end - tablename)))) {
+ dbname = tablename;
+ tablename = t + 1;
+ }
+#endif /* HAVE_MEMRCHR */
+
+ ut_ad(dbname != NULL);
+ ut_ad(tablename > dbname);
+ ut_ad(tablename < end);
+ ut_ad(end - tablename > 4);
+ ut_ad(memcmp(end - 4, DOT_IBD, 4) == 0);
+
+ char* name = mem_strdupl(dbname, ulint(end - dbname) - 4);
+
+ ut_ad(name[tablename - dbname - 1] == OS_PATH_SEPARATOR);
+#if OS_PATH_SEPARATOR != '/'
+ /* space->name uses '/', not OS_PATH_SEPARATOR. */
+ name[tablename - dbname - 1] = '/';
+#endif
+
+ return(name);
+}
+
+/** Discover the correct IBD file to open given a remote or missing
+filepath from the REDO log. Administrators can move a crashed
+database to another location on the same machine and try to recover it.
+Remote IBD files might be moved as well to the new location.
+ The problem with this is that the REDO log contains the old location
+which may be still accessible. During recovery, if files are found in
+both locations, we can chose on based on these priorities;
+1. Default location
+2. ISL location
+3. REDO location
+@param[in] space_id tablespace ID
+@param[in] df Datafile object with path from redo
+@return true if a valid datafile was found, false if not */
+static
+bool
+fil_ibd_discover(
+ ulint space_id,
+ Datafile& df)
+{
+ Datafile df_def_per; /* default file-per-table datafile */
+ RemoteDatafile df_rem_per; /* remote file-per-table datafile */
+
+ /* Look for the datafile in the default location. */
+ const char* filename = df.filepath();
+ const char* basename = base_name(filename);
+
+ /* If this datafile is file-per-table it will have a schema dir. */
+ ulint sep_found = 0;
+ const char* db = basename;
+ for (; db > filename && sep_found < 2; db--) {
+ if (db[0] == OS_PATH_SEPARATOR) {
+ sep_found++;
+ }
+ }
+ if (sep_found == 2) {
+ db += 2;
+ df_def_per.init(db, 0);
+ df_def_per.make_filepath(NULL, db, IBD);
+ if (df_def_per.open_read_only(false) == DB_SUCCESS
+ && df_def_per.validate_for_recovery() == DB_SUCCESS
+ && df_def_per.space_id() == space_id) {
+ df.set_filepath(df_def_per.filepath());
+ df.open_read_only(false);
+ return(true);
+ }
+
+ /* Look for a remote file-per-table tablespace. */
+
+ switch (srv_operation) {
+ case SRV_OPERATION_BACKUP:
+ case SRV_OPERATION_RESTORE_DELTA:
+ ut_ad(0);
+ break;
+ case SRV_OPERATION_RESTORE_EXPORT:
+ case SRV_OPERATION_RESTORE:
+ break;
+ case SRV_OPERATION_NORMAL:
+ df_rem_per.set_name(db);
+ if (df_rem_per.open_link_file() != DB_SUCCESS) {
+ break;
+ }
+
+ /* An ISL file was found with contents. */
+ if (df_rem_per.open_read_only(false) != DB_SUCCESS
+ || df_rem_per.validate_for_recovery()
+ != DB_SUCCESS) {
+
+ /* Assume that this ISL file is intended to
+ be used. Do not continue looking for another
+ if this file cannot be opened or is not
+ a valid IBD file. */
+ ib::error() << "ISL file '"
+ << df_rem_per.link_filepath()
+ << "' was found but the linked file '"
+ << df_rem_per.filepath()
+ << "' could not be opened or is"
+ " not correct.";
+ return(false);
+ }
+
+ /* Use this file if it has the space_id from the
+ MLOG record. */
+ if (df_rem_per.space_id() == space_id) {
+ df.set_filepath(df_rem_per.filepath());
+ df.open_read_only(false);
+ return(true);
+ }
+
+ /* Since old MLOG records can use the same basename
+ in multiple CREATE/DROP TABLE sequences, this ISL
+ file could be pointing to a later version of this
+ basename.ibd file which has a different space_id.
+ Keep looking. */
+ }
+ }
+
+ /* No ISL files were found in the default location. Use the location
+ given in the redo log. */
+ if (df.open_read_only(false) == DB_SUCCESS
+ && df.validate_for_recovery() == DB_SUCCESS
+ && df.space_id() == space_id) {
+ return(true);
+ }
+
+ /* A datafile was not discovered for the filename given. */
+ return(false);
+}
+/** Open an ibd tablespace and add it to the InnoDB data structures.
+This is similar to fil_ibd_open() except that it is used while processing
+the REDO log, so the data dictionary is not available and very little
+validation is done. The tablespace name is extracred from the
+dbname/tablename.ibd portion of the filename, which assumes that the file
+is a file-per-table tablespace. Any name will do for now. General
+tablespace names will be read from the dictionary after it has been
+recovered. The tablespace flags are read at this time from the first page
+of the file in validate_for_recovery().
+@param[in] space_id tablespace ID
+@param[in] filename path/to/databasename/tablename.ibd
+@param[out] space the tablespace, or NULL on error
+@return status of the operation */
+enum fil_load_status
+fil_ibd_load(
+ ulint space_id,
+ const char* filename,
+ fil_space_t*& space)
+{
+ /* If the a space is already in the file system cache with this
+ space ID, then there is nothing to do. */
+ mutex_enter(&fil_system.mutex);
+ space = fil_space_get_by_id(space_id);
+ mutex_exit(&fil_system.mutex);
+
+ if (space) {
+ /* Compare the filename we are trying to open with the
+ filename from the first node of the tablespace we opened
+ previously. Fail if it is different. */
+ fil_node_t* node = UT_LIST_GET_FIRST(space->chain);
+ if (0 != strcmp(innobase_basename(filename),
+ innobase_basename(node->name))) {
+ ib::info()
+ << "Ignoring data file '" << filename
+ << "' with space ID " << space->id
+ << ". Another data file called " << node->name
+ << " exists with the same space ID.";
+ space = NULL;
+ return(FIL_LOAD_ID_CHANGED);
+ }
+ return(FIL_LOAD_OK);
+ }
+
+ if (srv_operation == SRV_OPERATION_RESTORE) {
+ /* Replace absolute DATA DIRECTORY file paths with
+ short names relative to the backup directory. */
+ if (const char* name = strrchr(filename, OS_PATH_SEPARATOR)) {
+ while (--name > filename
+ && *name != OS_PATH_SEPARATOR);
+ if (name > filename) {
+ filename = name + 1;
+ }
+ }
+ }
+
+ Datafile file;
+ file.set_filepath(filename);
+ file.open_read_only(false);
+
+ if (!file.is_open()) {
+ /* The file has been moved or it is a remote datafile. */
+ if (!fil_ibd_discover(space_id, file)
+ || !file.is_open()) {
+ return(FIL_LOAD_NOT_FOUND);
+ }
+ }
+
+ os_offset_t size;
+
+ /* Read and validate the first page of the tablespace.
+ Assign a tablespace name based on the tablespace type. */
+ switch (file.validate_for_recovery()) {
+ os_offset_t minimum_size;
+ case DB_SUCCESS:
+ if (file.space_id() != space_id) {
+ return(FIL_LOAD_ID_CHANGED);
+ }
+ /* Get and test the file size. */
+ size = os_file_get_size(file.handle());
+
+ /* Every .ibd file is created >= 4 pages in size.
+ Smaller files cannot be OK. */
+ minimum_size = os_offset_t(FIL_IBD_FILE_INITIAL_SIZE)
+ << srv_page_size_shift;
+
+ if (size == static_cast<os_offset_t>(-1)) {
+ /* The following call prints an error message */
+ os_file_get_last_error(true);
+
+ ib::error() << "Could not measure the size of"
+ " single-table tablespace file '"
+ << file.filepath() << "'";
+ } else if (size < minimum_size) {
+ ib::error() << "The size of tablespace file '"
+ << file.filepath() << "' is only " << size
+ << ", should be at least " << minimum_size
+ << "!";
+ } else {
+ /* Everything is fine so far. */
+ break;
+ }
+
+ /* fall through */
+
+ case DB_TABLESPACE_EXISTS:
+ return(FIL_LOAD_INVALID);
+
+ default:
+ return(FIL_LOAD_NOT_FOUND);
+ }
+
+ ut_ad(space == NULL);
+
+ /* Adjust the memory-based flags that would normally be set by
+ dict_tf_to_fsp_flags(). In recovery, we have no data dictionary. */
+ ulint flags = file.flags();
+ if (fil_space_t::is_compressed(flags)) {
+ flags |= page_zip_level
+ << FSP_FLAGS_MEM_COMPRESSION_LEVEL;
+ }
+
+ const byte* first_page = file.get_first_page();
+ fil_space_crypt_t* crypt_data = first_page
+ ? fil_space_read_crypt_data(fil_space_t::zip_size(flags),
+ first_page)
+ : NULL;
+ space = fil_space_t::create(
+ file.name(), space_id, flags, FIL_TYPE_TABLESPACE, crypt_data);
+
+ if (space == NULL) {
+ return(FIL_LOAD_INVALID);
+ }
+
+ ut_ad(space->id == file.space_id());
+ ut_ad(space->id == space_id);
+
+ /* We do not use the size information we have about the file, because
+ the rounding formula for extents and pages is somewhat complex; we
+ let fil_node_open() do that task. */
+
+ space->add(file.filepath(), OS_FILE_CLOSED, 0, false, false);
+
+ return(FIL_LOAD_OK);
+}
+
+/** Try to adjust FSP_SPACE_FLAGS if they differ from the expectations.
+(Typically when upgrading from MariaDB 10.1.0..10.1.20.)
+@param[in,out] space tablespace
+@param[in] flags desired tablespace flags */
+void fsp_flags_try_adjust(fil_space_t* space, ulint flags)
+{
+ ut_ad(!srv_read_only_mode);
+ ut_ad(fil_space_t::is_valid_flags(flags, space->id));
+ if (space->full_crc32() || fil_space_t::full_crc32(flags)) {
+ return;
+ }
+ if (!space->size && (space->purpose != FIL_TYPE_TABLESPACE
+ || !space->get_size())) {
+ return;
+ }
+ /* This code is executed during server startup while no
+ connections are allowed. We do not need to protect against
+ DROP TABLE by fil_space_acquire(). */
+ mtr_t mtr;
+ mtr.start();
+ if (buf_block_t* b = buf_page_get(
+ page_id_t(space->id, 0), space->zip_size(),
+ RW_X_LATCH, &mtr)) {
+ uint32_t f = fsp_header_get_flags(b->frame);
+ if (fil_space_t::full_crc32(f)) {
+ goto func_exit;
+ }
+ if (fil_space_t::is_flags_equal(f, flags)) {
+ goto func_exit;
+ }
+ /* Suppress the message if only the DATA_DIR flag to differs. */
+ if ((f ^ flags) & ~(1U << FSP_FLAGS_POS_RESERVED)) {
+ ib::warn()
+ << "adjusting FSP_SPACE_FLAGS of file '"
+ << UT_LIST_GET_FIRST(space->chain)->name
+ << "' from " << ib::hex(f)
+ << " to " << ib::hex(flags);
+ }
+ mtr.set_named_space(space);
+ mtr.write<4,mtr_t::FORCED>(*b,
+ FSP_HEADER_OFFSET + FSP_SPACE_FLAGS
+ + b->frame, flags);
+ }
+func_exit:
+ mtr.commit();
+}
+
+/** Determine if a matching tablespace exists in the InnoDB tablespace
+memory cache. Note that if we have not done a crash recovery at the database
+startup, there may be many tablespaces which are not yet in the memory cache.
+@param[in] id Tablespace ID
+@param[in] name Tablespace name used in fil_space_t::create().
+@param[in] table_flags table flags
+@return the tablespace
+@retval NULL if no matching tablespace exists in the memory cache */
+fil_space_t*
+fil_space_for_table_exists_in_mem(
+ ulint id,
+ const char* name,
+ ulint table_flags)
+{
+ const ulint expected_flags = dict_tf_to_fsp_flags(table_flags);
+
+ mutex_enter(&fil_system.mutex);
+ if (fil_space_t* space = fil_space_get_by_id(id)) {
+ ulint tf = expected_flags & ~FSP_FLAGS_MEM_MASK;
+ ulint sf = space->flags & ~FSP_FLAGS_MEM_MASK;
+
+ if (!fil_space_t::is_flags_equal(tf, sf)
+ && !fil_space_t::is_flags_equal(sf, tf)) {
+ goto func_exit;
+ }
+
+ if (strcmp(space->name, name)) {
+ ib::error() << "Table " << name
+ << " in InnoDB data dictionary"
+ " has tablespace id " << id
+ << ", but the tablespace"
+ " with that id has name " << space->name << "."
+ " Have you deleted or moved .ibd files?";
+ ib::info() << TROUBLESHOOT_DATADICT_MSG;
+ goto func_exit;
+ }
+
+ /* Adjust the flags that are in FSP_FLAGS_MEM_MASK.
+ FSP_SPACE_FLAGS will not be written back here. */
+ space->flags = (space->flags & ~FSP_FLAGS_MEM_MASK)
+ | (expected_flags & FSP_FLAGS_MEM_MASK);
+ mutex_exit(&fil_system.mutex);
+ if (!srv_read_only_mode) {
+ fsp_flags_try_adjust(space, expected_flags
+ & ~FSP_FLAGS_MEM_MASK);
+ }
+ return space;
+ }
+
+func_exit:
+ mutex_exit(&fil_system.mutex);
+ return NULL;
+}
+
+/*============================ FILE I/O ================================*/
+
+/** Report information about an invalid page access. */
+ATTRIBUTE_COLD __attribute__((noreturn))
+static void
+fil_report_invalid_page_access(const char *name,
+ os_offset_t offset, ulint len, bool is_read)
+{
+ ib::fatal() << "Trying to " << (is_read ? "read " : "write ") << len
+ << " bytes at " << offset
+ << " outside the bounds of the file: " << name;
+}
+
+
+/** Update the data structures on write completion */
+inline void fil_node_t::complete_write()
+{
+ ut_ad(!mutex_own(&fil_system.mutex));
+
+ if (space->purpose != FIL_TYPE_TEMPORARY &&
+ srv_file_flush_method != SRV_O_DIRECT_NO_FSYNC &&
+ space->set_needs_flush())
+ {
+ mutex_enter(&fil_system.mutex);
+ if (!space->is_in_unflushed_spaces)
+ {
+ space->is_in_unflushed_spaces= true;
+ fil_system.unflushed_spaces.push_front(*space);
+ }
+ mutex_exit(&fil_system.mutex);
+ }
+}
+
+/** Read or write data.
+@param type I/O context
+@param offset offset in bytes
+@param len number of bytes
+@param buf the data to be read or written
+@param bpage buffer block (for type.is_async() completion callback)
+@return status and file descriptor */
+fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len,
+ void *buf, buf_page_t *bpage)
+{
+ ut_ad(referenced());
+ ut_ad(offset % OS_FILE_LOG_BLOCK_SIZE == 0);
+ ut_ad((len % OS_FILE_LOG_BLOCK_SIZE) == 0);
+ ut_ad(fil_validate_skip());
+ ut_ad(type.is_read() || type.is_write());
+ ut_ad(type.type != IORequest::DBLWR_BATCH);
+
+ if (type.is_read()) {
+ srv_stats.data_read.add(len);
+ } else {
+ ut_ad(!srv_read_only_mode || this == fil_system.temp_space);
+ srv_stats.data_written.add(len);
+ }
+
+ fil_node_t* node= UT_LIST_GET_FIRST(chain);
+ ut_ad(node);
+
+ if (type.type == IORequest::READ_ASYNC && is_stopping()
+ && !is_being_truncated) {
+ release();
+ return {DB_TABLESPACE_DELETED, nullptr};
+ }
+
+ ulint p = static_cast<ulint>(offset >> srv_page_size_shift);
+
+ if (UNIV_LIKELY_NULL(UT_LIST_GET_NEXT(chain, node))) {
+ ut_ad(this == fil_system.sys_space
+ || this == fil_system.temp_space);
+ ut_ad(!(offset & ((1 << srv_page_size_shift) - 1)));
+
+ while (node->size <= p) {
+ p -= node->size;
+ node = UT_LIST_GET_NEXT(chain, node);
+ if (!node) {
+ if (type.type == IORequest::READ_ASYNC) {
+ release();
+ return {DB_ERROR, nullptr};
+ }
+ fil_report_invalid_page_access(name, offset,
+ len,
+ type.is_read());
+ }
+ }
+
+ offset = os_offset_t{p} << srv_page_size_shift;
+ }
+
+ if (UNIV_UNLIKELY(node->size <= p)) {
+ if (type.type == IORequest::READ_ASYNC) {
+ release();
+ /* If we can tolerate the non-existent pages, we
+ should return with DB_ERROR and let caller decide
+ what to do. */
+ return {DB_ERROR, nullptr};
+ }
+
+ fil_report_invalid_page_access(
+ node->name, offset, len, type.is_read());
+ }
+
+ dberr_t err;
+
+ if (type.type == IORequest::PUNCH_RANGE) {
+ err = os_file_punch_hole(node->handle, offset, len);
+ /* Punch hole is not supported, make space not to
+ support punch hole */
+ if (UNIV_UNLIKELY(err == DB_IO_NO_PUNCH_HOLE)) {
+ punch_hole = false;
+ err = DB_SUCCESS;
+ }
+ goto release_sync_write;
+ } else {
+ /* Queue the aio request */
+ err = os_aio(IORequest(bpage, node, type.type),
+ buf, offset, len);
+ }
+
+ /* We an try to recover the page from the double write buffer if
+ the decompression fails or the page is corrupt. */
+
+ ut_a(type.type == IORequest::DBLWR_RECOVER || err == DB_SUCCESS);
+ if (!type.is_async()) {
+ if (type.is_write()) {
+release_sync_write:
+ node->complete_write();
+release:
+ release();
+ }
+ ut_ad(fil_validate_skip());
+ }
+ if (err != DB_SUCCESS) {
+ goto release;
+ }
+ return {err, node};
+}
+
+#include <tpool.h>
+
+/** Callback for AIO completion */
+void fil_aio_callback(const IORequest &request)
+{
+ ut_ad(fil_validate_skip());
+ ut_ad(request.node);
+
+ if (!request.bpage)
+ {
+ ut_ad(!srv_read_only_mode);
+ if (request.type == IORequest::DBLWR_BATCH)
+ buf_dblwr.flush_buffered_writes_completed(request);
+ else
+ ut_ad(request.type == IORequest::WRITE_ASYNC);
+write_completed:
+ request.node->complete_write();
+ }
+ else if (request.is_write())
+ {
+ buf_page_write_complete(request);
+ goto write_completed;
+ }
+ else
+ {
+ ut_ad(request.is_read());
+
+ /* IMPORTANT: since i/o handling for reads will read also the insert
+ buffer in fil_system.sys_space, we have to be very careful not to
+ introduce deadlocks. We never close fil_system.sys_space data
+ files and never issue asynchronous reads of change buffer pages. */
+ const page_id_t id(request.bpage->id());
+
+ if (dberr_t err= buf_page_read_complete(request.bpage, *request.node))
+ {
+ if (recv_recovery_is_on() && !srv_force_recovery)
+ recv_sys.found_corrupt_fs= true;
+
+ ib::error() << "Failed to read page " << id.page_no()
+ << " from file '" << request.node->name << "': " << err;
+ }
+ }
+
+ request.node->space->release();
+}
+
+/** Flush to disk the writes in file spaces of the given type
+possibly cached by the OS. */
+void fil_flush_file_spaces()
+{
+ if (srv_file_flush_method == SRV_O_DIRECT_NO_FSYNC)
+ {
+ ut_d(mutex_enter(&fil_system.mutex));
+ ut_ad(fil_system.unflushed_spaces.empty());
+ ut_d(mutex_exit(&fil_system.mutex));
+ return;
+ }
+
+rescan:
+ mutex_enter(&fil_system.mutex);
+
+ for (fil_space_t &space : fil_system.unflushed_spaces)
+ {
+ if (space.needs_flush_not_stopping())
+ {
+ space.reacquire();
+ mutex_exit(&fil_system.mutex);
+ space.flush_low();
+ space.release();
+ goto rescan;
+ }
+ }
+
+ mutex_exit(&fil_system.mutex);
+}
+
+/** Functor to validate the file node list of a tablespace. */
+struct Check {
+ /** Total size of file nodes visited so far */
+ ulint size;
+ /** Total number of open files visited so far */
+ ulint n_open;
+
+ /** Constructor */
+ Check() : size(0), n_open(0) {}
+
+ /** Visit a file node
+ @param[in] elem file node to visit */
+ void operator()(const fil_node_t* elem)
+ {
+ n_open += elem->is_open();
+ size += elem->size;
+ }
+
+ /** Validate a tablespace.
+ @param[in] space tablespace to validate
+ @return number of open file nodes */
+ static ulint validate(const fil_space_t* space)
+ {
+ ut_ad(mutex_own(&fil_system.mutex));
+ Check check;
+ ut_list_validate(space->chain, check);
+ ut_a(space->size == check.size);
+
+ switch (space->id) {
+ case TRX_SYS_SPACE:
+ ut_ad(fil_system.sys_space == NULL
+ || fil_system.sys_space == space);
+ break;
+ case SRV_TMP_SPACE_ID:
+ ut_ad(fil_system.temp_space == NULL
+ || fil_system.temp_space == space);
+ break;
+ default:
+ break;
+ }
+
+ return(check.n_open);
+ }
+};
+
+/******************************************************************//**
+Checks the consistency of the tablespace cache.
+@return true if ok */
+bool fil_validate()
+{
+ ulint n_open = 0;
+
+ mutex_enter(&fil_system.mutex);
+
+ for (fil_space_t *space = UT_LIST_GET_FIRST(fil_system.space_list);
+ space != NULL;
+ space = UT_LIST_GET_NEXT(space_list, space)) {
+ n_open += Check::validate(space);
+ }
+
+ ut_a(fil_system.n_open == n_open);
+
+ mutex_exit(&fil_system.mutex);
+
+ return(true);
+}
+
+/*********************************************************************//**
+Sets the file page type. */
+void
+fil_page_set_type(
+/*==============*/
+ byte* page, /*!< in/out: file page */
+ ulint type) /*!< in: type */
+{
+ ut_ad(page);
+
+ mach_write_to_2(page + FIL_PAGE_TYPE, type);
+}
+
+/********************************************************************//**
+Delete the tablespace file and any related files like .cfg.
+This should not be called for temporary tables.
+@param[in] ibd_filepath File path of the IBD tablespace */
+void
+fil_delete_file(
+/*============*/
+ const char* ibd_filepath)
+{
+ /* Force a delete of any stale .ibd files that are lying around. */
+
+ ib::info() << "Deleting " << ibd_filepath;
+ os_file_delete_if_exists(innodb_data_file_key, ibd_filepath, NULL);
+
+ char* cfg_filepath = fil_make_filepath(
+ ibd_filepath, NULL, CFG, false);
+ if (cfg_filepath != NULL) {
+ os_file_delete_if_exists(
+ innodb_data_file_key, cfg_filepath, NULL);
+ ut_free(cfg_filepath);
+ }
+}
+
+#ifdef UNIV_DEBUG
+/** Check that a tablespace is valid for mtr_commit().
+@param[in] space persistent tablespace that has been changed */
+static
+void
+fil_space_validate_for_mtr_commit(
+ const fil_space_t* space)
+{
+ ut_ad(!mutex_own(&fil_system.mutex));
+ ut_ad(space != NULL);
+ ut_ad(space->purpose == FIL_TYPE_TABLESPACE);
+ ut_ad(!is_predefined_tablespace(space->id));
+
+ /* We are serving mtr_commit(). While there is an active
+ mini-transaction, we should have !space->stop_new_ops. This is
+ guaranteed by meta-data locks or transactional locks, or
+ dict_sys.latch (X-lock in DROP, S-lock in purge). */
+ ut_ad(!space->is_stopping()
+ || space->is_being_truncated /* fil_truncate_prepare() */
+ || space->referenced());
+}
+#endif /* UNIV_DEBUG */
+
+/** Write a FILE_MODIFY record for a persistent tablespace.
+@param[in] space tablespace
+@param[in,out] mtr mini-transaction */
+static
+void
+fil_names_write(
+ const fil_space_t* space,
+ mtr_t* mtr)
+{
+ ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
+ fil_name_write(space->id, UT_LIST_GET_FIRST(space->chain)->name, mtr);
+}
+
+/** Note that a non-predefined persistent tablespace has been modified
+by redo log.
+@param[in,out] space tablespace */
+void
+fil_names_dirty(
+ fil_space_t* space)
+{
+ mysql_mutex_assert_owner(&log_sys.mutex);
+ ut_ad(recv_recovery_is_on());
+ ut_ad(log_sys.get_lsn() != 0);
+ ut_ad(space->max_lsn == 0);
+ ut_d(fil_space_validate_for_mtr_commit(space));
+
+ UT_LIST_ADD_LAST(fil_system.named_spaces, space);
+ space->max_lsn = log_sys.get_lsn();
+}
+
+/** Write FILE_MODIFY records when a non-predefined persistent
+tablespace was modified for the first time since the latest
+fil_names_clear().
+@param[in,out] space tablespace */
+void fil_names_dirty_and_write(fil_space_t* space)
+{
+ mysql_mutex_assert_owner(&log_sys.mutex);
+ ut_d(fil_space_validate_for_mtr_commit(space));
+ ut_ad(space->max_lsn == log_sys.get_lsn());
+
+ UT_LIST_ADD_LAST(fil_system.named_spaces, space);
+ mtr_t mtr;
+ mtr.start();
+ fil_names_write(space, &mtr);
+
+ DBUG_EXECUTE_IF("fil_names_write_bogus",
+ {
+ char bogus_name[] = "./test/bogus file.ibd";
+ os_normalize_path(bogus_name);
+ fil_name_write(
+ SRV_SPACE_ID_UPPER_BOUND,
+ bogus_name, &mtr);
+ });
+
+ mtr.commit_files();
+}
+
+/** On a log checkpoint, reset fil_names_dirty_and_write() flags
+and write out FILE_MODIFY and FILE_CHECKPOINT if needed.
+@param[in] lsn checkpoint LSN
+@param[in] do_write whether to always write FILE_CHECKPOINT
+@return whether anything was written to the redo log
+@retval false if no flags were set and nothing written
+@retval true if anything was written to the redo log */
+bool
+fil_names_clear(
+ lsn_t lsn,
+ bool do_write)
+{
+ mtr_t mtr;
+ ulint mtr_checkpoint_size = RECV_SCAN_SIZE - 1;
+
+ DBUG_EXECUTE_IF(
+ "increase_mtr_checkpoint_size",
+ mtr_checkpoint_size = 75 * 1024;
+ );
+
+ mysql_mutex_assert_owner(&log_sys.mutex);
+ ut_ad(lsn);
+
+ mtr.start();
+
+ for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.named_spaces);
+ space != NULL; ) {
+ if (mtr.get_log()->size()
+ + (3 + 5 + 1) + strlen(space->chain.start->name)
+ >= mtr_checkpoint_size) {
+ /* Prevent log parse buffer overflow */
+ mtr.commit_files();
+ mtr.start();
+ }
+
+ fil_space_t* next = UT_LIST_GET_NEXT(named_spaces, space);
+
+ ut_ad(space->max_lsn > 0);
+ if (space->max_lsn < lsn) {
+ /* The tablespace was last dirtied before the
+ checkpoint LSN. Remove it from the list, so
+ that if the tablespace is not going to be
+ modified any more, subsequent checkpoints will
+ avoid calling fil_names_write() on it. */
+ space->max_lsn = 0;
+ UT_LIST_REMOVE(fil_system.named_spaces, space);
+ }
+
+ /* max_lsn is the last LSN where fil_names_dirty_and_write()
+ was called. If we kept track of "min_lsn" (the first LSN
+ where max_lsn turned nonzero), we could avoid the
+ fil_names_write() call if min_lsn > lsn. */
+
+ fil_names_write(space, &mtr);
+ do_write = true;
+
+ space = next;
+ }
+
+ if (do_write) {
+ mtr.commit_files(lsn);
+ } else {
+ ut_ad(!mtr.has_modifications());
+ }
+
+ return(do_write);
+}
+
+/* Unit Tests */
+#ifdef UNIV_ENABLE_UNIT_TEST_MAKE_FILEPATH
+#define MF fil_make_filepath
+#define DISPLAY ib::info() << path
+void
+test_make_filepath()
+{
+ char* path;
+ const char* long_path =
+ "this/is/a/very/long/path/including/a/very/"
+ "looooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooooo"
+ "oooooooooooooooooooooooooooooooooooooooooooooooong"
+ "/folder/name";
+ path = MF("/this/is/a/path/with/a/filename", NULL, IBD, false); DISPLAY;
+ path = MF("/this/is/a/path/with/a/filename", NULL, ISL, false); DISPLAY;
+ path = MF("/this/is/a/path/with/a/filename", NULL, CFG, false); DISPLAY;
+ path = MF("/this/is/a/path/with/a/filename.ibd", NULL, IBD, false); DISPLAY;
+ path = MF("/this/is/a/path/with/a/filename.ibd", NULL, IBD, false); DISPLAY;
+ path = MF("/this/is/a/path/with/a/filename.dat", NULL, IBD, false); DISPLAY;
+ path = MF(NULL, "tablespacename", NO_EXT, false); DISPLAY;
+ path = MF(NULL, "tablespacename", IBD, false); DISPLAY;
+ path = MF(NULL, "dbname/tablespacename", NO_EXT, false); DISPLAY;
+ path = MF(NULL, "dbname/tablespacename", IBD, false); DISPLAY;
+ path = MF(NULL, "dbname/tablespacename", ISL, false); DISPLAY;
+ path = MF(NULL, "dbname/tablespacename", CFG, false); DISPLAY;
+ path = MF(NULL, "dbname\\tablespacename", NO_EXT, false); DISPLAY;
+ path = MF(NULL, "dbname\\tablespacename", IBD, false); DISPLAY;
+ path = MF("/this/is/a/path", "dbname/tablespacename", IBD, false); DISPLAY;
+ path = MF("/this/is/a/path", "dbname/tablespacename", IBD, true); DISPLAY;
+ path = MF("./this/is/a/path", "dbname/tablespacename.ibd", IBD, true); DISPLAY;
+ path = MF("this\\is\\a\\path", "dbname/tablespacename", IBD, true); DISPLAY;
+ path = MF("/this/is/a/path", "dbname\\tablespacename", IBD, true); DISPLAY;
+ path = MF(long_path, NULL, IBD, false); DISPLAY;
+ path = MF(long_path, "tablespacename", IBD, false); DISPLAY;
+ path = MF(long_path, "tablespacename", IBD, true); DISPLAY;
+}
+#endif /* UNIV_ENABLE_UNIT_TEST_MAKE_FILEPATH */
+/* @} */
+
+/** Determine the block size of the data file.
+@param[in] space tablespace
+@param[in] offset page number
+@return block size */
+UNIV_INTERN
+ulint
+fil_space_get_block_size(const fil_space_t* space, unsigned offset)
+{
+ ulint block_size = 512;
+
+ for (fil_node_t* node = UT_LIST_GET_FIRST(space->chain);
+ node != NULL;
+ node = UT_LIST_GET_NEXT(chain, node)) {
+ block_size = node->block_size;
+ if (node->size > offset) {
+ ut_ad(node->size <= 0xFFFFFFFFU);
+ break;
+ }
+ offset -= static_cast<unsigned>(node->size);
+ }
+
+ /* Currently supporting block size up to 4K,
+ fall back to default if bigger requested. */
+ if (block_size > 4096) {
+ block_size = 512;
+ }
+
+ return block_size;
+}
diff --git a/storage/innobase/fil/fil0pagecompress.cc b/storage/innobase/fil/fil0pagecompress.cc
new file mode 100644
index 00000000..909e8092
--- /dev/null
+++ b/storage/innobase/fil/fil0pagecompress.cc
@@ -0,0 +1,613 @@
+/*****************************************************************************
+
+Copyright (C) 2013, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file fil/fil0pagecompress.cc
+Implementation for page compressed file spaces.
+
+Created 11/12/2013 Jan Lindström jan.lindstrom@mariadb.com
+Updated 14/02/2015
+***********************************************************************/
+
+#include "fil0fil.h"
+#include "fil0pagecompress.h"
+
+#include <my_dbug.h>
+
+#include "mem0mem.h"
+#include "hash0hash.h"
+#include "os0file.h"
+#include "mach0data.h"
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "log0recv.h"
+#include "fsp0fsp.h"
+#include "srv0srv.h"
+#include "srv0start.h"
+#include "mtr0mtr.h"
+#include "mtr0log.h"
+#include "dict0dict.h"
+#include "page0page.h"
+#include "page0zip.h"
+#include "trx0sys.h"
+#include "row0mysql.h"
+#include "buf0lru.h"
+#include "ibuf0ibuf.h"
+#include "sync0sync.h"
+#include "zlib.h"
+#ifdef __linux__
+#include <linux/fs.h>
+#include <sys/ioctl.h>
+#include <fcntl.h>
+#endif
+#include "row0mysql.h"
+#ifdef HAVE_LZ4
+#include "lz4.h"
+#endif
+#ifdef HAVE_LZO
+#include "lzo/lzo1x.h"
+#endif
+#ifdef HAVE_LZMA
+#include "lzma.h"
+#endif
+#ifdef HAVE_BZIP2
+#include "bzlib.h"
+#endif
+#ifdef HAVE_SNAPPY
+#include "snappy-c.h"
+#endif
+
+/** Compress a page for the given compression algorithm.
+@param[in] buf page to be compressed
+@param[out] out_buf compressed page
+@param[in] header_len header length of the page
+@param[in] comp_algo compression algorithm
+@param[in] comp_level compression level
+@return actual length of compressed page data
+@retval 0 if the page was not compressed */
+static ulint fil_page_compress_low(
+ const byte* buf,
+ byte* out_buf,
+ ulint header_len,
+ ulint comp_algo,
+ unsigned comp_level)
+{
+ ulint write_size = srv_page_size - header_len;
+
+ switch (comp_algo) {
+ default:
+ ut_ad("unknown compression method" == 0);
+ /* fall through */
+ case PAGE_UNCOMPRESSED:
+ return 0;
+ case PAGE_ZLIB_ALGORITHM:
+ {
+ ulong len = uLong(write_size);
+ if (Z_OK == compress2(
+ out_buf + header_len, &len, buf,
+ uLong(srv_page_size), int(comp_level))) {
+ return len;
+ }
+ }
+ break;
+#ifdef HAVE_LZ4
+ case PAGE_LZ4_ALGORITHM:
+# ifdef HAVE_LZ4_COMPRESS_DEFAULT
+ write_size = LZ4_compress_default(
+ reinterpret_cast<const char*>(buf),
+ reinterpret_cast<char*>(out_buf) + header_len,
+ int(srv_page_size), int(write_size));
+# else
+ write_size = LZ4_compress_limitedOutput(
+ reinterpret_cast<const char*>(buf),
+ reinterpret_cast<char*>(out_buf) + header_len,
+ int(srv_page_size), int(write_size));
+# endif
+
+ return write_size;
+#endif /* HAVE_LZ4 */
+#ifdef HAVE_LZO
+ case PAGE_LZO_ALGORITHM: {
+ lzo_uint len = write_size;
+
+ if (LZO_E_OK == lzo1x_1_15_compress(
+ buf, srv_page_size,
+ out_buf + header_len, &len,
+ out_buf + srv_page_size)
+ && len <= write_size) {
+ return len;
+ }
+ break;
+ }
+#endif /* HAVE_LZO */
+#ifdef HAVE_LZMA
+ case PAGE_LZMA_ALGORITHM: {
+ size_t out_pos = 0;
+
+ if (LZMA_OK == lzma_easy_buffer_encode(
+ comp_level, LZMA_CHECK_NONE, NULL,
+ buf, srv_page_size, out_buf + header_len,
+ &out_pos, write_size)
+ && out_pos <= write_size) {
+ return out_pos;
+ }
+ break;
+ }
+#endif /* HAVE_LZMA */
+
+#ifdef HAVE_BZIP2
+ case PAGE_BZIP2_ALGORITHM: {
+ unsigned len = unsigned(write_size);
+ if (BZ_OK == BZ2_bzBuffToBuffCompress(
+ reinterpret_cast<char*>(out_buf + header_len),
+ &len,
+ const_cast<char*>(
+ reinterpret_cast<const char*>(buf)),
+ unsigned(srv_page_size), 1, 0, 0)
+ && len <= write_size) {
+ return len;
+ }
+ break;
+ }
+#endif /* HAVE_BZIP2 */
+
+#ifdef HAVE_SNAPPY
+ case PAGE_SNAPPY_ALGORITHM: {
+ size_t len = snappy_max_compressed_length(srv_page_size);
+
+ if (SNAPPY_OK == snappy_compress(
+ reinterpret_cast<const char*>(buf),
+ srv_page_size,
+ reinterpret_cast<char*>(out_buf) + header_len,
+ &len)
+ && len <= write_size) {
+ return len;
+ }
+ break;
+ }
+#endif /* HAVE_SNAPPY */
+ }
+
+ return 0;
+}
+
+/** Compress a page_compressed page for full crc32 format.
+@param[in] buf page to be compressed
+@param[out] out_buf compressed page
+@param[in] flags tablespace flags
+@param[in] block_size file system block size
+@return actual length of compressed page
+@retval 0 if the page was not compressed */
+static ulint fil_page_compress_for_full_crc32(
+ const byte* buf,
+ byte* out_buf,
+ ulint flags,
+ ulint block_size,
+ bool encrypted)
+{
+ ulint comp_level = FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags);
+
+ if (comp_level == 0) {
+ comp_level = page_zip_level;
+ }
+
+ const ulint header_len = FIL_PAGE_COMP_ALGO;
+
+ ulint write_size = fil_page_compress_low(
+ buf, out_buf, header_len,
+ fil_space_t::get_compression_algo(flags),
+ static_cast<unsigned>(comp_level));
+
+ if (write_size == 0) {
+fail:
+ srv_stats.pages_page_compression_error.inc();
+ return 0;
+ }
+
+ write_size += header_len;
+ const ulint actual_size = write_size;
+ /* Write the actual length of the data & page type
+ for full crc32 format. */
+ const bool lsb = fil_space_t::full_crc32_page_compressed_len(flags);
+ /* In the MSB, store the rounded-up page size. */
+ write_size = (write_size + lsb + (4 + 255)) & ~255;
+ if (write_size >= srv_page_size) {
+ goto fail;
+ }
+
+ /* Set up the page header */
+ memcpy(out_buf, buf, header_len);
+ out_buf[FIL_PAGE_TYPE] = 1U << (FIL_PAGE_COMPRESS_FCRC32_MARKER - 8);
+ out_buf[FIL_PAGE_TYPE + 1] = byte(write_size >> 8);
+ /* Clean up the buffer for the remaining write_size (except checksum) */
+ memset(out_buf + actual_size, 0, write_size - actual_size - 4);
+ if (lsb) {
+ /* Store the LSB */
+ out_buf[write_size - 5] = byte(actual_size + (1 + 4));
+ }
+
+ if (!block_size) {
+ block_size = 512;
+ }
+
+ ut_ad(write_size);
+ if (write_size & (block_size - 1)) {
+ size_t tmp = write_size;
+ write_size = (write_size + (block_size - 1))
+ & ~(block_size - 1);
+ memset(out_buf + tmp, 0, write_size - tmp);
+ }
+
+ srv_stats.page_compression_saved.add(srv_page_size - write_size);
+ srv_stats.pages_page_compressed.inc();
+
+ return write_size;
+}
+
+/** Compress a page_compressed page for non full crc32 format.
+@param[in] buf page to be compressed
+@param[out] out_buf compressed page
+@param[in] flags tablespace flags
+@param[in] block_size file system block size
+@param[in] encrypted whether the page will be subsequently encrypted
+@return actual length of compressed page
+@retval 0 if the page was not compressed */
+static ulint fil_page_compress_for_non_full_crc32(
+ const byte* buf,
+ byte* out_buf,
+ ulint flags,
+ ulint block_size,
+ bool encrypted)
+{
+ uint comp_level = static_cast<uint>(
+ FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags));
+ ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
+ /* Cache to avoid change during function execution */
+ ulint comp_algo = innodb_compression_algorithm;
+
+ if (encrypted) {
+ header_len += FIL_PAGE_ENCRYPT_COMP_ALGO;
+ }
+
+ /* If no compression level was provided to this table, use system
+ default level */
+ if (comp_level == 0) {
+ comp_level = page_zip_level;
+ }
+
+ ulint write_size = fil_page_compress_low(
+ buf, out_buf,
+ header_len, comp_algo, comp_level);
+
+ if (write_size == 0) {
+ srv_stats.pages_page_compression_error.inc();
+ return 0;
+ }
+
+ /* Set up the page header */
+ memcpy(out_buf, buf, FIL_PAGE_DATA);
+ /* Set up the checksum */
+ mach_write_to_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC);
+
+ /* Set up the compression algorithm */
+ mach_write_to_8(out_buf + FIL_PAGE_COMP_ALGO, comp_algo);
+
+ if (encrypted) {
+ /* Set up the correct page type */
+ mach_write_to_2(out_buf + FIL_PAGE_TYPE,
+ FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
+
+ mach_write_to_2(out_buf + FIL_PAGE_DATA
+ + FIL_PAGE_ENCRYPT_COMP_ALGO, comp_algo);
+ } else {
+ /* Set up the correct page type */
+ mach_write_to_2(out_buf + FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED);
+ }
+
+ /* Set up the actual payload lenght */
+ mach_write_to_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE,
+ write_size);
+
+ ut_ad(mach_read_from_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM)
+ == BUF_NO_CHECKSUM_MAGIC);
+
+ ut_ad(mach_read_from_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE)
+ == write_size);
+
+#ifdef UNIV_DEBUG
+ bool is_compressed = (mach_read_from_8(out_buf + FIL_PAGE_COMP_ALGO)
+ == (ulint) comp_algo);
+
+ bool is_encrypted_compressed =
+ (mach_read_from_2(out_buf + FIL_PAGE_DATA
+ + FIL_PAGE_ENCRYPT_COMP_ALGO)
+ == (ulint) comp_algo);
+#endif /* UNIV_DEBUG */
+
+ ut_ad(is_compressed || is_encrypted_compressed);
+
+ write_size+=header_len;
+
+ if (block_size <= 0) {
+ block_size = 512;
+ }
+
+ ut_ad(write_size > 0 && block_size > 0);
+
+ /* Actual write needs to be alligned on block size */
+ if (write_size % block_size) {
+ size_t tmp = write_size;
+ write_size = (size_t)ut_uint64_align_up(
+ (ib_uint64_t)write_size, block_size);
+ /* Clean up the end of buffer */
+ memset(out_buf+tmp, 0, write_size - tmp);
+#ifdef UNIV_DEBUG
+ ut_a(write_size > 0 && ((write_size % block_size) == 0));
+ ut_a(write_size >= tmp);
+#endif
+ }
+
+ srv_stats.page_compression_saved.add(srv_page_size - write_size);
+ srv_stats.pages_page_compressed.inc();
+
+ return write_size;
+}
+
+/** Compress a page_compressed page before writing to a data file.
+@param[in] buf page to be compressed
+@param[out] out_buf compressed page
+@param[in] flags tablespace flags
+@param[in] block_size file system block size
+@param[in] encrypted whether the page will be subsequently encrypted
+@return actual length of compressed page
+@retval 0 if the page was not compressed */
+ulint fil_page_compress(
+ const byte* buf,
+ byte* out_buf,
+ ulint flags,
+ ulint block_size,
+ bool encrypted)
+{
+ /* The full_crc32 page_compressed format assumes this. */
+ ut_ad(!(block_size & 255));
+ ut_ad(ut_is_2pow(block_size));
+
+ /* Let's not compress file space header or
+ extent descriptor */
+ switch (fil_page_get_type(buf)) {
+ case 0:
+ case FIL_PAGE_TYPE_FSP_HDR:
+ case FIL_PAGE_TYPE_XDES:
+ case FIL_PAGE_PAGE_COMPRESSED:
+ return 0;
+ }
+
+ if (fil_space_t::full_crc32(flags)) {
+ return fil_page_compress_for_full_crc32(
+ buf, out_buf, flags, block_size, encrypted);
+ }
+
+ return fil_page_compress_for_non_full_crc32(
+ buf, out_buf, flags, block_size, encrypted);
+}
+
+/** Decompress a page that may be subject to page_compressed compression.
+@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
+@param[in,out] buf possibly compressed page buffer
+@param[in] comp_algo compression algorithm
+@param[in] header_len header length of the page
+@param[in] actual size actual size of the page
+@retval true if the page is decompressed or false */
+static bool fil_page_decompress_low(
+ byte* tmp_buf,
+ byte* buf,
+ ulint comp_algo,
+ ulint header_len,
+ ulint actual_size)
+{
+ switch (comp_algo) {
+ default:
+ ib::error() << "Unknown compression algorithm "
+ << comp_algo;
+ return false;
+ case PAGE_ZLIB_ALGORITHM:
+ {
+ uLong len = srv_page_size;
+ return (Z_OK == uncompress(tmp_buf, &len,
+ buf + header_len,
+ uLong(actual_size))
+ && len == srv_page_size);
+ }
+#ifdef HAVE_LZ4
+ case PAGE_LZ4_ALGORITHM:
+ return LZ4_decompress_safe(
+ reinterpret_cast<const char*>(buf) + header_len,
+ reinterpret_cast<char*>(tmp_buf),
+ static_cast<int>(actual_size),
+ static_cast<int>(srv_page_size)) ==
+ static_cast<int>(srv_page_size);
+#endif /* HAVE_LZ4 */
+#ifdef HAVE_LZO
+ case PAGE_LZO_ALGORITHM:
+ {
+ lzo_uint len_lzo = srv_page_size;
+ return (LZO_E_OK == lzo1x_decompress_safe(
+ buf + header_len,
+ actual_size, tmp_buf, &len_lzo, NULL)
+ && len_lzo == srv_page_size);
+ }
+#endif /* HAVE_LZO */
+#ifdef HAVE_LZMA
+ case PAGE_LZMA_ALGORITHM:
+ {
+ size_t src_pos = 0;
+ size_t dst_pos = 0;
+ uint64_t memlimit = UINT64_MAX;
+
+ return LZMA_OK == lzma_stream_buffer_decode(
+ &memlimit, 0, NULL, buf + header_len,
+ &src_pos, actual_size, tmp_buf, &dst_pos,
+ srv_page_size)
+ && dst_pos == srv_page_size;
+ }
+#endif /* HAVE_LZMA */
+#ifdef HAVE_BZIP2
+ case PAGE_BZIP2_ALGORITHM:
+ {
+ uint dst_pos = static_cast<uint>(srv_page_size);
+ return BZ_OK == BZ2_bzBuffToBuffDecompress(
+ reinterpret_cast<char*>(tmp_buf),
+ &dst_pos,
+ reinterpret_cast<char*>(buf) + header_len,
+ static_cast<uint>(actual_size), 1, 0)
+ && dst_pos == srv_page_size;
+ }
+#endif /* HAVE_BZIP2 */
+#ifdef HAVE_SNAPPY
+ case PAGE_SNAPPY_ALGORITHM:
+ {
+ size_t olen = srv_page_size;
+
+ return SNAPPY_OK == snappy_uncompress(
+ reinterpret_cast<const char*>(buf)
+ + header_len,
+ actual_size,
+ reinterpret_cast<char*>(tmp_buf), &olen)
+ && olen == srv_page_size;
+ }
+#endif /* HAVE_SNAPPY */
+ }
+
+ return false;
+}
+
+/** Decompress a page for full crc32 format.
+@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
+@param[in,out] buf possibly compressed page buffer
+@param[in] flags tablespace flags
+@return size of the compressed data
+@retval 0 if decompression failed
+@retval srv_page_size if the page was not compressed */
+ulint fil_page_decompress_for_full_crc32(byte* tmp_buf, byte* buf, ulint flags)
+{
+ ut_ad(fil_space_t::full_crc32(flags));
+ bool compressed = false;
+ size_t size = buf_page_full_crc32_size(buf, &compressed, NULL);
+ if (!compressed) {
+ ut_ad(size == srv_page_size);
+ return size;
+ }
+
+ if (!fil_space_t::is_compressed(flags)) {
+ return 0;
+ }
+
+ if (size >= srv_page_size) {
+ return 0;
+ }
+
+ if (fil_space_t::full_crc32_page_compressed_len(flags)) {
+ compile_time_assert(FIL_PAGE_FCRC32_CHECKSUM == 4);
+ if (size_t lsb = buf[size - 5]) {
+ size += lsb - 0x100;
+ }
+ size -= 5;
+ }
+
+ const size_t header_len = FIL_PAGE_COMP_ALGO;
+
+ if (!fil_page_decompress_low(tmp_buf, buf,
+ fil_space_t::get_compression_algo(flags),
+ header_len, size - header_len)) {
+ return 0;
+ }
+
+ srv_stats.pages_page_decompressed.inc();
+ memcpy(buf, tmp_buf, srv_page_size);
+ return size;
+}
+
+/** Decompress a page for non full crc32 format.
+@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
+@param[in,out] buf possibly compressed page buffer
+@return size of the compressed data
+@retval 0 if decompression failed
+@retval srv_page_size if the page was not compressed */
+ulint fil_page_decompress_for_non_full_crc32(
+ byte* tmp_buf,
+ byte* buf)
+{
+ ulint header_len;
+ uint comp_algo;
+ switch (fil_page_get_type(buf)) {
+ case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
+ header_len= FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
+ comp_algo = mach_read_from_2(
+ FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_ALGO + buf);
+ break;
+ case FIL_PAGE_PAGE_COMPRESSED:
+ header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
+ if (mach_read_from_6(FIL_PAGE_COMP_ALGO + buf)) {
+ return 0;
+ }
+ comp_algo = mach_read_from_2(FIL_PAGE_COMP_ALGO + 6 + buf);
+ break;
+ default:
+ return srv_page_size;
+ }
+
+ if (mach_read_from_4(buf + FIL_PAGE_SPACE_OR_CHKSUM)
+ != BUF_NO_CHECKSUM_MAGIC) {
+ return 0;
+ }
+
+ ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA
+ + FIL_PAGE_COMP_SIZE);
+
+ /* Check if payload size is corrupted */
+ if (actual_size == 0 || actual_size > srv_page_size - header_len) {
+ return 0;
+ }
+
+ if (!fil_page_decompress_low(tmp_buf, buf, comp_algo, header_len,
+ actual_size)) {
+ return 0;
+ }
+
+ srv_stats.pages_page_decompressed.inc();
+ memcpy(buf, tmp_buf, srv_page_size);
+ return actual_size;
+}
+
+/** Decompress a page that may be subject to page_compressed compression.
+@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
+@param[in,out] buf possibly compressed page buffer
+@return size of the compressed data
+@retval 0 if decompression failed
+@retval srv_page_size if the page was not compressed */
+ulint fil_page_decompress(
+ byte* tmp_buf,
+ byte* buf,
+ ulint flags)
+{
+ if (fil_space_t::full_crc32(flags)) {
+ return fil_page_decompress_for_full_crc32(tmp_buf, buf, flags);
+ }
+
+ return fil_page_decompress_for_non_full_crc32(tmp_buf, buf);
+}