From a175314c3e5827eb193872241446f2f8f5c9d33c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 20:07:14 +0200 Subject: Adding upstream version 1:10.5.12. Signed-off-by: Daniel Baumann --- storage/innobase/gis/gis0geo.cc | 650 ++++++++++++ storage/innobase/gis/gis0rtree.cc | 1956 +++++++++++++++++++++++++++++++++++ storage/innobase/gis/gis0sea.cc | 2052 +++++++++++++++++++++++++++++++++++++ 3 files changed, 4658 insertions(+) create mode 100644 storage/innobase/gis/gis0geo.cc create mode 100644 storage/innobase/gis/gis0rtree.cc create mode 100644 storage/innobase/gis/gis0sea.cc (limited to 'storage/innobase/gis') diff --git a/storage/innobase/gis/gis0geo.cc b/storage/innobase/gis/gis0geo.cc new file mode 100644 index 00000000..4c3ff188 --- /dev/null +++ b/storage/innobase/gis/gis0geo.cc @@ -0,0 +1,650 @@ +/***************************************************************************** + +Copyright (c) 2013, 2015, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2019, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file gis/gis0geo.cc +InnoDB R-tree related functions. 
+ +Created 2013/03/27 Allen Lai and Jimmy Yang +*******************************************************/ + +#include "page0types.h" +#include "gis0geo.h" +#include "page0cur.h" +#include "ut0rnd.h" +#include "mach0data.h" + +#include +#include + +/* These definitions are for comparing 2 mbrs. */ + +/* Check if a intersects b. +Return false if a intersects b, otherwise true. */ +#define INTERSECT_CMP(amin, amax, bmin, bmax) \ +(((amin) > (bmax)) || ((bmin) > (amax))) + +/* Check if b contains a. +Return false if b contains a, otherwise true. */ +#define CONTAIN_CMP(amin, amax, bmin, bmax) \ +(((bmin) > (amin)) || ((bmax) < (amax))) + +/* Check if b is within a. +Return false if b is within a, otherwise true. */ +#define WITHIN_CMP(amin, amax, bmin, bmax) \ +(((amin) > (bmin)) || ((amax) < (bmax))) + +/* Check if a disjoints b. +Return false if a disjoints b, otherwise true. */ +#define DISJOINT_CMP(amin, amax, bmin, bmax) \ +(((amin) <= (bmax)) && ((bmin) <= (amax))) + +/* Check if a equals b. +Return false if equal, otherwise true. */ +#define EQUAL_CMP(amin, amax, bmin, bmax) \ +(((amin) != (bmin)) || ((amax) != (bmax))) + +/**************************************************************** +Functions for generating mbr +****************************************************************/ +/*************************************************************//** +Add one point stored in wkb to a given mbr. +@return 0 if the point in wkb is valid, otherwise -1. */ +static +int +rtree_add_point_to_mbr( +/*===================*/ + const uchar** wkb, /*!< in: pointer to wkb, + where point is stored */ + const uchar* end, /*!< in: end of wkb. */ + uint n_dims, /*!< in: dimensions. */ + double* mbr) /*!< in/out: mbr, which + must be of length n_dims * 2. 
*/ +{ + double ord; + double* mbr_end = mbr + n_dims * 2; + + while (mbr < mbr_end) { + if ((*wkb) + sizeof(double) > end) { + return(-1); + } + + ord = mach_double_read(*wkb); + (*wkb) += sizeof(double); + + if (ord < *mbr) { + *mbr = ord; + } + mbr++; + + if (ord > *mbr) { + *mbr = ord; + } + mbr++; + } + + return(0); +} + +/*************************************************************//** +Get mbr of point stored in wkb. +@return 0 if ok, otherwise -1. */ +static +int +rtree_get_point_mbr( +/*================*/ + const uchar** wkb, /*!< in: pointer to wkb, + where point is stored. */ + const uchar* end, /*!< in: end of wkb. */ + uint n_dims, /*!< in: dimensions. */ + double* mbr) /*!< in/out: mbr, + must be of length n_dims * 2. */ +{ + return rtree_add_point_to_mbr(wkb, end, n_dims, mbr); +} + + +/*************************************************************//** +Get mbr of linestring stored in wkb. +@return 0 if the linestring is valid, otherwise -1. */ +static +int +rtree_get_linestring_mbr( +/*=====================*/ + const uchar** wkb, /*!< in: pointer to wkb, + where point is stored. */ + const uchar* end, /*!< in: end of wkb. */ + uint n_dims, /*!< in: dimensions. */ + double* mbr) /*!< in/out: mbr, + must be of length n_dims * 2. */ +{ + uint n_points; + + n_points = uint4korr(*wkb); + (*wkb) += 4; + + for (; n_points > 0; --n_points) { + /* Add next point to mbr */ + if (rtree_add_point_to_mbr(wkb, end, n_dims, mbr)) { + return(-1); + } + } + + return(0); +} + +/*************************************************************//** +Get mbr of polygon stored in wkb. +@return 0 if the polygon is valid, otherwise -1. */ +static +int +rtree_get_polygon_mbr( +/*==================*/ + const uchar** wkb, /*!< in: pointer to wkb, + where point is stored. */ + const uchar* end, /*!< in: end of wkb. */ + uint n_dims, /*!< in: dimensions. */ + double* mbr) /*!< in/out: mbr, + must be of length n_dims * 2. 
*/ +{ + uint n_linear_rings; + uint n_points; + + n_linear_rings = uint4korr((*wkb)); + (*wkb) += 4; + + for (; n_linear_rings > 0; --n_linear_rings) { + n_points = uint4korr((*wkb)); + (*wkb) += 4; + + for (; n_points > 0; --n_points) { + /* Add next point to mbr */ + if (rtree_add_point_to_mbr(wkb, end, n_dims, mbr)) { + return(-1); + } + } + } + + return(0); +} + +/*************************************************************//** +Get mbr of geometry stored in wkb. +@return 0 if the geometry is valid, otherwise -1. */ +static +int +rtree_get_geometry_mbr( +/*===================*/ + const uchar** wkb, /*!< in: pointer to wkb, + where point is stored. */ + const uchar* end, /*!< in: end of wkb. */ + uint n_dims, /*!< in: dimensions. */ + double* mbr, /*!< in/out: mbr. */ + int top) /*!< in: if it is the top, + which means it's not called + by itself. */ +{ + int res; + uint wkb_type = 0; + uint n_items; + + /* byte_order = *(*wkb); */ + ++(*wkb); + + wkb_type = uint4korr((*wkb)); + (*wkb) += 4; + + switch ((enum wkbType) wkb_type) { + case wkbPoint: + res = rtree_get_point_mbr(wkb, end, n_dims, mbr); + break; + case wkbLineString: + res = rtree_get_linestring_mbr(wkb, end, n_dims, mbr); + break; + case wkbPolygon: + res = rtree_get_polygon_mbr(wkb, end, n_dims, mbr); + break; + case wkbMultiPoint: + n_items = uint4korr((*wkb)); + (*wkb) += 4; + for (; n_items > 0; --n_items) { + /* byte_order = *(*wkb); */ + ++(*wkb); + (*wkb) += 4; + if (rtree_get_point_mbr(wkb, end, n_dims, mbr)) { + return(-1); + } + } + res = 0; + break; + case wkbMultiLineString: + n_items = uint4korr((*wkb)); + (*wkb) += 4; + for (; n_items > 0; --n_items) { + /* byte_order = *(*wkb); */ + ++(*wkb); + (*wkb) += 4; + if (rtree_get_linestring_mbr(wkb, end, n_dims, mbr)) { + return(-1); + } + } + res = 0; + break; + case wkbMultiPolygon: + n_items = uint4korr((*wkb)); + (*wkb) += 4; + for (; n_items > 0; --n_items) { + /* byte_order = *(*wkb); */ + ++(*wkb); + (*wkb) += 4; + if 
(rtree_get_polygon_mbr(wkb, end, n_dims, mbr)) { + return(-1); + } + } + res = 0; + break; + case wkbGeometryCollection: + if (!top) { + return(-1); + } + + n_items = uint4korr((*wkb)); + (*wkb) += 4; + for (; n_items > 0; --n_items) { + if (rtree_get_geometry_mbr(wkb, end, n_dims, + mbr, 0)) { + return(-1); + } + } + res = 0; + break; + default: + res = -1; + } + + return(res); +} + +/*************************************************************//** +Calculate Minimal Bounding Rectangle (MBR) of the spatial object +stored in "well-known binary representation" (wkb) format. +@return 0 if ok. */ +int +rtree_mbr_from_wkb( +/*===============*/ + const uchar* wkb, /*!< in: wkb */ + uint size, /*!< in: size of wkb. */ + uint n_dims, /*!< in: dimensions. */ + double* mbr) /*!< in/out: mbr, which must + be of length n_dim2 * 2. */ +{ + for (uint i = 0; i < n_dims; ++i) { + mbr[i * 2] = DBL_MAX; + mbr[i * 2 + 1] = -DBL_MAX; + } + + return rtree_get_geometry_mbr(&wkb, wkb + size, n_dims, mbr, 1); +} + + +/**************************************************************** +Functions for Rtree split +****************************************************************/ +/*************************************************************//** +Join 2 mbrs of dimensions n_dim. */ +static +void +mbr_join( +/*=====*/ + double* a, /*!< in/out: the first mbr, + where the joined result will be. */ + const double* b, /*!< in: the second mbr. */ + int n_dim) /*!< in: dimensions. */ +{ + double* end = a + n_dim * 2; + + do { + if (a[0] > b[0]) { + a[0] = b[0]; + } + + if (a[1] < b[1]) { + a[1] = b[1]; + } + + a += 2; + b += 2; + + } while (a != end); +} + +/*************************************************************//** +Counts the square of mbr which is the join of a and b. Both a and b +are of dimensions n_dim. */ +static +double +mbr_join_square( +/*============*/ + const double* a, /*!< in: the first mbr. */ + const double* b, /*!< in: the second mbr. */ + int n_dim) /*!< in: dimensions. 
*/ +{ + const double* end = a + n_dim * 2; + double square = 1.0; + + do { + square *= std::max(a[1], b[1]) - std::min(a[0], b[0]); + + a += 2; + b += 2; + } while (a != end); + + /* Check if finite (not infinity or NaN), + so we don't get NaN in calculations */ + if (!std::isfinite(square)) { + return DBL_MAX; + } + + return square; +} + +/*************************************************************//** +Counts the square of mbr of dimension n_dim. */ +static +double +count_square( +/*=========*/ + const double* a, /*!< in: the mbr. */ + int n_dim) /*!< in: dimensions. */ +{ + const double* end = a + n_dim * 2; + double square = 1.0; + + do { + square *= a[1] - a[0]; + a += 2; + } while (a != end); + + return square; +} + +/*************************************************************//** +Copy mbr of dimension n_dim from src to dst. */ +inline +static +void +copy_coords( +/*========*/ + double* dst, /*!< in/out: destination. */ + const double* src, /*!< in: source. */ + int) +{ + memcpy(dst, src, DATA_MBR_LEN); +} + +/*************************************************************//** +Select two nodes to collect group upon */ +static +void +pick_seeds( +/*=======*/ + rtr_split_node_t* node, /*!< in: split nodes. */ + int n_entries, /*!< in: entries number. */ + rtr_split_node_t** seed_a, /*!< out: seed 1. */ + rtr_split_node_t** seed_b, /*!< out: seed 2. */ + int n_dim) /*!< in: dimensions. 
*/ +{ + rtr_split_node_t* cur1; + rtr_split_node_t* lim1 = node + (n_entries - 1); + rtr_split_node_t* cur2; + rtr_split_node_t* lim2 = node + n_entries; + + double max_d = -DBL_MAX; + double d; + + *seed_a = node; + *seed_b = node + 1; + + for (cur1 = node; cur1 < lim1; ++cur1) { + for (cur2 = cur1 + 1; cur2 < lim2; ++cur2) { + d = mbr_join_square(cur1->coords, cur2->coords, n_dim) - + cur1->square - cur2->square; + if (d > max_d) { + max_d = d; + *seed_a = cur1; + *seed_b = cur2; + } + } + } +} + +/*************************************************************//** +Select next node and group where to add. */ +static +void +pick_next( +/*======*/ + rtr_split_node_t* node, /*!< in: split nodes. */ + int n_entries, /*!< in: entries number. */ + double* g1, /*!< in: mbr of group 1. */ + double* g2, /*!< in: mbr of group 2. */ + rtr_split_node_t** choice, /*!< out: the next node.*/ + int* n_group, /*!< out: group number.*/ + int n_dim) /*!< in: dimensions. */ +{ + rtr_split_node_t* cur = node; + rtr_split_node_t* end = node + n_entries; + double max_diff = -DBL_MAX; + + for (; cur < end; ++cur) { + double diff; + double abs_diff; + + if (cur->n_node != 0) { + continue; + } + + diff = mbr_join_square(g1, cur->coords, n_dim) - + mbr_join_square(g2, cur->coords, n_dim); + + abs_diff = fabs(diff); + if (abs_diff > max_diff) { + max_diff = abs_diff; + + /* Introduce some randomness if the record + is identical */ + if (diff == 0) { + diff = static_cast(ut_rnd_gen() & 1); + } + + *n_group = 1 + (diff > 0); + *choice = cur; + } + } +} + +/*************************************************************//** +Mark not-in-group entries as n_group. */ +static +void +mark_all_entries( +/*=============*/ + rtr_split_node_t* node, /*!< in/out: split nodes. */ + int n_entries, /*!< in: entries number. */ + int n_group) /*!< in: group number. 
*/ +{ + rtr_split_node_t* cur = node; + rtr_split_node_t* end = node + n_entries; + for (; cur < end; ++cur) { + if (cur->n_node != 0) { + continue; + } + cur->n_node = n_group; + } +} + +/*************************************************************//** +Split rtree node. +Return which group the first rec is in. */ +int +split_rtree_node( +/*=============*/ + rtr_split_node_t* node, /*!< in: split nodes. */ + int n_entries, /*!< in: entries number. */ + int all_size, /*!< in: total key's size. */ + int key_size, /*!< in: key's size. */ + int min_size, /*!< in: minimal group size. */ + int size1, /*!< in: size of group. */ + int size2, /*!< in: initial group sizes */ + double** d_buffer, /*!< in/out: buffer. */ + int n_dim, /*!< in: dimensions. */ + uchar* first_rec) /*!< in: the first rec. */ +{ + rtr_split_node_t* cur; + rtr_split_node_t* a = NULL; + rtr_split_node_t* b = NULL; + double* g1 = reserve_coords(d_buffer, n_dim); + double* g2 = reserve_coords(d_buffer, n_dim); + rtr_split_node_t* next = NULL; + int next_node = 0; + int i; + int first_rec_group = 1; + rtr_split_node_t* end = node + n_entries; + + if (all_size < min_size * 2) { + return 1; + } + + cur = node; + for (; cur < end; ++cur) { + cur->square = count_square(cur->coords, n_dim); + cur->n_node = 0; + } + + pick_seeds(node, n_entries, &a, &b, n_dim); + a->n_node = 1; + b->n_node = 2; + + copy_coords(g1, a->coords, n_dim); + size1 += key_size; + copy_coords(g2, b->coords, n_dim); + size2 += key_size; + + for (i = n_entries - 2; i > 0; --i) { + /* Can't write into group 2 */ + if (all_size - (size2 + key_size) < min_size) { + mark_all_entries(node, n_entries, 1); + break; + } + + /* Can't write into group 1 */ + if (all_size - (size1 + key_size) < min_size) { + mark_all_entries(node, n_entries, 2); + break; + } + + pick_next(node, n_entries, g1, g2, &next, &next_node, n_dim); + if (next_node == 1) { + size1 += key_size; + mbr_join(g1, next->coords, n_dim); + } else { + size2 += key_size; + 
mbr_join(g2, next->coords, n_dim); + } + + next->n_node = next_node; + + /* Find out where the first rec (of the page) will be at, + and inform the caller */ + if (first_rec && first_rec == next->key) { + first_rec_group = next_node; + } + } + + return(first_rec_group); +} + +/** Compare two minimum bounding rectangles. +@param mode comparison operator + MBR_INTERSECT(a,b) a overlaps b + MBR_CONTAIN(a,b) a contains b + MBR_DISJOINT(a,b) a disjoint b + MBR_WITHIN(a,b) a within b + MBR_EQUAL(a,b) All coordinates of MBRs are equal + MBR_DATA(a,b) Data reference is the same +@param b first MBR +@param a second MBR +@retval 0 if the predicate holds +@retval 1 if the precidate does not hold */ +int rtree_key_cmp(page_cur_mode_t mode, const void *b, const void *a) +{ + const byte *b_= static_cast(b); + const byte *a_= static_cast(a); + + static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double), "compatibility"); + + for (auto i = SPDIMS; i--; ) + { + double amin= mach_double_read(a_); + double bmin= mach_double_read(b_); + a_+= sizeof(double); + b_+= sizeof(double); + double amax= mach_double_read(a_); + double bmax= mach_double_read(b_); + a_+= sizeof(double); + b_+= sizeof(double); + + switch (mode) { + case PAGE_CUR_INTERSECT: + if (INTERSECT_CMP(amin, amax, bmin, bmax)) + return 1; + continue; + case PAGE_CUR_CONTAIN: + if (CONTAIN_CMP(amin, amax, bmin, bmax)) + return 1; + continue; + case PAGE_CUR_WITHIN: + if (WITHIN_CMP(amin, amax, bmin, bmax)) + return 1; + continue; + case PAGE_CUR_MBR_EQUAL: + if (EQUAL_CMP(amin, amax, bmin, bmax)) + return 1; + continue; + case PAGE_CUR_DISJOINT: + if (!DISJOINT_CMP(amin, amax, bmin, bmax)) + return 0; + if (!i) + return 1; + continue; + case PAGE_CUR_UNSUPP: + case PAGE_CUR_G: + case PAGE_CUR_GE: + case PAGE_CUR_L: + case PAGE_CUR_LE: + case PAGE_CUR_RTREE_LOCATE: + case PAGE_CUR_RTREE_GET_FATHER: + case PAGE_CUR_RTREE_INSERT: + break; + } + ut_ad("unknown comparison operator" == 0); + } + + return 0; +} diff --git 
a/storage/innobase/gis/gis0rtree.cc b/storage/innobase/gis/gis0rtree.cc new file mode 100644 index 00000000..22e6e08f --- /dev/null +++ b/storage/innobase/gis/gis0rtree.cc @@ -0,0 +1,1956 @@ +/***************************************************************************** + +Copyright (c) 2016, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2018, 2021, MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file gis/gis0rtree.cc +InnoDB R-tree interfaces + +Created 2013/03/27 Allen Lai and Jimmy Yang +***********************************************************************/ + +#include "fsp0fsp.h" +#include "page0page.h" +#include "page0cur.h" +#include "page0zip.h" +#include "gis0rtree.h" +#include "btr0cur.h" +#include "btr0sea.h" +#include "btr0pcur.h" +#include "rem0cmp.h" +#include "lock0lock.h" +#include "ibuf0ibuf.h" +#include "trx0undo.h" +#include "srv0mon.h" +#include "gis0geo.h" +#include + +/*************************************************************//** +Initial split nodes info for R-tree split. 
+@return initialized split nodes array */ +static +rtr_split_node_t* +rtr_page_split_initialize_nodes( +/*============================*/ + mem_heap_t* heap, /*!< in: pointer to memory heap, or NULL */ + btr_cur_t* cursor, /*!< in: cursor at which to insert; when the + function returns, the cursor is positioned + on the predecessor of the inserted record */ + rec_offs** offsets,/*!< in: offsets on inserted record */ + const dtuple_t* tuple, /*!< in: tuple to insert */ + double** buf_pos)/*!< in/out: current buffer position */ +{ + rtr_split_node_t* split_node_array; + double* buf; + ulint n_recs; + rtr_split_node_t* task; + rtr_split_node_t* stop; + rtr_split_node_t* cur; + rec_t* rec; + buf_block_t* block; + page_t* page; + ulint n_uniq; + ulint len; + const byte* source_cur; + + block = btr_cur_get_block(cursor); + page = buf_block_get_frame(block); + n_uniq = dict_index_get_n_unique_in_tree(cursor->index); + + n_recs = ulint(page_get_n_recs(page)) + 1; + + /*We reserve 2 MBRs memory space for temp result of split + algrithm. And plus the new mbr that need to insert, we + need (n_recs + 3)*MBR size for storing all MBRs.*/ + buf = static_cast(mem_heap_alloc( + heap, DATA_MBR_LEN * (n_recs + 3) + + sizeof(rtr_split_node_t) * (n_recs + 1))); + + split_node_array = (rtr_split_node_t*)(buf + SPDIMS * 2 * (n_recs + 3)); + task = split_node_array; + *buf_pos = buf; + stop = task + n_recs; + + rec = page_rec_get_next(page_get_infimum_rec(page)); + const ulint n_core = page_is_leaf(page) + ? 
cursor->index->n_core_fields : 0; + *offsets = rec_get_offsets(rec, cursor->index, *offsets, n_core, + n_uniq, &heap); + + source_cur = rec_get_nth_field(rec, *offsets, 0, &len); + + for (cur = task; cur < stop - 1; ++cur) { + cur->coords = reserve_coords(buf_pos, SPDIMS); + cur->key = rec; + + memcpy(cur->coords, source_cur, DATA_MBR_LEN); + + rec = page_rec_get_next(rec); + *offsets = rec_get_offsets(rec, cursor->index, *offsets, + n_core, n_uniq, &heap); + source_cur = rec_get_nth_field(rec, *offsets, 0, &len); + } + + /* Put the insert key to node list */ + source_cur = static_cast(dfield_get_data( + dtuple_get_nth_field(tuple, 0))); + cur->coords = reserve_coords(buf_pos, SPDIMS); + rec = (byte*) mem_heap_alloc( + heap, rec_get_converted_size(cursor->index, tuple, 0)); + + rec = rec_convert_dtuple_to_rec(rec, cursor->index, tuple, 0); + cur->key = rec; + + memcpy(cur->coords, source_cur, DATA_MBR_LEN); + + return split_node_array; +} + +/**********************************************************************//** +Builds a Rtree node pointer out of a physical record and a page number. +Note: For Rtree, we just keep the mbr and page no field in non-leaf level +page. It's different with Btree, Btree still keeps PK fields so far. +@return own: node pointer */ +dtuple_t* +rtr_index_build_node_ptr( +/*=====================*/ + const dict_index_t* index, /*!< in: index */ + const rtr_mbr_t* mbr, /*!< in: mbr of lower page */ + const rec_t* rec, /*!< in: record for which to build node + pointer */ + ulint page_no,/*!< in: page number to put in node + pointer */ + mem_heap_t* heap) /*!< in: memory heap where pointer + created */ +{ + dtuple_t* tuple; + dfield_t* field; + byte* buf; + ulint n_unique; + ulint info_bits; + + ut_ad(dict_index_is_spatial(index)); + + n_unique = DICT_INDEX_SPATIAL_NODEPTR_SIZE; + + tuple = dtuple_create(heap, n_unique + 1); + + /* For rtree internal node, we need to compare page number + fields. 
*/ + dtuple_set_n_fields_cmp(tuple, n_unique + 1); + + dict_index_copy_types(tuple, index, n_unique); + + /* Write page no field */ + buf = static_cast(mem_heap_alloc(heap, 4)); + + mach_write_to_4(buf, page_no); + + field = dtuple_get_nth_field(tuple, n_unique); + dfield_set_data(field, buf, 4); + + dtype_set(dfield_get_type(field), DATA_SYS_CHILD, DATA_NOT_NULL, 4); + + /* Set info bits. */ + info_bits = rec_get_info_bits(rec, dict_table_is_comp(index->table)); + dtuple_set_info_bits(tuple, info_bits | REC_STATUS_NODE_PTR); + + /* Set mbr as index entry data */ + field = dtuple_get_nth_field(tuple, 0); + + buf = static_cast(mem_heap_alloc(heap, DATA_MBR_LEN)); + + rtr_write_mbr(buf, mbr); + + dfield_set_data(field, buf, DATA_MBR_LEN); + + ut_ad(dtuple_check_typed(tuple)); + + return(tuple); +} + +/**************************************************************//** +Update the mbr field of a spatial index row. +@return true if update is successful */ +bool +rtr_update_mbr_field( +/*=================*/ + btr_cur_t* cursor, /*!< in/out: cursor pointed to rec.*/ + rec_offs* offsets, /*!< in/out: offsets on rec. */ + btr_cur_t* cursor2, /*!< in/out: cursor pointed to rec + that should be deleted. + this cursor is for btr_compress to + delete the merged page's father rec.*/ + page_t* child_page, /*!< in: child page. */ + rtr_mbr_t* mbr, /*!< in: the new mbr. 
*/ + rec_t* new_rec, /*!< in: rec to use */ + mtr_t* mtr) /*!< in: mtr */ +{ + dict_index_t* index = cursor->index; + mem_heap_t* heap; + page_t* page; + rec_t* rec; + constexpr ulint flags = BTR_NO_UNDO_LOG_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_KEEP_SYS_FLAG; + dberr_t err; + big_rec_t* dummy_big_rec; + buf_block_t* block; + rec_t* child_rec; + ulint up_match = 0; + ulint low_match = 0; + ulint child; + ulint rec_info; + bool ins_suc = true; + ulint cur2_pos = 0; + ulint del_page_no = 0; + rec_offs* offsets2; + + rec = btr_cur_get_rec(cursor); + page = page_align(rec); + + rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets)); + + heap = mem_heap_create(100); + block = btr_cur_get_block(cursor); + ut_ad(page == buf_block_get_frame(block)); + + child = btr_node_ptr_get_child_page_no(rec, offsets); + const ulint n_core = page_is_leaf(block->frame) + ? index->n_core_fields : 0; + + if (new_rec) { + child_rec = new_rec; + } else { + child_rec = page_rec_get_next(page_get_infimum_rec(child_page)); + } + + dtuple_t* node_ptr = rtr_index_build_node_ptr( + index, mbr, child_rec, child, heap); + + /* We need to remember the child page no of cursor2, since page could be + reorganized or insert a new rec before it. */ + if (cursor2) { + rec_t* del_rec = btr_cur_get_rec(cursor2); + offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2), + index, NULL, 0, + ULINT_UNDEFINED, &heap); + del_page_no = btr_node_ptr_get_child_page_no(del_rec, offsets2); + cur2_pos = page_rec_get_n_recs_before(btr_cur_get_rec(cursor2)); + } + + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(rec_offs_base(offsets)[0 + 1] == DATA_MBR_LEN); + ut_ad(node_ptr->fields[0].len == DATA_MBR_LEN); + + if (rec_info & REC_INFO_MIN_REC_FLAG) { + /* When the rec is minimal rec in this level, we do + in-place update for avoiding it move to other place. 
*/ + page_zip_des_t* page_zip = buf_block_get_page_zip(block); + + if (UNIV_LIKELY_NULL(page_zip)) { + /* Check if there's enough space for in-place + update the zip page. */ + if (!btr_cur_update_alloc_zip( + page_zip, + btr_cur_get_page_cur(cursor), + index, offsets, + rec_offs_size(offsets), + false, mtr)) { + + /* If there's not enought space for + inplace update zip page, we do delete + insert. */ + ins_suc = false; + + /* Since btr_cur_update_alloc_zip could + reorganize the page, we need to repositon + cursor2. */ + if (cursor2) { + cursor2->page_cur.rec = + page_rec_get_nth(page, + cur2_pos); + } + + goto update_mbr; + } + + /* Record could be repositioned */ + rec = btr_cur_get_rec(cursor); + +#ifdef UNIV_DEBUG + /* Make sure it is still the first record */ + rec_info = rec_get_info_bits( + rec, rec_offs_comp(offsets)); + ut_ad(rec_info & REC_INFO_MIN_REC_FLAG); +#endif /* UNIV_DEBUG */ + memcpy(rec, node_ptr->fields[0].data, DATA_MBR_LEN); + page_zip_write_rec(block, rec, index, offsets, 0, mtr); + } else { + mtr->memcpy(*block, rec, + node_ptr->fields[0].data, + DATA_MBR_LEN); + } + + if (cursor2) { + rec_offs* offsets2; + + if (UNIV_LIKELY_NULL(page_zip)) { + cursor2->page_cur.rec + = page_rec_get_nth(page, cur2_pos); + } + offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2), + index, NULL, 0, + ULINT_UNDEFINED, &heap); + ut_ad(del_page_no == btr_node_ptr_get_child_page_no( + cursor2->page_cur.rec, + offsets2)); + + page_cur_delete_rec(btr_cur_get_page_cur(cursor2), + index, offsets2, mtr); + } + } else if (page_get_n_recs(page) == 1) { + /* When there's only one rec in the page, we do insert/delete to + avoid page merge. */ + + page_cur_t page_cur; + rec_t* insert_rec; + rec_offs* insert_offsets = NULL; + ulint old_pos; + rec_t* old_rec; + + ut_ad(cursor2 == NULL); + + /* Insert the new mbr rec. 
*/ + old_pos = page_rec_get_n_recs_before(rec); + + err = btr_cur_optimistic_insert( + flags, + cursor, &insert_offsets, &heap, + node_ptr, &insert_rec, &dummy_big_rec, 0, NULL, mtr); + + ut_ad(err == DB_SUCCESS); + + btr_cur_position(index, insert_rec, block, cursor); + + /* Delete the old mbr rec. */ + old_rec = page_rec_get_nth(page, old_pos); + ut_ad(old_rec != insert_rec); + + page_cur_position(old_rec, block, &page_cur); + offsets2 = rec_get_offsets(old_rec, index, NULL, n_core, + ULINT_UNDEFINED, &heap); + page_cur_delete_rec(&page_cur, index, offsets2, mtr); + + } else { +update_mbr: + /* When there're not only 1 rec in the page, we do delete/insert + to avoid page split. */ + rec_t* insert_rec; + rec_offs* insert_offsets = NULL; + rec_t* next_rec; + + /* Delete the rec which cursor point to. */ + next_rec = page_rec_get_next(rec); + page_cur_delete_rec(btr_cur_get_page_cur(cursor), + index, offsets, mtr); + if (!ins_suc) { + ut_ad(rec_info & REC_INFO_MIN_REC_FLAG); + + btr_set_min_rec_mark(next_rec, *block, mtr); + } + + /* If there's more than 1 rec left in the page, delete + the rec which cursor2 point to. Otherwise, delete it later.*/ + if (cursor2 && page_get_n_recs(page) > 1) { + ulint cur2_rec_info; + rec_t* cur2_rec; + + cur2_rec = cursor2->page_cur.rec; + offsets2 = rec_get_offsets(cur2_rec, index, NULL, + n_core, + ULINT_UNDEFINED, &heap); + + cur2_rec_info = rec_get_info_bits(cur2_rec, + rec_offs_comp(offsets2)); + if (cur2_rec_info & REC_INFO_MIN_REC_FLAG) { + /* If we delete the leftmost node + pointer on a non-leaf level, we must + mark the new leftmost node pointer as + the predefined minimum record */ + rec_t* next_rec = page_rec_get_next(cur2_rec); + btr_set_min_rec_mark(next_rec, *block, mtr); + } + + ut_ad(del_page_no + == btr_node_ptr_get_child_page_no(cur2_rec, + offsets2)); + page_cur_delete_rec(btr_cur_get_page_cur(cursor2), + index, offsets2, mtr); + cursor2 = NULL; + } + + /* Insert the new rec. 
*/ + page_cur_search_with_match(block, index, node_ptr, + PAGE_CUR_LE , &up_match, &low_match, + btr_cur_get_page_cur(cursor), NULL); + + err = btr_cur_optimistic_insert(flags, cursor, &insert_offsets, + &heap, node_ptr, &insert_rec, + &dummy_big_rec, 0, NULL, mtr); + + if (!ins_suc && err == DB_SUCCESS) { + ins_suc = true; + } + + /* If optimistic insert fail, try reorganize the page + and insert again. */ + if (err != DB_SUCCESS && ins_suc) { + btr_page_reorganize(btr_cur_get_page_cur(cursor), + index, mtr); + + err = btr_cur_optimistic_insert(flags, + cursor, + &insert_offsets, + &heap, + node_ptr, + &insert_rec, + &dummy_big_rec, + 0, NULL, mtr); + + /* Will do pessimistic insert */ + if (err != DB_SUCCESS) { + ins_suc = false; + } + } + + /* Insert succeed, position cursor the inserted rec.*/ + if (ins_suc) { + btr_cur_position(index, insert_rec, block, cursor); + offsets = rec_get_offsets(insert_rec, + index, offsets, n_core, + ULINT_UNDEFINED, &heap); + } + + /* Delete the rec which cursor2 point to. */ + if (cursor2) { + ulint cur2_pno; + rec_t* cur2_rec; + + cursor2->page_cur.rec = page_rec_get_nth(page, + cur2_pos); + + cur2_rec = btr_cur_get_rec(cursor2); + + offsets2 = rec_get_offsets(cur2_rec, index, NULL, + n_core, + ULINT_UNDEFINED, &heap); + + /* If the cursor2 position is on a wrong rec, we + need to reposition it. 
*/ + cur2_pno = btr_node_ptr_get_child_page_no(cur2_rec, offsets2); + if ((del_page_no != cur2_pno) + || (cur2_rec == insert_rec)) { + cur2_rec = page_rec_get_next( + page_get_infimum_rec(page)); + + while (!page_rec_is_supremum(cur2_rec)) { + offsets2 = rec_get_offsets(cur2_rec, index, + NULL, + n_core, + ULINT_UNDEFINED, + &heap); + cur2_pno = btr_node_ptr_get_child_page_no( + cur2_rec, offsets2); + if (cur2_pno == del_page_no) { + if (insert_rec != cur2_rec) { + cursor2->page_cur.rec = + cur2_rec; + break; + } + } + cur2_rec = page_rec_get_next(cur2_rec); + } + + ut_ad(!page_rec_is_supremum(cur2_rec)); + } + + rec_info = rec_get_info_bits(cur2_rec, + rec_offs_comp(offsets2)); + if (rec_info & REC_INFO_MIN_REC_FLAG) { + /* If we delete the leftmost node + pointer on a non-leaf level, we must + mark the new leftmost node pointer as + the predefined minimum record */ + rec_t* next_rec = page_rec_get_next(cur2_rec); + btr_set_min_rec_mark(next_rec, *block, mtr); + } + + ut_ad(cur2_pno == del_page_no && cur2_rec != insert_rec); + + page_cur_delete_rec(btr_cur_get_page_cur(cursor2), + index, offsets2, mtr); + } + + if (!ins_suc) { + mem_heap_t* new_heap = NULL; + + err = btr_cur_pessimistic_insert( + flags, + cursor, &insert_offsets, &new_heap, + node_ptr, &insert_rec, &dummy_big_rec, + 0, NULL, mtr); + + ut_ad(err == DB_SUCCESS); + + if (new_heap) { + mem_heap_free(new_heap); + } + + } + + if (cursor2) { + btr_cur_compress_if_useful(cursor, FALSE, mtr); + } + } + + ut_ad(page_has_prev(page) + || (REC_INFO_MIN_REC_FLAG & rec_get_info_bits( + page_rec_get_next(page_get_infimum_rec(page)), + page_is_comp(page)))); + + mem_heap_free(heap); + + return(true); +} + +/**************************************************************//** +Update parent page's MBR and Predicate lock information during a split */ +static MY_ATTRIBUTE((nonnull)) +void +rtr_adjust_upper_level( +/*===================*/ + btr_cur_t* sea_cur, /*!< in: search cursor */ + ulint flags, /*!< in: undo 
logging and + locking flags */ + buf_block_t* block, /*!< in/out: page to be split */ + buf_block_t* new_block, /*!< in/out: the new half page */ + rtr_mbr_t* mbr, /*!< in: MBR on the old page */ + rtr_mbr_t* new_mbr, /*!< in: MBR on the new page */ + mtr_t* mtr) /*!< in: mtr */ +{ + ulint page_no; + ulint new_page_no; + dict_index_t* index = sea_cur->index; + btr_cur_t cursor; + rec_offs* offsets; + mem_heap_t* heap; + ulint level; + dtuple_t* node_ptr_upper; + page_cur_t* page_cursor; + lock_prdt_t prdt; + lock_prdt_t new_prdt; + dberr_t err; + big_rec_t* dummy_big_rec; + rec_t* rec; + + /* Create a memory heap where the data tuple is stored */ + heap = mem_heap_create(1024); + cursor.init(); + + cursor.thr = sea_cur->thr; + + /* Get the level of the split pages */ + level = btr_page_get_level(buf_block_get_frame(block)); + ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block))); + + page_no = block->page.id().page_no(); + + new_page_no = new_block->page.id().page_no(); + + /* Set new mbr for the old page on the upper level. */ + /* Look up the index for the node pointer to page */ + offsets = rtr_page_get_father_block( + NULL, heap, index, block, mtr, sea_cur, &cursor); + + page_cursor = btr_cur_get_page_cur(&cursor); + + rtr_update_mbr_field(&cursor, offsets, NULL, block->frame, mbr, NULL, + mtr); + + /* Already updated parent MBR, reset in our path */ + if (sea_cur->rtr_info) { + node_visit_t* node_visit = rtr_get_parent_node( + sea_cur, level + 1, true); + if (node_visit) { + node_visit->mbr_inc = 0; + } + } + + /* Insert the node for the new page. 
*/ + node_ptr_upper = rtr_index_build_node_ptr( + index, new_mbr, + page_rec_get_next(page_get_infimum_rec(new_block->frame)), + new_page_no, heap); + + ulint up_match = 0; + ulint low_match = 0; + + buf_block_t* father_block = btr_cur_get_block(&cursor); + + page_cur_search_with_match( + father_block, index, node_ptr_upper, + PAGE_CUR_LE , &up_match, &low_match, + btr_cur_get_page_cur(&cursor), NULL); + + err = btr_cur_optimistic_insert( + flags + | BTR_NO_LOCKING_FLAG + | BTR_KEEP_SYS_FLAG + | BTR_NO_UNDO_LOG_FLAG, + &cursor, &offsets, &heap, + node_ptr_upper, &rec, &dummy_big_rec, 0, NULL, mtr); + + if (err == DB_FAIL) { + cursor.rtr_info = sea_cur->rtr_info; + cursor.tree_height = sea_cur->tree_height; + + /* Recreate a memory heap as input parameter for + btr_cur_pessimistic_insert(), because the heap may be + emptied in btr_cur_pessimistic_insert(). */ + mem_heap_t* new_heap = mem_heap_create(1024); + + err = btr_cur_pessimistic_insert(flags + | BTR_NO_LOCKING_FLAG + | BTR_KEEP_SYS_FLAG + | BTR_NO_UNDO_LOG_FLAG, + &cursor, &offsets, &new_heap, + node_ptr_upper, &rec, + &dummy_big_rec, 0, NULL, mtr); + cursor.rtr_info = NULL; + ut_a(err == DB_SUCCESS); + + mem_heap_free(new_heap); + } + + prdt.data = static_cast(mbr); + prdt.op = 0; + new_prdt.data = static_cast(new_mbr); + new_prdt.op = 0; + + lock_prdt_update_parent(block, new_block, &prdt, &new_prdt, + page_cursor->block->page.id()); + + mem_heap_free(heap); + + ut_ad(block->zip_size() == index->table->space->zip_size()); + + const uint32_t next_page_no = btr_page_get_next(block->frame); + + if (next_page_no != FIL_NULL) { + buf_block_t* next_block = btr_block_get( + *index, next_page_no, RW_X_LATCH, false, mtr); +#ifdef UNIV_BTR_DEBUG + ut_a(page_is_comp(next_block->frame) + == page_is_comp(block->frame)); + ut_a(btr_page_get_prev(next_block->frame) + == block->page.id().page_no()); +#endif /* UNIV_BTR_DEBUG */ + + btr_page_set_prev(next_block, new_page_no, mtr); + } + + btr_page_set_next(block, 
new_page_no, mtr); + + btr_page_set_prev(new_block, page_no, mtr); + btr_page_set_next(new_block, next_page_no, mtr); +} + +/*************************************************************//** +Moves record list to another page for rtree splitting. + +IMPORTANT: The caller will have to update IBUF_BITMAP_FREE +if new_block is a compressed leaf page in a secondary index. +This has to be done either within the same mini-transaction, +or by invoking ibuf_reset_free_bits() before mtr_commit(). + +@return TRUE on success; FALSE on compression failure */ +static +ibool +rtr_split_page_move_rec_list( +/*=========================*/ + rtr_split_node_t* node_array, /*!< in: split node array. */ + int first_rec_group,/*!< in: group number of the + first rec. */ + buf_block_t* new_block, /*!< in/out: index page + where to move */ + buf_block_t* block, /*!< in/out: page containing + split_rec */ + rec_t* first_rec, /*!< in: first record not to + move */ + dict_index_t* index, /*!< in: record descriptor */ + mem_heap_t* heap, /*!< in: pointer to memory + heap, or NULL */ + mtr_t* mtr) /*!< in: mtr */ +{ + rtr_split_node_t* cur_split_node; + rtr_split_node_t* end_split_node; + page_cur_t page_cursor; + page_cur_t new_page_cursor; + page_t* page; + page_t* new_page; + rec_offs offsets_[REC_OFFS_NORMAL_SIZE]; + rec_offs* offsets = offsets_; + page_zip_des_t* new_page_zip + = buf_block_get_page_zip(new_block); + rec_t* rec; + rec_t* ret; + ulint moved = 0; + ulint max_to_move = 0; + rtr_rec_move_t* rec_move = NULL; + + ut_ad(!dict_index_is_ibuf(index)); + ut_ad(dict_index_is_spatial(index)); + + rec_offs_init(offsets_); + + page_cur_set_before_first(block, &page_cursor); + page_cur_set_before_first(new_block, &new_page_cursor); + + page = buf_block_get_frame(block); + new_page = buf_block_get_frame(new_block); + ret = page_rec_get_prev(page_get_supremum_rec(new_page)); + + end_split_node = node_array + page_get_n_recs(page); + + mtr_log_t log_mode = MTR_LOG_NONE; + + if (new_page_zip) 
{ + log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE); + } + + max_to_move = page_get_n_recs( + buf_block_get_frame(block)); + rec_move = static_cast(mem_heap_alloc( + heap, + sizeof (*rec_move) * max_to_move)); + const ulint n_core = page_is_leaf(page) + ? index->n_core_fields : 0; + + /* Insert the recs in group 2 to new page. */ + for (cur_split_node = node_array; + cur_split_node < end_split_node; ++cur_split_node) { + if (cur_split_node->n_node != first_rec_group) { + lock_rec_store_on_page_infimum( + block, cur_split_node->key); + + offsets = rec_get_offsets(cur_split_node->key, + index, offsets, n_core, + ULINT_UNDEFINED, &heap); + + ut_ad(!n_core || cur_split_node->key != first_rec); + + rec = page_cur_insert_rec_low( + &new_page_cursor, + index, cur_split_node->key, offsets, mtr); + + ut_a(rec); + + lock_rec_restore_from_page_infimum( + new_block, rec, block); + + page_cur_move_to_next(&new_page_cursor); + + rec_move[moved].new_rec = rec; + rec_move[moved].old_rec = cur_split_node->key; + rec_move[moved].moved = false; + moved++; + + if (moved > max_to_move) { + ut_ad(0); + break; + } + } + } + + /* Update PAGE_MAX_TRX_ID on the uncompressed page. + Modifications will be redo logged and copied to the compressed + page in page_zip_compress() or page_zip_reorganize() below. + Multiple transactions cannot simultaneously operate on the + same temp-table in parallel. + max_trx_id is ignored for temp tables because it not required + for MVCC. */ + if (n_core && !index->table->is_temporary()) { + page_update_max_trx_id(new_block, NULL, + page_get_max_trx_id(page), + mtr); + } + + if (new_page_zip) { + mtr_set_log_mode(mtr, log_mode); + + if (!page_zip_compress(new_block, index, + page_zip_level, mtr)) { + ulint ret_pos; + + /* Before trying to reorganize the page, + store the number of preceding records on the page. */ + ret_pos = page_rec_get_n_recs_before(ret); + /* Before copying, "ret" was the predecessor + of the predefined supremum record. 
If it was + the predefined infimum record, then it would + still be the infimum, and we would have + ret_pos == 0. */ + + if (UNIV_UNLIKELY + (!page_zip_reorganize(new_block, index, + page_zip_level, mtr))) { + + if (UNIV_UNLIKELY + (!page_zip_decompress(new_page_zip, + new_page, FALSE))) { + ut_error; + } +#ifdef UNIV_GIS_DEBUG + ut_ad(page_validate(new_page, index)); +#endif + + return(false); + } + + /* The page was reorganized: Seek to ret_pos. */ + ret = page_rec_get_nth(new_page, ret_pos); + } + } + + /* Update the lock table */ + lock_rtr_move_rec_list(new_block, block, rec_move, moved); + + /* Delete recs in second group from the old page. */ + for (cur_split_node = node_array; + cur_split_node < end_split_node; ++cur_split_node) { + if (cur_split_node->n_node != first_rec_group) { + page_cur_position(cur_split_node->key, + block, &page_cursor); + offsets = rec_get_offsets( + page_cur_get_rec(&page_cursor), index, + offsets, n_core, ULINT_UNDEFINED, + &heap); + page_cur_delete_rec(&page_cursor, + index, offsets, mtr); + } + } + + return(true); +} + +/*************************************************************//** +Splits an R-tree index page to halves and inserts the tuple. It is assumed +that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is +released within this function! NOTE that the operation of this +function must always succeed, we cannot reverse it: therefore enough +free disk space (2 pages) must be guaranteed to be available before +this function is called. 
+@return inserted record */ +rec_t* +rtr_page_split_and_insert( +/*======================*/ + ulint flags, /*!< in: undo logging and locking flags */ + btr_cur_t* cursor, /*!< in/out: cursor at which to insert; when the + function returns, the cursor is positioned + on the predecessor of the inserted record */ + rec_offs** offsets,/*!< out: offsets on inserted record */ + mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */ + const dtuple_t* tuple, /*!< in: tuple to insert */ + ulint n_ext, /*!< in: number of externally stored columns */ + mtr_t* mtr) /*!< in: mtr */ +{ + buf_block_t* block; + page_t* page; + page_t* new_page; + buf_block_t* new_block; + page_zip_des_t* page_zip; + page_zip_des_t* new_page_zip; + buf_block_t* insert_block; + page_cur_t* page_cursor; + rec_t* rec = 0; + ulint n_recs; + ulint total_data; + ulint insert_size; + rtr_split_node_t* rtr_split_node_array; + rtr_split_node_t* cur_split_node; + rtr_split_node_t* end_split_node; + double* buf_pos; + node_seq_t current_ssn; + node_seq_t next_ssn; + buf_block_t* root_block; + rtr_mbr_t mbr; + rtr_mbr_t new_mbr; + lock_prdt_t prdt; + lock_prdt_t new_prdt; + rec_t* first_rec = NULL; + int first_rec_group = 1; + ulint n_iterations = 0; + + if (!*heap) { + *heap = mem_heap_create(1024); + } + +func_start: + mem_heap_empty(*heap); + *offsets = NULL; + + ut_ad(mtr->memo_contains_flagged(&cursor->index->lock, MTR_MEMO_X_LOCK + | MTR_MEMO_SX_LOCK)); + ut_ad(!dict_index_is_online_ddl(cursor->index) + || (flags & BTR_CREATE_FLAG) + || dict_index_is_clust(cursor->index)); + ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index), + RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)); + + block = btr_cur_get_block(cursor); + page = buf_block_get_frame(block); + page_zip = buf_block_get_page_zip(block); + current_ssn = page_get_ssn_id(page); + + ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX)); + ut_ad(page_get_n_recs(page) >= 1); + + const page_id_t page_id(block->page.id()); + + if 
(!page_has_prev(page) && !page_is_leaf(page)) { + first_rec = page_rec_get_next( + page_get_infimum_rec(buf_block_get_frame(block))); + } + + /* Initial split nodes array. */ + rtr_split_node_array = rtr_page_split_initialize_nodes( + *heap, cursor, offsets, tuple, &buf_pos); + + /* Divide all mbrs to two groups. */ + n_recs = ulint(page_get_n_recs(page)) + 1; + + end_split_node = rtr_split_node_array + n_recs; + +#ifdef UNIV_GIS_DEBUG + fprintf(stderr, "Before split a page:\n"); + for (cur_split_node = rtr_split_node_array; + cur_split_node < end_split_node; ++cur_split_node) { + for (int i = 0; i < SPDIMS * 2; i++) { + fprintf(stderr, "%.2lf ", + *(cur_split_node->coords + i)); + } + fprintf(stderr, "\n"); + } +#endif + + insert_size = rec_get_converted_size(cursor->index, tuple, n_ext); + total_data = page_get_data_size(page) + insert_size; + first_rec_group = split_rtree_node(rtr_split_node_array, + static_cast(n_recs), + static_cast(total_data), + static_cast(insert_size), + 0, 2, 2, &buf_pos, SPDIMS, + static_cast(first_rec)); + + /* Allocate a new page to the index */ + const uint16_t page_level = btr_page_get_level(page); + new_block = btr_page_alloc(cursor->index, page_id.page_no() + 1, + FSP_UP, page_level, mtr, mtr); + if (!new_block) { + return NULL; + } + + new_page_zip = buf_block_get_page_zip(new_block); + if (page_level && UNIV_LIKELY_NULL(new_page_zip)) { + /* ROW_FORMAT=COMPRESSED non-leaf pages are not expected + to contain FIL_NULL in FIL_PAGE_PREV at this stage. */ + memset_aligned<4>(new_block->frame + FIL_PAGE_PREV, 0, 4); + } + btr_page_create(new_block, new_page_zip, cursor->index, + page_level, mtr); + + new_page = buf_block_get_frame(new_block); + ut_ad(page_get_ssn_id(new_page) == 0); + + /* Set new ssn to the new page and page. 
*/ + page_set_ssn_id(new_block, new_page_zip, current_ssn, mtr); + next_ssn = rtr_get_new_ssn_id(cursor->index); + + page_set_ssn_id(block, page_zip, next_ssn, mtr); + + /* Keep recs in first group to the old page, move recs in second + groups to the new page. */ + if (0 +#ifdef UNIV_ZIP_COPY + || page_zip +#endif + || !rtr_split_page_move_rec_list(rtr_split_node_array, + first_rec_group, + new_block, block, first_rec, + cursor->index, *heap, mtr)) { + ulint n = 0; + rec_t* rec; + ulint moved = 0; + ulint max_to_move = 0; + rtr_rec_move_t* rec_move = NULL; + ulint pos; + + /* For some reason, compressing new_page failed, + even though it should contain fewer records than + the original page. Copy the page byte for byte + and then delete the records from both pages + as appropriate. Deleting will always succeed. */ + ut_a(new_page_zip); + + page_zip_copy_recs(new_block, + page_zip, page, cursor->index, mtr); + + page_cursor = btr_cur_get_page_cur(cursor); + + /* Move locks on recs. */ + max_to_move = page_get_n_recs(page); + rec_move = static_cast(mem_heap_alloc( + *heap, + sizeof (*rec_move) * max_to_move)); + + /* Init the rec_move array for moving lock on recs. */ + for (cur_split_node = rtr_split_node_array; + cur_split_node < end_split_node - 1; ++cur_split_node) { + if (cur_split_node->n_node != first_rec_group) { + pos = page_rec_get_n_recs_before( + cur_split_node->key); + rec = page_rec_get_nth(new_page, pos); + ut_a(rec); + + rec_move[moved].new_rec = rec; + rec_move[moved].old_rec = cur_split_node->key; + rec_move[moved].moved = false; + moved++; + + if (moved > max_to_move) { + ut_ad(0); + break; + } + } + } + + /* Update the lock table */ + lock_rtr_move_rec_list(new_block, block, rec_move, moved); + + const ulint n_core = page_level + ? 0 : cursor->index->n_core_fields; + + /* Delete recs in first group from the new page. 
*/ + for (cur_split_node = rtr_split_node_array; + cur_split_node < end_split_node - 1; ++cur_split_node) { + if (cur_split_node->n_node == first_rec_group) { + ulint pos; + + pos = page_rec_get_n_recs_before( + cur_split_node->key); + ut_a(pos > 0); + rec_t* new_rec = page_rec_get_nth(new_page, + pos - n); + + ut_a(new_rec && page_rec_is_user_rec(new_rec)); + page_cur_position(new_rec, new_block, + page_cursor); + + *offsets = rec_get_offsets( + page_cur_get_rec(page_cursor), + cursor->index, *offsets, n_core, + ULINT_UNDEFINED, heap); + + page_cur_delete_rec(page_cursor, + cursor->index, *offsets, mtr); + n++; + } + } + + /* Delete recs in second group from the old page. */ + for (cur_split_node = rtr_split_node_array; + cur_split_node < end_split_node - 1; ++cur_split_node) { + if (cur_split_node->n_node != first_rec_group) { + page_cur_position(cur_split_node->key, + block, page_cursor); + *offsets = rec_get_offsets( + page_cur_get_rec(page_cursor), + cursor->index, *offsets, n_core, + ULINT_UNDEFINED, heap); + page_cur_delete_rec(page_cursor, + cursor->index, *offsets, mtr); + } + } + +#ifdef UNIV_GIS_DEBUG + ut_ad(page_validate(new_page, cursor->index)); + ut_ad(page_validate(page, cursor->index)); +#endif + } + + /* Insert the new rec to the proper page. */ + cur_split_node = end_split_node - 1; + if (cur_split_node->n_node != first_rec_group) { + insert_block = new_block; + } else { + insert_block = block; + } + + /* Reposition the cursor for insert and try insertion */ + page_cursor = btr_cur_get_page_cur(cursor); + + page_cur_search(insert_block, cursor->index, tuple, + PAGE_CUR_LE, page_cursor); + + /* It's possible that the new record is too big to be inserted into + the page, and it'll need the second round split in this case. 
+ We test this scenario here*/ + DBUG_EXECUTE_IF("rtr_page_need_second_split", + if (n_iterations == 0) { + rec = NULL; + goto after_insert; } + ); + + rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index, + offsets, heap, n_ext, mtr); + + /* If insert did not fit, try page reorganization. + For compressed pages, page_cur_tuple_insert() will have + attempted this already. */ + if (rec == NULL) { + if (!is_page_cur_get_page_zip(page_cursor) + && btr_page_reorganize(page_cursor, cursor->index, mtr)) { + rec = page_cur_tuple_insert(page_cursor, tuple, + cursor->index, offsets, + heap, n_ext, mtr); + + } + /* If insert fail, we will try to split the insert_block + again. */ + } + +#ifdef UNIV_DEBUG +after_insert: +#endif + /* Calculate the mbr on the upper half-page, and the mbr on + original page. */ + rtr_page_cal_mbr(cursor->index, block, &mbr, *heap); + rtr_page_cal_mbr(cursor->index, new_block, &new_mbr, *heap); + prdt.data = &mbr; + new_prdt.data = &new_mbr; + + /* Check any predicate locks need to be moved/copied to the + new page */ + lock_prdt_update_split(new_block, &prdt, &new_prdt, page_id); + + /* Adjust the upper level. */ + rtr_adjust_upper_level(cursor, flags, block, new_block, + &mbr, &new_mbr, mtr); + + /* Save the new ssn to the root page, since we need to reinit + the first ssn value from it after restart server. */ + + root_block = btr_root_block_get(cursor->index, RW_SX_LATCH, mtr); + + page_zip = buf_block_get_page_zip(root_block); + page_set_ssn_id(root_block, page_zip, next_ssn, mtr); + + /* Insert fit on the page: update the free bits for the + left and right pages in the same mtr */ + + if (page_is_leaf(page)) { + ibuf_update_free_bits_for_two_pages_low( + block, new_block, mtr); + } + + + /* If the new res insert fail, we need to do another split + again. 
*/ + if (!rec) { + /* We play safe and reset the free bits for new_page */ + if (!dict_index_is_clust(cursor->index) + && !cursor->index->table->is_temporary()) { + ibuf_reset_free_bits(new_block); + ibuf_reset_free_bits(block); + } + + /* We need to clean the parent path here and search father + node later, otherwise, it's possible that find a wrong + parent. */ + rtr_clean_rtr_info(cursor->rtr_info, true); + cursor->rtr_info = NULL; + n_iterations++; + + rec_t* i_rec = page_rec_get_next(page_get_infimum_rec( + buf_block_get_frame(block))); + btr_cur_position(cursor->index, i_rec, block, cursor); + + goto func_start; + } + +#ifdef UNIV_GIS_DEBUG + ut_ad(page_validate(buf_block_get_frame(block), cursor->index)); + ut_ad(page_validate(buf_block_get_frame(new_block), cursor->index)); + + ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets)); +#endif + MONITOR_INC(MONITOR_INDEX_SPLIT); + + return(rec); +} + +/****************************************************************//** +Following the right link to find the proper block for insert. +@return the proper block.*/ +dberr_t +rtr_ins_enlarge_mbr( +/*================*/ + btr_cur_t* btr_cur, /*!< in: btr cursor */ + mtr_t* mtr) /*!< in: mtr */ +{ + dberr_t err = DB_SUCCESS; + rtr_mbr_t new_mbr; + buf_block_t* block; + mem_heap_t* heap; + dict_index_t* index = btr_cur->index; + page_cur_t* page_cursor; + rec_offs* offsets; + node_visit_t* node_visit; + btr_cur_t cursor; + page_t* page; + + ut_ad(dict_index_is_spatial(index)); + + /* If no rtr_info or rtree is one level tree, return. */ + if (!btr_cur->rtr_info || btr_cur->tree_height == 1) { + return(err); + } + + /* Check path info is not empty. */ + ut_ad(!btr_cur->rtr_info->parent_path->empty()); + + /* Create a memory heap. 
*/ + heap = mem_heap_create(1024); + + /* Leaf level page is stored in cursor */ + page_cursor = btr_cur_get_page_cur(btr_cur); + block = page_cur_get_block(page_cursor); + + for (ulint i = 1; i < btr_cur->tree_height; i++) { + node_visit = rtr_get_parent_node(btr_cur, i, true); + ut_ad(node_visit != NULL); + + /* If there's no mbr enlarge, return.*/ + if (node_visit->mbr_inc == 0) { + block = btr_pcur_get_block(node_visit->cursor); + continue; + } + + /* Calculate the mbr of the child page. */ + rtr_page_cal_mbr(index, block, &new_mbr, heap); + + /* Get father block. */ + cursor.init(); + offsets = rtr_page_get_father_block( + NULL, heap, index, block, mtr, btr_cur, &cursor); + + page = buf_block_get_frame(block); + + /* Update the mbr field of the rec. */ + if (!rtr_update_mbr_field(&cursor, offsets, NULL, page, + &new_mbr, NULL, mtr)) { + err = DB_ERROR; + break; + } + + page_cursor = btr_cur_get_page_cur(&cursor); + block = page_cur_get_block(page_cursor); + } + + mem_heap_free(heap); + + return(err); +} + +/*************************************************************//** +Copy recs from a page to new_block of rtree. +Differs from page_copy_rec_list_end, because this function does not +touch the lock table and max trx id on page or compress the page. + +IMPORTANT: The caller will have to update IBUF_BITMAP_FREE +if new_block is a compressed leaf page in a secondary index. +This has to be done either within the same mini-transaction, +or by invoking ibuf_reset_free_bits() before mtr_commit(). 
*/ +void +rtr_page_copy_rec_list_end_no_locks( +/*================================*/ + buf_block_t* new_block, /*!< in: index page to copy to */ + buf_block_t* block, /*!< in: index page of rec */ + rec_t* rec, /*!< in: record on page */ + dict_index_t* index, /*!< in: record descriptor */ + mem_heap_t* heap, /*!< in/out: heap memory */ + rtr_rec_move_t* rec_move, /*!< in: recording records moved */ + ulint max_move, /*!< in: num of rec to move */ + ulint* num_moved, /*!< out: num of rec to move */ + mtr_t* mtr) /*!< in: mtr */ +{ + page_t* new_page = buf_block_get_frame(new_block); + page_cur_t page_cur; + page_cur_t cur1; + rec_t* cur_rec; + rec_offs offsets_1[REC_OFFS_NORMAL_SIZE]; + rec_offs* offsets1 = offsets_1; + rec_offs offsets_2[REC_OFFS_NORMAL_SIZE]; + rec_offs* offsets2 = offsets_2; + ulint moved = 0; + const ulint n_core = page_is_leaf(new_page) + ? index->n_core_fields : 0; + + rec_offs_init(offsets_1); + rec_offs_init(offsets_2); + + page_cur_position(rec, block, &cur1); + + if (page_cur_is_before_first(&cur1)) { + page_cur_move_to_next(&cur1); + } + + btr_assert_not_corrupted(new_block, index); + ut_a(page_is_comp(new_page) == page_rec_is_comp(rec)); + ut_a(mach_read_from_2(new_page + srv_page_size - 10) == (ulint) + (page_is_comp(new_page) ? 
PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM)); + + cur_rec = page_rec_get_next( + page_get_infimum_rec(buf_block_get_frame(new_block))); + page_cur_position(cur_rec, new_block, &page_cur); + + /* Copy records from the original page to the new page */ + while (!page_cur_is_after_last(&cur1)) { + rec_t* cur1_rec = page_cur_get_rec(&cur1); + rec_t* ins_rec; + + if (page_rec_is_infimum(cur_rec)) { + cur_rec = page_rec_get_next(cur_rec); + } + + offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core, + ULINT_UNDEFINED, &heap); + while (!page_rec_is_supremum(cur_rec)) { + ulint cur_matched_fields = 0; + int cmp; + + offsets2 = rec_get_offsets(cur_rec, index, offsets2, + n_core, + ULINT_UNDEFINED, &heap); + cmp = cmp_rec_rec(cur1_rec, cur_rec, + offsets1, offsets2, index, false, + &cur_matched_fields); + if (cmp < 0) { + page_cur_move_to_prev(&page_cur); + break; + } else if (cmp > 0) { + /* Skip small recs. */ + page_cur_move_to_next(&page_cur); + cur_rec = page_cur_get_rec(&page_cur); + } else if (n_core) { + if (rec_get_deleted_flag(cur1_rec, + dict_table_is_comp(index->table))) { + goto next; + } else { + /* We have two identical leaf records, + skip copying the undeleted one, and + unmark deleted on the current page */ + btr_rec_set_deleted( + new_block, cur_rec, mtr); + goto next; + } + } + } + + /* If position is on suprenum rec, need to move to + previous rec. 
*/ + if (page_rec_is_supremum(cur_rec)) { + page_cur_move_to_prev(&page_cur); + } + + cur_rec = page_cur_get_rec(&page_cur); + + offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core, + ULINT_UNDEFINED, &heap); + + ins_rec = page_cur_insert_rec_low(&page_cur, index, + cur1_rec, offsets1, mtr); + if (UNIV_UNLIKELY(!ins_rec)) { + fprintf(stderr, "page number %u and %u\n", + new_block->page.id().page_no(), + block->page.id().page_no()); + + ib::fatal() << "rec offset " << page_offset(rec) + << ", cur1 offset " + << page_offset(page_cur_get_rec(&cur1)) + << ", cur_rec offset " + << page_offset(cur_rec); + } + + rec_move[moved].new_rec = ins_rec; + rec_move[moved].old_rec = cur1_rec; + rec_move[moved].moved = false; + moved++; +next: + if (moved > max_move) { + ut_ad(0); + break; + } + + page_cur_move_to_next(&cur1); + } + + *num_moved = moved; +} + +/*************************************************************//** +Copy recs till a specified rec from a page to new_block of rtree. */ +void +rtr_page_copy_rec_list_start_no_locks( +/*==================================*/ + buf_block_t* new_block, /*!< in: index page to copy to */ + buf_block_t* block, /*!< in: index page of rec */ + rec_t* rec, /*!< in: record on page */ + dict_index_t* index, /*!< in: record descriptor */ + mem_heap_t* heap, /*!< in/out: heap memory */ + rtr_rec_move_t* rec_move, /*!< in: recording records moved */ + ulint max_move, /*!< in: num of rec to move */ + ulint* num_moved, /*!< out: num of rec to move */ + mtr_t* mtr) /*!< in: mtr */ +{ + page_cur_t cur1; + rec_t* cur_rec; + rec_offs offsets_1[REC_OFFS_NORMAL_SIZE]; + rec_offs* offsets1 = offsets_1; + rec_offs offsets_2[REC_OFFS_NORMAL_SIZE]; + rec_offs* offsets2 = offsets_2; + page_cur_t page_cur; + ulint moved = 0; + const ulint n_core = page_is_leaf(buf_block_get_frame(block)) + ? 
index->n_core_fields : 0; + + rec_offs_init(offsets_1); + rec_offs_init(offsets_2); + + page_cur_set_before_first(block, &cur1); + page_cur_move_to_next(&cur1); + + cur_rec = page_rec_get_next( + page_get_infimum_rec(buf_block_get_frame(new_block))); + page_cur_position(cur_rec, new_block, &page_cur); + + while (page_cur_get_rec(&cur1) != rec) { + rec_t* cur1_rec = page_cur_get_rec(&cur1); + rec_t* ins_rec; + + if (page_rec_is_infimum(cur_rec)) { + cur_rec = page_rec_get_next(cur_rec); + } + + offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core, + ULINT_UNDEFINED, &heap); + + while (!page_rec_is_supremum(cur_rec)) { + ulint cur_matched_fields = 0; + + offsets2 = rec_get_offsets(cur_rec, index, offsets2, + n_core, + ULINT_UNDEFINED, &heap); + int cmp = cmp_rec_rec(cur1_rec, cur_rec, + offsets1, offsets2, index, false, + &cur_matched_fields); + if (cmp < 0) { + page_cur_move_to_prev(&page_cur); + cur_rec = page_cur_get_rec(&page_cur); + break; + } else if (cmp > 0) { + /* Skip small recs. */ + page_cur_move_to_next(&page_cur); + cur_rec = page_cur_get_rec(&page_cur); + } else if (n_core) { + if (rec_get_deleted_flag( + cur1_rec, + dict_table_is_comp(index->table))) { + goto next; + } else { + /* We have two identical leaf records, + skip copying the undeleted one, and + unmark deleted on the current page */ + btr_rec_set_deleted( + new_block, cur_rec, mtr); + goto next; + } + } + } + + /* If position is on suprenum rec, need to move to + previous rec. 
*/ + if (page_rec_is_supremum(cur_rec)) { + page_cur_move_to_prev(&page_cur); + } + + cur_rec = page_cur_get_rec(&page_cur); + + offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core, + ULINT_UNDEFINED, &heap); + + ins_rec = page_cur_insert_rec_low(&page_cur, index, + cur1_rec, offsets1, mtr); + if (UNIV_UNLIKELY(!ins_rec)) { + ib::fatal() << new_block->page.id() + << "rec offset " << page_offset(rec) + << ", cur1 offset " + << page_offset(page_cur_get_rec(&cur1)) + << ", cur_rec offset " + << page_offset(cur_rec); + } + + rec_move[moved].new_rec = ins_rec; + rec_move[moved].old_rec = cur1_rec; + rec_move[moved].moved = false; + moved++; +next: + if (moved > max_move) { + ut_ad(0); + break; + } + + page_cur_move_to_next(&cur1); + } + + *num_moved = moved; +} + +/****************************************************************//** +Check two MBRs are identical or need to be merged */ +bool +rtr_merge_mbr_changed( +/*==================*/ + btr_cur_t* cursor, /*!< in/out: cursor */ + btr_cur_t* cursor2, /*!< in: the other cursor */ + rec_offs* offsets, /*!< in: rec offsets */ + rec_offs* offsets2, /*!< in: rec offsets */ + rtr_mbr_t* new_mbr) /*!< out: MBR to update */ +{ + double* mbr; + double mbr1[SPDIMS * 2]; + double mbr2[SPDIMS * 2]; + rec_t* rec; + ulint len; + bool changed = false; + + ut_ad(dict_index_is_spatial(cursor->index)); + + rec = btr_cur_get_rec(cursor); + + rtr_read_mbr(rec_get_nth_field(rec, offsets, 0, &len), + reinterpret_cast(mbr1)); + + rec = btr_cur_get_rec(cursor2); + + rtr_read_mbr(rec_get_nth_field(rec, offsets2, 0, &len), + reinterpret_cast(mbr2)); + + mbr = reinterpret_cast(new_mbr); + + for (int i = 0; i < SPDIMS * 2; i += 2) { + changed = (changed || mbr1[i] != mbr2[i]); + *mbr = mbr1[i] < mbr2[i] ? mbr1[i] : mbr2[i]; + mbr++; + changed = (changed || mbr1[i + 1] != mbr2 [i + 1]); + *mbr = mbr1[i + 1] > mbr2[i + 1] ? 
mbr1[i + 1] : mbr2[i + 1]; + mbr++; + } + + return(changed); +} + +/****************************************************************//** +Merge 2 mbrs and update the the mbr that cursor is on. */ +dberr_t +rtr_merge_and_update_mbr( +/*=====================*/ + btr_cur_t* cursor, /*!< in/out: cursor */ + btr_cur_t* cursor2, /*!< in: the other cursor */ + rec_offs* offsets, /*!< in: rec offsets */ + rec_offs* offsets2, /*!< in: rec offsets */ + page_t* child_page, /*!< in: the page. */ + mtr_t* mtr) /*!< in: mtr */ +{ + dberr_t err = DB_SUCCESS; + rtr_mbr_t new_mbr; + bool changed = false; + + ut_ad(dict_index_is_spatial(cursor->index)); + + changed = rtr_merge_mbr_changed(cursor, cursor2, offsets, offsets2, + &new_mbr); + + /* Update the mbr field of the rec. And will delete the record + pointed by cursor2 */ + if (changed) { + if (!rtr_update_mbr_field(cursor, offsets, cursor2, child_page, + &new_mbr, NULL, mtr)) { + err = DB_ERROR; + } + } else { + rtr_node_ptr_delete(cursor2, mtr); + } + + return(err); +} + +/*************************************************************//** +Deletes on the upper level the node pointer to a page. 
*/
void
rtr_node_ptr_delete(
/*================*/
	btr_cur_t*	cursor, /*!< in: search cursor, contains information
				about parent nodes in search */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ibool		compressed;
	dberr_t		err;

	/* Pessimistic delete of the node pointer; BTR_CREATE_FLAG allows
	the removal on a non-leaf page. The delete must succeed. */
	compressed = btr_cur_pessimistic_delete(&err, TRUE, cursor,
						BTR_CREATE_FLAG, false, mtr);
	ut_a(err == DB_SUCCESS);

	if (!compressed) {
		/* The delete did not already merge the page; try to
		compress (merge with a sibling) if it is worthwhile. */
		btr_cur_compress_if_useful(cursor, FALSE, mtr);
	}
}

/**************************************************************//**
Check whether a Rtree page is child of a parent page by scanning all
node pointer records on the parent page for one whose child page number
matches; on success the cursor is positioned on that node pointer.
@return true if there is child/parent relationship */
bool
rtr_check_same_block(
/*================*/
	dict_index_t*	index,	/*!< in: index tree */
	btr_cur_t*	cursor,	/*!< in/out: position at the parent entry
				pointing to the child if successful */
	buf_block_t*	parentb,/*!< in: parent page to check */
	buf_block_t*	childb,	/*!< in: child Page */
	mem_heap_t*	heap)	/*!< in: memory heap */

{
	ulint		page_no = childb->page.id().page_no();
	rec_offs*	offsets;
	rec_t*		rec = page_rec_get_next(page_get_infimum_rec(
						buf_block_get_frame(parentb)));

	/* Linear scan of every user record on the parent page. */
	while (!page_rec_is_supremum(rec)) {
		offsets = rec_get_offsets(
			rec, index, NULL, 0, ULINT_UNDEFINED, &heap);

		if (btr_node_ptr_get_child_page_no(rec, offsets) == page_no) {
			btr_cur_position(index, rec, parentb, cursor);
			return(true);
		}

		rec = page_rec_get_next(rec);
	}

	return(false);
}

/*************************************************************//**
Calculates MBR_AREA(a+b) - MBR_AREA(a)
Note: when 'a' and 'b' objects are far from each other,
the area increase can be really big, so this function
can return 'inf' as a result.
Return the area increased. */
static double
rtree_area_increase(
	const uchar*	a,		/*!< in: original mbr. */
	const uchar*	b,		/*!< in: new mbr. */
	double*		ab_area)	/*!< out: increased area.
*/
{
	double		a_area = 1.0;
	double		loc_ab_area = 1.0;
	double		amin, amax, bmin, bmax;
	double		data_round = 1.0;

	/* MBRs are serialized as SPDIMS pairs of little-endian doubles
	(min, max per dimension). */
	static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double),
		      "compatibility");

	for (auto i = SPDIMS; i--; ) {
		double	area;

		amin = mach_double_read(a);
		bmin = mach_double_read(b);
		amax = mach_double_read(a + sizeof(double));
		bmax = mach_double_read(b + sizeof(double));

		a += 2 * sizeof(double);
		b += 2 * sizeof(double);

		/* Degenerate (zero-extent) dimensions get a small
		constant weight so that lines/points still produce a
		nonzero "area" product. */
		area = amax - amin;
		if (area == 0) {
			a_area *= LINE_MBR_WEIGHTS;
		} else {
			a_area *= area;
		}

		area = (double)std::max(amax, bmax) -
			(double)std::min(amin, bmin);
		if (area == 0) {
			loc_ab_area *= LINE_MBR_WEIGHTS;
		} else {
			loc_ab_area *= area;
		}

		/* Value of amax or bmin can be so large that small difference
		are ignored. For example: 3.2884281489988079e+284 - 100 =
		3.2884281489988079e+284. This results in some area differences
		not being detected. Track the rounding-lost growth separately
		in data_round. */
		if (loc_ab_area == a_area) {
			if (bmin < amin || bmax > amax) {
				data_round *= ((double)std::max(amax, bmax)
					       - amax
					       + (amin - (double)std::min(
							 amin, bmin)));
			} else {
				data_round *= area;
			}
		}
	}

	*ab_area = loc_ab_area;

	/* If the enlarged area is numerically equal to the original but
	'b' did extend beyond 'a', report the accumulated rounding term
	instead of 0 so callers can still rank candidates. */
	if (loc_ab_area == a_area && data_round != 1.0) {
		return(data_round);
	}

	return(loc_ab_area - a_area);
}

/** Calculates overlapping area
@param[in]	a	mbr a
@param[in]	b	mbr b
@return overlapping area */
static double rtree_area_overlapping(const byte *a, const byte *b)
{
	double	area = 1.0;
	double	amin;
	double	amax;
	double	bmin;
	double	bmax;

	static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double),
		      "compatibility");

	for (auto i = SPDIMS; i--; ) {
		amin = mach_double_read(a);
		bmin = mach_double_read(b);
		amax = mach_double_read(a + sizeof(double));
		bmax = mach_double_read(b + sizeof(double));
		a += 2 * sizeof(double);
		b += 2 * sizeof(double);

		/* Intersection interval of the two ranges in this
		dimension. */
		amin = std::max(amin, bmin);
		amax = std::min(amax, bmax);

		if (amin > amax) {
			/* Empty intersection in one dimension means no
			overlap at all. */
			return(0);
		} else {
			area *= (amax - amin);
		}
	}

	return(area);
}

/****************************************************************//**
Calculate the area increased for a new record
@return area increased */
double
rtr_rec_cal_increase(
/*=================*/
	const dtuple_t*	dtuple,	/*!< in: data tuple to insert, which
				cause area increase */
	const rec_t*	rec,	/*!< in: physical record which differs from
				dtuple in some of the common fields, or which
				has an equal number or more fields than
				dtuple */
	double*		area)	/*!< out: increased area */
{
	const dfield_t*	dtuple_field;

	ut_ad(!page_rec_is_supremum(rec));
	ut_ad(!page_rec_is_infimum(rec));

	/* Field 0 of a spatial index record/tuple is the serialized MBR. */
	dtuple_field = dtuple_get_nth_field(dtuple, 0);
	ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN);

	/* NOTE(review): the template argument of this static_cast appears
	lost in extraction; upstream passes static_cast<const byte*> —
	confirm against the repository. */
	return rtree_area_increase(rec,
				   static_cast(
					   dfield_get_data(dtuple_field)),
				   area);
}

/** Estimates the number of rows in a given area.
@param[in]	index	index
@param[in]	tuple	range tuple containing mbr, may also be empty tuple
@param[in]	mode	search mode
@return estimated number of rows */
ha_rows
rtr_estimate_n_rows_in_range(
	dict_index_t*	index,
	const dtuple_t*	tuple,
	page_cur_mode_t	mode)
{
	ut_ad(dict_index_is_spatial(index));

	/* Check tuple & mode */
	if (tuple->n_fields == 0) {
		return(HA_POS_ERROR);
	}

	switch (mode) {
	case PAGE_CUR_DISJOINT:
	case PAGE_CUR_CONTAIN:
	case PAGE_CUR_INTERSECT:
	case PAGE_CUR_WITHIN:
	case PAGE_CUR_MBR_EQUAL:
		break;
	default:
		/* Any other search mode cannot be estimated here. */
		return(HA_POS_ERROR);
	}

	DBUG_EXECUTE_IF("rtr_pcur_move_to_next_return",
		return(2);
	);

	/* Read mbr from tuple. */
	rtr_mbr_t	range_mbr;
	double		range_area;

	const dfield_t* dtuple_field = dtuple_get_nth_field(tuple, 0);
	ut_ad(dfield_get_len(dtuple_field) >= DATA_MBR_LEN);
	/* NOTE(review): template argument of reinterpret_cast appears lost
	in extraction; upstream uses reinterpret_cast<const byte*> — confirm. */
	const byte* range_mbr_ptr = reinterpret_cast(
		dfield_get_data(dtuple_field));

	rtr_read_mbr(range_mbr_ptr, &range_mbr);
	range_area = (range_mbr.xmax - range_mbr.xmin)
		 * (range_mbr.ymax - range_mbr.ymin);

	/* Get index root page. The estimate only inspects the root page:
	each root record's MBR is compared against the search MBR and the
	per-record overlap fractions are summed. */
	mtr_t		mtr;

	mtr.start();
	index->set_modified(mtr);
	mtr_s_lock_index(index, &mtr);

	buf_block_t* block = btr_root_block_get(index, RW_S_LATCH, &mtr);
	if (!block) {
err_exit:
		mtr.commit();
		return HA_POS_ERROR;
	}
	const page_t* page = buf_block_get_frame(block);
	const unsigned n_recs = page_header_get_field(page, PAGE_N_RECS);

	if (n_recs == 0) {
		goto err_exit;
	}

	/* Scan records in root page and calculate area. */
	double	area = 0;
	for (const rec_t* rec = page_rec_get_next(
		     page_get_infimum_rec(block->frame));
	     !page_rec_is_supremum(rec);
	     rec = page_rec_get_next_const(rec)) {
		rtr_mbr_t	mbr;
		double		rec_area;

		rtr_read_mbr(rec, &mbr);

		rec_area = (mbr.xmax - mbr.xmin) * (mbr.ymax - mbr.ymin);

		if (rec_area == 0) {
			/* Degenerate record MBR (point/line): count it as
			a whole match or non-match per mode. */
			switch (mode) {
			case PAGE_CUR_CONTAIN:
			case PAGE_CUR_INTERSECT:
				area += 1;
				break;

			case PAGE_CUR_DISJOINT:
				break;

			case PAGE_CUR_WITHIN:
			case PAGE_CUR_MBR_EQUAL:
				if (!rtree_key_cmp(
					    PAGE_CUR_WITHIN, range_mbr_ptr,
					    rec)) {
					area += 1;
				}

				break;

			default:
				ut_error;
			}
		} else {
			switch (mode) {
			case PAGE_CUR_CONTAIN:
			case PAGE_CUR_INTERSECT:
				/* Fraction of the record's MBR covered by
				the search MBR. */
				area += rtree_area_overlapping(
					range_mbr_ptr, rec)
					/ rec_area;
				break;

			case PAGE_CUR_DISJOINT:
				area += 1;
				area -= rtree_area_overlapping(
					range_mbr_ptr, rec)
					/ rec_area;
				break;

			case PAGE_CUR_WITHIN:
			case PAGE_CUR_MBR_EQUAL:
				if (!rtree_key_cmp(
					    PAGE_CUR_WITHIN, range_mbr_ptr,
					    rec)) {
					area += range_area / rec_area;
				}

				break;
			default:
				ut_error;
			}
		}
	}

	mtr.commit();

	if (!std::isfinite(area)) {
		return(HA_POS_ERROR);
	}

	/* Scale the summed match fraction by the table row count. */
	area /= n_recs;
	/* NOTE(review): template argument of static_cast appears lost in
	extraction; upstream uses static_cast<double> — confirm. */
	return ha_rows(static_cast(dict_table_get_n_rows(index->table))
		       * area);
}
diff --git a/storage/innobase/gis/gis0sea.cc b/storage/innobase/gis/gis0sea.cc
new file mode 100644
index 00000000..1c22aab4
--- /dev/null
+++ b/storage/innobase/gis/gis0sea.cc
@@ -0,0 +1,2052 @@
/*****************************************************************************

Copyright (c) 2016, 2018, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file gis/gis0sea.cc
InnoDB R-tree search interfaces

Created 2014/01/16 Jimmy Yang
***********************************************************************/

#include "fsp0fsp.h"
#include "page0page.h"
#include "page0cur.h"
#include "page0zip.h"
#include "gis0rtree.h"
#include "btr0cur.h"
#include "btr0sea.h"
#include "btr0pcur.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"
#include "srv0mon.h"
#include "que0que.h"
#include "gis0geo.h"

/** Restore the stored position of a persistent cursor bufferfixing the page */
static
bool
rtr_cur_restore_position(
	ulint		latch_mode,	/*!<
in: BTR_SEARCH_LEAF, ... */
	btr_cur_t*	cursor,		/*!< in: detached persistent cursor */
	ulint		level,		/*!< in: index level */
	mtr_t*		mtr);		/*!< in: mtr */

/*************************************************************//**
Pop out used parent path entry, until we find the parent with matching
page number */
static
void
rtr_adjust_parent_path(
/*===================*/
	rtr_info_t*	rtr_info,	/* R-Tree info struct */
	ulint		page_no)	/* page number to look for */
{
	while (!rtr_info->parent_path->empty()) {
		if (rtr_info->parent_path->back().child_no == page_no) {
			break;
		} else {
			/* Discard entries for other children; their
			saved pcur (if any) must be closed and freed. */
			if (rtr_info->parent_path->back().cursor) {
				btr_pcur_close(
					rtr_info->parent_path->back().cursor);
				ut_free(rtr_info->parent_path->back().cursor);
			}

			rtr_info->parent_path->pop_back();
		}
	}
}

/*************************************************************//**
Find the next matching record. This function is used by search
or record locating during index delete/update. It pops pages from
rtr_info->path (filled in by an earlier search) and probes each one,
following SSN-detected page splits, until a match is found or the
path is exhausted.
@return true if there is suitable record found, otherwise false */
static
bool
rtr_pcur_getnext_from_path(
/*=======================*/
	const dtuple_t* tuple,	/*!< in: data tuple */
	page_cur_mode_t mode,	/*!< in: cursor search mode */
	btr_cur_t*	btr_cur,/*!< in: persistent cursor; NOTE that the
				function may release the page latch */
	ulint		target_level,
				/*!< in: target level */
	ulint		latch_mode,
				/*!< in: latch_mode */
	bool		index_locked,
				/*!< in: index tree locked */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dict_index_t*	index = btr_cur->index;
	bool		found = false;
	page_cur_t*	page_cursor;
	ulint		level = 0;
	node_visit_t	next_rec;
	rtr_info_t*	rtr_info = btr_cur->rtr_info;
	node_seq_t	page_ssn;
	ulint		my_latch_mode;
	ulint		skip_parent = false;	/* flag stored in a ulint */
	bool		new_split = false;
	bool		need_parent;
	bool		for_delete = false;
	bool		for_undo_ins = false;

	/* exhausted all the pages to be searched */
	if (rtr_info->path->empty()) {
		return(false);
	}

	ut_ad(dtuple_get_n_fields_cmp(tuple));

	my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);

	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;

	/* There should be no insert coming to this function. Only
	mode with BTR_MODIFY_* should be delete */
	ut_ad(mode != PAGE_CUR_RTREE_INSERT);
	ut_ad(my_latch_mode == BTR_SEARCH_LEAF
	      || my_latch_mode == BTR_MODIFY_LEAF
	      || my_latch_mode == BTR_MODIFY_TREE
	      || my_latch_mode == BTR_CONT_MODIFY_TREE);

	/* Whether need to track parent information. Only need so
	when we do tree altering operations (such as index page merge) */
	need_parent = ((my_latch_mode == BTR_MODIFY_TREE
		        || my_latch_mode == BTR_CONT_MODIFY_TREE)
		       && mode == PAGE_CUR_RTREE_LOCATE);

	if (!index_locked) {
		ut_ad(latch_mode & BTR_SEARCH_LEAF
		      || latch_mode & BTR_MODIFY_LEAF);
		mtr_s_lock_index(index, mtr);
	} else {
		ut_ad(mtr->memo_contains_flagged(&index->lock,
						 MTR_MEMO_SX_LOCK
						 | MTR_MEMO_S_LOCK
						 | MTR_MEMO_X_LOCK));
	}

	const ulint zip_size = index->table->space->zip_size();

	/* Pop each node/page to be searched from "path" structure
	and do a search on it. Please note, any pages that are in
	the "path" structure are protected by "page" lock, so they
	cannot be shrunk away */
	do {
		buf_block_t*	block;
		node_seq_t	path_ssn;
		const page_t*	page;
		ulint		rw_latch = RW_X_LATCH;
		ulint		tree_idx;

		mutex_enter(&rtr_info->rtr_path_mutex);
		next_rec = rtr_info->path->back();
		rtr_info->path->pop_back();
		level = next_rec.level;
		path_ssn = next_rec.seq_no;
		/* Index into tree_blocks[]/tree_savepoints[]: 0 is the
		root, tree_height - 1 the leaf level. */
		tree_idx = btr_cur->tree_height - level - 1;

		/* Maintain the parent path info as well, if needed */
		if (need_parent && !skip_parent && !new_split) {
			ulint		old_level;
			ulint		new_level;

			ut_ad(!rtr_info->parent_path->empty());

			/* Cleanup unused parent info */
			if (rtr_info->parent_path->back().cursor) {
				btr_pcur_close(
					rtr_info->parent_path->back().cursor);
				ut_free(rtr_info->parent_path->back().cursor);
			}

			old_level = rtr_info->parent_path->back().level;

			rtr_info->parent_path->pop_back();

			ut_ad(!rtr_info->parent_path->empty());

			/* check whether there is a level change. If so,
			the current parent path needs to pop enough
			nodes to adjust to the new search page */
			new_level = rtr_info->parent_path->back().level;

			if (old_level < new_level) {
				rtr_adjust_parent_path(
					rtr_info, next_rec.page_no);
			}

			ut_ad(!rtr_info->parent_path->empty());

			ut_ad(next_rec.page_no
			      == rtr_info->parent_path->back().child_no);
		}

		mutex_exit(&rtr_info->rtr_path_mutex);

		skip_parent = false;
		new_split = false;

		/* Once we have pages in "path", these pages are
		predicate page locked, so they can't be shrunk away.
		They also have SSN (split sequence number) to detect
		splits, so we can directly latch single page while
		getting them. They can be unlatched if not qualified.
		One reason for pre-latch is that we might need to position
		some parent position (requires latch) during search */
		if (level == 0) {
			/* S latched for SEARCH_LEAF, and X latched
			for MODIFY_LEAF */
			if (my_latch_mode <= BTR_MODIFY_LEAF) {
				rw_latch = my_latch_mode;
			}

			if (my_latch_mode == BTR_CONT_MODIFY_TREE
			    || my_latch_mode == BTR_MODIFY_TREE) {
				/* Tree modification will latch leaves
				itself via btr_cur_latch_leaves(). */
				rw_latch = RW_NO_LATCH;
			}

		} else if (level == target_level) {
			rw_latch = RW_X_LATCH;
		}

		/* Release previous locked blocks */
		if (my_latch_mode != BTR_SEARCH_LEAF) {
			for (ulint idx = 0; idx < btr_cur->tree_height;
			     idx++) {
				if (rtr_info->tree_blocks[idx]) {
					mtr_release_block_at_savepoint(
						mtr,
						rtr_info->tree_savepoints[idx],
						rtr_info->tree_blocks[idx]);
					rtr_info->tree_blocks[idx] = NULL;
				}
			}
			/* Slots past RTR_MAX_LEVELS hold extra sibling
			blocks latched by btr_cur_latch_leaves(). */
			for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3;
			     idx++) {
				if (rtr_info->tree_blocks[idx]) {
					mtr_release_block_at_savepoint(
						mtr,
						rtr_info->tree_savepoints[idx],
						rtr_info->tree_blocks[idx]);
					rtr_info->tree_blocks[idx] = NULL;
				}
			}
		}

		/* set up savepoint to record any locks to be taken */
		rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);

#ifdef UNIV_RTR_DEBUG
		ut_ad(!(rw_lock_own_flagged(&btr_cur->page_cur.block->lock,
					    RW_LOCK_FLAG_X | RW_LOCK_FLAG_S))
		      || my_latch_mode == BTR_MODIFY_TREE
		      || my_latch_mode == BTR_CONT_MODIFY_TREE
		      || !page_is_leaf(buf_block_get_frame(
					btr_cur->page_cur.block)));
#endif /* UNIV_RTR_DEBUG */

		dberr_t err = DB_SUCCESS;

		block = buf_page_get_gen(
			page_id_t(index->table->space_id,
				  next_rec.page_no), zip_size,
			rw_latch, NULL, BUF_GET, __FILE__, __LINE__, mtr, &err);

		if (block == NULL) {
			continue;
		} else if (rw_latch != RW_NO_LATCH) {
			ut_ad(!dict_index_is_ibuf(index));
			buf_block_dbg_add_level(block, SYNC_TREE_NODE);
		}

		rtr_info->tree_blocks[tree_idx] = block;

		page = buf_block_get_frame(block);
		page_ssn = page_get_ssn_id(page);

		/* If there are splits, push the
split page.
		Note that we have SX lock on index->lock, there
		should not be any split/shrink happening here */
		if (page_ssn > path_ssn) {
			/* The page split after we recorded it in "path":
			also visit its right sibling, under the original
			sequence number. */
			uint32_t next_page_no = btr_page_get_next(page);
			rtr_non_leaf_stack_push(
				rtr_info->path, next_page_no, path_ssn,
				level, 0, NULL, 0);

			if (!srv_read_only_mode
			    && mode != PAGE_CUR_RTREE_INSERT
			    && mode != PAGE_CUR_RTREE_LOCATE) {
				ut_ad(rtr_info->thr);
				lock_place_prdt_page_lock(
					page_id_t(block->page.id().space(),
						  next_page_no),
					index,
					rtr_info->thr);
			}
			new_split = true;
#if defined(UNIV_GIS_DEBUG)
			fprintf(stderr,
				"GIS_DIAG: Splitted page found: %d, %ld\n",
				static_cast(need_parent), next_page_no);
#endif
		}

		page_cursor = btr_cur_get_page_cur(btr_cur);
		page_cursor->rec = NULL;

		if (mode == PAGE_CUR_RTREE_LOCATE) {
			if (level == target_level && level == 0) {
				/* Exact locate on a leaf page: ordinary
				PAGE_CUR_LE search on the full tuple. */
				ulint	low_match;

				found = false;

				low_match = page_cur_search(
					block, index, tuple,
					PAGE_CUR_LE,
					btr_cur_get_page_cur(btr_cur));

				if (low_match == dtuple_get_n_fields_cmp(
							tuple)) {
					rec_t*	rec = btr_cur_get_rec(btr_cur);

					if (!rec_get_deleted_flag(rec,
					    dict_table_is_comp(index->table))
					    || (!for_delete && !for_undo_ins)) {
						found = true;
						btr_cur->low_match = low_match;
					} else {
						/* mark we found deleted row */
						btr_cur->rtr_info->fd_del
							= true;
					}
				}
			} else {
				page_cur_mode_t	page_mode = mode;

				if (level == target_level
				    && target_level != 0) {
					page_mode = PAGE_CUR_RTREE_GET_FATHER;
				}
				found = rtr_cur_search_with_match(
					block, index, tuple, page_mode,
					page_cursor, btr_cur->rtr_info);

				/* Save the position of parent if needed */
				if (found && need_parent) {
					btr_pcur_t*     r_cursor =
						rtr_get_parent_cursor(
							btr_cur, level, false);

					rec_t*	rec = page_cur_get_rec(
						page_cursor);
					page_cur_position(
						rec, block,
						btr_pcur_get_page_cur(r_cursor));
					r_cursor->pos_state =
						BTR_PCUR_IS_POSITIONED;
					r_cursor->latch_mode = my_latch_mode;
					btr_pcur_store_position(r_cursor, mtr);
#ifdef UNIV_DEBUG
					ulint num_stored =
						rtr_store_parent_path(
							block, btr_cur,
							rw_latch, level, mtr);
					ut_ad(num_stored > 0);
#else
					rtr_store_parent_path(
						block, btr_cur, rw_latch,
						level, mtr);
#endif /* UNIV_DEBUG */
				}
			}
		} else {
			found = rtr_cur_search_with_match(
				block, index, tuple, mode, page_cursor,
				btr_cur->rtr_info);
		}

		/* Attach predicate lock if needed, no matter whether
		there are matched records */
		if (mode != PAGE_CUR_RTREE_INSERT
		    && mode != PAGE_CUR_RTREE_LOCATE
		    && mode >= PAGE_CUR_CONTAIN
		    && btr_cur->rtr_info->need_prdt_lock) {
			lock_prdt_t	prdt;

			trx_t*		trx = thr_get_trx(
						btr_cur->rtr_info->thr);
			lock_mutex_enter();
			lock_init_prdt_from_mbr(
				&prdt, &btr_cur->rtr_info->mbr,
				mode, trx->lock.lock_heap);
			lock_mutex_exit();

			/* lock_prdt_lock() expects the block S-latched
			when we did not latch it above. */
			if (rw_latch == RW_NO_LATCH) {
				rw_lock_s_lock(&(block->lock));
			}

			lock_prdt_lock(block, &prdt, index, LOCK_S,
				       LOCK_PREDICATE, btr_cur->rtr_info->thr);

			if (rw_latch == RW_NO_LATCH) {
				rw_lock_s_unlock(&(block->lock));
			}
		}

		if (found) {
			if (level == target_level) {
				page_cur_t*	r_cur;;

				if (my_latch_mode == BTR_MODIFY_TREE
				    && level == 0) {
					ut_ad(rw_latch == RW_NO_LATCH);

					btr_cur_latch_leaves(
						block,
						BTR_MODIFY_TREE,
						btr_cur, mtr);
				}

				r_cur = btr_cur_get_page_cur(btr_cur);

				page_cur_position(
					page_cur_get_rec(page_cursor),
					page_cur_get_block(page_cursor),
					r_cur);

				/* For non-leaf targets the match length is
				the node-pointer prefix. */
				btr_cur->low_match = level != 0 ?
					DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
					: btr_cur->low_match;
				break;
			}

			/* Keep the parent path node, which points to
			last node just located */
			skip_parent = true;
		} else {
			/* Release latch on the current page */
			ut_ad(rtr_info->tree_blocks[tree_idx]);

			mtr_release_block_at_savepoint(
				mtr, rtr_info->tree_savepoints[tree_idx],
				rtr_info->tree_blocks[tree_idx]);
			rtr_info->tree_blocks[tree_idx] = NULL;
		}

	} while (!rtr_info->path->empty());

	const rec_t* rec = btr_cur_get_rec(btr_cur);

	if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
		/* No valid position: restart the mtr so no stale
		latches remain. */
		mtr_commit(mtr);
		mtr_start(mtr);
	} else if (!index_locked) {
		mtr_memo_release(mtr, dict_index_get_lock(index),
				 MTR_MEMO_X_LOCK);
	}

	return(found);
}

/*************************************************************//**
Find the next matching record. This function will first exhaust
the copied record listed in the rtr_info->matches vector before
moving to the next page
@return true if there is suitable record found, otherwise false */
bool
rtr_pcur_move_to_next(
/*==================*/
	const dtuple_t* tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
				tuple must be set so that it cannot get
				compared to the node ptr page number field!
*/
	page_cur_mode_t mode,	/*!< in: cursor search mode */
	btr_pcur_t*	cursor, /*!< in: persistent cursor; NOTE that the
				function may release the page latch */
	ulint		level,	/*!< in: target level */
	mtr_t*		mtr)	/*!< in: mtr */
{
	rtr_info_t*	rtr_info = cursor->btr_cur.rtr_info;

	ut_a(cursor->pos_state == BTR_PCUR_IS_POSITIONED);

	mutex_enter(&rtr_info->matches->rtr_match_mutex);
	/* First retrieve the next record on the current page */
	if (!rtr_info->matches->matched_recs->empty()) {
		rtr_rec_t	rec;
		rec = rtr_info->matches->matched_recs->back();
		rtr_info->matches->matched_recs->pop_back();
		mutex_exit(&rtr_info->matches->rtr_match_mutex);

		/* Point the page cursor at the buffered copy held in
		the matches->block shadow page. */
		cursor->btr_cur.page_cur.rec = rec.r_rec;
		cursor->btr_cur.page_cur.block = &rtr_info->matches->block;

		DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
		return(true);
	}

	mutex_exit(&rtr_info->matches->rtr_match_mutex);

	/* Fetch the next page */
	return(rtr_pcur_getnext_from_path(tuple, mode, &cursor->btr_cur,
					 level, cursor->latch_mode,
					 false, mtr));
}

/*************************************************************//**
Check if the cursor holds record pointing to the specified child page
@return true if it is (pointing to the child page) false otherwise */
static
bool
rtr_compare_cursor_rec(
/*===================*/
	dict_index_t*	index,		/*!< in: index */
	btr_cur_t*	cursor,		/*!< in: Cursor to check */
	ulint		page_no,	/*!< in: desired child page number */
	mem_heap_t**	heap)		/*!< in: memory heap */
{
	const rec_t*	rec;
	rec_offs*	offsets;

	rec = btr_cur_get_rec(cursor);

	offsets = rec_get_offsets(rec, index, NULL, 0, ULINT_UNDEFINED, heap);

	return(btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
}

/**************************************************************//**
Initializes and opens a persistent cursor to an index tree. It should be
closed with btr_pcur_close. Mainly called by row_search_index_entry() */
void
rtr_pcur_open_low(
/*==============*/
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: level in the rtree */
	const dtuple_t*	tuple,	/*!< in: tuple on which search done */
	page_cur_mode_t	mode,	/*!< in: PAGE_CUR_RTREE_LOCATE, ... */
	ulint		latch_mode,/*!< in: BTR_SEARCH_LEAF, ... */
	btr_pcur_t*	cursor, /*!< in: memory buffer for persistent cursor */
	const char*	file,	/*!< in: file name */
	unsigned	line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	btr_cur_t*	btr_cursor;
	ulint		n_fields;
	ulint		low_match;
	rec_t*		rec;
	bool		tree_latched = false;
	bool		for_delete = false;
	bool		for_undo_ins = false;

	ut_ad(level == 0);

	ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
	ut_ad(mode == PAGE_CUR_RTREE_LOCATE);

	/* Initialize the cursor */

	btr_pcur_init(cursor);

	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;

	cursor->latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
	cursor->search_mode = mode;

	/* Search with the tree cursor */

	btr_cursor = btr_pcur_get_btr_cur(cursor);

	btr_cursor->rtr_info = rtr_create_rtr_info(false, false,
						   btr_cursor, index);

	/* Purge will SX lock the tree instead of take Page Locks */
	if (btr_cursor->thr) {
		btr_cursor->rtr_info->need_page_lock = true;
		btr_cursor->rtr_info->thr = btr_cursor->thr;
	}

	btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode,
				    btr_cursor, 0, file, line, mtr);
	cursor->pos_state = BTR_PCUR_IS_POSITIONED;

	cursor->trx_if_known = NULL;

	low_match = btr_pcur_get_low_match(cursor);

	rec = btr_pcur_get_rec(cursor);

	n_fields = dtuple_get_n_fields(tuple);

	if (latch_mode & BTR_ALREADY_S_LATCHED) {
		ut_ad(mtr->memo_contains(index->lock, MTR_MEMO_S_LOCK));
		tree_latched = true;
	}

	if (latch_mode & BTR_MODIFY_TREE) {
		ut_ad(mtr->memo_contains_flagged(&index->lock,
						 MTR_MEMO_X_LOCK
						 | MTR_MEMO_SX_LOCK));
		tree_latched = true;
	}

	/* The first dive may miss: the R-tree can hold several pages
	whose MBR covers the tuple, so continue with the recorded path. */
	if (page_rec_is_infimum(rec) || low_match != n_fields
	    || (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
		&& (for_delete || for_undo_ins))) {

		if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
		    && for_delete) {
			btr_cursor->rtr_info->fd_del = true;
			btr_cursor->low_match = 0;
		}
		/* Did not find matched row in first dive. Release
		latched block if any before search more pages */
		if (latch_mode & BTR_MODIFY_LEAF) {
			ulint		tree_idx = btr_cursor->tree_height - 1;
			rtr_info_t*	rtr_info = btr_cursor->rtr_info;

			ut_ad(level == 0);

			if (rtr_info->tree_blocks[tree_idx]) {
				mtr_release_block_at_savepoint(
					mtr,
					rtr_info->tree_savepoints[tree_idx],
					rtr_info->tree_blocks[tree_idx]);
				rtr_info->tree_blocks[tree_idx] = NULL;
			}
		}

		bool	ret = rtr_pcur_getnext_from_path(
			tuple, mode, btr_cursor, level, latch_mode,
			tree_latched, mtr);

		if (ret) {
			low_match = btr_pcur_get_low_match(cursor);
			ut_ad(low_match == n_fields);
		}
	}
}

/* Get the rtree page father.
@param[in]	index		rtree index
@param[in]	block		child page in the index
@param[in]	mtr		mtr
@param[in]	sea_cur		search cursor, contains information
				about parent nodes in search
@param[in]	cursor		cursor on node pointer record,
				its page x-latched */
void
rtr_page_get_father(
	dict_index_t*	index,
	buf_block_t*	block,
	mtr_t*		mtr,
	btr_cur_t*	sea_cur,
	btr_cur_t*	cursor)
{
	mem_heap_t*	heap = mem_heap_create(100);
#ifdef UNIV_DEBUG
	rec_offs*	offsets;

	offsets = rtr_page_get_father_block(
		NULL, heap, index, block, mtr, sea_cur, cursor);

	/* Debug builds verify that the located node pointer really
	points back to "block". */
	ulint	page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec,
							 offsets);

	ut_ad(page_no == block->page.id().page_no());
#else
	rtr_page_get_father_block(
		NULL, heap, index, block, mtr, sea_cur, cursor);
#endif

	mem_heap_free(heap);
}

/********************************************************************//**
Returns the upper level node pointer to a R-Tree page. It is assumed
that mtr holds an x-latch on the tree. */
static void rtr_get_father_node(
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the tree level of search */
	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
				tuple must be set so that it cannot get
				compared to the node ptr page number field! */
	btr_cur_t*	sea_cur,/*!< in: search cursor */
	btr_cur_t*	btr_cur,/*!< in/out: tree cursor; the cursor page is
				s- or x-latched, but see also above! */
	ulint		page_no,/*!< Current page no */
	mtr_t*		mtr)	/*!< in: mtr */
{
	mem_heap_t*	heap = NULL;
	bool		ret = false;
	const rec_t*	rec;
	ulint		n_fields;
	bool		new_rtr = false;

	/* Try to optimally locate the parent node. Level should always
	be less than sea_cur->tree_height unless the root is splitting */
	if (sea_cur && sea_cur->tree_height > level) {
		ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
						 | MTR_MEMO_SX_LOCK));
		/* Fast path: restore the parent position saved during
		the earlier search instead of descending again. */
		ret = rtr_cur_restore_position(
			BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);

		/* Once we block shrink of tree nodes while there are
		active searches on it, this optimal locating should always
		succeed */
		ut_ad(ret);

		if (ret) {
			btr_pcur_t*	r_cursor = rtr_get_parent_cursor(
						sea_cur, level, false);

			rec = btr_pcur_get_rec(r_cursor);

			ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
			page_cur_position(rec,
					  btr_pcur_get_block(r_cursor),
					  btr_cur_get_page_cur(btr_cur));
			btr_cur->rtr_info = sea_cur->rtr_info;
			btr_cur->tree_height = sea_cur->tree_height;
			ut_ad(rtr_compare_cursor_rec(
				index, btr_cur, page_no, &heap));
			goto func_exit;
		}
	}

	/* We arrive here in one of two scenarios
	1) check table and btr_validate
	2) index root page being raised */
	ut_ad(!sea_cur || sea_cur->tree_height == level);

	if (btr_cur->rtr_info) {
		rtr_clean_rtr_info(btr_cur->rtr_info, true);
	} else {
		new_rtr = true;
	}

	btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);

	if (sea_cur && sea_cur->tree_height == level) {
		/* root split, and search the new root */
		btr_cur_search_to_nth_level(
			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
			BTR_CONT_MODIFY_TREE, btr_cur, 0,
			__FILE__, __LINE__, mtr);

	} else {
		/* btr_validate */
		ut_ad(level >= 1);
		ut_ad(!sea_cur);

		btr_cur_search_to_nth_level(
			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
			BTR_CONT_MODIFY_TREE, btr_cur, 0,
			__FILE__, __LINE__, mtr);

		rec = btr_cur_get_rec(btr_cur);
		n_fields = dtuple_get_n_fields_cmp(tuple);

		/* The first dive may land on the wrong covering page;
		continue scanning the recorded path. */
		if (page_rec_is_infimum(rec)
		    || (btr_cur->low_match != n_fields)) {
			ret = rtr_pcur_getnext_from_path(
				tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
				level, BTR_CONT_MODIFY_TREE,
				true, mtr);

			ut_ad(ret &&
btr_cur->low_match == n_fields); + } + } + + ret = rtr_compare_cursor_rec( + index, btr_cur, page_no, &heap); + + ut_ad(ret); + +func_exit: + if (heap) { + mem_heap_free(heap); + } + + if (new_rtr && btr_cur->rtr_info) { + rtr_clean_rtr_info(btr_cur->rtr_info, true); + btr_cur->rtr_info = NULL; + } +} + +/** Returns the upper level node pointer to a R-Tree page. It is assumed +that mtr holds an SX-latch or X-latch on the tree. +@return rec_get_offsets() of the node pointer record */ +static +rec_offs* +rtr_page_get_father_node_ptr( + rec_offs* offsets,/*!< in: work area for the return value */ + mem_heap_t* heap, /*!< in: memory heap to use */ + btr_cur_t* sea_cur,/*!< in: search cursor */ + btr_cur_t* cursor, /*!< in: cursor pointing to user record, + out: cursor on node pointer record, + its page x-latched */ + mtr_t* mtr) /*!< in: mtr */ +{ + dtuple_t* tuple; + rec_t* user_rec; + rec_t* node_ptr; + ulint level; + ulint page_no; + dict_index_t* index; + rtr_mbr_t mbr; + + page_no = btr_cur_get_block(cursor)->page.id().page_no(); + index = btr_cur_get_index(cursor); + + ut_ad(srv_read_only_mode + || mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK + | MTR_MEMO_SX_LOCK)); + + ut_ad(dict_index_get_page(index) != page_no); + + level = btr_page_get_level(btr_cur_get_page(cursor)); + + user_rec = btr_cur_get_rec(cursor); + ut_a(page_rec_is_user_rec(user_rec)); + + offsets = rec_get_offsets(user_rec, index, offsets, + level ? 
0 : index->n_fields, + ULINT_UNDEFINED, &heap); + rtr_get_mbr_from_rec(user_rec, offsets, &mbr); + + tuple = rtr_index_build_node_ptr( + index, &mbr, user_rec, page_no, heap); + + if (sea_cur && !sea_cur->rtr_info) { + sea_cur = NULL; + } + + rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor, + page_no, mtr); + + node_ptr = btr_cur_get_rec(cursor); + ut_ad(!page_rec_is_comp(node_ptr) + || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR); + offsets = rec_get_offsets(node_ptr, index, offsets, 0, + ULINT_UNDEFINED, &heap); + + ulint child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets); + + if (child_page != page_no) { + const rec_t* print_rec; + + ib::fatal error; + + error << "Corruption of index " << index->name + << " of table " << index->table->name + << " parent page " << page_no + << " child page " << child_page; + + print_rec = page_rec_get_next( + page_get_infimum_rec(page_align(user_rec))); + offsets = rec_get_offsets(print_rec, index, offsets, + page_rec_is_leaf(user_rec) + ? index->n_fields : 0, + ULINT_UNDEFINED, &heap); + error << "; child "; + rec_print(error.m_oss, print_rec, + rec_get_info_bits(print_rec, rec_offs_comp(offsets)), + offsets); + offsets = rec_get_offsets(node_ptr, index, offsets, 0, + ULINT_UNDEFINED, &heap); + error << "; parent "; + rec_print(error.m_oss, print_rec, + rec_get_info_bits(print_rec, rec_offs_comp(offsets)), + offsets); + + error << ". You should dump + drop + reimport the table to" + " fix the corruption. If the crash happens at" + " database startup, see " + "https://mariadb.com/kb/en/library/innodb-recovery-modes/" + " about forcing" + " recovery. Then dump + drop + reimport."; + } + + return(offsets); +} + +/************************************************************//** +Returns the father block to a page. It is assumed that mtr holds +an X or SX latch on the tree. 
+@return rec_get_offsets() of the node pointer record */
+rec_offs*
+rtr_page_get_father_block(
+/*======================*/
+	rec_offs*	offsets,/*!< in: work area for the return value */
+	mem_heap_t*	heap,	/*!< in: memory heap to use */
+	dict_index_t*	index,	/*!< in: b-tree index */
+	buf_block_t*	block,	/*!< in: child page in the index */
+	mtr_t*		mtr,	/*!< in: mtr */
+	btr_cur_t*	sea_cur,/*!< in: search cursor, contains information
+				about parent nodes in search */
+	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
+				its page x-latched */
+{
+	/* Position on the first user record of the child page, then
+	locate its node pointer in the parent. */
+	rec_t*	rec = page_rec_get_next(
+		page_get_infimum_rec(buf_block_get_frame(block)));
+	btr_cur_position(index, rec, block, cursor);
+
+	return(rtr_page_get_father_node_ptr(offsets, heap, sea_cur,
+					    cursor, mtr));
+}
+
+/*******************************************************************//**
+Create a RTree search info structure */
+rtr_info_t*
+rtr_create_rtr_info(
+/******************/
+	bool		need_prdt,	/*!< in: Whether predicate lock
+					is needed */
+	bool		init_matches,	/*!< in: Whether to initiate the
+					"matches" structure for collecting
+					matched leaf records */
+	btr_cur_t*	cursor,		/*!< in: tree search cursor */
+	dict_index_t*	index)		/*!< in: index struct */
+{
+	rtr_info_t*	rtr_info;
+
+	index = index ? index : cursor->index;
+	ut_ad(index);
+
+	/* NOTE(review): the template argument of this cast was lost in
+	the scraped source; restored from the type of rtr_info. */
+	rtr_info = static_cast<rtr_info_t*>(
+		ut_zalloc_nokey(sizeof(*rtr_info)));
+
+	rtr_info->allocated = true;
+	rtr_info->cursor = cursor;
+	rtr_info->index = index;
+
+	if (init_matches) {
+		rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
+		/* NOTE(review): cast target restored from the type of
+		rtr_info->matches. */
+		rtr_info->matches = static_cast<matched_rec_t*>(
+			mem_heap_zalloc(
+				rtr_info->heap,
+				sizeof(*rtr_info->matches)));
+
+		rtr_info->matches->matched_recs
+			= UT_NEW_NOKEY(rtr_rec_vector());
+
+		/* Page-align the record buffer that backs the shadow
+		block used for collecting matched leaf records. */
+		rtr_info->matches->bufp = page_align(rtr_info->matches->rec_buf
+						     + UNIV_PAGE_SIZE_MAX + 1);
+		mutex_create(LATCH_ID_RTR_MATCH_MUTEX,
+			     &rtr_info->matches->rtr_match_mutex);
+		rw_lock_create(PFS_NOT_INSTRUMENTED,
+			       &(rtr_info->matches->block.lock),
+			       SYNC_LEVEL_VARYING);
+	}
+
+	rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
+	rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
+	rtr_info->need_prdt_lock = need_prdt;
+	mutex_create(LATCH_ID_RTR_PATH_MUTEX,
+		     &rtr_info->rtr_path_mutex);
+
+	/* Register this search in the index-wide active list so that
+	page discards can invalidate stale search paths. */
+	mutex_enter(&index->rtr_track->rtr_active_mutex);
+	index->rtr_track->rtr_active.push_front(rtr_info);
+	mutex_exit(&index->rtr_track->rtr_active_mutex);
+	return(rtr_info);
+}
+
+/*******************************************************************//**
+Update a btr_cur_t with rtr_info */
+void
+rtr_info_update_btr(
+/******************/
+	btr_cur_t*	cursor,		/*!< in/out: tree cursor */
+	rtr_info_t*	rtr_info)	/*!< in: rtr_info to set to the
+					cursor */
+{
+	ut_ad(rtr_info);
+
+	cursor->rtr_info = rtr_info;
+}
+
+/*******************************************************************//**
+Initialize a R-Tree Search structure */
+void
+rtr_init_rtr_info(
+/****************/
+	rtr_info_t*	rtr_info,	/*!< in: rtr_info to set to the
+					cursor */
+	bool		need_prdt,	/*!< in: Whether predicate lock is
+					needed */
+	btr_cur_t*	cursor,		/*!< in: tree search cursor */
+	dict_index_t*	index,		/*!< in: index structure */
+	bool		reinit)		/*!< in: Whether this is a reinit */
+{
+	ut_ad(rtr_info);
+
+	if (!reinit) {
+		/* Reset all members.
*/
+		rtr_info->path = NULL;
+		rtr_info->parent_path = NULL;
+		rtr_info->matches = NULL;
+
+		mutex_create(LATCH_ID_RTR_PATH_MUTEX,
+			     &rtr_info->rtr_path_mutex);
+
+		memset(rtr_info->tree_blocks, 0x0,
+		       sizeof(rtr_info->tree_blocks));
+		memset(rtr_info->tree_savepoints, 0x0,
+		       sizeof(rtr_info->tree_savepoints));
+		rtr_info->mbr.xmin = 0.0;
+		rtr_info->mbr.xmax = 0.0;
+		rtr_info->mbr.ymin = 0.0;
+		rtr_info->mbr.ymax = 0.0;
+		rtr_info->thr = NULL;
+		rtr_info->heap = NULL;
+		rtr_info->cursor = NULL;
+		rtr_info->index = NULL;
+		rtr_info->need_prdt_lock = false;
+		rtr_info->need_page_lock = false;
+		rtr_info->allocated = false;
+		rtr_info->mbr_adj = false;
+		rtr_info->fd_del = false;
+		rtr_info->search_tuple = NULL;
+		rtr_info->search_mode = PAGE_CUR_UNSUPP;
+	}
+
+	ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
+
+	rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
+	rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
+	rtr_info->need_prdt_lock = need_prdt;
+	rtr_info->cursor = cursor;
+	rtr_info->index = index;
+
+	/* Register in the index-wide active-search list; see also
+	rtr_create_rtr_info(). */
+	mutex_enter(&index->rtr_track->rtr_active_mutex);
+	index->rtr_track->rtr_active.push_front(rtr_info);
+	mutex_exit(&index->rtr_track->rtr_active_mutex);
+}
+
+/**************************************************************//**
+Clean up R-Tree search structure */
+void
+rtr_clean_rtr_info(
+/*===============*/
+	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
+	bool		free_all)	/*!< in: need to free rtr_info itself */
+{
+	dict_index_t*	index;
+	bool		initialized = false;
+
+	if (!rtr_info) {
+		return;
+	}
+
+	index = rtr_info->index;
+
+	if (index) {
+		mutex_enter(&index->rtr_track->rtr_active_mutex);
+	}
+
+	/* Release every stored parent-path cursor before deleting
+	the vector itself. */
+	while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
+		btr_pcur_t*	cur = rtr_info->parent_path->back().cursor;
+		rtr_info->parent_path->pop_back();
+
+		if (cur) {
+			btr_pcur_close(cur);
+			ut_free(cur);
+		}
+	}
+
+	UT_DELETE(rtr_info->parent_path);
+	rtr_info->parent_path = NULL;
+
+	if (rtr_info->path != NULL)
{
+		UT_DELETE(rtr_info->path);
+		rtr_info->path = NULL;
+		/* A non-NULL path implies rtr_path_mutex was created;
+		remember so it can be destroyed below. */
+		initialized = true;
+	}
+
+	if (rtr_info->matches) {
+		rtr_info->matches->used = false;
+		rtr_info->matches->locked = false;
+		rtr_info->matches->valid = false;
+		rtr_info->matches->matched_recs->clear();
+	}
+
+	if (index) {
+		index->rtr_track->rtr_active.remove(rtr_info);
+		mutex_exit(&index->rtr_track->rtr_active_mutex);
+	}
+
+	if (free_all) {
+		if (rtr_info->matches) {
+			if (rtr_info->matches->matched_recs != NULL) {
+				UT_DELETE(rtr_info->matches->matched_recs);
+			}
+
+			rw_lock_free(&(rtr_info->matches->block.lock));
+
+			mutex_destroy(&rtr_info->matches->rtr_match_mutex);
+		}
+
+		if (rtr_info->heap) {
+			mem_heap_free(rtr_info->heap);
+		}
+
+		if (initialized) {
+			mutex_destroy(&rtr_info->rtr_path_mutex);
+		}
+
+		if (rtr_info->allocated) {
+			ut_free(rtr_info);
+		}
+	}
+}
+
+/**************************************************************//**
+Rebuild the "path" to exclude the page number being removed */
+static
+void
+rtr_rebuild_path(
+/*=============*/
+	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
+	ulint		page_no)	/*!< in: page number to exclude
+					from the search path */
+{
+	rtr_node_path_t*	new_path
+		= UT_NEW_NOKEY(rtr_node_path_t());
+
+	rtr_node_path_t::iterator	rit;
+#ifdef UNIV_DEBUG
+	ulint	before_size = rtr_info->path->size();
+#endif /* UNIV_DEBUG */
+
+	/* Copy every visit except the one for the discarded page. */
+	for (rit = rtr_info->path->begin();
+	     rit != rtr_info->path->end(); ++rit) {
+		node_visit_t	next_rec = *rit;
+
+		if (next_rec.page_no == page_no) {
+			continue;
+		}
+
+		new_path->push_back(next_rec);
+#ifdef UNIV_DEBUG
+		node_visit_t	rec = new_path->back();
+		ut_ad(rec.level < rtr_info->cursor->tree_height
+		      && rec.page_no > 0);
+#endif /* UNIV_DEBUG */
+	}
+
+	UT_DELETE(rtr_info->path);
+
+	ut_ad(new_path->size() == before_size - 1);
+
+	rtr_info->path = new_path;
+
+	if (!rtr_info->parent_path->empty()) {
+		rtr_node_path_t*	new_parent_path = UT_NEW_NOKEY(
+			rtr_node_path_t());
+
+		for (rit = rtr_info->parent_path->begin();
+		     rit !=
rtr_info->parent_path->end(); ++rit) {
+			node_visit_t	next_rec = *rit;
+
+			/* Drop (and release the cursor of) parent-path
+			entries that point at the discarded child page. */
+			if (next_rec.child_no == page_no) {
+				btr_pcur_t*	cur = next_rec.cursor;
+
+				if (cur) {
+					btr_pcur_close(cur);
+					ut_free(cur);
+				}
+
+				continue;
+			}
+
+			new_parent_path->push_back(next_rec);
+		}
+		UT_DELETE(rtr_info->parent_path);
+		rtr_info->parent_path = new_parent_path;
+	}
+
+}
+
+/**************************************************************//**
+Check whether a discarding page is in anyone's search path */
+void
+rtr_check_discard_page(
+/*===================*/
+	dict_index_t*	index,	/*!< in: index */
+	btr_cur_t*	cursor, /*!< in: cursor on the page to discard: not on
+				the root page */
+	buf_block_t*	block)	/*!< in: block of page to be discarded */
+{
+	const ulint pageno = block->page.id().page_no();
+
+	mutex_enter(&index->rtr_track->rtr_active_mutex);
+
+	for (const auto& rtr_info : index->rtr_track->rtr_active) {
+		/* The caller's own search info need not be patched. */
+		if (cursor && rtr_info == cursor->rtr_info) {
+			continue;
+		}
+
+		mutex_enter(&rtr_info->rtr_path_mutex);
+		for (const node_visit_t& node : *rtr_info->path) {
+			if (node.page_no == pageno) {
+				rtr_rebuild_path(rtr_info, pageno);
+				break;
+			}
+		}
+		mutex_exit(&rtr_info->rtr_path_mutex);
+
+		if (rtr_info->matches) {
+			mutex_enter(&rtr_info->matches->rtr_match_mutex);
+
+			/* Invalidate collected leaf matches that came
+			from the discarded page. */
+			if ((&rtr_info->matches->block)->page.id().page_no()
+			     == pageno) {
+				if (!rtr_info->matches->matched_recs->empty()) {
+					rtr_info->matches->matched_recs->clear();
+				}
+				ut_ad(rtr_info->matches->matched_recs->empty());
+				rtr_info->matches->valid = false;
+			}
+
+			mutex_exit(&rtr_info->matches->rtr_match_mutex);
+		}
+	}
+
+	mutex_exit(&index->rtr_track->rtr_active_mutex);
+
+	/* Also release any predicate locks held on the page. */
+	lock_mutex_enter();
+	lock_prdt_page_free_from_discard(block, &lock_sys.prdt_hash);
+	lock_prdt_page_free_from_discard(block, &lock_sys.prdt_page_hash);
+	lock_mutex_exit();
+}
+
+/** Structure acts as functor to get the optimistic access of the page.
+It returns true if it successfully gets the page.
 */
+struct optimistic_get
+{
+  btr_pcur_t *const r_cursor;
+  mtr_t *const mtr;
+
+  optimistic_get(btr_pcur_t *r_cursor,mtr_t *mtr)
+  :r_cursor(r_cursor), mtr(mtr) {}
+
+  bool operator()(buf_block_t *hint) const
+  {
+    return hint && buf_page_optimistic_get(
+        RW_X_LATCH, hint, r_cursor->modify_clock, __FILE__,
+        __LINE__, mtr);
+  }
+};
+
+/** Restore the stored position of a persistent cursor, buffer-fixing
+the page */
+static
+bool
+rtr_cur_restore_position(
+	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
+	btr_cur_t*	btr_cur,	/*!< in: detached persistent cursor */
+	ulint		level,		/*!< in: index level */
+	mtr_t*		mtr)		/*!< in: mtr */
+{
+	dict_index_t*	index;
+	mem_heap_t*	heap;
+	btr_pcur_t*	r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
+	dtuple_t*	tuple;
+	bool		ret = false;
+
+	ut_ad(mtr);
+	ut_ad(r_cursor);
+	ut_ad(mtr->is_active());
+
+	index = btr_cur_get_index(btr_cur);
+
+	if (r_cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
+	    || r_cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
+		return(false);
+	}
+
+	DBUG_EXECUTE_IF(
+		"rtr_pessimistic_position",
+		r_cursor->modify_clock = 100;
+	);
+
+	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);
+
+	/* First try the optimistic path: re-latch the remembered block
+	if its modify clock is unchanged. */
+	if (r_cursor->block_when_stored.run_with_hint(
+		    optimistic_get(r_cursor, mtr))) {
+		ut_ad(r_cursor->pos_state == BTR_PCUR_IS_POSITIONED);
+
+		ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
+#ifdef UNIV_DEBUG
+		do {
+			const rec_t*	rec;
+			const rec_offs*	offsets1;
+			const rec_offs*	offsets2;
+			ulint		comp;
+
+			rec = btr_pcur_get_rec(r_cursor);
+
+			heap = mem_heap_create(256);
+			offsets1 = rec_get_offsets(
+				r_cursor->old_rec, index, NULL,
+				level ? 0 : r_cursor->old_n_fields,
+				r_cursor->old_n_fields, &heap);
+			offsets2 = rec_get_offsets(
+				rec, index, NULL,
+				level ?
0 : r_cursor->old_n_fields,
+				r_cursor->old_n_fields, &heap);
+
+			comp = rec_offs_comp(offsets1);
+
+			if (rec_get_info_bits(r_cursor->old_rec, comp)
+			    & REC_INFO_MIN_REC_FLAG) {
+				ut_ad(rec_get_info_bits(rec, comp)
+					& REC_INFO_MIN_REC_FLAG);
+			} else {
+
+				ut_ad(!cmp_rec_rec(r_cursor->old_rec,
+						   rec, offsets1, offsets2,
+						   index));
+			}
+
+			mem_heap_free(heap);
+		} while (0);
+#endif /* UNIV_DEBUG */
+
+		return(true);
+	}
+
+	/* Page has changed, for R-Tree, the page cannot be shrunk away,
+	so we search the page and its right siblings */
+	buf_block_t*	block;
+	node_seq_t	page_ssn;
+	const page_t*	page;
+	page_cur_t*	page_cursor;
+	node_visit_t*	node = rtr_get_parent_node(btr_cur, level, false);
+	node_seq_t	path_ssn = node->seq_no;
+	const unsigned	zip_size = index->table->space->zip_size();
+	uint32_t	page_no = node->page_no;
+
+	heap = mem_heap_create(256);
+
+	tuple = dict_index_build_data_tuple(r_cursor->old_rec, index, !level,
+					    r_cursor->old_n_fields, heap);
+
+	page_cursor = btr_pcur_get_page_cur(r_cursor);
+	ut_ad(r_cursor == node->cursor);
+
+search_again:
+	dberr_t err = DB_SUCCESS;
+
+	block = buf_page_get_gen(
+		page_id_t(index->table->space_id, page_no),
+		zip_size, RW_X_LATCH, NULL,
+		BUF_GET, __FILE__, __LINE__, mtr, &err);
+
+	ut_ad(block);
+
+	/* Get the page SSN */
+	page = buf_block_get_frame(block);
+	page_ssn = page_get_ssn_id(page);
+
+	ulint low_match = page_cur_search(
+				block, index, tuple, PAGE_CUR_LE, page_cursor);
+
+	if (low_match == r_cursor->old_n_fields) {
+		const rec_t*	rec;
+		const rec_offs*	offsets1;
+		const rec_offs*	offsets2;
+		ulint		comp;
+
+		rec = btr_pcur_get_rec(r_cursor);
+
+		offsets1 = rec_get_offsets(r_cursor->old_rec, index, NULL,
+					   level ? 0 : r_cursor->old_n_fields,
+					   r_cursor->old_n_fields, &heap);
+		offsets2 = rec_get_offsets(rec, index, NULL,
+					   level ?
0 : r_cursor->old_n_fields,
+					   r_cursor->old_n_fields, &heap);
+
+		comp = rec_offs_comp(offsets1);
+
+		if ((rec_get_info_bits(r_cursor->old_rec, comp)
+		     & REC_INFO_MIN_REC_FLAG)
+		    && (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
+			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
+			ret = true;
+		} else if (!cmp_rec_rec(r_cursor->old_rec, rec, offsets1, offsets2,
+					index)) {
+			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
+			ret = true;
+		}
+	}
+
+	/* Check the page SSN to see if it has been split; if so, search
+	the right page */
+	if (!ret && page_ssn > path_ssn) {
+		page_no = btr_page_get_next(page);
+		goto search_again;
+	}
+
+	mem_heap_free(heap);
+
+	return(ret);
+}
+
+/****************************************************************//**
+Copy the leaf level R-tree record, and push it to matched_rec in rtr_info */
+static
+void
+rtr_leaf_push_match_rec(
+/*====================*/
+	const rec_t*	rec,		/*!< in: record to copy */
+	rtr_info_t*	rtr_info,	/*!< in/out: search stack */
+	rec_offs*	offsets,	/*!< in: offsets */
+	bool		is_comp)	/*!< in: is compact format */
+{
+	byte*		buf;
+	matched_rec_t*	match_rec = rtr_info->matches;
+	rec_t*		copy;
+	ulint		data_len;
+	rtr_rec_t	rtr_rec;
+
+	/* Append the copy after the data already stored in the
+	shadow block's frame. */
+	buf = match_rec->block.frame + match_rec->used;
+	ut_ad(page_rec_is_leaf(rec));
+
+	copy = rec_copy(buf, rec, offsets);
+
+	/* Terminate the copy's next-record link at the supremum so it
+	behaves like the last record of a page. */
+	if (is_comp) {
+		rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
+	} else {
+		rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
+	}
+
+	rtr_rec.r_rec = copy;
+	rtr_rec.locked = false;
+
+	match_rec->matched_recs->push_back(rtr_rec);
+	match_rec->valid = true;
+
+	data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
+	match_rec->used += data_len;
+
+	ut_ad(match_rec->used < srv_page_size);
+}
+
+/**************************************************************//**
+Store the parent path cursor
+@return number of cursor stored */
+ulint
+rtr_store_parent_path(
+/*==================*/
+	const buf_block_t*	block,	/*!< in: block of the page
*/ + btr_cur_t* btr_cur,/*!< in/out: persistent cursor */ + ulint latch_mode, + /*!< in: latch_mode */ + ulint level, /*!< in: index level */ + mtr_t* mtr) /*!< in: mtr */ +{ + ulint num = btr_cur->rtr_info->parent_path->size(); + ulint num_stored = 0; + + while (num >= 1) { + node_visit_t* node = &(*btr_cur->rtr_info->parent_path)[ + num - 1]; + btr_pcur_t* r_cursor = node->cursor; + buf_block_t* cur_block; + + if (node->level > level) { + break; + } + + r_cursor->pos_state = BTR_PCUR_IS_POSITIONED; + r_cursor->latch_mode = latch_mode; + + cur_block = btr_pcur_get_block(r_cursor); + + if (cur_block == block) { + btr_pcur_store_position(r_cursor, mtr); + num_stored++; + } else { + break; + } + + num--; + } + + return(num_stored); +} +/**************************************************************//** +push a nonleaf index node to the search path for insertion */ +static +void +rtr_non_leaf_insert_stack_push( +/*===========================*/ + dict_index_t* index, /*!< in: index descriptor */ + rtr_node_path_t* path, /*!< in/out: search path */ + ulint level, /*!< in: index page level */ + uint32_t child_no,/*!< in: child page no */ + const buf_block_t* block, /*!< in: block of the page */ + const rec_t* rec, /*!< in: positioned record */ + double mbr_inc)/*!< in: MBR needs to be enlarged */ +{ + node_seq_t new_seq; + btr_pcur_t* my_cursor; + + my_cursor = static_cast( + ut_malloc_nokey(sizeof(*my_cursor))); + + btr_pcur_init(my_cursor); + + page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor)); + + (btr_pcur_get_btr_cur(my_cursor))->index = index; + + new_seq = rtr_get_current_ssn_id(index); + rtr_non_leaf_stack_push(path, block->page.id().page_no(), + new_seq, level, child_no, my_cursor, mbr_inc); +} + +/** Copy a buf_block_t, except "block->lock". 
+@param[in,out] matches copy to match->block +@param[in] block block to copy */ +static +void +rtr_copy_buf( + matched_rec_t* matches, + const buf_block_t* block) +{ + /* Copy all members of "block" to "matches->block" except "lock". + We skip "lock" because it is not used + from the dummy buf_block_t we create here and because memcpy()ing + it generates (valid) compiler warnings that the vtable pointer + will be copied. */ + new (&matches->block.page) buf_page_t(block->page); + matches->block.frame = block->frame; + matches->block.unzip_LRU = block->unzip_LRU; + + ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list); + ut_d(matches->block.in_withdraw_list = block->in_withdraw_list); + + /* Skip buf_block_t::lock */ + matches->block.modify_clock = block->modify_clock; +#ifdef BTR_CUR_HASH_ADAPT + matches->block.n_hash_helps = block->n_hash_helps; + matches->block.n_fields = block->n_fields; + matches->block.left_side = block->left_side; +#if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG + matches->block.n_pointers = 0; +#endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */ + matches->block.curr_n_fields = block->curr_n_fields; + matches->block.curr_left_side = block->curr_left_side; + matches->block.index = block->index; +#endif /* BTR_CUR_HASH_ADAPT */ + ut_d(matches->block.debug_latch = NULL); +} + +/****************************************************************//** +Generate a shadow copy of the page block header to save the +matched records */ +static +void +rtr_init_match( +/*===========*/ + matched_rec_t* matches,/*!< in/out: match to initialize */ + const buf_block_t* block, /*!< in: buffer block */ + const page_t* page) /*!< in: buffer page */ +{ + ut_ad(matches->matched_recs->empty()); + matches->locked = false; + rtr_copy_buf(matches, block); + matches->block.frame = matches->bufp; + matches->valid = false; + /* We have to copy PAGE_W*_SUPREMUM_END bytes so that we can + use infimum/supremum of this page as normal btr page for search. 
*/ + memcpy(matches->block.frame, page, page_is_comp(page) + ? PAGE_NEW_SUPREMUM_END + : PAGE_OLD_SUPREMUM_END); + matches->used = page_is_comp(page) + ? PAGE_NEW_SUPREMUM_END + : PAGE_OLD_SUPREMUM_END; +#ifdef RTR_SEARCH_DIAGNOSTIC + ulint pageno = page_get_page_no(page); + fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n", + static_cast(pageno)); +#endif /* RTR_SEARCH_DIAGNOSTIC */ +} + +/****************************************************************//** +Get the bounding box content from an index record */ +void +rtr_get_mbr_from_rec( +/*=================*/ + const rec_t* rec, /*!< in: data tuple */ + const rec_offs* offsets,/*!< in: offsets array */ + rtr_mbr_t* mbr) /*!< out MBR */ +{ + ulint rec_f_len; + const byte* data; + + data = rec_get_nth_field(rec, offsets, 0, &rec_f_len); + + rtr_read_mbr(data, mbr); +} + +/****************************************************************//** +Get the bounding box content from a MBR data record */ +void +rtr_get_mbr_from_tuple( +/*===================*/ + const dtuple_t* dtuple, /*!< in: data tuple */ + rtr_mbr* mbr) /*!< out: mbr to fill */ +{ + const dfield_t* dtuple_field; + ulint dtuple_f_len; + + dtuple_field = dtuple_get_nth_field(dtuple, 0); + dtuple_f_len = dfield_get_len(dtuple_field); + ut_a(dtuple_f_len >= 4 * sizeof(double)); + + rtr_read_mbr(static_cast(dfield_get_data(dtuple_field)), + mbr); +} + +/** Compare minimum bounding rectangles. +@return 1, 0, -1, if mode == PAGE_CUR_MBR_EQUAL. And return +1, 0 for rest compare modes, depends on a and b qualifies the +relationship (CONTAINS, WITHIN etc.) */ +static int cmp_gis_field(page_cur_mode_t mode, const void *a, const void *b) +{ + return mode == PAGE_CUR_MBR_EQUAL + ? cmp_geometry_field(a, b) + : rtree_key_cmp(mode, a, b); +} + +/** Compare a GIS data tuple to a physical record in rtree non-leaf node. +We need to check the page number field, since we don't store pk field in +rtree non-leaf node. 
+@param[in] dtuple data tuple +@param[in] rec R-tree record +@return whether dtuple is less than rec */ +static bool +cmp_dtuple_rec_with_gis_internal(const dtuple_t* dtuple, const rec_t* rec) +{ + const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0); + ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN); + + if (cmp_gis_field(PAGE_CUR_WITHIN, dfield_get_data(dtuple_field), rec)) + return true; + + dtuple_field= dtuple_get_nth_field(dtuple, 1); + ut_ad(dfield_get_len(dtuple_field) == 4); /* child page number */ + ut_ad(dtuple_field->type.mtype == DATA_SYS_CHILD); + ut_ad(!(dtuple_field->type.prtype & ~DATA_NOT_NULL)); + + return memcmp(dtuple_field->data, rec + DATA_MBR_LEN, 4) != 0; +} + +#ifndef UNIV_DEBUG +static +#endif +/** Compare a GIS data tuple to a physical record. +@param[in] dtuple data tuple +@param[in] rec R-tree record +@param[in] mode compare mode +@retval negative if dtuple is less than rec */ +int cmp_dtuple_rec_with_gis(const dtuple_t *dtuple, const rec_t *rec, + page_cur_mode_t mode) +{ + const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0); + /* FIXME: TABLE_SHARE::init_from_binary_frm_image() is adding + field->key_part_length_bytes() to the key length */ + ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN || + dfield_get_len(dtuple_field) == DATA_MBR_LEN + 2); + + return cmp_gis_field(mode, dfield_get_data(dtuple_field), rec); +} + +/****************************************************************//** +Searches the right position in rtree for a page cursor. */ +bool +rtr_cur_search_with_match( +/*======================*/ + const buf_block_t* block, /*!< in: buffer block */ + dict_index_t* index, /*!< in: index descriptor */ + const dtuple_t* tuple, /*!< in: data tuple */ + page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_INSERT, + PAGE_CUR_RTREE_LOCATE etc. 
*/
+	page_cur_t*		cursor,	/*!< in/out: page cursor */
+	rtr_info_t*		rtr_info)/*!< in/out: search stack */
+{
+	bool		found = false;
+	const page_t*	page;
+	const rec_t*	rec;
+	const rec_t*	last_rec;
+	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
+	rec_offs*	offsets		= offsets_;
+	mem_heap_t*	heap = NULL;
+	int		cmp = 1;
+	double		least_inc = DBL_MAX;
+	const rec_t*	best_rec;
+	const rec_t*	last_match_rec = NULL;
+	bool		match_init = false;
+	page_cur_mode_t	orig_mode = mode;
+	const rec_t*	first_rec = NULL;
+
+	rec_offs_init(offsets_);
+
+	ut_ad(RTREE_SEARCH_MODE(mode));
+
+	ut_ad(dict_index_is_spatial(index));
+
+	page = buf_block_get_frame(block);
+
+	/* n_core == 0 signals a non-leaf page to rec_get_offsets(). */
+	const ulint level = btr_page_get_level(page);
+	const ulint n_core = level ? 0 : index->n_fields;
+
+	if (mode == PAGE_CUR_RTREE_LOCATE) {
+		ut_ad(level != 0);
+		mode = PAGE_CUR_WITHIN;
+	}
+
+	rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));
+
+	last_rec = rec;
+	best_rec = rec;
+
+	if (page_rec_is_infimum(rec)) {
+		rec = page_rec_get_next_const(rec);
+	}
+
+	/* Check insert tuple size is larger than first rec, and try to
+	avoid it if possible */
+	if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
+
+		ulint	new_rec_size = rec_get_converted_size(index, tuple, 0);
+
+		offsets = rec_get_offsets(rec, index, offsets, n_core,
+					  dtuple_get_n_fields_cmp(tuple),
+					  &heap);
+
+		if (rec_offs_size(offsets) < new_rec_size) {
+			first_rec = rec;
+		}
+
+		/* If this is the left-most page of this index level
+		and the table is a compressed table, try to avoid
+		first page as much as possible, as there will be problem
+		when update MIN_REC rec in compress table */
+		if (is_buf_block_get_page_zip(block)
+		    && !page_has_prev(page)
+		    && page_get_n_recs(page) >= 2) {
+
+			rec = page_rec_get_next_const(rec);
+		}
+	}
+
+	/* Scan all user records on the page, dispatching on search
+	mode; cmp == 0 means the current record qualifies. */
+	while (!page_rec_is_supremum(rec)) {
+		if (!n_core) {
+			switch (mode) {
+			case PAGE_CUR_CONTAIN:
+			case PAGE_CUR_INTERSECT:
+			case PAGE_CUR_MBR_EQUAL:
+				/* At non-leaf level, we will need to check
+				both
CONTAIN and INTERSECT for either of
+				the search mode */
+				cmp = cmp_dtuple_rec_with_gis(
+					tuple, rec, PAGE_CUR_CONTAIN);
+
+				if (cmp != 0) {
+					cmp = cmp_dtuple_rec_with_gis(
+						tuple, rec,
+						PAGE_CUR_INTERSECT);
+				}
+				break;
+			case PAGE_CUR_DISJOINT:
+				cmp = cmp_dtuple_rec_with_gis(
+					tuple, rec, mode);
+
+				if (cmp != 0) {
+					cmp = cmp_dtuple_rec_with_gis(
+						tuple, rec,
+						PAGE_CUR_INTERSECT);
+				}
+				break;
+			case PAGE_CUR_RTREE_INSERT:
+				double	increase;
+				double	area;
+
+				cmp = cmp_dtuple_rec_with_gis(
+					tuple, rec, PAGE_CUR_WITHIN);
+
+				if (cmp != 0) {
+					increase = rtr_rec_cal_increase(
+						tuple, rec, &area);
+					/* Once it goes beyond DBL_MAX,
+					it would not make sense to record
+					such value, just make it
+					DBL_MAX / 2 */
+					if (increase >= DBL_MAX) {
+						increase = DBL_MAX / 2;
+					}
+
+					if (increase < least_inc) {
+						least_inc = increase;
+						best_rec = rec;
+					} else if (best_rec
+						   && best_rec == first_rec) {
+						/* if first_rec is set,
+						we will try to avoid it */
+						least_inc = increase;
+						best_rec = rec;
+					}
+				}
+				break;
+			case PAGE_CUR_RTREE_GET_FATHER:
+				cmp = cmp_dtuple_rec_with_gis_internal(
+					tuple, rec);
+				break;
+			default:
+				/* WITHIN etc.
 */
+				cmp = cmp_dtuple_rec_with_gis(
+					tuple, rec, mode);
+			}
+		} else {
+			/* At leaf level, INSERT should translate to LE */
+			ut_ad(mode != PAGE_CUR_RTREE_INSERT);
+
+			cmp = cmp_dtuple_rec_with_gis(
+				tuple, rec, mode);
+		}
+
+		if (cmp == 0) {
+			found = true;
+
+			/* If located, the matching node/rec will be pushed
+			to rtr_info->path for non-leaf nodes, or
+			rtr_info->matches for leaf nodes */
+			if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
+				if (!n_core) {
+					uint32_t	page_no;
+					node_seq_t	new_seq;
+					bool		is_loc;
+
+					is_loc = (orig_mode
+						  == PAGE_CUR_RTREE_LOCATE
+						  || orig_mode
+						  == PAGE_CUR_RTREE_GET_FATHER);
+
+					offsets = rec_get_offsets(
+						rec, index, offsets, 0,
+						ULINT_UNDEFINED, &heap);
+
+					page_no = btr_node_ptr_get_child_page_no(
+						rec, offsets);
+
+					ut_ad(level >= 1);
+
+					/* Get current SSN, before we insert
+					it into the path stack */
+					new_seq = rtr_get_current_ssn_id(index);
+
+					rtr_non_leaf_stack_push(
+						rtr_info->path,
+						page_no,
+						new_seq, level - 1, 0,
+						NULL, 0);
+
+					if (is_loc) {
+						rtr_non_leaf_insert_stack_push(
+							index,
+							rtr_info->parent_path,
+							level, page_no, block,
+							rec, 0);
+					}
+
+					if (!srv_read_only_mode
+					    && (rtr_info->need_page_lock
+						|| !is_loc)) {
+
+						/* Lock the page, preventing it
+						from being shrunk */
+						lock_place_prdt_page_lock(
+							page_id_t(block->page
+								  .id()
+								  .space(),
+								  page_no),
+							index,
+							rtr_info->thr);
+					}
+				} else {
+					ut_ad(orig_mode
+					      != PAGE_CUR_RTREE_LOCATE);
+
+					if (!match_init) {
+						rtr_init_match(
+							rtr_info->matches,
+							block, page);
+						match_init = true;
+					}
+
+					/* Collect matched records on page */
+					offsets = rec_get_offsets(
+						rec, index, offsets,
+						index->n_fields,
+						ULINT_UNDEFINED, &heap);
+					rtr_leaf_push_match_rec(
+						rec, rtr_info, offsets,
+						page_is_comp(page));
+				}
+
+				last_match_rec = rec;
+			} else {
+				/* This is the insertion case, it will break
+				once it finds the first MBR that can accommodate
+				the inserting rec */
+				break;
+			}
+		}
+
+		last_rec = rec;
+
+		rec = page_rec_get_next_const(rec);
+	}
+
+
/* All records on page are searched */
+	if (page_rec_is_supremum(rec)) {
+		if (!n_core) {
+			if (!found) {
+				/* No match case, if it is for insertion,
+				then we select the record that result in
+				least increased area */
+				if (mode == PAGE_CUR_RTREE_INSERT) {
+					ut_ad(least_inc < DBL_MAX);
+					offsets = rec_get_offsets(
+						best_rec, index, offsets,
+						0, ULINT_UNDEFINED, &heap);
+					uint32_t	child_no =
+					btr_node_ptr_get_child_page_no(
+						best_rec, offsets);
+
+					rtr_non_leaf_insert_stack_push(
+						index, rtr_info->parent_path,
+						level, child_no, block,
+						best_rec, least_inc);
+
+					page_cur_position(best_rec, block,
+							  cursor);
+					rtr_info->mbr_adj = true;
+				} else {
+					/* Position at the last rec of the
+					page, if it is not the leaf page */
+					page_cur_position(last_rec, block,
+							  cursor);
+				}
+			} else {
+				/* There are matching records, position
+				in the last matching records */
+				if (rtr_info) {
+					rec = last_match_rec;
+					page_cur_position(
+						rec, block, cursor);
+				}
+			}
+		} else if (rtr_info) {
+			/* Leaf level, no match, position at the
+			last (supremum) rec */
+			if (!last_match_rec) {
+				page_cur_position(rec, block, cursor);
+				goto func_exit;
+			}
+
+			/* There are matched records */
+			matched_rec_t*	match_rec = rtr_info->matches;
+
+			rtr_rec_t	test_rec;
+
+			test_rec = match_rec->matched_recs->back();
+#ifdef UNIV_DEBUG
+			rec_offs	offsets_2[REC_OFFS_NORMAL_SIZE];
+			rec_offs*	offsets2	= offsets_2;
+			rec_offs_init(offsets_2);
+
+			ut_ad(found);
+
+			/* Verify the record to be positioned is the same
+			as the last record in matched_rec vector */
+			offsets2 = rec_get_offsets(test_rec.r_rec, index,
+						   offsets2, index->n_fields,
+						   ULINT_UNDEFINED, &heap);
+
+			offsets = rec_get_offsets(last_match_rec, index,
+						  offsets, index->n_fields,
+						  ULINT_UNDEFINED, &heap);
+
+			ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec,
+					  offsets2, offsets, index) == 0);
+#endif /* UNIV_DEBUG */
+			/* Pop the last match record and position on it */
+			match_rec->matched_recs->pop_back();
+
page_cur_position(test_rec.r_rec, &match_rec->block,
+					  cursor);
+		}
+	} else {
+
+		if (mode == PAGE_CUR_RTREE_INSERT) {
+			ut_ad(!last_match_rec);
+			rtr_non_leaf_insert_stack_push(
+				index, rtr_info->parent_path, level,
+				mach_read_from_4(rec + DATA_MBR_LEN),
+				block, rec, 0);
+
+		} else if (rtr_info && found && !n_core) {
+			rec = last_match_rec;
+		}
+
+		page_cur_position(rec, block, cursor);
+	}
+
+#ifdef UNIV_DEBUG
+	/* Verify that we are positioned at the same child page as pushed in
+	the path stack */
+	if (!n_core && (!page_rec_is_supremum(rec) || found)
+	    && mode != PAGE_CUR_RTREE_INSERT) {
+		ulint		page_no;
+
+		offsets = rec_get_offsets(rec, index, offsets, 0,
+					  ULINT_UNDEFINED, &heap);
+		page_no = btr_node_ptr_get_child_page_no(rec, offsets);
+
+		if (rtr_info && found) {
+			rtr_node_path_t*	path = rtr_info->path;
+			node_visit_t		last_visit = path->back();
+
+			ut_ad(last_visit.page_no == page_no);
+		}
+	}
+#endif /* UNIV_DEBUG */
+
+func_exit:
+	if (UNIV_LIKELY_NULL(heap)) {
+		mem_heap_free(heap);
+	}
+
+	return(found);
+}
-- cgit v1.2.3