Diffstat (limited to 'storage/innobase/btr/btr0bulk.cc')
-rw-r--r-- | storage/innobase/btr/btr0bulk.cc | 1238
1 file changed, 1238 insertions, 0 deletions
diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc new file mode 100644 index 00000000..9004064a --- /dev/null +++ b/storage/innobase/btr/btr0bulk.cc @@ -0,0 +1,1238 @@ +/***************************************************************************** + +Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2017, 2021, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file btr/btr0bulk.cc +The B-tree bulk load + +Created 03/11/2014 Shaohua Wang +*******************************************************/ + +#include "btr0bulk.h" +#include "btr0btr.h" +#include "btr0cur.h" +#include "btr0pcur.h" +#include "ibuf0ibuf.h" +#include "page0page.h" +#include "trx0trx.h" + +/** Innodb B-tree index fill factor for bulk load. */ +uint innobase_fill_factor; + +/** Initialize members, allocate page if needed and start mtr. +Note: we commit all mtrs on failure. +@return error code. */ +dberr_t +PageBulk::init() +{ + buf_block_t* new_block; + page_t* new_page; + + ut_ad(m_heap == NULL); + m_heap = mem_heap_create(1000); + + m_mtr.start(); + m_index->set_modified(m_mtr); + + if (m_page_no == FIL_NULL) { + mtr_t alloc_mtr; + + /* We commit redo log for allocation by a separate mtr, + because we don't guarantee pages are committed following + the allocation order, and we will always generate redo log + for page allocation, even when creating a new tablespace. */ + alloc_mtr.start(); + m_index->set_modified(alloc_mtr); + + uint32_t n_reserved; + if (!fsp_reserve_free_extents(&n_reserved, + m_index->table->space, + 1, FSP_NORMAL, &alloc_mtr)) { + alloc_mtr.commit(); + m_mtr.commit(); + return(DB_OUT_OF_FILE_SPACE); + } + + /* Allocate a new page. 
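+		The block is allocated within alloc_mtr, while m_mtr
+		(the last argument) latches it, so the allocation becomes
+		durable on alloc_mtr.commit() even though the page contents
+		are initialized and logged later under m_mtr.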
*/ + new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level, + &alloc_mtr, &m_mtr); + + m_index->table->space->release_free_extents(n_reserved); + + alloc_mtr.commit(); + + new_page = buf_block_get_frame(new_block); + m_page_no = new_block->page.id().page_no(); + + byte* index_id = my_assume_aligned<2> + (PAGE_HEADER + PAGE_INDEX_ID + new_page); + compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4); + compile_time_assert(FIL_NULL == 0xffffffff); + memset_aligned<8>(new_page + FIL_PAGE_PREV, 0xff, 8); + + if (UNIV_LIKELY_NULL(new_block->page.zip.data)) { + mach_write_to_8(index_id, m_index->id); + page_create_zip(new_block, m_index, m_level, 0, + &m_mtr); + } else { + ut_ad(!m_index->is_spatial()); + page_create(new_block, &m_mtr, + m_index->table->not_redundant()); + m_mtr.memset(*new_block, FIL_PAGE_PREV, 8, 0xff); + m_mtr.write<2,mtr_t::MAYBE_NOP>(*new_block, PAGE_HEADER + + PAGE_LEVEL + + new_page, m_level); + m_mtr.write<8>(*new_block, index_id, m_index->id); + } + } else { + new_block = btr_block_get(*m_index, m_page_no, RW_X_LATCH, + false, &m_mtr); + + new_page = buf_block_get_frame(new_block); + ut_ad(new_block->page.id().page_no() == m_page_no); + + ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW); + + btr_page_set_level(new_block, m_level, &m_mtr); + } + + m_page_zip = buf_block_get_page_zip(new_block); + + if (!m_level && dict_index_is_sec_or_ibuf(m_index)) { + page_update_max_trx_id(new_block, m_page_zip, m_trx_id, + &m_mtr); + } + + m_block = new_block; + m_page = new_page; + m_cur_rec = page_get_infimum_rec(new_page); + ut_ad(m_is_comp == !!page_is_comp(new_page)); + m_free_space = page_get_free_space_of_empty(m_is_comp); + + if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) { + /* Keep default behavior compatible with 5.6 */ + m_reserved_space = dict_index_get_space_reserve(); + } else { + m_reserved_space = + srv_page_size * (100 - innobase_fill_factor) / 100; + } + + m_padding_space = + srv_page_size - dict_index_zip_pad_optimal_page_size(m_index); + m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP); + m_rec_no = page_header_get_field(new_page, PAGE_N_RECS); + /* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0, + without writing redo log, to ensure that needs_finish() will hold + on an empty page. */ + ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION); + m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0; + ut_d(m_total_data = 0); + + return(DB_SUCCESS); +} + +/** Insert a record in the page. +@tparam fmt the page format +@param[in,out] rec record +@param[in] offsets record offsets */ +template<PageBulk::format fmt> +inline void PageBulk::insertPage(rec_t *rec, rec_offs *offsets) +{ + ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED)); + ut_ad((fmt != REDUNDANT) == m_is_comp); + ut_ad(page_align(m_heap_top) == m_page); + ut_ad(m_heap); + + const ulint rec_size= rec_offs_size(offsets); + const ulint extra_size= rec_offs_extra_size(offsets); + ut_ad(page_align(m_heap_top + rec_size) == m_page); + ut_d(const bool is_leaf= page_rec_is_leaf(m_cur_rec)); + +#ifdef UNIV_DEBUG + /* Check whether records are in order. */ + if (page_offset(m_cur_rec) != + (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM)) + { + const rec_t *old_rec = m_cur_rec; + rec_offs *old_offsets= rec_get_offsets(old_rec, m_index, nullptr, is_leaf + ? 
m_index->n_core_fields : 0, + ULINT_UNDEFINED, &m_heap); + ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index) > 0); + } + + m_total_data+= rec_size; +#endif /* UNIV_DEBUG */ + + rec_t* const insert_rec= m_heap_top + extra_size; + + /* Insert the record in the linked list. */ + if (fmt != REDUNDANT) + { + const rec_t *next_rec= m_page + + page_offset(m_cur_rec + mach_read_from_2(m_cur_rec - REC_NEXT)); + if (fmt != COMPRESSED) + m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, + static_cast<uint16_t>(insert_rec - m_cur_rec)); + else + { + mach_write_to_2(m_cur_rec - REC_NEXT, + static_cast<uint16_t>(insert_rec - m_cur_rec)); + memcpy(m_heap_top, rec - extra_size, rec_size); + } + + rec_t * const this_rec= fmt != COMPRESSED + ? const_cast<rec_t*>(rec) : insert_rec; + rec_set_bit_field_1(this_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK, + REC_N_OWNED_SHIFT); + rec_set_bit_field_2(this_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no, + REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + mach_write_to_2(this_rec - REC_NEXT, + static_cast<uint16_t>(next_rec - insert_rec)); + } + else + { + memcpy(const_cast<rec_t*>(rec) - REC_NEXT, m_cur_rec - REC_NEXT, 2); + m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, page_offset(insert_rec)); + rec_set_bit_field_1(const_cast<rec_t*>(rec), 0, + REC_OLD_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(const_cast<rec_t*>(rec), + PAGE_HEAP_NO_USER_LOW + m_rec_no, + REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + } + + if (fmt == COMPRESSED) + /* We already wrote the record. Log is written in PageBulk::compress(). */; + else if (page_offset(m_cur_rec) == + (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM)) + m_mtr.memcpy(*m_block, m_heap_top, rec - extra_size, rec_size); + else + { + /* Try to copy common prefix from the preceding record. */ + const byte *r= rec - extra_size; + const byte * const insert_rec_end= m_heap_top + rec_size; + byte *b= m_heap_top; + + /* Skip any unchanged prefix of the record. */ + for (; * b == *r; b++, r++); + + ut_ad(b < insert_rec_end); + + const byte *c= m_cur_rec - (rec - r); + const byte * const c_end= std::min(m_cur_rec + rec_offs_data_size(offsets), + m_heap_top); + + /* Try to copy any bytes of the preceding record. */ + if (UNIV_LIKELY(c >= m_page && c < c_end)) + { + const byte *cm= c; + byte *bm= b; + const byte *rm= r; + for (; cm < c_end && *rm == *cm; cm++, bm++, rm++); + ut_ad(bm <= insert_rec_end); + size_t len= static_cast<size_t>(rm - r); + ut_ad(!memcmp(r, c, len)); + if (len > 2) + { + memcpy(b, c, len); + m_mtr.memmove(*m_block, page_offset(b), page_offset(c), len); + c= cm; + b= bm; + r= rm; + } + } + + if (c < m_cur_rec) + { + if (!rec_offs_data_size(offsets)) + { +no_data: + m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c); + goto rec_done; + } + /* Some header bytes differ. Compare the data separately. */ + const byte *cd= m_cur_rec; + byte *bd= insert_rec; + const byte *rd= rec; + /* Skip any unchanged prefix of the record. */ + for (;; cd++, bd++, rd++) + if (bd == insert_rec_end) + goto no_data; + else if (*bd != *rd) + break; + + /* Try to copy any data bytes of the preceding record. 
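+      Such bytes are copied locally with memcpy() and logged via
+      m_mtr.memmove(), which refers back to bytes already on the page
+      instead of logging them again.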
*/ + if (c_end - cd > 2) + { + const byte *cdm= cd; + const byte *rdm= rd; + for (; cdm < c_end && *rdm == *cdm; cdm++, rdm++) + ut_ad(rdm - rd + bd <= insert_rec_end); + size_t len= static_cast<size_t>(rdm - rd); + ut_ad(!memcmp(rd, cd, len)); + if (len > 2) + { + m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c); + memcpy(bd, cd, len); + m_mtr.memmove(*m_block, page_offset(bd), page_offset(cd), len); + c= cdm; + b= rdm - rd + bd; + r= rdm; + } + } + } + + if (size_t len= static_cast<size_t>(insert_rec_end - b)) + m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, len); + } + +rec_done: + ut_ad(fmt == COMPRESSED || !memcmp(m_heap_top, rec - extra_size, rec_size)); + rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets); + + /* Update the member variables. */ + ulint slot_size= page_dir_calc_reserved_space(m_rec_no + 1) - + page_dir_calc_reserved_space(m_rec_no); + + ut_ad(m_free_space >= rec_size + slot_size); + ut_ad(m_heap_top + rec_size < m_page + srv_page_size); + + m_free_space-= rec_size + slot_size; + m_heap_top+= rec_size; + m_rec_no++; + m_cur_rec= insert_rec; +} + +/** Insert a record in the page. +@param[in] rec record +@param[in] offsets record offsets */ +inline void PageBulk::insert(const rec_t *rec, rec_offs *offsets) +{ + byte rec_hdr[REC_N_OLD_EXTRA_BYTES]; + static_assert(REC_N_OLD_EXTRA_BYTES > REC_N_NEW_EXTRA_BYTES, "file format"); + + if (UNIV_LIKELY_NULL(m_page_zip)) + insertPage<COMPRESSED>(const_cast<rec_t*>(rec), offsets); + else if (m_is_comp) + { + memcpy(rec_hdr, rec - REC_N_NEW_EXTRA_BYTES, REC_N_NEW_EXTRA_BYTES); + insertPage<DYNAMIC>(const_cast<rec_t*>(rec), offsets); + memcpy(const_cast<rec_t*>(rec) - REC_N_NEW_EXTRA_BYTES, rec_hdr, + REC_N_NEW_EXTRA_BYTES); + } + else + { + memcpy(rec_hdr, rec - REC_N_OLD_EXTRA_BYTES, REC_N_OLD_EXTRA_BYTES); + insertPage<REDUNDANT>(const_cast<rec_t*>(rec), offsets); + memcpy(const_cast<rec_t*>(rec) - REC_N_OLD_EXTRA_BYTES, rec_hdr, + REC_N_OLD_EXTRA_BYTES); + } +} + +/** Set the number of owned records in the uncompressed page of +a ROW_FORMAT=COMPRESSED record without redo-logging. */ +static void rec_set_n_owned_zip(rec_t *rec, ulint n_owned) +{ + rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); +} + +/** Mark end of insertion to the page. Scan all records to set page dirs, +and set page header members. +@tparam fmt page format */ +template<PageBulk::format fmt> +inline void PageBulk::finishPage() +{ + ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED)); + ut_ad((fmt != REDUNDANT) == m_is_comp); + + ulint count= 0; + ulint n_recs= 0; + byte *slot= my_assume_aligned<2>(m_page + srv_page_size - + (PAGE_DIR + PAGE_DIR_SLOT_SIZE)); + const page_dir_slot_t *const slot0 = slot; + compile_time_assert(PAGE_DIR_SLOT_SIZE == 2); + if (fmt != REDUNDANT) + { + uint16_t offset= mach_read_from_2(PAGE_NEW_INFIMUM - REC_NEXT + m_page); + ut_ad(offset >= PAGE_NEW_SUPREMUM - PAGE_NEW_INFIMUM); + offset= static_cast<uint16_t>(offset + PAGE_NEW_INFIMUM); + /* Set owner & dir. 
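+       Walk the singly-linked record list from the infimum, giving every
+       (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 consecutive records an owner
+       and a directory slot; this rebuilds the sparse page directory in
+       one pass.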
*/ + while (offset != PAGE_NEW_SUPREMUM) + { + ut_ad(offset >= PAGE_NEW_SUPREMUM); + ut_ad(offset < page_offset(slot)); + count++; + n_recs++; + + if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) + { + slot-= PAGE_DIR_SLOT_SIZE; + mach_write_to_2(slot, offset); + + if (fmt != COMPRESSED) + page_rec_set_n_owned<false>(m_block, m_page + offset, count, true, + &m_mtr); + else + rec_set_n_owned_zip(m_page + offset, count); + + count= 0; + } + + uint16_t next= static_cast<uint16_t> + ((mach_read_from_2(m_page + offset - REC_NEXT) + offset) & + (srv_page_size - 1)); + ut_ad(next); + offset= next; + } + + if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <= + PAGE_DIR_SLOT_MAX_N_OWNED)) + { + /* Merge the last two slots, like page_cur_insert_rec_low() does. */ + count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2; + + rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot)); + if (fmt != COMPRESSED) + page_rec_set_n_owned<false>(m_block, rec, 0, true, &m_mtr); + else + rec_set_n_owned_zip(rec, 0); + } + else + slot-= PAGE_DIR_SLOT_SIZE; + + mach_write_to_2(slot, PAGE_NEW_SUPREMUM); + if (fmt != COMPRESSED) + page_rec_set_n_owned<false>(m_block, m_page + PAGE_NEW_SUPREMUM, + count + 1, true, &m_mtr); + else + rec_set_n_owned_zip(m_page + PAGE_NEW_SUPREMUM, count + 1); + } + else + { + rec_t *insert_rec= m_page + + mach_read_from_2(PAGE_OLD_INFIMUM - REC_NEXT + m_page); + + /* Set owner & dir. */ + while (insert_rec != m_page + PAGE_OLD_SUPREMUM) + { + count++; + n_recs++; + + if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) + { + slot-= PAGE_DIR_SLOT_SIZE; + mach_write_to_2(slot, page_offset(insert_rec)); + page_rec_set_n_owned<false>(m_block, insert_rec, count, false, &m_mtr); + count= 0; + } + + insert_rec= m_page + mach_read_from_2(insert_rec - REC_NEXT); + } + + if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <= + PAGE_DIR_SLOT_MAX_N_OWNED)) + { + /* Merge the last two slots, like page_cur_insert_rec_low() does. */ + count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2; + + rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot)); + page_rec_set_n_owned<false>(m_block, rec, 0, false, &m_mtr); + } + else + slot-= PAGE_DIR_SLOT_SIZE; + + mach_write_to_2(slot, PAGE_OLD_SUPREMUM); + page_rec_set_n_owned<false>(m_block, m_page + PAGE_OLD_SUPREMUM, count + 1, + false, &m_mtr); + } + + if (!m_rec_no); + else if (fmt != COMPRESSED) + { + static_assert(PAGE_N_DIR_SLOTS == 0, "compatibility"); + alignas(8) byte page_header[PAGE_N_HEAP + 2]; + mach_write_to_2(page_header + PAGE_N_DIR_SLOTS, + 1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE); + mach_write_to_2(page_header + PAGE_HEAP_TOP, m_heap_top - m_page); + mach_write_to_2(page_header + PAGE_N_HEAP, + (PAGE_HEAP_NO_USER_LOW + m_rec_no) | + uint16_t{fmt != REDUNDANT} << 15); + m_mtr.memcpy(*m_block, PAGE_HEADER + m_page, page_header, + sizeof page_header); + m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no); + m_mtr.memcpy(*m_block, page_offset(slot), slot0 - slot); + } + else + { + /* For ROW_FORMAT=COMPRESSED, redo log may be written in + PageBulk::compress(). 
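+    The header fields below are therefore set with plain
+    mach_write_to_2(), generating no redo here.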
*/ + mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page, + 1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE); + mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page, + static_cast<ulint>(m_heap_top - m_page)); + mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page, + (PAGE_HEAP_NO_USER_LOW + m_rec_no) | 1U << 15); + mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no); + } +} + +inline bool PageBulk::needs_finish() const +{ + ut_ad(page_align(m_cur_rec) == m_block->frame); + ut_ad(m_page == m_block->frame); + if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B]) + return true; + ulint heap_no, n_heap= page_header_get_field(m_page, PAGE_N_HEAP); + ut_ad((n_heap & 0x7fff) >= PAGE_HEAP_NO_USER_LOW); + if (n_heap & 0x8000) + { + n_heap&= 0x7fff; + heap_no= rec_get_heap_no_new(m_cur_rec); + if (heap_no == PAGE_HEAP_NO_INFIMUM && + page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END) + return false; + } + else + { + heap_no= rec_get_heap_no_old(m_cur_rec); + if (heap_no == PAGE_HEAP_NO_INFIMUM && + page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END) + return false; + } + return heap_no != n_heap - 1; +} + +/** Mark end of insertion to the page. Scan all records to set page dirs, +and set page header members. */ +inline void PageBulk::finish() +{ + ut_ad(!m_index->is_spatial()); + + if (!needs_finish()); + else if (UNIV_LIKELY_NULL(m_page_zip)) + finishPage<COMPRESSED>(); + else if (m_is_comp) + finishPage<DYNAMIC>(); + else + finishPage<REDUNDANT>(); + + /* In MariaDB 10.2, 10.3, 10.4, we would initialize + PAGE_DIRECTION_B, PAGE_N_DIRECTION, PAGE_LAST_INSERT + in the same way as we would during normal INSERT operations. + Starting with MariaDB Server 10.5, bulk insert will not + touch those fields. */ + ut_ad(!m_page[PAGE_HEADER + PAGE_INSTANT]); + /* Restore the temporary change of PageBulk::init() that was necessary to + ensure that PageBulk::needs_finish() holds on an empty page. */ + m_page[PAGE_HEADER + PAGE_DIRECTION_B]= PAGE_NO_DIRECTION; + + ut_ad(!page_header_get_field(m_page, PAGE_FREE)); + ut_ad(!page_header_get_field(m_page, PAGE_GARBAGE)); + ut_ad(!page_header_get_field(m_page, PAGE_LAST_INSERT)); + ut_ad(!page_header_get_field(m_page, PAGE_N_DIRECTION)); + ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no) <= + page_get_free_space_of_empty(m_is_comp)); + ut_ad(!needs_finish()); + ut_ad(page_validate(m_page, m_index)); +} + +/** Commit inserts done to the page. +@param[in] success whether all inserts succeeded */ +void PageBulk::commit(bool success) +{ + finish(); + if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page)) + ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100); + m_mtr.commit(); +} + +/** Compress a page of a compressed table +@return true if compression succeeded or was not needed +@return false if compression failed. 
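+@see page_zip_compress(), where the redo log for the page is written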
*/ +bool +PageBulk::compress() +{ + ut_ad(m_page_zip != NULL); + + return page_zip_compress(m_block, m_index, page_zip_level, &m_mtr); +} + +/** Get the node pointer +@return node pointer */ +dtuple_t* +PageBulk::getNodePtr() +{ + rec_t* first_rec; + dtuple_t* node_ptr; + + /* Create node pointer */ + first_rec = page_rec_get_next(page_get_infimum_rec(m_page)); + ut_a(page_rec_is_user_rec(first_rec)); + node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no, + m_heap, m_level); + + return(node_ptr); +} + +/** Get the split rec in the left page. We split a page in half when compression +fails, and the split rec will be copied to the right page. +@return split rec */ +rec_t* +PageBulk::getSplitRec() +{ + rec_t* rec; + rec_offs* offsets; + ulint total_used_size; + ulint total_recs_size; + ulint n_recs; + + ut_ad(m_page_zip != NULL); + ut_ad(m_rec_no >= 2); + ut_ad(!m_index->is_instant()); + + ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space); + total_used_size = page_get_free_space_of_empty(m_is_comp) + - m_free_space; + + total_recs_size = 0; + n_recs = 0; + offsets = NULL; + rec = page_get_infimum_rec(m_page); + const ulint n_core = page_is_leaf(m_page) ? m_index->n_core_fields : 0; + + do { + rec = page_rec_get_next(rec); + ut_ad(page_rec_is_user_rec(rec)); + + offsets = rec_get_offsets(rec, m_index, offsets, n_core, + ULINT_UNDEFINED, &m_heap); + total_recs_size += rec_offs_size(offsets); + n_recs++; + } while (total_recs_size + page_dir_calc_reserved_space(n_recs) + < total_used_size / 2); + + /* Keep at least one record on left page */ + if (page_rec_is_infimum(page_rec_get_prev(rec))) { + rec = page_rec_get_next(rec); + ut_ad(page_rec_is_user_rec(rec)); + } + + return(rec); +} + +/** Copy all records after the split rec, including itself. +@param[in] split_rec split rec */ +void +PageBulk::copyIn( + rec_t* split_rec) +{ + + rec_t* rec = split_rec; + rec_offs* offsets = NULL; + + ut_ad(m_rec_no == 0); + ut_ad(page_rec_is_user_rec(rec)); + + const ulint n_core = page_rec_is_leaf(rec) + ? m_index->n_core_fields : 0; + + do { + offsets = rec_get_offsets(rec, m_index, offsets, n_core, + ULINT_UNDEFINED, &m_heap); + + insert(rec, offsets); + + rec = page_rec_get_next(rec); + } while (!page_rec_is_supremum(rec)); + + ut_ad(m_rec_no > 0); +} + +/** Remove all records after the split rec, including itself. +@param[in] split_rec split rec */ +void +PageBulk::copyOut( + rec_t* split_rec) +{ + rec_t* rec; + rec_t* last_rec; + ulint n; + + /* Suppose before copyOut, we have 5 records on the page: + infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec. + + after copyOut, we have 2 records on the page: + infimum->r1->r2->supremum. slot adjustment is not done. */ + + rec = page_rec_get_next(page_get_infimum_rec(m_page)); + last_rec = page_rec_get_prev(page_get_supremum_rec(m_page)); + n = 0; + + while (rec != split_rec) { + rec = page_rec_get_next(rec); + n++; + } + + ut_ad(n > 0); + + /* Set last record's next in page */ + rec_offs* offsets = NULL; + rec = page_rec_get_prev(split_rec); + const ulint n_core = page_rec_is_leaf(split_rec) + ? m_index->n_core_fields : 0; + + offsets = rec_get_offsets(rec, m_index, offsets, n_core, + ULINT_UNDEFINED, &m_heap); + mach_write_to_2(rec - REC_NEXT, m_is_comp + ? 
static_cast<uint16_t> + (PAGE_NEW_SUPREMUM - page_offset(rec)) + : PAGE_OLD_SUPREMUM); + + /* Set related members */ + m_cur_rec = rec; + m_heap_top = rec_get_end(rec, offsets); + + offsets = rec_get_offsets(last_rec, m_index, offsets, n_core, + ULINT_UNDEFINED, &m_heap); + + m_free_space += ulint(rec_get_end(last_rec, offsets) - m_heap_top) + + page_dir_calc_reserved_space(m_rec_no) + - page_dir_calc_reserved_space(n); + ut_ad(lint(m_free_space) > 0); + m_rec_no = n; + +#ifdef UNIV_DEBUG + m_total_data -= ulint(rec_get_end(last_rec, offsets) - m_heap_top); +#endif /* UNIV_DEBUG */ +} + +/** Set next page +@param[in] next_page_no next page no */ +inline void PageBulk::setNext(ulint next_page_no) +{ + if (UNIV_LIKELY_NULL(m_page_zip)) + /* For ROW_FORMAT=COMPRESSED, redo log may be written + in PageBulk::compress(). */ + mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no); + else + m_mtr.write<4>(*m_block, m_page + FIL_PAGE_NEXT, next_page_no); +} + +/** Set previous page +@param[in] prev_page_no previous page no */ +inline void PageBulk::setPrev(ulint prev_page_no) +{ + if (UNIV_LIKELY_NULL(m_page_zip)) + /* For ROW_FORMAT=COMPRESSED, redo log may be written + in PageBulk::compress(). */ + mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no); + else + m_mtr.write<4>(*m_block, m_page + FIL_PAGE_PREV, prev_page_no); +} + +/** Check if the required space is available in the page for the rec to be inserted. +We check fill factor & padding here. +@param[in] rec_size required length +@return true if space is available */ +bool +PageBulk::isSpaceAvailable( + ulint rec_size) +{ + ulint slot_size; + ulint required_space; + + slot_size = page_dir_calc_reserved_space(m_rec_no + 1) + - page_dir_calc_reserved_space(m_rec_no); + + required_space = rec_size + slot_size; + + if (required_space > m_free_space) { + ut_ad(m_rec_no > 0); + return false; + } + + /* Fill factor & padding apply to both leaf and non-leaf pages. + Note: we keep at least 2 records in a page to avoid B-tree level + growing too high. */ + if (m_rec_no >= 2 + && ((m_page_zip == NULL && m_free_space - required_space + < m_reserved_space) + || (m_page_zip != NULL && m_free_space - required_space + < m_padding_space))) { + return(false); + } + + return(true); +} + +/** Check whether the record needs to be stored externally. +@return false if the entire record can be stored locally on the page */ +bool +PageBulk::needExt( + const dtuple_t* tuple, + ulint rec_size) +{ + return page_zip_rec_needs_ext(rec_size, m_is_comp, + dtuple_get_n_fields(tuple), + m_block->zip_size()); +} + +/** Store an external record. +Since the record is not logged yet, we don't log updates to the record; +the blob data is logged first, then the record is logged in bulk mode. +@param[in] big_rec external record +@param[in] offsets record offsets +@return error code */ +dberr_t +PageBulk::storeExt( + const big_rec_t* big_rec, + rec_offs* offsets) +{ + finish(); + + /* Note: not all fields are initialized in btr_pcur. */ + btr_pcur_t btr_pcur; + btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED; + btr_pcur.latch_mode = BTR_MODIFY_LEAF; + btr_pcur.btr_cur.index = m_index; + btr_pcur.btr_cur.page_cur.index = m_index; + btr_pcur.btr_cur.page_cur.rec = m_cur_rec; + btr_pcur.btr_cur.page_cur.offsets = offsets; + btr_pcur.btr_cur.page_cur.block = m_block; + + dberr_t err = btr_store_big_rec_extern_fields( + &btr_pcur, offsets, big_rec, &m_mtr, BTR_STORE_INSERT_BULK); + + /* Reset m_block and m_cur_rec from the page cursor, because + the block may be changed during blob insert. 
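+	The ut_ad() below asserts that the block currently stays the
+	same, so the reassignment may be redundant.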
(FIXME: Can it really?) */ + ut_ad(m_block == btr_pcur.btr_cur.page_cur.block); + + m_block = btr_pcur.btr_cur.page_cur.block; + m_cur_rec = btr_pcur.btr_cur.page_cur.rec; + m_page = buf_block_get_frame(m_block); + + return(err); +} + +/** Release the block by committing mtr. +Note: log_free_check requires holding no lock/latch in the current thread. */ +void +PageBulk::release() +{ + finish(); + + /* We fix the block because we will re-pin it soon. */ + buf_block_buf_fix_inc(m_block, __FILE__, __LINE__); + + /* No other threads can modify this block. */ + m_modify_clock = buf_block_get_modify_clock(m_block); + + m_mtr.commit(); +} + +/** Start mtr and latch the block */ +dberr_t +PageBulk::latch() +{ + m_mtr.start(); + m_index->set_modified(m_mtr); + + ut_ad(m_block->page.buf_fix_count()); + + /* In case the block is S-latched by page_cleaner. */ + if (!buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock, + __FILE__, __LINE__, &m_mtr)) { + m_block = buf_page_get_gen(page_id_t(m_index->table->space_id, + m_page_no), + 0, RW_X_LATCH, + m_block, BUF_GET_IF_IN_POOL, + __FILE__, __LINE__, &m_mtr, &m_err); + + if (m_err != DB_SUCCESS) { + return (m_err); + } + + ut_ad(m_block != NULL); + } + + buf_block_buf_fix_dec(m_block); + + ut_ad(m_block->page.buf_fix_count()); + + ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top); + + return (m_err); +} + +/** Split a page +@param[in] page_bulk page to split +@param[in] next_page_bulk next page +@return error code */ +dberr_t +BtrBulk::pageSplit( + PageBulk* page_bulk, + PageBulk* next_page_bulk) +{ + ut_ad(page_bulk->getPageZip() != NULL); + + if (page_bulk->getRecNo() <= 1) { + return(DB_TOO_BIG_RECORD); + } + + /* Initialize a new page */ + PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL, + page_bulk->getLevel()); + dberr_t err = new_page_bulk.init(); + if (err != DB_SUCCESS) { + return(err); + } + + /* Copy the upper half to the new page. */ + rec_t* split_rec = page_bulk->getSplitRec(); + new_page_bulk.copyIn(split_rec); + page_bulk->copyOut(split_rec); + + /* Commit the pages after split. */ + err = pageCommit(page_bulk, &new_page_bulk, true); + if (err != DB_SUCCESS) { + pageAbort(&new_page_bulk); + return(err); + } + + err = pageCommit(&new_page_bulk, next_page_bulk, true); + if (err != DB_SUCCESS) { + pageAbort(&new_page_bulk); + return(err); + } + + return(err); +} + +/** Commit (finish) a page. We set the next/prev page no, compress a page of +a compressed table and split the page if compression fails, insert a node +pointer to the father page if needed, and commit the mini-transaction. +@param[in] page_bulk page to commit +@param[in] next_page_bulk next page +@param[in] insert_father false when page_bulk is a root page and + true when it's a non-root page +@return error code */ +dberr_t +BtrBulk::pageCommit( + PageBulk* page_bulk, + PageBulk* next_page_bulk, + bool insert_father) +{ + page_bulk->finish(); + + /* Set page links */ + if (next_page_bulk != NULL) { + ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel()); + + page_bulk->setNext(next_page_bulk->getPageNo()); + next_page_bulk->setPrev(page_bulk->getPageNo()); + } else { + ut_ad(!page_has_next(page_bulk->getPage())); + /* If a page is released and latched again, we need to + mark it modified in mini-transaction. */ + page_bulk->set_modified(); + } + + ut_ad(!rw_lock_own_flagged(&m_index->lock, + RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX + | RW_LOCK_FLAG_S)); + + /* Compress the page if the table is ROW_FORMAT=COMPRESSED. 
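+	If compression fails, the page is split in half via pageSplit()
+	and each half is compressed and committed separately.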
*/ + if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) { + return(pageSplit(page_bulk, next_page_bulk)); + } + + /* Insert node pointer to father page. */ + if (insert_father) { + dtuple_t* node_ptr = page_bulk->getNodePtr(); + dberr_t err = insert(node_ptr, page_bulk->getLevel()+1); + + if (err != DB_SUCCESS) { + return(err); + } + } + + /* Commit mtr. */ + page_bulk->commit(true); + + return(DB_SUCCESS); +} + +/** Log free check */ +inline void BtrBulk::logFreeCheck() +{ + if (log_sys.check_flush_or_checkpoint()) { + release(); + + log_check_margins(); + + latch(); + } +} + +/** Release all latches */ +void +BtrBulk::release() +{ + ut_ad(m_root_level + 1 == m_page_bulks.size()); + + for (ulint level = 0; level <= m_root_level; level++) { + PageBulk* page_bulk = m_page_bulks.at(level); + + page_bulk->release(); + } +} + +/** Re-latch all latches */ +void +BtrBulk::latch() +{ + ut_ad(m_root_level + 1 == m_page_bulks.size()); + + for (ulint level = 0; level <= m_root_level; level++) { + PageBulk* page_bulk = m_page_bulks.at(level); + page_bulk->latch(); + } +} + +/** Insert a tuple to page in a level +@param[in] tuple tuple to insert +@param[in] level B-tree level +@return error code */ +dberr_t +BtrBulk::insert( + dtuple_t* tuple, + ulint level) +{ + bool is_left_most = false; + dberr_t err = DB_SUCCESS; + + /* Check if we need to create a PageBulk for the level. */ + if (level + 1 > m_page_bulks.size()) { + PageBulk* new_page_bulk + = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id, FIL_NULL, + level)); + err = new_page_bulk->init(); + if (err != DB_SUCCESS) { + UT_DELETE(new_page_bulk); + return(err); + } + + m_page_bulks.push_back(new_page_bulk); + ut_ad(level + 1 == m_page_bulks.size()); + m_root_level = level; + + is_left_most = true; + } + + ut_ad(m_page_bulks.size() > level); + + PageBulk* page_bulk = m_page_bulks.at(level); + + if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) { + /* The node pointer must be marked as the predefined minimum + record, as there is no lower alphabetical limit to records in + the leftmost node of a level: */ + dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple) + | REC_INFO_MIN_REC_FLAG); + } + + ulint n_ext = 0; + ulint rec_size = rec_get_converted_size(m_index, tuple, n_ext); + big_rec_t* big_rec = NULL; + rec_t* rec = NULL; + rec_offs* offsets = NULL; + + if (page_bulk->needExt(tuple, rec_size)) { + /* The record is so big that we have to store some fields + externally on separate database pages */ + big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext); + + if (big_rec == NULL) { + return(DB_TOO_BIG_RECORD); + } + + rec_size = rec_get_converted_size(m_index, tuple, n_ext); + } + + if (page_bulk->getPageZip() != NULL + && page_zip_is_too_big(m_index, tuple)) { + err = DB_TOO_BIG_RECORD; + goto func_exit; + } + + if (!page_bulk->isSpaceAvailable(rec_size)) { + /* Create a sibling page_bulk. */ + PageBulk* sibling_page_bulk; + sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id, + FIL_NULL, level)); + err = sibling_page_bulk->init(); + if (err != DB_SUCCESS) { + UT_DELETE(sibling_page_bulk); + goto func_exit; + } + + /* Commit page bulk. */ + err = pageCommit(page_bulk, sibling_page_bulk, true); + if (err != DB_SUCCESS) { + pageAbort(sibling_page_bulk); + UT_DELETE(sibling_page_bulk); + goto func_exit; + } + + /* Set new page bulk to page_bulks. 
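+		From here on, inserts at this level go to the new right
+		sibling; the old page_bulk has been committed and is
+		freed below.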
*/ + ut_ad(sibling_page_bulk->getLevel() <= m_root_level); + m_page_bulks.at(level) = sibling_page_bulk; + + UT_DELETE(page_bulk); + page_bulk = sibling_page_bulk; + + /* Important: log_free_check whether we need a checkpoint. */ + if (page_is_leaf(sibling_page_bulk->getPage())) { + if (trx_is_interrupted(m_trx)) { + err = DB_INTERRUPTED; + goto func_exit; + } + + srv_inc_activity_count(); + logFreeCheck(); + } + } + + /* Convert tuple to rec. */ + rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc( + page_bulk->m_heap, rec_size)), m_index, tuple, n_ext); + offsets = rec_get_offsets(rec, m_index, offsets, level + ? 0 : m_index->n_core_fields, + ULINT_UNDEFINED, &page_bulk->m_heap); + + page_bulk->insert(rec, offsets); + + if (big_rec != NULL) { + ut_ad(dict_index_is_clust(m_index)); + ut_ad(page_bulk->getLevel() == 0); + ut_ad(page_bulk == m_page_bulks.at(0)); + + /* Release all pages above the leaf level */ + for (ulint level = 1; level <= m_root_level; level++) { + m_page_bulks.at(level)->release(); + } + + err = page_bulk->storeExt(big_rec, offsets); + + /* Latch */ + for (ulint level = 1; level <= m_root_level; level++) { + PageBulk* page_bulk = m_page_bulks.at(level); + page_bulk->latch(); + } + } + +func_exit: + if (big_rec != NULL) { + dtuple_convert_back_big_rec(m_index, tuple, big_rec); + } + + return(err); +} + +/** Btree bulk load finish. We commit the last page in each level +and copy the last page in top level to the root page of the index +if no error occurs. +@param[in] err whether bulk load was successful until now +@return error code */ +dberr_t +BtrBulk::finish(dberr_t err) +{ + uint32_t last_page_no = FIL_NULL; + + ut_ad(!m_index->table->is_temporary()); + + if (m_page_bulks.size() == 0) { + /* The table is empty. The root page of the index tree + is already in a consistent state. No need to flush. */ + return(err); + } + + ut_ad(m_root_level + 1 == m_page_bulks.size()); + + /* Finish all page bulks */ + for (ulint level = 0; level <= m_root_level; level++) { + PageBulk* page_bulk = m_page_bulks.at(level); + + last_page_no = page_bulk->getPageNo(); + + if (err == DB_SUCCESS) { + err = pageCommit(page_bulk, NULL, + level != m_root_level); + } + + if (err != DB_SUCCESS) { + pageAbort(page_bulk); + } + + UT_DELETE(page_bulk); + } + + if (err == DB_SUCCESS) { + rec_t* first_rec; + mtr_t mtr; + buf_block_t* last_block; + PageBulk root_page_bulk(m_index, m_trx->id, + m_index->page, m_root_level); + + mtr.start(); + m_index->set_modified(mtr); + mtr_x_lock_index(m_index, &mtr); + + ut_ad(last_page_no != FIL_NULL); + last_block = btr_block_get(*m_index, last_page_no, RW_X_LATCH, + false, &mtr); + first_rec = page_rec_get_next( + page_get_infimum_rec(last_block->frame)); + ut_ad(page_rec_is_user_rec(first_rec)); + + /* Copy last page to root page. */ + err = root_page_bulk.init(); + if (err != DB_SUCCESS) { + mtr.commit(); + return(err); + } + root_page_bulk.copyIn(first_rec); + root_page_bulk.finish(); + + /* Remove last page. */ + btr_page_free(m_index, last_block, &mtr); + + mtr.commit(); + + err = pageCommit(&root_page_bulk, NULL, false); + ut_ad(err == DB_SUCCESS); + } + + ut_ad(!sync_check_iterate(dict_sync_check())); + + ut_ad(err != DB_SUCCESS + || btr_validate_index(m_index, NULL) == DB_SUCCESS); + return(err); +} |
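For context, the class added by this file is driven by the bulk index build in row0merge.cc. A minimal usage sketch follows; it assumes the public BtrBulk interface declared in btr0bulk.h (a constructor taking the index and transaction, an insert(tuple) wrapper for level 0, and finish()), which is outside this diff, and the helper name bulk_load_sorted() is hypothetical:

/* Sketch: bulk-load key-ordered tuples into an empty index.
The BtrBulk constructor and insert(dtuple_t*) wrapper signatures
are assumed from btr0bulk.h (not part of this diff). */
#include "btr0bulk.h"

static dberr_t bulk_load_sorted(dict_index_t* index, trx_t* trx,
                                dtuple_t** tuples, ulint n_tuples)
{
	BtrBulk	bulk(index, trx);
	dberr_t	err = DB_SUCCESS;

	/* Tuples must already be sorted in key order: each insert
	appends to the rightmost leaf, splitting off a new sibling
	page and pushing a node pointer up when the page fills. */
	for (ulint i = 0; i < n_tuples && err == DB_SUCCESS; i++) {
		err = bulk.insert(tuples[i]);
	}

	/* finish() commits the rightmost page of every level and
	copies the top-level page into the index root. */
	return bulk.finish(err);
}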