diff options
Diffstat (limited to 'storage/innobase/gis/gis0rtree.cc')
-rw-r--r-- | storage/innobase/gis/gis0rtree.cc | 277 |
1 files changed, 254 insertions, 23 deletions
diff --git a/storage/innobase/gis/gis0rtree.cc b/storage/innobase/gis/gis0rtree.cc index 83afd732..f75eab07 100644 --- a/storage/innobase/gis/gis0rtree.cc +++ b/storage/innobase/gis/gis0rtree.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 2016, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2018, 2022, MariaDB Corporation. +Copyright (c) 2018, 2023, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -34,7 +34,6 @@ Created 2013/03/27 Allen Lai and Jimmy Yang #include "btr0pcur.h" #include "rem0cmp.h" #include "lock0lock.h" -#include "ibuf0ibuf.h" #include "trx0undo.h" #include "srv0mon.h" #include "gis0geo.h" @@ -538,7 +537,7 @@ err_exit: mem_heap_free(heap); } -MY_ATTRIBUTE((nonnull, warn_unused_result)) +MY_ATTRIBUTE((nonnull(1,3,4,5,6,8), warn_unused_result)) /**************************************************************//** Update parent page's MBR and Predicate lock information during a split */ static @@ -552,6 +551,7 @@ rtr_adjust_upper_level( buf_block_t* new_block, /*!< in/out: the new half page */ rtr_mbr_t* mbr, /*!< in: MBR on the old page */ rtr_mbr_t* new_mbr, /*!< in: MBR on the new page */ + que_thr_t* thr, /*!< in/out: query thread */ mtr_t* mtr) /*!< in: mtr */ { ulint page_no; @@ -570,7 +570,6 @@ rtr_adjust_upper_level( /* Create a memory heap where the data tuple is stored */ heap = mem_heap_create(1024); - cursor.thr = sea_cur->thr; cursor.page_cur.index = sea_cur->index(); cursor.page_cur.block = block; @@ -584,7 +583,8 @@ rtr_adjust_upper_level( /* Set new mbr for the old page on the upper level. */ /* Look up the index for the node pointer to page */ - offsets = rtr_page_get_father_block(NULL, heap, mtr, sea_cur, &cursor); + offsets = rtr_page_get_father_block(nullptr, heap, sea_cur, &cursor, + thr, mtr); page_cursor = btr_cur_get_page_cur(&cursor); @@ -669,7 +669,7 @@ rtr_adjust_upper_level( if (next_page_no == FIL_NULL) { } else if (buf_block_t* next_block = btr_block_get(*sea_cur->index(), next_page_no, RW_X_LATCH, - false, mtr, &err)) { + mtr, &err)) { if (UNIV_UNLIKELY(memcmp_aligned<4>(next_block->page.frame + FIL_PAGE_PREV, block->page.frame @@ -691,11 +691,6 @@ rtr_adjust_upper_level( /*************************************************************//** Moves record list to another page for rtree splitting. -IMPORTANT: The caller will have to update IBUF_BITMAP_FREE -if new_block is a compressed leaf page in a secondary index. -This has to be done either within the same mini-transaction, -or by invoking ibuf_reset_free_bits() before mtr_commit(). - @return error code @retval DB_FAIL on ROW_FORMAT=COMPRESSED compression failure */ static @@ -731,8 +726,7 @@ rtr_split_page_move_rec_list( ulint max_to_move = 0; rtr_rec_move_t* rec_move = NULL; - ut_ad(!dict_index_is_ibuf(index)); - ut_ad(dict_index_is_spatial(index)); + ut_ad(index->is_spatial()); rec_offs_init(offsets_); @@ -867,7 +861,8 @@ rtr_page_split_and_insert( const dtuple_t* tuple, /*!< in: tuple to insert */ ulint n_ext, /*!< in: number of externally stored columns */ mtr_t* mtr, /*!< in: mtr */ - dberr_t* err) /*!< out: error code */ + dberr_t* err, /*!< out: error code */ + que_thr_t* thr) /*!< in: query thread */ { buf_block_t* block; page_t* page; @@ -895,6 +890,8 @@ rtr_page_split_and_insert( int first_rec_group = 1; IF_DBUG(bool iterated = false,); + buf_pool.pages_split++; + if (!*heap) { *heap = mem_heap_create(1024); } @@ -1159,7 +1156,7 @@ after_insert: /* Adjust the upper level. */ *err = rtr_adjust_upper_level(cursor, flags, block, new_block, - &mbr, &new_mbr, mtr); + &mbr, &new_mbr, thr, mtr); if (UNIV_UNLIKELY(*err != DB_SUCCESS)) { return nullptr; } @@ -1179,13 +1176,6 @@ after_insert: /* If the new res insert fail, we need to do another split again. */ if (!rec) { - /* We play safe and reset the free bits for new_page */ - if (!dict_index_is_clust(cursor->index()) - && !cursor->index()->table->is_temporary()) { - ibuf_reset_free_bits(new_block); - ibuf_reset_free_bits(block); - } - /* We need to clean the parent path here and search father node later, otherwise, it's possible that find a wrong parent. */ @@ -1212,6 +1202,244 @@ after_insert: return(rec); } +/*************************************************************//** +Makes tree one level higher by splitting the root, and inserts the tuple. +NOTE that the operation of this function must always succeed, +we cannot reverse it: therefore enough free disk space must be +guaranteed to be available before this function is called. +@return inserted record */ +rec_t* +rtr_root_raise_and_insert( +/*======================*/ + ulint flags, /*!< in: undo logging and locking flags */ + btr_cur_t* cursor, /*!< in: cursor at which to insert: must be + on the root page; when the function returns, + the cursor is positioned on the predecessor + of the inserted record */ + rec_offs** offsets,/*!< out: offsets on inserted record */ + mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */ + const dtuple_t* tuple, /*!< in: tuple to insert */ + ulint n_ext, /*!< in: number of externally stored columns */ + mtr_t* mtr, /*!< in: mtr */ + dberr_t* err, /*!< out: error code */ + que_thr_t* thr) /*!< in: query thread */ +{ + dict_index_t* index; + rec_t* rec; + dtuple_t* node_ptr; + ulint level; + rec_t* node_ptr_rec; + page_cur_t* page_cursor; + page_zip_des_t* root_page_zip; + page_zip_des_t* new_page_zip; + buf_block_t* root; + buf_block_t* new_block; + + root = btr_cur_get_block(cursor); + root_page_zip = buf_block_get_page_zip(root); + ut_ad(!page_is_empty(root->page.frame)); + index = btr_cur_get_index(cursor); + ut_ad(index->is_spatial()); +#ifdef UNIV_ZIP_DEBUG + ut_a(!root_page_zip + || page_zip_validate(root_page_zip, root->page.frame, index)); +#endif /* UNIV_ZIP_DEBUG */ + + const page_id_t root_id{root->page.id()}; + + ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK + | MTR_MEMO_SX_LOCK)); + ut_ad(mtr->memo_contains_flagged(root, MTR_MEMO_PAGE_X_FIX)); + + if (index->page != root_id.page_no()) { + ut_ad("corrupted root page number" == 0); + return nullptr; + } + + if (!btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF, + *root, *index->table->space) + || !btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP, + *root, *index->table->space)) { + return nullptr; + } + + /* Allocate a new page to the tree. Root splitting is done by first + moving the root records to the new page, emptying the root, putting + a node pointer to the new page, and then splitting the new page. */ + + level = btr_page_get_level(root->page.frame); + + new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr, err); + + if (!new_block) { + return nullptr; + } + + new_page_zip = buf_block_get_page_zip(new_block); + ut_a(!new_page_zip == !root_page_zip); + ut_a(!new_page_zip + || page_zip_get_size(new_page_zip) + == page_zip_get_size(root_page_zip)); + + btr_page_create(new_block, new_page_zip, index, level, mtr); + if (page_has_siblings(new_block->page.frame)) { + compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4); + compile_time_assert(FIL_NULL == 0xffffffff); + static_assert(FIL_PAGE_PREV % 8 == 0, "alignment"); + memset_aligned<8>(new_block->page.frame + FIL_PAGE_PREV, + 0xff, 8); + mtr->memset(new_block, FIL_PAGE_PREV, 8, 0xff); + if (UNIV_LIKELY_NULL(new_page_zip)) { + memset_aligned<8>(new_page_zip->data + FIL_PAGE_PREV, + 0xff, 8); + } + } + + /* Copy the records from root to the new page one by one. */ + dberr_t e; + if (!err) { + err = &e; + } + + if (0 +#ifdef UNIV_ZIP_COPY + || new_page_zip +#endif /* UNIV_ZIP_COPY */ + || !page_copy_rec_list_end(new_block, root, + page_get_infimum_rec(root->page.frame), + index, mtr, err)) { + switch (*err) { + case DB_SUCCESS: + break; + case DB_FAIL: + *err = DB_SUCCESS; + break; + default: + return nullptr; + } + + ut_a(new_page_zip); + + /* Copy the page byte for byte. */ + page_zip_copy_recs(new_block, root_page_zip, + root->page.frame, index, mtr); + + /* Update the lock table and possible hash index. */ + if (index->has_locking()) { + lock_move_rec_list_end( + new_block, root, + page_get_infimum_rec(root->page.frame)); + } + + /* Move any existing predicate locks */ + lock_prdt_rec_move(new_block, root_id); + } + + constexpr uint16_t max_trx_id = PAGE_HEADER + PAGE_MAX_TRX_ID; + if (!index->is_primary()) { + /* In secondary indexes, + PAGE_MAX_TRX_ID can be reset on the root page, because + the field only matters on leaf pages, and the root no + longer is a leaf page. (Older versions of InnoDB did + set PAGE_MAX_TRX_ID on all secondary index pages.) */ + byte* p = my_assume_aligned<8>( + PAGE_HEADER + PAGE_MAX_TRX_ID + root->page.frame); + if (mach_read_from_8(p)) { + mtr->memset(root, max_trx_id, 8, 0); + if (UNIV_LIKELY_NULL(root->page.zip.data)) { + memset_aligned<8>(max_trx_id + + root->page.zip.data, 0, 8); + } + } + } else { + /* PAGE_ROOT_AUTO_INC is only present in the clustered index + root page; on other clustered index pages, we want to reserve + the field PAGE_MAX_TRX_ID for future use. */ + byte* p = my_assume_aligned<8>( + PAGE_HEADER + PAGE_MAX_TRX_ID + new_block->page.frame); + if (mach_read_from_8(p)) { + mtr->memset(new_block, max_trx_id, 8, 0); + if (UNIV_LIKELY_NULL(new_block->page.zip.data)) { + memset_aligned<8>(max_trx_id + + new_block->page.zip.data, + 0, 8); + } + } + } + + /* If this is a pessimistic insert which is actually done to + perform a pessimistic update then we have stored the lock + information of the record to be inserted on the infimum of the + root page: we cannot discard the lock structs on the root page */ + + if (index->has_locking()) { + lock_update_root_raise(*new_block, root_id); + } + + /* Create a memory heap where the node pointer is stored */ + if (!*heap) { + *heap = mem_heap_create(1000); + } + + const uint32_t new_page_no = new_block->page.id().page_no(); + rec = page_rec_get_next(page_get_infimum_rec(new_block->page.frame)); + ut_ad(rec); /* We just created the page. */ + + /* Build the node pointer (= node key and page address) for the + child */ + rtr_mbr_t new_mbr; + rtr_page_cal_mbr(index, new_block, &new_mbr, *heap); + node_ptr = rtr_index_build_node_ptr(index, &new_mbr, rec, new_page_no, + *heap); + /* The node pointer must be marked as the predefined minimum record, + as there is no lower alphabetical limit to records in the leftmost + node of a level: */ + dtuple_set_info_bits(node_ptr, + dtuple_get_info_bits(node_ptr) + | REC_INFO_MIN_REC_FLAG); + + /* Rebuild the root page to get free space */ + btr_page_empty(root, root_page_zip, index, level + 1, mtr); + ut_ad(!page_has_siblings(root->page.frame)); + + page_cursor = btr_cur_get_page_cur(cursor); + + /* Insert node pointer to the root */ + + page_cur_set_before_first(root, page_cursor); + + node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr, + offsets, heap, 0, mtr); + + /* The root page should only contain the node pointer + to new_block at this point. Thus, the data should fit. */ + ut_a(node_ptr_rec); + + page_cursor->block = new_block; + page_cursor->index = index; + + if (tuple) { + ut_ad(dtuple_check_typed(tuple)); + /* Reposition the cursor to the child node */ + ulint low_match = 0, up_match = 0; + + if (page_cur_search_with_match(tuple, PAGE_CUR_LE, + &up_match, &low_match, + page_cursor, nullptr)) { + if (err) { + *err = DB_CORRUPTION; + } + return nullptr; + } + } else { + page_cursor->rec = page_get_infimum_rec(new_block->page.frame); + } + + /* Split the child and insert tuple */ + return rtr_page_split_and_insert(flags, cursor, offsets, heap, + tuple, n_ext, mtr, err, thr); +} + /****************************************************************//** Following the right link to find the proper block for insert. @return the proper block.*/ @@ -1240,6 +1468,8 @@ rtr_ins_enlarge_mbr( /* Check path info is not empty. */ ut_ad(!btr_cur->rtr_info->parent_path->empty()); + ut_ad(btr_cur->rtr_info->thr || !btr_cur->index()->is_committed() + || btr_cur->index()->table->name.is_temporary()); /* Create a memory heap. */ heap = mem_heap_create(1024); @@ -1265,7 +1495,8 @@ rtr_ins_enlarge_mbr( cursor.page_cur.index = page_cursor->index; cursor.page_cur.block = block; offsets = rtr_page_get_father_block( - NULL, heap, mtr, btr_cur, &cursor); + nullptr, heap, btr_cur, &cursor, + btr_cur->rtr_info->thr, mtr); page = buf_block_get_frame(block); |