summaryrefslogtreecommitdiffstats
path: root/storage/tokudb/PerconaFT/src/ydb_cursor.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/src/ydb_cursor.cc')
-rw-r--r--storage/tokudb/PerconaFT/src/ydb_cursor.cc900
1 files changed, 900 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/src/ydb_cursor.cc b/storage/tokudb/PerconaFT/src/ydb_cursor.cc
new file mode 100644
index 00000000..1f4f00b7
--- /dev/null
+++ b/storage/tokudb/PerconaFT/src/ydb_cursor.cc
@@ -0,0 +1,900 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <db.h>
+#include "toku_assert.h"
+#include "ydb-internal.h"
+#include "ydb_cursor.h"
+#include "ydb_row_lock.h"
+#include "ft/cursor.h"
+
+static YDB_C_LAYER_STATUS_S ydb_c_layer_status;
+#ifdef STATUS_VALUE
+#undef STATUS_VALUE
+#endif
+#define STATUS_VALUE(x) ydb_c_layer_status.status[x].value.num
+
+#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_c_layer_status, k, c, t, l, inc)
+
+static void
+ydb_c_layer_status_init (void) {
+ // Note, this function initializes the keyname, type, and legend fields.
+ // Value fields are initialized to zero by compiler.
+ ydb_c_layer_status.initialized = true;
+}
+#undef STATUS_INIT
+
+void
+ydb_c_layer_get_status(YDB_C_LAYER_STATUS statp) {
+ if (!ydb_c_layer_status.initialized)
+ ydb_c_layer_status_init();
+ *statp = ydb_c_layer_status;
+}
+
+//Get the main portion of a cursor flag (excluding the bitwise or'd components).
+static int
+get_main_cursor_flag(uint32_t flags) {
+ return flags & DB_OPFLAGS_MASK;
+}
+
+static int
+get_nonmain_cursor_flags(uint32_t flags) {
+ return flags & ~(DB_OPFLAGS_MASK);
+}
+
+static inline bool
+c_uninitialized(DBC *c) {
+ return toku_ft_cursor_uninitialized(dbc_ftcursor(c));
+}
+
+typedef struct query_context_wrapped_t {
+ DBT *key;
+ DBT *val;
+ struct simple_dbt *skey;
+ struct simple_dbt *sval;
+} *QUERY_CONTEXT_WRAPPED, QUERY_CONTEXT_WRAPPED_S;
+
+static inline void
+query_context_wrapped_init(QUERY_CONTEXT_WRAPPED context, DBC *c, DBT *key, DBT *val) {
+ context->key = key;
+ context->val = val;
+ context->skey = dbc_struct_i(c)->skey;
+ context->sval = dbc_struct_i(c)->sval;
+}
+
+static int
+c_get_wrapper_callback(DBT const *key, DBT const *val, void *extra) {
+ QUERY_CONTEXT_WRAPPED context = (QUERY_CONTEXT_WRAPPED) extra;
+ int r = toku_dbt_set(key->size, key->data, context->key, context->skey);
+ if (r == 0) {
+ r = toku_dbt_set(val->size, val->data, context->val, context->sval);
+ }
+ return r;
+}
+
+static inline uint32_t get_cursor_prelocked_flags(uint32_t flags, DBC *dbc) {
+ uint32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
+
+ // DB_READ_UNCOMMITTED and DB_READ_COMMITTED transactions 'own' all read
+ // locks for user-data dictionaries.
+ if (dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE &&
+ !(dbc_struct_i(dbc)->iso == TOKU_ISO_SNAPSHOT &&
+ dbc_struct_i(dbc)->locking_read)) {
+ lock_flags |= DB_PRELOCKED;
+ }
+ return lock_flags;
+}
+
+//This is the user level callback function given to ydb layer functions like
+//c_getf_first
+
+typedef struct query_context_base_t {
+ FT_CURSOR c;
+ DB_TXN *txn;
+ DB *db;
+ YDB_CALLBACK_FUNCTION f;
+ void *f_extra;
+ int r_user_callback;
+ bool do_locking;
+ bool is_write_op;
+ toku::lock_request request;
+} *QUERY_CONTEXT_BASE, QUERY_CONTEXT_BASE_S;
+
+typedef struct query_context_t {
+ QUERY_CONTEXT_BASE_S base;
+} *QUERY_CONTEXT, QUERY_CONTEXT_S;
+
+typedef struct query_context_with_input_t {
+ QUERY_CONTEXT_BASE_S base;
+ DBT *input_key;
+ DBT *input_val;
+} *QUERY_CONTEXT_WITH_INPUT, QUERY_CONTEXT_WITH_INPUT_S;
+
+static void
+query_context_base_init(QUERY_CONTEXT_BASE context, DBC *c, uint32_t flag, bool is_write_op, YDB_CALLBACK_FUNCTION f, void *extra) {
+ context->c = dbc_ftcursor(c);
+ context->txn = dbc_struct_i(c)->txn;
+ context->db = c->dbp;
+ context->f = f;
+ context->f_extra = extra;
+ context->is_write_op = is_write_op;
+ uint32_t lock_flags = get_cursor_prelocked_flags(flag, c);
+ if (context->is_write_op) {
+ lock_flags &= DB_PRELOCKED_WRITE; // Only care about whether already locked for write
+ }
+ context->do_locking = (context->db->i->lt != nullptr && !(lock_flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE)));
+ context->r_user_callback = 0;
+ context->request.create();
+}
+
+static toku::lock_request::type
+query_context_determine_lock_type(QUERY_CONTEXT_BASE context) {
+ return context->is_write_op ?
+ toku::lock_request::type::WRITE : toku::lock_request::type::READ;
+}
+
+static void
+query_context_base_destroy(QUERY_CONTEXT_BASE context) {
+ context->request.destroy();
+}
+
+static void
+query_context_init_read(QUERY_CONTEXT context, DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ const bool is_write = false;
+ query_context_base_init(&context->base, c, flag, is_write, f, extra);
+}
+
+static void
+query_context_init_write(QUERY_CONTEXT context, DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ const bool is_write = true;
+ query_context_base_init(&context->base, c, flag, is_write, f, extra);
+}
+
+static void
+query_context_with_input_init(QUERY_CONTEXT_WITH_INPUT context, DBC *c, uint32_t flag, DBT *key, DBT *val, YDB_CALLBACK_FUNCTION f, void *extra) {
+ // grab write locks if the DB_RMW flag is set or the cursor was created with the DB_RMW flag
+ const bool is_write = ((flag & DB_RMW) != 0) || dbc_struct_i(c)->rmw;
+ query_context_base_init(&context->base, c, flag, is_write, f, extra);
+ context->input_key = key;
+ context->input_val = val;
+}
+
+static int c_getf_first_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+static void
+c_query_context_init(QUERY_CONTEXT context, DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ bool is_write_op = false;
+ // grab write locks if the DB_RMW flag is set or the cursor was created with the DB_RMW flag
+ if ((flag & DB_RMW) || dbc_struct_i(c)->rmw) {
+ is_write_op = true;
+ }
+ if (is_write_op) {
+ query_context_init_write(context, c, flag, f, extra);
+ } else {
+ query_context_init_read(context, c, flag, f, extra);
+ }
+}
+
+static void
+c_query_context_destroy(QUERY_CONTEXT context) {
+ query_context_base_destroy(&context->base);
+}
+
+static int
+c_getf_first(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+ int r = 0;
+ QUERY_CONTEXT_S context; //Describes the context of this query.
+ c_query_context_init(&context, c, flag, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_first will call c_getf_first_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_first(dbc_ftcursor(c), c_getf_first_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ c_query_context_destroy(&context);
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_first_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+ DBT found_key = { .data = (void *) key, .size = keylen };
+
+ if (context->do_locking) {
+ const DBT *left_key = toku_dbt_negative_infinity();
+ const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
+ r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
+ query_context_determine_lock_type(context), &context->request);
+ } else {
+ r = 0;
+ }
+
+ //Call application-layer callback if found and locks were successfully obtained.
+ if (r==0 && key!=NULL && !lock_only) {
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_first
+ return r;
+}
+
+static int c_getf_last_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+static int
+c_getf_last(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+ int r = 0;
+ QUERY_CONTEXT_S context; //Describes the context of this query.
+ c_query_context_init(&context, c, flag, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_last will call c_getf_last_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_last(dbc_ftcursor(c), c_getf_last_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ c_query_context_destroy(&context);
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_last_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+ DBT found_key = { .data = (void *) key, .size = keylen };
+
+ if (context->do_locking) {
+ const DBT *left_key = key != NULL ? &found_key : toku_dbt_negative_infinity();
+ const DBT *right_key = toku_dbt_positive_infinity();
+ r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
+ query_context_determine_lock_type(context), &context->request);
+ } else {
+ r = 0;
+ }
+
+ //Call application-layer callback if found and locks were successfully obtained.
+ if (r==0 && key!=NULL && !lock_only) {
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_last
+ return r;
+}
+
+static int c_getf_next_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+static int
+c_getf_next(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ int r;
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+ if (c_uninitialized(c)) {
+ r = c_getf_first(c, flag, f, extra);
+ } else {
+ r = 0;
+ QUERY_CONTEXT_S context; //Describes the context of this query.
+ c_query_context_init(&context, c, flag, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_next will call c_getf_next_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_next(dbc_ftcursor(c), c_getf_next_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ c_query_context_destroy(&context);
+ }
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_next_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+
+ DBT found_key = { .data = (void *) key, .size = keylen };
+
+ if (context->do_locking) {
+ const DBT *prevkey, *prevval;
+ toku_ft_cursor_peek(context->c, &prevkey, &prevval);
+ const DBT *left_key = prevkey;
+ const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
+ r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
+ query_context_determine_lock_type(context), &context->request);
+ } else {
+ r = 0;
+ }
+
+ //Call application-layer callback if found and locks were successfully obtained.
+ if (r==0 && key!=NULL && !lock_only) {
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_next
+ return r;
+}
+
+static int c_getf_prev_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+static int
+c_getf_prev(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ int r;
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+ if (c_uninitialized(c)) {
+ r = c_getf_last(c, flag, f, extra);
+ } else {
+ r = 0;
+ QUERY_CONTEXT_S context; //Describes the context of this query.
+ c_query_context_init(&context, c, flag, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_prev will call c_getf_prev_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_prev(dbc_ftcursor(c), c_getf_prev_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ c_query_context_destroy(&context);
+ }
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_prev_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+ DBT found_key = { .data = (void *) key, .size = keylen };
+
+ if (context->do_locking) {
+ const DBT *prevkey, *prevval;
+ toku_ft_cursor_peek(context->c, &prevkey, &prevval);
+ const DBT *left_key = key != NULL ? &found_key : toku_dbt_negative_infinity();
+ const DBT *right_key = prevkey;
+ r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
+ query_context_determine_lock_type(context), &context->request);
+ } else {
+ r = 0;
+ }
+
+ //Call application-layer callback if found and locks were successfully obtained.
+ if (r==0 && key!=NULL && !lock_only) {
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_prev
+ return r;
+}
+
+static int c_getf_current_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+static int
+c_getf_current(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+
+ QUERY_CONTEXT_S context; //Describes the context of this query.
+ c_query_context_init(&context, c, flag, f, extra);
+ //toku_ft_cursor_current will call c_getf_current_callback(..., context) (if query is successful)
+ int r = toku_ft_cursor_current(dbc_ftcursor(c), DB_CURRENT, c_getf_current_callback, &context);
+ c_query_context_destroy(&context);
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_current_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+
+ //Call application-layer callback if found.
+ if (key!=NULL && !lock_only) {
+ DBT found_key = { .data = (void *) key, .size = keylen };
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ } else {
+ r = 0;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_current
+ return r;
+}
+
+static int c_getf_set_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+int
+toku_c_getf_set(DBC *c, uint32_t flag, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+
+ int r = 0;
+ QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
+ query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_set will call c_getf_set_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_set(dbc_ftcursor(c), key, c_getf_set_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ query_context_base_destroy(&context.base);
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_set_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+
+ //Lock:
+ // left(key,val) = (input_key, -infinity)
+ // right(key,val) = (input_key, found ? found_val : infinity)
+ if (context->do_locking) {
+ r = toku_db_start_range_lock(context->db, context->txn, super_context->input_key, super_context->input_key,
+ query_context_determine_lock_type(context), &context->request);
+ } else {
+ r = 0;
+ }
+
+ //Call application-layer callback if found and locks were successfully obtained.
+ if (r==0 && key!=NULL && !lock_only) {
+ DBT found_key = { .data = (void *) key, .size = keylen };
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_set
+ return r;
+}
+
+static int c_getf_set_range_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+static int
+c_getf_set_range(DBC *c, uint32_t flag, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+
+ int r = 0;
+ QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
+ query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_set_range(dbc_ftcursor(c), key, nullptr, c_getf_set_range_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ query_context_base_destroy(&context.base);
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_set_range_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+ DBT found_key = { .data = (void *) key, .size = keylen };
+
+ //Lock:
+ // left(key,val) = (input_key, -infinity)
+ // right(key) = found ? found_key : infinity
+ // right(val) = found ? found_val : infinity
+ if (context->do_locking) {
+ const DBT *left_key = super_context->input_key;
+ const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
+ r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
+ query_context_determine_lock_type(context), &context->request);
+ } else {
+ r = 0;
+ }
+
+ //Call application-layer callback if found and locks were successfully obtained.
+ if (r==0 && key!=NULL && !lock_only) {
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_set_range
+ return r;
+}
+
+static int
+c_getf_set_range_with_bound(DBC *c, uint32_t flag, DBT *key, DBT *key_bound, YDB_CALLBACK_FUNCTION f, void *extra) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+
+ int r = 0;
+ QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
+ query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_set_range(dbc_ftcursor(c), key, key_bound, c_getf_set_range_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ query_context_base_destroy(&context.base);
+ return r;
+}
+
+static int c_getf_set_range_reverse_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
+
+static int
+c_getf_set_range_reverse(DBC *c, uint32_t flag, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+
+ int r = 0;
+ QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
+ query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
+ while (r == 0) {
+ //toku_ft_cursor_set_range_reverse will call c_getf_set_range_reverse_callback(..., context) (if query is successful)
+ r = toku_ft_cursor_set_range_reverse(dbc_ftcursor(c), key, c_getf_set_range_reverse_callback, &context);
+ if (r == DB_LOCK_NOTGRANTED) {
+ r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
+ } else {
+ break;
+ }
+ }
+ query_context_base_destroy(&context.base);
+ return r;
+}
+
+//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
+static int
+c_getf_set_range_reverse_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
+ QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
+ QUERY_CONTEXT_BASE context = &super_context->base;
+
+ int r;
+ DBT found_key = { .data = (void *) key, .size = keylen };
+
+ //Lock:
+ // left(key) = found ? found_key : -infinity
+ // left(val) = found ? found_val : -infinity
+ // right(key,val) = (input_key, infinity)
+ if (context->do_locking) {
+ const DBT *left_key = key != NULL ? &found_key : toku_dbt_negative_infinity();
+ const DBT *right_key = super_context->input_key;
+ r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
+ query_context_determine_lock_type(context), &context->request);
+ } else {
+ r = 0;
+ }
+
+ //Call application-layer callback if found and locks were successfully obtained.
+ if (r==0 && key!=NULL && !lock_only) {
+ DBT found_val = { .data = (void *) val, .size = vallen };
+ context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
+ r = context->r_user_callback;
+ }
+
+ //Give ft-layer an error (if any) to return from toku_ft_cursor_set_range_reverse
+ return r;
+}
+
+
+int toku_c_close_internal(DBC *c) {
+ toku_ft_cursor_destroy(dbc_ftcursor(c));
+ toku_sdbt_cleanup(&dbc_struct_i(c)->skey_s);
+ toku_sdbt_cleanup(&dbc_struct_i(c)->sval_s);
+ return 0;
+}
+
+// Close a cursor.
+int toku_c_close(DBC *c) {
+ toku_c_close_internal(c);
+ toku_free(c);
+ return 0;
+}
+
+static int c_set_bounds(DBC *dbc,
+ const DBT *left_key,
+ const DBT *right_key,
+ bool pre_acquire,
+ int out_of_range_error) {
+ if (out_of_range_error != DB_NOTFOUND &&
+ out_of_range_error != TOKUDB_OUT_OF_RANGE && out_of_range_error != 0) {
+ return toku_ydb_do_error(dbc->dbp->dbenv,
+ EINVAL,
+ "Invalid out_of_range_error [%d] for %s\n",
+ out_of_range_error,
+ __FUNCTION__);
+ }
+ if (left_key == toku_dbt_negative_infinity() &&
+ right_key == toku_dbt_positive_infinity()) {
+ out_of_range_error = 0;
+ }
+ DB *db = dbc->dbp;
+ DB_TXN *txn = dbc_struct_i(dbc)->txn;
+ HANDLE_PANICKED_DB(db);
+ toku_ft_cursor_set_range_lock(dbc_ftcursor(dbc),
+ left_key,
+ right_key,
+ (left_key == toku_dbt_negative_infinity()),
+ (right_key == toku_dbt_positive_infinity()),
+ out_of_range_error);
+ if (!db->i->lt || !txn || !pre_acquire)
+ return 0;
+ // READ_UNCOMMITTED and READ_COMMITTED transactions do not need read locks.
+ if (!dbc_struct_i(dbc)->rmw &&
+ dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE &&
+ !(dbc_struct_i(dbc)->iso == TOKU_ISO_SNAPSHOT &&
+ dbc_struct_i(dbc)->locking_read))
+ return 0;
+
+ toku::lock_request::type lock_type = dbc_struct_i(dbc)->rmw
+ ? toku::lock_request::type::WRITE
+ : toku::lock_request::type::READ;
+ int r = toku_db_get_range_lock(db, txn, left_key, right_key, lock_type);
+ return r;
+}
+
+static void
+c_remove_restriction(DBC *dbc) {
+ toku_ft_cursor_remove_restriction(dbc_ftcursor(dbc));
+}
+
+static void c_set_txn(DBC *dbc, DB_TXN *txn) {
+ dbc_struct_i(dbc)->txn = txn;
+ dbc_ftcursor(dbc)->ttxn = db_txn_struct_i(txn)->tokutxn;
+}
+
+static void
+c_set_check_interrupt_callback(DBC* dbc, bool (*interrupt_callback)(void*, uint64_t), void *extra) {
+ toku_ft_cursor_set_check_interrupt_cb(dbc_ftcursor(dbc), interrupt_callback, extra);
+}
+
+int
+toku_c_get(DBC* c, DBT* key, DBT* val, uint32_t flag) {
+ HANDLE_PANICKED_DB(c->dbp);
+ HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
+
+ uint32_t main_flag = get_main_cursor_flag(flag);
+ uint32_t remaining_flags = get_nonmain_cursor_flags(flag);
+ int r;
+ QUERY_CONTEXT_WRAPPED_S context;
+ //Passing in NULL for a key or val means that it is NOT an output.
+ // Both key and val are output:
+ // query_context_wrapped_init(&context, c, key, val);
+ // Val is output, key is not:
+ // query_context_wrapped_init(&context, c, NULL, val);
+ // Neither key nor val are output:
+ // query_context_wrapped_init(&context, c, NULL, NULL);
+ switch (main_flag) {
+ case (DB_FIRST):
+ query_context_wrapped_init(&context, c, key, val);
+ r = c_getf_first(c, remaining_flags, c_get_wrapper_callback, &context);
+ break;
+ case (DB_LAST):
+ query_context_wrapped_init(&context, c, key, val);
+ r = c_getf_last(c, remaining_flags, c_get_wrapper_callback, &context);
+ break;
+ case (DB_NEXT):
+ query_context_wrapped_init(&context, c, key, val);
+ r = c_getf_next(c, remaining_flags, c_get_wrapper_callback, &context);
+ break;
+ case (DB_PREV):
+ query_context_wrapped_init(&context, c, key, val);
+ r = c_getf_prev(c, remaining_flags, c_get_wrapper_callback, &context);
+ break;
+#ifdef DB_PREV_DUP
+ case (DB_PREV_DUP):
+ query_context_wrapped_init(&context, c, key, val);
+ r = toku_c_getf_prev_dup(c, remaining_flags, c_get_wrapper_callback, &context);
+ break;
+#endif
+ case (DB_CURRENT):
+ query_context_wrapped_init(&context, c, key, val);
+ r = c_getf_current(c, remaining_flags, c_get_wrapper_callback, &context);
+ break;
+ case (DB_SET):
+ query_context_wrapped_init(&context, c, NULL, val);
+ r = toku_c_getf_set(c, remaining_flags, key, c_get_wrapper_callback, &context);
+ break;
+ case (DB_SET_RANGE):
+ query_context_wrapped_init(&context, c, key, val);
+ r = c_getf_set_range(c, remaining_flags, key, c_get_wrapper_callback, &context);
+ break;
+ case (DB_SET_RANGE_REVERSE):
+ query_context_wrapped_init(&context, c, key, val);
+ r = c_getf_set_range_reverse(c, remaining_flags, key, c_get_wrapper_callback, &context);
+ break;
+ default:
+ r = EINVAL;
+ break;
+ }
+ return r;
+}
+
+int toku_db_cursor_internal(DB *db,
+ DB_TXN *txn,
+ DBC *c,
+ uint32_t flags,
+ int is_temporary_cursor) {
+ HANDLE_PANICKED_DB(db);
+ HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
+ DB_ENV *env = db->dbenv;
+
+ if (flags &
+ ~(DB_SERIALIZABLE | DB_INHERIT_ISOLATION | DB_LOCKING_READ | DB_RMW |
+ DBC_DISABLE_PREFETCHING)) {
+ return toku_ydb_do_error(
+ env, EINVAL, "Invalid flags set for toku_db_cursor\n");
+ }
+
+#define SCRS(name) c->name = name
+ SCRS(c_getf_first);
+ SCRS(c_getf_last);
+ SCRS(c_getf_next);
+ SCRS(c_getf_prev);
+ SCRS(c_getf_current);
+ SCRS(c_getf_set_range);
+ SCRS(c_getf_set_range_reverse);
+ SCRS(c_getf_set_range_with_bound);
+ SCRS(c_set_bounds);
+ SCRS(c_remove_restriction);
+ SCRS(c_set_txn);
+ SCRS(c_set_check_interrupt_callback);
+#undef SCRS
+
+ c->c_get = toku_c_get;
+ c->c_getf_set = toku_c_getf_set;
+ c->c_close = toku_c_close;
+
+ c->dbp = db;
+
+ dbc_struct_i(c)->txn = txn;
+ dbc_struct_i(c)->skey_s = (struct simple_dbt){0, 0};
+ dbc_struct_i(c)->sval_s = (struct simple_dbt){0, 0};
+ if (is_temporary_cursor) {
+ dbc_struct_i(c)->skey = &db->i->skey;
+ dbc_struct_i(c)->sval = &db->i->sval;
+ } else {
+ dbc_struct_i(c)->skey = &dbc_struct_i(c)->skey_s;
+ dbc_struct_i(c)->sval = &dbc_struct_i(c)->sval_s;
+ }
+ if (flags & DB_SERIALIZABLE) {
+ dbc_struct_i(c)->iso = TOKU_ISO_SERIALIZABLE;
+ } else {
+ dbc_struct_i(c)->iso =
+ txn ? db_txn_struct_i(txn)->iso : TOKU_ISO_SERIALIZABLE;
+ }
+ dbc_struct_i(c)->rmw = (flags & DB_RMW) != 0;
+ dbc_struct_i(c)->locking_read = (flags & DB_LOCKING_READ) != 0;
+ enum cursor_read_type read_type =
+ C_READ_ANY; // default, used in serializable and read uncommitted
+ if (txn) {
+ if (dbc_struct_i(c)->iso == TOKU_ISO_READ_COMMITTED ||
+ dbc_struct_i(c)->iso == TOKU_ISO_SNAPSHOT) {
+ read_type = C_READ_SNAPSHOT;
+ } else if (dbc_struct_i(c)->iso == TOKU_ISO_READ_COMMITTED_ALWAYS) {
+ read_type = C_READ_COMMITTED;
+ }
+ }
+ int r = toku_ft_cursor_create(db->i->ft_handle,
+ dbc_ftcursor(c),
+ txn ? db_txn_struct_i(txn)->tokutxn : NULL,
+ read_type,
+ ((flags & DBC_DISABLE_PREFETCHING) != 0),
+ is_temporary_cursor != 0);
+ if (r != 0) {
+ invariant(r == TOKUDB_MVCC_DICTIONARY_TOO_NEW);
+ }
+ return r;
+}
+
+static inline int
+autotxn_db_cursor(DB *db, DB_TXN *txn, DBC *c, uint32_t flags) {
+ if (!txn && (db->dbenv->i->open_flags & DB_INIT_TXN)) {
+ return toku_ydb_do_error(db->dbenv, EINVAL,
+ "Cursors in a transaction environment must have transactions.\n");
+ }
+ return toku_db_cursor_internal(db, txn, c, flags, 0);
+}
+
+// Create a cursor on a db.
+int toku_db_cursor(DB *db, DB_TXN *txn, DBC **c, uint32_t flags) {
+ DBC *XMALLOC(cursor);
+ int r = autotxn_db_cursor(db, txn, cursor, flags);
+ if (r == 0) {
+ *c = cursor;
+ } else {
+ toku_free(cursor);
+ }
+ return r;
+}
+
+#undef STATUS_VALUE
+
+#include <toku_race_tools.h>
+void __attribute__((constructor)) toku_ydb_cursor_helgrind_ignore(void);
+void
+toku_ydb_cursor_helgrind_ignore(void) {
+ TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_c_layer_status, sizeof ydb_c_layer_status);
+}