author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 18:07:14 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 18:07:14 +0000
commit     a175314c3e5827eb193872241446f2f8f5c9d33c (patch)
tree       cd3d60ca99ae00829c52a6ca79150a5b6e62528b /storage/innobase/gis
parent     Initial commit. (diff)
Adding upstream version 1:10.5.12.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/innobase/gis')
-rw-r--r--  storage/innobase/gis/gis0geo.cc     650
-rw-r--r--  storage/innobase/gis/gis0rtree.cc  1956
-rw-r--r--  storage/innobase/gis/gis0sea.cc    2052
3 files changed, 4658 insertions(+), 0 deletions(-)
diff --git a/storage/innobase/gis/gis0geo.cc b/storage/innobase/gis/gis0geo.cc
new file mode 100644
index 00000000..4c3ff188
--- /dev/null
+++ b/storage/innobase/gis/gis0geo.cc
@@ -0,0 +1,650 @@
+/*****************************************************************************
+
+Copyright (c) 2013, 2015, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2019, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file gis/gis0geo.cc
+InnoDB R-tree related functions.
+
+Created 2013/03/27 Allen Lai and Jimmy Yang
+*******************************************************/
+
+#include "page0types.h"
+#include "gis0geo.h"
+#include "page0cur.h"
+#include "ut0rnd.h"
+#include "mach0data.h"
+
+#include <spatial.h>
+#include <cmath>
+
+/* These definitions are for comparing 2 mbrs. */
+
+/* Check if a intersects b.
+Return false if a intersects b, otherwise true. */
+#define INTERSECT_CMP(amin, amax, bmin, bmax) \
+(((amin) > (bmax)) || ((bmin) > (amax)))
+
+/* Check if b contains a.
+Return false if b contains a, otherwise true. */
+#define CONTAIN_CMP(amin, amax, bmin, bmax) \
+(((bmin) > (amin)) || ((bmax) < (amax)))
+
+/* Check if b is within a.
+Return false if b is within a, otherwise true. */
+#define WITHIN_CMP(amin, amax, bmin, bmax) \
+(((amin) > (bmin)) || ((amax) < (bmax)))
+
+/* Check if a is disjoint from b.
+Return false if a is disjoint from b, otherwise true. */
+#define DISJOINT_CMP(amin, amax, bmin, bmax) \
+(((amin) <= (bmax)) && ((bmin) <= (amax)))
+
+/* Check if a equals b.
+Return false if equal, otherwise true. */
+#define EQUAL_CMP(amin, amax, bmin, bmax) \
+(((amin) != (bmin)) || ((amax) != (bmax)))
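
The five *_CMP macros above share an inverted convention: each one evaluates
to false when its predicate holds, which is what lets rtree_key_cmp() further
below return 0 on success. A minimal standalone sketch of that convention for
one dimension (not part of this patch):

    #include <cstdio>

    // Same definition as above: false (0) when a and b overlap.
    #define INTERSECT_CMP(amin, amax, bmin, bmax) \
    (((amin) > (bmax)) || ((bmin) > (amax)))

    int main() {
        double amin = 0, amax = 2, bmin = 1, bmax = 3;
        // The intervals [0,2] and [1,3] overlap, so the macro yields 0.
        std::printf("%d\n", INTERSECT_CMP(amin, amax, bmin, bmax));
    }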
+
+/****************************************************************
+Functions for generating mbr
+****************************************************************/
+/*************************************************************//**
+Add one point stored in wkb to a given mbr.
+@return 0 if the point in wkb is valid, otherwise -1. */
+static
+int
+rtree_add_point_to_mbr(
+/*===================*/
+ const uchar** wkb, /*!< in: pointer to wkb,
+ where point is stored */
+ const uchar* end, /*!< in: end of wkb. */
+ uint n_dims, /*!< in: dimensions. */
+ double* mbr) /*!< in/out: mbr, which
+ must be of length n_dims * 2. */
+{
+ double ord;
+ double* mbr_end = mbr + n_dims * 2;
+
+ while (mbr < mbr_end) {
+ if ((*wkb) + sizeof(double) > end) {
+ return(-1);
+ }
+
+ ord = mach_double_read(*wkb);
+ (*wkb) += sizeof(double);
+
+ if (ord < *mbr) {
+ *mbr = ord;
+ }
+ mbr++;
+
+ if (ord > *mbr) {
+ *mbr = ord;
+ }
+ mbr++;
+ }
+
+ return(0);
+}
+
+/*************************************************************//**
+Get mbr of point stored in wkb.
+@return 0 if ok, otherwise -1. */
+static
+int
+rtree_get_point_mbr(
+/*================*/
+ const uchar** wkb, /*!< in: pointer to wkb,
+ where point is stored. */
+ const uchar* end, /*!< in: end of wkb. */
+ uint n_dims, /*!< in: dimensions. */
+ double* mbr) /*!< in/out: mbr,
+ must be of length n_dims * 2. */
+{
+ return rtree_add_point_to_mbr(wkb, end, n_dims, mbr);
+}
+
+
+/*************************************************************//**
+Get mbr of linestring stored in wkb.
+@return 0 if the linestring is valid, otherwise -1. */
+static
+int
+rtree_get_linestring_mbr(
+/*=====================*/
+ const uchar** wkb, /*!< in: pointer to wkb,
+ where point is stored. */
+ const uchar* end, /*!< in: end of wkb. */
+ uint n_dims, /*!< in: dimensions. */
+ double* mbr) /*!< in/out: mbr,
+ must be of length n_dims * 2. */
+{
+ uint n_points;
+
+ n_points = uint4korr(*wkb);
+ (*wkb) += 4;
+
+ for (; n_points > 0; --n_points) {
+ /* Add next point to mbr */
+ if (rtree_add_point_to_mbr(wkb, end, n_dims, mbr)) {
+ return(-1);
+ }
+ }
+
+ return(0);
+}
+
+/*************************************************************//**
+Get mbr of polygon stored in wkb.
+@return 0 if the polygon is valid, otherwise -1. */
+static
+int
+rtree_get_polygon_mbr(
+/*==================*/
+ const uchar** wkb, /*!< in: pointer to wkb,
+ where point is stored. */
+ const uchar* end, /*!< in: end of wkb. */
+ uint n_dims, /*!< in: dimensions. */
+ double* mbr) /*!< in/out: mbr,
+ must be of length n_dims * 2. */
+{
+ uint n_linear_rings;
+ uint n_points;
+
+ n_linear_rings = uint4korr((*wkb));
+ (*wkb) += 4;
+
+ for (; n_linear_rings > 0; --n_linear_rings) {
+ n_points = uint4korr((*wkb));
+ (*wkb) += 4;
+
+ for (; n_points > 0; --n_points) {
+ /* Add next point to mbr */
+ if (rtree_add_point_to_mbr(wkb, end, n_dims, mbr)) {
+ return(-1);
+ }
+ }
+ }
+
+ return(0);
+}
+
+/*************************************************************//**
+Get mbr of geometry stored in wkb.
+@return 0 if the geometry is valid, otherwise -1. */
+static
+int
+rtree_get_geometry_mbr(
+/*===================*/
+ const uchar** wkb, /*!< in: pointer to wkb,
+ where point is stored. */
+ const uchar* end, /*!< in: end of wkb. */
+ uint n_dims, /*!< in: dimensions. */
+ double* mbr, /*!< in/out: mbr. */
+	int		top)	/*!< in: nonzero if this is the
+				top-level call, i.e. the function
+				was not called by itself. */
+{
+ int res;
+ uint wkb_type = 0;
+ uint n_items;
+
+ /* byte_order = *(*wkb); */
+ ++(*wkb);
+
+ wkb_type = uint4korr((*wkb));
+ (*wkb) += 4;
+
+ switch ((enum wkbType) wkb_type) {
+ case wkbPoint:
+ res = rtree_get_point_mbr(wkb, end, n_dims, mbr);
+ break;
+ case wkbLineString:
+ res = rtree_get_linestring_mbr(wkb, end, n_dims, mbr);
+ break;
+ case wkbPolygon:
+ res = rtree_get_polygon_mbr(wkb, end, n_dims, mbr);
+ break;
+ case wkbMultiPoint:
+ n_items = uint4korr((*wkb));
+ (*wkb) += 4;
+ for (; n_items > 0; --n_items) {
+ /* byte_order = *(*wkb); */
+ ++(*wkb);
+ (*wkb) += 4;
+ if (rtree_get_point_mbr(wkb, end, n_dims, mbr)) {
+ return(-1);
+ }
+ }
+ res = 0;
+ break;
+ case wkbMultiLineString:
+ n_items = uint4korr((*wkb));
+ (*wkb) += 4;
+ for (; n_items > 0; --n_items) {
+ /* byte_order = *(*wkb); */
+ ++(*wkb);
+ (*wkb) += 4;
+ if (rtree_get_linestring_mbr(wkb, end, n_dims, mbr)) {
+ return(-1);
+ }
+ }
+ res = 0;
+ break;
+ case wkbMultiPolygon:
+ n_items = uint4korr((*wkb));
+ (*wkb) += 4;
+ for (; n_items > 0; --n_items) {
+ /* byte_order = *(*wkb); */
+ ++(*wkb);
+ (*wkb) += 4;
+ if (rtree_get_polygon_mbr(wkb, end, n_dims, mbr)) {
+ return(-1);
+ }
+ }
+ res = 0;
+ break;
+ case wkbGeometryCollection:
+ if (!top) {
+ return(-1);
+ }
+
+ n_items = uint4korr((*wkb));
+ (*wkb) += 4;
+ for (; n_items > 0; --n_items) {
+ if (rtree_get_geometry_mbr(wkb, end, n_dims,
+ mbr, 0)) {
+ return(-1);
+ }
+ }
+ res = 0;
+ break;
+ default:
+ res = -1;
+ }
+
+ return(res);
+}
+
+/*************************************************************//**
+Calculate Minimal Bounding Rectangle (MBR) of the spatial object
+stored in "well-known binary representation" (wkb) format.
+@return 0 if ok. */
+int
+rtree_mbr_from_wkb(
+/*===============*/
+ const uchar* wkb, /*!< in: wkb */
+ uint size, /*!< in: size of wkb. */
+ uint n_dims, /*!< in: dimensions. */
+	double*		mbr)	/*!< in/out: mbr, which must
+				be of length n_dims * 2. */
+{
+ for (uint i = 0; i < n_dims; ++i) {
+ mbr[i * 2] = DBL_MAX;
+ mbr[i * 2 + 1] = -DBL_MAX;
+ }
+
+ return rtree_get_geometry_mbr(&wkb, wkb + size, n_dims, mbr, 1);
+}
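
A standalone sketch of the WKB walk above (not part of this patch): a WKB
value is one byte-order byte, a 4-byte type code, and then raw doubles, and
the mbr is kept as interleaved min/max pairs per dimension. For brevity this
reads the coordinates with native byte order instead of mach_double_read():

    #include <cfloat>
    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    int main() {
        // Build a 2-D wkbPoint by hand: order byte, type, x, y.
        unsigned char wkb[21];
        std::uint32_t type = 1;             // wkbPoint
        double x = 4.5, y = -2.0;
        wkb[0] = 1;                         // byte-order marker (ignored here)
        std::memcpy(wkb + 1, &type, 4);
        std::memcpy(wkb + 5, &x, 8);
        std::memcpy(wkb + 13, &y, 8);

        // Start from an "empty" mbr and expand it with the point.
        double mbr[4] = {DBL_MAX, -DBL_MAX, DBL_MAX, -DBL_MAX};
        const unsigned char* p = wkb + 5;   // skip order byte + type
        for (int d = 0; d < 2; d++) {
            double ord;
            std::memcpy(&ord, p, 8);
            p += 8;
            if (ord < mbr[d * 2])     mbr[d * 2] = ord;
            if (ord > mbr[d * 2 + 1]) mbr[d * 2 + 1] = ord;
        }
        std::printf("xmin=%g xmax=%g ymin=%g ymax=%g\n",
                    mbr[0], mbr[1], mbr[2], mbr[3]);
    }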
+
+
+/****************************************************************
+Functions for Rtree split
+****************************************************************/
+/*************************************************************//**
+Join 2 mbrs of dimensions n_dim. */
+static
+void
+mbr_join(
+/*=====*/
+ double* a, /*!< in/out: the first mbr,
+ where the joined result will be. */
+ const double* b, /*!< in: the second mbr. */
+ int n_dim) /*!< in: dimensions. */
+{
+ double* end = a + n_dim * 2;
+
+ do {
+ if (a[0] > b[0]) {
+ a[0] = b[0];
+ }
+
+ if (a[1] < b[1]) {
+ a[1] = b[1];
+ }
+
+ a += 2;
+ b += 2;
+
+ } while (a != end);
+}
+
+/*************************************************************//**
+Counts the square of mbr which is the join of a and b. Both a and b
+are of dimensions n_dim. */
+static
+double
+mbr_join_square(
+/*============*/
+ const double* a, /*!< in: the first mbr. */
+ const double* b, /*!< in: the second mbr. */
+ int n_dim) /*!< in: dimensions. */
+{
+ const double* end = a + n_dim * 2;
+ double square = 1.0;
+
+ do {
+ square *= std::max(a[1], b[1]) - std::min(a[0], b[0]);
+
+ a += 2;
+ b += 2;
+ } while (a != end);
+
+ /* Check if finite (not infinity or NaN),
+ so we don't get NaN in calculations */
+ if (!std::isfinite(square)) {
+ return DBL_MAX;
+ }
+
+ return square;
+}
+
+/*************************************************************//**
+Counts the square of mbr of dimension n_dim. */
+static
+double
+count_square(
+/*=========*/
+ const double* a, /*!< in: the mbr. */
+ int n_dim) /*!< in: dimensions. */
+{
+ const double* end = a + n_dim * 2;
+ double square = 1.0;
+
+ do {
+ square *= a[1] - a[0];
+ a += 2;
+ } while (a != end);
+
+ return square;
+}
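
How the two helpers above combine: an n-dim mbr is laid out as
[min0, max0, min1, max1, ...], and the "square" is the product of the edge
lengths, i.e. an area in two dimensions. A standalone sketch of the
mbr_join_square() computation on two 2-D boxes (not part of this patch):

    #include <algorithm>
    #include <cstdio>

    int main() {
        double a[4] = {0, 2, 0, 2};   // 2x2 box
        double b[4] = {1, 4, 1, 3};   // overlapping box
        double square = 1.0;
        for (int d = 0; d < 2; d++) {
            // Edge of the joined box in this dimension.
            double lo = std::min(a[d * 2], b[d * 2]);
            double hi = std::max(a[d * 2 + 1], b[d * 2 + 1]);
            square *= hi - lo;
        }
        std::printf("joined square = %g\n", square);  // (4-0)*(3-0) = 12
    }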
+
+/*************************************************************//**
+Copy mbr of dimension n_dim from src to dst. */
+inline
+static
+void
+copy_coords(
+/*========*/
+ double* dst, /*!< in/out: destination. */
+ const double* src, /*!< in: source. */
+ int)
+{
+ memcpy(dst, src, DATA_MBR_LEN);
+}
+
+/*************************************************************//**
+Select the two seed entries around which the two groups are collected. */
+static
+void
+pick_seeds(
+/*=======*/
+ rtr_split_node_t* node, /*!< in: split nodes. */
+ int n_entries, /*!< in: entries number. */
+ rtr_split_node_t** seed_a, /*!< out: seed 1. */
+ rtr_split_node_t** seed_b, /*!< out: seed 2. */
+ int n_dim) /*!< in: dimensions. */
+{
+ rtr_split_node_t* cur1;
+ rtr_split_node_t* lim1 = node + (n_entries - 1);
+ rtr_split_node_t* cur2;
+ rtr_split_node_t* lim2 = node + n_entries;
+
+ double max_d = -DBL_MAX;
+ double d;
+
+ *seed_a = node;
+ *seed_b = node + 1;
+
+ for (cur1 = node; cur1 < lim1; ++cur1) {
+ for (cur2 = cur1 + 1; cur2 < lim2; ++cur2) {
+ d = mbr_join_square(cur1->coords, cur2->coords, n_dim) -
+ cur1->square - cur2->square;
+ if (d > max_d) {
+ max_d = d;
+ *seed_a = cur1;
+ *seed_b = cur2;
+ }
+ }
+ }
+}
+
+/*************************************************************//**
+Select the next entry, and the group where it should be added. */
+static
+void
+pick_next(
+/*======*/
+ rtr_split_node_t* node, /*!< in: split nodes. */
+ int n_entries, /*!< in: entries number. */
+ double* g1, /*!< in: mbr of group 1. */
+ double* g2, /*!< in: mbr of group 2. */
+ rtr_split_node_t** choice, /*!< out: the next node.*/
+ int* n_group, /*!< out: group number.*/
+ int n_dim) /*!< in: dimensions. */
+{
+ rtr_split_node_t* cur = node;
+ rtr_split_node_t* end = node + n_entries;
+ double max_diff = -DBL_MAX;
+
+ for (; cur < end; ++cur) {
+ double diff;
+ double abs_diff;
+
+ if (cur->n_node != 0) {
+ continue;
+ }
+
+ diff = mbr_join_square(g1, cur->coords, n_dim) -
+ mbr_join_square(g2, cur->coords, n_dim);
+
+ abs_diff = fabs(diff);
+ if (abs_diff > max_diff) {
+ max_diff = abs_diff;
+
+ /* Introduce some randomness if the record
+ is identical */
+ if (diff == 0) {
+ diff = static_cast<double>(ut_rnd_gen() & 1);
+ }
+
+ *n_group = 1 + (diff > 0);
+ *choice = cur;
+ }
+ }
+}
+
+/*************************************************************//**
+Mark all entries not yet assigned to a group as belonging to n_group. */
+static
+void
+mark_all_entries(
+/*=============*/
+ rtr_split_node_t* node, /*!< in/out: split nodes. */
+ int n_entries, /*!< in: entries number. */
+ int n_group) /*!< in: group number. */
+{
+ rtr_split_node_t* cur = node;
+ rtr_split_node_t* end = node + n_entries;
+ for (; cur < end; ++cur) {
+ if (cur->n_node != 0) {
+ continue;
+ }
+ cur->n_node = n_group;
+ }
+}
+
+/*************************************************************//**
+Split rtree node.
+Return which group the first rec is in. */
+int
+split_rtree_node(
+/*=============*/
+ rtr_split_node_t* node, /*!< in: split nodes. */
+ int n_entries, /*!< in: entries number. */
+ int all_size, /*!< in: total key's size. */
+ int key_size, /*!< in: key's size. */
+ int min_size, /*!< in: minimal group size. */
+	int			size1,		/*!< in: size of group 1. */
+	int			size2,		/*!< in: size of group 2. */
+ double** d_buffer, /*!< in/out: buffer. */
+ int n_dim, /*!< in: dimensions. */
+ uchar* first_rec) /*!< in: the first rec. */
+{
+ rtr_split_node_t* cur;
+ rtr_split_node_t* a = NULL;
+ rtr_split_node_t* b = NULL;
+ double* g1 = reserve_coords(d_buffer, n_dim);
+ double* g2 = reserve_coords(d_buffer, n_dim);
+ rtr_split_node_t* next = NULL;
+ int next_node = 0;
+ int i;
+ int first_rec_group = 1;
+ rtr_split_node_t* end = node + n_entries;
+
+ if (all_size < min_size * 2) {
+ return 1;
+ }
+
+ cur = node;
+ for (; cur < end; ++cur) {
+ cur->square = count_square(cur->coords, n_dim);
+ cur->n_node = 0;
+ }
+
+ pick_seeds(node, n_entries, &a, &b, n_dim);
+ a->n_node = 1;
+ b->n_node = 2;
+
+ copy_coords(g1, a->coords, n_dim);
+ size1 += key_size;
+ copy_coords(g2, b->coords, n_dim);
+ size2 += key_size;
+
+ for (i = n_entries - 2; i > 0; --i) {
+ /* Can't write into group 2 */
+ if (all_size - (size2 + key_size) < min_size) {
+ mark_all_entries(node, n_entries, 1);
+ break;
+ }
+
+ /* Can't write into group 1 */
+ if (all_size - (size1 + key_size) < min_size) {
+ mark_all_entries(node, n_entries, 2);
+ break;
+ }
+
+ pick_next(node, n_entries, g1, g2, &next, &next_node, n_dim);
+ if (next_node == 1) {
+ size1 += key_size;
+ mbr_join(g1, next->coords, n_dim);
+ } else {
+ size2 += key_size;
+ mbr_join(g2, next->coords, n_dim);
+ }
+
+ next->n_node = next_node;
+
+		/* Find out which group the first rec (of the page)
+		ends up in, and inform the caller */
+ if (first_rec && first_rec == next->key) {
+ first_rec_group = next_node;
+ }
+ }
+
+ return(first_rec_group);
+}
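
split_rtree_node() is a quadratic split: pick_seeds() chooses the pair of
entries whose joined MBR wastes the most area, and pick_next() then repeatedly
picks the unassigned entry with the strongest preference and puts it into the
group whose MBR it enlarges less. A standalone sketch of the seed choice, with
illustrative helper names only (not the InnoDB API):

    #include <algorithm>
    #include <cstdio>

    static double area(const double* m) {
        return (m[1] - m[0]) * (m[3] - m[2]);
    }

    static double join_area(const double* a, const double* b) {
        return (std::max(a[1], b[1]) - std::min(a[0], b[0]))
             * (std::max(a[3], b[3]) - std::min(a[2], b[2]));
    }

    int main() {
        double boxes[3][4] = {{0, 1, 0, 1}, {0.5, 1.5, 0, 1}, {9, 10, 9, 10}};
        int seed_a = 0, seed_b = 1;
        double max_d = -1e300;
        for (int i = 0; i < 3; i++)
            for (int j = i + 1; j < 3; j++) {
                // Wasted area if i and j ended up in the same group.
                double d = join_area(boxes[i], boxes[j])
                         - area(boxes[i]) - area(boxes[j]);
                if (d > max_d) { max_d = d; seed_a = i; seed_b = j; }
            }
        std::printf("seeds: %d and %d\n", seed_a, seed_b);  // 0 and 2
    }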
+
+/** Compare two minimum bounding rectangles.
+@param mode comparison operator
+ MBR_INTERSECT(a,b) a overlaps b
+ MBR_CONTAIN(a,b) a contains b
+ MBR_DISJOINT(a,b) a disjoint b
+ MBR_WITHIN(a,b) a within b
+ MBR_EQUAL(a,b) All coordinates of MBRs are equal
+ MBR_DATA(a,b) Data reference is the same
+@param b first MBR
+@param a second MBR
+@retval 0 if the predicate holds
+@retval 1 if the predicate does not hold */
+int rtree_key_cmp(page_cur_mode_t mode, const void *b, const void *a)
+{
+ const byte *b_= static_cast<const byte*>(b);
+ const byte *a_= static_cast<const byte*>(a);
+
+ static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double), "compatibility");
+
+ for (auto i = SPDIMS; i--; )
+ {
+ double amin= mach_double_read(a_);
+ double bmin= mach_double_read(b_);
+ a_+= sizeof(double);
+ b_+= sizeof(double);
+ double amax= mach_double_read(a_);
+ double bmax= mach_double_read(b_);
+ a_+= sizeof(double);
+ b_+= sizeof(double);
+
+ switch (mode) {
+ case PAGE_CUR_INTERSECT:
+ if (INTERSECT_CMP(amin, amax, bmin, bmax))
+ return 1;
+ continue;
+ case PAGE_CUR_CONTAIN:
+ if (CONTAIN_CMP(amin, amax, bmin, bmax))
+ return 1;
+ continue;
+ case PAGE_CUR_WITHIN:
+ if (WITHIN_CMP(amin, amax, bmin, bmax))
+ return 1;
+ continue;
+ case PAGE_CUR_MBR_EQUAL:
+ if (EQUAL_CMP(amin, amax, bmin, bmax))
+ return 1;
+ continue;
+ case PAGE_CUR_DISJOINT:
+ if (!DISJOINT_CMP(amin, amax, bmin, bmax))
+ return 0;
+ if (!i)
+ return 1;
+ continue;
+ case PAGE_CUR_UNSUPP:
+ case PAGE_CUR_G:
+ case PAGE_CUR_GE:
+ case PAGE_CUR_L:
+ case PAGE_CUR_LE:
+ case PAGE_CUR_RTREE_LOCATE:
+ case PAGE_CUR_RTREE_GET_FATHER:
+ case PAGE_CUR_RTREE_INSERT:
+ break;
+ }
+ ut_ad("unknown comparison operator" == 0);
+ }
+
+ return 0;
+}
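
rtree_key_cmp() applies one of the *_CMP macros per dimension and returns 1
on the first dimension where the predicate fails, 0 once it has held on every
dimension; PAGE_CUR_DISJOINT is the odd one out, succeeding as soon as any
single dimension separates the boxes. A standalone 2-D sketch of the
"contain" case (not part of this patch):

    #include <cstdio>

    // Same test as CONTAIN_CMP above: false (0) when b contains a.
    static int contain_cmp(double amin, double amax,
                           double bmin, double bmax) {
        return (bmin > amin) || (bmax < amax);
    }

    int main() {
        double a[4] = {1, 2, 1, 2};   // a = [1,2] x [1,2]
        double b[4] = {0, 3, 0, 3};   // b = [0,3] x [0,3]
        for (int d = 0; d < 2; d++)
            if (contain_cmp(a[d * 2], a[d * 2 + 1],
                            b[d * 2], b[d * 2 + 1])) {
                std::printf("fails in dimension %d\n", d);
                return 1;
            }
        std::printf("b contains a\n");  // this branch is taken
        return 0;
    }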
diff --git a/storage/innobase/gis/gis0rtree.cc b/storage/innobase/gis/gis0rtree.cc
new file mode 100644
index 00000000..22e6e08f
--- /dev/null
+++ b/storage/innobase/gis/gis0rtree.cc
@@ -0,0 +1,1956 @@
+/*****************************************************************************
+
+Copyright (c) 2016, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2018, 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file gis/gis0rtree.cc
+InnoDB R-tree interfaces
+
+Created 2013/03/27 Allen Lai and Jimmy Yang
+***********************************************************************/
+
+#include "fsp0fsp.h"
+#include "page0page.h"
+#include "page0cur.h"
+#include "page0zip.h"
+#include "gis0rtree.h"
+#include "btr0cur.h"
+#include "btr0sea.h"
+#include "btr0pcur.h"
+#include "rem0cmp.h"
+#include "lock0lock.h"
+#include "ibuf0ibuf.h"
+#include "trx0undo.h"
+#include "srv0mon.h"
+#include "gis0geo.h"
+#include <cmath>
+
+/*************************************************************//**
+Initialize the split node info for an R-tree page split.
+@return initialized split node array */
+static
+rtr_split_node_t*
+rtr_page_split_initialize_nodes(
+/*============================*/
+ mem_heap_t* heap, /*!< in: pointer to memory heap, or NULL */
+ btr_cur_t* cursor, /*!< in: cursor at which to insert; when the
+ function returns, the cursor is positioned
+ on the predecessor of the inserted record */
+ rec_offs** offsets,/*!< in: offsets on inserted record */
+ const dtuple_t* tuple, /*!< in: tuple to insert */
+ double** buf_pos)/*!< in/out: current buffer position */
+{
+ rtr_split_node_t* split_node_array;
+ double* buf;
+ ulint n_recs;
+ rtr_split_node_t* task;
+ rtr_split_node_t* stop;
+ rtr_split_node_t* cur;
+ rec_t* rec;
+ buf_block_t* block;
+ page_t* page;
+ ulint n_uniq;
+ ulint len;
+ const byte* source_cur;
+
+ block = btr_cur_get_block(cursor);
+ page = buf_block_get_frame(block);
+ n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
+
+ n_recs = ulint(page_get_n_recs(page)) + 1;
+
+	/* We reserve memory space for 2 MBRs for the temporary result
+	of the split algorithm. Together with the new mbr that needs to
+	be inserted, we need (n_recs + 3) * MBR size for storing all MBRs. */
+ buf = static_cast<double*>(mem_heap_alloc(
+ heap, DATA_MBR_LEN * (n_recs + 3)
+ + sizeof(rtr_split_node_t) * (n_recs + 1)));
+
+ split_node_array = (rtr_split_node_t*)(buf + SPDIMS * 2 * (n_recs + 3));
+ task = split_node_array;
+ *buf_pos = buf;
+ stop = task + n_recs;
+
+ rec = page_rec_get_next(page_get_infimum_rec(page));
+ const ulint n_core = page_is_leaf(page)
+ ? cursor->index->n_core_fields : 0;
+ *offsets = rec_get_offsets(rec, cursor->index, *offsets, n_core,
+ n_uniq, &heap);
+
+ source_cur = rec_get_nth_field(rec, *offsets, 0, &len);
+
+ for (cur = task; cur < stop - 1; ++cur) {
+ cur->coords = reserve_coords(buf_pos, SPDIMS);
+ cur->key = rec;
+
+ memcpy(cur->coords, source_cur, DATA_MBR_LEN);
+
+ rec = page_rec_get_next(rec);
+ *offsets = rec_get_offsets(rec, cursor->index, *offsets,
+ n_core, n_uniq, &heap);
+ source_cur = rec_get_nth_field(rec, *offsets, 0, &len);
+ }
+
+ /* Put the insert key to node list */
+ source_cur = static_cast<const byte*>(dfield_get_data(
+ dtuple_get_nth_field(tuple, 0)));
+ cur->coords = reserve_coords(buf_pos, SPDIMS);
+ rec = (byte*) mem_heap_alloc(
+ heap, rec_get_converted_size(cursor->index, tuple, 0));
+
+ rec = rec_convert_dtuple_to_rec(rec, cursor->index, tuple, 0);
+ cur->key = rec;
+
+ memcpy(cur->coords, source_cur, DATA_MBR_LEN);
+
+ return split_node_array;
+}
+
+/**********************************************************************//**
+Builds an R-tree node pointer out of a physical record and a page number.
+Note: for the R-tree, non-leaf pages keep only the mbr and the page no
+field. This differs from the B-tree, whose node pointers still keep the
+PK fields.
+@return own: node pointer */
+dtuple_t*
+rtr_index_build_node_ptr(
+/*=====================*/
+ const dict_index_t* index, /*!< in: index */
+ const rtr_mbr_t* mbr, /*!< in: mbr of lower page */
+ const rec_t* rec, /*!< in: record for which to build node
+ pointer */
+ ulint page_no,/*!< in: page number to put in node
+ pointer */
+ mem_heap_t* heap) /*!< in: memory heap where pointer
+ created */
+{
+ dtuple_t* tuple;
+ dfield_t* field;
+ byte* buf;
+ ulint n_unique;
+ ulint info_bits;
+
+ ut_ad(dict_index_is_spatial(index));
+
+ n_unique = DICT_INDEX_SPATIAL_NODEPTR_SIZE;
+
+ tuple = dtuple_create(heap, n_unique + 1);
+
+ /* For rtree internal node, we need to compare page number
+ fields. */
+ dtuple_set_n_fields_cmp(tuple, n_unique + 1);
+
+ dict_index_copy_types(tuple, index, n_unique);
+
+ /* Write page no field */
+ buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
+
+ mach_write_to_4(buf, page_no);
+
+ field = dtuple_get_nth_field(tuple, n_unique);
+ dfield_set_data(field, buf, 4);
+
+ dtype_set(dfield_get_type(field), DATA_SYS_CHILD, DATA_NOT_NULL, 4);
+
+ /* Set info bits. */
+ info_bits = rec_get_info_bits(rec, dict_table_is_comp(index->table));
+ dtuple_set_info_bits(tuple, info_bits | REC_STATUS_NODE_PTR);
+
+ /* Set mbr as index entry data */
+ field = dtuple_get_nth_field(tuple, 0);
+
+ buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_MBR_LEN));
+
+ rtr_write_mbr(buf, mbr);
+
+ dfield_set_data(field, buf, DATA_MBR_LEN);
+
+ ut_ad(dtuple_check_typed(tuple));
+
+ return(tuple);
+}
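
The node-pointer tuple built above carries only DATA_MBR_LEN bytes of MBR
followed by a 4-byte child page number, which mach_write_to_4() stores
big-endian. A standalone stand-in for that 4-byte encoding (the helper name
is illustrative, not the InnoDB function):

    #include <cstdio>

    // Big-endian 32-bit store, mimicking mach_write_to_4().
    static void write_be32(unsigned char* b, unsigned long n) {
        b[0] = (unsigned char)(n >> 24);
        b[1] = (unsigned char)(n >> 16);
        b[2] = (unsigned char)(n >> 8);
        b[3] = (unsigned char)(n);
    }

    int main() {
        unsigned char page_no[4];
        write_be32(page_no, 5);       // child page number 5
        std::printf("%02x %02x %02x %02x\n",
                    page_no[0], page_no[1], page_no[2], page_no[3]);
        // prints: 00 00 00 05
    }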
+
+/**************************************************************//**
+Update the mbr field of a spatial index row.
+@return true if update is successful */
+bool
+rtr_update_mbr_field(
+/*=================*/
+ btr_cur_t* cursor, /*!< in/out: cursor pointed to rec.*/
+ rec_offs* offsets, /*!< in/out: offsets on rec. */
+ btr_cur_t* cursor2, /*!< in/out: cursor pointed to rec
+ that should be deleted.
+ this cursor is for btr_compress to
+ delete the merged page's father rec.*/
+ page_t* child_page, /*!< in: child page. */
+ rtr_mbr_t* mbr, /*!< in: the new mbr. */
+ rec_t* new_rec, /*!< in: rec to use */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_index_t* index = cursor->index;
+ mem_heap_t* heap;
+ page_t* page;
+ rec_t* rec;
+ constexpr ulint flags = BTR_NO_UNDO_LOG_FLAG
+ | BTR_NO_LOCKING_FLAG
+ | BTR_KEEP_SYS_FLAG;
+ dberr_t err;
+ big_rec_t* dummy_big_rec;
+ buf_block_t* block;
+ rec_t* child_rec;
+ ulint up_match = 0;
+ ulint low_match = 0;
+ ulint child;
+ ulint rec_info;
+ bool ins_suc = true;
+ ulint cur2_pos = 0;
+ ulint del_page_no = 0;
+ rec_offs* offsets2;
+
+ rec = btr_cur_get_rec(cursor);
+ page = page_align(rec);
+
+ rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets));
+
+ heap = mem_heap_create(100);
+ block = btr_cur_get_block(cursor);
+ ut_ad(page == buf_block_get_frame(block));
+
+ child = btr_node_ptr_get_child_page_no(rec, offsets);
+ const ulint n_core = page_is_leaf(block->frame)
+ ? index->n_core_fields : 0;
+
+ if (new_rec) {
+ child_rec = new_rec;
+ } else {
+ child_rec = page_rec_get_next(page_get_infimum_rec(child_page));
+ }
+
+ dtuple_t* node_ptr = rtr_index_build_node_ptr(
+ index, mbr, child_rec, child, heap);
+
+	/* We need to remember the child page no of cursor2, since the page
+	could be reorganized, or a new rec could be inserted before it. */
+ if (cursor2) {
+ rec_t* del_rec = btr_cur_get_rec(cursor2);
+ offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
+ index, NULL, 0,
+ ULINT_UNDEFINED, &heap);
+ del_page_no = btr_node_ptr_get_child_page_no(del_rec, offsets2);
+ cur2_pos = page_rec_get_n_recs_before(btr_cur_get_rec(cursor2));
+ }
+
+ ut_ad(rec_offs_validate(rec, index, offsets));
+ ut_ad(rec_offs_base(offsets)[0 + 1] == DATA_MBR_LEN);
+ ut_ad(node_ptr->fields[0].len == DATA_MBR_LEN);
+
+ if (rec_info & REC_INFO_MIN_REC_FLAG) {
+		/* When the rec is the minimum rec on this level, we update
+		it in place, to avoid moving it to another place. */
+ page_zip_des_t* page_zip = buf_block_get_page_zip(block);
+
+ if (UNIV_LIKELY_NULL(page_zip)) {
+			/* Check if there is enough space for updating
+			the zip page in place. */
+ if (!btr_cur_update_alloc_zip(
+ page_zip,
+ btr_cur_get_page_cur(cursor),
+ index, offsets,
+ rec_offs_size(offsets),
+ false, mtr)) {
+
+				/* If there is not enough space for an
+				in-place update of the zip page, we do
+				delete + insert. */
+ ins_suc = false;
+
+				/* Since btr_cur_update_alloc_zip could
+				reorganize the page, we need to reposition
+				cursor2. */
+ if (cursor2) {
+ cursor2->page_cur.rec =
+ page_rec_get_nth(page,
+ cur2_pos);
+ }
+
+ goto update_mbr;
+ }
+
+ /* Record could be repositioned */
+ rec = btr_cur_get_rec(cursor);
+
+#ifdef UNIV_DEBUG
+ /* Make sure it is still the first record */
+ rec_info = rec_get_info_bits(
+ rec, rec_offs_comp(offsets));
+ ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
+#endif /* UNIV_DEBUG */
+ memcpy(rec, node_ptr->fields[0].data, DATA_MBR_LEN);
+ page_zip_write_rec(block, rec, index, offsets, 0, mtr);
+ } else {
+ mtr->memcpy<mtr_t::MAYBE_NOP>(*block, rec,
+ node_ptr->fields[0].data,
+ DATA_MBR_LEN);
+ }
+
+ if (cursor2) {
+ rec_offs* offsets2;
+
+ if (UNIV_LIKELY_NULL(page_zip)) {
+ cursor2->page_cur.rec
+ = page_rec_get_nth(page, cur2_pos);
+ }
+ offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
+ index, NULL, 0,
+ ULINT_UNDEFINED, &heap);
+ ut_ad(del_page_no == btr_node_ptr_get_child_page_no(
+ cursor2->page_cur.rec,
+ offsets2));
+
+ page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
+ index, offsets2, mtr);
+ }
+ } else if (page_get_n_recs(page) == 1) {
+		/* When there is only one rec on the page, we do insert/delete
+		to avoid a page merge. */
+
+ page_cur_t page_cur;
+ rec_t* insert_rec;
+ rec_offs* insert_offsets = NULL;
+ ulint old_pos;
+ rec_t* old_rec;
+
+ ut_ad(cursor2 == NULL);
+
+ /* Insert the new mbr rec. */
+ old_pos = page_rec_get_n_recs_before(rec);
+
+ err = btr_cur_optimistic_insert(
+ flags,
+ cursor, &insert_offsets, &heap,
+ node_ptr, &insert_rec, &dummy_big_rec, 0, NULL, mtr);
+
+ ut_ad(err == DB_SUCCESS);
+
+ btr_cur_position(index, insert_rec, block, cursor);
+
+ /* Delete the old mbr rec. */
+ old_rec = page_rec_get_nth(page, old_pos);
+ ut_ad(old_rec != insert_rec);
+
+ page_cur_position(old_rec, block, &page_cur);
+ offsets2 = rec_get_offsets(old_rec, index, NULL, n_core,
+ ULINT_UNDEFINED, &heap);
+ page_cur_delete_rec(&page_cur, index, offsets2, mtr);
+
+ } else {
+update_mbr:
+	/* When there is more than one rec on the page, we do delete/insert
+	to avoid a page split. */
+ rec_t* insert_rec;
+ rec_offs* insert_offsets = NULL;
+ rec_t* next_rec;
+
+	/* Delete the rec which the cursor points to. */
+ next_rec = page_rec_get_next(rec);
+ page_cur_delete_rec(btr_cur_get_page_cur(cursor),
+ index, offsets, mtr);
+ if (!ins_suc) {
+ ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
+
+ btr_set_min_rec_mark(next_rec, *block, mtr);
+ }
+
+	/* If there is more than 1 rec left on the page, delete
+	the rec which cursor2 points to. Otherwise, delete it later. */
+ if (cursor2 && page_get_n_recs(page) > 1) {
+ ulint cur2_rec_info;
+ rec_t* cur2_rec;
+
+ cur2_rec = cursor2->page_cur.rec;
+ offsets2 = rec_get_offsets(cur2_rec, index, NULL,
+ n_core,
+ ULINT_UNDEFINED, &heap);
+
+ cur2_rec_info = rec_get_info_bits(cur2_rec,
+ rec_offs_comp(offsets2));
+ if (cur2_rec_info & REC_INFO_MIN_REC_FLAG) {
+ /* If we delete the leftmost node
+ pointer on a non-leaf level, we must
+ mark the new leftmost node pointer as
+ the predefined minimum record */
+ rec_t* next_rec = page_rec_get_next(cur2_rec);
+ btr_set_min_rec_mark(next_rec, *block, mtr);
+ }
+
+ ut_ad(del_page_no
+ == btr_node_ptr_get_child_page_no(cur2_rec,
+ offsets2));
+ page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
+ index, offsets2, mtr);
+ cursor2 = NULL;
+ }
+
+ /* Insert the new rec. */
+ page_cur_search_with_match(block, index, node_ptr,
+ PAGE_CUR_LE , &up_match, &low_match,
+ btr_cur_get_page_cur(cursor), NULL);
+
+ err = btr_cur_optimistic_insert(flags, cursor, &insert_offsets,
+ &heap, node_ptr, &insert_rec,
+ &dummy_big_rec, 0, NULL, mtr);
+
+ if (!ins_suc && err == DB_SUCCESS) {
+ ins_suc = true;
+ }
+
+		/* If the optimistic insert fails, try to reorganize the page
+		and insert again. */
+ if (err != DB_SUCCESS && ins_suc) {
+ btr_page_reorganize(btr_cur_get_page_cur(cursor),
+ index, mtr);
+
+ err = btr_cur_optimistic_insert(flags,
+ cursor,
+ &insert_offsets,
+ &heap,
+ node_ptr,
+ &insert_rec,
+ &dummy_big_rec,
+ 0, NULL, mtr);
+
+ /* Will do pessimistic insert */
+ if (err != DB_SUCCESS) {
+ ins_suc = false;
+ }
+ }
+
+		/* The insert succeeded: position the cursor on the inserted rec. */
+ if (ins_suc) {
+ btr_cur_position(index, insert_rec, block, cursor);
+ offsets = rec_get_offsets(insert_rec,
+ index, offsets, n_core,
+ ULINT_UNDEFINED, &heap);
+ }
+
+		/* Delete the rec which cursor2 points to. */
+ if (cursor2) {
+ ulint cur2_pno;
+ rec_t* cur2_rec;
+
+ cursor2->page_cur.rec = page_rec_get_nth(page,
+ cur2_pos);
+
+ cur2_rec = btr_cur_get_rec(cursor2);
+
+ offsets2 = rec_get_offsets(cur2_rec, index, NULL,
+ n_core,
+ ULINT_UNDEFINED, &heap);
+
+			/* If cursor2 is positioned on the wrong rec, we
+			need to reposition it. */
+ cur2_pno = btr_node_ptr_get_child_page_no(cur2_rec, offsets2);
+ if ((del_page_no != cur2_pno)
+ || (cur2_rec == insert_rec)) {
+ cur2_rec = page_rec_get_next(
+ page_get_infimum_rec(page));
+
+ while (!page_rec_is_supremum(cur2_rec)) {
+ offsets2 = rec_get_offsets(cur2_rec, index,
+ NULL,
+ n_core,
+ ULINT_UNDEFINED,
+ &heap);
+ cur2_pno = btr_node_ptr_get_child_page_no(
+ cur2_rec, offsets2);
+ if (cur2_pno == del_page_no) {
+ if (insert_rec != cur2_rec) {
+ cursor2->page_cur.rec =
+ cur2_rec;
+ break;
+ }
+ }
+ cur2_rec = page_rec_get_next(cur2_rec);
+ }
+
+ ut_ad(!page_rec_is_supremum(cur2_rec));
+ }
+
+ rec_info = rec_get_info_bits(cur2_rec,
+ rec_offs_comp(offsets2));
+ if (rec_info & REC_INFO_MIN_REC_FLAG) {
+ /* If we delete the leftmost node
+ pointer on a non-leaf level, we must
+ mark the new leftmost node pointer as
+ the predefined minimum record */
+ rec_t* next_rec = page_rec_get_next(cur2_rec);
+ btr_set_min_rec_mark(next_rec, *block, mtr);
+ }
+
+ ut_ad(cur2_pno == del_page_no && cur2_rec != insert_rec);
+
+ page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
+ index, offsets2, mtr);
+ }
+
+ if (!ins_suc) {
+ mem_heap_t* new_heap = NULL;
+
+ err = btr_cur_pessimistic_insert(
+ flags,
+ cursor, &insert_offsets, &new_heap,
+ node_ptr, &insert_rec, &dummy_big_rec,
+ 0, NULL, mtr);
+
+ ut_ad(err == DB_SUCCESS);
+
+ if (new_heap) {
+ mem_heap_free(new_heap);
+ }
+
+ }
+
+ if (cursor2) {
+ btr_cur_compress_if_useful(cursor, FALSE, mtr);
+ }
+ }
+
+ ut_ad(page_has_prev(page)
+ || (REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
+ page_rec_get_next(page_get_infimum_rec(page)),
+ page_is_comp(page))));
+
+ mem_heap_free(heap);
+
+ return(true);
+}
+
+/**************************************************************//**
+Update parent page's MBR and Predicate lock information during a split */
+static MY_ATTRIBUTE((nonnull))
+void
+rtr_adjust_upper_level(
+/*===================*/
+ btr_cur_t* sea_cur, /*!< in: search cursor */
+ ulint flags, /*!< in: undo logging and
+ locking flags */
+ buf_block_t* block, /*!< in/out: page to be split */
+ buf_block_t* new_block, /*!< in/out: the new half page */
+ rtr_mbr_t* mbr, /*!< in: MBR on the old page */
+ rtr_mbr_t* new_mbr, /*!< in: MBR on the new page */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ ulint page_no;
+ ulint new_page_no;
+ dict_index_t* index = sea_cur->index;
+ btr_cur_t cursor;
+ rec_offs* offsets;
+ mem_heap_t* heap;
+ ulint level;
+ dtuple_t* node_ptr_upper;
+ page_cur_t* page_cursor;
+ lock_prdt_t prdt;
+ lock_prdt_t new_prdt;
+ dberr_t err;
+ big_rec_t* dummy_big_rec;
+ rec_t* rec;
+
+ /* Create a memory heap where the data tuple is stored */
+ heap = mem_heap_create(1024);
+ cursor.init();
+
+ cursor.thr = sea_cur->thr;
+
+ /* Get the level of the split pages */
+ level = btr_page_get_level(buf_block_get_frame(block));
+ ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block)));
+
+ page_no = block->page.id().page_no();
+
+ new_page_no = new_block->page.id().page_no();
+
+ /* Set new mbr for the old page on the upper level. */
+ /* Look up the index for the node pointer to page */
+ offsets = rtr_page_get_father_block(
+ NULL, heap, index, block, mtr, sea_cur, &cursor);
+
+ page_cursor = btr_cur_get_page_cur(&cursor);
+
+ rtr_update_mbr_field(&cursor, offsets, NULL, block->frame, mbr, NULL,
+ mtr);
+
+	/* The parent MBR was already updated; reset the increment recorded in our path */
+ if (sea_cur->rtr_info) {
+ node_visit_t* node_visit = rtr_get_parent_node(
+ sea_cur, level + 1, true);
+ if (node_visit) {
+ node_visit->mbr_inc = 0;
+ }
+ }
+
+ /* Insert the node for the new page. */
+ node_ptr_upper = rtr_index_build_node_ptr(
+ index, new_mbr,
+ page_rec_get_next(page_get_infimum_rec(new_block->frame)),
+ new_page_no, heap);
+
+ ulint up_match = 0;
+ ulint low_match = 0;
+
+ buf_block_t* father_block = btr_cur_get_block(&cursor);
+
+ page_cur_search_with_match(
+ father_block, index, node_ptr_upper,
+ PAGE_CUR_LE , &up_match, &low_match,
+ btr_cur_get_page_cur(&cursor), NULL);
+
+ err = btr_cur_optimistic_insert(
+ flags
+ | BTR_NO_LOCKING_FLAG
+ | BTR_KEEP_SYS_FLAG
+ | BTR_NO_UNDO_LOG_FLAG,
+ &cursor, &offsets, &heap,
+ node_ptr_upper, &rec, &dummy_big_rec, 0, NULL, mtr);
+
+ if (err == DB_FAIL) {
+ cursor.rtr_info = sea_cur->rtr_info;
+ cursor.tree_height = sea_cur->tree_height;
+
+ /* Recreate a memory heap as input parameter for
+ btr_cur_pessimistic_insert(), because the heap may be
+ emptied in btr_cur_pessimistic_insert(). */
+ mem_heap_t* new_heap = mem_heap_create(1024);
+
+ err = btr_cur_pessimistic_insert(flags
+ | BTR_NO_LOCKING_FLAG
+ | BTR_KEEP_SYS_FLAG
+ | BTR_NO_UNDO_LOG_FLAG,
+ &cursor, &offsets, &new_heap,
+ node_ptr_upper, &rec,
+ &dummy_big_rec, 0, NULL, mtr);
+ cursor.rtr_info = NULL;
+ ut_a(err == DB_SUCCESS);
+
+ mem_heap_free(new_heap);
+ }
+
+ prdt.data = static_cast<void*>(mbr);
+ prdt.op = 0;
+ new_prdt.data = static_cast<void*>(new_mbr);
+ new_prdt.op = 0;
+
+ lock_prdt_update_parent(block, new_block, &prdt, &new_prdt,
+ page_cursor->block->page.id());
+
+ mem_heap_free(heap);
+
+ ut_ad(block->zip_size() == index->table->space->zip_size());
+
+ const uint32_t next_page_no = btr_page_get_next(block->frame);
+
+ if (next_page_no != FIL_NULL) {
+ buf_block_t* next_block = btr_block_get(
+ *index, next_page_no, RW_X_LATCH, false, mtr);
+#ifdef UNIV_BTR_DEBUG
+ ut_a(page_is_comp(next_block->frame)
+ == page_is_comp(block->frame));
+ ut_a(btr_page_get_prev(next_block->frame)
+ == block->page.id().page_no());
+#endif /* UNIV_BTR_DEBUG */
+
+ btr_page_set_prev(next_block, new_page_no, mtr);
+ }
+
+ btr_page_set_next(block, new_page_no, mtr);
+
+ btr_page_set_prev(new_block, page_no, mtr);
+ btr_page_set_next(new_block, next_page_no, mtr);
+}
+
+/*************************************************************//**
+Moves record list to another page for rtree splitting.
+
+IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
+if new_block is a compressed leaf page in a secondary index.
+This has to be done either within the same mini-transaction,
+or by invoking ibuf_reset_free_bits() before mtr_commit().
+
+@return TRUE on success; FALSE on compression failure */
+static
+ibool
+rtr_split_page_move_rec_list(
+/*=========================*/
+ rtr_split_node_t* node_array, /*!< in: split node array. */
+ int first_rec_group,/*!< in: group number of the
+ first rec. */
+ buf_block_t* new_block, /*!< in/out: index page
+ where to move */
+ buf_block_t* block, /*!< in/out: page containing
+ split_rec */
+ rec_t* first_rec, /*!< in: first record not to
+ move */
+ dict_index_t* index, /*!< in: record descriptor */
+ mem_heap_t* heap, /*!< in: pointer to memory
+ heap, or NULL */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ rtr_split_node_t* cur_split_node;
+ rtr_split_node_t* end_split_node;
+ page_cur_t page_cursor;
+ page_cur_t new_page_cursor;
+ page_t* page;
+ page_t* new_page;
+ rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets = offsets_;
+ page_zip_des_t* new_page_zip
+ = buf_block_get_page_zip(new_block);
+ rec_t* rec;
+ rec_t* ret;
+ ulint moved = 0;
+ ulint max_to_move = 0;
+ rtr_rec_move_t* rec_move = NULL;
+
+ ut_ad(!dict_index_is_ibuf(index));
+ ut_ad(dict_index_is_spatial(index));
+
+ rec_offs_init(offsets_);
+
+ page_cur_set_before_first(block, &page_cursor);
+ page_cur_set_before_first(new_block, &new_page_cursor);
+
+ page = buf_block_get_frame(block);
+ new_page = buf_block_get_frame(new_block);
+ ret = page_rec_get_prev(page_get_supremum_rec(new_page));
+
+ end_split_node = node_array + page_get_n_recs(page);
+
+ mtr_log_t log_mode = MTR_LOG_NONE;
+
+ if (new_page_zip) {
+ log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
+ }
+
+ max_to_move = page_get_n_recs(
+ buf_block_get_frame(block));
+ rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
+ heap,
+ sizeof (*rec_move) * max_to_move));
+ const ulint n_core = page_is_leaf(page)
+ ? index->n_core_fields : 0;
+
+ /* Insert the recs in group 2 to new page. */
+ for (cur_split_node = node_array;
+ cur_split_node < end_split_node; ++cur_split_node) {
+ if (cur_split_node->n_node != first_rec_group) {
+ lock_rec_store_on_page_infimum(
+ block, cur_split_node->key);
+
+ offsets = rec_get_offsets(cur_split_node->key,
+ index, offsets, n_core,
+ ULINT_UNDEFINED, &heap);
+
+ ut_ad(!n_core || cur_split_node->key != first_rec);
+
+ rec = page_cur_insert_rec_low(
+ &new_page_cursor,
+ index, cur_split_node->key, offsets, mtr);
+
+ ut_a(rec);
+
+ lock_rec_restore_from_page_infimum(
+ new_block, rec, block);
+
+ page_cur_move_to_next(&new_page_cursor);
+
+ rec_move[moved].new_rec = rec;
+ rec_move[moved].old_rec = cur_split_node->key;
+ rec_move[moved].moved = false;
+ moved++;
+
+ if (moved > max_to_move) {
+ ut_ad(0);
+ break;
+ }
+ }
+ }
+
+ /* Update PAGE_MAX_TRX_ID on the uncompressed page.
+ Modifications will be redo logged and copied to the compressed
+ page in page_zip_compress() or page_zip_reorganize() below.
+ Multiple transactions cannot simultaneously operate on the
+ same temp-table in parallel.
+	max_trx_id is ignored for temp tables because it is not required
+	for MVCC. */
+ if (n_core && !index->table->is_temporary()) {
+ page_update_max_trx_id(new_block, NULL,
+ page_get_max_trx_id(page),
+ mtr);
+ }
+
+ if (new_page_zip) {
+ mtr_set_log_mode(mtr, log_mode);
+
+ if (!page_zip_compress(new_block, index,
+ page_zip_level, mtr)) {
+ ulint ret_pos;
+
+ /* Before trying to reorganize the page,
+ store the number of preceding records on the page. */
+ ret_pos = page_rec_get_n_recs_before(ret);
+ /* Before copying, "ret" was the predecessor
+ of the predefined supremum record. If it was
+ the predefined infimum record, then it would
+ still be the infimum, and we would have
+ ret_pos == 0. */
+
+ if (UNIV_UNLIKELY
+ (!page_zip_reorganize(new_block, index,
+ page_zip_level, mtr))) {
+
+ if (UNIV_UNLIKELY
+ (!page_zip_decompress(new_page_zip,
+ new_page, FALSE))) {
+ ut_error;
+ }
+#ifdef UNIV_GIS_DEBUG
+ ut_ad(page_validate(new_page, index));
+#endif
+
+ return(false);
+ }
+
+ /* The page was reorganized: Seek to ret_pos. */
+ ret = page_rec_get_nth(new_page, ret_pos);
+ }
+ }
+
+ /* Update the lock table */
+ lock_rtr_move_rec_list(new_block, block, rec_move, moved);
+
+ /* Delete recs in second group from the old page. */
+ for (cur_split_node = node_array;
+ cur_split_node < end_split_node; ++cur_split_node) {
+ if (cur_split_node->n_node != first_rec_group) {
+ page_cur_position(cur_split_node->key,
+ block, &page_cursor);
+ offsets = rec_get_offsets(
+ page_cur_get_rec(&page_cursor), index,
+ offsets, n_core, ULINT_UNDEFINED,
+ &heap);
+ page_cur_delete_rec(&page_cursor,
+ index, offsets, mtr);
+ }
+ }
+
+ return(true);
+}
+
+/*************************************************************//**
+Splits an R-tree index page to halves and inserts the tuple. It is assumed
+that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
+released within this function! NOTE that the operation of this
+function must always succeed, we cannot reverse it: therefore enough
+free disk space (2 pages) must be guaranteed to be available before
+this function is called.
+@return inserted record */
+rec_t*
+rtr_page_split_and_insert(
+/*======================*/
+ ulint flags, /*!< in: undo logging and locking flags */
+ btr_cur_t* cursor, /*!< in/out: cursor at which to insert; when the
+ function returns, the cursor is positioned
+ on the predecessor of the inserted record */
+ rec_offs** offsets,/*!< out: offsets on inserted record */
+ mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */
+ const dtuple_t* tuple, /*!< in: tuple to insert */
+ ulint n_ext, /*!< in: number of externally stored columns */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ buf_block_t* block;
+ page_t* page;
+ page_t* new_page;
+ buf_block_t* new_block;
+ page_zip_des_t* page_zip;
+ page_zip_des_t* new_page_zip;
+ buf_block_t* insert_block;
+ page_cur_t* page_cursor;
+ rec_t* rec = 0;
+ ulint n_recs;
+ ulint total_data;
+ ulint insert_size;
+ rtr_split_node_t* rtr_split_node_array;
+ rtr_split_node_t* cur_split_node;
+ rtr_split_node_t* end_split_node;
+ double* buf_pos;
+ node_seq_t current_ssn;
+ node_seq_t next_ssn;
+ buf_block_t* root_block;
+ rtr_mbr_t mbr;
+ rtr_mbr_t new_mbr;
+ lock_prdt_t prdt;
+ lock_prdt_t new_prdt;
+ rec_t* first_rec = NULL;
+ int first_rec_group = 1;
+ ulint n_iterations = 0;
+
+ if (!*heap) {
+ *heap = mem_heap_create(1024);
+ }
+
+func_start:
+ mem_heap_empty(*heap);
+ *offsets = NULL;
+
+ ut_ad(mtr->memo_contains_flagged(&cursor->index->lock, MTR_MEMO_X_LOCK
+ | MTR_MEMO_SX_LOCK));
+ ut_ad(!dict_index_is_online_ddl(cursor->index)
+ || (flags & BTR_CREATE_FLAG)
+ || dict_index_is_clust(cursor->index));
+ ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
+ RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
+
+ block = btr_cur_get_block(cursor);
+ page = buf_block_get_frame(block);
+ page_zip = buf_block_get_page_zip(block);
+ current_ssn = page_get_ssn_id(page);
+
+ ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
+ ut_ad(page_get_n_recs(page) >= 1);
+
+ const page_id_t page_id(block->page.id());
+
+ if (!page_has_prev(page) && !page_is_leaf(page)) {
+ first_rec = page_rec_get_next(
+ page_get_infimum_rec(buf_block_get_frame(block)));
+ }
+
+	/* Initialize the split node array. */
+ rtr_split_node_array = rtr_page_split_initialize_nodes(
+ *heap, cursor, offsets, tuple, &buf_pos);
+
+	/* Divide all mbrs into two groups. */
+ n_recs = ulint(page_get_n_recs(page)) + 1;
+
+ end_split_node = rtr_split_node_array + n_recs;
+
+#ifdef UNIV_GIS_DEBUG
+ fprintf(stderr, "Before split a page:\n");
+ for (cur_split_node = rtr_split_node_array;
+ cur_split_node < end_split_node; ++cur_split_node) {
+ for (int i = 0; i < SPDIMS * 2; i++) {
+ fprintf(stderr, "%.2lf ",
+ *(cur_split_node->coords + i));
+ }
+ fprintf(stderr, "\n");
+ }
+#endif
+
+ insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
+ total_data = page_get_data_size(page) + insert_size;
+ first_rec_group = split_rtree_node(rtr_split_node_array,
+ static_cast<int>(n_recs),
+ static_cast<int>(total_data),
+ static_cast<int>(insert_size),
+ 0, 2, 2, &buf_pos, SPDIMS,
+ static_cast<uchar*>(first_rec));
+
+ /* Allocate a new page to the index */
+ const uint16_t page_level = btr_page_get_level(page);
+ new_block = btr_page_alloc(cursor->index, page_id.page_no() + 1,
+ FSP_UP, page_level, mtr, mtr);
+ if (!new_block) {
+ return NULL;
+ }
+
+ new_page_zip = buf_block_get_page_zip(new_block);
+ if (page_level && UNIV_LIKELY_NULL(new_page_zip)) {
+ /* ROW_FORMAT=COMPRESSED non-leaf pages are not expected
+ to contain FIL_NULL in FIL_PAGE_PREV at this stage. */
+ memset_aligned<4>(new_block->frame + FIL_PAGE_PREV, 0, 4);
+ }
+ btr_page_create(new_block, new_page_zip, cursor->index,
+ page_level, mtr);
+
+ new_page = buf_block_get_frame(new_block);
+ ut_ad(page_get_ssn_id(new_page) == 0);
+
+ /* Set new ssn to the new page and page. */
+ page_set_ssn_id(new_block, new_page_zip, current_ssn, mtr);
+ next_ssn = rtr_get_new_ssn_id(cursor->index);
+
+ page_set_ssn_id(block, page_zip, next_ssn, mtr);
+
+	/* Keep the recs in the first group on the old page; move the recs
+	in the second group to the new page. */
+ if (0
+#ifdef UNIV_ZIP_COPY
+ || page_zip
+#endif
+ || !rtr_split_page_move_rec_list(rtr_split_node_array,
+ first_rec_group,
+ new_block, block, first_rec,
+ cursor->index, *heap, mtr)) {
+ ulint n = 0;
+ rec_t* rec;
+ ulint moved = 0;
+ ulint max_to_move = 0;
+ rtr_rec_move_t* rec_move = NULL;
+ ulint pos;
+
+ /* For some reason, compressing new_page failed,
+ even though it should contain fewer records than
+ the original page. Copy the page byte for byte
+ and then delete the records from both pages
+ as appropriate. Deleting will always succeed. */
+ ut_a(new_page_zip);
+
+ page_zip_copy_recs(new_block,
+ page_zip, page, cursor->index, mtr);
+
+ page_cursor = btr_cur_get_page_cur(cursor);
+
+ /* Move locks on recs. */
+ max_to_move = page_get_n_recs(page);
+ rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
+ *heap,
+ sizeof (*rec_move) * max_to_move));
+
+ /* Init the rec_move array for moving lock on recs. */
+ for (cur_split_node = rtr_split_node_array;
+ cur_split_node < end_split_node - 1; ++cur_split_node) {
+ if (cur_split_node->n_node != first_rec_group) {
+ pos = page_rec_get_n_recs_before(
+ cur_split_node->key);
+ rec = page_rec_get_nth(new_page, pos);
+ ut_a(rec);
+
+ rec_move[moved].new_rec = rec;
+ rec_move[moved].old_rec = cur_split_node->key;
+ rec_move[moved].moved = false;
+ moved++;
+
+ if (moved > max_to_move) {
+ ut_ad(0);
+ break;
+ }
+ }
+ }
+
+ /* Update the lock table */
+ lock_rtr_move_rec_list(new_block, block, rec_move, moved);
+
+ const ulint n_core = page_level
+ ? 0 : cursor->index->n_core_fields;
+
+ /* Delete recs in first group from the new page. */
+ for (cur_split_node = rtr_split_node_array;
+ cur_split_node < end_split_node - 1; ++cur_split_node) {
+ if (cur_split_node->n_node == first_rec_group) {
+ ulint pos;
+
+ pos = page_rec_get_n_recs_before(
+ cur_split_node->key);
+ ut_a(pos > 0);
+ rec_t* new_rec = page_rec_get_nth(new_page,
+ pos - n);
+
+ ut_a(new_rec && page_rec_is_user_rec(new_rec));
+ page_cur_position(new_rec, new_block,
+ page_cursor);
+
+ *offsets = rec_get_offsets(
+ page_cur_get_rec(page_cursor),
+ cursor->index, *offsets, n_core,
+ ULINT_UNDEFINED, heap);
+
+ page_cur_delete_rec(page_cursor,
+ cursor->index, *offsets, mtr);
+ n++;
+ }
+ }
+
+ /* Delete recs in second group from the old page. */
+ for (cur_split_node = rtr_split_node_array;
+ cur_split_node < end_split_node - 1; ++cur_split_node) {
+ if (cur_split_node->n_node != first_rec_group) {
+ page_cur_position(cur_split_node->key,
+ block, page_cursor);
+ *offsets = rec_get_offsets(
+ page_cur_get_rec(page_cursor),
+ cursor->index, *offsets, n_core,
+ ULINT_UNDEFINED, heap);
+ page_cur_delete_rec(page_cursor,
+ cursor->index, *offsets, mtr);
+ }
+ }
+
+#ifdef UNIV_GIS_DEBUG
+ ut_ad(page_validate(new_page, cursor->index));
+ ut_ad(page_validate(page, cursor->index));
+#endif
+ }
+
+ /* Insert the new rec to the proper page. */
+ cur_split_node = end_split_node - 1;
+ if (cur_split_node->n_node != first_rec_group) {
+ insert_block = new_block;
+ } else {
+ insert_block = block;
+ }
+
+ /* Reposition the cursor for insert and try insertion */
+ page_cursor = btr_cur_get_page_cur(cursor);
+
+ page_cur_search(insert_block, cursor->index, tuple,
+ PAGE_CUR_LE, page_cursor);
+
+	/* It is possible that the new record is too big to be inserted
+	into the page, in which case a second round of splitting is
+	needed. We test this scenario here */
+ DBUG_EXECUTE_IF("rtr_page_need_second_split",
+ if (n_iterations == 0) {
+ rec = NULL;
+ goto after_insert; }
+ );
+
+ rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
+ offsets, heap, n_ext, mtr);
+
+ /* If insert did not fit, try page reorganization.
+ For compressed pages, page_cur_tuple_insert() will have
+ attempted this already. */
+ if (rec == NULL) {
+		if (!page_cur_get_page_zip(page_cursor)
+ && btr_page_reorganize(page_cursor, cursor->index, mtr)) {
+ rec = page_cur_tuple_insert(page_cursor, tuple,
+ cursor->index, offsets,
+ heap, n_ext, mtr);
+
+ }
+		/* If the insert fails, we will try to split the
+		insert_block again. */
+ }
+
+#ifdef UNIV_DEBUG
+after_insert:
+#endif
+ /* Calculate the mbr on the upper half-page, and the mbr on
+ original page. */
+ rtr_page_cal_mbr(cursor->index, block, &mbr, *heap);
+ rtr_page_cal_mbr(cursor->index, new_block, &new_mbr, *heap);
+ prdt.data = &mbr;
+ new_prdt.data = &new_mbr;
+
+ /* Check any predicate locks need to be moved/copied to the
+ new page */
+ lock_prdt_update_split(new_block, &prdt, &new_prdt, page_id);
+
+ /* Adjust the upper level. */
+ rtr_adjust_upper_level(cursor, flags, block, new_block,
+ &mbr, &new_mbr, mtr);
+
+	/* Save the new ssn to the root page, since we need to reinit
+	the first ssn value from it after server restart. */
+
+ root_block = btr_root_block_get(cursor->index, RW_SX_LATCH, mtr);
+
+ page_zip = buf_block_get_page_zip(root_block);
+ page_set_ssn_id(root_block, page_zip, next_ssn, mtr);
+
+ /* Insert fit on the page: update the free bits for the
+ left and right pages in the same mtr */
+
+ if (page_is_leaf(page)) {
+ ibuf_update_free_bits_for_two_pages_low(
+ block, new_block, mtr);
+ }
+
+
+	/* If the insert of the new rec failed, we need to do another
+	split. */
+ if (!rec) {
+ /* We play safe and reset the free bits for new_page */
+ if (!dict_index_is_clust(cursor->index)
+ && !cursor->index->table->is_temporary()) {
+ ibuf_reset_free_bits(new_block);
+ ibuf_reset_free_bits(block);
+ }
+
+		/* We need to clean the parent path here and search for the
+		father node later; otherwise, it is possible that a wrong
+		parent is found. */
+ rtr_clean_rtr_info(cursor->rtr_info, true);
+ cursor->rtr_info = NULL;
+ n_iterations++;
+
+ rec_t* i_rec = page_rec_get_next(page_get_infimum_rec(
+ buf_block_get_frame(block)));
+ btr_cur_position(cursor->index, i_rec, block, cursor);
+
+ goto func_start;
+ }
+
+#ifdef UNIV_GIS_DEBUG
+ ut_ad(page_validate(buf_block_get_frame(block), cursor->index));
+ ut_ad(page_validate(buf_block_get_frame(new_block), cursor->index));
+
+ ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
+#endif
+ MONITOR_INC(MONITOR_INDEX_SPLIT);
+
+ return(rec);
+}
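
The placement rule at the end of the split above: records whose group differs
from first_rec_group move to the new page, and the tuple being inserted is the
last entry of the split-node array, so the same test decides which half
receives the insert. A toy sketch of that decision, with illustrative names
only:

    #include <cstdio>
    #include <vector>

    int main() {
        // Group assignment produced by a split; the last entry stands for
        // the tuple that is being inserted.
        std::vector<int> groups = {1, 2, 1, 2, 2};
        int first_rec_group = 1;      // group that stays on the old page
        std::vector<int> old_page, new_page;
        int idx = 0;
        for (int g : groups)
            (g == first_rec_group ? old_page : new_page).push_back(idx++);
        std::printf("old page: %zu recs, new page: %zu recs\n",
                    old_page.size(), new_page.size());
        std::printf("insert goes to the %s page\n",
                    groups.back() == first_rec_group ? "old" : "new");
    }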
+
+/****************************************************************//**
+Enlarge the MBR fields of the ancestor pages after an insert, if needed.
+@return DB_SUCCESS on success, or an error code */
+dberr_t
+rtr_ins_enlarge_mbr(
+/*================*/
+ btr_cur_t* btr_cur, /*!< in: btr cursor */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dberr_t err = DB_SUCCESS;
+ rtr_mbr_t new_mbr;
+ buf_block_t* block;
+ mem_heap_t* heap;
+ dict_index_t* index = btr_cur->index;
+ page_cur_t* page_cursor;
+ rec_offs* offsets;
+ node_visit_t* node_visit;
+ btr_cur_t cursor;
+ page_t* page;
+
+ ut_ad(dict_index_is_spatial(index));
+
+	/* If there is no rtr_info, or the rtree is a one-level tree, return. */
+ if (!btr_cur->rtr_info || btr_cur->tree_height == 1) {
+ return(err);
+ }
+
+ /* Check path info is not empty. */
+ ut_ad(!btr_cur->rtr_info->parent_path->empty());
+
+ /* Create a memory heap. */
+ heap = mem_heap_create(1024);
+
+ /* Leaf level page is stored in cursor */
+ page_cursor = btr_cur_get_page_cur(btr_cur);
+ block = page_cur_get_block(page_cursor);
+
+ for (ulint i = 1; i < btr_cur->tree_height; i++) {
+ node_visit = rtr_get_parent_node(btr_cur, i, true);
+ ut_ad(node_visit != NULL);
+
+		/* If the mbr was not enlarged at this level, skip it. */
+ if (node_visit->mbr_inc == 0) {
+ block = btr_pcur_get_block(node_visit->cursor);
+ continue;
+ }
+
+ /* Calculate the mbr of the child page. */
+ rtr_page_cal_mbr(index, block, &new_mbr, heap);
+
+ /* Get father block. */
+ cursor.init();
+ offsets = rtr_page_get_father_block(
+ NULL, heap, index, block, mtr, btr_cur, &cursor);
+
+ page = buf_block_get_frame(block);
+
+ /* Update the mbr field of the rec. */
+ if (!rtr_update_mbr_field(&cursor, offsets, NULL, page,
+ &new_mbr, NULL, mtr)) {
+ err = DB_ERROR;
+ break;
+ }
+
+ page_cursor = btr_cur_get_page_cur(&cursor);
+ block = page_cur_get_block(page_cursor);
+ }
+
+ mem_heap_free(heap);
+
+ return(err);
+}
+
+/*************************************************************//**
+Copy recs from a page to new_block of rtree.
+Differs from page_copy_rec_list_end, because this function does not
+touch the lock table and max trx id on page or compress the page.
+
+IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
+if new_block is a compressed leaf page in a secondary index.
+This has to be done either within the same mini-transaction,
+or by invoking ibuf_reset_free_bits() before mtr_commit(). */
+void
+rtr_page_copy_rec_list_end_no_locks(
+/*================================*/
+ buf_block_t* new_block, /*!< in: index page to copy to */
+ buf_block_t* block, /*!< in: index page of rec */
+ rec_t* rec, /*!< in: record on page */
+ dict_index_t* index, /*!< in: record descriptor */
+ mem_heap_t* heap, /*!< in/out: heap memory */
+ rtr_rec_move_t* rec_move, /*!< in: recording records moved */
+ ulint max_move, /*!< in: num of rec to move */
+ ulint* num_moved, /*!< out: num of rec to move */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ page_t* new_page = buf_block_get_frame(new_block);
+ page_cur_t page_cur;
+ page_cur_t cur1;
+ rec_t* cur_rec;
+ rec_offs offsets_1[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets1 = offsets_1;
+ rec_offs offsets_2[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets2 = offsets_2;
+ ulint moved = 0;
+ const ulint n_core = page_is_leaf(new_page)
+ ? index->n_core_fields : 0;
+
+ rec_offs_init(offsets_1);
+ rec_offs_init(offsets_2);
+
+ page_cur_position(rec, block, &cur1);
+
+ if (page_cur_is_before_first(&cur1)) {
+ page_cur_move_to_next(&cur1);
+ }
+
+ btr_assert_not_corrupted(new_block, index);
+ ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
+ ut_a(mach_read_from_2(new_page + srv_page_size - 10) == (ulint)
+ (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
+
+ cur_rec = page_rec_get_next(
+ page_get_infimum_rec(buf_block_get_frame(new_block)));
+ page_cur_position(cur_rec, new_block, &page_cur);
+
+ /* Copy records from the original page to the new page */
+ while (!page_cur_is_after_last(&cur1)) {
+ rec_t* cur1_rec = page_cur_get_rec(&cur1);
+ rec_t* ins_rec;
+
+ if (page_rec_is_infimum(cur_rec)) {
+ cur_rec = page_rec_get_next(cur_rec);
+ }
+
+ offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core,
+ ULINT_UNDEFINED, &heap);
+ while (!page_rec_is_supremum(cur_rec)) {
+ ulint cur_matched_fields = 0;
+ int cmp;
+
+ offsets2 = rec_get_offsets(cur_rec, index, offsets2,
+ n_core,
+ ULINT_UNDEFINED, &heap);
+ cmp = cmp_rec_rec(cur1_rec, cur_rec,
+ offsets1, offsets2, index, false,
+ &cur_matched_fields);
+ if (cmp < 0) {
+ page_cur_move_to_prev(&page_cur);
+ break;
+ } else if (cmp > 0) {
+ /* Skip small recs. */
+ page_cur_move_to_next(&page_cur);
+ cur_rec = page_cur_get_rec(&page_cur);
+ } else if (n_core) {
+ if (rec_get_deleted_flag(cur1_rec,
+ dict_table_is_comp(index->table))) {
+ goto next;
+ } else {
+				/* The two leaf records are identical:
+				skip copying this one, and instead clear
+				the delete mark on the copy already in
+				the new page */
+ btr_rec_set_deleted<false>(
+ new_block, cur_rec, mtr);
+ goto next;
+ }
+ }
+ }
+
+		/* If the position is on the supremum record, move to
+		the previous record. */
+ if (page_rec_is_supremum(cur_rec)) {
+ page_cur_move_to_prev(&page_cur);
+ }
+
+ cur_rec = page_cur_get_rec(&page_cur);
+
+ offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core,
+ ULINT_UNDEFINED, &heap);
+
+ ins_rec = page_cur_insert_rec_low(&page_cur, index,
+ cur1_rec, offsets1, mtr);
+ if (UNIV_UNLIKELY(!ins_rec)) {
+ fprintf(stderr, "page number %u and %u\n",
+ new_block->page.id().page_no(),
+ block->page.id().page_no());
+
+ ib::fatal() << "rec offset " << page_offset(rec)
+ << ", cur1 offset "
+ << page_offset(page_cur_get_rec(&cur1))
+ << ", cur_rec offset "
+ << page_offset(cur_rec);
+ }
+
+ rec_move[moved].new_rec = ins_rec;
+ rec_move[moved].old_rec = cur1_rec;
+ rec_move[moved].moved = false;
+ moved++;
+next:
+ if (moved > max_move) {
+ ut_ad(0);
+ break;
+ }
+
+ page_cur_move_to_next(&cur1);
+ }
+
+ *num_moved = moved;
+}
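+
+/* The copy loop above is an ordered merge: cur1 walks the source page in
+key order while page_cur seeks the matching insert position in the target
+page. A minimal sketch of the same pattern over sorted std::vectors
+(illustrative only; not part of InnoDB): */
+#if 0 /* illustrative sketch, not compiled */
+#include <vector>
+
+/* Merge-insert each element of src into dst, keeping dst sorted. */
+static void merge_insert(std::vector<int>& dst, const std::vector<int>& src)
+{
+	std::vector<int>::iterator pos = dst.begin();
+
+	for (int key : src) {
+		/* Advance past smaller keys ("skip small recs"). */
+		while (pos != dst.end() && *pos < key) {
+			++pos;
+		}
+
+		pos = dst.insert(pos, key);	/* insert before first >= key */
+		++pos;
+	}
+}
+#endif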
+
+/*************************************************************//**
+Copy records, up to a specified record, from a page to new_block of an rtree. */
+void
+rtr_page_copy_rec_list_start_no_locks(
+/*==================================*/
+ buf_block_t* new_block, /*!< in: index page to copy to */
+ buf_block_t* block, /*!< in: index page of rec */
+ rec_t* rec, /*!< in: record on page */
+ dict_index_t* index, /*!< in: record descriptor */
+ mem_heap_t* heap, /*!< in/out: heap memory */
+	rtr_rec_move_t*	rec_move,	/*!< in/out: records moved */
+	ulint		max_move,	/*!< in: max num of recs to move */
+	ulint*		num_moved,	/*!< out: num of recs moved */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ page_cur_t cur1;
+ rec_t* cur_rec;
+ rec_offs offsets_1[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets1 = offsets_1;
+ rec_offs offsets_2[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets2 = offsets_2;
+ page_cur_t page_cur;
+ ulint moved = 0;
+ const ulint n_core = page_is_leaf(buf_block_get_frame(block))
+ ? index->n_core_fields : 0;
+
+ rec_offs_init(offsets_1);
+ rec_offs_init(offsets_2);
+
+ page_cur_set_before_first(block, &cur1);
+ page_cur_move_to_next(&cur1);
+
+ cur_rec = page_rec_get_next(
+ page_get_infimum_rec(buf_block_get_frame(new_block)));
+ page_cur_position(cur_rec, new_block, &page_cur);
+
+ while (page_cur_get_rec(&cur1) != rec) {
+ rec_t* cur1_rec = page_cur_get_rec(&cur1);
+ rec_t* ins_rec;
+
+ if (page_rec_is_infimum(cur_rec)) {
+ cur_rec = page_rec_get_next(cur_rec);
+ }
+
+ offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core,
+ ULINT_UNDEFINED, &heap);
+
+ while (!page_rec_is_supremum(cur_rec)) {
+ ulint cur_matched_fields = 0;
+
+ offsets2 = rec_get_offsets(cur_rec, index, offsets2,
+ n_core,
+ ULINT_UNDEFINED, &heap);
+ int cmp = cmp_rec_rec(cur1_rec, cur_rec,
+ offsets1, offsets2, index, false,
+ &cur_matched_fields);
+ if (cmp < 0) {
+ page_cur_move_to_prev(&page_cur);
+ cur_rec = page_cur_get_rec(&page_cur);
+ break;
+ } else if (cmp > 0) {
+ /* Skip small recs. */
+ page_cur_move_to_next(&page_cur);
+ cur_rec = page_cur_get_rec(&page_cur);
+ } else if (n_core) {
+ if (rec_get_deleted_flag(
+ cur1_rec,
+ dict_table_is_comp(index->table))) {
+ goto next;
+ } else {
+				/* The two leaf records are identical:
+				skip copying this one, and instead clear
+				the delete mark on the copy already in
+				the new page */
+ btr_rec_set_deleted<false>(
+ new_block, cur_rec, mtr);
+ goto next;
+ }
+ }
+ }
+
+		/* If the position is on the supremum record, move to
+		the previous record. */
+ if (page_rec_is_supremum(cur_rec)) {
+ page_cur_move_to_prev(&page_cur);
+ }
+
+ cur_rec = page_cur_get_rec(&page_cur);
+
+ offsets1 = rec_get_offsets(cur1_rec, index, offsets1, n_core,
+ ULINT_UNDEFINED, &heap);
+
+ ins_rec = page_cur_insert_rec_low(&page_cur, index,
+ cur1_rec, offsets1, mtr);
+ if (UNIV_UNLIKELY(!ins_rec)) {
+ ib::fatal() << new_block->page.id()
+ << "rec offset " << page_offset(rec)
+ << ", cur1 offset "
+ << page_offset(page_cur_get_rec(&cur1))
+ << ", cur_rec offset "
+ << page_offset(cur_rec);
+ }
+
+ rec_move[moved].new_rec = ins_rec;
+ rec_move[moved].old_rec = cur1_rec;
+ rec_move[moved].moved = false;
+ moved++;
+next:
+ if (moved > max_move) {
+ ut_ad(0);
+ break;
+ }
+
+ page_cur_move_to_next(&cur1);
+ }
+
+ *num_moved = moved;
+}
+
+/****************************************************************//**
+Check whether two MBRs are identical or need to be merged */
+bool
+rtr_merge_mbr_changed(
+/*==================*/
+ btr_cur_t* cursor, /*!< in/out: cursor */
+ btr_cur_t* cursor2, /*!< in: the other cursor */
+ rec_offs* offsets, /*!< in: rec offsets */
+ rec_offs* offsets2, /*!< in: rec offsets */
+ rtr_mbr_t* new_mbr) /*!< out: MBR to update */
+{
+ double* mbr;
+ double mbr1[SPDIMS * 2];
+ double mbr2[SPDIMS * 2];
+ rec_t* rec;
+ ulint len;
+ bool changed = false;
+
+ ut_ad(dict_index_is_spatial(cursor->index));
+
+ rec = btr_cur_get_rec(cursor);
+
+ rtr_read_mbr(rec_get_nth_field(rec, offsets, 0, &len),
+ reinterpret_cast<rtr_mbr_t*>(mbr1));
+
+ rec = btr_cur_get_rec(cursor2);
+
+ rtr_read_mbr(rec_get_nth_field(rec, offsets2, 0, &len),
+ reinterpret_cast<rtr_mbr_t*>(mbr2));
+
+ mbr = reinterpret_cast<double*>(new_mbr);
+
+ for (int i = 0; i < SPDIMS * 2; i += 2) {
+ changed = (changed || mbr1[i] != mbr2[i]);
+ *mbr = mbr1[i] < mbr2[i] ? mbr1[i] : mbr2[i];
+ mbr++;
+ changed = (changed || mbr1[i + 1] != mbr2 [i + 1]);
+ *mbr = mbr1[i + 1] > mbr2[i + 1] ? mbr1[i + 1] : mbr2[i + 1];
+ mbr++;
+ }
+
+ return(changed);
+}
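+
+/* The merge above takes, per dimension, the smaller low bound and the
+larger high bound of the two MBRs, and reports whether the two inputs
+differed anywhere. A self-contained 2-D rendering of the same computation
+(illustrative only; mbr2d is a stand-in type): */
+#if 0 /* illustrative sketch, not compiled */
+#include <algorithm>
+
+struct mbr2d { double xmin, xmax, ymin, ymax; };
+
+/* Compute the union of a and b; return whether a and b differ. */
+static bool mbr_union(const mbr2d& a, const mbr2d& b, mbr2d* out)
+{
+	out->xmin = std::min(a.xmin, b.xmin);
+	out->xmax = std::max(a.xmax, b.xmax);
+	out->ymin = std::min(a.ymin, b.ymin);
+	out->ymax = std::max(a.ymax, b.ymax);
+
+	return a.xmin != b.xmin || a.xmax != b.xmax
+		|| a.ymin != b.ymin || a.ymax != b.ymax;
+}
+#endif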
+
+/****************************************************************//**
+Merge two MBRs and update the MBR that the cursor is on. */
+dberr_t
+rtr_merge_and_update_mbr(
+/*=====================*/
+ btr_cur_t* cursor, /*!< in/out: cursor */
+ btr_cur_t* cursor2, /*!< in: the other cursor */
+ rec_offs* offsets, /*!< in: rec offsets */
+ rec_offs* offsets2, /*!< in: rec offsets */
+ page_t* child_page, /*!< in: the page. */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dberr_t err = DB_SUCCESS;
+ rtr_mbr_t new_mbr;
+ bool changed = false;
+
+ ut_ad(dict_index_is_spatial(cursor->index));
+
+ changed = rtr_merge_mbr_changed(cursor, cursor2, offsets, offsets2,
+ &new_mbr);
+
+	/* Update the MBR field of the record, and delete the record
+	pointed to by cursor2 */
+ if (changed) {
+ if (!rtr_update_mbr_field(cursor, offsets, cursor2, child_page,
+ &new_mbr, NULL, mtr)) {
+ err = DB_ERROR;
+ }
+ } else {
+ rtr_node_ptr_delete(cursor2, mtr);
+ }
+
+ return(err);
+}
+
+/*************************************************************//**
+Deletes on the upper level the node pointer to a page. */
+void
+rtr_node_ptr_delete(
+/*================*/
+ btr_cur_t* cursor, /*!< in: search cursor, contains information
+ about parent nodes in search */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ ibool compressed;
+ dberr_t err;
+
+ compressed = btr_cur_pessimistic_delete(&err, TRUE, cursor,
+ BTR_CREATE_FLAG, false, mtr);
+ ut_a(err == DB_SUCCESS);
+
+ if (!compressed) {
+ btr_cur_compress_if_useful(cursor, FALSE, mtr);
+ }
+}
+
+/**************************************************************//**
+Check whether an R-tree page is a child of a parent page
+@return true if there is child/parent relationship */
+bool
+rtr_check_same_block(
+/*================*/
+ dict_index_t* index, /*!< in: index tree */
+ btr_cur_t* cursor, /*!< in/out: position at the parent entry
+ pointing to the child if successful */
+ buf_block_t* parentb,/*!< in: parent page to check */
+ buf_block_t* childb, /*!< in: child Page */
+ mem_heap_t* heap) /*!< in: memory heap */
+
+{
+ ulint page_no = childb->page.id().page_no();
+ rec_offs* offsets;
+ rec_t* rec = page_rec_get_next(page_get_infimum_rec(
+ buf_block_get_frame(parentb)));
+
+ while (!page_rec_is_supremum(rec)) {
+ offsets = rec_get_offsets(
+ rec, index, NULL, 0, ULINT_UNDEFINED, &heap);
+
+ if (btr_node_ptr_get_child_page_no(rec, offsets) == page_no) {
+ btr_cur_position(index, rec, parentb, cursor);
+ return(true);
+ }
+
+ rec = page_rec_get_next(rec);
+ }
+
+ return(false);
+}
+
+/*************************************************************//**
+Calculates MBR_AREA(a+b) - MBR_AREA(a)
+Note: when the 'a' and 'b' objects are far from each other,
+the area increase can be really big, so this function
+can return 'inf' as a result.
+Return the increased area. */
+static double
+rtree_area_increase(
+ const uchar* a, /*!< in: original mbr. */
+ const uchar* b, /*!< in: new mbr. */
+ double* ab_area) /*!< out: increased area. */
+{
+ double a_area = 1.0;
+ double loc_ab_area = 1.0;
+ double amin, amax, bmin, bmax;
+ double data_round = 1.0;
+
+ static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double),
+ "compatibility");
+
+ for (auto i = SPDIMS; i--; ) {
+ double area;
+
+ amin = mach_double_read(a);
+ bmin = mach_double_read(b);
+ amax = mach_double_read(a + sizeof(double));
+ bmax = mach_double_read(b + sizeof(double));
+
+ a += 2 * sizeof(double);
+ b += 2 * sizeof(double);
+
+ area = amax - amin;
+ if (area == 0) {
+ a_area *= LINE_MBR_WEIGHTS;
+ } else {
+ a_area *= area;
+ }
+
+ area = (double)std::max(amax, bmax) -
+ (double)std::min(amin, bmin);
+ if (area == 0) {
+ loc_ab_area *= LINE_MBR_WEIGHTS;
+ } else {
+ loc_ab_area *= area;
+ }
+
+		/* The value of amax or bmin can be so large that a small
+		difference is absorbed. For example:
+		3.2884281489988079e+284 - 100 = 3.2884281489988079e+284.
+		As a result, some area differences go undetected */
+ if (loc_ab_area == a_area) {
+ if (bmin < amin || bmax > amax) {
+ data_round *= ((double)std::max(amax, bmax)
+ - amax
+ + (amin - (double)std::min(
+ amin, bmin)));
+ } else {
+ data_round *= area;
+ }
+ }
+ }
+
+ *ab_area = loc_ab_area;
+
+ if (loc_ab_area == a_area && data_round != 1.0) {
+ return(data_round);
+ }
+
+ return(loc_ab_area - a_area);
+}
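+
+/* The data_round fallback above exists because IEEE 754 doubles silently
+absorb small deltas next to huge magnitudes, so the enlarged and the
+original area can compare equal even though the MBR really grew. A tiny
+demonstration of the effect (illustrative only): */
+#if 0 /* illustrative sketch, not compiled */
+#include <cassert>
+
+static void double_absorption_demo()
+{
+	double huge = 3.2884281489988079e+284;
+
+	/* The subtraction is absorbed: huge - 100 rounds back to huge,
+	so an area comparison alone cannot see the growth. */
+	assert(huge - 100.0 == huge);
+}
+#endif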
+
+/** Calculates overlapping area
+@param[in] a mbr a
+@param[in] b mbr b
+@return overlapping area */
+static double rtree_area_overlapping(const byte *a, const byte *b)
+{
+ double area = 1.0;
+ double amin;
+ double amax;
+ double bmin;
+ double bmax;
+
+ static_assert(DATA_MBR_LEN == SPDIMS * 2 * sizeof(double),
+ "compatibility");
+
+ for (auto i = SPDIMS; i--; ) {
+ amin = mach_double_read(a);
+ bmin = mach_double_read(b);
+ amax = mach_double_read(a + sizeof(double));
+ bmax = mach_double_read(b + sizeof(double));
+ a += 2 * sizeof(double);
+ b += 2 * sizeof(double);
+
+ amin = std::max(amin, bmin);
+ amax = std::min(amax, bmax);
+
+ if (amin > amax) {
+ return(0);
+ } else {
+ area *= (amax - amin);
+ }
+ }
+
+ return(area);
+}
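+
+/* Per dimension, the overlap is the intersection of two intervals; if any
+dimension is empty the whole overlap is zero, otherwise the interval
+lengths multiply. The same logic written out for 2-D boxes (illustrative
+only): */
+#if 0 /* illustrative sketch, not compiled */
+#include <algorithm>
+
+static double overlap_area_2d(double axmin, double axmax,
+			      double aymin, double aymax,
+			      double bxmin, double bxmax,
+			      double bymin, double bymax)
+{
+	double dx = std::min(axmax, bxmax) - std::max(axmin, bxmin);
+	double dy = std::min(aymax, bymax) - std::max(aymin, bymin);
+
+	/* Empty on either axis means no overlap at all. */
+	return (dx > 0 && dy > 0) ? dx * dy : 0.0;
+}
+#endif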
+
+/****************************************************************//**
+Calculate the area increased for a new record
+@return area increased */
+double
+rtr_rec_cal_increase(
+/*=================*/
+ const dtuple_t* dtuple, /*!< in: data tuple to insert, which
+ cause area increase */
+ const rec_t* rec, /*!< in: physical record which differs from
+ dtuple in some of the common fields, or which
+ has an equal number or more fields than
+ dtuple */
+ double* area) /*!< out: increased area */
+{
+ const dfield_t* dtuple_field;
+
+ ut_ad(!page_rec_is_supremum(rec));
+ ut_ad(!page_rec_is_infimum(rec));
+
+ dtuple_field = dtuple_get_nth_field(dtuple, 0);
+ ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN);
+
+ return rtree_area_increase(rec,
+ static_cast<const byte*>(
+ dfield_get_data(dtuple_field)),
+ area);
+}
+
+/** Estimates the number of rows in a given area.
+@param[in] index index
+@param[in] tuple range tuple containing mbr, may also be empty tuple
+@param[in] mode search mode
+@return estimated number of rows */
+ha_rows
+rtr_estimate_n_rows_in_range(
+ dict_index_t* index,
+ const dtuple_t* tuple,
+ page_cur_mode_t mode)
+{
+ ut_ad(dict_index_is_spatial(index));
+
+ /* Check tuple & mode */
+ if (tuple->n_fields == 0) {
+ return(HA_POS_ERROR);
+ }
+
+ switch (mode) {
+ case PAGE_CUR_DISJOINT:
+ case PAGE_CUR_CONTAIN:
+ case PAGE_CUR_INTERSECT:
+ case PAGE_CUR_WITHIN:
+ case PAGE_CUR_MBR_EQUAL:
+ break;
+ default:
+ return(HA_POS_ERROR);
+ }
+
+ DBUG_EXECUTE_IF("rtr_pcur_move_to_next_return",
+ return(2);
+ );
+
+ /* Read mbr from tuple. */
+ rtr_mbr_t range_mbr;
+ double range_area;
+
+ const dfield_t* dtuple_field = dtuple_get_nth_field(tuple, 0);
+ ut_ad(dfield_get_len(dtuple_field) >= DATA_MBR_LEN);
+ const byte* range_mbr_ptr = reinterpret_cast<const byte*>(
+ dfield_get_data(dtuple_field));
+
+ rtr_read_mbr(range_mbr_ptr, &range_mbr);
+ range_area = (range_mbr.xmax - range_mbr.xmin)
+ * (range_mbr.ymax - range_mbr.ymin);
+
+ /* Get index root page. */
+ mtr_t mtr;
+
+ mtr.start();
+ index->set_modified(mtr);
+ mtr_s_lock_index(index, &mtr);
+
+ buf_block_t* block = btr_root_block_get(index, RW_S_LATCH, &mtr);
+ if (!block) {
+err_exit:
+ mtr.commit();
+ return HA_POS_ERROR;
+ }
+ const page_t* page = buf_block_get_frame(block);
+ const unsigned n_recs = page_header_get_field(page, PAGE_N_RECS);
+
+ if (n_recs == 0) {
+ goto err_exit;
+ }
+
+ /* Scan records in root page and calculate area. */
+ double area = 0;
+ for (const rec_t* rec = page_rec_get_next(
+ page_get_infimum_rec(block->frame));
+ !page_rec_is_supremum(rec);
+ rec = page_rec_get_next_const(rec)) {
+ rtr_mbr_t mbr;
+ double rec_area;
+
+ rtr_read_mbr(rec, &mbr);
+
+ rec_area = (mbr.xmax - mbr.xmin) * (mbr.ymax - mbr.ymin);
+
+ if (rec_area == 0) {
+ switch (mode) {
+ case PAGE_CUR_CONTAIN:
+ case PAGE_CUR_INTERSECT:
+ area += 1;
+ break;
+
+ case PAGE_CUR_DISJOINT:
+ break;
+
+ case PAGE_CUR_WITHIN:
+ case PAGE_CUR_MBR_EQUAL:
+ if (!rtree_key_cmp(
+ PAGE_CUR_WITHIN, range_mbr_ptr,
+ rec)) {
+ area += 1;
+ }
+
+ break;
+
+ default:
+ ut_error;
+ }
+ } else {
+ switch (mode) {
+ case PAGE_CUR_CONTAIN:
+ case PAGE_CUR_INTERSECT:
+ area += rtree_area_overlapping(
+ range_mbr_ptr, rec)
+ / rec_area;
+ break;
+
+ case PAGE_CUR_DISJOINT:
+ area += 1;
+ area -= rtree_area_overlapping(
+ range_mbr_ptr, rec)
+ / rec_area;
+ break;
+
+ case PAGE_CUR_WITHIN:
+ case PAGE_CUR_MBR_EQUAL:
+ if (!rtree_key_cmp(
+ PAGE_CUR_WITHIN, range_mbr_ptr,
+ rec)) {
+ area += range_area / rec_area;
+ }
+
+ break;
+ default:
+ ut_error;
+ }
+ }
+ }
+
+ mtr.commit();
+
+ if (!std::isfinite(area)) {
+ return(HA_POS_ERROR);
+ }
+
+ area /= n_recs;
+ return ha_rows(static_cast<double>(dict_table_get_n_rows(index->table))
+ * area);
+}
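+
+/* For PAGE_CUR_INTERSECT, the estimate above is the table row count times
+the average, over the root-page entries, of overlap(range, entry) divided
+by area(entry). A worked example under assumed numbers (illustrative
+only): */
+#if 0 /* illustrative sketch, not compiled */
+/* Assume a root page with 2 node entries and 1000 rows in the table.
+The query box covers half of entry 1 and misses entry 2:
+	area = 0.5 + 0.0;	area /= n_recs (= 2)	-> 0.25
+	estimate = 1000 * 0.25	-> 250 rows */
+static double estimate_rows_demo()
+{
+	double n_rows = 1000, n_recs = 2;
+	double area = 0.5 + 0.0;
+
+	return n_rows * (area / n_recs);	/* 250 */
+}
+#endif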
diff --git a/storage/innobase/gis/gis0sea.cc b/storage/innobase/gis/gis0sea.cc
new file mode 100644
index 00000000..1c22aab4
--- /dev/null
+++ b/storage/innobase/gis/gis0sea.cc
@@ -0,0 +1,2052 @@
+/*****************************************************************************
+
+Copyright (c) 2016, 2018, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2017, 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file gis/gis0sea.cc
+InnoDB R-tree search interfaces
+
+Created 2014/01/16 Jimmy Yang
+***********************************************************************/
+
+#include "fsp0fsp.h"
+#include "page0page.h"
+#include "page0cur.h"
+#include "page0zip.h"
+#include "gis0rtree.h"
+#include "btr0cur.h"
+#include "btr0sea.h"
+#include "btr0pcur.h"
+#include "rem0cmp.h"
+#include "lock0lock.h"
+#include "ibuf0ibuf.h"
+#include "trx0trx.h"
+#include "srv0mon.h"
+#include "que0que.h"
+#include "gis0geo.h"
+
+/** Restore the stored position of a persistent cursor, buffer-fixing the page */
+static
+bool
+rtr_cur_restore_position(
+ ulint latch_mode, /*!< in: BTR_SEARCH_LEAF, ... */
+ btr_cur_t* cursor, /*!< in: detached persistent cursor */
+ ulint level, /*!< in: index level */
+ mtr_t* mtr); /*!< in: mtr */
+
+/*************************************************************//**
+Pop out used parent path entry, until we find the parent with matching
+page number */
+static
+void
+rtr_adjust_parent_path(
+/*===================*/
+ rtr_info_t* rtr_info, /* R-Tree info struct */
+ ulint page_no) /* page number to look for */
+{
+ while (!rtr_info->parent_path->empty()) {
+ if (rtr_info->parent_path->back().child_no == page_no) {
+ break;
+ } else {
+ if (rtr_info->parent_path->back().cursor) {
+ btr_pcur_close(
+ rtr_info->parent_path->back().cursor);
+ ut_free(rtr_info->parent_path->back().cursor);
+ }
+
+ rtr_info->parent_path->pop_back();
+ }
+ }
+}
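+
+/* This is a pop-until-match over the parent stack, releasing the cursor
+attached to every discarded entry. The same shape over a std::vector
+(illustrative only; parent_entry and close_cursor are stand-ins): */
+#if 0 /* illustrative sketch, not compiled */
+#include <vector>
+
+struct parent_entry { unsigned child_no; void* cursor; };
+
+static void close_cursor(void*) {}	/* stand-in for pcur close + free */
+
+static void adjust(std::vector<parent_entry>& path, unsigned page_no)
+{
+	while (!path.empty() && path.back().child_no != page_no) {
+		close_cursor(path.back().cursor);
+		path.pop_back();
+	}
+}
+#endif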
+
+/*************************************************************//**
+Find the next matching record. This function is used by search,
+and by record locating during index delete/update.
+@return true if a suitable record is found, otherwise false */
+static
+bool
+rtr_pcur_getnext_from_path(
+/*=======================*/
+ const dtuple_t* tuple, /*!< in: data tuple */
+ page_cur_mode_t mode, /*!< in: cursor search mode */
+ btr_cur_t* btr_cur,/*!< in: persistent cursor; NOTE that the
+ function may release the page latch */
+ ulint target_level,
+ /*!< in: target level */
+ ulint latch_mode,
+ /*!< in: latch_mode */
+ bool index_locked,
+ /*!< in: index tree locked */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_index_t* index = btr_cur->index;
+ bool found = false;
+ page_cur_t* page_cursor;
+ ulint level = 0;
+ node_visit_t next_rec;
+ rtr_info_t* rtr_info = btr_cur->rtr_info;
+ node_seq_t page_ssn;
+ ulint my_latch_mode;
+	bool		skip_parent = false;
+ bool new_split = false;
+ bool need_parent;
+ bool for_delete = false;
+ bool for_undo_ins = false;
+
+ /* exhausted all the pages to be searched */
+ if (rtr_info->path->empty()) {
+ return(false);
+ }
+
+ ut_ad(dtuple_get_n_fields_cmp(tuple));
+
+ my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
+
+ for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
+ for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
+
+	/* No insert should come through this function; the only
+	callers with a BTR_MODIFY_* mode are deletes */
+ ut_ad(mode != PAGE_CUR_RTREE_INSERT);
+ ut_ad(my_latch_mode == BTR_SEARCH_LEAF
+ || my_latch_mode == BTR_MODIFY_LEAF
+ || my_latch_mode == BTR_MODIFY_TREE
+ || my_latch_mode == BTR_CONT_MODIFY_TREE);
+
+	/* Determine whether we need to track parent information. This
+	is only needed for tree-altering operations (such as index page
+	merge) */
+ need_parent = ((my_latch_mode == BTR_MODIFY_TREE
+ || my_latch_mode == BTR_CONT_MODIFY_TREE)
+ && mode == PAGE_CUR_RTREE_LOCATE);
+
+ if (!index_locked) {
+ ut_ad(latch_mode & BTR_SEARCH_LEAF
+ || latch_mode & BTR_MODIFY_LEAF);
+ mtr_s_lock_index(index, mtr);
+ } else {
+ ut_ad(mtr->memo_contains_flagged(&index->lock,
+ MTR_MEMO_SX_LOCK
+ | MTR_MEMO_S_LOCK
+ | MTR_MEMO_X_LOCK));
+ }
+
+ const ulint zip_size = index->table->space->zip_size();
+
+	/* Pop each node/page to be searched from the "path" structure
+	and do a search on it. Please note, any pages that are in
+	the "path" structure are protected by "page" lock, so they
+	cannot be shrunk away */
+ do {
+ buf_block_t* block;
+ node_seq_t path_ssn;
+ const page_t* page;
+ ulint rw_latch = RW_X_LATCH;
+ ulint tree_idx;
+
+ mutex_enter(&rtr_info->rtr_path_mutex);
+ next_rec = rtr_info->path->back();
+ rtr_info->path->pop_back();
+ level = next_rec.level;
+ path_ssn = next_rec.seq_no;
+ tree_idx = btr_cur->tree_height - level - 1;
+
+ /* Maintain the parent path info as well, if needed */
+ if (need_parent && !skip_parent && !new_split) {
+ ulint old_level;
+ ulint new_level;
+
+ ut_ad(!rtr_info->parent_path->empty());
+
+ /* Cleanup unused parent info */
+ if (rtr_info->parent_path->back().cursor) {
+ btr_pcur_close(
+ rtr_info->parent_path->back().cursor);
+ ut_free(rtr_info->parent_path->back().cursor);
+ }
+
+ old_level = rtr_info->parent_path->back().level;
+
+ rtr_info->parent_path->pop_back();
+
+ ut_ad(!rtr_info->parent_path->empty());
+
+ /* check whether there is a level change. If so,
+ the current parent path needs to pop enough
+ nodes to adjust to the new search page */
+ new_level = rtr_info->parent_path->back().level;
+
+ if (old_level < new_level) {
+ rtr_adjust_parent_path(
+ rtr_info, next_rec.page_no);
+ }
+
+ ut_ad(!rtr_info->parent_path->empty());
+
+ ut_ad(next_rec.page_no
+ == rtr_info->parent_path->back().child_no);
+ }
+
+ mutex_exit(&rtr_info->rtr_path_mutex);
+
+ skip_parent = false;
+ new_split = false;
+
+		/* Once we have pages in "path", these pages are
+		predicate page locked, so they can't be shrunk away.
+		They also have an SSN (split sequence number) to detect
+		splits, so we can directly latch a single page while
+		getting them. They can be unlatched if not qualified.
+		One reason for the pre-latch is that we might need to
+		position on some parent node (which requires a latch)
+		during the search */
+ if (level == 0) {
+ /* S latched for SEARCH_LEAF, and X latched
+ for MODIFY_LEAF */
+ if (my_latch_mode <= BTR_MODIFY_LEAF) {
+ rw_latch = my_latch_mode;
+ }
+
+ if (my_latch_mode == BTR_CONT_MODIFY_TREE
+ || my_latch_mode == BTR_MODIFY_TREE) {
+ rw_latch = RW_NO_LATCH;
+ }
+
+ } else if (level == target_level) {
+ rw_latch = RW_X_LATCH;
+ }
+
+ /* Release previous locked blocks */
+ if (my_latch_mode != BTR_SEARCH_LEAF) {
+ for (ulint idx = 0; idx < btr_cur->tree_height;
+ idx++) {
+ if (rtr_info->tree_blocks[idx]) {
+ mtr_release_block_at_savepoint(
+ mtr,
+ rtr_info->tree_savepoints[idx],
+ rtr_info->tree_blocks[idx]);
+ rtr_info->tree_blocks[idx] = NULL;
+ }
+ }
+ for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3;
+ idx++) {
+ if (rtr_info->tree_blocks[idx]) {
+ mtr_release_block_at_savepoint(
+ mtr,
+ rtr_info->tree_savepoints[idx],
+ rtr_info->tree_blocks[idx]);
+ rtr_info->tree_blocks[idx] = NULL;
+ }
+ }
+ }
+
+ /* set up savepoint to record any locks to be taken */
+ rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);
+
+#ifdef UNIV_RTR_DEBUG
+ ut_ad(!(rw_lock_own_flagged(&btr_cur->page_cur.block->lock,
+ RW_LOCK_FLAG_X | RW_LOCK_FLAG_S))
+ || my_latch_mode == BTR_MODIFY_TREE
+ || my_latch_mode == BTR_CONT_MODIFY_TREE
+ || !page_is_leaf(buf_block_get_frame(
+ btr_cur->page_cur.block)));
+#endif /* UNIV_RTR_DEBUG */
+
+ dberr_t err = DB_SUCCESS;
+
+ block = buf_page_get_gen(
+ page_id_t(index->table->space_id,
+ next_rec.page_no), zip_size,
+ rw_latch, NULL, BUF_GET, __FILE__, __LINE__, mtr, &err);
+
+ if (block == NULL) {
+ continue;
+ } else if (rw_latch != RW_NO_LATCH) {
+ ut_ad(!dict_index_is_ibuf(index));
+ buf_block_dbg_add_level(block, SYNC_TREE_NODE);
+ }
+
+ rtr_info->tree_blocks[tree_idx] = block;
+
+ page = buf_block_get_frame(block);
+ page_ssn = page_get_ssn_id(page);
+
+		/* If there were splits, push the split page.
+		Note that we hold an SX lock on index->lock, so
+		no split/shrink should be happening here */
+ if (page_ssn > path_ssn) {
+ uint32_t next_page_no = btr_page_get_next(page);
+ rtr_non_leaf_stack_push(
+ rtr_info->path, next_page_no, path_ssn,
+ level, 0, NULL, 0);
+
+ if (!srv_read_only_mode
+ && mode != PAGE_CUR_RTREE_INSERT
+ && mode != PAGE_CUR_RTREE_LOCATE) {
+ ut_ad(rtr_info->thr);
+ lock_place_prdt_page_lock(
+ page_id_t(block->page.id().space(),
+ next_page_no),
+ index,
+ rtr_info->thr);
+ }
+ new_split = true;
+#if defined(UNIV_GIS_DEBUG)
+ fprintf(stderr,
+				"GIS_DIAG: Split page found: %d, %ld\n",
+ static_cast<int>(need_parent), next_page_no);
+#endif
+ }
+
+ page_cursor = btr_cur_get_page_cur(btr_cur);
+ page_cursor->rec = NULL;
+
+ if (mode == PAGE_CUR_RTREE_LOCATE) {
+ if (level == target_level && level == 0) {
+ ulint low_match;
+
+ found = false;
+
+ low_match = page_cur_search(
+ block, index, tuple,
+ PAGE_CUR_LE,
+ btr_cur_get_page_cur(btr_cur));
+
+ if (low_match == dtuple_get_n_fields_cmp(
+ tuple)) {
+ rec_t* rec = btr_cur_get_rec(btr_cur);
+
+ if (!rec_get_deleted_flag(rec,
+ dict_table_is_comp(index->table))
+ || (!for_delete && !for_undo_ins)) {
+ found = true;
+ btr_cur->low_match = low_match;
+ } else {
+ /* mark we found deleted row */
+ btr_cur->rtr_info->fd_del
+ = true;
+ }
+ }
+ } else {
+ page_cur_mode_t page_mode = mode;
+
+ if (level == target_level
+ && target_level != 0) {
+ page_mode = PAGE_CUR_RTREE_GET_FATHER;
+ }
+ found = rtr_cur_search_with_match(
+ block, index, tuple, page_mode,
+ page_cursor, btr_cur->rtr_info);
+
+ /* Save the position of parent if needed */
+ if (found && need_parent) {
+ btr_pcur_t* r_cursor =
+ rtr_get_parent_cursor(
+ btr_cur, level, false);
+
+ rec_t* rec = page_cur_get_rec(
+ page_cursor);
+ page_cur_position(
+ rec, block,
+ btr_pcur_get_page_cur(r_cursor));
+ r_cursor->pos_state =
+ BTR_PCUR_IS_POSITIONED;
+ r_cursor->latch_mode = my_latch_mode;
+ btr_pcur_store_position(r_cursor, mtr);
+#ifdef UNIV_DEBUG
+ ulint num_stored =
+ rtr_store_parent_path(
+ block, btr_cur,
+ rw_latch, level, mtr);
+ ut_ad(num_stored > 0);
+#else
+ rtr_store_parent_path(
+ block, btr_cur, rw_latch,
+ level, mtr);
+#endif /* UNIV_DEBUG */
+ }
+ }
+ } else {
+ found = rtr_cur_search_with_match(
+ block, index, tuple, mode, page_cursor,
+ btr_cur->rtr_info);
+ }
+
+ /* Attach predicate lock if needed, no matter whether
+ there are matched records */
+ if (mode != PAGE_CUR_RTREE_INSERT
+ && mode != PAGE_CUR_RTREE_LOCATE
+ && mode >= PAGE_CUR_CONTAIN
+ && btr_cur->rtr_info->need_prdt_lock) {
+ lock_prdt_t prdt;
+
+ trx_t* trx = thr_get_trx(
+ btr_cur->rtr_info->thr);
+ lock_mutex_enter();
+ lock_init_prdt_from_mbr(
+ &prdt, &btr_cur->rtr_info->mbr,
+ mode, trx->lock.lock_heap);
+ lock_mutex_exit();
+
+ if (rw_latch == RW_NO_LATCH) {
+ rw_lock_s_lock(&(block->lock));
+ }
+
+ lock_prdt_lock(block, &prdt, index, LOCK_S,
+ LOCK_PREDICATE, btr_cur->rtr_info->thr);
+
+ if (rw_latch == RW_NO_LATCH) {
+ rw_lock_s_unlock(&(block->lock));
+ }
+ }
+
+ if (found) {
+ if (level == target_level) {
+				page_cur_t*	r_cur;
+
+ if (my_latch_mode == BTR_MODIFY_TREE
+ && level == 0) {
+ ut_ad(rw_latch == RW_NO_LATCH);
+
+ btr_cur_latch_leaves(
+ block,
+ BTR_MODIFY_TREE,
+ btr_cur, mtr);
+ }
+
+ r_cur = btr_cur_get_page_cur(btr_cur);
+
+ page_cur_position(
+ page_cur_get_rec(page_cursor),
+ page_cur_get_block(page_cursor),
+ r_cur);
+
+ btr_cur->low_match = level != 0 ?
+ DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
+ : btr_cur->low_match;
+ break;
+ }
+
+ /* Keep the parent path node, which points to
+ last node just located */
+ skip_parent = true;
+ } else {
+ /* Release latch on the current page */
+ ut_ad(rtr_info->tree_blocks[tree_idx]);
+
+ mtr_release_block_at_savepoint(
+ mtr, rtr_info->tree_savepoints[tree_idx],
+ rtr_info->tree_blocks[tree_idx]);
+ rtr_info->tree_blocks[tree_idx] = NULL;
+ }
+
+ } while (!rtr_info->path->empty());
+
+ const rec_t* rec = btr_cur_get_rec(btr_cur);
+
+ if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
+ mtr_commit(mtr);
+ mtr_start(mtr);
+ } else if (!index_locked) {
+ mtr_memo_release(mtr, dict_index_get_lock(index),
+ MTR_MEMO_X_LOCK);
+ }
+
+ return(found);
+}
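+
+/* The core of the traversal above: pop a page from the to-visit stack,
+and if the page's split sequence number is newer than the one recorded
+when the page was pushed, the page has split in the meantime, so its right
+sibling is pushed before the page itself is searched. A minimal sketch of
+that control flow (illustrative only; the three helpers are stand-ins): */
+#if 0 /* illustrative sketch, not compiled */
+#include <stack>
+
+static unsigned read_page_ssn(unsigned) { return 0; }		/* stand-in */
+static unsigned read_next_page_no(unsigned p) { return p + 1; }	/* stand-in */
+static void search_page(unsigned) {}				/* stand-in */
+
+struct to_visit { unsigned page_no; unsigned ssn_when_pushed; };
+
+static void walk(std::stack<to_visit>& path)
+{
+	while (!path.empty()) {
+		to_visit v = path.top();
+		path.pop();
+
+		if (read_page_ssn(v.page_no) > v.ssn_when_pushed) {
+			/* Split since push: also visit the right sibling. */
+			path.push({read_next_page_no(v.page_no),
+				   v.ssn_when_pushed});
+		}
+
+		search_page(v.page_no);
+	}
+}
+#endif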
+
+/*************************************************************//**
+Find the next matching record. This function will first exhaust
+the copied records listed in the rtr_info->matches vector before
+moving to the next page
+@return true if a suitable record is found, otherwise false */
+bool
+rtr_pcur_move_to_next(
+/*==================*/
+ const dtuple_t* tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
+ tuple must be set so that it cannot get
+ compared to the node ptr page number field! */
+ page_cur_mode_t mode, /*!< in: cursor search mode */
+ btr_pcur_t* cursor, /*!< in: persistent cursor; NOTE that the
+ function may release the page latch */
+ ulint level, /*!< in: target level */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ rtr_info_t* rtr_info = cursor->btr_cur.rtr_info;
+
+ ut_a(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
+
+ mutex_enter(&rtr_info->matches->rtr_match_mutex);
+ /* First retrieve the next record on the current page */
+ if (!rtr_info->matches->matched_recs->empty()) {
+ rtr_rec_t rec;
+ rec = rtr_info->matches->matched_recs->back();
+ rtr_info->matches->matched_recs->pop_back();
+ mutex_exit(&rtr_info->matches->rtr_match_mutex);
+
+ cursor->btr_cur.page_cur.rec = rec.r_rec;
+ cursor->btr_cur.page_cur.block = &rtr_info->matches->block;
+
+ DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
+ return(true);
+ }
+
+ mutex_exit(&rtr_info->matches->rtr_match_mutex);
+
+ /* Fetch the next page */
+ return(rtr_pcur_getnext_from_path(tuple, mode, &cursor->btr_cur,
+ level, cursor->latch_mode,
+ false, mtr));
+}
+
+/*************************************************************//**
+Check if the cursor holds a record pointing to the specified child page
+@return true if it points to the child page, false otherwise */
+static
+bool
+rtr_compare_cursor_rec(
+/*===================*/
+ dict_index_t* index, /*!< in: index */
+ btr_cur_t* cursor, /*!< in: Cursor to check */
+ ulint page_no, /*!< in: desired child page number */
+ mem_heap_t** heap) /*!< in: memory heap */
+{
+ const rec_t* rec;
+ rec_offs* offsets;
+
+ rec = btr_cur_get_rec(cursor);
+
+ offsets = rec_get_offsets(rec, index, NULL, 0, ULINT_UNDEFINED, heap);
+
+ return(btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
+}
+
+/**************************************************************//**
+Initializes and opens a persistent cursor to an index tree. It should be
+closed with btr_pcur_close. Mainly called by row_search_index_entry() */
+void
+rtr_pcur_open_low(
+/*==============*/
+ dict_index_t* index, /*!< in: index */
+ ulint level, /*!< in: level in the rtree */
+ const dtuple_t* tuple, /*!< in: tuple on which search done */
+ page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_LOCATE, ... */
+ ulint latch_mode,/*!< in: BTR_SEARCH_LEAF, ... */
+ btr_pcur_t* cursor, /*!< in: memory buffer for persistent cursor */
+ const char* file, /*!< in: file name */
+ unsigned line, /*!< in: line where called */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ btr_cur_t* btr_cursor;
+ ulint n_fields;
+ ulint low_match;
+ rec_t* rec;
+ bool tree_latched = false;
+ bool for_delete = false;
+ bool for_undo_ins = false;
+
+ ut_ad(level == 0);
+
+ ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
+ ut_ad(mode == PAGE_CUR_RTREE_LOCATE);
+
+ /* Initialize the cursor */
+
+ btr_pcur_init(cursor);
+
+ for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
+ for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
+
+ cursor->latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
+ cursor->search_mode = mode;
+
+ /* Search with the tree cursor */
+
+ btr_cursor = btr_pcur_get_btr_cur(cursor);
+
+ btr_cursor->rtr_info = rtr_create_rtr_info(false, false,
+ btr_cursor, index);
+
+	/* Purge will SX-lock the tree instead of taking page locks */
+ if (btr_cursor->thr) {
+ btr_cursor->rtr_info->need_page_lock = true;
+ btr_cursor->rtr_info->thr = btr_cursor->thr;
+ }
+
+ btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode,
+ btr_cursor, 0, file, line, mtr);
+ cursor->pos_state = BTR_PCUR_IS_POSITIONED;
+
+ cursor->trx_if_known = NULL;
+
+ low_match = btr_pcur_get_low_match(cursor);
+
+ rec = btr_pcur_get_rec(cursor);
+
+ n_fields = dtuple_get_n_fields(tuple);
+
+ if (latch_mode & BTR_ALREADY_S_LATCHED) {
+ ut_ad(mtr->memo_contains(index->lock, MTR_MEMO_S_LOCK));
+ tree_latched = true;
+ }
+
+ if (latch_mode & BTR_MODIFY_TREE) {
+ ut_ad(mtr->memo_contains_flagged(&index->lock,
+ MTR_MEMO_X_LOCK
+ | MTR_MEMO_SX_LOCK));
+ tree_latched = true;
+ }
+
+ if (page_rec_is_infimum(rec) || low_match != n_fields
+ || (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
+ && (for_delete || for_undo_ins))) {
+
+ if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
+ && for_delete) {
+ btr_cursor->rtr_info->fd_del = true;
+ btr_cursor->low_match = 0;
+ }
+		/* Did not find a matching row in the first dive.
+		Release any latched block before searching more pages */
+ if (latch_mode & BTR_MODIFY_LEAF) {
+ ulint tree_idx = btr_cursor->tree_height - 1;
+ rtr_info_t* rtr_info = btr_cursor->rtr_info;
+
+ ut_ad(level == 0);
+
+ if (rtr_info->tree_blocks[tree_idx]) {
+ mtr_release_block_at_savepoint(
+ mtr,
+ rtr_info->tree_savepoints[tree_idx],
+ rtr_info->tree_blocks[tree_idx]);
+ rtr_info->tree_blocks[tree_idx] = NULL;
+ }
+ }
+
+ bool ret = rtr_pcur_getnext_from_path(
+ tuple, mode, btr_cursor, level, latch_mode,
+ tree_latched, mtr);
+
+ if (ret) {
+ low_match = btr_pcur_get_low_match(cursor);
+ ut_ad(low_match == n_fields);
+ }
+ }
+}
+
+/** Get the rtree page father.
+@param[in] index rtree index
+@param[in] block child page in the index
+@param[in] mtr mtr
+@param[in] sea_cur search cursor, contains information
+ about parent nodes in search
+@param[in] cursor cursor on node pointer record,
+ its page x-latched */
+void
+rtr_page_get_father(
+ dict_index_t* index,
+ buf_block_t* block,
+ mtr_t* mtr,
+ btr_cur_t* sea_cur,
+ btr_cur_t* cursor)
+{
+ mem_heap_t* heap = mem_heap_create(100);
+#ifdef UNIV_DEBUG
+ rec_offs* offsets;
+
+ offsets = rtr_page_get_father_block(
+ NULL, heap, index, block, mtr, sea_cur, cursor);
+
+ ulint page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec,
+ offsets);
+
+ ut_ad(page_no == block->page.id().page_no());
+#else
+ rtr_page_get_father_block(
+ NULL, heap, index, block, mtr, sea_cur, cursor);
+#endif
+
+ mem_heap_free(heap);
+}
+
+/********************************************************************//**
+Returns the upper level node pointer to an R-tree page. It is assumed
+that mtr holds an x-latch on the tree. */
+static void rtr_get_father_node(
+ dict_index_t* index, /*!< in: index */
+ ulint level, /*!< in: the tree level of search */
+ const dtuple_t* tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
+ tuple must be set so that it cannot get
+ compared to the node ptr page number field! */
+ btr_cur_t* sea_cur,/*!< in: search cursor */
+ btr_cur_t* btr_cur,/*!< in/out: tree cursor; the cursor page is
+ s- or x-latched, but see also above! */
+ ulint page_no,/*!< Current page no */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ mem_heap_t* heap = NULL;
+ bool ret = false;
+ const rec_t* rec;
+ ulint n_fields;
+ bool new_rtr = false;
+
+	/* Try to optimally locate the parent node. The level should
+	always be less than sea_cur->tree_height unless the root is
+	splitting */
+ if (sea_cur && sea_cur->tree_height > level) {
+ ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
+ | MTR_MEMO_SX_LOCK));
+ ret = rtr_cur_restore_position(
+ BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);
+
+		/* Since we block shrinking of tree nodes while there
+		are active searches on them, this optimal locating
+		should always succeed */
+ ut_ad(ret);
+
+ if (ret) {
+ btr_pcur_t* r_cursor = rtr_get_parent_cursor(
+ sea_cur, level, false);
+
+ rec = btr_pcur_get_rec(r_cursor);
+
+ ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
+ page_cur_position(rec,
+ btr_pcur_get_block(r_cursor),
+ btr_cur_get_page_cur(btr_cur));
+ btr_cur->rtr_info = sea_cur->rtr_info;
+ btr_cur->tree_height = sea_cur->tree_height;
+ ut_ad(rtr_compare_cursor_rec(
+ index, btr_cur, page_no, &heap));
+ goto func_exit;
+ }
+ }
+
+	/* We arrive here in one of two scenarios:
+	1) CHECK TABLE and btr_validate
+	2) the index root page is being raised */
+ ut_ad(!sea_cur || sea_cur->tree_height == level);
+
+ if (btr_cur->rtr_info) {
+ rtr_clean_rtr_info(btr_cur->rtr_info, true);
+ } else {
+ new_rtr = true;
+ }
+
+ btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);
+
+ if (sea_cur && sea_cur->tree_height == level) {
+ /* root split, and search the new root */
+ btr_cur_search_to_nth_level(
+ index, level, tuple, PAGE_CUR_RTREE_LOCATE,
+ BTR_CONT_MODIFY_TREE, btr_cur, 0,
+ __FILE__, __LINE__, mtr);
+
+ } else {
+ /* btr_validate */
+ ut_ad(level >= 1);
+ ut_ad(!sea_cur);
+
+ btr_cur_search_to_nth_level(
+ index, level, tuple, PAGE_CUR_RTREE_LOCATE,
+ BTR_CONT_MODIFY_TREE, btr_cur, 0,
+ __FILE__, __LINE__, mtr);
+
+ rec = btr_cur_get_rec(btr_cur);
+ n_fields = dtuple_get_n_fields_cmp(tuple);
+
+ if (page_rec_is_infimum(rec)
+ || (btr_cur->low_match != n_fields)) {
+ ret = rtr_pcur_getnext_from_path(
+ tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
+ level, BTR_CONT_MODIFY_TREE,
+ true, mtr);
+
+ ut_ad(ret && btr_cur->low_match == n_fields);
+ }
+ }
+
+ ret = rtr_compare_cursor_rec(
+ index, btr_cur, page_no, &heap);
+
+ ut_ad(ret);
+
+func_exit:
+ if (heap) {
+ mem_heap_free(heap);
+ }
+
+ if (new_rtr && btr_cur->rtr_info) {
+ rtr_clean_rtr_info(btr_cur->rtr_info, true);
+ btr_cur->rtr_info = NULL;
+ }
+}
+
+/** Returns the upper level node pointer to an R-tree page. It is assumed
+that mtr holds an SX-latch or X-latch on the tree.
+@return rec_get_offsets() of the node pointer record */
+static
+rec_offs*
+rtr_page_get_father_node_ptr(
+ rec_offs* offsets,/*!< in: work area for the return value */
+ mem_heap_t* heap, /*!< in: memory heap to use */
+ btr_cur_t* sea_cur,/*!< in: search cursor */
+ btr_cur_t* cursor, /*!< in: cursor pointing to user record,
+ out: cursor on node pointer record,
+ its page x-latched */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dtuple_t* tuple;
+ rec_t* user_rec;
+ rec_t* node_ptr;
+ ulint level;
+ ulint page_no;
+ dict_index_t* index;
+ rtr_mbr_t mbr;
+
+ page_no = btr_cur_get_block(cursor)->page.id().page_no();
+ index = btr_cur_get_index(cursor);
+
+ ut_ad(srv_read_only_mode
+ || mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
+ | MTR_MEMO_SX_LOCK));
+
+ ut_ad(dict_index_get_page(index) != page_no);
+
+ level = btr_page_get_level(btr_cur_get_page(cursor));
+
+ user_rec = btr_cur_get_rec(cursor);
+ ut_a(page_rec_is_user_rec(user_rec));
+
+ offsets = rec_get_offsets(user_rec, index, offsets,
+ level ? 0 : index->n_fields,
+ ULINT_UNDEFINED, &heap);
+ rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
+
+ tuple = rtr_index_build_node_ptr(
+ index, &mbr, user_rec, page_no, heap);
+
+ if (sea_cur && !sea_cur->rtr_info) {
+ sea_cur = NULL;
+ }
+
+ rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor,
+ page_no, mtr);
+
+ node_ptr = btr_cur_get_rec(cursor);
+ ut_ad(!page_rec_is_comp(node_ptr)
+ || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
+ offsets = rec_get_offsets(node_ptr, index, offsets, 0,
+ ULINT_UNDEFINED, &heap);
+
+ ulint child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
+
+ if (child_page != page_no) {
+ const rec_t* print_rec;
+
+ ib::fatal error;
+
+ error << "Corruption of index " << index->name
+ << " of table " << index->table->name
+ << " parent page " << page_no
+ << " child page " << child_page;
+
+ print_rec = page_rec_get_next(
+ page_get_infimum_rec(page_align(user_rec)));
+ offsets = rec_get_offsets(print_rec, index, offsets,
+ page_rec_is_leaf(user_rec)
+ ? index->n_fields : 0,
+ ULINT_UNDEFINED, &heap);
+ error << "; child ";
+ rec_print(error.m_oss, print_rec,
+ rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
+ offsets);
+ offsets = rec_get_offsets(node_ptr, index, offsets, 0,
+ ULINT_UNDEFINED, &heap);
+ error << "; parent ";
+		rec_print(error.m_oss, node_ptr,
+			  rec_get_info_bits(node_ptr, rec_offs_comp(offsets)),
+			  offsets);
+
+ error << ". You should dump + drop + reimport the table to"
+ " fix the corruption. If the crash happens at"
+ " database startup, see "
+ "https://mariadb.com/kb/en/library/innodb-recovery-modes/"
+ " about forcing"
+ " recovery. Then dump + drop + reimport.";
+ }
+
+ return(offsets);
+}
+
+/************************************************************//**
+Returns the father block to a page. It is assumed that mtr holds
+an X or SX latch on the tree.
+@return rec_get_offsets() of the node pointer record */
+rec_offs*
+rtr_page_get_father_block(
+/*======================*/
+ rec_offs* offsets,/*!< in: work area for the return value */
+ mem_heap_t* heap, /*!< in: memory heap to use */
+ dict_index_t* index, /*!< in: b-tree index */
+ buf_block_t* block, /*!< in: child page in the index */
+ mtr_t* mtr, /*!< in: mtr */
+ btr_cur_t* sea_cur,/*!< in: search cursor, contains information
+ about parent nodes in search */
+ btr_cur_t* cursor) /*!< out: cursor on node pointer record,
+ its page x-latched */
+{
+ rec_t* rec = page_rec_get_next(
+ page_get_infimum_rec(buf_block_get_frame(block)));
+ btr_cur_position(index, rec, block, cursor);
+
+ return(rtr_page_get_father_node_ptr(offsets, heap, sea_cur,
+ cursor, mtr));
+}
+
+/*******************************************************************//**
+Create a RTree search info structure */
+rtr_info_t*
+rtr_create_rtr_info(
+/******************/
+ bool need_prdt, /*!< in: Whether predicate lock
+ is needed */
+	bool		init_matches,	/*!< in: Whether to initialize the
+ "matches" structure for collecting
+ matched leaf records */
+ btr_cur_t* cursor, /*!< in: tree search cursor */
+ dict_index_t* index) /*!< in: index struct */
+{
+ rtr_info_t* rtr_info;
+
+ index = index ? index : cursor->index;
+ ut_ad(index);
+
+ rtr_info = static_cast<rtr_info_t*>(ut_zalloc_nokey(sizeof(*rtr_info)));
+
+ rtr_info->allocated = true;
+ rtr_info->cursor = cursor;
+ rtr_info->index = index;
+
+ if (init_matches) {
+ rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
+ rtr_info->matches = static_cast<matched_rec_t*>(
+ mem_heap_zalloc(
+ rtr_info->heap,
+ sizeof(*rtr_info->matches)));
+
+ rtr_info->matches->matched_recs
+ = UT_NEW_NOKEY(rtr_rec_vector());
+
+ rtr_info->matches->bufp = page_align(rtr_info->matches->rec_buf
+ + UNIV_PAGE_SIZE_MAX + 1);
+ mutex_create(LATCH_ID_RTR_MATCH_MUTEX,
+ &rtr_info->matches->rtr_match_mutex);
+ rw_lock_create(PFS_NOT_INSTRUMENTED,
+ &(rtr_info->matches->block.lock),
+ SYNC_LEVEL_VARYING);
+ }
+
+ rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
+ rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
+ rtr_info->need_prdt_lock = need_prdt;
+ mutex_create(LATCH_ID_RTR_PATH_MUTEX,
+ &rtr_info->rtr_path_mutex);
+
+ mutex_enter(&index->rtr_track->rtr_active_mutex);
+ index->rtr_track->rtr_active.push_front(rtr_info);
+ mutex_exit(&index->rtr_track->rtr_active_mutex);
+ return(rtr_info);
+}
+
+/*******************************************************************//**
+Update a btr_cur_t with rtr_info */
+void
+rtr_info_update_btr(
+/******************/
+ btr_cur_t* cursor, /*!< in/out: tree cursor */
+ rtr_info_t* rtr_info) /*!< in: rtr_info to set to the
+ cursor */
+{
+ ut_ad(rtr_info);
+
+ cursor->rtr_info = rtr_info;
+}
+
+/*******************************************************************//**
+Initialize a R-Tree Search structure */
+void
+rtr_init_rtr_info(
+/****************/
+ rtr_info_t* rtr_info, /*!< in: rtr_info to set to the
+ cursor */
+ bool need_prdt, /*!< in: Whether predicate lock is
+ needed */
+ btr_cur_t* cursor, /*!< in: tree search cursor */
+ dict_index_t* index, /*!< in: index structure */
+ bool reinit) /*!< in: Whether this is a reinit */
+{
+ ut_ad(rtr_info);
+
+ if (!reinit) {
+ /* Reset all members. */
+ rtr_info->path = NULL;
+ rtr_info->parent_path = NULL;
+ rtr_info->matches = NULL;
+
+ mutex_create(LATCH_ID_RTR_PATH_MUTEX,
+ &rtr_info->rtr_path_mutex);
+
+ memset(rtr_info->tree_blocks, 0x0,
+ sizeof(rtr_info->tree_blocks));
+ memset(rtr_info->tree_savepoints, 0x0,
+ sizeof(rtr_info->tree_savepoints));
+ rtr_info->mbr.xmin = 0.0;
+ rtr_info->mbr.xmax = 0.0;
+ rtr_info->mbr.ymin = 0.0;
+ rtr_info->mbr.ymax = 0.0;
+ rtr_info->thr = NULL;
+ rtr_info->heap = NULL;
+ rtr_info->cursor = NULL;
+ rtr_info->index = NULL;
+ rtr_info->need_prdt_lock = false;
+ rtr_info->need_page_lock = false;
+ rtr_info->allocated = false;
+ rtr_info->mbr_adj = false;
+ rtr_info->fd_del = false;
+ rtr_info->search_tuple = NULL;
+ rtr_info->search_mode = PAGE_CUR_UNSUPP;
+ }
+
+ ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
+
+ rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
+ rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
+ rtr_info->need_prdt_lock = need_prdt;
+ rtr_info->cursor = cursor;
+ rtr_info->index = index;
+
+ mutex_enter(&index->rtr_track->rtr_active_mutex);
+ index->rtr_track->rtr_active.push_front(rtr_info);
+ mutex_exit(&index->rtr_track->rtr_active_mutex);
+}
+
+/**************************************************************//**
+Clean up R-Tree search structure */
+void
+rtr_clean_rtr_info(
+/*===============*/
+ rtr_info_t* rtr_info, /*!< in: RTree search info */
+ bool free_all) /*!< in: need to free rtr_info itself */
+{
+ dict_index_t* index;
+ bool initialized = false;
+
+ if (!rtr_info) {
+ return;
+ }
+
+ index = rtr_info->index;
+
+ if (index) {
+ mutex_enter(&index->rtr_track->rtr_active_mutex);
+ }
+
+ while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
+ btr_pcur_t* cur = rtr_info->parent_path->back().cursor;
+ rtr_info->parent_path->pop_back();
+
+ if (cur) {
+ btr_pcur_close(cur);
+ ut_free(cur);
+ }
+ }
+
+ UT_DELETE(rtr_info->parent_path);
+ rtr_info->parent_path = NULL;
+
+ if (rtr_info->path != NULL) {
+ UT_DELETE(rtr_info->path);
+ rtr_info->path = NULL;
+ initialized = true;
+ }
+
+ if (rtr_info->matches) {
+ rtr_info->matches->used = false;
+ rtr_info->matches->locked = false;
+ rtr_info->matches->valid = false;
+ rtr_info->matches->matched_recs->clear();
+ }
+
+ if (index) {
+ index->rtr_track->rtr_active.remove(rtr_info);
+ mutex_exit(&index->rtr_track->rtr_active_mutex);
+ }
+
+ if (free_all) {
+ if (rtr_info->matches) {
+ if (rtr_info->matches->matched_recs != NULL) {
+ UT_DELETE(rtr_info->matches->matched_recs);
+ }
+
+ rw_lock_free(&(rtr_info->matches->block.lock));
+
+ mutex_destroy(&rtr_info->matches->rtr_match_mutex);
+ }
+
+ if (rtr_info->heap) {
+ mem_heap_free(rtr_info->heap);
+ }
+
+ if (initialized) {
+ mutex_destroy(&rtr_info->rtr_path_mutex);
+ }
+
+ if (rtr_info->allocated) {
+ ut_free(rtr_info);
+ }
+ }
+}
+
+/**************************************************************//**
+Rebuild the "path" to exclude the page number being removed */
+static
+void
+rtr_rebuild_path(
+/*=============*/
+ rtr_info_t* rtr_info, /*!< in: RTree search info */
+	ulint		page_no)	/*!< in: page number to exclude */
+{
+ rtr_node_path_t* new_path
+ = UT_NEW_NOKEY(rtr_node_path_t());
+
+ rtr_node_path_t::iterator rit;
+#ifdef UNIV_DEBUG
+ ulint before_size = rtr_info->path->size();
+#endif /* UNIV_DEBUG */
+
+ for (rit = rtr_info->path->begin();
+ rit != rtr_info->path->end(); ++rit) {
+ node_visit_t next_rec = *rit;
+
+ if (next_rec.page_no == page_no) {
+ continue;
+ }
+
+ new_path->push_back(next_rec);
+#ifdef UNIV_DEBUG
+ node_visit_t rec = new_path->back();
+ ut_ad(rec.level < rtr_info->cursor->tree_height
+ && rec.page_no > 0);
+#endif /* UNIV_DEBUG */
+ }
+
+ UT_DELETE(rtr_info->path);
+
+ ut_ad(new_path->size() == before_size - 1);
+
+ rtr_info->path = new_path;
+
+ if (!rtr_info->parent_path->empty()) {
+ rtr_node_path_t* new_parent_path = UT_NEW_NOKEY(
+ rtr_node_path_t());
+
+ for (rit = rtr_info->parent_path->begin();
+ rit != rtr_info->parent_path->end(); ++rit) {
+ node_visit_t next_rec = *rit;
+
+ if (next_rec.child_no == page_no) {
+ btr_pcur_t* cur = next_rec.cursor;
+
+ if (cur) {
+ btr_pcur_close(cur);
+ ut_free(cur);
+ }
+
+ continue;
+ }
+
+ new_parent_path->push_back(next_rec);
+ }
+ UT_DELETE(rtr_info->parent_path);
+ rtr_info->parent_path = new_parent_path;
+ }
+
+}
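+
+/* Functionally, the rebuild above is a filter: produce a new path without
+the discarded page's entries, closing any cursor attached to them. Over a
+plain vector, the same filtering is the erase-remove idiom (illustrative
+only): */
+#if 0 /* illustrative sketch, not compiled */
+#include <algorithm>
+#include <vector>
+
+struct visit { unsigned page_no; };
+
+static void exclude_page(std::vector<visit>& path, unsigned page_no)
+{
+	path.erase(std::remove_if(path.begin(), path.end(),
+				  [page_no](const visit& v) {
+					  return v.page_no == page_no;
+				  }),
+		   path.end());
+}
+#endif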
+
+/**************************************************************//**
+Check whether a page being discarded is in any active search path */
+void
+rtr_check_discard_page(
+/*===================*/
+ dict_index_t* index, /*!< in: index */
+ btr_cur_t* cursor, /*!< in: cursor on the page to discard: not on
+ the root page */
+ buf_block_t* block) /*!< in: block of page to be discarded */
+{
+ const ulint pageno = block->page.id().page_no();
+
+ mutex_enter(&index->rtr_track->rtr_active_mutex);
+
+ for (const auto& rtr_info : index->rtr_track->rtr_active) {
+ if (cursor && rtr_info == cursor->rtr_info) {
+ continue;
+ }
+
+ mutex_enter(&rtr_info->rtr_path_mutex);
+ for (const node_visit_t& node : *rtr_info->path) {
+ if (node.page_no == pageno) {
+ rtr_rebuild_path(rtr_info, pageno);
+ break;
+ }
+ }
+ mutex_exit(&rtr_info->rtr_path_mutex);
+
+ if (rtr_info->matches) {
+ mutex_enter(&rtr_info->matches->rtr_match_mutex);
+
+ if ((&rtr_info->matches->block)->page.id().page_no()
+ == pageno) {
+ if (!rtr_info->matches->matched_recs->empty()) {
+ rtr_info->matches->matched_recs->clear();
+ }
+ ut_ad(rtr_info->matches->matched_recs->empty());
+ rtr_info->matches->valid = false;
+ }
+
+ mutex_exit(&rtr_info->matches->rtr_match_mutex);
+ }
+ }
+
+ mutex_exit(&index->rtr_track->rtr_active_mutex);
+
+ lock_mutex_enter();
+ lock_prdt_page_free_from_discard(block, &lock_sys.prdt_hash);
+ lock_prdt_page_free_from_discard(block, &lock_sys.prdt_page_hash);
+ lock_mutex_exit();
+}
+
+/** Functor to attempt optimistic access of a page.
+It returns true if it successfully latches the page. */
+struct optimistic_get
+{
+ btr_pcur_t *const r_cursor;
+ mtr_t *const mtr;
+
+ optimistic_get(btr_pcur_t *r_cursor,mtr_t *mtr)
+ :r_cursor(r_cursor), mtr(mtr) {}
+
+ bool operator()(buf_block_t *hint) const
+ {
+ return hint && buf_page_optimistic_get(
+ RW_X_LATCH, hint, r_cursor->modify_clock, __FILE__,
+ __LINE__, mtr);
+ }
+};
+
+/** Restore the stored position of a persistent cursor, buffer-fixing the page */
+static
+bool
+rtr_cur_restore_position(
+ ulint latch_mode, /*!< in: BTR_SEARCH_LEAF, ... */
+ btr_cur_t* btr_cur, /*!< in: detached persistent cursor */
+ ulint level, /*!< in: index level */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_index_t* index;
+ mem_heap_t* heap;
+ btr_pcur_t* r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
+ dtuple_t* tuple;
+ bool ret = false;
+
+ ut_ad(mtr);
+ ut_ad(r_cursor);
+ ut_ad(mtr->is_active());
+
+ index = btr_cur_get_index(btr_cur);
+
+ if (r_cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
+ || r_cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
+ return(false);
+ }
+
+ DBUG_EXECUTE_IF(
+ "rtr_pessimistic_position",
+ r_cursor->modify_clock = 100;
+ );
+
+ ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);
+
+ if (r_cursor->block_when_stored.run_with_hint(
+ optimistic_get(r_cursor, mtr))) {
+ ut_ad(r_cursor->pos_state == BTR_PCUR_IS_POSITIONED);
+
+ ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
+#ifdef UNIV_DEBUG
+ do {
+ const rec_t* rec;
+ const rec_offs* offsets1;
+ const rec_offs* offsets2;
+ ulint comp;
+
+ rec = btr_pcur_get_rec(r_cursor);
+
+ heap = mem_heap_create(256);
+ offsets1 = rec_get_offsets(
+ r_cursor->old_rec, index, NULL,
+ level ? 0 : r_cursor->old_n_fields,
+ r_cursor->old_n_fields, &heap);
+ offsets2 = rec_get_offsets(
+ rec, index, NULL,
+ level ? 0 : r_cursor->old_n_fields,
+ r_cursor->old_n_fields, &heap);
+
+ comp = rec_offs_comp(offsets1);
+
+ if (rec_get_info_bits(r_cursor->old_rec, comp)
+ & REC_INFO_MIN_REC_FLAG) {
+ ut_ad(rec_get_info_bits(rec, comp)
+ & REC_INFO_MIN_REC_FLAG);
+ } else {
+
+ ut_ad(!cmp_rec_rec(r_cursor->old_rec,
+ rec, offsets1, offsets2,
+ index));
+ }
+
+ mem_heap_free(heap);
+ } while (0);
+#endif /* UNIV_DEBUG */
+
+ return(true);
+ }
+
+	/* The page has changed. For an R-tree, the page cannot be
+	shrunk away, so we search the page and its right siblings */
+ buf_block_t* block;
+ node_seq_t page_ssn;
+ const page_t* page;
+ page_cur_t* page_cursor;
+ node_visit_t* node = rtr_get_parent_node(btr_cur, level, false);
+ node_seq_t path_ssn = node->seq_no;
+ const unsigned zip_size = index->table->space->zip_size();
+ uint32_t page_no = node->page_no;
+
+ heap = mem_heap_create(256);
+
+ tuple = dict_index_build_data_tuple(r_cursor->old_rec, index, !level,
+ r_cursor->old_n_fields, heap);
+
+ page_cursor = btr_pcur_get_page_cur(r_cursor);
+ ut_ad(r_cursor == node->cursor);
+
+search_again:
+ dberr_t err = DB_SUCCESS;
+
+ block = buf_page_get_gen(
+ page_id_t(index->table->space_id, page_no),
+ zip_size, RW_X_LATCH, NULL,
+ BUF_GET, __FILE__, __LINE__, mtr, &err);
+
+ ut_ad(block);
+
+ /* Get the page SSN */
+ page = buf_block_get_frame(block);
+ page_ssn = page_get_ssn_id(page);
+
+ ulint low_match = page_cur_search(
+ block, index, tuple, PAGE_CUR_LE, page_cursor);
+
+ if (low_match == r_cursor->old_n_fields) {
+ const rec_t* rec;
+ const rec_offs* offsets1;
+ const rec_offs* offsets2;
+ ulint comp;
+
+ rec = btr_pcur_get_rec(r_cursor);
+
+ offsets1 = rec_get_offsets(r_cursor->old_rec, index, NULL,
+ level ? 0 : r_cursor->old_n_fields,
+ r_cursor->old_n_fields, &heap);
+ offsets2 = rec_get_offsets(rec, index, NULL,
+ level ? 0 : r_cursor->old_n_fields,
+ r_cursor->old_n_fields, &heap);
+
+ comp = rec_offs_comp(offsets1);
+
+ if ((rec_get_info_bits(r_cursor->old_rec, comp)
+ & REC_INFO_MIN_REC_FLAG)
+ && (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
+ r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
+ ret = true;
+ } else if (!cmp_rec_rec(r_cursor->old_rec, rec, offsets1, offsets2,
+ index)) {
+ r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
+ ret = true;
+ }
+ }
+
+	/* Check the page SSN to see whether the page has been split;
+	if so, search the page to the right */
+ if (!ret && page_ssn > path_ssn) {
+ page_no = btr_page_get_next(page);
+ goto search_again;
+ }
+
+ mem_heap_free(heap);
+
+ return(ret);
+}
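+
+/* The restore above follows the classic two-phase pattern: first try the
+remembered block under its modify clock; if the page has changed, search
+again by key, and when the page SSN shows a split, retry on the right
+sibling. In outline (illustrative only; the helpers are stand-ins): */
+#if 0 /* illustrative sketch, not compiled */
+static bool try_optimistic(unsigned, unsigned long long) { return false; }
+static bool search_by_key(unsigned) { return true; }
+static unsigned page_ssn(unsigned) { return 0; }
+static unsigned right_sibling(unsigned p) { return p + 1; }
+
+static bool restore_position(unsigned page_no, unsigned long long clock,
+			     unsigned saved_ssn)
+{
+	if (try_optimistic(page_no, clock)) {
+		return true;	/* page unchanged: position still valid */
+	}
+
+	for (;;) {
+		if (search_by_key(page_no)) {
+			return true;	/* found the stored record again */
+		}
+
+		if (page_ssn(page_no) <= saved_ssn) {
+			return false;	/* no split: record is really gone */
+		}
+
+		/* The page split after the position was stored: the
+		record may have moved to the right sibling. */
+		page_no = right_sibling(page_no);
+	}
+}
+#endif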
+
+/****************************************************************//**
+Copy the leaf level R-tree record, and push it to matched_rec in rtr_info */
+static
+void
+rtr_leaf_push_match_rec(
+/*====================*/
+ const rec_t* rec, /*!< in: record to copy */
+ rtr_info_t* rtr_info, /*!< in/out: search stack */
+ rec_offs* offsets, /*!< in: offsets */
+ bool is_comp) /*!< in: is compact format */
+{
+ byte* buf;
+ matched_rec_t* match_rec = rtr_info->matches;
+ rec_t* copy;
+ ulint data_len;
+ rtr_rec_t rtr_rec;
+
+ buf = match_rec->block.frame + match_rec->used;
+ ut_ad(page_rec_is_leaf(rec));
+
+ copy = rec_copy(buf, rec, offsets);
+
+ if (is_comp) {
+ rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
+ } else {
+ rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
+ }
+
+ rtr_rec.r_rec = copy;
+ rtr_rec.locked = false;
+
+ match_rec->matched_recs->push_back(rtr_rec);
+ match_rec->valid = true;
+
+ data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
+ match_rec->used += data_len;
+
+ ut_ad(match_rec->used < srv_page_size);
+}
+
+/**************************************************************//**
+Store the parent path cursor
+@return number of cursor stored */
+ulint
+rtr_store_parent_path(
+/*==================*/
+ const buf_block_t* block, /*!< in: block of the page */
+ btr_cur_t* btr_cur,/*!< in/out: persistent cursor */
+ ulint latch_mode,
+ /*!< in: latch_mode */
+ ulint level, /*!< in: index level */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ ulint num = btr_cur->rtr_info->parent_path->size();
+ ulint num_stored = 0;
+
+ while (num >= 1) {
+ node_visit_t* node = &(*btr_cur->rtr_info->parent_path)[
+ num - 1];
+ btr_pcur_t* r_cursor = node->cursor;
+ buf_block_t* cur_block;
+
+ if (node->level > level) {
+ break;
+ }
+
+ r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
+ r_cursor->latch_mode = latch_mode;
+
+ cur_block = btr_pcur_get_block(r_cursor);
+
+ if (cur_block == block) {
+ btr_pcur_store_position(r_cursor, mtr);
+ num_stored++;
+ } else {
+ break;
+ }
+
+ num--;
+ }
+
+ return(num_stored);
+}
+/**************************************************************//**
+Push a non-leaf index node to the search path for insertion */
+static
+void
+rtr_non_leaf_insert_stack_push(
+/*===========================*/
+ dict_index_t* index, /*!< in: index descriptor */
+ rtr_node_path_t* path, /*!< in/out: search path */
+ ulint level, /*!< in: index page level */
+ uint32_t child_no,/*!< in: child page no */
+ const buf_block_t* block, /*!< in: block of the page */
+ const rec_t* rec, /*!< in: positioned record */
+ double mbr_inc)/*!< in: MBR needs to be enlarged */
+{
+ node_seq_t new_seq;
+ btr_pcur_t* my_cursor;
+
+ my_cursor = static_cast<btr_pcur_t*>(
+ ut_malloc_nokey(sizeof(*my_cursor)));
+
+ btr_pcur_init(my_cursor);
+
+ page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
+
+ (btr_pcur_get_btr_cur(my_cursor))->index = index;
+
+ new_seq = rtr_get_current_ssn_id(index);
+ rtr_non_leaf_stack_push(path, block->page.id().page_no(),
+ new_seq, level, child_no, my_cursor, mbr_inc);
+}
+
+/** Copy a buf_block_t, except "block->lock".
+@param[in,out] matches copy to match->block
+@param[in] block block to copy */
+static
+void
+rtr_copy_buf(
+ matched_rec_t* matches,
+ const buf_block_t* block)
+{
+ /* Copy all members of "block" to "matches->block" except "lock".
+ We skip "lock" because it is not used
+ from the dummy buf_block_t we create here and because memcpy()ing
+ it generates (valid) compiler warnings that the vtable pointer
+ will be copied. */
+ new (&matches->block.page) buf_page_t(block->page);
+ matches->block.frame = block->frame;
+ matches->block.unzip_LRU = block->unzip_LRU;
+
+ ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
+ ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);
+
+ /* Skip buf_block_t::lock */
+ matches->block.modify_clock = block->modify_clock;
+#ifdef BTR_CUR_HASH_ADAPT
+ matches->block.n_hash_helps = block->n_hash_helps;
+ matches->block.n_fields = block->n_fields;
+ matches->block.left_side = block->left_side;
+#if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
+ matches->block.n_pointers = 0;
+#endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
+ matches->block.curr_n_fields = block->curr_n_fields;
+ matches->block.curr_left_side = block->curr_left_side;
+ matches->block.index = block->index;
+#endif /* BTR_CUR_HASH_ADAPT */
+ ut_d(matches->block.debug_latch = NULL);
+}
+
+/****************************************************************//**
+Generate a shadow copy of the page block header to save the
+matched records */
+static
+void
+rtr_init_match(
+/*===========*/
+ matched_rec_t* matches,/*!< in/out: match to initialize */
+ const buf_block_t* block, /*!< in: buffer block */
+ const page_t* page) /*!< in: buffer page */
+{
+ ut_ad(matches->matched_recs->empty());
+ matches->locked = false;
+ rtr_copy_buf(matches, block);
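+	/* Redirect the copied block's frame to the preallocated
+	match buffer; matched leaf records will be copied there. */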
+ matches->block.frame = matches->bufp;
+ matches->valid = false;
+	/* We have to copy PAGE_NEW_SUPREMUM_END or PAGE_OLD_SUPREMUM_END
+	bytes so that we can use the infimum/supremum of this page as
+	a normal btr page for search. */
+ memcpy(matches->block.frame, page, page_is_comp(page)
+ ? PAGE_NEW_SUPREMUM_END
+ : PAGE_OLD_SUPREMUM_END);
+ matches->used = page_is_comp(page)
+ ? PAGE_NEW_SUPREMUM_END
+ : PAGE_OLD_SUPREMUM_END;
+#ifdef RTR_SEARCH_DIAGNOSTIC
+ ulint pageno = page_get_page_no(page);
+ fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
+ static_cast<int>(pageno));
+#endif /* RTR_SEARCH_DIAGNOSTIC */
+}
+
+/****************************************************************//**
+Get the bounding box content from an index record */
+void
+rtr_get_mbr_from_rec(
+/*=================*/
+	const rec_t*	rec,	/*!< in: physical record */
+	const rec_offs*	offsets,/*!< in: offsets array */
+	rtr_mbr_t*	mbr)	/*!< out: MBR */
+{
+ ulint rec_f_len;
+ const byte* data;
+
+ data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
+
+ rtr_read_mbr(data, mbr);
+}
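+
+/* For reference, rtr_read_mbr() is expected to decode the first
+DATA_MBR_LEN bytes as four machine-format doubles, one per MBR bound;
+a minimal sketch, assuming two spatial dimensions and mach_double_read():
+
+	mbr->xmin = mach_double_read(data);
+	mbr->xmax = mach_double_read(data + sizeof(double));
+	mbr->ymin = mach_double_read(data + 2 * sizeof(double));
+	mbr->ymax = mach_double_read(data + 3 * sizeof(double));
+*/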
+
+/****************************************************************//**
+Get the bounding box content from a MBR data record */
+void
+rtr_get_mbr_from_tuple(
+/*===================*/
+ const dtuple_t* dtuple, /*!< in: data tuple */
+ rtr_mbr* mbr) /*!< out: mbr to fill */
+{
+ const dfield_t* dtuple_field;
+ ulint dtuple_f_len;
+
+ dtuple_field = dtuple_get_nth_field(dtuple, 0);
+ dtuple_f_len = dfield_get_len(dtuple_field);
+ ut_a(dtuple_f_len >= 4 * sizeof(double));
+
+ rtr_read_mbr(static_cast<const byte*>(dfield_get_data(dtuple_field)),
+ mbr);
+}
+
+/** Compare minimum bounding rectangles.
+@return 1, 0 or -1 if mode == PAGE_CUR_MBR_EQUAL; for the other compare
+modes, 1 or 0, depending on whether a and b satisfy the requested
+relationship (CONTAINS, WITHIN etc.) */
+static int cmp_gis_field(page_cur_mode_t mode, const void *a, const void *b)
+{
+ return mode == PAGE_CUR_MBR_EQUAL
+ ? cmp_geometry_field(a, b)
+ : rtree_key_cmp(mode, a, b);
+}
+
+/** Compare a GIS data tuple to a physical record in an R-tree non-leaf node.
+We need to check the child page number field, since we do not store the
+PK fields in R-tree non-leaf nodes.
+@param[in] dtuple data tuple
+@param[in] rec R-tree record
+@return whether dtuple is less than rec */
+static bool
+cmp_dtuple_rec_with_gis_internal(const dtuple_t* dtuple, const rec_t* rec)
+{
+ const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
+ ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN);
+
+ if (cmp_gis_field(PAGE_CUR_WITHIN, dfield_get_data(dtuple_field), rec))
+ return true;
+
+ dtuple_field= dtuple_get_nth_field(dtuple, 1);
+ ut_ad(dfield_get_len(dtuple_field) == 4); /* child page number */
+ ut_ad(dtuple_field->type.mtype == DATA_SYS_CHILD);
+ ut_ad(!(dtuple_field->type.prtype & ~DATA_NOT_NULL));
+
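+  /* In a non-leaf record the 4-byte child page number is stored
+  right after the MBR; compare it byte for byte. */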
+ return memcmp(dtuple_field->data, rec + DATA_MBR_LEN, 4) != 0;
+}
+
+#ifndef UNIV_DEBUG
+static
+#endif
+/** Compare a GIS data tuple to a physical record.
+@param[in] dtuple data tuple
+@param[in] rec R-tree record
+@param[in] mode compare mode
+@retval negative if dtuple is less than rec */
+int cmp_dtuple_rec_with_gis(const dtuple_t *dtuple, const rec_t *rec,
+ page_cur_mode_t mode)
+{
+ const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
+ /* FIXME: TABLE_SHARE::init_from_binary_frm_image() is adding
+ field->key_part_length_bytes() to the key length */
+ ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN ||
+ dfield_get_len(dtuple_field) == DATA_MBR_LEN + 2);
+
+ return cmp_gis_field(mode, dfield_get_data(dtuple_field), rec);
+}
+
+/****************************************************************//**
+Searches the right position in an R-tree for a page cursor.
+@return whether a matching record was found */
+bool
+rtr_cur_search_with_match(
+/*======================*/
+ const buf_block_t* block, /*!< in: buffer block */
+ dict_index_t* index, /*!< in: index descriptor */
+ const dtuple_t* tuple, /*!< in: data tuple */
+ page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_INSERT,
+ PAGE_CUR_RTREE_LOCATE etc. */
+ page_cur_t* cursor, /*!< in/out: page cursor */
+ rtr_info_t* rtr_info)/*!< in/out: search stack */
+{
+ bool found = false;
+ const page_t* page;
+ const rec_t* rec;
+ const rec_t* last_rec;
+ rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets = offsets_;
+ mem_heap_t* heap = NULL;
+ int cmp = 1;
+ double least_inc = DBL_MAX;
+ const rec_t* best_rec;
+ const rec_t* last_match_rec = NULL;
+ bool match_init = false;
+ page_cur_mode_t orig_mode = mode;
+ const rec_t* first_rec = NULL;
+
+ rec_offs_init(offsets_);
+
+ ut_ad(RTREE_SEARCH_MODE(mode));
+
+ ut_ad(dict_index_is_spatial(index));
+
+ page = buf_block_get_frame(block);
+
+ const ulint level = btr_page_get_level(page);
+ const ulint n_core = level ? 0 : index->n_fields;
+
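+	/* PAGE_CUR_RTREE_LOCATE is only used on non-leaf levels to
+	find the subtree whose MBR contains the search MBR, so it
+	degrades into a PAGE_CUR_WITHIN comparison here. */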
+ if (mode == PAGE_CUR_RTREE_LOCATE) {
+ ut_ad(level != 0);
+ mode = PAGE_CUR_WITHIN;
+ }
+
+ rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));
+
+ last_rec = rec;
+ best_rec = rec;
+
+ if (page_rec_is_infimum(rec)) {
+ rec = page_rec_get_next_const(rec);
+ }
+
+	/* Check whether the insert tuple is larger than the first rec;
+	if so, remember that rec so we can try to avoid it */
+ if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
+
+ ulint new_rec_size = rec_get_converted_size(index, tuple, 0);
+
+ offsets = rec_get_offsets(rec, index, offsets, n_core,
+ dtuple_get_n_fields_cmp(tuple),
+ &heap);
+
+ if (rec_offs_size(offsets) < new_rec_size) {
+ first_rec = rec;
+ }
+
+		/* If this is the left-most page of this index level
+		and the table is compressed, try to avoid the first
+		page as much as possible, as there would be a problem
+		when updating the MIN_REC rec on a compressed page */
+ if (is_buf_block_get_page_zip(block)
+ && !page_has_prev(page)
+ && page_get_n_recs(page) >= 2) {
+
+ rec = page_rec_get_next_const(rec);
+ }
+ }
+
+ while (!page_rec_is_supremum(rec)) {
+ if (!n_core) {
+ switch (mode) {
+ case PAGE_CUR_CONTAIN:
+ case PAGE_CUR_INTERSECT:
+ case PAGE_CUR_MBR_EQUAL:
+				/* At a non-leaf level, we need to
+				check both CONTAIN and INTERSECT for
+				any of these search modes */
+ cmp = cmp_dtuple_rec_with_gis(
+ tuple, rec, PAGE_CUR_CONTAIN);
+
+ if (cmp != 0) {
+ cmp = cmp_dtuple_rec_with_gis(
+ tuple, rec,
+ PAGE_CUR_INTERSECT);
+ }
+ break;
+ case PAGE_CUR_DISJOINT:
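+				/* A subtree whose MBR merely
+				intersects the search area can still
+				contain qualifying records, so fall
+				back to INTERSECT when the DISJOINT
+				check fails. */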
+ cmp = cmp_dtuple_rec_with_gis(
+ tuple, rec, mode);
+
+ if (cmp != 0) {
+ cmp = cmp_dtuple_rec_with_gis(
+ tuple, rec,
+ PAGE_CUR_INTERSECT);
+ }
+ break;
+ case PAGE_CUR_RTREE_INSERT:
+ double increase;
+ double area;
+
+ cmp = cmp_dtuple_rec_with_gis(
+ tuple, rec, PAGE_CUR_WITHIN);
+
+ if (cmp != 0) {
+ increase = rtr_rec_cal_increase(
+ tuple, rec, &area);
+					/* Once the increase goes
+					beyond DBL_MAX it makes no
+					sense to record such a value;
+					clamp it to DBL_MAX / 2 */
+ if (increase >= DBL_MAX) {
+ increase = DBL_MAX / 2;
+ }
+
+ if (increase < least_inc) {
+ least_inc = increase;
+ best_rec = rec;
+ } else if (best_rec
+ && best_rec == first_rec) {
+ /* if first_rec is set,
+ we will try to avoid it */
+ least_inc = increase;
+ best_rec = rec;
+ }
+ }
+ break;
+ case PAGE_CUR_RTREE_GET_FATHER:
+ cmp = cmp_dtuple_rec_with_gis_internal(
+ tuple, rec);
+ break;
+ default:
+ /* WITHIN etc. */
+ cmp = cmp_dtuple_rec_with_gis(
+ tuple, rec, mode);
+ }
+ } else {
+			/* At the leaf level, INSERT mode should have
+			been translated to PAGE_CUR_LE already */
+ ut_ad(mode != PAGE_CUR_RTREE_INSERT);
+
+ cmp = cmp_dtuple_rec_with_gis(
+ tuple, rec, mode);
+ }
+
+ if (cmp == 0) {
+ found = true;
+
+ /* If located, the matching node/rec will be pushed
+ to rtr_info->path for non-leaf nodes, or
+ rtr_info->matches for leaf nodes */
+ if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
+ if (!n_core) {
+ uint32_t page_no;
+ node_seq_t new_seq;
+ bool is_loc;
+
+ is_loc = (orig_mode
+ == PAGE_CUR_RTREE_LOCATE
+ || orig_mode
+ == PAGE_CUR_RTREE_GET_FATHER);
+
+ offsets = rec_get_offsets(
+ rec, index, offsets, 0,
+ ULINT_UNDEFINED, &heap);
+
+ page_no = btr_node_ptr_get_child_page_no(
+ rec, offsets);
+
+ ut_ad(level >= 1);
+
+ /* Get current SSN, before we insert
+ it into the path stack */
+ new_seq = rtr_get_current_ssn_id(index);
+
+ rtr_non_leaf_stack_push(
+ rtr_info->path,
+ page_no,
+ new_seq, level - 1, 0,
+ NULL, 0);
+
+ if (is_loc) {
+ rtr_non_leaf_insert_stack_push(
+ index,
+ rtr_info->parent_path,
+ level, page_no, block,
+ rec, 0);
+ }
+
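+				/* Only LOCATE/GET_FATHER searches may
+				skip predicate page locks, unless the
+				caller explicitly requested them. */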
+ if (!srv_read_only_mode
+ && (rtr_info->need_page_lock
+ || !is_loc)) {
+
+ /* Lock the page, preventing it
+ from being shrunk */
+ lock_place_prdt_page_lock(
+ page_id_t(block->page
+ .id()
+ .space(),
+ page_no),
+ index,
+ rtr_info->thr);
+ }
+ } else {
+ ut_ad(orig_mode
+ != PAGE_CUR_RTREE_LOCATE);
+
+ if (!match_init) {
+ rtr_init_match(
+ rtr_info->matches,
+ block, page);
+ match_init = true;
+ }
+
+ /* Collect matched records on page */
+ offsets = rec_get_offsets(
+ rec, index, offsets,
+ index->n_fields,
+ ULINT_UNDEFINED, &heap);
+ rtr_leaf_push_match_rec(
+ rec, rtr_info, offsets,
+ page_is_comp(page));
+ }
+
+ last_match_rec = rec;
+ } else {
+				/* This is the insertion case; break
+				once the first MBR that can accommodate
+				the record to insert is found */
+ break;
+ }
+ }
+
+ last_rec = rec;
+
+ rec = page_rec_get_next_const(rec);
+ }
+
+ /* All records on page are searched */
+ if (page_rec_is_supremum(rec)) {
+ if (!n_core) {
+ if (!found) {
+				/* No match; if this is for insertion,
+				select the record that results in the
+				least increase in area */
+ if (mode == PAGE_CUR_RTREE_INSERT) {
+ ut_ad(least_inc < DBL_MAX);
+ offsets = rec_get_offsets(
+ best_rec, index, offsets,
+ 0, ULINT_UNDEFINED, &heap);
+ uint32_t child_no =
+ btr_node_ptr_get_child_page_no(
+ best_rec, offsets);
+
+ rtr_non_leaf_insert_stack_push(
+ index, rtr_info->parent_path,
+ level, child_no, block,
+ best_rec, least_inc);
+
+ page_cur_position(best_rec, block,
+ cursor);
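+				/* Signal the caller that ancestor
+				MBRs may need to be enlarged to cover
+				the inserted entry. */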
+ rtr_info->mbr_adj = true;
+ } else {
+					/* Position at the last rec of
+					the page, since this is not a
+					leaf page */
+ page_cur_position(last_rec, block,
+ cursor);
+ }
+ } else {
+				/* There are matching records; position
+				at the last matching record */
+ if (rtr_info) {
+ rec = last_match_rec;
+ page_cur_position(
+ rec, block, cursor);
+ }
+ }
+ } else if (rtr_info) {
+			/* Leaf level: if there was no match, position
+			at the last (supremum) rec */
+ if (!last_match_rec) {
+ page_cur_position(rec, block, cursor);
+ goto func_exit;
+ }
+
+ /* There are matched records */
+ matched_rec_t* match_rec = rtr_info->matches;
+
+ rtr_rec_t test_rec;
+
+ test_rec = match_rec->matched_recs->back();
+#ifdef UNIV_DEBUG
+ rec_offs offsets_2[REC_OFFS_NORMAL_SIZE];
+ rec_offs* offsets2 = offsets_2;
+ rec_offs_init(offsets_2);
+
+ ut_ad(found);
+
+		/* Verify that the record to be positioned on is the
+		same as the last record in the matched_rec vector */
+ offsets2 = rec_get_offsets(test_rec.r_rec, index,
+ offsets2, index->n_fields,
+ ULINT_UNDEFINED, &heap);
+
+ offsets = rec_get_offsets(last_match_rec, index,
+ offsets, index->n_fields,
+ ULINT_UNDEFINED, &heap);
+
+ ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec,
+ offsets2, offsets, index) == 0);
+#endif /* UNIV_DEBUG */
+ /* Pop the last match record and position on it */
+ match_rec->matched_recs->pop_back();
+ page_cur_position(test_rec.r_rec, &match_rec->block,
+ cursor);
+ }
+ } else {
+
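+		/* We broke out of the scan early: for insertion this
+		is the first record whose MBR fully contains the new
+		entry, so no MBR enlargement is needed. */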
+ if (mode == PAGE_CUR_RTREE_INSERT) {
+ ut_ad(!last_match_rec);
+ rtr_non_leaf_insert_stack_push(
+ index, rtr_info->parent_path, level,
+ mach_read_from_4(rec + DATA_MBR_LEN),
+ block, rec, 0);
+
+ } else if (rtr_info && found && !n_core) {
+ rec = last_match_rec;
+ }
+
+ page_cur_position(rec, block, cursor);
+ }
+
+#ifdef UNIV_DEBUG
+ /* Verify that we are positioned at the same child page as pushed in
+ the path stack */
+ if (!n_core && (!page_rec_is_supremum(rec) || found)
+ && mode != PAGE_CUR_RTREE_INSERT) {
+ ulint page_no;
+
+ offsets = rec_get_offsets(rec, index, offsets, 0,
+ ULINT_UNDEFINED, &heap);
+ page_no = btr_node_ptr_get_child_page_no(rec, offsets);
+
+ if (rtr_info && found) {
+ rtr_node_path_t* path = rtr_info->path;
+ node_visit_t last_visit = path->back();
+
+ ut_ad(last_visit.page_no == page_no);
+ }
+ }
+#endif /* UNIV_DEBUG */
+
+func_exit:
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+
+ return(found);
+}