diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
commit | a175314c3e5827eb193872241446f2f8f5c9d33c (patch) | |
tree | cd3d60ca99ae00829c52a6ca79150a5b6e62528b /storage/innobase/fil | |
parent | Initial commit. (diff) | |
download | mariadb-10.5-upstream.tar.xz mariadb-10.5-upstream.zip |
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/innobase/fil')
-rw-r--r-- | storage/innobase/fil/fil0crypt.cc | 2642 | ||||
-rw-r--r-- | storage/innobase/fil/fil0fil.cc | 3757 | ||||
-rw-r--r-- | storage/innobase/fil/fil0pagecompress.cc | 613 |
3 files changed, 7012 insertions, 0 deletions
diff --git a/storage/innobase/fil/fil0crypt.cc b/storage/innobase/fil/fil0crypt.cc new file mode 100644 index 00000000..240a2682 --- /dev/null +++ b/storage/innobase/fil/fil0crypt.cc @@ -0,0 +1,2642 @@ +/***************************************************************************** +Copyright (C) 2013, 2015, Google Inc. All Rights Reserved. +Copyright (c) 2014, 2021, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ +/**************************************************//** +@file fil0crypt.cc +Innodb file space encrypt/decrypt + +Created Jonas Oreland Google +Modified Jan Lindström jan.lindstrom@mariadb.com +*******************************************************/ + +#include "fil0crypt.h" +#include "mtr0types.h" +#include "mach0data.h" +#include "page0zip.h" +#include "buf0checksum.h" +#ifdef UNIV_INNOCHECKSUM +# include "buf0buf.h" +#else +#include "buf0dblwr.h" +#include "srv0srv.h" +#include "srv0start.h" +#include "mtr0mtr.h" +#include "mtr0log.h" +#include "ut0ut.h" +#include "fsp0fsp.h" +#include "fil0pagecompress.h" +#include <my_crypt.h> + +static bool fil_crypt_threads_inited = false; + +/** Is encryption enabled/disabled */ +UNIV_INTERN ulong srv_encrypt_tables = 0; + +/** No of key rotation threads requested */ +UNIV_INTERN uint srv_n_fil_crypt_threads = 0; + +/** No of key rotation threads started */ +UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0; + +/** At this age or older a space/page will be rotated */ +UNIV_INTERN uint srv_fil_crypt_rotate_key_age; + +/** Whether the encryption plugin does key rotation */ +static bool srv_encrypt_rotate; + +/** Event to signal FROM the key rotation threads. */ +static os_event_t fil_crypt_event; + +/** Event to signal TO the key rotation threads. */ +UNIV_INTERN os_event_t fil_crypt_threads_event; + +/** Event for waking up threads throttle. */ +static os_event_t fil_crypt_throttle_sleep_event; + +/** Mutex for key rotation threads. */ +UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex; + +/** Variable ensuring only 1 thread at time does initial conversion */ +static bool fil_crypt_start_converting = false; + +/** Variables for throttling */ +UNIV_INTERN uint srv_n_fil_crypt_iops = 100; // 10ms per iop +static uint srv_alloc_time = 3; // allocate iops for 3s at a time +static uint n_fil_crypt_iops_allocated = 0; + +#define DEBUG_KEYROTATION_THROTTLING 0 + +/** Statistics variables */ +static fil_crypt_stat_t crypt_stat; +static ib_mutex_t crypt_stat_mutex; + +/*********************************************************************** +Check if a key needs rotation given a key_state +@param[in] crypt_data Encryption information +@param[in] key_version Current key version +@param[in] latest_key_version Latest key version +@param[in] rotate_key_age when to rotate +@return true if key needs rotation, false if not */ +static bool +fil_crypt_needs_rotation( + const fil_space_crypt_t* crypt_data, + uint key_version, + uint latest_key_version, + uint rotate_key_age) + MY_ATTRIBUTE((warn_unused_result)); + +/********************************************************************* +Init space crypt */ +UNIV_INTERN +void +fil_space_crypt_init() +{ + fil_crypt_throttle_sleep_event = os_event_create(0); + + mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex); + memset(&crypt_stat, 0, sizeof(crypt_stat)); +} + +/********************************************************************* +Cleanup space crypt */ +UNIV_INTERN +void +fil_space_crypt_cleanup() +{ + os_event_destroy(fil_crypt_throttle_sleep_event); + mutex_free(&crypt_stat_mutex); +} + +/** +Get latest key version from encryption plugin. +@return key version or ENCRYPTION_KEY_VERSION_INVALID */ +uint +fil_space_crypt_t::key_get_latest_version(void) +{ + uint key_version = key_found; + + if (is_key_found()) { + key_version = encryption_key_get_latest_version(key_id); + /* InnoDB does dirty read of srv_fil_crypt_rotate_key_age. + It doesn't matter because srv_encrypt_rotate + can be set to true only once */ + if (!srv_encrypt_rotate + && key_version > srv_fil_crypt_rotate_key_age) { + srv_encrypt_rotate = true; + } + + srv_stats.n_key_requests.inc(); + key_found = key_version; + } + + return key_version; +} + +/****************************************************************** +Get the latest(key-version), waking the encrypt thread, if needed +@param[in,out] crypt_data Crypt data */ +static inline +uint +fil_crypt_get_latest_key_version( + fil_space_crypt_t* crypt_data) +{ + ut_ad(crypt_data != NULL); + + uint key_version = crypt_data->key_get_latest_version(); + + if (crypt_data->is_key_found()) { + + if (fil_crypt_needs_rotation( + crypt_data, + crypt_data->min_key_version, + key_version, + srv_fil_crypt_rotate_key_age)) { + /* Below event seen as NULL-pointer at startup + when new database was created and we create a + checkpoint. Only seen when debugging. */ + if (fil_crypt_threads_inited) { + os_event_set(fil_crypt_threads_event); + } + } + } + + return key_version; +} + +/****************************************************************** +Mutex helper for crypt_data->scheme */ +void +crypt_data_scheme_locker( +/*=====================*/ + st_encryption_scheme* scheme, + int exit) +{ + fil_space_crypt_t* crypt_data = + static_cast<fil_space_crypt_t*>(scheme); + + if (exit) { + mutex_exit(&crypt_data->mutex); + } else { + mutex_enter(&crypt_data->mutex); + } +} + +/****************************************************************** +Create a fil_space_crypt_t object +@param[in] type CRYPT_SCHEME_UNENCRYPTE or + CRYPT_SCHEME_1 +@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or + FIL_ENCRYPTION_ON or + FIL_ENCRYPTION_OFF +@param[in] min_key_version key_version or 0 +@param[in] key_id Used key id +@return crypt object */ +static +fil_space_crypt_t* +fil_space_create_crypt_data( + uint type, + fil_encryption_t encrypt_mode, + uint min_key_version, + uint key_id) +{ + fil_space_crypt_t* crypt_data = NULL; + if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) { + crypt_data = new(buf) + fil_space_crypt_t( + type, + min_key_version, + key_id, + encrypt_mode); + } + + return crypt_data; +} + +/****************************************************************** +Create a fil_space_crypt_t object +@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or + FIL_ENCRYPTION_ON or + FIL_ENCRYPTION_OFF + +@param[in] key_id Encryption key id +@return crypt object */ +UNIV_INTERN +fil_space_crypt_t* +fil_space_create_crypt_data( + fil_encryption_t encrypt_mode, + uint key_id) +{ + return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id)); +} + +/****************************************************************** +Merge fil_space_crypt_t object +@param[in,out] dst Destination cryp data +@param[in] src Source crypt data */ +UNIV_INTERN +void +fil_space_merge_crypt_data( + fil_space_crypt_t* dst, + const fil_space_crypt_t* src) +{ + mutex_enter(&dst->mutex); + + /* validate that they are mergeable */ + ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED || + src->type == CRYPT_SCHEME_1); + + ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED || + dst->type == CRYPT_SCHEME_1); + + dst->encryption = src->encryption; + dst->type = src->type; + dst->min_key_version = src->min_key_version; + dst->keyserver_requests += src->keyserver_requests; + + mutex_exit(&dst->mutex); +} + +/** Initialize encryption parameters from a tablespace header page. +@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0 +@param[in] page first page of the tablespace +@return crypt data from page 0 +@retval NULL if not present or not valid */ +fil_space_crypt_t* fil_space_read_crypt_data(ulint zip_size, const byte* page) +{ + const ulint offset = FSP_HEADER_OFFSET + + fsp_header_get_encryption_offset(zip_size); + + if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) { + /* Crypt data is not stored. */ + return NULL; + } + + uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0); + uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1); + fil_space_crypt_t* crypt_data; + + if (!(type == CRYPT_SCHEME_UNENCRYPTED || + type == CRYPT_SCHEME_1) + || iv_length != sizeof crypt_data->iv) { + ib::error() << "Found non sensible crypt scheme: " + << type << "," << iv_length + << " for space: " + << page_get_space_id(page); + return NULL; + } + + uint min_key_version = mach_read_from_4 + (page + offset + MAGIC_SZ + 2 + iv_length); + + uint key_id = mach_read_from_4 + (page + offset + MAGIC_SZ + 2 + iv_length + 4); + + fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1( + page + offset + MAGIC_SZ + 2 + iv_length + 8); + + crypt_data = fil_space_create_crypt_data(encryption, key_id); + /* We need to overwrite these as above function will initialize + members */ + crypt_data->type = type; + crypt_data->min_key_version = min_key_version; + memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length); + + return crypt_data; +} + +/****************************************************************** +Free a crypt data object +@param[in,out] crypt_data crypt data to be freed */ +UNIV_INTERN +void +fil_space_destroy_crypt_data( + fil_space_crypt_t **crypt_data) +{ + if (crypt_data != NULL && (*crypt_data) != NULL) { + fil_space_crypt_t* c; + if (UNIV_LIKELY(fil_crypt_threads_inited)) { + mutex_enter(&fil_crypt_threads_mutex); + c = *crypt_data; + *crypt_data = NULL; + mutex_exit(&fil_crypt_threads_mutex); + } else { + ut_ad(srv_read_only_mode || !srv_was_started); + c = *crypt_data; + *crypt_data = NULL; + } + if (c) { + c->~fil_space_crypt_t(); + ut_free(c); + } + } +} + +/** Amend encryption information from redo log. +@param[in] space tablespace +@param[in] data encryption metadata */ +void fil_crypt_parse(fil_space_t* space, const byte* data) +{ + ut_ad(data[1] == MY_AES_BLOCK_SIZE); + if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) { + fil_space_crypt_t* crypt_data = new(buf) + fil_space_crypt_t( + data[0], + mach_read_from_4(&data[2 + MY_AES_BLOCK_SIZE]), + mach_read_from_4(&data[6 + MY_AES_BLOCK_SIZE]), + static_cast<fil_encryption_t> + (data[10 + MY_AES_BLOCK_SIZE])); + memcpy(crypt_data->iv, data + 2, MY_AES_BLOCK_SIZE); + mutex_enter(&fil_system.mutex); + if (space->crypt_data) { + fil_space_merge_crypt_data(space->crypt_data, + crypt_data); + fil_space_destroy_crypt_data(&crypt_data); + crypt_data = space->crypt_data; + } else { + space->crypt_data = crypt_data; + } + mutex_exit(&fil_system.mutex); + } +} + +/** Fill crypt data information to the give page. +It should be called during ibd file creation. +@param[in] flags tablespace flags +@param[in,out] page first page of the tablespace */ +void +fil_space_crypt_t::fill_page0( + ulint flags, + byte* page) +{ + const uint len = sizeof(iv); + const ulint offset = FSP_HEADER_OFFSET + + fsp_header_get_encryption_offset( + fil_space_t::zip_size(flags)); + + memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ); + mach_write_to_1(page + offset + MAGIC_SZ, type); + mach_write_to_1(page + offset + MAGIC_SZ + 1, len); + memcpy(page + offset + MAGIC_SZ + 2, &iv, len); + + mach_write_to_4(page + offset + MAGIC_SZ + 2 + len, + min_key_version); + mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4, + key_id); + mach_write_to_1(page + offset + MAGIC_SZ + 2 + len + 8, + encryption); +} + +/** Write encryption metadata to the first page. +@param[in,out] block first page of the tablespace +@param[in,out] mtr mini-transaction */ +void fil_space_crypt_t::write_page0(buf_block_t* block, mtr_t* mtr) +{ + const ulint offset = FSP_HEADER_OFFSET + + fsp_header_get_encryption_offset(block->zip_size()); + byte* b = block->frame + offset; + + mtr->memcpy<mtr_t::MAYBE_NOP>(*block, b, CRYPT_MAGIC, MAGIC_SZ); + + b += MAGIC_SZ; + byte* const start = b; + *b++ = static_cast<byte>(type); + compile_time_assert(sizeof iv == MY_AES_BLOCK_SIZE); + compile_time_assert(sizeof iv == CRYPT_SCHEME_1_IV_LEN); + *b++ = sizeof iv; + memcpy(b, iv, sizeof iv); + b += sizeof iv; + mach_write_to_4(b, min_key_version); + b += 4; + mach_write_to_4(b, key_id); + b += 4; + *b++ = byte(encryption); + ut_ad(b - start == 11 + MY_AES_BLOCK_SIZE); + /* We must log also any unchanged bytes, because recovery will + invoke fil_crypt_parse() based on this log record. */ + mtr->memcpy(*block, offset + MAGIC_SZ, b - start); +} + +/** Encrypt a buffer for non full checksum. +@param[in,out] crypt_data Crypt data +@param[in] space space_id +@param[in] offset Page offset +@param[in] lsn Log sequence number +@param[in] src_frame Page to encrypt +@param[in] zip_size ROW_FORMAT=COMPRESSED + page size, or 0 +@param[in,out] dst_frame Output buffer +@return encrypted buffer or NULL */ +static byte* fil_encrypt_buf_for_non_full_checksum( + fil_space_crypt_t* crypt_data, + ulint space, + ulint offset, + lsn_t lsn, + const byte* src_frame, + ulint zip_size, + byte* dst_frame) +{ + uint size = uint(zip_size ? zip_size : srv_page_size); + uint key_version = fil_crypt_get_latest_key_version(crypt_data); + ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID); + ut_ad(!ut_align_offset(src_frame, 8)); + ut_ad(!ut_align_offset(dst_frame, 8)); + + const bool page_compressed = fil_page_get_type(src_frame) + == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED; + uint header_len = FIL_PAGE_DATA; + + if (page_compressed) { + header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN; + } + + /* FIL page header is not encrypted */ + memcpy(dst_frame, src_frame, header_len); + mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, + key_version); + + /* Calculate the start offset in a page */ + uint unencrypted_bytes = header_len + FIL_PAGE_DATA_END; + uint srclen = size - unencrypted_bytes; + const byte* src = src_frame + header_len; + byte* dst = dst_frame + header_len; + uint32 dstlen = 0; + ib_uint32_t checksum = 0; + + if (page_compressed) { + srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA); + } + + int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen, + crypt_data, key_version, + (uint32)space, (uint32)offset, lsn); + ut_a(rc == MY_AES_OK); + ut_a(dstlen == srclen); + + /* For compressed tables we do not store the FIL header because + the whole page is not stored to the disk. In compressed tables only + the FIL header + compressed (and now encrypted) payload alligned + to sector boundary is written. */ + if (!page_compressed) { + /* FIL page trailer is also not encrypted */ + static_assert(FIL_PAGE_DATA_END == 8, "alignment"); + memcpy_aligned<8>(dst_frame + size - FIL_PAGE_DATA_END, + src_frame + size - FIL_PAGE_DATA_END, 8); + } else { + /* Clean up rest of buffer */ + memset(dst_frame+header_len+srclen, 0, + size - (header_len + srclen)); + } + + checksum = fil_crypt_calculate_checksum(zip_size, dst_frame); + + /* store the post-encryption checksum after the key-version */ + mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, + checksum); + + ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size)); + + srv_stats.pages_encrypted.inc(); + + return dst_frame; +} + +/** Encrypt a buffer for full checksum format. +@param[in,out] crypt_data Crypt data +@param[in] space space_id +@param[in] offset Page offset +@param[in] lsn Log sequence number +@param[in] src_frame Page to encrypt +@param[in,out] dst_frame Output buffer +@return encrypted buffer or NULL */ +static byte* fil_encrypt_buf_for_full_crc32( + fil_space_crypt_t* crypt_data, + ulint space, + ulint offset, + lsn_t lsn, + const byte* src_frame, + byte* dst_frame) +{ + uint key_version = fil_crypt_get_latest_key_version(crypt_data); + ut_d(bool corrupted = false); + const uint size = buf_page_full_crc32_size(src_frame, NULL, +#ifdef UNIV_DEBUG + &corrupted +#else + NULL +#endif + ); + ut_ad(!corrupted); + uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + + FIL_PAGE_FCRC32_CHECKSUM); + const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION; + byte* dst = dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION; + uint dstlen = 0; + + ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID); + + /* Till FIL_PAGE_LSN, page is not encrypted */ + memcpy(dst_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); + + /* Write key version to the page. */ + mach_write_to_4(dst_frame + FIL_PAGE_FCRC32_KEY_VERSION, key_version); + + int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen, + crypt_data, key_version, + uint(space), uint(offset), lsn); + ut_a(rc == MY_AES_OK); + ut_a(dstlen == srclen); + + const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM; + mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload)); + /* Clean the rest of the buffer. FIXME: Punch holes when writing! */ + memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4)); + + srv_stats.pages_encrypted.inc(); + + return dst_frame; +} + +/** Encrypt a buffer. +@param[in,out] crypt_data Crypt data +@param[in] space space_id +@param[in] offset Page offset +@param[in] src_frame Page to encrypt +@param[in] zip_size ROW_FORMAT=COMPRESSED + page size, or 0 +@param[in,out] dst_frame Output buffer +@param[in] use_full_checksum full crc32 algo is used +@return encrypted buffer or NULL */ +byte* fil_encrypt_buf( + fil_space_crypt_t* crypt_data, + ulint space, + ulint offset, + const byte* src_frame, + ulint zip_size, + byte* dst_frame, + bool use_full_checksum) +{ + const lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN); + + if (use_full_checksum) { + ut_ad(!zip_size); + return fil_encrypt_buf_for_full_crc32( + crypt_data, space, offset, + lsn, src_frame, dst_frame); + } + + return fil_encrypt_buf_for_non_full_checksum( + crypt_data, space, offset, lsn, + src_frame, zip_size, dst_frame); +} + +/** Check whether these page types are allowed to encrypt. +@param[in] space tablespace object +@param[in] src_frame source page +@return true if it is valid page type */ +static bool fil_space_encrypt_valid_page_type( + const fil_space_t* space, + const byte* src_frame) +{ + switch (fil_page_get_type(src_frame)) { + case FIL_PAGE_RTREE: + return space->full_crc32(); + case FIL_PAGE_TYPE_FSP_HDR: + case FIL_PAGE_TYPE_XDES: + return false; + } + + return true; +} + +/****************************************************************** +Encrypt a page + +@param[in] space Tablespace +@param[in] offset Page offset +@param[in] src_frame Page to encrypt +@param[in,out] dst_frame Output buffer +@return encrypted buffer or NULL */ +byte* fil_space_encrypt( + const fil_space_t* space, + ulint offset, + byte* src_frame, + byte* dst_frame) +{ + if (!fil_space_encrypt_valid_page_type(space, src_frame)) { + return src_frame; + } + + if (!space->crypt_data || !space->crypt_data->is_encrypted()) { + return (src_frame); + } + + ut_ad(space->referenced()); + + return fil_encrypt_buf(space->crypt_data, space->id, offset, + src_frame, space->zip_size(), + dst_frame, space->full_crc32()); +} + +/** Decrypt a page for full checksum format. +@param[in] space space id +@param[in] crypt_data crypt_data +@param[in] tmp_frame Temporary buffer +@param[in,out] src_frame Page to decrypt +@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED +@return true if page decrypted, false if not.*/ +static bool fil_space_decrypt_full_crc32( + ulint space, + fil_space_crypt_t* crypt_data, + byte* tmp_frame, + byte* src_frame, + dberr_t* err) +{ + uint key_version = mach_read_from_4( + src_frame + FIL_PAGE_FCRC32_KEY_VERSION); + lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN); + uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET); + *err = DB_SUCCESS; + + if (key_version == ENCRYPTION_KEY_NOT_ENCRYPTED) { + return false; + } + + ut_ad(crypt_data); + ut_ad(crypt_data->is_encrypted()); + + memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); + + /* Calculate the offset where decryption starts */ + const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION; + byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION; + uint dstlen = 0; + bool corrupted = false; + uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted); + if (UNIV_UNLIKELY(corrupted)) { +fail: + *err = DB_DECRYPTION_FAILED; + return false; + } + + uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + + FIL_PAGE_FCRC32_CHECKSUM); + + int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen, + crypt_data, key_version, + (uint) space, offset, lsn); + + if (rc != MY_AES_OK || dstlen != srclen) { + if (rc == -1) { + goto fail; + } + + ib::fatal() << "Unable to decrypt data-block " + << " src: " << src << "srclen: " + << srclen << " buf: " << dst << "buflen: " + << dstlen << " return-code: " << rc + << " Can't continue!"; + } + + /* Copy only checksum part in the trailer */ + memcpy(tmp_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM, + src_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM, + FIL_PAGE_FCRC32_CHECKSUM); + + srv_stats.pages_decrypted.inc(); + + return true; /* page was decrypted */ +} + +/** Decrypt a page for non full checksum format. +@param[in] crypt_data crypt_data +@param[in] tmp_frame Temporary buffer +@param[in] physical_size page size +@param[in,out] src_frame Page to decrypt +@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED +@return true if page decrypted, false if not.*/ +static bool fil_space_decrypt_for_non_full_checksum( + fil_space_crypt_t* crypt_data, + byte* tmp_frame, + ulint physical_size, + byte* src_frame, + dberr_t* err) +{ + uint key_version = mach_read_from_4( + src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); + bool page_compressed = (fil_page_get_type(src_frame) + == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); + uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET); + uint space = mach_read_from_4( + src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID); + ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN); + + *err = DB_SUCCESS; + + if (key_version == ENCRYPTION_KEY_NOT_ENCRYPTED) { + return false; + } + + ut_a(crypt_data != NULL && crypt_data->is_encrypted()); + + /* read space & lsn */ + uint header_len = FIL_PAGE_DATA; + + if (page_compressed) { + header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN; + } + + /* Copy FIL page header, it is not encrypted */ + memcpy(tmp_frame, src_frame, header_len); + + /* Calculate the offset where decryption starts */ + const byte* src = src_frame + header_len; + byte* dst = tmp_frame + header_len; + uint32 dstlen = 0; + uint srclen = uint(physical_size) - header_len - FIL_PAGE_DATA_END; + + if (page_compressed) { + srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA); + } + + int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen, + crypt_data, key_version, + space, offset, lsn); + + if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) { + + if (rc == -1) { + *err = DB_DECRYPTION_FAILED; + return false; + } + + ib::fatal() << "Unable to decrypt data-block " + << " src: " << static_cast<const void*>(src) + << "srclen: " + << srclen << " buf: " + << static_cast<const void*>(dst) << "buflen: " + << dstlen << " return-code: " << rc + << " Can't continue!"; + } + + /* For compressed tables we do not store the FIL header because + the whole page is not stored to the disk. In compressed tables only + the FIL header + compressed (and now encrypted) payload alligned + to sector boundary is written. */ + if (!page_compressed) { + /* Copy FIL trailer */ + memcpy(tmp_frame + physical_size - FIL_PAGE_DATA_END, + src_frame + physical_size - FIL_PAGE_DATA_END, + FIL_PAGE_DATA_END); + } + + srv_stats.pages_decrypted.inc(); + + return true; /* page was decrypted */ +} + +/** Decrypt a page. +@param[in] space_id tablespace id +@param[in] crypt_data crypt_data +@param[in] tmp_frame Temporary buffer +@param[in] physical_size page size +@param[in] fsp_flags Tablespace flags +@param[in,out] src_frame Page to decrypt +@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED +@return true if page decrypted, false if not.*/ +UNIV_INTERN +bool +fil_space_decrypt( + ulint space_id, + fil_space_crypt_t* crypt_data, + byte* tmp_frame, + ulint physical_size, + ulint fsp_flags, + byte* src_frame, + dberr_t* err) +{ + if (fil_space_t::full_crc32(fsp_flags)) { + return fil_space_decrypt_full_crc32( + space_id, crypt_data, tmp_frame, src_frame, err); + } + + return fil_space_decrypt_for_non_full_checksum(crypt_data, tmp_frame, + physical_size, src_frame, + err); +} + +/** +Decrypt a page. +@param[in] space Tablespace +@param[in] tmp_frame Temporary buffer used for decrypting +@param[in,out] src_frame Page to decrypt +@return decrypted page, or original not encrypted page if decryption is +not needed.*/ +UNIV_INTERN +byte* +fil_space_decrypt( + const fil_space_t* space, + byte* tmp_frame, + byte* src_frame) +{ + dberr_t err = DB_SUCCESS; + byte* res = NULL; + const ulint physical_size = space->physical_size(); + + ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted()); + ut_ad(space->referenced()); + + bool encrypted = fil_space_decrypt(space->id, space->crypt_data, + tmp_frame, physical_size, + space->flags, + src_frame, &err); + + if (err == DB_SUCCESS) { + if (encrypted) { + /* Copy the decrypted page back to page buffer, not + really any other options. */ + memcpy(src_frame, tmp_frame, physical_size); + } + + res = src_frame; + } + + return res; +} + +/** +Calculate post encryption checksum +@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0 +@param[in] dst_frame Block where checksum is calculated +@return page checksum +not needed. */ +uint32_t +fil_crypt_calculate_checksum(ulint zip_size, const byte* dst_frame) +{ + /* For encrypted tables we use only crc32 and strict_crc32 */ + return zip_size + ? page_zip_calc_checksum(dst_frame, zip_size, + SRV_CHECKSUM_ALGORITHM_CRC32) + : buf_calc_page_crc32(dst_frame); +} + +/***********************************************************************/ + +/** A copy of global key state */ +struct key_state_t { + key_state_t() : key_id(0), key_version(0), + rotate_key_age(srv_fil_crypt_rotate_key_age) {} + bool operator==(const key_state_t& other) const { + return key_version == other.key_version && + rotate_key_age == other.rotate_key_age; + } + uint key_id; + uint key_version; + uint rotate_key_age; +}; + +/*********************************************************************** +Copy global key state +@param[in,out] new_state key state +@param[in] crypt_data crypt data */ +static void +fil_crypt_get_key_state( + key_state_t* new_state, + fil_space_crypt_t* crypt_data) +{ + if (srv_encrypt_tables) { + new_state->key_version = crypt_data->key_get_latest_version(); + new_state->rotate_key_age = srv_fil_crypt_rotate_key_age; + + ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED); + } else { + new_state->key_version = 0; + new_state->rotate_key_age = 0; + } +} + +/*********************************************************************** +Check if a key needs rotation given a key_state +@param[in] crypt_data Encryption information +@param[in] key_version Current key version +@param[in] latest_key_version Latest key version +@param[in] rotate_key_age when to rotate +@return true if key needs rotation, false if not */ +static bool +fil_crypt_needs_rotation( + const fil_space_crypt_t* crypt_data, + uint key_version, + uint latest_key_version, + uint rotate_key_age) +{ + if (key_version == ENCRYPTION_KEY_VERSION_INVALID) { + return false; + } + + if (key_version == 0 && latest_key_version != 0) { + /* this is rotation unencrypted => encrypted + * ignore rotate_key_age */ + return true; + } + + if (latest_key_version == 0 && key_version != 0) { + if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) { + /* this is rotation encrypted => unencrypted */ + return true; + } + return false; + } + + if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT + && crypt_data->type == CRYPT_SCHEME_1 + && !srv_encrypt_tables) { + /* This is rotation encrypted => unencrypted */ + return true; + } + + if (rotate_key_age == 0) { + return false; + } + + /* this is rotation encrypted => encrypted, + * only reencrypt if key is sufficiently old */ + if (key_version + rotate_key_age < latest_key_version) { + return true; + } + + return false; +} + +/** Read page 0 and possible crypt data from there. +@param[in,out] space Tablespace */ +static inline +void +fil_crypt_read_crypt_data(fil_space_t* space) +{ + if (space->crypt_data || space->size || !space->get_size()) { + /* The encryption metadata has already been read, or + the tablespace is not encrypted and the file has been + opened already, or the file cannot be accessed, + likely due to a concurrent DROP + (possibly as part of TRUNCATE or ALTER TABLE). + FIXME: The file can become unaccessible any time + after this check! We should really remove this + function and instead make crypt_data an integral + part of fil_space_t. */ + return; + } + + const ulint zip_size = space->zip_size(); + mtr_t mtr; + mtr.start(); + if (buf_block_t* block = buf_page_get_gen(page_id_t(space->id, 0), + zip_size, RW_S_LATCH, + nullptr, + BUF_GET_POSSIBLY_FREED, + __FILE__, __LINE__, &mtr)) { + if (block->page.status == buf_page_t::FREED) { + goto func_exit; + } + mutex_enter(&fil_system.mutex); + if (!space->crypt_data && !space->is_stopping()) { + space->crypt_data = fil_space_read_crypt_data( + zip_size, block->frame); + } + mutex_exit(&fil_system.mutex); + } +func_exit: + mtr.commit(); +} + +/** Start encrypting a space +@param[in,out] space Tablespace +@return true if a recheck of tablespace is needed by encryption thread. */ +static bool fil_crypt_start_encrypting_space(fil_space_t* space) +{ + mutex_enter(&fil_crypt_threads_mutex); + + fil_space_crypt_t *crypt_data = space->crypt_data; + + /* If space is not encrypted and encryption is not enabled, then + do not continue encrypting the space. */ + if (!crypt_data && !srv_encrypt_tables) { + mutex_exit(&fil_crypt_threads_mutex); + return false; + } + + const bool recheck = fil_crypt_start_converting; + + if (recheck || crypt_data || space->is_stopping()) { + mutex_exit(&fil_crypt_threads_mutex); + return recheck; + } + + /* NOTE: we need to write and flush page 0 before publishing + * the crypt data. This so that after restart there is no + * risk of finding encrypted pages without having + * crypt data in page 0 */ + + /* 1 - create crypt data */ + crypt_data = fil_space_create_crypt_data( + FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY); + + if (crypt_data == NULL) { + mutex_exit(&fil_crypt_threads_mutex); + return false; + } + + fil_crypt_start_converting = true; + mutex_exit(&fil_crypt_threads_mutex); + + mtr_t mtr; + mtr.start(); + + /* 2 - get page 0 */ + dberr_t err = DB_SUCCESS; + if (buf_block_t* block = buf_page_get_gen( + page_id_t(space->id, 0), space->zip_size(), + RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED, + __FILE__, __LINE__, &mtr, &err)) { + if (block->page.status == buf_page_t::FREED) { + goto abort; + } + + crypt_data->type = CRYPT_SCHEME_1; + crypt_data->min_key_version = 0; // all pages are unencrypted + crypt_data->rotate_state.start_time = time(0); + crypt_data->rotate_state.starting = true; + crypt_data->rotate_state.active_threads = 1; + + mutex_enter(&fil_system.mutex); + const bool stopping = space->is_stopping(); + if (!stopping) { + space->crypt_data = crypt_data; + } + mutex_exit(&fil_system.mutex); + + if (stopping) { + goto abort; + } + + /* 3 - write crypt data to page 0 */ + mtr.set_named_space(space); + crypt_data->write_page0(block, &mtr); + + mtr.commit(); + + /* 4 - sync tablespace before publishing crypt data */ + while (buf_flush_list_space(space)); + + /* 5 - publish crypt data */ + mutex_enter(&fil_crypt_threads_mutex); + mutex_enter(&crypt_data->mutex); + crypt_data->type = CRYPT_SCHEME_1; + ut_a(crypt_data->rotate_state.active_threads == 1); + crypt_data->rotate_state.active_threads = 0; + crypt_data->rotate_state.starting = false; + + fil_crypt_start_converting = false; + mutex_exit(&crypt_data->mutex); + mutex_exit(&fil_crypt_threads_mutex); + + return false; + } + +abort: + mtr.commit(); + mutex_enter(&fil_crypt_threads_mutex); + fil_crypt_start_converting = false; + mutex_exit(&fil_crypt_threads_mutex); + + crypt_data->~fil_space_crypt_t(); + ut_free(crypt_data); + return false; +} + +/** State of a rotation thread */ +struct rotate_thread_t { + explicit rotate_thread_t(uint no) { + memset(this, 0, sizeof(* this)); + thread_no = no; + first = true; + estimated_max_iops = 20; + } + + uint thread_no; + bool first; /*!< is position before first space */ + fil_space_t* space; /*!< current space or NULL */ + uint32_t offset; /*!< current page number */ + ulint batch; /*!< #pages to rotate */ + uint min_key_version_found;/*!< min key version found but not rotated */ + lsn_t end_lsn; /*!< max lsn when rotating this space */ + + uint estimated_max_iops; /*!< estimation of max iops */ + uint allocated_iops; /*!< allocated iops */ + ulint cnt_waited; /*!< #times waited during this slot */ + uintmax_t sum_waited_us; /*!< wait time during this slot */ + + fil_crypt_stat_t crypt_stat; // statistics + + /** @return whether this thread should terminate */ + bool should_shutdown() const { + switch (srv_shutdown_state) { + case SRV_SHUTDOWN_NONE: + return thread_no >= srv_n_fil_crypt_threads; + case SRV_SHUTDOWN_EXIT_THREADS: + /* srv_init_abort() must have been invoked */ + case SRV_SHUTDOWN_CLEANUP: + case SRV_SHUTDOWN_INITIATED: + return true; + case SRV_SHUTDOWN_LAST_PHASE: + break; + } + ut_ad(0); + return true; + } +}; + +/** Avoid the removal of the tablespace from +default_encrypt_list only when +1) Another active encryption thread working on tablespace +2) Eligible for tablespace key rotation +3) Tablespace is in flushing phase +@return true if tablespace should be removed from +default encrypt */ +static bool fil_crypt_must_remove(const fil_space_t &space) +{ + ut_ad(space.purpose == FIL_TYPE_TABLESPACE); + fil_space_crypt_t *crypt_data = space.crypt_data; + ut_ad(mutex_own(&fil_system.mutex)); + const ulong encrypt_tables= srv_encrypt_tables; + if (!crypt_data) + return !encrypt_tables; + if (!crypt_data->is_key_found()) + return true; + + mutex_enter(&crypt_data->mutex); + const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) && + (!crypt_data->rotate_state.flushing && + !encrypt_tables == !!crypt_data->min_key_version && + !crypt_data->rotate_state.active_threads); + mutex_exit(&crypt_data->mutex); + return remove; +} + +/*********************************************************************** +Check if space needs rotation given a key_state +@param[in,out] state Key rotation state +@param[in,out] key_state Key state +@param[in,out] recheck needs recheck ? +@return true if space needs key rotation */ +static +bool +fil_crypt_space_needs_rotation( + rotate_thread_t* state, + key_state_t* key_state, + bool* recheck) +{ + fil_space_t* space = state->space; + + /* Make sure that tablespace is normal tablespace */ + if (space->purpose != FIL_TYPE_TABLESPACE) { + return false; + } + + ut_ad(space->referenced()); + + fil_space_crypt_t *crypt_data = space->crypt_data; + + if (crypt_data == NULL) { + /** + * space has no crypt data + * start encrypting it... + */ + *recheck = fil_crypt_start_encrypting_space(space); + crypt_data = space->crypt_data; + + if (crypt_data == NULL) { + return false; + } + + crypt_data->key_get_latest_version(); + } + + /* If used key_id is not found from encryption plugin we can't + continue to rotate the tablespace */ + if (!crypt_data->is_key_found()) { + return false; + } + + bool need_key_rotation = false; + mutex_enter(&crypt_data->mutex); + + do { + /* prevent threads from starting to rotate space */ + if (crypt_data->rotate_state.starting) { + /* recheck this space later */ + *recheck = true; + break; + } + + /* prevent threads from starting to rotate space */ + if (space->is_stopping()) { + break; + } + + if (crypt_data->rotate_state.flushing) { + break; + } + + /* No need to rotate space if encryption is disabled */ + if (crypt_data->not_encrypted()) { + break; + } + + if (crypt_data->key_id != key_state->key_id) { + key_state->key_id= crypt_data->key_id; + fil_crypt_get_key_state(key_state, crypt_data); + } + + need_key_rotation = fil_crypt_needs_rotation( + crypt_data, + crypt_data->min_key_version, + key_state->key_version, + key_state->rotate_key_age); + } while (0); + + mutex_exit(&crypt_data->mutex); + return need_key_rotation; +} + +/*********************************************************************** +Update global statistics with thread statistics +@param[in,out] state key rotation statistics */ +static void +fil_crypt_update_total_stat( + rotate_thread_t *state) +{ + mutex_enter(&crypt_stat_mutex); + crypt_stat.pages_read_from_cache += + state->crypt_stat.pages_read_from_cache; + crypt_stat.pages_read_from_disk += + state->crypt_stat.pages_read_from_disk; + crypt_stat.pages_modified += state->crypt_stat.pages_modified; + crypt_stat.pages_flushed += state->crypt_stat.pages_flushed; + // remote old estimate + crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops; + // add new estimate + crypt_stat.estimated_iops += state->estimated_max_iops; + mutex_exit(&crypt_stat_mutex); + + // make new estimate "current" estimate + memset(&state->crypt_stat, 0, sizeof(state->crypt_stat)); + // record our old (current) estimate + state->crypt_stat.estimated_iops = state->estimated_max_iops; +} + +/*********************************************************************** +Allocate iops to thread from global setting, +used before starting to rotate a space. +@param[in,out] state Rotation state +@return true if allocation succeeded, false if failed */ +static +bool +fil_crypt_alloc_iops( + rotate_thread_t *state) +{ + ut_ad(state->allocated_iops == 0); + + /* We have not yet selected the space to rotate, thus + state might not contain space and we can't check + its status yet. */ + + uint max_iops = state->estimated_max_iops; + mutex_enter(&fil_crypt_threads_mutex); + + if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) { + /* this can happen when user decreases srv_fil_crypt_iops */ + mutex_exit(&fil_crypt_threads_mutex); + return false; + } + + uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated; + + if (alloc > max_iops) { + alloc = max_iops; + } + + n_fil_crypt_iops_allocated += alloc; + mutex_exit(&fil_crypt_threads_mutex); + + state->allocated_iops = alloc; + + return alloc > 0; +} + +/*********************************************************************** +Reallocate iops to thread, +used when inside a space +@param[in,out] state Rotation state */ +static +void +fil_crypt_realloc_iops( + rotate_thread_t *state) +{ + ut_a(state->allocated_iops > 0); + + if (10 * state->cnt_waited > state->batch) { + /* if we waited more than 10% re-estimate max_iops */ + ulint avg_wait_time_us = + ulint(state->sum_waited_us / state->cnt_waited); + + if (avg_wait_time_us == 0) { + avg_wait_time_us = 1; // prevent division by zero + } + + DBUG_PRINT("ib_crypt", + ("thr_no: %u - update estimated_max_iops from %u to " + ULINTPF ".", + state->thread_no, + state->estimated_max_iops, + 1000000 / avg_wait_time_us)); + + state->estimated_max_iops = uint(1000000 / avg_wait_time_us); + state->cnt_waited = 0; + state->sum_waited_us = 0; + } else { + DBUG_PRINT("ib_crypt", + ("thr_no: %u only waited " ULINTPF + "%% skip re-estimate.", + state->thread_no, + (100 * state->cnt_waited) + / (state->batch ? state->batch : 1))); + } + + if (state->estimated_max_iops <= state->allocated_iops) { + /* return extra iops */ + uint extra = state->allocated_iops - state->estimated_max_iops; + + if (extra > 0) { + mutex_enter(&fil_crypt_threads_mutex); + if (n_fil_crypt_iops_allocated < extra) { + /* unknown bug! + * crash in debug + * keep n_fil_crypt_iops_allocated unchanged + * in release */ + ut_ad(0); + extra = 0; + } + n_fil_crypt_iops_allocated -= extra; + state->allocated_iops -= extra; + + if (state->allocated_iops == 0) { + /* no matter how slow io system seems to be + * never decrease allocated_iops to 0... */ + state->allocated_iops ++; + n_fil_crypt_iops_allocated ++; + } + + os_event_set(fil_crypt_threads_event); + mutex_exit(&fil_crypt_threads_mutex); + } + } else { + /* see if there are more to get */ + mutex_enter(&fil_crypt_threads_mutex); + if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) { + /* there are extra iops free */ + uint extra = srv_n_fil_crypt_iops - + n_fil_crypt_iops_allocated; + if (state->allocated_iops + extra > + state->estimated_max_iops) { + /* but don't alloc more than our max */ + extra = state->estimated_max_iops - + state->allocated_iops; + } + n_fil_crypt_iops_allocated += extra; + state->allocated_iops += extra; + + DBUG_PRINT("ib_crypt", + ("thr_no: %u increased iops from %u to %u.", + state->thread_no, + state->allocated_iops - extra, + state->allocated_iops)); + + } + mutex_exit(&fil_crypt_threads_mutex); + } + + fil_crypt_update_total_stat(state); +} + +/** Release excess allocated iops +@param state rotation state +@param wake whether to wake up other threads */ +static void fil_crypt_return_iops(rotate_thread_t *state, bool wake= true) +{ + if (state->allocated_iops > 0) { + uint iops = state->allocated_iops; + mutex_enter(&fil_crypt_threads_mutex); + if (n_fil_crypt_iops_allocated < iops) { + /* unknown bug! + * crash in debug + * keep n_fil_crypt_iops_allocated unchanged + * in release */ + ut_ad(0); + iops = 0; + } + + n_fil_crypt_iops_allocated -= iops; + state->allocated_iops = 0; + if (wake) { + os_event_set(fil_crypt_threads_event); + } + mutex_exit(&fil_crypt_threads_mutex); + } + + fil_crypt_update_total_stat(state); +} + +/** Acquire a tablespace reference. +@return whether a tablespace reference was successfully acquired */ +inline bool fil_space_t::acquire_if_not_stopped() +{ + ut_ad(mutex_own(&fil_system.mutex)); + const uint32_t n= acquire_low(); + if (UNIV_LIKELY(!(n & (STOPPING | CLOSING)))) + return true; + if (UNIV_UNLIKELY(n & STOPPING)) + return false; + return UNIV_LIKELY(!(n & CLOSING)) || prepare(true); +} + +bool fil_crypt_must_default_encrypt() +{ + return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate; +} + +/** Return the next tablespace from default_encrypt_tables list. +@param space previous tablespace (nullptr to start from the start) +@param recheck whether the removal condition needs to be rechecked after +the encryption parameters were changed +@param encrypt expected state of innodb_encrypt_tables +@return the next tablespace to process (n_pending_ops incremented) +@retval fil_system.temp_space if there is no work to do +@retval nullptr upon reaching the end of the iteration */ +inline fil_space_t *fil_system_t::default_encrypt_next(fil_space_t *space, + bool recheck, + bool encrypt) +{ + ut_ad(mutex_own(&mutex)); + + sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it= + space && space->is_in_default_encrypt + ? space + : default_encrypt_tables.begin(); + const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end= + default_encrypt_tables.end(); + + if (space) + { + const bool released= !space->release(); + + if (space->is_in_default_encrypt) + { + while (++it != end && + (!UT_LIST_GET_LEN(it->chain) || it->is_stopping())); + + /* If one of the encryption threads already started + the encryption of the table then don't remove the + unencrypted spaces from default encrypt list. + + If there is a change in innodb_encrypt_tables variables + value then don't remove the last processed tablespace + from the default encrypt list. */ + if (released && !recheck && fil_crypt_must_remove(*space)) + { + ut_a(!default_encrypt_tables.empty()); + default_encrypt_tables.remove(*space); + space->is_in_default_encrypt= false; + } + } + } + else while (it != end && + (!UT_LIST_GET_LEN(it->chain) || it->is_stopping())) + { + /* Find the next suitable default encrypt table if + beginning of default_encrypt_tables list has been scheduled + to be deleted */ + it++; + } + + if (it == end) + return temp_space; + + do + { + space= &*it; + if (space->acquire_if_not_stopped()) + return space; + if (++it == end) + return nullptr; + } + while (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()); + + return nullptr; +} + +/** Determine the next tablespace for encryption key rotation. +@param space current tablespace (nullptr to start from the beginning) +@param recheck whether the removal condition needs to be rechecked after +encryption parameters were changed +@param encrypt expected state of innodb_encrypt_tables +@return the next tablespace +@retval fil_system.temp_space if there is no work to do +@retval nullptr upon reaching the end of the iteration */ +inline fil_space_t *fil_space_t::next(fil_space_t *space, bool recheck, + bool encrypt) +{ + mutex_enter(&fil_system.mutex); + + if (fil_crypt_must_default_encrypt()) + space= fil_system.default_encrypt_next(space, recheck, encrypt); + else + { + if (!space) + space= UT_LIST_GET_FIRST(fil_system.space_list); + else + { + /* Move on to the next fil_space_t */ + space->release(); + space= UT_LIST_GET_NEXT(space_list, space); + } + + for (; space; space= UT_LIST_GET_NEXT(space_list, space)) + { + if (space->purpose != FIL_TYPE_TABLESPACE) + continue; + const uint32_t n= space->acquire_low(); + if (UNIV_LIKELY(!(n & (STOPPING | CLOSING)))) + break; + if (!(n & STOPPING) && space->prepare(true)) + break; + } + } + + mutex_exit(&fil_system.mutex); + return space; +} + +/** Search for a space needing rotation +@param[in,out] key_state Key state +@param[in,out] state Rotation state +@param[in,out] recheck recheck of the tablespace is needed or + still encryption thread does write page 0 */ +static bool fil_crypt_find_space_to_rotate( + key_state_t* key_state, + rotate_thread_t* state, + bool* recheck) +{ + /* we need iops to start rotating */ + while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) { + if (state->space && state->space->is_stopping()) { + state->space->release(); + state->space = NULL; + } + + os_event_reset(fil_crypt_threads_event); + os_event_wait_time(fil_crypt_threads_event, 100000); + } + + if (state->should_shutdown()) { + if (state->space) { + state->space->release(); + state->space = NULL; + } + return false; + } + + if (state->first) { + state->first = false; + if (state->space) { + state->space->release(); + } + state->space = NULL; + } + + bool wake; + for (;;) { + state->space = fil_space_t::next(state->space, *recheck, + key_state->key_version != 0); + wake = state->should_shutdown(); + + if (state->space == fil_system.temp_space) { + goto done; + } else if (wake) { + break; + } else { + wake = true; + } + + if (!state->space) { + break; + } + + /* If there is no crypt data and we have not yet read + page 0 for this tablespace, we need to read it before + we can continue. */ + if (!state->space->crypt_data) { + fil_crypt_read_crypt_data(state->space); + } + + if (fil_crypt_space_needs_rotation(state, key_state, recheck)) { + ut_ad(key_state->key_id); + /* init state->min_key_version_found before + * starting on a space */ + state->min_key_version_found = key_state->key_version; + return true; + } + } + + if (state->space) { + state->space->release(); +done: + state->space = NULL; + } + + /* no work to do; release our allocation of I/O capacity */ + fil_crypt_return_iops(state, wake); + + return false; + +} + +/*********************************************************************** +Start rotating a space +@param[in] key_state Key state +@param[in,out] state Rotation state */ +static +void +fil_crypt_start_rotate_space( + const key_state_t* key_state, + rotate_thread_t* state) +{ + fil_space_crypt_t *crypt_data = state->space->crypt_data; + + ut_ad(crypt_data); + mutex_enter(&crypt_data->mutex); + ut_ad(key_state->key_id == crypt_data->key_id); + + if (crypt_data->rotate_state.active_threads == 0) { + /* only first thread needs to init */ + crypt_data->rotate_state.next_offset = 1; // skip page 0 + /* no need to rotate beyond current max + * if space extends, it will be encrypted with newer version */ + /* FIXME: max_offset could be removed and instead + space->size consulted.*/ + crypt_data->rotate_state.max_offset = state->space->size; + crypt_data->rotate_state.end_lsn = 0; + crypt_data->rotate_state.min_key_version_found = + key_state->key_version; + + crypt_data->rotate_state.start_time = time(0); + + if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED && + crypt_data->is_encrypted() && + key_state->key_version != 0) { + /* this is rotation unencrypted => encrypted */ + crypt_data->type = CRYPT_SCHEME_1; + } + } + + /* count active threads in space */ + crypt_data->rotate_state.active_threads++; + + /* Initialize thread local state */ + state->end_lsn = crypt_data->rotate_state.end_lsn; + state->min_key_version_found = + crypt_data->rotate_state.min_key_version_found; + + mutex_exit(&crypt_data->mutex); +} + +/*********************************************************************** +Search for batch of pages needing rotation +@param[in] key_state Key state +@param[in,out] state Rotation state +@return true if page needing key rotation found, false if not found */ +static +bool +fil_crypt_find_page_to_rotate( + const key_state_t* key_state, + rotate_thread_t* state) +{ + ulint batch = srv_alloc_time * state->allocated_iops; + fil_space_t* space = state->space; + + ut_ad(!space || space->referenced()); + + /* If space is marked to be dropped stop rotation. */ + if (!space || space->is_stopping()) { + return false; + } + + fil_space_crypt_t *crypt_data = space->crypt_data; + + mutex_enter(&crypt_data->mutex); + ut_ad(key_state->key_id == crypt_data->key_id); + + bool found = crypt_data->rotate_state.max_offset >= + crypt_data->rotate_state.next_offset; + + if (found) { + state->offset = crypt_data->rotate_state.next_offset; + ulint remaining = crypt_data->rotate_state.max_offset - + crypt_data->rotate_state.next_offset; + + if (batch <= remaining) { + state->batch = batch; + } else { + state->batch = remaining; + } + } + + crypt_data->rotate_state.next_offset += uint32_t(batch); + mutex_exit(&crypt_data->mutex); + return found; +} + +#define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \ + fil_crypt_get_page_throttle_func(state, offset, mtr, \ + sleeptime_ms, __FILE__, __LINE__) + +/*********************************************************************** +Get a page and compute sleep time +@param[in,out] state Rotation state +@param[in] offset Page offset +@param[in,out] mtr Minitransaction +@param[out] sleeptime_ms Sleep time +@param[in] file File where called +@param[in] line Line where called +@return page or NULL*/ +static +buf_block_t* +fil_crypt_get_page_throttle_func( + rotate_thread_t* state, + uint32_t offset, + mtr_t* mtr, + ulint* sleeptime_ms, + const char* file, + unsigned line) +{ + fil_space_t* space = state->space; + const ulint zip_size = space->zip_size(); + const page_id_t page_id(space->id, offset); + ut_ad(space->referenced()); + + /* Before reading from tablespace we need to make sure that + the tablespace is not about to be dropped. */ + if (space->is_stopping()) { + return NULL; + } + + dberr_t err = DB_SUCCESS; + buf_block_t* block = buf_page_get_gen(page_id, zip_size, RW_X_LATCH, + NULL, + BUF_PEEK_IF_IN_POOL, file, line, + mtr, &err); + if (block != NULL) { + /* page was in buffer pool */ + state->crypt_stat.pages_read_from_cache++; + return block; + } + + if (space->is_stopping()) { + return NULL; + } + + if (fseg_page_is_free(space, state->offset)) { + /* page is already freed */ + return NULL; + } + + state->crypt_stat.pages_read_from_disk++; + + const ulonglong start = my_interval_timer(); + block = buf_page_get_gen(page_id, zip_size, + RW_X_LATCH, + NULL, BUF_GET_POSSIBLY_FREED, + file, line, mtr, &err); + const ulonglong end = my_interval_timer(); + + state->cnt_waited++; + + if (end > start) { + state->sum_waited_us += (end - start) / 1000; + } + + /* average page load */ + ulint add_sleeptime_ms = 0; + ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited); + ulint alloc_wait_us = 1000000 / state->allocated_iops; + + if (avg_wait_time_us < alloc_wait_us) { + /* we reading faster than we allocated */ + add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000; + } else { + /* if page load time is longer than we want, skip sleeping */ + } + + *sleeptime_ms += add_sleeptime_ms; + + return block; +} + +/*********************************************************************** +Rotate one page +@param[in,out] key_state Key state +@param[in,out] state Rotation state */ +static +void +fil_crypt_rotate_page( + const key_state_t* key_state, + rotate_thread_t* state) +{ + fil_space_t*space = state->space; + ulint space_id = space->id; + uint32_t offset = state->offset; + ulint sleeptime_ms = 0; + fil_space_crypt_t *crypt_data = space->crypt_data; + + ut_ad(space->referenced()); + ut_ad(offset > 0); + + /* In fil_crypt_thread where key rotation is done we have + acquired space and checked that this space is not yet + marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate(). + Check here also to give DROP TABLE or similar a change. */ + if (space->is_stopping()) { + return; + } + + if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) { + /* don't encrypt this as it contains address to dblwr buffer */ + return; + } + + mtr_t mtr; + mtr.start(); + if (buf_block_t* block = fil_crypt_get_page_throttle(state, + offset, &mtr, + &sleeptime_ms)) { + bool modified = false; + byte* frame = buf_block_get_frame(block); + const lsn_t block_lsn = mach_read_from_8(FIL_PAGE_LSN + frame); + uint kv = buf_page_get_key_version(frame, space->flags); + + if (block->page.status == buf_page_t::FREED) { + /* Do not modify freed pages to avoid an assertion + failure on recovery.*/ + } else if (block->page.oldest_modification() > 1) { + /* Do not unnecessarily touch pages that are + already dirty. */ + } else if (space->is_stopping()) { + /* The tablespace is closing (in DROP TABLE or + TRUNCATE TABLE or similar): avoid further access */ + } else if (!kv && !*reinterpret_cast<uint16_t*> + (&frame[FIL_PAGE_TYPE])) { + /* It looks like this page is not + allocated. Because key rotation is accessing + pages in a pattern that is unlike the normal + B-tree and undo log access pattern, we cannot + invoke fseg_page_is_free() here, because that + could result in a deadlock. If we invoked + fseg_page_is_free() and released the + tablespace latch before acquiring block->lock, + then the fseg_page_is_free() information + could be stale already. */ + + /* If the data file was originally created + before MariaDB 10.0 or MySQL 5.6, some + allocated data pages could carry 0 in + FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those + pages will be updated in + buf_flush_init_for_writing() when the page + is modified the next time. + + Also, when the doublewrite buffer pages are + allocated on bootstrap in a non-debug build, + some dummy pages will be allocated, with 0 in + the FIL_PAGE_TYPE. Those pages should be + skipped from key rotation forever. */ + } else if (fil_crypt_needs_rotation( + crypt_data, + kv, + key_state->key_version, + key_state->rotate_key_age)) { + + mtr.set_named_space(space); + modified = true; + + /* force rotation by dummy updating page */ + mtr.write<1,mtr_t::FORCED>(*block, + &frame[FIL_PAGE_SPACE_ID], + frame[FIL_PAGE_SPACE_ID]); + + /* statistics */ + state->crypt_stat.pages_modified++; + } else { + if (crypt_data->is_encrypted()) { + if (kv < state->min_key_version_found) { + state->min_key_version_found = kv; + } + } + } + + mtr.commit(); + lsn_t end_lsn = mtr.commit_lsn(); + + + if (modified) { + /* if we modified page, we take lsn from mtr */ + ut_a(end_lsn > state->end_lsn); + ut_a(end_lsn > block_lsn); + state->end_lsn = end_lsn; + } else { + /* if we did not modify page, check for max lsn */ + if (block_lsn > state->end_lsn) { + state->end_lsn = block_lsn; + } + } + } else { + /* If block read failed mtr memo and log should be empty. */ + ut_ad(!mtr.has_modifications()); + ut_ad(!mtr.is_dirty()); + ut_ad(mtr.get_memo()->size() == 0); + ut_ad(mtr.get_log()->size() == 0); + mtr.commit(); + } + + if (sleeptime_ms) { + os_event_reset(fil_crypt_throttle_sleep_event); + os_event_wait_time(fil_crypt_throttle_sleep_event, + 1000 * sleeptime_ms); + } +} + +/*********************************************************************** +Rotate a batch of pages +@param[in,out] key_state Key state +@param[in,out] state Rotation state */ +static +void +fil_crypt_rotate_pages( + const key_state_t* key_state, + rotate_thread_t* state) +{ + ulint space_id = state->space->id; + uint32_t end = std::min(state->offset + uint32_t(state->batch), + state->space->free_limit); + + ut_ad(state->space->referenced()); + + for (; state->offset < end; state->offset++) { + + /* we can't rotate pages in dblwr buffer as + * it's not possible to read those due to lots of asserts + * in buffer pool. + * + * However since these are only (short-lived) copies of + * real pages, they will be updated anyway when the + * real page is updated + */ + if (buf_dblwr.is_inside(page_id_t(space_id, state->offset))) { + continue; + } + + /* If space is marked as stopping, stop rotating + pages. */ + if (state->space->is_stopping()) { + break; + } + + fil_crypt_rotate_page(key_state, state); + } +} + +/*********************************************************************** +Flush rotated pages and then update page 0 + +@param[in,out] state rotation state */ +static +void +fil_crypt_flush_space( + rotate_thread_t* state) +{ + fil_space_t* space = state->space; + fil_space_crypt_t *crypt_data = space->crypt_data; + + ut_ad(space->referenced()); + + /* flush tablespace pages so that there are no pages left with old key */ + lsn_t end_lsn = crypt_data->rotate_state.end_lsn; + + if (end_lsn > 0 && !space->is_stopping()) { + ulint sum_pages = 0; + const ulonglong start = my_interval_timer(); + while (buf_flush_list_space(space, &sum_pages)); + if (sum_pages) { + const ulonglong end = my_interval_timer(); + + state->cnt_waited += sum_pages; + state->sum_waited_us += (end - start) / 1000; + + /* statistics */ + state->crypt_stat.pages_flushed += sum_pages; + } + } + + if (crypt_data->min_key_version == 0) { + crypt_data->type = CRYPT_SCHEME_UNENCRYPTED; + } + + if (space->is_stopping()) { + return; + } + + /* update page 0 */ + mtr_t mtr; + mtr.start(); + + if (buf_block_t* block = buf_page_get_gen( + page_id_t(space->id, 0), space->zip_size(), + RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED, + __FILE__, __LINE__, &mtr)) { + if (block->page.status != buf_page_t::FREED) { + mtr.set_named_space(space); + crypt_data->write_page0(block, &mtr); + } + } + + mtr.commit(); +} + +/*********************************************************************** +Complete rotating a space +@param[in,out] state Rotation state */ +static void fil_crypt_complete_rotate_space(rotate_thread_t* state) +{ + fil_space_crypt_t *crypt_data = state->space->crypt_data; + + ut_ad(crypt_data); + ut_ad(state->space->referenced()); + + /* Space might already be dropped */ + if (!state->space->is_stopping()) { + mutex_enter(&crypt_data->mutex); + + /** + * Update crypt data state with state from thread + */ + if (state->min_key_version_found < + crypt_data->rotate_state.min_key_version_found) { + crypt_data->rotate_state.min_key_version_found = + state->min_key_version_found; + } + + if (state->end_lsn > crypt_data->rotate_state.end_lsn) { + crypt_data->rotate_state.end_lsn = state->end_lsn; + } + + ut_a(crypt_data->rotate_state.active_threads > 0); + crypt_data->rotate_state.active_threads--; + bool last = crypt_data->rotate_state.active_threads == 0; + + /** + * check if space is fully done + * this as when threads shutdown, it could be that we "complete" + * iterating before we have scanned the full space. + */ + bool done = crypt_data->rotate_state.next_offset >= + crypt_data->rotate_state.max_offset; + + /** + * we should flush space if we're last thread AND + * the iteration is done + */ + bool should_flush = last && done; + + if (should_flush) { + /* we're the last active thread */ + crypt_data->rotate_state.flushing = true; + crypt_data->min_key_version = + crypt_data->rotate_state.min_key_version_found; + mutex_exit(&crypt_data->mutex); + fil_crypt_flush_space(state); + + mutex_enter(&crypt_data->mutex); + crypt_data->rotate_state.flushing = false; + mutex_exit(&crypt_data->mutex); + } else { + mutex_exit(&crypt_data->mutex); + } + } else { + mutex_enter(&crypt_data->mutex); + ut_a(crypt_data->rotate_state.active_threads > 0); + crypt_data->rotate_state.active_threads--; + mutex_exit(&crypt_data->mutex); + } +} + +/*********************************************************************//** +A thread which monitors global key state and rotates tablespaces accordingly +@return a dummy parameter */ +extern "C" UNIV_INTERN +os_thread_ret_t +DECLARE_THREAD(fil_crypt_thread)(void*) +{ + mutex_enter(&fil_crypt_threads_mutex); + uint thread_no = srv_n_fil_crypt_threads_started; + srv_n_fil_crypt_threads_started++; + os_event_set(fil_crypt_event); /* signal that we started */ + mutex_exit(&fil_crypt_threads_mutex); + + /* state of this thread */ + rotate_thread_t thr(thread_no); + + /* if we find a space that is starting, skip over it and recheck it later */ + bool recheck = false; + + while (!thr.should_shutdown()) { + + key_state_t new_state; + + while (!thr.should_shutdown()) { + + /* wait for key state changes + * i.e either new key version of change or + * new rotate_key_age */ + os_event_reset(fil_crypt_threads_event); + + if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) { + break; + } + + if (recheck) { + /* check recheck here, after sleep, so + * that we don't busy loop while when one thread is starting + * a space*/ + break; + } + } + + recheck = false; + thr.first = true; // restart from first tablespace + + /* iterate all spaces searching for those needing rotation */ + while (!thr.should_shutdown() && + fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) { + + /* we found a space to rotate */ + fil_crypt_start_rotate_space(&new_state, &thr); + + /* iterate all pages (cooperativly with other threads) */ + while (!thr.should_shutdown() && + fil_crypt_find_page_to_rotate(&new_state, &thr)) { + + if (!thr.space->is_stopping()) { + /* rotate a (set) of pages */ + fil_crypt_rotate_pages(&new_state, &thr); + } + + /* If space is marked as stopping, release + space and stop rotation. */ + if (thr.space->is_stopping()) { + fil_crypt_complete_rotate_space(&thr); + thr.space->release(); + thr.space = NULL; + break; + } + + /* realloc iops */ + fil_crypt_realloc_iops(&thr); + } + + /* complete rotation */ + if (thr.space) { + fil_crypt_complete_rotate_space(&thr); + } + + /* force key state refresh */ + new_state.key_id = 0; + + /* return iops */ + fil_crypt_return_iops(&thr); + } + } + + /* return iops if shutting down */ + fil_crypt_return_iops(&thr); + + /* release current space if shutting down */ + if (thr.space) { + thr.space->release(); + thr.space = NULL; + } + + mutex_enter(&fil_crypt_threads_mutex); + srv_n_fil_crypt_threads_started--; + os_event_set(fil_crypt_event); /* signal that we stopped */ + mutex_exit(&fil_crypt_threads_mutex); + + /* We count the number of threads in os_thread_exit(). A created + thread should always use that to exit and not use return() to exit. */ + + os_thread_exit(); + + OS_THREAD_DUMMY_RETURN; +} + +/********************************************************************* +Adjust thread count for key rotation +@param[in] enw_cnt Number of threads to be used */ +UNIV_INTERN +void +fil_crypt_set_thread_cnt( + const uint new_cnt) +{ + if (!fil_crypt_threads_inited) { + if (srv_shutdown_state != SRV_SHUTDOWN_NONE) + return; + fil_crypt_threads_init(); + } + + mutex_enter(&fil_crypt_threads_mutex); + + if (new_cnt > srv_n_fil_crypt_threads) { + uint add = new_cnt - srv_n_fil_crypt_threads; + srv_n_fil_crypt_threads = new_cnt; + for (uint i = 0; i < add; i++) { + ib::info() << "Creating #" + << i+1 << " encryption thread id " + << os_thread_create(fil_crypt_thread) + << " total threads " << new_cnt << "."; + } + } else if (new_cnt < srv_n_fil_crypt_threads) { + srv_n_fil_crypt_threads = new_cnt; + os_event_set(fil_crypt_threads_event); + } + + mutex_exit(&fil_crypt_threads_mutex); + + while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) { + os_event_reset(fil_crypt_event); + os_event_wait_time(fil_crypt_event, 100000); + } + + /* Send a message to encryption threads that there could be + something to do. */ + if (srv_n_fil_crypt_threads) { + os_event_set(fil_crypt_threads_event); + } +} + +/** Initialize the tablespace default_encrypt_tables +if innodb_encryption_rotate_key_age=0. */ +static void fil_crypt_default_encrypt_tables_fill() +{ + ut_ad(mutex_own(&fil_system.mutex)); + + for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list); + space != NULL; + space = UT_LIST_GET_NEXT(space_list, space)) { + if (space->purpose != FIL_TYPE_TABLESPACE + || space->is_in_default_encrypt + || UT_LIST_GET_LEN(space->chain) == 0 + || !space->acquire_if_not_stopped()) { + continue; + } + + /* Ensure that crypt_data has been initialized. */ + ut_ad(space->size); + + /* Skip ENCRYPTION!=DEFAULT tablespaces. */ + if (space->crypt_data + && !space->crypt_data->is_default_encryption()) { + goto next; + } + + if (srv_encrypt_tables) { + /* Skip encrypted tablespaces if + innodb_encrypt_tables!=OFF */ + if (space->crypt_data + && space->crypt_data->min_key_version) { + goto next; + } + } else { + /* Skip unencrypted tablespaces if + innodb_encrypt_tables=OFF */ + if (!space->crypt_data + || !space->crypt_data->min_key_version) { + goto next; + } + } + + fil_system.default_encrypt_tables.push_back(*space); + space->is_in_default_encrypt = true; +next: + space->release(); + } +} + +/********************************************************************* +Adjust max key age +@param[in] val New max key age */ +UNIV_INTERN +void +fil_crypt_set_rotate_key_age( + uint val) +{ + mutex_enter(&fil_system.mutex); + srv_fil_crypt_rotate_key_age = val; + if (val == 0) { + fil_crypt_default_encrypt_tables_fill(); + } + mutex_exit(&fil_system.mutex); + os_event_set(fil_crypt_threads_event); +} + +/********************************************************************* +Adjust rotation iops +@param[in] val New max roation iops */ +UNIV_INTERN +void +fil_crypt_set_rotation_iops( + uint val) +{ + srv_n_fil_crypt_iops = val; + os_event_set(fil_crypt_threads_event); +} + +/********************************************************************* +Adjust encrypt tables +@param[in] val New setting for innodb-encrypt-tables */ +void fil_crypt_set_encrypt_tables(ulong val) +{ + mutex_enter(&fil_system.mutex); + + srv_encrypt_tables = val; + + if (fil_crypt_must_default_encrypt()) { + fil_crypt_default_encrypt_tables_fill(); + } + + mutex_exit(&fil_system.mutex); + + os_event_set(fil_crypt_threads_event); +} + +/********************************************************************* +Init threads for key rotation */ +UNIV_INTERN +void +fil_crypt_threads_init() +{ + if (!fil_crypt_threads_inited) { + fil_crypt_event = os_event_create(0); + fil_crypt_threads_event = os_event_create(0); + mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX, + &fil_crypt_threads_mutex); + + uint cnt = srv_n_fil_crypt_threads; + srv_n_fil_crypt_threads = 0; + fil_crypt_threads_inited = true; + fil_crypt_set_thread_cnt(cnt); + } +} + +/********************************************************************* +Clean up key rotation threads resources */ +UNIV_INTERN +void +fil_crypt_threads_cleanup() +{ + if (!fil_crypt_threads_inited) { + return; + } + ut_a(!srv_n_fil_crypt_threads_started); + os_event_destroy(fil_crypt_event); + os_event_destroy(fil_crypt_threads_event); + mutex_free(&fil_crypt_threads_mutex); + fil_crypt_threads_inited = false; +} + +/********************************************************************* +Wait for crypt threads to stop accessing space +@param[in] space Tablespace */ +UNIV_INTERN +void +fil_space_crypt_close_tablespace( + const fil_space_t* space) +{ + fil_space_crypt_t* crypt_data = space->crypt_data; + + if (!crypt_data || srv_n_fil_crypt_threads == 0 + || !fil_crypt_threads_inited) { + return; + } + + mutex_enter(&fil_crypt_threads_mutex); + + time_t start = time(0); + time_t last = start; + + mutex_enter(&crypt_data->mutex); + mutex_exit(&fil_crypt_threads_mutex); + + ulint cnt = crypt_data->rotate_state.active_threads; + bool flushing = crypt_data->rotate_state.flushing; + + while (cnt > 0 || flushing) { + mutex_exit(&crypt_data->mutex); + /* release dict mutex so that scrub threads can release their + * table references */ + dict_mutex_exit_for_mysql(); + + /* wakeup throttle (all) sleepers */ + os_event_set(fil_crypt_throttle_sleep_event); + os_event_set(fil_crypt_threads_event); + + os_thread_sleep(20000); + dict_mutex_enter_for_mysql(); + mutex_enter(&crypt_data->mutex); + cnt = crypt_data->rotate_state.active_threads; + flushing = crypt_data->rotate_state.flushing; + + time_t now = time(0); + + if (now >= last + 30) { + ib::warn() << "Waited " + << now - start + << " seconds to drop space: " + << space->name << " (" + << space->id << ") active threads " + << cnt << "flushing=" + << flushing << "."; + last = now; + } + } + + mutex_exit(&crypt_data->mutex); +} + +/********************************************************************* +Get crypt status for a space (used by information_schema) +@param[in] space Tablespace +@param[out] status Crypt status */ +UNIV_INTERN +void +fil_space_crypt_get_status( + const fil_space_t* space, + struct fil_space_crypt_status_t* status) +{ + memset(status, 0, sizeof(*status)); + + ut_ad(space->referenced()); + + /* If there is no crypt data and we have not yet read + page 0 for this tablespace, we need to read it before + we can continue. */ + if (!space->crypt_data) { + fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space)); + } + + status->space = ULINT_UNDEFINED; + + if (fil_space_crypt_t* crypt_data = space->crypt_data) { + status->space = space->id; + mutex_enter(&crypt_data->mutex); + status->scheme = crypt_data->type; + status->keyserver_requests = crypt_data->keyserver_requests; + status->min_key_version = crypt_data->min_key_version; + status->key_id = crypt_data->key_id; + + if (crypt_data->rotate_state.active_threads > 0 || + crypt_data->rotate_state.flushing) { + status->rotating = true; + status->flushing = + crypt_data->rotate_state.flushing; + status->rotate_next_page_number = + crypt_data->rotate_state.next_offset; + status->rotate_max_page_number = + crypt_data->rotate_state.max_offset; + } + + mutex_exit(&crypt_data->mutex); + + if (srv_encrypt_tables || crypt_data->min_key_version) { + status->current_key_version = + fil_crypt_get_latest_key_version(crypt_data); + } + } +} + +/********************************************************************* +Return crypt statistics +@param[out] stat Crypt statistics */ +UNIV_INTERN +void +fil_crypt_total_stat( + fil_crypt_stat_t *stat) +{ + mutex_enter(&crypt_stat_mutex); + *stat = crypt_stat; + mutex_exit(&crypt_stat_mutex); +} + +#endif /* UNIV_INNOCHECKSUM */ + +/** +Verify that post encryption checksum match calculated checksum. +This function should be called only if tablespace contains crypt_data +metadata (this is strong indication that tablespace is encrypted). +Function also verifies that traditional checksum does not match +calculated checksum as if it does page could be valid unencrypted, +encrypted, or corrupted. + +@param[in,out] page page frame (checksum is temporarily modified) +@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0 +@return true if page is encrypted AND OK, false otherwise */ +bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size) +{ + ut_ad(mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)); + + /* Compressed and encrypted pages do not have checksum. Assume not + corrupted. Page verification happens after decompression in + buf_page_read_complete() using buf_page_is_corrupted(). */ + if (fil_page_get_type(page) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { + return true; + } + + /* Read stored post encryption checksum. */ + const ib_uint32_t checksum = mach_read_from_4( + page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4); + + /* If stored checksum matches one of the calculated checksums + page is not corrupted. */ + + switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) { + case SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32: + case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32: + if (zip_size) { + return checksum == page_zip_calc_checksum( + page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32); + } + + return checksum == buf_calc_page_crc32(page); + case SRV_CHECKSUM_ALGORITHM_STRICT_NONE: + /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1, + due to MDEV-12114, fil_crypt_calculate_checksum() + is only using CRC32 for the encrypted pages. + Due to this, we must treat "strict_none" as "none". */ + case SRV_CHECKSUM_ALGORITHM_NONE: + return true; + case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB: + /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1, + due to MDEV-12114, fil_crypt_calculate_checksum() + is only using CRC32 for the encrypted pages. + Due to this, we must treat "strict_innodb" as "innodb". */ + case SRV_CHECKSUM_ALGORITHM_INNODB: + case SRV_CHECKSUM_ALGORITHM_CRC32: + case SRV_CHECKSUM_ALGORITHM_FULL_CRC32: + if (checksum == BUF_NO_CHECKSUM_MAGIC) { + return true; + } + if (zip_size) { + return checksum == page_zip_calc_checksum( + page, zip_size, + SRV_CHECKSUM_ALGORITHM_CRC32) + || checksum == page_zip_calc_checksum( + page, zip_size, + SRV_CHECKSUM_ALGORITHM_INNODB); + } + + return checksum == buf_calc_page_crc32(page) + || checksum == buf_calc_page_new_checksum(page); + } + + ut_ad("unhandled innodb_checksum_algorithm" == 0); + return false; +} diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc new file mode 100644 index 00000000..a2591dd9 --- /dev/null +++ b/storage/innobase/fil/fil0fil.cc @@ -0,0 +1,3757 @@ +/***************************************************************************** + +Copyright (c) 1995, 2021, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2014, 2021, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file fil/fil0fil.cc +The tablespace memory cache + +Created 10/25/1995 Heikki Tuuri +*******************************************************/ + +#include "fil0fil.h" +#include "fil0crypt.h" + +#include "btr0btr.h" +#include "buf0buf.h" +#include "dict0boot.h" +#include "dict0dict.h" +#include "dict0load.h" +#include "fsp0file.h" +#include "fsp0fsp.h" +#include "hash0hash.h" +#include "log0log.h" +#include "log0recv.h" +#include "mach0data.h" +#include "mtr0log.h" +#include "os0file.h" +#include "page0zip.h" +#include "row0mysql.h" +#include "srv0start.h" +#include "trx0purge.h" +#include "buf0lru.h" +#include "ibuf0ibuf.h" +#include "os0event.h" +#include "sync0sync.h" +#include "buf0flu.h" +#ifdef UNIV_LINUX +# include <sys/types.h> +# include <sys/sysmacros.h> +# include <dirent.h> +#endif + +/** Determine if the space id is a user tablespace id or not. +@param space_id tablespace identifier +@return true if it is a user tablespace ID */ +inline bool fil_is_user_tablespace_id(ulint space_id) +{ + return space_id != TRX_SYS_SPACE && space_id != SRV_TMP_SPACE_ID && + !srv_is_undo_tablespace(space_id); +} + +/** Try to close a file to adhere to the innodb_open_files limit. +@param print_info whether to diagnose why a file cannot be closed +@return whether a file was closed */ +bool fil_space_t::try_to_close(bool print_info) +{ + ut_ad(mutex_own(&fil_system.mutex)); + for (fil_space_t *space= UT_LIST_GET_FIRST(fil_system.space_list); space; + space= UT_LIST_GET_NEXT(space_list, space)) + { + switch (space->purpose) { + case FIL_TYPE_TEMPORARY: + continue; + case FIL_TYPE_IMPORT: + break; + case FIL_TYPE_TABLESPACE: + if (!fil_is_user_tablespace_id(space->id)) + continue; + } + + /* We are using an approximation of LRU replacement policy. In + fil_node_open_file_low(), newly opened files are moved to the end + of fil_system.space_list, so that they would be less likely to be + closed here. */ + fil_node_t *node= UT_LIST_GET_FIRST(space->chain); + ut_ad(node); + ut_ad(!UT_LIST_GET_NEXT(chain, node)); + + if (!node->is_open()) + continue; + + if (const auto n= space->set_closing()) + { + if (print_info) + ib::info() << "Cannot close file " << node->name + << " because of " + << (n & PENDING) + << ((n & NEEDS_FSYNC) + ? " pending operations and pending fsync" + : " pending operations"); + continue; + } + + node->close(); + return true; + } + + return false; +} + +/** Rename a single-table tablespace. +The tablespace must exist in the memory cache. +@param[in] id tablespace identifier +@param[in] old_path old file name +@param[in] new_name new table name in the +databasename/tablename format +@param[in] new_path_in new file name, +or NULL if it is located in the normal data directory +@return true if success */ +static bool +fil_rename_tablespace( + ulint id, + const char* old_path, + const char* new_name, + const char* new_path_in); + +/* + IMPLEMENTATION OF THE TABLESPACE MEMORY CACHE + ============================================= + +The tablespace cache is responsible for providing fast read/write access to +tablespaces and logs of the database. File creation and deletion is done +in other modules which know more of the logic of the operation, however. + +A tablespace consists of a chain of files. The size of the files does not +have to be divisible by the database block size, because we may just leave +the last incomplete block unused. When a new file is appended to the +tablespace, the maximum size of the file is also specified. At the moment, +we think that it is best to extend the file to its maximum size already at +the creation of the file, because then we can avoid dynamically extending +the file when more space is needed for the tablespace. + +A block's position in the tablespace is specified with a 32-bit unsigned +integer. The files in the chain are thought to be catenated, and the block +corresponding to an address n is the nth block in the catenated file (where +the first block is named the 0th block, and the incomplete block fragments +at the end of files are not taken into account). A tablespace can be extended +by appending a new file at the end of the chain. + +Our tablespace concept is similar to the one of Oracle. + +To acquire more speed in disk transfers, a technique called disk striping is +sometimes used. This means that logical block addresses are divided in a +round-robin fashion across several disks. Windows NT supports disk striping, +so there we do not need to support it in the database. Disk striping is +implemented in hardware in RAID disks. We conclude that it is not necessary +to implement it in the database. Oracle 7 does not support disk striping, +either. + +Another trick used at some database sites is replacing tablespace files by +raw disks, that is, the whole physical disk drive, or a partition of it, is +opened as a single file, and it is accessed through byte offsets calculated +from the start of the disk or the partition. This is recommended in some +books on database tuning to achieve more speed in i/o. Using raw disk +certainly prevents the OS from fragmenting disk space, but it is not clear +if it really adds speed. We measured on the Pentium 100 MHz + NT + NTFS file +system + EIDE Conner disk only a negligible difference in speed when reading +from a file, versus reading from a raw disk. + +To have fast access to a tablespace or a log file, we put the data structures +to a hash table. Each tablespace and log file is given an unique 32-bit +identifier. */ + +/** Reference to the server data directory. Usually it is the +current working directory ".", but in the MySQL Embedded Server Library +it is an absolute path. */ +const char* fil_path_to_mysql_datadir; + +/** Common InnoDB file extensions */ +const char* dot_ext[] = { "", ".ibd", ".isl", ".cfg" }; + +/** Number of pending tablespace flushes */ +Atomic_counter<ulint> fil_n_pending_tablespace_flushes; + +/** The tablespace memory cache. This variable is NULL before the module is +initialized. */ +fil_system_t fil_system; + +/** At this age or older a space/page will be rotated */ +UNIV_INTERN extern uint srv_fil_crypt_rotate_key_age; + +#ifdef UNIV_DEBUG +/** Try fil_validate() every this many times */ +# define FIL_VALIDATE_SKIP 17 + +/******************************************************************//** +Checks the consistency of the tablespace cache some of the time. +@return true if ok or the check was skipped */ +static +bool +fil_validate_skip(void) +/*===================*/ +{ + /** The fil_validate() call skip counter. */ + static Atomic_counter<uint32_t> fil_validate_count; + + /* We want to reduce the call frequency of the costly fil_validate() + check in debug builds. */ + return (fil_validate_count++ % FIL_VALIDATE_SKIP) || fil_validate(); +} +#endif /* UNIV_DEBUG */ + +/*******************************************************************//** +Returns the table space by a given id, NULL if not found. +It is unsafe to dereference the returned pointer. It is fine to check +for NULL. */ +fil_space_t* +fil_space_get_by_id( +/*================*/ + ulint id) /*!< in: space id */ +{ + fil_space_t* space; + + ut_ad(fil_system.is_initialised()); + ut_ad(mutex_own(&fil_system.mutex)); + + HASH_SEARCH(hash, &fil_system.spaces, id, + fil_space_t*, space, + ut_ad(space->magic_n == FIL_SPACE_MAGIC_N), + space->id == id); + + return(space); +} + +/** Look up a tablespace. +The caller should hold an InnoDB table lock or a MDL that prevents +the tablespace from being dropped during the operation, +or the caller should be in single-threaded crash recovery mode +(no user connections that could drop tablespaces). +Normally, fil_space_t::get() should be used instead. +@param[in] id tablespace ID +@return tablespace, or NULL if not found */ +fil_space_t* +fil_space_get( + ulint id) +{ + mutex_enter(&fil_system.mutex); + fil_space_t* space = fil_space_get_by_id(id); + mutex_exit(&fil_system.mutex); + return(space); +} + +/** Validate the compression algorithm for full crc32 format. +@param[in] space tablespace object +@return whether the compression algorithm support */ +static bool fil_comp_algo_validate(const fil_space_t* space) +{ + if (!space->full_crc32()) { + return true; + } + + DBUG_EXECUTE_IF("fil_comp_algo_validate_fail", + return false;); + + ulint comp_algo = space->get_compression_algo(); + switch (comp_algo) { + case PAGE_UNCOMPRESSED: + case PAGE_ZLIB_ALGORITHM: +#ifdef HAVE_LZ4 + case PAGE_LZ4_ALGORITHM: +#endif /* HAVE_LZ4 */ +#ifdef HAVE_LZO + case PAGE_LZO_ALGORITHM: +#endif /* HAVE_LZO */ +#ifdef HAVE_LZMA + case PAGE_LZMA_ALGORITHM: +#endif /* HAVE_LZMA */ +#ifdef HAVE_BZIP2 + case PAGE_BZIP2_ALGORITHM: +#endif /* HAVE_BZIP2 */ +#ifdef HAVE_SNAPPY + case PAGE_SNAPPY_ALGORITHM: +#endif /* HAVE_SNAPPY */ + return true; + } + + return false; +} + +/** Append a file to the chain of files of a space. +@param[in] name file name of a file that is not open +@param[in] handle file handle, or OS_FILE_CLOSED +@param[in] size file size in entire database pages +@param[in] is_raw whether this is a raw device +@param[in] atomic_write true if atomic write could be enabled +@param[in] max_pages maximum number of pages in file, +or UINT32_MAX for unlimited +@return file object */ +fil_node_t* fil_space_t::add(const char* name, pfs_os_file_t handle, + uint32_t size, bool is_raw, bool atomic_write, + uint32_t max_pages) +{ + fil_node_t* node; + + ut_ad(name != NULL); + ut_ad(fil_system.is_initialised()); + + node = reinterpret_cast<fil_node_t*>(ut_zalloc_nokey(sizeof(*node))); + + node->handle = handle; + + node->name = mem_strdup(name); + + ut_a(!is_raw || srv_start_raw_disk_in_use); + + node->is_raw_disk = is_raw; + + node->size = size; + + node->magic_n = FIL_NODE_MAGIC_N; + + node->init_size = size; + node->max_size = max_pages; + + node->space = this; + + node->atomic_write = atomic_write; + + mutex_enter(&fil_system.mutex); + this->size += size; + UT_LIST_ADD_LAST(chain, node); + if (node->is_open()) { + n_pending.fetch_and(~CLOSING, std::memory_order_relaxed); + if (++fil_system.n_open >= srv_max_n_open_files) { + reacquire(); + try_to_close(true); + release(); + } + } + mutex_exit(&fil_system.mutex); + + return node; +} + +/** Open a tablespace file. +@param node data file +@return whether the file was successfully opened */ +static bool fil_node_open_file_low(fil_node_t *node) +{ + ut_ad(!node->is_open()); + ut_ad(node->space->is_closing()); + ut_ad(mutex_own(&fil_system.mutex)); + ulint type; + static_assert(((UNIV_ZIP_SIZE_MIN >> 1) << 3) == 4096, "compatibility"); + switch (FSP_FLAGS_GET_ZIP_SSIZE(node->space->flags)) { + case 1: + case 2: + type= OS_DATA_FILE_NO_O_DIRECT; + break; + default: + type= OS_DATA_FILE; + } + + for (;;) + { + bool success; + node->handle= os_file_create(innodb_data_file_key, node->name, + node->is_raw_disk + ? OS_FILE_OPEN_RAW | OS_FILE_ON_ERROR_NO_EXIT + : OS_FILE_OPEN | OS_FILE_ON_ERROR_NO_EXIT, + OS_FILE_AIO, type, + srv_read_only_mode, &success); + if (success) + break; + + /* The following call prints an error message */ + if (os_file_get_last_error(true) == EMFILE + 100 && + fil_space_t::try_to_close(true)) + continue; + + ib::warn() << "Cannot open '" << node->name << "'."; + return false; + } + + if (node->size); + else if (!node->read_page0() || !fil_comp_algo_validate(node->space)) + { + os_file_close(node->handle); + node->handle= OS_FILE_CLOSED; + return false; + } + + ut_ad(node->is_open()); + + if (UNIV_LIKELY(!fil_system.freeze_space_list)) + { + /* Move the file last in fil_system.space_list, so that + fil_space_t::try_to_close() should close it as a last resort. */ + UT_LIST_REMOVE(fil_system.space_list, node->space); + UT_LIST_ADD_LAST(fil_system.space_list, node->space); + } + + fil_system.n_open++; + return true; +} + +/** Open a tablespace file. +@param node data file +@return whether the file was successfully opened */ +static bool fil_node_open_file(fil_node_t *node) +{ + ut_ad(mutex_own(&fil_system.mutex)); + ut_ad(!node->is_open()); + ut_ad(fil_is_user_tablespace_id(node->space->id) || + srv_operation == SRV_OPERATION_BACKUP || + srv_operation == SRV_OPERATION_RESTORE || + srv_operation == SRV_OPERATION_RESTORE_DELTA); + ut_ad(node->space->purpose != FIL_TYPE_TEMPORARY); + ut_ad(node->space->referenced()); + + for (ulint count= 0; fil_system.n_open >= srv_max_n_open_files; count++) + { + if (fil_space_t::try_to_close(count > 1)) + count= 0; + else if (count >= 2) + { + ib::warn() << "innodb_open_files=" << srv_max_n_open_files + << " is exceeded (" << fil_system.n_open + << ") files stay open)"; + break; + } + else + { + mutex_exit(&fil_system.mutex); + os_thread_sleep(20000); + /* Flush tablespaces so that we can close modified files. */ + fil_flush_file_spaces(); + mutex_enter(&fil_system.mutex); + } + } + + return fil_node_open_file_low(node); +} + +/** Close the file handle. */ +void fil_node_t::close() +{ + prepare_to_close_or_detach(); + + /* printf("Closing file %s\n", name); */ + int ret= os_file_close(handle); + ut_a(ret); + handle= OS_FILE_CLOSED; +} + +pfs_os_file_t fil_node_t::detach() +{ + prepare_to_close_or_detach(); + + pfs_os_file_t result= handle; + handle= OS_FILE_CLOSED; + return result; +} + +void fil_node_t::prepare_to_close_or_detach() +{ + ut_ad(mutex_own(&fil_system.mutex)); + ut_ad(space->is_ready_to_close() || srv_operation == SRV_OPERATION_BACKUP || + srv_operation == SRV_OPERATION_RESTORE_DELTA); + ut_a(is_open()); + ut_a(!being_extended); + ut_a(space->is_ready_to_close() || space->purpose == FIL_TYPE_TEMPORARY || + srv_fast_shutdown == 2 || !srv_was_started); + + ut_a(fil_system.n_open > 0); + fil_system.n_open--; +} + +/** Flush any writes cached by the file system. */ +void fil_space_t::flush_low() +{ + ut_ad(!mutex_own(&fil_system.mutex)); + + uint32_t n= 1; + while (!n_pending.compare_exchange_strong(n, n | NEEDS_FSYNC, + std::memory_order_acquire, + std::memory_order_relaxed)) + { + ut_ad(n & PENDING); + if (n & STOPPING) + return; + if (n & NEEDS_FSYNC) + break; + } + + fil_n_pending_tablespace_flushes++; + for (fil_node_t *node= UT_LIST_GET_FIRST(chain); node; + node= UT_LIST_GET_NEXT(chain, node)) + { + if (!node->is_open()) + { + ut_ad(!is_in_unflushed_spaces); + continue; + } + IF_WIN(if (node->is_raw_disk) continue,); + os_file_flush(node->handle); + } + + if (is_in_unflushed_spaces) + { + mutex_enter(&fil_system.mutex); + if (is_in_unflushed_spaces) + { + is_in_unflushed_spaces= false; + fil_system.unflushed_spaces.remove(*this); + } + mutex_exit(&fil_system.mutex); + } + + clear_flush(); + fil_n_pending_tablespace_flushes--; +} + +/** Try to extend a tablespace. +@param[in,out] space tablespace to be extended +@param[in,out] node last file of the tablespace +@param[in] size desired size in number of pages +@param[out] success whether the operation succeeded +@return whether the operation should be retried */ +static ATTRIBUTE_COLD __attribute__((warn_unused_result, nonnull)) +bool +fil_space_extend_must_retry( + fil_space_t* space, + fil_node_t* node, + uint32_t size, + bool* success) +{ + ut_ad(mutex_own(&fil_system.mutex)); + ut_ad(UT_LIST_GET_LAST(space->chain) == node); + ut_ad(size >= FIL_IBD_FILE_INITIAL_SIZE); + ut_ad(node->space == space); + ut_ad(space->referenced() || space->is_being_truncated); + + *success = space->size >= size; + + if (*success) { + /* Space already big enough */ + return(false); + } + + if (node->being_extended) { + /* Another thread is currently extending the file. Wait + for it to finish. + It'd have been better to use event driven mechanism but + the entire module is peppered with polling stuff. */ + mutex_exit(&fil_system.mutex); + os_thread_sleep(100000); + return(true); + } + + node->being_extended = true; + + /* At this point it is safe to release fil_system.mutex. No + other thread can rename, delete, close or extend the file because + we have set the node->being_extended flag. */ + mutex_exit(&fil_system.mutex); + + ut_ad(size >= space->size); + + uint32_t last_page_no = space->size; + const uint32_t file_start_page_no = last_page_no - node->size; + + const unsigned page_size = space->physical_size(); + + /* Datafile::read_first_page() expects srv_page_size bytes. + fil_node_t::read_page0() expects at least 4 * srv_page_size bytes.*/ + os_offset_t new_size = std::max( + os_offset_t(size - file_start_page_no) * page_size, + os_offset_t(FIL_IBD_FILE_INITIAL_SIZE << srv_page_size_shift)); + + *success = os_file_set_size(node->name, node->handle, new_size, + space->is_compressed()); + + os_has_said_disk_full = *success; + if (*success) { + os_file_flush(node->handle); + last_page_no = size; + } else { + /* Let us measure the size of the file + to determine how much we were able to + extend it */ + os_offset_t fsize = os_file_get_size(node->handle); + ut_a(fsize != os_offset_t(-1)); + + last_page_no = uint32_t(fsize / page_size) + + file_start_page_no; + } + mutex_enter(&fil_system.mutex); + + ut_a(node->being_extended); + node->being_extended = false; + ut_a(last_page_no - file_start_page_no >= node->size); + + uint32_t file_size = last_page_no - file_start_page_no; + space->size += file_size - node->size; + node->size = file_size; + const uint32_t pages_in_MiB = node->size + & ~uint32_t((1U << (20U - srv_page_size_shift)) - 1); + + /* Keep the last data file size info up to date, rounded to + full megabytes */ + + switch (space->id) { + case TRX_SYS_SPACE: + srv_sys_space.set_last_file_size(pages_in_MiB); + do_flush: + space->reacquire(); + mutex_exit(&fil_system.mutex); + space->flush_low(); + space->release(); + mutex_enter(&fil_system.mutex); + break; + default: + ut_ad(space->purpose == FIL_TYPE_TABLESPACE + || space->purpose == FIL_TYPE_IMPORT); + if (space->purpose == FIL_TYPE_TABLESPACE + && !space->is_being_truncated) { + goto do_flush; + } + break; + case SRV_TMP_SPACE_ID: + ut_ad(space->purpose == FIL_TYPE_TEMPORARY); + srv_tmp_space.set_last_file_size(pages_in_MiB); + break; + } + + return false; +} + +/** @return whether the file is usable for io() */ +ATTRIBUTE_COLD bool fil_space_t::prepare(bool have_mutex) +{ + ut_ad(referenced()); + if (!have_mutex) + mutex_enter(&fil_system.mutex); + ut_ad(mutex_own(&fil_system.mutex)); + fil_node_t *node= UT_LIST_GET_LAST(chain); + ut_ad(!id || purpose == FIL_TYPE_TEMPORARY || + node == UT_LIST_GET_FIRST(chain)); + + const bool is_open= node && (node->is_open() || fil_node_open_file(node)); + + if (!is_open) + release(); + else if (auto desired_size= recv_size) + { + bool success; + while (fil_space_extend_must_retry(this, node, desired_size, &success)) + mutex_enter(&fil_system.mutex); + + ut_ad(mutex_own(&fil_system.mutex)); + /* Crash recovery requires the file extension to succeed. */ + ut_a(success); + /* InnoDB data files cannot shrink. */ + ut_a(size >= desired_size); + if (desired_size > committed_size) + committed_size= desired_size; + + /* There could be multiple concurrent I/O requests for this + tablespace (multiple threads trying to extend this tablespace). + + Also, fil_space_set_recv_size_and_flags() may have been invoked + again during the file extension while fil_system.mutex was not + being held by us. + + Only if recv_size matches what we read originally, reset the + field. In this way, a subsequent I/O request will handle any + pending fil_space_set_recv_size_and_flags(). */ + + if (desired_size == recv_size) + { + recv_size= 0; + goto clear; + } + } + else +clear: + n_pending.fetch_and(~CLOSING, std::memory_order_relaxed); + + if (!have_mutex) + mutex_exit(&fil_system.mutex); + return is_open; +} + +/** Try to extend a tablespace if it is smaller than the specified size. +@param[in,out] space tablespace +@param[in] size desired size in pages +@return whether the tablespace is at least as big as requested */ +bool fil_space_extend(fil_space_t *space, uint32_t size) +{ + ut_ad(!srv_read_only_mode || space->purpose == FIL_TYPE_TEMPORARY); + bool success= false; + const bool acquired= space->acquire(); + mutex_enter(&fil_system.mutex); + if (acquired || space->is_being_truncated) + { + while (fil_space_extend_must_retry(space, UT_LIST_GET_LAST(space->chain), + size, &success)) + mutex_enter(&fil_system.mutex); + } + mutex_exit(&fil_system.mutex); + if (acquired) + space->release(); + return success; +} + +/** Prepare to free a file from fil_system. */ +inline pfs_os_file_t fil_node_t::close_to_free(bool detach_handle) +{ + ut_ad(mutex_own(&fil_system.mutex)); + ut_a(magic_n == FIL_NODE_MAGIC_N); + ut_a(!being_extended); + + if (is_open() && + (space->n_pending.fetch_or(fil_space_t::CLOSING, + std::memory_order_acquire) & + fil_space_t::PENDING)) + { + mutex_exit(&fil_system.mutex); + while (space->referenced()) + os_thread_sleep(100); + mutex_enter(&fil_system.mutex); + } + + while (is_open()) + { + if (space->is_in_unflushed_spaces) + { + ut_ad(srv_file_flush_method != SRV_O_DIRECT_NO_FSYNC); + space->is_in_unflushed_spaces= false; + fil_system.unflushed_spaces.remove(*space); + } + + ut_a(!being_extended); + if (detach_handle) + { + auto result= handle; + handle= OS_FILE_CLOSED; + return result; + } + bool ret= os_file_close(handle); + ut_a(ret); + handle= OS_FILE_CLOSED; + break; + } + + return OS_FILE_CLOSED; +} + +/** Detach a tablespace from the cache and close the files. */ +std::vector<pfs_os_file_t> fil_system_t::detach(fil_space_t *space, + bool detach_handle) +{ + ut_ad(mutex_own(&fil_system.mutex)); + HASH_DELETE(fil_space_t, hash, &spaces, space->id, space); + + if (space->is_in_unflushed_spaces) + { + ut_ad(srv_file_flush_method != SRV_O_DIRECT_NO_FSYNC); + space->is_in_unflushed_spaces= false; + unflushed_spaces.remove(*space); + } + + if (space->is_in_default_encrypt) + { + space->is_in_default_encrypt= false; + default_encrypt_tables.remove(*space); + } + UT_LIST_REMOVE(space_list, space); + if (space == sys_space) + sys_space= nullptr; + else if (space == temp_space) + temp_space= nullptr; + + ut_a(space->magic_n == FIL_SPACE_MAGIC_N); + + for (fil_node_t* node= UT_LIST_GET_FIRST(space->chain); node; + node= UT_LIST_GET_NEXT(chain, node)) + if (node->is_open()) + { + ut_ad(n_open > 0); + n_open--; + } + + std::vector<pfs_os_file_t> handles; + handles.reserve(UT_LIST_GET_LEN(space->chain)); + + for (fil_node_t* node= UT_LIST_GET_FIRST(space->chain); node; + node= UT_LIST_GET_NEXT(chain, node)) + { + auto handle= node->close_to_free(detach_handle); + if (handle != OS_FILE_CLOSED) + handles.push_back(handle); + } + + ut_ad(!space->referenced()); + return handles; +} + +/** Free a tablespace object on which fil_system_t::detach() was invoked. +There must not be any pending i/o's or flushes on the files. +@param[in,out] space tablespace */ +static +void +fil_space_free_low( + fil_space_t* space) +{ + /* The tablespace must not be in fil_system.named_spaces. */ + ut_ad(srv_fast_shutdown == 2 || !srv_was_started + || space->max_lsn == 0); + + /* Wait for fil_space_t::release() after + fil_system_t::detach(), the tablespace cannot be found, so + fil_space_t::get() would return NULL */ + while (space->referenced()) { + os_thread_sleep(100); + } + + for (fil_node_t* node = UT_LIST_GET_FIRST(space->chain); + node != NULL; ) { + ut_d(space->size -= node->size); + ut_free(node->name); + fil_node_t* old_node = node; + node = UT_LIST_GET_NEXT(chain, node); + ut_free(old_node); + } + + ut_ad(space->size == 0); + + rw_lock_free(&space->latch); + fil_space_destroy_crypt_data(&space->crypt_data); + + space->~fil_space_t(); + ut_free(space->name); + ut_free(space); +} + +/** Frees a space object from the tablespace memory cache. +Closes the files in the chain but does not delete them. +There must not be any pending i/o's or flushes on the files. +@param[in] id tablespace identifier +@param[in] x_latched whether the caller holds X-mode space->latch +@return true if success */ +bool +fil_space_free( + ulint id, + bool x_latched) +{ + ut_ad(id != TRX_SYS_SPACE); + + mutex_enter(&fil_system.mutex); + fil_space_t* space = fil_space_get_by_id(id); + + if (space != NULL) { + fil_system.detach(space); + } + + mutex_exit(&fil_system.mutex); + + if (space != NULL) { + if (x_latched) { + rw_lock_x_unlock(&space->latch); + } + + if (!recv_recovery_is_on()) { + mysql_mutex_lock(&log_sys.mutex); + } + + mysql_mutex_assert_owner(&log_sys.mutex); + + if (space->max_lsn != 0) { + ut_d(space->max_lsn = 0); + UT_LIST_REMOVE(fil_system.named_spaces, space); + } + + if (!recv_recovery_is_on()) { + mysql_mutex_unlock(&log_sys.mutex); + } + + fil_space_free_low(space); + } + + return(space != NULL); +} + +/** Create a tablespace in fil_system. +@param name tablespace name +@param id tablespace identifier +@param flags tablespace flags +@param purpose tablespace purpose +@param crypt_data encryption information +@param mode encryption mode +@return pointer to created tablespace, to be filled in with add() +@retval nullptr on failure (such as when the same tablespace exists) */ +fil_space_t *fil_space_t::create(const char *name, ulint id, ulint flags, + fil_type_t purpose, + fil_space_crypt_t *crypt_data, + fil_encryption_t mode) +{ + fil_space_t* space; + + ut_ad(fil_system.is_initialised()); + ut_ad(fil_space_t::is_valid_flags(flags & ~FSP_FLAGS_MEM_MASK, id)); + ut_ad(srv_page_size == UNIV_PAGE_SIZE_ORIG || flags != 0); + + DBUG_EXECUTE_IF("fil_space_create_failure", return(NULL);); + + /* FIXME: if calloc() is defined as an inline function that calls + memset() or bzero(), then GCC 6 -flifetime-dse can optimize it away */ + space= new (ut_zalloc_nokey(sizeof(*space))) fil_space_t; + + space->id = id; + space->name = mem_strdup(name); + + UT_LIST_INIT(space->chain, &fil_node_t::chain); + + space->purpose = purpose; + space->flags = flags; + + space->magic_n = FIL_SPACE_MAGIC_N; + space->crypt_data = crypt_data; + space->n_pending.store(CLOSING, std::memory_order_relaxed); + + DBUG_LOG("tablespace", + "Created metadata for " << id << " name " << name); + if (crypt_data) { + DBUG_LOG("crypt", + "Tablespace " << id << " name " << name + << " encryption " << crypt_data->encryption + << " key id " << crypt_data->key_id + << ":" << fil_crypt_get_mode(crypt_data) + << " " << fil_crypt_get_type(crypt_data)); + } + + rw_lock_create(fil_space_latch_key, &space->latch, SYNC_FSP); + + if (space->purpose == FIL_TYPE_TEMPORARY) { + /* SysTablespace::open_or_create() would pass + size!=0 to fil_space_t::add(), so first_time_open + would not hold in fil_node_open_file(), and we + must assign this manually. We do not care about + the durability or atomicity of writes to the + temporary tablespace files. */ + space->atomic_write_supported = true; + } + + mutex_enter(&fil_system.mutex); + + if (const fil_space_t *old_space = fil_space_get_by_id(id)) { + ib::error() << "Trying to add tablespace '" << name + << "' with id " << id + << " to the tablespace memory cache, but tablespace '" + << old_space->name << "' already exists in the cache!"; + mutex_exit(&fil_system.mutex); + rw_lock_free(&space->latch); + space->~fil_space_t(); + ut_free(space->name); + ut_free(space); + return(NULL); + } + + HASH_INSERT(fil_space_t, hash, &fil_system.spaces, id, space); + + UT_LIST_ADD_LAST(fil_system.space_list, space); + + switch (id) { + case 0: + ut_ad(!fil_system.sys_space); + fil_system.sys_space = space; + break; + case SRV_TMP_SPACE_ID: + ut_ad(!fil_system.temp_space); + fil_system.temp_space = space; + break; + default: + ut_ad(purpose != FIL_TYPE_TEMPORARY); + if (UNIV_LIKELY(id <= fil_system.max_assigned_id)) { + break; + } + if (!fil_system.space_id_reuse_warned) { + ib::warn() << "Allocated tablespace ID " << id + << " for " << name << ", old maximum was " + << fil_system.max_assigned_id; + } + + fil_system.max_assigned_id = id; + } + + const bool rotate = + (purpose == FIL_TYPE_TABLESPACE + && (mode == FIL_ENCRYPTION_ON + || mode == FIL_ENCRYPTION_OFF || srv_encrypt_tables) + && fil_crypt_must_default_encrypt()); + + /* Inform key rotation that there could be something + to do */ + if (rotate) { + /* Key rotation is not enabled, need to inform background + encryption threads. */ + fil_system.default_encrypt_tables.push_back(*space); + space->is_in_default_encrypt = true; + } + + mutex_exit(&fil_system.mutex); + + if (rotate && srv_n_fil_crypt_threads_started) { + os_event_set(fil_crypt_threads_event); + } + + return(space); +} + +/*******************************************************************//** +Assigns a new space id for a new single-table tablespace. This works simply by +incrementing the global counter. If 4 billion id's is not enough, we may need +to recycle id's. +@return true if assigned, false if not */ +bool +fil_assign_new_space_id( +/*====================*/ + ulint* space_id) /*!< in/out: space id */ +{ + ulint id; + bool success; + + mutex_enter(&fil_system.mutex); + + id = *space_id; + + if (id < fil_system.max_assigned_id) { + id = fil_system.max_assigned_id; + } + + id++; + + if (id > (SRV_SPACE_ID_UPPER_BOUND / 2) && (id % 1000000UL == 0)) { + ib::warn() << "You are running out of new single-table" + " tablespace id's. Current counter is " << id + << " and it must not exceed" <<SRV_SPACE_ID_UPPER_BOUND + << "! To reset the counter to zero you have to dump" + " all your tables and recreate the whole InnoDB" + " installation."; + } + + success = (id < SRV_SPACE_ID_UPPER_BOUND); + + if (success) { + *space_id = fil_system.max_assigned_id = id; + } else { + ib::warn() << "You have run out of single-table tablespace" + " id's! Current counter is " << id + << ". To reset the counter to zero" + " you have to dump all your tables and" + " recreate the whole InnoDB installation."; + *space_id = ULINT_UNDEFINED; + } + + mutex_exit(&fil_system.mutex); + + return(success); +} + +/** Read the first page of a data file. +@return whether the page was found valid */ +bool fil_space_t::read_page0() +{ + ut_ad(fil_system.is_initialised()); + ut_ad(mutex_own(&fil_system.mutex)); + if (size) + return true; + + fil_node_t *node= UT_LIST_GET_FIRST(chain); + if (!node) + return false; + ut_ad(!UT_LIST_GET_NEXT(chain, node)); + + if (UNIV_UNLIKELY(acquire_low() & STOPPING)) + { + ut_ad("this should not happen" == 0); + return false; + } + const bool ok= node->is_open() || fil_node_open_file(node); + release(); + return ok; +} + +/** Look up a tablespace and ensure that its first page has been validated. */ +static fil_space_t *fil_space_get_space(ulint id) +{ + if (fil_space_t *space= fil_space_get_by_id(id)) + if (space->read_page0()) + return space; + return nullptr; +} + +void fil_space_set_recv_size_and_flags(ulint id, uint32_t size, uint32_t flags) +{ + ut_ad(id < SRV_SPACE_ID_UPPER_BOUND); + mutex_enter(&fil_system.mutex); + if (fil_space_t *space= fil_space_get_space(id)) + { + if (size) + space->recv_size= size; + if (flags != FSP_FLAGS_FCRC32_MASK_MARKER) + space->flags= flags; + } + mutex_exit(&fil_system.mutex); +} + +/** Open each file. Never invoked on .ibd files. +@param create_new_db whether to skip the call to fil_node_t::read_page0() +@return whether all files were opened */ +bool fil_space_t::open(bool create_new_db) +{ + ut_ad(fil_system.is_initialised()); + ut_ad(!id || create_new_db); + + bool success= true; + bool skip_read= create_new_db; + + mutex_enter(&fil_system.mutex); + + for (fil_node_t *node= UT_LIST_GET_FIRST(chain); node; + node= UT_LIST_GET_NEXT(chain, node)) + { + if (!node->is_open() && !fil_node_open_file_low(node)) + { +err_exit: + success= false; + break; + } + + if (create_new_db) + continue; + if (skip_read) + { + size+= node->size; + continue; + } + + if (!node->read_page0()) + { + fil_system.n_open--; + os_file_close(node->handle); + node->handle= OS_FILE_CLOSED; + goto err_exit; + } + + skip_read= true; + } + + if (!create_new_db) + committed_size= size; + mutex_exit(&fil_system.mutex); + return success; +} + +/** Close each file. Only invoked on fil_system.temp_space. */ +void fil_space_t::close() +{ + if (!fil_system.is_initialised()) { + return; + } + + mutex_enter(&fil_system.mutex); + ut_ad(this == fil_system.temp_space + || srv_operation == SRV_OPERATION_BACKUP + || srv_operation == SRV_OPERATION_RESTORE + || srv_operation == SRV_OPERATION_RESTORE_DELTA); + + for (fil_node_t* node = UT_LIST_GET_FIRST(chain); + node != NULL; + node = UT_LIST_GET_NEXT(chain, node)) { + if (node->is_open()) { + node->close(); + } + } + + mutex_exit(&fil_system.mutex); +} + +void fil_system_t::create(ulint hash_size) +{ + ut_ad(this == &fil_system); + ut_ad(!is_initialised()); + ut_ad(!(srv_page_size % FSP_EXTENT_SIZE)); + ut_ad(srv_page_size); + ut_ad(!spaces.array); + + m_initialised = true; + + compile_time_assert(!(UNIV_PAGE_SIZE_MAX % FSP_EXTENT_SIZE_MAX)); + compile_time_assert(!(UNIV_PAGE_SIZE_MIN % FSP_EXTENT_SIZE_MIN)); + + ut_ad(hash_size > 0); + + mutex_create(LATCH_ID_FIL_SYSTEM, &mutex); + + spaces.create(hash_size); + + fil_space_crypt_init(); +#ifdef UNIV_LINUX + ssd.clear(); + char fn[sizeof(dirent::d_name) + + sizeof "/sys/block/" "/queue/rotational"]; + const size_t sizeof_fnp = (sizeof fn) - sizeof "/sys/block"; + memcpy(fn, "/sys/block/", sizeof "/sys/block"); + char* fnp = &fn[sizeof "/sys/block"]; + + std::set<std::string> ssd_devices; + if (DIR* d = opendir("/sys/block")) { + while (struct dirent* e = readdir(d)) { + if (e->d_name[0] == '.') { + continue; + } + snprintf(fnp, sizeof_fnp, "%s/queue/rotational", + e->d_name); + int f = open(fn, O_RDONLY); + if (f == -1) { + continue; + } + char b[sizeof "4294967295:4294967295\n"]; + ssize_t l = read(f, b, sizeof b); + ::close(f); + if (l != 2 || memcmp("0\n", b, 2)) { + continue; + } + snprintf(fnp, sizeof_fnp, "%s/dev", e->d_name); + f = open(fn, O_RDONLY); + if (f == -1) { + continue; + } + l = read(f, b, sizeof b); + ::close(f); + if (l <= 0 || b[l - 1] != '\n') { + continue; + } + b[l - 1] = '\0'; + char* end = b; + unsigned long dev_major = strtoul(b, &end, 10); + if (b == end || *end != ':' + || dev_major != unsigned(dev_major)) { + continue; + } + char* c = end + 1; + unsigned long dev_minor = strtoul(c, &end, 10); + if (c == end || *end + || dev_minor != unsigned(dev_minor)) { + continue; + } + ssd.push_back(makedev(unsigned(dev_major), + unsigned(dev_minor))); + } + closedir(d); + } + /* fil_system_t::is_ssd() assumes the following */ + ut_ad(makedev(0, 8) == 8); + ut_ad(makedev(0, 4) == 4); + ut_ad(makedev(0, 2) == 2); + ut_ad(makedev(0, 1) == 1); +#endif +} + +void fil_system_t::close() +{ + ut_ad(this == &fil_system); + ut_a(unflushed_spaces.empty()); + ut_a(!UT_LIST_GET_LEN(space_list)); + ut_ad(!sys_space); + ut_ad(!temp_space); + + if (is_initialised()) + { + m_initialised= false; + spaces.free(); + mutex_free(&mutex); + fil_space_crypt_cleanup(); + } + + ut_ad(!spaces.array); + +#ifdef UNIV_LINUX + ssd.clear(); + ssd.shrink_to_fit(); +#endif /* UNIV_LINUX */ +} + +/** Extend all open data files to the recovered size */ +ATTRIBUTE_COLD void fil_system_t::extend_to_recv_size() +{ + ut_ad(is_initialised()); + mutex_enter(&mutex); + for (fil_space_t *space= UT_LIST_GET_FIRST(fil_system.space_list); space; + space= UT_LIST_GET_NEXT(space_list, space)) + { + const uint32_t size= space->recv_size; + + if (size > space->size) + { + if (space->is_closing()) + continue; + space->reacquire(); + bool success; + while (fil_space_extend_must_retry(space, UT_LIST_GET_LAST(space->chain), + size, &success)) + mutex_enter(&mutex); + /* Crash recovery requires the file extension to succeed. */ + ut_a(success); + space->release(); + } + } + mutex_exit(&mutex); +} + +/** Close all tablespace files at shutdown */ +void fil_space_t::close_all() +{ + if (!fil_system.is_initialised()) { + return; + } + + fil_space_t* space; + + /* At shutdown, we should not have any files in this list. */ + ut_ad(srv_fast_shutdown == 2 + || !srv_was_started + || UT_LIST_GET_LEN(fil_system.named_spaces) == 0); + fil_flush_file_spaces(); + + mutex_enter(&fil_system.mutex); + + for (space = UT_LIST_GET_FIRST(fil_system.space_list); space; ) { + fil_node_t* node; + fil_space_t* prev_space = space; + + for (node = UT_LIST_GET_FIRST(space->chain); + node != NULL; + node = UT_LIST_GET_NEXT(chain, node)) { + + if (!node->is_open()) { +next: + continue; + } + + for (ulint count = 10000; count--; ) { + if (!space->set_closing()) { + node->close(); + goto next; + } + mutex_exit(&fil_system.mutex); + os_thread_sleep(100); + mutex_enter(&fil_system.mutex); + if (!node->is_open()) { + goto next; + } + } + + ib::error() << "File '" << node->name + << "' has " << space->referenced() + << " operations"; + } + + space = UT_LIST_GET_NEXT(space_list, space); + fil_system.detach(prev_space); + fil_space_free_low(prev_space); + } + + mutex_exit(&fil_system.mutex); + + ut_ad(srv_fast_shutdown == 2 + || !srv_was_started + || UT_LIST_GET_LEN(fil_system.named_spaces) == 0); +} + +/*******************************************************************//** +Sets the max tablespace id counter if the given number is bigger than the +previous value. */ +void +fil_set_max_space_id_if_bigger( +/*===========================*/ + ulint max_id) /*!< in: maximum known id */ +{ + if (max_id >= SRV_SPACE_ID_UPPER_BOUND) { + ib::fatal() << "Max tablespace id is too high, " << max_id; + } + + mutex_enter(&fil_system.mutex); + + if (fil_system.max_assigned_id < max_id) { + + fil_system.max_assigned_id = max_id; + } + + mutex_exit(&fil_system.mutex); +} + +/** Write the flushed LSN to the page header of the first page in the +system tablespace. +@param[in] lsn flushed LSN +@return DB_SUCCESS or error number */ +dberr_t +fil_write_flushed_lsn( + lsn_t lsn) +{ + byte* buf; + ut_ad(!srv_read_only_mode); + + if (!fil_system.sys_space->acquire()) { + return DB_ERROR; + } + + buf = static_cast<byte*>(aligned_malloc(srv_page_size, srv_page_size)); + + auto fio = fil_system.sys_space->io(IORequestRead, 0, srv_page_size, + buf); + + if (fio.err == DB_SUCCESS) { + mach_write_to_8(buf + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, + lsn); + + ulint fsp_flags = mach_read_from_4( + buf + FSP_HEADER_OFFSET + FSP_SPACE_FLAGS); + + if (fil_space_t::full_crc32(fsp_flags)) { + buf_flush_assign_full_crc32_checksum(buf); + } + + fio = fil_system.sys_space->io(IORequestWrite, + 0, srv_page_size, buf); + fil_flush_file_spaces(); + } else { + fil_system.sys_space->release(); + } + + aligned_free(buf); + return fio.err; +} + +/** Acquire a tablespace reference. +@param id tablespace identifier +@return tablespace +@retval nullptr if the tablespace is missing or inaccessible */ +fil_space_t *fil_space_t::get(ulint id) +{ + mutex_enter(&fil_system.mutex); + fil_space_t *space= fil_space_get_by_id(id); + const uint32_t n= space ? space->acquire_low() : 0; + mutex_exit(&fil_system.mutex); + + if (n & STOPPING) + space= nullptr; + + if ((n & CLOSING) && !space->prepare()) + space= nullptr; + + return space; +} + +/** Write a log record about a file operation. +@param type file operation +@param first_page_no first page number in the file +@param path file path +@param new_path new file path for type=FILE_RENAME */ +inline void mtr_t::log_file_op(mfile_type_t type, ulint space_id, + const char *path, const char *new_path) +{ + ut_ad((new_path != nullptr) == (type == FILE_RENAME)); + ut_ad(!(byte(type) & 15)); + + /* fil_name_parse() requires that there be at least one path + separator and that the file path end with ".ibd". */ + ut_ad(strchr(path, OS_PATH_SEPARATOR) != NULL); + ut_ad(!strcmp(&path[strlen(path) - strlen(DOT_IBD)], DOT_IBD)); + + flag_modified(); + if (m_log_mode != MTR_LOG_ALL) + return; + m_last= nullptr; + + const size_t len= strlen(path); + const size_t new_len= type == FILE_RENAME ? 1 + strlen(new_path) : 0; + ut_ad(len > 0); + byte *const log_ptr= m_log.open(1 + 3/*length*/ + 5/*space_id*/ + + 1/*page_no=0*/); + byte *end= log_ptr + 1; + end= mlog_encode_varint(end, space_id); + *end++= 0; + if (UNIV_LIKELY(end + len + new_len >= &log_ptr[16])) + { + *log_ptr= type; + size_t total_len= len + new_len + end - log_ptr - 15; + if (total_len >= MIN_3BYTE) + total_len+= 2; + else if (total_len >= MIN_2BYTE) + total_len++; + end= mlog_encode_varint(log_ptr + 1, total_len); + end= mlog_encode_varint(end, space_id); + *end++= 0; + } + else + { + *log_ptr= static_cast<byte>(type | (end + len + new_len - &log_ptr[1])); + ut_ad(*log_ptr & 15); + } + + m_log.close(end); + + if (type == FILE_RENAME) + { + ut_ad(strchr(new_path, OS_PATH_SEPARATOR)); + m_log.push(reinterpret_cast<const byte*>(path), uint32_t(len + 1)); + m_log.push(reinterpret_cast<const byte*>(new_path), uint32_t(new_len)); + } + else + m_log.push(reinterpret_cast<const byte*>(path), uint32_t(len)); +} + +/** Write redo log for renaming a file. +@param[in] space_id tablespace id +@param[in] old_name tablespace file name +@param[in] new_name tablespace file name after renaming +@param[in,out] mtr mini-transaction */ +static +void +fil_name_write_rename_low( + ulint space_id, + const char* old_name, + const char* new_name, + mtr_t* mtr) +{ + ut_ad(!is_predefined_tablespace(space_id)); + mtr->log_file_op(FILE_RENAME, space_id, old_name, new_name); +} + +/** Write redo log for renaming a file. +@param[in] space_id tablespace id +@param[in] old_name tablespace file name +@param[in] new_name tablespace file name after renaming */ +static void +fil_name_write_rename( + ulint space_id, + const char* old_name, + const char* new_name) +{ + mtr_t mtr; + mtr.start(); + fil_name_write_rename_low(space_id, old_name, new_name, &mtr); + mtr.commit(); + log_write_up_to(mtr.commit_lsn(), true); +} + +/** Write FILE_MODIFY for a file. +@param[in] space_id tablespace id +@param[in] name tablespace file name +@param[in,out] mtr mini-transaction */ +static +void +fil_name_write( + ulint space_id, + const char* name, + mtr_t* mtr) +{ + ut_ad(!is_predefined_tablespace(space_id)); + mtr->log_file_op(FILE_MODIFY, space_id, name); +} + +/** Check for pending operations. +@param[in] space tablespace +@param[in] count number of attempts so far +@return 0 if no operations else count + 1. */ +static ulint fil_check_pending_ops(const fil_space_t* space, ulint count) +{ + ut_ad(mutex_own(&fil_system.mutex)); + + if (!space) { + return 0; + } + + if (auto n_pending_ops = space->referenced()) { + + /* Give a warning every 10 second, starting after 1 second */ + if ((count % 500) == 50) { + ib::warn() << "Trying to delete" + " tablespace '" << space->name + << "' but there are " << n_pending_ops + << " pending operations on it."; + } + + return(count + 1); + } + + return(0); +} + +/*******************************************************************//** +Check for pending IO. +@return 0 if no pending else count + 1. */ +static +ulint +fil_check_pending_io( +/*=================*/ + fil_space_t* space, /*!< in/out: Tablespace to check */ + fil_node_t** node, /*!< out: Node in space list */ + ulint count) /*!< in: number of attempts so far */ +{ + ut_ad(mutex_own(&fil_system.mutex)); + + /* The following code must change when InnoDB supports + multiple datafiles per tablespace. */ + ut_ad(UT_LIST_GET_LEN(space->chain) == 1); + + *node = UT_LIST_GET_FIRST(space->chain); + + if (const uint32_t p = space->referenced()) { + ut_a(!(*node)->being_extended); + + /* Give a warning every 10 second, starting after 1 second */ + if ((count % 500) == 50) { + ib::info() << "Trying to delete" + " tablespace '" << space->name + << "' but there are " << p + << " pending i/o's on it."; + } + + return(count + 1); + } + + return(0); +} + +/*******************************************************************//** +Check pending operations on a tablespace. +@return tablespace */ +static +fil_space_t* +fil_check_pending_operations( +/*=========================*/ + ulint id, /*!< in: space id */ + bool truncate, /*!< in: whether to truncate a file */ + char** path) /*!< out/own: tablespace path */ +{ + ulint count = 0; + + ut_a(!is_system_tablespace(id)); + mutex_enter(&fil_system.mutex); + fil_space_t* sp = fil_space_get_by_id(id); + + if (sp) { + sp->set_stopping(true); + if (sp->crypt_data) { + sp->reacquire(); + mutex_exit(&fil_system.mutex); + fil_space_crypt_close_tablespace(sp); + mutex_enter(&fil_system.mutex); + sp->release(); + } + } + + /* Check for pending operations. */ + + do { + count = fil_check_pending_ops(sp, count); + + mutex_exit(&fil_system.mutex); + + if (count) { + os_thread_sleep(20000); // Wait 0.02 seconds + } else if (!sp) { + return nullptr; + } + + mutex_enter(&fil_system.mutex); + + sp = fil_space_get_by_id(id); + } while (count); + + /* Check for pending IO. */ + + for (;;) { + if (truncate) { + sp->is_being_truncated = true; + } + + fil_node_t* node; + + count = fil_check_pending_io(sp, &node, count); + + if (count == 0 && path) { + *path = mem_strdup(node->name); + } + + mutex_exit(&fil_system.mutex); + + if (count == 0) { + break; + } + + os_thread_sleep(20000); // Wait 0.02 seconds + mutex_enter(&fil_system.mutex); + sp = fil_space_get_by_id(id); + + if (!sp) { + mutex_exit(&fil_system.mutex); + break; + } + } + + return sp; +} + +/** Close a single-table tablespace on failed IMPORT TABLESPACE. +The tablespace must be cached in the memory cache. +Free all pages used by the tablespace. */ +void fil_close_tablespace(ulint id) +{ + ut_ad(!is_system_tablespace(id)); + char* path = nullptr; + fil_space_t* space = fil_check_pending_operations(id, false, &path); + if (!space) { + return; + } + + rw_lock_x_lock(&space->latch); + + /* Invalidate in the buffer pool all pages belonging to the + tablespace. Since we have invoked space->set_stopping(), readahead + can no longer read more pages of this tablespace to buf_pool. + Thus we can clean the tablespace out of buf_pool + completely and permanently. */ + while (buf_flush_list_space(space)); + ut_ad(space->is_stopping()); + + /* If the free is successful, the X lock will be released before + the space memory data structure is freed. */ + + if (!fil_space_free(id, true)) { + rw_lock_x_unlock(&space->latch); + } + + /* If it is a delete then also delete any generated files, otherwise + when we drop the database the remove directory will fail. */ + + if (char* cfg_name = fil_make_filepath(path, NULL, CFG, false)) { + os_file_delete_if_exists(innodb_data_file_key, cfg_name, NULL); + ut_free(cfg_name); + } + + ut_free(path); +} + +/** Delete a tablespace and associated .ibd file. +@param[in] id tablespace identifier +@param[in] if_exists whether to ignore missing tablespace +@param[in,out] detached_handles return detached handles if not nullptr +@return DB_SUCCESS or error */ +dberr_t fil_delete_tablespace(ulint id, bool if_exists, + std::vector<pfs_os_file_t>* detached_handles) +{ + char* path = NULL; + ut_ad(!is_system_tablespace(id)); + ut_ad(!detached_handles || detached_handles->empty()); + + dberr_t err; + fil_space_t *space = fil_check_pending_operations(id, false, &path); + + if (!space) { + err = DB_TABLESPACE_NOT_FOUND; + if (!if_exists) { + ib::error() << "Cannot delete tablespace " << id + << " because it is not found" + " in the tablespace memory cache."; + } + + goto func_exit; + } + + /* IMPORTANT: Because we have set space::stop_new_ops there + can't be any new reads or flushes. We are here + because node::n_pending was zero above. However, it is still + possible to have pending read and write requests: + + A read request can happen because the reader thread has + gone through the ::stop_new_ops check in buf_page_init_for_read() + before the flag was set and has not yet incremented ::n_pending + when we checked it above. + + A write request can be issued any time because we don't check + fil_space_t::is_stopping() when queueing a block for write. + + We deal with pending write requests in the following function + where we'd minimally evict all dirty pages belonging to this + space from the flush_list. Note that if a block is IO-fixed + we'll wait for IO to complete. + + To deal with potential read requests, we will check the + is_stopping() in fil_space_t::io(). */ + + err = DB_SUCCESS; + buf_flush_remove_pages(id); + + /* If it is a delete then also delete any generated files, otherwise + when we drop the database the remove directory will fail. */ + { + /* Before deleting the file, write a log record about + it, so that InnoDB crash recovery will expect the file + to be gone. */ + mtr_t mtr; + + mtr.start(); + mtr.log_file_op(FILE_DELETE, id, path); + mtr.commit(); + /* Even if we got killed shortly after deleting the + tablespace file, the record must have already been + written to the redo log. */ + log_write_up_to(mtr.commit_lsn(), true); + + char* cfg_name = fil_make_filepath(path, NULL, CFG, false); + if (cfg_name != NULL) { + os_file_delete_if_exists(innodb_data_file_key, cfg_name, NULL); + ut_free(cfg_name); + } + } + + /* Delete the link file pointing to the ibd file we are deleting. */ + if (FSP_FLAGS_HAS_DATA_DIR(space->flags)) { + RemoteDatafile::delete_link_file(space->name); + } + + mutex_enter(&fil_system.mutex); + + /* Double check the sanity of pending ops after reacquiring + the fil_system::mutex. */ + if (const fil_space_t* s = fil_space_get_by_id(id)) { + ut_a(s == space); + ut_a(!space->referenced()); + ut_a(UT_LIST_GET_LEN(space->chain) == 1); + auto handles = fil_system.detach(space, + detached_handles != nullptr); + if (detached_handles) { + *detached_handles = std::move(handles); + } + mutex_exit(&fil_system.mutex); + + mysql_mutex_lock(&log_sys.mutex); + + if (space->max_lsn != 0) { + ut_d(space->max_lsn = 0); + UT_LIST_REMOVE(fil_system.named_spaces, space); + } + + mysql_mutex_unlock(&log_sys.mutex); + fil_space_free_low(space); + + if (!os_file_delete(innodb_data_file_key, path) + && !os_file_delete_if_exists( + innodb_data_file_key, path, NULL)) { + + /* Note: This is because we have removed the + tablespace instance from the cache. */ + + err = DB_IO_ERROR; + } + } else { + mutex_exit(&fil_system.mutex); + err = DB_TABLESPACE_NOT_FOUND; + } + +func_exit: + ut_free(path); + ibuf_delete_for_discarded_space(id); + return(err); +} + +/** Prepare to truncate an undo tablespace. +@param[in] space_id undo tablespace id +@return the tablespace +@retval NULL if tablespace not found */ +fil_space_t *fil_truncate_prepare(ulint space_id) +{ + return fil_check_pending_operations(space_id, true, nullptr); +} + +/*******************************************************************//** +Allocates and builds a file name from a path, a table or tablespace name +and a suffix. The string must be freed by caller with ut_free(). +@param[in] path NULL or the directory path or the full path and filename. +@param[in] name NULL if path is full, or Table/Tablespace name +@param[in] suffix NULL or the file extention to use. +@param[in] trim_name true if the last name on the path should be trimmed. +@return own: file name */ +char* +fil_make_filepath( + const char* path, + const char* name, + ib_extention ext, + bool trim_name) +{ + /* The path may contain the basename of the file, if so we do not + need the name. If the path is NULL, we can use the default path, + but there needs to be a name. */ + ut_ad(path != NULL || name != NULL); + + /* If we are going to strip a name off the path, there better be a + path and a new name to put back on. */ + ut_ad(!trim_name || (path != NULL && name != NULL)); + + if (path == NULL) { + path = fil_path_to_mysql_datadir; + } + + ulint len = 0; /* current length */ + ulint path_len = strlen(path); + ulint name_len = (name ? strlen(name) : 0); + const char* suffix = dot_ext[ext]; + ulint suffix_len = strlen(suffix); + ulint full_len = path_len + 1 + name_len + suffix_len + 1; + + char* full_name = static_cast<char*>(ut_malloc_nokey(full_len)); + if (full_name == NULL) { + return NULL; + } + + /* If the name is a relative path, do not prepend "./". */ + if (path[0] == '.' + && (path[1] == '\0' || path[1] == OS_PATH_SEPARATOR) + && name != NULL && name[0] == '.') { + path = NULL; + path_len = 0; + } + + if (path != NULL) { + memcpy(full_name, path, path_len); + len = path_len; + full_name[len] = '\0'; + os_normalize_path(full_name); + } + + if (trim_name) { + /* Find the offset of the last DIR separator and set it to + null in order to strip off the old basename from this path. */ + char* last_dir_sep = strrchr(full_name, OS_PATH_SEPARATOR); + if (last_dir_sep) { + last_dir_sep[0] = '\0'; + len = strlen(full_name); + } + } + + if (name != NULL) { + if (len && full_name[len - 1] != OS_PATH_SEPARATOR) { + /* Add a DIR separator */ + full_name[len] = OS_PATH_SEPARATOR; + full_name[++len] = '\0'; + } + + char* ptr = &full_name[len]; + memcpy(ptr, name, name_len); + len += name_len; + full_name[len] = '\0'; + os_normalize_path(ptr); + } + + /* Make sure that the specified suffix is at the end of the filepath + string provided. This assumes that the suffix starts with '.'. + If the first char of the suffix is found in the filepath at the same + length as the suffix from the end, then we will assume that there is + a previous suffix that needs to be replaced. */ + if (suffix != NULL) { + /* Need room for the trailing null byte. */ + ut_ad(len < full_len); + + if ((len > suffix_len) + && (full_name[len - suffix_len] == suffix[0])) { + /* Another suffix exists, make it the one requested. */ + memcpy(&full_name[len - suffix_len], suffix, suffix_len); + + } else { + /* No previous suffix, add it. */ + ut_ad(len + suffix_len < full_len); + memcpy(&full_name[len], suffix, suffix_len); + full_name[len + suffix_len] = '\0'; + } + } + + return(full_name); +} + +/** Test if a tablespace file can be renamed to a new filepath by checking +if that the old filepath exists and the new filepath does not exist. +@param[in] old_path old filepath +@param[in] new_path new filepath +@param[in] replace_new whether to ignore the existence of new_path +@return innodb error code */ +static dberr_t +fil_rename_tablespace_check( + const char* old_path, + const char* new_path, + bool replace_new) +{ + bool exists = false; + os_file_type_t ftype; + + if (os_file_status(old_path, &exists, &ftype) && !exists) { + ib::error() << "Cannot rename '" << old_path + << "' to '" << new_path + << "' because the source file" + << " does not exist."; + return(DB_TABLESPACE_NOT_FOUND); + } + + exists = false; + if (os_file_status(new_path, &exists, &ftype) && !exists) { + return DB_SUCCESS; + } + + if (!replace_new) { + ib::error() << "Cannot rename '" << old_path + << "' to '" << new_path + << "' because the target file exists." + " Remove the target file and try again."; + return(DB_TABLESPACE_EXISTS); + } + + /* This must be during the ROLLBACK of TRUNCATE TABLE. + Because InnoDB only allows at most one data dictionary + transaction at a time, and because this incomplete TRUNCATE + would have created a new tablespace file, we must remove + a possibly existing tablespace that is associated with the + new tablespace file. */ +retry: + mutex_enter(&fil_system.mutex); + for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list); + space; space = UT_LIST_GET_NEXT(space_list, space)) { + ulint id = space->id; + if (id + && space->purpose == FIL_TYPE_TABLESPACE + && !strcmp(new_path, + UT_LIST_GET_FIRST(space->chain)->name)) { + ib::info() << "TRUNCATE rollback: " << id + << "," << new_path; + mutex_exit(&fil_system.mutex); + dberr_t err = fil_delete_tablespace(id); + if (err != DB_SUCCESS) { + return err; + } + goto retry; + } + } + mutex_exit(&fil_system.mutex); + fil_delete_file(new_path); + + return(DB_SUCCESS); +} + +dberr_t fil_space_t::rename(const char* name, const char* path, bool log, + bool replace) +{ + ut_ad(UT_LIST_GET_LEN(chain) == 1); + ut_ad(!is_system_tablespace(id)); + + if (log) { + dberr_t err = fil_rename_tablespace_check( + chain.start->name, path, replace); + if (err != DB_SUCCESS) { + return(err); + } + fil_name_write_rename(id, chain.start->name, path); + } + + return fil_rename_tablespace(id, chain.start->name, name, path) + ? DB_SUCCESS : DB_ERROR; +} + +/** Rename a single-table tablespace. +The tablespace must exist in the memory cache. +@param[in] id tablespace identifier +@param[in] old_path old file name +@param[in] new_name new table name in the +databasename/tablename format +@param[in] new_path_in new file name, +or NULL if it is located in the normal data directory +@return true if success */ +static bool +fil_rename_tablespace( + ulint id, + const char* old_path, + const char* new_name, + const char* new_path_in) +{ + fil_space_t* space; + fil_node_t* node; + ut_a(id != 0); + + ut_ad(strchr(new_name, '/') != NULL); + + mutex_enter(&fil_system.mutex); + + space = fil_space_get_by_id(id); + + if (space == NULL) { + ib::error() << "Cannot find space id " << id + << " in the tablespace memory cache, though the file '" + << old_path + << "' in a rename operation should have that id."; + mutex_exit(&fil_system.mutex); + return(false); + } + + /* The following code must change when InnoDB supports + multiple datafiles per tablespace. */ + ut_a(UT_LIST_GET_LEN(space->chain) == 1); + node = UT_LIST_GET_FIRST(space->chain); + space->reacquire(); + + mutex_exit(&fil_system.mutex); + + char* new_file_name = new_path_in == NULL + ? fil_make_filepath(NULL, new_name, IBD, false) + : mem_strdup(new_path_in); + char* old_file_name = node->name; + char* new_space_name = mem_strdup(new_name); + char* old_space_name = space->name; + + ut_ad(strchr(old_file_name, OS_PATH_SEPARATOR) != NULL); + ut_ad(strchr(new_file_name, OS_PATH_SEPARATOR) != NULL); + + if (!recv_recovery_is_on()) { + mysql_mutex_lock(&log_sys.mutex); + } + + /* log_sys.mutex is above fil_system.mutex in the latching order */ + mysql_mutex_assert_owner(&log_sys.mutex); + mutex_enter(&fil_system.mutex); + space->release(); + ut_ad(space->name == old_space_name); + ut_ad(node->name == old_file_name); + bool success; + DBUG_EXECUTE_IF("fil_rename_tablespace_failure_2", + goto skip_second_rename; ); + success = os_file_rename(innodb_data_file_key, + old_file_name, + new_file_name); + DBUG_EXECUTE_IF("fil_rename_tablespace_failure_2", +skip_second_rename: + success = false; ); + + ut_ad(node->name == old_file_name); + + if (success) { + node->name = new_file_name; + } + + if (!recv_recovery_is_on()) { + mysql_mutex_unlock(&log_sys.mutex); + } + + ut_ad(space->name == old_space_name); + if (success) { + space->name = new_space_name; + } else { + /* Because nothing was renamed, we must free the new + names, not the old ones. */ + old_file_name = new_file_name; + old_space_name = new_space_name; + } + + mutex_exit(&fil_system.mutex); + + ut_free(old_file_name); + ut_free(old_space_name); + + return(success); +} + +/* FIXME: remove this! */ +IF_WIN(, bool os_is_sparse_file_supported(os_file_t fh)); + +/** Create a tablespace file. +@param[in] space_id Tablespace ID +@param[in] name Tablespace name in dbname/tablename format. +@param[in] path Path and filename of the datafile to create. +@param[in] flags Tablespace flags +@param[in] size Initial size of the tablespace file in pages, +must be >= FIL_IBD_FILE_INITIAL_SIZE +@param[in] mode MariaDB encryption mode +@param[in] key_id MariaDB encryption key_id +@param[out] err DB_SUCCESS or error code +@return the created tablespace +@retval NULL on error */ +fil_space_t* +fil_ibd_create( + ulint space_id, + const char* name, + const char* path, + ulint flags, + uint32_t size, + fil_encryption_t mode, + uint32_t key_id, + dberr_t* err) +{ + pfs_os_file_t file; + byte* page; + bool success; + bool has_data_dir = FSP_FLAGS_HAS_DATA_DIR(flags) != 0; + + ut_ad(!is_system_tablespace(space_id)); + ut_ad(!srv_read_only_mode); + ut_a(space_id < SRV_SPACE_ID_UPPER_BOUND); + ut_a(size >= FIL_IBD_FILE_INITIAL_SIZE); + ut_a(fil_space_t::is_valid_flags(flags & ~FSP_FLAGS_MEM_MASK, space_id)); + + /* Create the subdirectories in the path, if they are + not there already. */ + *err = os_file_create_subdirs_if_needed(path); + if (*err != DB_SUCCESS) { + return NULL; + } + + ulint type; + static_assert(((UNIV_ZIP_SIZE_MIN >> 1) << 3) == 4096, + "compatibility"); + switch (FSP_FLAGS_GET_ZIP_SSIZE(flags)) { + case 1: + case 2: + type = OS_DATA_FILE_NO_O_DIRECT; + break; + default: + type = OS_DATA_FILE; + } + + file = os_file_create( + innodb_data_file_key, path, + OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT, + OS_FILE_AIO, type, srv_read_only_mode, &success); + + if (!success) { + /* The following call will print an error message */ + switch (os_file_get_last_error(true)) { + case OS_FILE_ALREADY_EXISTS: + ib::info() << "The file '" << path << "'" + " already exists though the" + " corresponding table did not exist" + " in the InnoDB data dictionary." + " You can resolve the problem by removing" + " the file."; + *err = DB_TABLESPACE_EXISTS; + break; + case OS_FILE_DISK_FULL: + *err = DB_OUT_OF_FILE_SPACE; + break; + default: + *err = DB_ERROR; + } + ib::error() << "Cannot create file '" << path << "'"; + return NULL; + } + + const bool is_compressed = fil_space_t::is_compressed(flags); + bool punch_hole = is_compressed; + fil_space_crypt_t* crypt_data = nullptr; +#ifdef _WIN32 + if (is_compressed) { + os_file_set_sparse_win32(file); + } +#endif + + if (!os_file_set_size( + path, file, + os_offset_t(size) << srv_page_size_shift, is_compressed)) { + *err = DB_OUT_OF_FILE_SPACE; +err_exit: + os_file_close(file); + os_file_delete(innodb_data_file_key, path); + free(crypt_data); + return NULL; + } + + /* FIXME: remove this */ + IF_WIN(, punch_hole = punch_hole && os_is_sparse_file_supported(file)); + + /* We have to write the space id to the file immediately and flush the + file to disk. This is because in crash recovery we must be aware what + tablespaces exist and what are their space id's, so that we can apply + the log records to the right file. It may take quite a while until + buffer pool flush algorithms write anything to the file and flush it to + disk. If we would not write here anything, the file would be filled + with zeros from the call of os_file_set_size(), until a buffer pool + flush would write to it. */ + + /* Align the memory for file i/o if we might have O_DIRECT set */ + page = static_cast<byte*>(aligned_malloc(2 * srv_page_size, + srv_page_size)); + + memset(page, '\0', srv_page_size); + + if (fil_space_t::full_crc32(flags)) { + flags |= FSP_FLAGS_FCRC32_PAGE_SSIZE(); + } else { + flags |= FSP_FLAGS_PAGE_SSIZE(); + } + + fsp_header_init_fields(page, space_id, flags); + mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, space_id); + + /* Create crypt data if the tablespace is either encrypted or user has + requested it to remain unencrypted. */ + crypt_data = (mode != FIL_ENCRYPTION_DEFAULT || srv_encrypt_tables) + ? fil_space_create_crypt_data(mode, key_id) + : NULL; + + if (crypt_data) { + /* Write crypt data information in page0 while creating + ibd file. */ + crypt_data->fill_page0(flags, page); + } + + if (ulint zip_size = fil_space_t::zip_size(flags)) { + page_zip_des_t page_zip; + page_zip_set_size(&page_zip, zip_size); + page_zip.data = page + srv_page_size; +#ifdef UNIV_DEBUG + page_zip.m_start = 0; +#endif /* UNIV_DEBUG */ + page_zip.m_end = 0; + page_zip.m_nonempty = 0; + page_zip.n_blobs = 0; + + buf_flush_init_for_writing(NULL, page, &page_zip, false); + + *err = os_file_write(IORequestWrite, path, file, + page_zip.data, 0, zip_size); + } else { + buf_flush_init_for_writing(NULL, page, NULL, + fil_space_t::full_crc32(flags)); + + *err = os_file_write(IORequestWrite, path, file, + page, 0, srv_page_size); + } + + aligned_free(page); + + if (*err != DB_SUCCESS) { + ib::error() + << "Could not write the first page to" + << " tablespace '" << path << "'"; + goto err_exit; + } + + if (!os_file_flush(file)) { + ib::error() << "File flush of tablespace '" + << path << "' failed"; + *err = DB_ERROR; + goto err_exit; + } + + if (has_data_dir) { + /* Make the ISL file if the IBD file is not + in the default location. */ + *err = RemoteDatafile::create_link_file(name, path); + if (*err != DB_SUCCESS) { + goto err_exit; + } + } + + if (fil_space_t* space = fil_space_t::create(name, space_id, flags, + FIL_TYPE_TABLESPACE, + crypt_data, mode)) { + space->punch_hole = punch_hole; + fil_node_t* node = space->add(path, file, size, false, true); + mtr_t mtr; + mtr.start(); + mtr.log_file_op(FILE_CREATE, space_id, node->name); + mtr.commit(); + + node->find_metadata(file); + *err = DB_SUCCESS; + return space; + } + + if (has_data_dir) { + RemoteDatafile::delete_link_file(name); + } + + *err = DB_ERROR; + goto err_exit; +} + +/** Try to open a single-table tablespace and optionally check that the +space id in it is correct. If this does not succeed, print an error message +to the .err log. This function is used to open a tablespace when we start +mysqld after the dictionary has been booted, and also in IMPORT TABLESPACE. + +NOTE that we assume this operation is used either at the database startup +or under the protection of the dictionary mutex, so that two users cannot +race here. This operation does not leave the file associated with the +tablespace open, but closes it after we have looked at the space id in it. + +If the validate boolean is set, we read the first page of the file and +check that the space id in the file is what we expect. We assume that +this function runs much faster if no check is made, since accessing the +file inode probably is much faster (the OS caches them) than accessing +the first page of the file. This boolean may be initially false, but if +a remote tablespace is found it will be changed to true. + +If the fix_dict boolean is set, then it is safe to use an internal SQL +statement to update the dictionary tables if they are incorrect. + +@param[in] validate true if we should validate the tablespace +@param[in] fix_dict true if the dictionary is available to be fixed +@param[in] purpose FIL_TYPE_TABLESPACE or FIL_TYPE_TEMPORARY +@param[in] id tablespace ID +@param[in] flags expected FSP_SPACE_FLAGS +@param[in] space_name tablespace name of the datafile +If file-per-table, it is the table name in the databasename/tablename format +@param[in] path_in expected filepath, usually read from dictionary +@param[out] err DB_SUCCESS or error code +@return tablespace +@retval NULL if the tablespace could not be opened */ +fil_space_t* +fil_ibd_open( + bool validate, + bool fix_dict, + fil_type_t purpose, + ulint id, + ulint flags, + const table_name_t& tablename, + const char* path_in, + dberr_t* err) +{ + mutex_enter(&fil_system.mutex); + if (fil_space_t* space = fil_space_get_by_id(id)) { + if (strcmp(space->name, tablename.m_name)) { + table_name_t space_name; + space_name.m_name = space->name; + ib::error() + << "Trying to open table " << tablename + << " with id " << id + << ", conflicting with " << space_name; + space = NULL; + if (err) *err = DB_TABLESPACE_EXISTS; + } else if (err) *err = DB_SUCCESS; + + mutex_exit(&fil_system.mutex); + + if (space && validate && !srv_read_only_mode) { + fsp_flags_try_adjust(space, + flags & ~FSP_FLAGS_MEM_MASK); + } + + return space; + } + mutex_exit(&fil_system.mutex); + + bool dict_filepath_same_as_default = false; + bool link_file_found = false; + bool link_file_is_bad = false; + Datafile df_default; /* default location */ + Datafile df_dict; /* dictionary location */ + RemoteDatafile df_remote; /* remote location */ + ulint tablespaces_found = 0; + ulint valid_tablespaces_found = 0; + + if (fix_dict) { + ut_d(dict_sys.assert_locked()); + ut_ad(!srv_read_only_mode); + ut_ad(srv_log_file_size != 0); + } + + /* Table flags can be ULINT_UNDEFINED if + dict_tf_to_fsp_flags_failure is set. */ + if (flags == ULINT_UNDEFINED) { +corrupted: + if (err) *err = DB_CORRUPTION; + return NULL; + } + + ut_ad(fil_space_t::is_valid_flags(flags & ~FSP_FLAGS_MEM_MASK, id)); + df_default.init(tablename.m_name, flags); + df_dict.init(tablename.m_name, flags); + df_remote.init(tablename.m_name, flags); + + /* Discover the correct file by looking in three possible locations + while avoiding unecessary effort. */ + + /* We will always look for an ibd in the default location. */ + df_default.make_filepath(NULL, tablename.m_name, IBD); + + /* Look for a filepath embedded in an ISL where the default file + would be. */ + if (df_remote.open_read_only(true) == DB_SUCCESS) { + ut_ad(df_remote.is_open()); + + /* Always validate a file opened from an ISL pointer */ + validate = true; + ++tablespaces_found; + link_file_found = true; + } else if (df_remote.filepath() != NULL) { + /* An ISL file was found but contained a bad filepath in it. + Better validate anything we do find. */ + validate = true; + } + + /* Attempt to open the tablespace at the dictionary filepath. */ + if (path_in) { + if (df_default.same_filepath_as(path_in)) { + dict_filepath_same_as_default = true; + } else { + /* Dict path is not the default path. Always validate + remote files. If default is opened, it was moved. */ + validate = true; + df_dict.set_filepath(path_in); + if (df_dict.open_read_only(true) == DB_SUCCESS) { + ut_ad(df_dict.is_open()); + ++tablespaces_found; + } + } + } + + /* Always look for a file at the default location. But don't log + an error if the tablespace is already open in remote or dict. */ + ut_a(df_default.filepath()); + const bool strict = (tablespaces_found == 0); + if (df_default.open_read_only(strict) == DB_SUCCESS) { + ut_ad(df_default.is_open()); + ++tablespaces_found; + } + + /* Check if multiple locations point to the same file. */ + if (tablespaces_found > 1 && df_default.same_as(df_remote)) { + /* A link file was found with the default path in it. + Use the default path and delete the link file. */ + --tablespaces_found; + df_remote.delete_link_file(); + df_remote.close(); + } + if (tablespaces_found > 1 && df_default.same_as(df_dict)) { + --tablespaces_found; + df_dict.close(); + } + if (tablespaces_found > 1 && df_remote.same_as(df_dict)) { + --tablespaces_found; + df_dict.close(); + } + + /* We have now checked all possible tablespace locations and + have a count of how many unique files we found. If things are + normal, we only found 1. */ + /* For encrypted tablespace, we need to check the + encryption in header of first page. */ + if (!validate && tablespaces_found == 1) { + goto skip_validate; + } + + /* Read and validate the first page of these three tablespace + locations, if found. */ + valid_tablespaces_found += + (df_remote.validate_to_dd(id, flags) == DB_SUCCESS); + + valid_tablespaces_found += + (df_default.validate_to_dd(id, flags) == DB_SUCCESS); + + valid_tablespaces_found += + (df_dict.validate_to_dd(id, flags) == DB_SUCCESS); + + /* Make sense of these three possible locations. + First, bail out if no tablespace files were found. */ + if (valid_tablespaces_found == 0) { + os_file_get_last_error(true); + ib::error() << "Could not find a valid tablespace file for `" + << tablename << "`. " << TROUBLESHOOT_DATADICT_MSG; + goto corrupted; + } + if (!validate) { + goto skip_validate; + } + + /* Do not open any tablespaces if more than one tablespace with + the correct space ID and flags were found. */ + if (tablespaces_found > 1) { + ib::error() << "A tablespace for `" << tablename + << "` has been found in multiple places;"; + + if (df_default.is_open()) { + ib::error() << "Default location: " + << df_default.filepath() + << ", Space ID=" << df_default.space_id() + << ", Flags=" << df_default.flags(); + } + if (df_remote.is_open()) { + ib::error() << "Remote location: " + << df_remote.filepath() + << ", Space ID=" << df_remote.space_id() + << ", Flags=" << df_remote.flags(); + } + if (df_dict.is_open()) { + ib::error() << "Dictionary location: " + << df_dict.filepath() + << ", Space ID=" << df_dict.space_id() + << ", Flags=" << df_dict.flags(); + } + + /* Force-recovery will allow some tablespaces to be + skipped by REDO if there was more than one file found. + Unlike during the REDO phase of recovery, we now know + if the tablespace is valid according to the dictionary, + which was not available then. So if we did not force + recovery and there is only one good tablespace, ignore + any bad tablespaces. */ + if (valid_tablespaces_found > 1 || srv_force_recovery > 0) { + ib::error() << "Will not open tablespace `" + << tablename << "`"; + + /* If the file is not open it cannot be valid. */ + ut_ad(df_default.is_open() || !df_default.is_valid()); + ut_ad(df_dict.is_open() || !df_dict.is_valid()); + ut_ad(df_remote.is_open() || !df_remote.is_valid()); + + /* Having established that, this is an easy way to + look for corrupted data files. */ + if (df_default.is_open() != df_default.is_valid() + || df_dict.is_open() != df_dict.is_valid() + || df_remote.is_open() != df_remote.is_valid()) { + goto corrupted; + } +error: + if (err) *err = DB_ERROR; + return NULL; + } + + /* There is only one valid tablespace found and we did + not use srv_force_recovery during REDO. Use this one + tablespace and clean up invalid tablespace pointers */ + if (df_default.is_open() && !df_default.is_valid()) { + df_default.close(); + tablespaces_found--; + } + + if (df_dict.is_open() && !df_dict.is_valid()) { + df_dict.close(); + /* Leave dict.filepath so that SYS_DATAFILES + can be corrected below. */ + tablespaces_found--; + } + + if (df_remote.is_open() && !df_remote.is_valid()) { + df_remote.close(); + tablespaces_found--; + link_file_is_bad = true; + } + } + + /* At this point, there should be only one filepath. */ + ut_a(tablespaces_found == 1); + ut_a(valid_tablespaces_found == 1); + + /* Only fix the dictionary at startup when there is only one thread. + Calls to dict_load_table() can be done while holding other latches. */ + if (!fix_dict) { + goto skip_validate; + } + + /* We may need to update what is stored in SYS_DATAFILES or + SYS_TABLESPACES or adjust the link file. Since a failure to + update SYS_TABLESPACES or SYS_DATAFILES does not prevent opening + and using the tablespace either this time or the next, we do not + check the return code or fail to open the tablespace. But if it + fails, dict_update_filepath() will issue a warning to the log. */ + if (df_dict.filepath()) { + ut_ad(path_in != NULL); + ut_ad(df_dict.same_filepath_as(path_in)); + + if (df_remote.is_open()) { + if (!df_remote.same_filepath_as(path_in)) { + dict_update_filepath(id, df_remote.filepath()); + } + + } else if (df_default.is_open()) { + ut_ad(!dict_filepath_same_as_default); + dict_update_filepath(id, df_default.filepath()); + if (link_file_is_bad) { + RemoteDatafile::delete_link_file( + tablename.m_name); + } + + } else if (!link_file_found || link_file_is_bad) { + ut_ad(df_dict.is_open()); + /* Fix the link file if we got our filepath + from the dictionary but a link file did not + exist or it did not point to a valid file. */ + RemoteDatafile::delete_link_file(tablename.m_name); + RemoteDatafile::create_link_file( + tablename.m_name, df_dict.filepath()); + } + + } else if (df_remote.is_open()) { + if (dict_filepath_same_as_default) { + dict_update_filepath(id, df_remote.filepath()); + + } else if (path_in == NULL) { + /* SYS_DATAFILES record for this space ID + was not found. */ + dict_replace_tablespace_and_filepath( + id, tablename.m_name, + df_remote.filepath(), flags); + } + + } else if (df_default.is_open()) { + /* We opened the tablespace in the default location. + SYS_DATAFILES.PATH needs to be updated if it is different + from this default path or if the SYS_DATAFILES.PATH was not + supplied and it should have been. Also update the dictionary + if we found an ISL file (since !df_remote.is_open). Since + path_in is not suppled for file-per-table, we must assume + that it matched the ISL. */ + if ((path_in != NULL && !dict_filepath_same_as_default) + || (path_in == NULL && DICT_TF_HAS_DATA_DIR(flags)) + || df_remote.filepath() != NULL) { + dict_replace_tablespace_and_filepath( + id, tablename.m_name, df_default.filepath(), + flags); + } + } + +skip_validate: + const byte* first_page = + df_default.is_open() ? df_default.get_first_page() : + df_dict.is_open() ? df_dict.get_first_page() : + df_remote.get_first_page(); + + fil_space_crypt_t* crypt_data = first_page + ? fil_space_read_crypt_data(fil_space_t::zip_size(flags), + first_page) + : NULL; + + fil_space_t* space = fil_space_t::create( + tablename.m_name, id, flags, purpose, crypt_data); + if (!space) { + goto error; + } + + /* We do not measure the size of the file, that is why + we pass the 0 below */ + + space->add( + df_remote.is_open() ? df_remote.filepath() : + df_dict.is_open() ? df_dict.filepath() : + df_default.filepath(), OS_FILE_CLOSED, 0, false, true); + + if (validate && !srv_read_only_mode) { + df_remote.close(); + df_dict.close(); + df_default.close(); + if (space->acquire()) { + if (purpose != FIL_TYPE_IMPORT) { + fsp_flags_try_adjust(space, flags + & ~FSP_FLAGS_MEM_MASK); + } + space->release(); + } + } + + if (err) *err = DB_SUCCESS; + return space; +} + +/** Looks for a pre-existing fil_space_t with the given tablespace ID +and, if found, returns the name and filepath in newly allocated buffers +that the caller must free. +@param[in] space_id The tablespace ID to search for. +@param[out] name Name of the tablespace found. +@param[out] filepath The filepath of the first datafile for the +tablespace. +@return true if tablespace is found, false if not. */ +bool +fil_space_read_name_and_filepath( + ulint space_id, + char** name, + char** filepath) +{ + bool success = false; + *name = NULL; + *filepath = NULL; + + mutex_enter(&fil_system.mutex); + + fil_space_t* space = fil_space_get_by_id(space_id); + + if (space != NULL) { + *name = mem_strdup(space->name); + + fil_node_t* node = UT_LIST_GET_FIRST(space->chain); + *filepath = mem_strdup(node->name); + + success = true; + } + + mutex_exit(&fil_system.mutex); + + return(success); +} + +/** Convert a file name to a tablespace name. +@param[in] filename directory/databasename/tablename.ibd +@return database/tablename string, to be freed with ut_free() */ +char* +fil_path_to_space_name( + const char* filename) +{ + /* Strip the file name prefix and suffix, leaving + only databasename/tablename. */ + ulint filename_len = strlen(filename); + const char* end = filename + filename_len; +#ifdef HAVE_MEMRCHR + const char* tablename = 1 + static_cast<const char*>( + memrchr(filename, OS_PATH_SEPARATOR, + filename_len)); + const char* dbname = 1 + static_cast<const char*>( + memrchr(filename, OS_PATH_SEPARATOR, + tablename - filename - 1)); +#else /* HAVE_MEMRCHR */ + const char* tablename = filename; + const char* dbname = NULL; + + while (const char* t = static_cast<const char*>( + memchr(tablename, OS_PATH_SEPARATOR, + ulint(end - tablename)))) { + dbname = tablename; + tablename = t + 1; + } +#endif /* HAVE_MEMRCHR */ + + ut_ad(dbname != NULL); + ut_ad(tablename > dbname); + ut_ad(tablename < end); + ut_ad(end - tablename > 4); + ut_ad(memcmp(end - 4, DOT_IBD, 4) == 0); + + char* name = mem_strdupl(dbname, ulint(end - dbname) - 4); + + ut_ad(name[tablename - dbname - 1] == OS_PATH_SEPARATOR); +#if OS_PATH_SEPARATOR != '/' + /* space->name uses '/', not OS_PATH_SEPARATOR. */ + name[tablename - dbname - 1] = '/'; +#endif + + return(name); +} + +/** Discover the correct IBD file to open given a remote or missing +filepath from the REDO log. Administrators can move a crashed +database to another location on the same machine and try to recover it. +Remote IBD files might be moved as well to the new location. + The problem with this is that the REDO log contains the old location +which may be still accessible. During recovery, if files are found in +both locations, we can chose on based on these priorities; +1. Default location +2. ISL location +3. REDO location +@param[in] space_id tablespace ID +@param[in] df Datafile object with path from redo +@return true if a valid datafile was found, false if not */ +static +bool +fil_ibd_discover( + ulint space_id, + Datafile& df) +{ + Datafile df_def_per; /* default file-per-table datafile */ + RemoteDatafile df_rem_per; /* remote file-per-table datafile */ + + /* Look for the datafile in the default location. */ + const char* filename = df.filepath(); + const char* basename = base_name(filename); + + /* If this datafile is file-per-table it will have a schema dir. */ + ulint sep_found = 0; + const char* db = basename; + for (; db > filename && sep_found < 2; db--) { + if (db[0] == OS_PATH_SEPARATOR) { + sep_found++; + } + } + if (sep_found == 2) { + db += 2; + df_def_per.init(db, 0); + df_def_per.make_filepath(NULL, db, IBD); + if (df_def_per.open_read_only(false) == DB_SUCCESS + && df_def_per.validate_for_recovery() == DB_SUCCESS + && df_def_per.space_id() == space_id) { + df.set_filepath(df_def_per.filepath()); + df.open_read_only(false); + return(true); + } + + /* Look for a remote file-per-table tablespace. */ + + switch (srv_operation) { + case SRV_OPERATION_BACKUP: + case SRV_OPERATION_RESTORE_DELTA: + ut_ad(0); + break; + case SRV_OPERATION_RESTORE_EXPORT: + case SRV_OPERATION_RESTORE: + break; + case SRV_OPERATION_NORMAL: + df_rem_per.set_name(db); + if (df_rem_per.open_link_file() != DB_SUCCESS) { + break; + } + + /* An ISL file was found with contents. */ + if (df_rem_per.open_read_only(false) != DB_SUCCESS + || df_rem_per.validate_for_recovery() + != DB_SUCCESS) { + + /* Assume that this ISL file is intended to + be used. Do not continue looking for another + if this file cannot be opened or is not + a valid IBD file. */ + ib::error() << "ISL file '" + << df_rem_per.link_filepath() + << "' was found but the linked file '" + << df_rem_per.filepath() + << "' could not be opened or is" + " not correct."; + return(false); + } + + /* Use this file if it has the space_id from the + MLOG record. */ + if (df_rem_per.space_id() == space_id) { + df.set_filepath(df_rem_per.filepath()); + df.open_read_only(false); + return(true); + } + + /* Since old MLOG records can use the same basename + in multiple CREATE/DROP TABLE sequences, this ISL + file could be pointing to a later version of this + basename.ibd file which has a different space_id. + Keep looking. */ + } + } + + /* No ISL files were found in the default location. Use the location + given in the redo log. */ + if (df.open_read_only(false) == DB_SUCCESS + && df.validate_for_recovery() == DB_SUCCESS + && df.space_id() == space_id) { + return(true); + } + + /* A datafile was not discovered for the filename given. */ + return(false); +} +/** Open an ibd tablespace and add it to the InnoDB data structures. +This is similar to fil_ibd_open() except that it is used while processing +the REDO log, so the data dictionary is not available and very little +validation is done. The tablespace name is extracred from the +dbname/tablename.ibd portion of the filename, which assumes that the file +is a file-per-table tablespace. Any name will do for now. General +tablespace names will be read from the dictionary after it has been +recovered. The tablespace flags are read at this time from the first page +of the file in validate_for_recovery(). +@param[in] space_id tablespace ID +@param[in] filename path/to/databasename/tablename.ibd +@param[out] space the tablespace, or NULL on error +@return status of the operation */ +enum fil_load_status +fil_ibd_load( + ulint space_id, + const char* filename, + fil_space_t*& space) +{ + /* If the a space is already in the file system cache with this + space ID, then there is nothing to do. */ + mutex_enter(&fil_system.mutex); + space = fil_space_get_by_id(space_id); + mutex_exit(&fil_system.mutex); + + if (space) { + /* Compare the filename we are trying to open with the + filename from the first node of the tablespace we opened + previously. Fail if it is different. */ + fil_node_t* node = UT_LIST_GET_FIRST(space->chain); + if (0 != strcmp(innobase_basename(filename), + innobase_basename(node->name))) { + ib::info() + << "Ignoring data file '" << filename + << "' with space ID " << space->id + << ". Another data file called " << node->name + << " exists with the same space ID."; + space = NULL; + return(FIL_LOAD_ID_CHANGED); + } + return(FIL_LOAD_OK); + } + + if (srv_operation == SRV_OPERATION_RESTORE) { + /* Replace absolute DATA DIRECTORY file paths with + short names relative to the backup directory. */ + if (const char* name = strrchr(filename, OS_PATH_SEPARATOR)) { + while (--name > filename + && *name != OS_PATH_SEPARATOR); + if (name > filename) { + filename = name + 1; + } + } + } + + Datafile file; + file.set_filepath(filename); + file.open_read_only(false); + + if (!file.is_open()) { + /* The file has been moved or it is a remote datafile. */ + if (!fil_ibd_discover(space_id, file) + || !file.is_open()) { + return(FIL_LOAD_NOT_FOUND); + } + } + + os_offset_t size; + + /* Read and validate the first page of the tablespace. + Assign a tablespace name based on the tablespace type. */ + switch (file.validate_for_recovery()) { + os_offset_t minimum_size; + case DB_SUCCESS: + if (file.space_id() != space_id) { + return(FIL_LOAD_ID_CHANGED); + } + /* Get and test the file size. */ + size = os_file_get_size(file.handle()); + + /* Every .ibd file is created >= 4 pages in size. + Smaller files cannot be OK. */ + minimum_size = os_offset_t(FIL_IBD_FILE_INITIAL_SIZE) + << srv_page_size_shift; + + if (size == static_cast<os_offset_t>(-1)) { + /* The following call prints an error message */ + os_file_get_last_error(true); + + ib::error() << "Could not measure the size of" + " single-table tablespace file '" + << file.filepath() << "'"; + } else if (size < minimum_size) { + ib::error() << "The size of tablespace file '" + << file.filepath() << "' is only " << size + << ", should be at least " << minimum_size + << "!"; + } else { + /* Everything is fine so far. */ + break; + } + + /* fall through */ + + case DB_TABLESPACE_EXISTS: + return(FIL_LOAD_INVALID); + + default: + return(FIL_LOAD_NOT_FOUND); + } + + ut_ad(space == NULL); + + /* Adjust the memory-based flags that would normally be set by + dict_tf_to_fsp_flags(). In recovery, we have no data dictionary. */ + ulint flags = file.flags(); + if (fil_space_t::is_compressed(flags)) { + flags |= page_zip_level + << FSP_FLAGS_MEM_COMPRESSION_LEVEL; + } + + const byte* first_page = file.get_first_page(); + fil_space_crypt_t* crypt_data = first_page + ? fil_space_read_crypt_data(fil_space_t::zip_size(flags), + first_page) + : NULL; + space = fil_space_t::create( + file.name(), space_id, flags, FIL_TYPE_TABLESPACE, crypt_data); + + if (space == NULL) { + return(FIL_LOAD_INVALID); + } + + ut_ad(space->id == file.space_id()); + ut_ad(space->id == space_id); + + /* We do not use the size information we have about the file, because + the rounding formula for extents and pages is somewhat complex; we + let fil_node_open() do that task. */ + + space->add(file.filepath(), OS_FILE_CLOSED, 0, false, false); + + return(FIL_LOAD_OK); +} + +/** Try to adjust FSP_SPACE_FLAGS if they differ from the expectations. +(Typically when upgrading from MariaDB 10.1.0..10.1.20.) +@param[in,out] space tablespace +@param[in] flags desired tablespace flags */ +void fsp_flags_try_adjust(fil_space_t* space, ulint flags) +{ + ut_ad(!srv_read_only_mode); + ut_ad(fil_space_t::is_valid_flags(flags, space->id)); + if (space->full_crc32() || fil_space_t::full_crc32(flags)) { + return; + } + if (!space->size && (space->purpose != FIL_TYPE_TABLESPACE + || !space->get_size())) { + return; + } + /* This code is executed during server startup while no + connections are allowed. We do not need to protect against + DROP TABLE by fil_space_acquire(). */ + mtr_t mtr; + mtr.start(); + if (buf_block_t* b = buf_page_get( + page_id_t(space->id, 0), space->zip_size(), + RW_X_LATCH, &mtr)) { + uint32_t f = fsp_header_get_flags(b->frame); + if (fil_space_t::full_crc32(f)) { + goto func_exit; + } + if (fil_space_t::is_flags_equal(f, flags)) { + goto func_exit; + } + /* Suppress the message if only the DATA_DIR flag to differs. */ + if ((f ^ flags) & ~(1U << FSP_FLAGS_POS_RESERVED)) { + ib::warn() + << "adjusting FSP_SPACE_FLAGS of file '" + << UT_LIST_GET_FIRST(space->chain)->name + << "' from " << ib::hex(f) + << " to " << ib::hex(flags); + } + mtr.set_named_space(space); + mtr.write<4,mtr_t::FORCED>(*b, + FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + + b->frame, flags); + } +func_exit: + mtr.commit(); +} + +/** Determine if a matching tablespace exists in the InnoDB tablespace +memory cache. Note that if we have not done a crash recovery at the database +startup, there may be many tablespaces which are not yet in the memory cache. +@param[in] id Tablespace ID +@param[in] name Tablespace name used in fil_space_t::create(). +@param[in] table_flags table flags +@return the tablespace +@retval NULL if no matching tablespace exists in the memory cache */ +fil_space_t* +fil_space_for_table_exists_in_mem( + ulint id, + const char* name, + ulint table_flags) +{ + const ulint expected_flags = dict_tf_to_fsp_flags(table_flags); + + mutex_enter(&fil_system.mutex); + if (fil_space_t* space = fil_space_get_by_id(id)) { + ulint tf = expected_flags & ~FSP_FLAGS_MEM_MASK; + ulint sf = space->flags & ~FSP_FLAGS_MEM_MASK; + + if (!fil_space_t::is_flags_equal(tf, sf) + && !fil_space_t::is_flags_equal(sf, tf)) { + goto func_exit; + } + + if (strcmp(space->name, name)) { + ib::error() << "Table " << name + << " in InnoDB data dictionary" + " has tablespace id " << id + << ", but the tablespace" + " with that id has name " << space->name << "." + " Have you deleted or moved .ibd files?"; + ib::info() << TROUBLESHOOT_DATADICT_MSG; + goto func_exit; + } + + /* Adjust the flags that are in FSP_FLAGS_MEM_MASK. + FSP_SPACE_FLAGS will not be written back here. */ + space->flags = (space->flags & ~FSP_FLAGS_MEM_MASK) + | (expected_flags & FSP_FLAGS_MEM_MASK); + mutex_exit(&fil_system.mutex); + if (!srv_read_only_mode) { + fsp_flags_try_adjust(space, expected_flags + & ~FSP_FLAGS_MEM_MASK); + } + return space; + } + +func_exit: + mutex_exit(&fil_system.mutex); + return NULL; +} + +/*============================ FILE I/O ================================*/ + +/** Report information about an invalid page access. */ +ATTRIBUTE_COLD __attribute__((noreturn)) +static void +fil_report_invalid_page_access(const char *name, + os_offset_t offset, ulint len, bool is_read) +{ + ib::fatal() << "Trying to " << (is_read ? "read " : "write ") << len + << " bytes at " << offset + << " outside the bounds of the file: " << name; +} + + +/** Update the data structures on write completion */ +inline void fil_node_t::complete_write() +{ + ut_ad(!mutex_own(&fil_system.mutex)); + + if (space->purpose != FIL_TYPE_TEMPORARY && + srv_file_flush_method != SRV_O_DIRECT_NO_FSYNC && + space->set_needs_flush()) + { + mutex_enter(&fil_system.mutex); + if (!space->is_in_unflushed_spaces) + { + space->is_in_unflushed_spaces= true; + fil_system.unflushed_spaces.push_front(*space); + } + mutex_exit(&fil_system.mutex); + } +} + +/** Read or write data. +@param type I/O context +@param offset offset in bytes +@param len number of bytes +@param buf the data to be read or written +@param bpage buffer block (for type.is_async() completion callback) +@return status and file descriptor */ +fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len, + void *buf, buf_page_t *bpage) +{ + ut_ad(referenced()); + ut_ad(offset % OS_FILE_LOG_BLOCK_SIZE == 0); + ut_ad((len % OS_FILE_LOG_BLOCK_SIZE) == 0); + ut_ad(fil_validate_skip()); + ut_ad(type.is_read() || type.is_write()); + ut_ad(type.type != IORequest::DBLWR_BATCH); + + if (type.is_read()) { + srv_stats.data_read.add(len); + } else { + ut_ad(!srv_read_only_mode || this == fil_system.temp_space); + srv_stats.data_written.add(len); + } + + fil_node_t* node= UT_LIST_GET_FIRST(chain); + ut_ad(node); + + if (type.type == IORequest::READ_ASYNC && is_stopping() + && !is_being_truncated) { + release(); + return {DB_TABLESPACE_DELETED, nullptr}; + } + + ulint p = static_cast<ulint>(offset >> srv_page_size_shift); + + if (UNIV_LIKELY_NULL(UT_LIST_GET_NEXT(chain, node))) { + ut_ad(this == fil_system.sys_space + || this == fil_system.temp_space); + ut_ad(!(offset & ((1 << srv_page_size_shift) - 1))); + + while (node->size <= p) { + p -= node->size; + node = UT_LIST_GET_NEXT(chain, node); + if (!node) { + if (type.type == IORequest::READ_ASYNC) { + release(); + return {DB_ERROR, nullptr}; + } + fil_report_invalid_page_access(name, offset, + len, + type.is_read()); + } + } + + offset = os_offset_t{p} << srv_page_size_shift; + } + + if (UNIV_UNLIKELY(node->size <= p)) { + if (type.type == IORequest::READ_ASYNC) { + release(); + /* If we can tolerate the non-existent pages, we + should return with DB_ERROR and let caller decide + what to do. */ + return {DB_ERROR, nullptr}; + } + + fil_report_invalid_page_access( + node->name, offset, len, type.is_read()); + } + + dberr_t err; + + if (type.type == IORequest::PUNCH_RANGE) { + err = os_file_punch_hole(node->handle, offset, len); + /* Punch hole is not supported, make space not to + support punch hole */ + if (UNIV_UNLIKELY(err == DB_IO_NO_PUNCH_HOLE)) { + punch_hole = false; + err = DB_SUCCESS; + } + goto release_sync_write; + } else { + /* Queue the aio request */ + err = os_aio(IORequest(bpage, node, type.type), + buf, offset, len); + } + + /* We an try to recover the page from the double write buffer if + the decompression fails or the page is corrupt. */ + + ut_a(type.type == IORequest::DBLWR_RECOVER || err == DB_SUCCESS); + if (!type.is_async()) { + if (type.is_write()) { +release_sync_write: + node->complete_write(); +release: + release(); + } + ut_ad(fil_validate_skip()); + } + if (err != DB_SUCCESS) { + goto release; + } + return {err, node}; +} + +#include <tpool.h> + +/** Callback for AIO completion */ +void fil_aio_callback(const IORequest &request) +{ + ut_ad(fil_validate_skip()); + ut_ad(request.node); + + if (!request.bpage) + { + ut_ad(!srv_read_only_mode); + if (request.type == IORequest::DBLWR_BATCH) + buf_dblwr.flush_buffered_writes_completed(request); + else + ut_ad(request.type == IORequest::WRITE_ASYNC); +write_completed: + request.node->complete_write(); + } + else if (request.is_write()) + { + buf_page_write_complete(request); + goto write_completed; + } + else + { + ut_ad(request.is_read()); + + /* IMPORTANT: since i/o handling for reads will read also the insert + buffer in fil_system.sys_space, we have to be very careful not to + introduce deadlocks. We never close fil_system.sys_space data + files and never issue asynchronous reads of change buffer pages. */ + const page_id_t id(request.bpage->id()); + + if (dberr_t err= buf_page_read_complete(request.bpage, *request.node)) + { + if (recv_recovery_is_on() && !srv_force_recovery) + recv_sys.found_corrupt_fs= true; + + ib::error() << "Failed to read page " << id.page_no() + << " from file '" << request.node->name << "': " << err; + } + } + + request.node->space->release(); +} + +/** Flush to disk the writes in file spaces of the given type +possibly cached by the OS. */ +void fil_flush_file_spaces() +{ + if (srv_file_flush_method == SRV_O_DIRECT_NO_FSYNC) + { + ut_d(mutex_enter(&fil_system.mutex)); + ut_ad(fil_system.unflushed_spaces.empty()); + ut_d(mutex_exit(&fil_system.mutex)); + return; + } + +rescan: + mutex_enter(&fil_system.mutex); + + for (fil_space_t &space : fil_system.unflushed_spaces) + { + if (space.needs_flush_not_stopping()) + { + space.reacquire(); + mutex_exit(&fil_system.mutex); + space.flush_low(); + space.release(); + goto rescan; + } + } + + mutex_exit(&fil_system.mutex); +} + +/** Functor to validate the file node list of a tablespace. */ +struct Check { + /** Total size of file nodes visited so far */ + ulint size; + /** Total number of open files visited so far */ + ulint n_open; + + /** Constructor */ + Check() : size(0), n_open(0) {} + + /** Visit a file node + @param[in] elem file node to visit */ + void operator()(const fil_node_t* elem) + { + n_open += elem->is_open(); + size += elem->size; + } + + /** Validate a tablespace. + @param[in] space tablespace to validate + @return number of open file nodes */ + static ulint validate(const fil_space_t* space) + { + ut_ad(mutex_own(&fil_system.mutex)); + Check check; + ut_list_validate(space->chain, check); + ut_a(space->size == check.size); + + switch (space->id) { + case TRX_SYS_SPACE: + ut_ad(fil_system.sys_space == NULL + || fil_system.sys_space == space); + break; + case SRV_TMP_SPACE_ID: + ut_ad(fil_system.temp_space == NULL + || fil_system.temp_space == space); + break; + default: + break; + } + + return(check.n_open); + } +}; + +/******************************************************************//** +Checks the consistency of the tablespace cache. +@return true if ok */ +bool fil_validate() +{ + ulint n_open = 0; + + mutex_enter(&fil_system.mutex); + + for (fil_space_t *space = UT_LIST_GET_FIRST(fil_system.space_list); + space != NULL; + space = UT_LIST_GET_NEXT(space_list, space)) { + n_open += Check::validate(space); + } + + ut_a(fil_system.n_open == n_open); + + mutex_exit(&fil_system.mutex); + + return(true); +} + +/*********************************************************************//** +Sets the file page type. */ +void +fil_page_set_type( +/*==============*/ + byte* page, /*!< in/out: file page */ + ulint type) /*!< in: type */ +{ + ut_ad(page); + + mach_write_to_2(page + FIL_PAGE_TYPE, type); +} + +/********************************************************************//** +Delete the tablespace file and any related files like .cfg. +This should not be called for temporary tables. +@param[in] ibd_filepath File path of the IBD tablespace */ +void +fil_delete_file( +/*============*/ + const char* ibd_filepath) +{ + /* Force a delete of any stale .ibd files that are lying around. */ + + ib::info() << "Deleting " << ibd_filepath; + os_file_delete_if_exists(innodb_data_file_key, ibd_filepath, NULL); + + char* cfg_filepath = fil_make_filepath( + ibd_filepath, NULL, CFG, false); + if (cfg_filepath != NULL) { + os_file_delete_if_exists( + innodb_data_file_key, cfg_filepath, NULL); + ut_free(cfg_filepath); + } +} + +#ifdef UNIV_DEBUG +/** Check that a tablespace is valid for mtr_commit(). +@param[in] space persistent tablespace that has been changed */ +static +void +fil_space_validate_for_mtr_commit( + const fil_space_t* space) +{ + ut_ad(!mutex_own(&fil_system.mutex)); + ut_ad(space != NULL); + ut_ad(space->purpose == FIL_TYPE_TABLESPACE); + ut_ad(!is_predefined_tablespace(space->id)); + + /* We are serving mtr_commit(). While there is an active + mini-transaction, we should have !space->stop_new_ops. This is + guaranteed by meta-data locks or transactional locks, or + dict_sys.latch (X-lock in DROP, S-lock in purge). */ + ut_ad(!space->is_stopping() + || space->is_being_truncated /* fil_truncate_prepare() */ + || space->referenced()); +} +#endif /* UNIV_DEBUG */ + +/** Write a FILE_MODIFY record for a persistent tablespace. +@param[in] space tablespace +@param[in,out] mtr mini-transaction */ +static +void +fil_names_write( + const fil_space_t* space, + mtr_t* mtr) +{ + ut_ad(UT_LIST_GET_LEN(space->chain) == 1); + fil_name_write(space->id, UT_LIST_GET_FIRST(space->chain)->name, mtr); +} + +/** Note that a non-predefined persistent tablespace has been modified +by redo log. +@param[in,out] space tablespace */ +void +fil_names_dirty( + fil_space_t* space) +{ + mysql_mutex_assert_owner(&log_sys.mutex); + ut_ad(recv_recovery_is_on()); + ut_ad(log_sys.get_lsn() != 0); + ut_ad(space->max_lsn == 0); + ut_d(fil_space_validate_for_mtr_commit(space)); + + UT_LIST_ADD_LAST(fil_system.named_spaces, space); + space->max_lsn = log_sys.get_lsn(); +} + +/** Write FILE_MODIFY records when a non-predefined persistent +tablespace was modified for the first time since the latest +fil_names_clear(). +@param[in,out] space tablespace */ +void fil_names_dirty_and_write(fil_space_t* space) +{ + mysql_mutex_assert_owner(&log_sys.mutex); + ut_d(fil_space_validate_for_mtr_commit(space)); + ut_ad(space->max_lsn == log_sys.get_lsn()); + + UT_LIST_ADD_LAST(fil_system.named_spaces, space); + mtr_t mtr; + mtr.start(); + fil_names_write(space, &mtr); + + DBUG_EXECUTE_IF("fil_names_write_bogus", + { + char bogus_name[] = "./test/bogus file.ibd"; + os_normalize_path(bogus_name); + fil_name_write( + SRV_SPACE_ID_UPPER_BOUND, + bogus_name, &mtr); + }); + + mtr.commit_files(); +} + +/** On a log checkpoint, reset fil_names_dirty_and_write() flags +and write out FILE_MODIFY and FILE_CHECKPOINT if needed. +@param[in] lsn checkpoint LSN +@param[in] do_write whether to always write FILE_CHECKPOINT +@return whether anything was written to the redo log +@retval false if no flags were set and nothing written +@retval true if anything was written to the redo log */ +bool +fil_names_clear( + lsn_t lsn, + bool do_write) +{ + mtr_t mtr; + ulint mtr_checkpoint_size = RECV_SCAN_SIZE - 1; + + DBUG_EXECUTE_IF( + "increase_mtr_checkpoint_size", + mtr_checkpoint_size = 75 * 1024; + ); + + mysql_mutex_assert_owner(&log_sys.mutex); + ut_ad(lsn); + + mtr.start(); + + for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.named_spaces); + space != NULL; ) { + if (mtr.get_log()->size() + + (3 + 5 + 1) + strlen(space->chain.start->name) + >= mtr_checkpoint_size) { + /* Prevent log parse buffer overflow */ + mtr.commit_files(); + mtr.start(); + } + + fil_space_t* next = UT_LIST_GET_NEXT(named_spaces, space); + + ut_ad(space->max_lsn > 0); + if (space->max_lsn < lsn) { + /* The tablespace was last dirtied before the + checkpoint LSN. Remove it from the list, so + that if the tablespace is not going to be + modified any more, subsequent checkpoints will + avoid calling fil_names_write() on it. */ + space->max_lsn = 0; + UT_LIST_REMOVE(fil_system.named_spaces, space); + } + + /* max_lsn is the last LSN where fil_names_dirty_and_write() + was called. If we kept track of "min_lsn" (the first LSN + where max_lsn turned nonzero), we could avoid the + fil_names_write() call if min_lsn > lsn. */ + + fil_names_write(space, &mtr); + do_write = true; + + space = next; + } + + if (do_write) { + mtr.commit_files(lsn); + } else { + ut_ad(!mtr.has_modifications()); + } + + return(do_write); +} + +/* Unit Tests */ +#ifdef UNIV_ENABLE_UNIT_TEST_MAKE_FILEPATH +#define MF fil_make_filepath +#define DISPLAY ib::info() << path +void +test_make_filepath() +{ + char* path; + const char* long_path = + "this/is/a/very/long/path/including/a/very/" + "looooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooooo" + "oooooooooooooooooooooooooooooooooooooooooooooooong" + "/folder/name"; + path = MF("/this/is/a/path/with/a/filename", NULL, IBD, false); DISPLAY; + path = MF("/this/is/a/path/with/a/filename", NULL, ISL, false); DISPLAY; + path = MF("/this/is/a/path/with/a/filename", NULL, CFG, false); DISPLAY; + path = MF("/this/is/a/path/with/a/filename.ibd", NULL, IBD, false); DISPLAY; + path = MF("/this/is/a/path/with/a/filename.ibd", NULL, IBD, false); DISPLAY; + path = MF("/this/is/a/path/with/a/filename.dat", NULL, IBD, false); DISPLAY; + path = MF(NULL, "tablespacename", NO_EXT, false); DISPLAY; + path = MF(NULL, "tablespacename", IBD, false); DISPLAY; + path = MF(NULL, "dbname/tablespacename", NO_EXT, false); DISPLAY; + path = MF(NULL, "dbname/tablespacename", IBD, false); DISPLAY; + path = MF(NULL, "dbname/tablespacename", ISL, false); DISPLAY; + path = MF(NULL, "dbname/tablespacename", CFG, false); DISPLAY; + path = MF(NULL, "dbname\\tablespacename", NO_EXT, false); DISPLAY; + path = MF(NULL, "dbname\\tablespacename", IBD, false); DISPLAY; + path = MF("/this/is/a/path", "dbname/tablespacename", IBD, false); DISPLAY; + path = MF("/this/is/a/path", "dbname/tablespacename", IBD, true); DISPLAY; + path = MF("./this/is/a/path", "dbname/tablespacename.ibd", IBD, true); DISPLAY; + path = MF("this\\is\\a\\path", "dbname/tablespacename", IBD, true); DISPLAY; + path = MF("/this/is/a/path", "dbname\\tablespacename", IBD, true); DISPLAY; + path = MF(long_path, NULL, IBD, false); DISPLAY; + path = MF(long_path, "tablespacename", IBD, false); DISPLAY; + path = MF(long_path, "tablespacename", IBD, true); DISPLAY; +} +#endif /* UNIV_ENABLE_UNIT_TEST_MAKE_FILEPATH */ +/* @} */ + +/** Determine the block size of the data file. +@param[in] space tablespace +@param[in] offset page number +@return block size */ +UNIV_INTERN +ulint +fil_space_get_block_size(const fil_space_t* space, unsigned offset) +{ + ulint block_size = 512; + + for (fil_node_t* node = UT_LIST_GET_FIRST(space->chain); + node != NULL; + node = UT_LIST_GET_NEXT(chain, node)) { + block_size = node->block_size; + if (node->size > offset) { + ut_ad(node->size <= 0xFFFFFFFFU); + break; + } + offset -= static_cast<unsigned>(node->size); + } + + /* Currently supporting block size up to 4K, + fall back to default if bigger requested. */ + if (block_size > 4096) { + block_size = 512; + } + + return block_size; +} diff --git a/storage/innobase/fil/fil0pagecompress.cc b/storage/innobase/fil/fil0pagecompress.cc new file mode 100644 index 00000000..909e8092 --- /dev/null +++ b/storage/innobase/fil/fil0pagecompress.cc @@ -0,0 +1,613 @@ +/***************************************************************************** + +Copyright (C) 2013, 2020, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/******************************************************************//** +@file fil/fil0pagecompress.cc +Implementation for page compressed file spaces. + +Created 11/12/2013 Jan Lindström jan.lindstrom@mariadb.com +Updated 14/02/2015 +***********************************************************************/ + +#include "fil0fil.h" +#include "fil0pagecompress.h" + +#include <my_dbug.h> + +#include "mem0mem.h" +#include "hash0hash.h" +#include "os0file.h" +#include "mach0data.h" +#include "buf0buf.h" +#include "buf0flu.h" +#include "log0recv.h" +#include "fsp0fsp.h" +#include "srv0srv.h" +#include "srv0start.h" +#include "mtr0mtr.h" +#include "mtr0log.h" +#include "dict0dict.h" +#include "page0page.h" +#include "page0zip.h" +#include "trx0sys.h" +#include "row0mysql.h" +#include "buf0lru.h" +#include "ibuf0ibuf.h" +#include "sync0sync.h" +#include "zlib.h" +#ifdef __linux__ +#include <linux/fs.h> +#include <sys/ioctl.h> +#include <fcntl.h> +#endif +#include "row0mysql.h" +#ifdef HAVE_LZ4 +#include "lz4.h" +#endif +#ifdef HAVE_LZO +#include "lzo/lzo1x.h" +#endif +#ifdef HAVE_LZMA +#include "lzma.h" +#endif +#ifdef HAVE_BZIP2 +#include "bzlib.h" +#endif +#ifdef HAVE_SNAPPY +#include "snappy-c.h" +#endif + +/** Compress a page for the given compression algorithm. +@param[in] buf page to be compressed +@param[out] out_buf compressed page +@param[in] header_len header length of the page +@param[in] comp_algo compression algorithm +@param[in] comp_level compression level +@return actual length of compressed page data +@retval 0 if the page was not compressed */ +static ulint fil_page_compress_low( + const byte* buf, + byte* out_buf, + ulint header_len, + ulint comp_algo, + unsigned comp_level) +{ + ulint write_size = srv_page_size - header_len; + + switch (comp_algo) { + default: + ut_ad("unknown compression method" == 0); + /* fall through */ + case PAGE_UNCOMPRESSED: + return 0; + case PAGE_ZLIB_ALGORITHM: + { + ulong len = uLong(write_size); + if (Z_OK == compress2( + out_buf + header_len, &len, buf, + uLong(srv_page_size), int(comp_level))) { + return len; + } + } + break; +#ifdef HAVE_LZ4 + case PAGE_LZ4_ALGORITHM: +# ifdef HAVE_LZ4_COMPRESS_DEFAULT + write_size = LZ4_compress_default( + reinterpret_cast<const char*>(buf), + reinterpret_cast<char*>(out_buf) + header_len, + int(srv_page_size), int(write_size)); +# else + write_size = LZ4_compress_limitedOutput( + reinterpret_cast<const char*>(buf), + reinterpret_cast<char*>(out_buf) + header_len, + int(srv_page_size), int(write_size)); +# endif + + return write_size; +#endif /* HAVE_LZ4 */ +#ifdef HAVE_LZO + case PAGE_LZO_ALGORITHM: { + lzo_uint len = write_size; + + if (LZO_E_OK == lzo1x_1_15_compress( + buf, srv_page_size, + out_buf + header_len, &len, + out_buf + srv_page_size) + && len <= write_size) { + return len; + } + break; + } +#endif /* HAVE_LZO */ +#ifdef HAVE_LZMA + case PAGE_LZMA_ALGORITHM: { + size_t out_pos = 0; + + if (LZMA_OK == lzma_easy_buffer_encode( + comp_level, LZMA_CHECK_NONE, NULL, + buf, srv_page_size, out_buf + header_len, + &out_pos, write_size) + && out_pos <= write_size) { + return out_pos; + } + break; + } +#endif /* HAVE_LZMA */ + +#ifdef HAVE_BZIP2 + case PAGE_BZIP2_ALGORITHM: { + unsigned len = unsigned(write_size); + if (BZ_OK == BZ2_bzBuffToBuffCompress( + reinterpret_cast<char*>(out_buf + header_len), + &len, + const_cast<char*>( + reinterpret_cast<const char*>(buf)), + unsigned(srv_page_size), 1, 0, 0) + && len <= write_size) { + return len; + } + break; + } +#endif /* HAVE_BZIP2 */ + +#ifdef HAVE_SNAPPY + case PAGE_SNAPPY_ALGORITHM: { + size_t len = snappy_max_compressed_length(srv_page_size); + + if (SNAPPY_OK == snappy_compress( + reinterpret_cast<const char*>(buf), + srv_page_size, + reinterpret_cast<char*>(out_buf) + header_len, + &len) + && len <= write_size) { + return len; + } + break; + } +#endif /* HAVE_SNAPPY */ + } + + return 0; +} + +/** Compress a page_compressed page for full crc32 format. +@param[in] buf page to be compressed +@param[out] out_buf compressed page +@param[in] flags tablespace flags +@param[in] block_size file system block size +@return actual length of compressed page +@retval 0 if the page was not compressed */ +static ulint fil_page_compress_for_full_crc32( + const byte* buf, + byte* out_buf, + ulint flags, + ulint block_size, + bool encrypted) +{ + ulint comp_level = FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags); + + if (comp_level == 0) { + comp_level = page_zip_level; + } + + const ulint header_len = FIL_PAGE_COMP_ALGO; + + ulint write_size = fil_page_compress_low( + buf, out_buf, header_len, + fil_space_t::get_compression_algo(flags), + static_cast<unsigned>(comp_level)); + + if (write_size == 0) { +fail: + srv_stats.pages_page_compression_error.inc(); + return 0; + } + + write_size += header_len; + const ulint actual_size = write_size; + /* Write the actual length of the data & page type + for full crc32 format. */ + const bool lsb = fil_space_t::full_crc32_page_compressed_len(flags); + /* In the MSB, store the rounded-up page size. */ + write_size = (write_size + lsb + (4 + 255)) & ~255; + if (write_size >= srv_page_size) { + goto fail; + } + + /* Set up the page header */ + memcpy(out_buf, buf, header_len); + out_buf[FIL_PAGE_TYPE] = 1U << (FIL_PAGE_COMPRESS_FCRC32_MARKER - 8); + out_buf[FIL_PAGE_TYPE + 1] = byte(write_size >> 8); + /* Clean up the buffer for the remaining write_size (except checksum) */ + memset(out_buf + actual_size, 0, write_size - actual_size - 4); + if (lsb) { + /* Store the LSB */ + out_buf[write_size - 5] = byte(actual_size + (1 + 4)); + } + + if (!block_size) { + block_size = 512; + } + + ut_ad(write_size); + if (write_size & (block_size - 1)) { + size_t tmp = write_size; + write_size = (write_size + (block_size - 1)) + & ~(block_size - 1); + memset(out_buf + tmp, 0, write_size - tmp); + } + + srv_stats.page_compression_saved.add(srv_page_size - write_size); + srv_stats.pages_page_compressed.inc(); + + return write_size; +} + +/** Compress a page_compressed page for non full crc32 format. +@param[in] buf page to be compressed +@param[out] out_buf compressed page +@param[in] flags tablespace flags +@param[in] block_size file system block size +@param[in] encrypted whether the page will be subsequently encrypted +@return actual length of compressed page +@retval 0 if the page was not compressed */ +static ulint fil_page_compress_for_non_full_crc32( + const byte* buf, + byte* out_buf, + ulint flags, + ulint block_size, + bool encrypted) +{ + uint comp_level = static_cast<uint>( + FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags)); + ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN; + /* Cache to avoid change during function execution */ + ulint comp_algo = innodb_compression_algorithm; + + if (encrypted) { + header_len += FIL_PAGE_ENCRYPT_COMP_ALGO; + } + + /* If no compression level was provided to this table, use system + default level */ + if (comp_level == 0) { + comp_level = page_zip_level; + } + + ulint write_size = fil_page_compress_low( + buf, out_buf, + header_len, comp_algo, comp_level); + + if (write_size == 0) { + srv_stats.pages_page_compression_error.inc(); + return 0; + } + + /* Set up the page header */ + memcpy(out_buf, buf, FIL_PAGE_DATA); + /* Set up the checksum */ + mach_write_to_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC); + + /* Set up the compression algorithm */ + mach_write_to_8(out_buf + FIL_PAGE_COMP_ALGO, comp_algo); + + if (encrypted) { + /* Set up the correct page type */ + mach_write_to_2(out_buf + FIL_PAGE_TYPE, + FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); + + mach_write_to_2(out_buf + FIL_PAGE_DATA + + FIL_PAGE_ENCRYPT_COMP_ALGO, comp_algo); + } else { + /* Set up the correct page type */ + mach_write_to_2(out_buf + FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED); + } + + /* Set up the actual payload lenght */ + mach_write_to_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE, + write_size); + + ut_ad(mach_read_from_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM) + == BUF_NO_CHECKSUM_MAGIC); + + ut_ad(mach_read_from_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE) + == write_size); + +#ifdef UNIV_DEBUG + bool is_compressed = (mach_read_from_8(out_buf + FIL_PAGE_COMP_ALGO) + == (ulint) comp_algo); + + bool is_encrypted_compressed = + (mach_read_from_2(out_buf + FIL_PAGE_DATA + + FIL_PAGE_ENCRYPT_COMP_ALGO) + == (ulint) comp_algo); +#endif /* UNIV_DEBUG */ + + ut_ad(is_compressed || is_encrypted_compressed); + + write_size+=header_len; + + if (block_size <= 0) { + block_size = 512; + } + + ut_ad(write_size > 0 && block_size > 0); + + /* Actual write needs to be alligned on block size */ + if (write_size % block_size) { + size_t tmp = write_size; + write_size = (size_t)ut_uint64_align_up( + (ib_uint64_t)write_size, block_size); + /* Clean up the end of buffer */ + memset(out_buf+tmp, 0, write_size - tmp); +#ifdef UNIV_DEBUG + ut_a(write_size > 0 && ((write_size % block_size) == 0)); + ut_a(write_size >= tmp); +#endif + } + + srv_stats.page_compression_saved.add(srv_page_size - write_size); + srv_stats.pages_page_compressed.inc(); + + return write_size; +} + +/** Compress a page_compressed page before writing to a data file. +@param[in] buf page to be compressed +@param[out] out_buf compressed page +@param[in] flags tablespace flags +@param[in] block_size file system block size +@param[in] encrypted whether the page will be subsequently encrypted +@return actual length of compressed page +@retval 0 if the page was not compressed */ +ulint fil_page_compress( + const byte* buf, + byte* out_buf, + ulint flags, + ulint block_size, + bool encrypted) +{ + /* The full_crc32 page_compressed format assumes this. */ + ut_ad(!(block_size & 255)); + ut_ad(ut_is_2pow(block_size)); + + /* Let's not compress file space header or + extent descriptor */ + switch (fil_page_get_type(buf)) { + case 0: + case FIL_PAGE_TYPE_FSP_HDR: + case FIL_PAGE_TYPE_XDES: + case FIL_PAGE_PAGE_COMPRESSED: + return 0; + } + + if (fil_space_t::full_crc32(flags)) { + return fil_page_compress_for_full_crc32( + buf, out_buf, flags, block_size, encrypted); + } + + return fil_page_compress_for_non_full_crc32( + buf, out_buf, flags, block_size, encrypted); +} + +/** Decompress a page that may be subject to page_compressed compression. +@param[in,out] tmp_buf temporary buffer (of innodb_page_size) +@param[in,out] buf possibly compressed page buffer +@param[in] comp_algo compression algorithm +@param[in] header_len header length of the page +@param[in] actual size actual size of the page +@retval true if the page is decompressed or false */ +static bool fil_page_decompress_low( + byte* tmp_buf, + byte* buf, + ulint comp_algo, + ulint header_len, + ulint actual_size) +{ + switch (comp_algo) { + default: + ib::error() << "Unknown compression algorithm " + << comp_algo; + return false; + case PAGE_ZLIB_ALGORITHM: + { + uLong len = srv_page_size; + return (Z_OK == uncompress(tmp_buf, &len, + buf + header_len, + uLong(actual_size)) + && len == srv_page_size); + } +#ifdef HAVE_LZ4 + case PAGE_LZ4_ALGORITHM: + return LZ4_decompress_safe( + reinterpret_cast<const char*>(buf) + header_len, + reinterpret_cast<char*>(tmp_buf), + static_cast<int>(actual_size), + static_cast<int>(srv_page_size)) == + static_cast<int>(srv_page_size); +#endif /* HAVE_LZ4 */ +#ifdef HAVE_LZO + case PAGE_LZO_ALGORITHM: + { + lzo_uint len_lzo = srv_page_size; + return (LZO_E_OK == lzo1x_decompress_safe( + buf + header_len, + actual_size, tmp_buf, &len_lzo, NULL) + && len_lzo == srv_page_size); + } +#endif /* HAVE_LZO */ +#ifdef HAVE_LZMA + case PAGE_LZMA_ALGORITHM: + { + size_t src_pos = 0; + size_t dst_pos = 0; + uint64_t memlimit = UINT64_MAX; + + return LZMA_OK == lzma_stream_buffer_decode( + &memlimit, 0, NULL, buf + header_len, + &src_pos, actual_size, tmp_buf, &dst_pos, + srv_page_size) + && dst_pos == srv_page_size; + } +#endif /* HAVE_LZMA */ +#ifdef HAVE_BZIP2 + case PAGE_BZIP2_ALGORITHM: + { + uint dst_pos = static_cast<uint>(srv_page_size); + return BZ_OK == BZ2_bzBuffToBuffDecompress( + reinterpret_cast<char*>(tmp_buf), + &dst_pos, + reinterpret_cast<char*>(buf) + header_len, + static_cast<uint>(actual_size), 1, 0) + && dst_pos == srv_page_size; + } +#endif /* HAVE_BZIP2 */ +#ifdef HAVE_SNAPPY + case PAGE_SNAPPY_ALGORITHM: + { + size_t olen = srv_page_size; + + return SNAPPY_OK == snappy_uncompress( + reinterpret_cast<const char*>(buf) + + header_len, + actual_size, + reinterpret_cast<char*>(tmp_buf), &olen) + && olen == srv_page_size; + } +#endif /* HAVE_SNAPPY */ + } + + return false; +} + +/** Decompress a page for full crc32 format. +@param[in,out] tmp_buf temporary buffer (of innodb_page_size) +@param[in,out] buf possibly compressed page buffer +@param[in] flags tablespace flags +@return size of the compressed data +@retval 0 if decompression failed +@retval srv_page_size if the page was not compressed */ +ulint fil_page_decompress_for_full_crc32(byte* tmp_buf, byte* buf, ulint flags) +{ + ut_ad(fil_space_t::full_crc32(flags)); + bool compressed = false; + size_t size = buf_page_full_crc32_size(buf, &compressed, NULL); + if (!compressed) { + ut_ad(size == srv_page_size); + return size; + } + + if (!fil_space_t::is_compressed(flags)) { + return 0; + } + + if (size >= srv_page_size) { + return 0; + } + + if (fil_space_t::full_crc32_page_compressed_len(flags)) { + compile_time_assert(FIL_PAGE_FCRC32_CHECKSUM == 4); + if (size_t lsb = buf[size - 5]) { + size += lsb - 0x100; + } + size -= 5; + } + + const size_t header_len = FIL_PAGE_COMP_ALGO; + + if (!fil_page_decompress_low(tmp_buf, buf, + fil_space_t::get_compression_algo(flags), + header_len, size - header_len)) { + return 0; + } + + srv_stats.pages_page_decompressed.inc(); + memcpy(buf, tmp_buf, srv_page_size); + return size; +} + +/** Decompress a page for non full crc32 format. +@param[in,out] tmp_buf temporary buffer (of innodb_page_size) +@param[in,out] buf possibly compressed page buffer +@return size of the compressed data +@retval 0 if decompression failed +@retval srv_page_size if the page was not compressed */ +ulint fil_page_decompress_for_non_full_crc32( + byte* tmp_buf, + byte* buf) +{ + ulint header_len; + uint comp_algo; + switch (fil_page_get_type(buf)) { + case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED: + header_len= FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN; + comp_algo = mach_read_from_2( + FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_ALGO + buf); + break; + case FIL_PAGE_PAGE_COMPRESSED: + header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN; + if (mach_read_from_6(FIL_PAGE_COMP_ALGO + buf)) { + return 0; + } + comp_algo = mach_read_from_2(FIL_PAGE_COMP_ALGO + 6 + buf); + break; + default: + return srv_page_size; + } + + if (mach_read_from_4(buf + FIL_PAGE_SPACE_OR_CHKSUM) + != BUF_NO_CHECKSUM_MAGIC) { + return 0; + } + + ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA + + FIL_PAGE_COMP_SIZE); + + /* Check if payload size is corrupted */ + if (actual_size == 0 || actual_size > srv_page_size - header_len) { + return 0; + } + + if (!fil_page_decompress_low(tmp_buf, buf, comp_algo, header_len, + actual_size)) { + return 0; + } + + srv_stats.pages_page_decompressed.inc(); + memcpy(buf, tmp_buf, srv_page_size); + return actual_size; +} + +/** Decompress a page that may be subject to page_compressed compression. +@param[in,out] tmp_buf temporary buffer (of innodb_page_size) +@param[in,out] buf possibly compressed page buffer +@return size of the compressed data +@retval 0 if decompression failed +@retval srv_page_size if the page was not compressed */ +ulint fil_page_decompress( + byte* tmp_buf, + byte* buf, + ulint flags) +{ + if (fil_space_t::full_crc32(flags)) { + return fil_page_decompress_for_full_crc32(tmp_buf, buf, flags); + } + + return fil_page_decompress_for_non_full_crc32(tmp_buf, buf); +} |