diff options
Diffstat (limited to '')
-rw-r--r-- | storage/federatedx/AUTHORS | 11 | ||||
-rw-r--r-- | storage/federatedx/CMakeLists.txt | 3 | ||||
-rw-r--r-- | storage/federatedx/ChangeLog | 18 | ||||
-rw-r--r-- | storage/federatedx/FAQ | 40 | ||||
-rw-r--r-- | storage/federatedx/README | 33 | ||||
-rw-r--r-- | storage/federatedx/README.windows | 23 | ||||
-rw-r--r-- | storage/federatedx/federatedx_io.cc | 101 | ||||
-rw-r--r-- | storage/federatedx/federatedx_io_mysql.cc | 659 | ||||
-rw-r--r-- | storage/federatedx/federatedx_io_null.cc | 299 | ||||
-rw-r--r-- | storage/federatedx/federatedx_probes.h | 45 | ||||
-rw-r--r-- | storage/federatedx/federatedx_pushdown.cc | 373 | ||||
-rw-r--r-- | storage/federatedx/federatedx_pushdown.h | 63 | ||||
-rw-r--r-- | storage/federatedx/federatedx_txn.cc | 439 | ||||
-rw-r--r-- | storage/federatedx/ha_federatedx.cc | 3730 | ||||
-rw-r--r-- | storage/federatedx/ha_federatedx.h | 487 |
15 files changed, 6324 insertions, 0 deletions
diff --git a/storage/federatedx/AUTHORS b/storage/federatedx/AUTHORS new file mode 100644 index 00000000..cdd6600f --- /dev/null +++ b/storage/federatedx/AUTHORS @@ -0,0 +1,11 @@ +FederatedX + +Patrick Galbraith <patg@patg.net> - Federated + +Pluggable Storage Engine Skeleton setup + +Brian Aker <brian@mysql.com> | <brian@tangent.org> - Original Design +Calvin Sun - Windows Support +Brian Miezejewski - Bug fixes +Antony T Curtis - Help in initial development, transactions and various help +Michael Widenius - Bug fixes and some simple early optimizations diff --git a/storage/federatedx/CMakeLists.txt b/storage/federatedx/CMakeLists.txt new file mode 100644 index 00000000..6a9b12ee --- /dev/null +++ b/storage/federatedx/CMakeLists.txt @@ -0,0 +1,3 @@ +SET(FEDERATEDX_SOURCES ha_federatedx.cc federatedx_txn.cc federatedx_io.cc federatedx_io_null.cc federatedx_io_mysql.cc) +MYSQL_ADD_PLUGIN(federatedx ${FEDERATEDX_SOURCES} STORAGE_ENGINE + RECOMPILE_FOR_EMBEDDED) diff --git a/storage/federatedx/ChangeLog b/storage/federatedx/ChangeLog new file mode 100644 index 00000000..170321cc --- /dev/null +++ b/storage/federatedx/ChangeLog @@ -0,0 +1,18 @@ +0.2 - Thu March 8 00:00:00 EST 2008 + + - Fixed bug #30051 "CREATE TABLE does not connect and check existence of remote table" + Modified "real_connect" to take a share and create flag to in order to not rely + on any settings that are later instantiated and/or set by get_share + Also, put logic in the code to not attempt this if a localhost. There's an annoying + functionality that if federated tries to connect to itself during creater table, you + get 1159 error (timeout) - only when local. This prevents having this functionality + and is probably part of the reason it was removed. + +0.1 - Thu Feb 1 00:00:00 EST 2008 + + - This is the FederatedX Storage Engine, + first release. + - Added documentation + - Added simple test and README file to explain + how to run the test + - Added FAQ diff --git a/storage/federatedx/FAQ b/storage/federatedx/FAQ new file mode 100644 index 00000000..50def432 --- /dev/null +++ b/storage/federatedx/FAQ @@ -0,0 +1,40 @@ +Q. What is the FederatedX pluggable storage engine? + +A. It is a fork of the Federated Storage Engine that Brian Aker and I +(Patrick Galbraith) developed originally . It is a storage engine that +uses a client connection to a remote MySQL data source as its data +source instead of a local file on disk. + +Q. Why did you fork from Federated? + +A. To enhance the storage engine independently of the +MySQL Server release schedule. Many people have been +mentioning their dissatisfaction with the limitations +of Federated. I think the engine is a great concept and +have a sense of obligation to continue to improve it. +There are some patches already that are in dire need +of being applied and tested. + +Q. What do you plan to do with FederatedX? + +A. Many things need addressing: + +- Outstanding bugs +- How do deal with huge result sets +- Pushdown conditions (being able to pass things like LIMIT + to the remote connection to keep from returning huge + result sets). +- Better transactional support +- Other connection mechanisms (ODBC, JDBC, native drivers + of other RDBMSs) + +Q. What FederatedX is and is not? + +A. FederatedX is not yet a complete "federated" solution in + the sense that other venders have developed (IBM, etc). It + is essentially a networked storage engine. It is my hope + to make it a real federated solution. + +Q. In which MySQL distributions/forks/branches can I find FederateX + +A. MariaDB (http://www.mariadb.com) diff --git a/storage/federatedx/README b/storage/federatedx/README new file mode 100644 index 00000000..6618527c --- /dev/null +++ b/storage/federatedx/README @@ -0,0 +1,33 @@ +This is the FederatedX Storage Engine, developed as an external storage engine. + +NOTE: + +The following is only relevant if you use it for MySQL. MariaDB already comes +with the latest version of FederatedX. + +To install, grab a copy of the mysql source code and run this: + +./configure --with-mysql=/path/to/src/mysql-5.x --libdir=/usr/local/lib/mysql/ + +make install + +And then inside of MySQL: + +mysql> INSTALL PLUGIN federatedx SONAME 'libfederatedx_engine.so'; + +mysql> CREATE TABLE `d` (`a` varchar(125), b text, primary key(a)) ENGINE=FEDERATEDX CONNECTION="mysql://root@host/schema/table" + +or + +mysql> CREATE TABLE `d` (`a` varchar(125), b text, primary key(a)) ENGINE=FEDERATEDX CONNECTION="server" CHARSET=latin1; + +You will probably need to edit the Makefile.am in the src/ tree if you want +to build on anything other then Linux (and the Makefile assumes that the +server was not compiled for debug). The reason for the two possible +configure lines is that libdir is dependent on where MySQL was installed. If +you run the "INSTALL PLUGIN ..." and you get a file not found, check that +your configured this directory correctly. + +For Solaris you can enable DTrace probes by adding to configure +--enable-dtrace + diff --git a/storage/federatedx/README.windows b/storage/federatedx/README.windows new file mode 100644 index 00000000..74de15c6 --- /dev/null +++ b/storage/federatedx/README.windows @@ -0,0 +1,23 @@ +The following files are changed in order to build a new engine on Windows: + +- Update win\configure.js with +case "WITH_FEDERATEDX_STORAGE_ENGINE": +to make sure it will pass WITH_FEDERATEDX_STORAGE_ENGINE in. + +- Update CMakeFiles.txt under mysql root: + IF(WITH_FEDERATEDX_STORAGE_ENGINE) + ADD_DEFINITIONS(-D WITH_FEDERATEDX_STORAGE_ENGINE) + SET (mysql_plugin_defs + "${mysql_plugin_defs},builtin_skeleton_plugin") + ENDIF(WITH_FEDERATEDX_STORAGE_ENGINE) + + and, + + IF(WITH_FEDERATEDX_STORAGE_ENGINE) + ADD_SUBDIRECTORY(storage/skeleton/src) + ENDIF(WITH_FEDERATEDX_STORAGE_ENGINE) + + - Update CMakeFiles.txt under sql: + IF(WITH_FEDERATEDX_STORAGE_ENGINE) + TARGET_LINK_LIBRARIES(mysqld skeleton) + ENDIF(WITH_FEDERATEDX_STORAGE_ENGINE) diff --git a/storage/federatedx/federatedx_io.cc b/storage/federatedx/federatedx_io.cc new file mode 100644 index 00000000..5baec617 --- /dev/null +++ b/storage/federatedx/federatedx_io.cc @@ -0,0 +1,101 @@ +/* +Copyright (c) 2007, Antony T Curtis +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + + * Neither the name of FederatedX nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + + +/*#define MYSQL_SERVER 1*/ +#include <my_global.h> +#include "sql_priv.h" + +#include "ha_federatedx.h" + +#include "m_string.h" + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + +typedef federatedx_io *(*instantiate_io_type)(MEM_ROOT *server_root, + FEDERATEDX_SERVER *server); +struct io_schemes_st +{ + const char *scheme; + instantiate_io_type instantiate; +}; + + +static const io_schemes_st federated_io_schemes[] = +{ + { "mysql", &instantiate_io_mysql }, + { "null", instantiate_io_null } /* must be last element */ +}; + +federatedx_io::federatedx_io(FEDERATEDX_SERVER *aserver) + : server(aserver), owner_ptr(0), txn_next(0), idle_next(0), + active(FALSE), busy(FALSE), readonly(TRUE) +{ + DBUG_ENTER("federatedx_io::federatedx_io"); + DBUG_ASSERT(server); + + mysql_mutex_assert_owner(&server->mutex); + server->io_count++; + + DBUG_VOID_RETURN; +} + + +federatedx_io::~federatedx_io() +{ + DBUG_ENTER("federatedx_io::~federatedx_io"); + + server->io_count--; + + DBUG_VOID_RETURN; +} + + +bool federatedx_io::handles_scheme(const char *scheme) +{ + const io_schemes_st *ptr = federated_io_schemes; + const io_schemes_st *end = ptr + array_elements(federated_io_schemes); + while (ptr != end && strcasecmp(scheme, ptr->scheme)) + ++ptr; + return ptr != end; +} + + +federatedx_io *federatedx_io::construct(MEM_ROOT *server_root, + FEDERATEDX_SERVER *server) +{ + const io_schemes_st *ptr = federated_io_schemes; + const io_schemes_st *end = ptr + (array_elements(federated_io_schemes) - 1); + while (ptr != end && strcasecmp(server->scheme, ptr->scheme)) + ++ptr; + return ptr->instantiate(server_root, server); +} + + diff --git a/storage/federatedx/federatedx_io_mysql.cc b/storage/federatedx/federatedx_io_mysql.cc new file mode 100644 index 00000000..fc32146b --- /dev/null +++ b/storage/federatedx/federatedx_io_mysql.cc @@ -0,0 +1,659 @@ +/* +Copyright (c) 2007, Antony T Curtis +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + + * Neither the name of FederatedX nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + + +#define MYSQL_SERVER 1 +#include <my_global.h> +#include "sql_priv.h" +#include <mysqld_error.h> +#include <mysql.h> + +#include "ha_federatedx.h" + +#include "m_string.h" +#include "mysqld_error.h" +#include "sql_servers.h" + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + + +#define SAVEPOINT_REALIZED 1 +#define SAVEPOINT_RESTRICT 2 +#define SAVEPOINT_EMITTED 4 + + +typedef struct federatedx_savepoint +{ + ulong level; + uint flags; +} SAVEPT; + +struct mysql_position +{ + MYSQL_RES* result; + MYSQL_ROW_OFFSET offset; +}; + + +class federatedx_io_mysql :public federatedx_io +{ + MYSQL mysql; /* MySQL connection */ + DYNAMIC_ARRAY savepoints; + bool requested_autocommit; + bool actual_autocommit; + + int actual_query(const char *buffer, size_t length); + bool test_all_restrict() const; +public: + federatedx_io_mysql(FEDERATEDX_SERVER *); + ~federatedx_io_mysql(); + + int simple_query(const char *fmt, ...); + int query(const char *buffer, size_t length); + virtual FEDERATEDX_IO_RESULT *store_result(); + + virtual size_t max_query_size() const; + + virtual my_ulonglong affected_rows() const; + virtual my_ulonglong last_insert_id() const; + + virtual int error_code(); + virtual const char *error_str(); + + void reset(); + int commit(); + int rollback(); + + int savepoint_set(ulong sp); + ulong savepoint_release(ulong sp); + ulong savepoint_rollback(ulong sp); + void savepoint_restrict(ulong sp); + + ulong last_savepoint() const; + ulong actual_savepoint() const; + bool is_autocommit() const; + + bool table_metadata(ha_statistics *stats, const char *table_name, + uint table_name_length, uint flag); + + /* resultset operations */ + + virtual void free_result(FEDERATEDX_IO_RESULT *io_result); + virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result); + virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result); + virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result, + FEDERATEDX_IO_ROWS **current= NULL); + virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result); + virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, + unsigned int column); + virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, + unsigned int column) const; + + virtual size_t get_ref_length() const; + virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref, FEDERATEDX_IO_ROWS *current); + virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref); + virtual void set_thd(void *thd); +}; + + +federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root, + FEDERATEDX_SERVER *server) +{ + return new (server_root) federatedx_io_mysql(server); +} + + +federatedx_io_mysql::federatedx_io_mysql(FEDERATEDX_SERVER *aserver) + : federatedx_io(aserver), + requested_autocommit(TRUE), actual_autocommit(TRUE) +{ + DBUG_ENTER("federatedx_io_mysql::federatedx_io_mysql"); + + bzero(&mysql, sizeof(MYSQL)); + bzero(&savepoints, sizeof(DYNAMIC_ARRAY)); + + my_init_dynamic_array(PSI_INSTRUMENT_ME, &savepoints, sizeof(SAVEPT), 16, 16, MYF(0)); + + DBUG_VOID_RETURN; +} + + +federatedx_io_mysql::~federatedx_io_mysql() +{ + DBUG_ENTER("federatedx_io_mysql::~federatedx_io_mysql"); + + mysql_close(&mysql); + delete_dynamic(&savepoints); + + DBUG_VOID_RETURN; +} + + +void federatedx_io_mysql::reset() +{ + reset_dynamic(&savepoints); + set_active(FALSE); + + requested_autocommit= TRUE; + mysql.reconnect= 1; +} + + +int federatedx_io_mysql::commit() +{ + int error= 0; + DBUG_ENTER("federatedx_io_mysql::commit"); + + if (!actual_autocommit && (error= actual_query("COMMIT", 6))) + rollback(); + + reset(); + + DBUG_RETURN(error); +} + +int federatedx_io_mysql::rollback() +{ + int error= 0; + DBUG_ENTER("federatedx_io_mysql::rollback"); + + if (!actual_autocommit) + error= actual_query("ROLLBACK", 8); + else + error= ER_WARNING_NOT_COMPLETE_ROLLBACK; + + reset(); + + DBUG_RETURN(error); +} + + +ulong federatedx_io_mysql::last_savepoint() const +{ + SAVEPT *savept= NULL; + DBUG_ENTER("federatedx_io_mysql::last_savepoint"); + + if (savepoints.elements) + savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *); + + DBUG_RETURN(savept ? savept->level : 0); +} + + +ulong federatedx_io_mysql::actual_savepoint() const +{ + SAVEPT *savept= NULL; + size_t index= savepoints.elements; + DBUG_ENTER("federatedx_io_mysql::last_savepoint"); + + while (index) + { + savept= dynamic_element(&savepoints, --index, SAVEPT *); + if (savept->flags & SAVEPOINT_REALIZED) + break; + savept= NULL; + } + + DBUG_RETURN(savept ? savept->level : 0); +} + +bool federatedx_io_mysql::is_autocommit() const +{ + return actual_autocommit; +} + + +int federatedx_io_mysql::savepoint_set(ulong sp) +{ + int error; + SAVEPT savept; + DBUG_ENTER("federatedx_io_mysql::savepoint_set"); + DBUG_PRINT("info",("savepoint=%lu", sp)); + DBUG_ASSERT(sp > last_savepoint()); + + savept.level= sp; + savept.flags= 0; + + if ((error= insert_dynamic(&savepoints, (uchar*) &savept) ? -1 : 0)) + goto err; + + set_active(TRUE); + mysql.reconnect= 0; + requested_autocommit= FALSE; + +err: + DBUG_RETURN(error); +} + + +ulong federatedx_io_mysql::savepoint_release(ulong sp) +{ + SAVEPT *savept, *last= NULL; + DBUG_ENTER("federatedx_io_mysql::savepoint_release"); + DBUG_PRINT("info",("savepoint=%lu", sp)); + + while (savepoints.elements) + { + savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *); + if (savept->level < sp) + break; + if ((savept->flags & (SAVEPOINT_REALIZED | SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED) + last= savept; + savepoints.elements--; + } + + if (last) + { + char buffer[STRING_BUFFER_USUAL_SIZE]; + size_t length= my_snprintf(buffer, sizeof(buffer), + "RELEASE SAVEPOINT save%lu", last->level); + actual_query(buffer, length); + } + + DBUG_RETURN(last_savepoint()); +} + + +ulong federatedx_io_mysql::savepoint_rollback(ulong sp) +{ + SAVEPT *savept; + size_t index; + DBUG_ENTER("federatedx_io_mysql::savepoint_release"); + DBUG_PRINT("info",("savepoint=%lu", sp)); + + while (savepoints.elements) + { + savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *); + if (savept->level <= sp) + break; + savepoints.elements--; + } + + for (index= savepoints.elements, savept= NULL; index;) + { + savept= dynamic_element(&savepoints, --index, SAVEPT *); + if (savept->flags & SAVEPOINT_REALIZED) + break; + savept= NULL; + } + + if (savept && !(savept->flags & SAVEPOINT_RESTRICT)) + { + char buffer[STRING_BUFFER_USUAL_SIZE]; + size_t length= my_snprintf(buffer, sizeof(buffer), + "ROLLBACK TO SAVEPOINT save%lu", savept->level); + actual_query(buffer, length); + } + + DBUG_RETURN(last_savepoint()); +} + + +void federatedx_io_mysql::savepoint_restrict(ulong sp) +{ + SAVEPT *savept; + size_t index= savepoints.elements; + DBUG_ENTER("federatedx_io_mysql::savepoint_restrict"); + + while (index) + { + savept= dynamic_element(&savepoints, --index, SAVEPT *); + if (savept->level > sp) + continue; + if (savept->level < sp) + break; + savept->flags|= SAVEPOINT_RESTRICT; + break; + } + + DBUG_VOID_RETURN; +} + + +int federatedx_io_mysql::simple_query(const char *fmt, ...) +{ + char buffer[STRING_BUFFER_USUAL_SIZE]; + size_t length; + int error; + va_list arg; + DBUG_ENTER("federatedx_io_mysql::simple_query"); + + va_start(arg, fmt); + length= my_vsnprintf(buffer, sizeof(buffer), fmt, arg); + va_end(arg); + + error= query(buffer, length); + + DBUG_RETURN(error); +} + + +bool federatedx_io_mysql::test_all_restrict() const +{ + bool result= FALSE; + SAVEPT *savept; + size_t index= savepoints.elements; + DBUG_ENTER("federatedx_io_mysql::test_all_restrict"); + + while (index) + { + savept= dynamic_element(&savepoints, --index, SAVEPT *); + if ((savept->flags & (SAVEPOINT_REALIZED | + SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED || + (savept->flags & SAVEPOINT_EMITTED)) + DBUG_RETURN(FALSE); + if (savept->flags & SAVEPOINT_RESTRICT) + result= TRUE; + } + + DBUG_RETURN(result); +} + + +int federatedx_io_mysql::query(const char *buffer, size_t length) +{ + int error; + bool wants_autocommit= requested_autocommit | is_readonly(); + DBUG_ENTER("federatedx_io_mysql::query"); + + if (!wants_autocommit && test_all_restrict()) + wants_autocommit= TRUE; + + if (wants_autocommit != actual_autocommit) + { + if ((error= actual_query(wants_autocommit ? "SET AUTOCOMMIT=1" + : "SET AUTOCOMMIT=0", 16))) + DBUG_RETURN(error); + mysql.reconnect= wants_autocommit ? 1 : 0; + actual_autocommit= wants_autocommit; + } + + if (!actual_autocommit && last_savepoint() != actual_savepoint()) + { + SAVEPT *savept= dynamic_element(&savepoints, savepoints.elements - 1, + SAVEPT *); + if (!(savept->flags & SAVEPOINT_RESTRICT)) + { + char buf[STRING_BUFFER_USUAL_SIZE]; + size_t len= my_snprintf(buf, sizeof(buf), + "SAVEPOINT save%lu", savept->level); + if ((error= actual_query(buf, len))) + DBUG_RETURN(error); + set_active(TRUE); + savept->flags|= SAVEPOINT_EMITTED; + } + savept->flags|= SAVEPOINT_REALIZED; + } + + if (!(error= actual_query(buffer, length))) + set_active(is_active() || !actual_autocommit); + + DBUG_RETURN(error); +} + + +int federatedx_io_mysql::actual_query(const char *buffer, size_t length) +{ + int error; + DBUG_ENTER("federatedx_io_mysql::actual_query"); + + if (!mysql.net.vio) + { + my_bool my_true= 1; + + if (!(mysql_init(&mysql))) + DBUG_RETURN(-1); + + /* + BUG# 17044 Federated Storage Engine is not UTF8 clean + Add set names to whatever charset the table is at open + of table + */ + /* this sets the csname like 'set names utf8' */ + mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, get_charsetname()); + mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char*) &my_true); + + if (!mysql_real_connect(&mysql, + get_hostname(), + get_username(), + get_password(), + get_database(), + get_port(), + get_socket(), 0)) + DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); + mysql.reconnect= 1; + } + + if (!(error= mysql_real_query(&mysql, STRING_WITH_LEN("set time_zone='+00:00'")))) + error= mysql_real_query(&mysql, buffer, (ulong)length); + + DBUG_RETURN(error); +} + +size_t federatedx_io_mysql::max_query_size() const +{ + return mysql.net.max_packet_size; +} + + +my_ulonglong federatedx_io_mysql::affected_rows() const +{ + return mysql.affected_rows; +} + + +my_ulonglong federatedx_io_mysql::last_insert_id() const +{ + return mysql.insert_id; +} + + +int federatedx_io_mysql::error_code() +{ + return mysql_errno(&mysql); +} + + +const char *federatedx_io_mysql::error_str() +{ + return mysql_error(&mysql); +} + +FEDERATEDX_IO_RESULT *federatedx_io_mysql::store_result() +{ + FEDERATEDX_IO_RESULT *result; + DBUG_ENTER("federatedx_io_mysql::store_result"); + + result= (FEDERATEDX_IO_RESULT *) mysql_store_result(&mysql); + + DBUG_RETURN(result); +} + + +void federatedx_io_mysql::free_result(FEDERATEDX_IO_RESULT *io_result) +{ + mysql_free_result((MYSQL_RES *) io_result); +} + + +unsigned int federatedx_io_mysql::get_num_fields(FEDERATEDX_IO_RESULT *io_result) +{ + return mysql_num_fields((MYSQL_RES *) io_result); +} + + +my_ulonglong federatedx_io_mysql::get_num_rows(FEDERATEDX_IO_RESULT *io_result) +{ + return mysql_num_rows((MYSQL_RES *) io_result); +} + + +FEDERATEDX_IO_ROW *federatedx_io_mysql::fetch_row(FEDERATEDX_IO_RESULT *io_result, + FEDERATEDX_IO_ROWS **current) +{ + MYSQL_RES *result= (MYSQL_RES*)io_result; + if (current) + *current= (FEDERATEDX_IO_ROWS *) result->data_cursor; + return (FEDERATEDX_IO_ROW *) mysql_fetch_row(result); +} + + +ulong *federatedx_io_mysql::fetch_lengths(FEDERATEDX_IO_RESULT *io_result) +{ + return mysql_fetch_lengths((MYSQL_RES *) io_result); +} + + +const char *federatedx_io_mysql::get_column_data(FEDERATEDX_IO_ROW *row, + unsigned int column) +{ + return ((MYSQL_ROW)row)[column]; +} + + +bool federatedx_io_mysql::is_column_null(const FEDERATEDX_IO_ROW *row, + unsigned int column) const +{ + return !((MYSQL_ROW)row)[column]; +} + +bool federatedx_io_mysql::table_metadata(ha_statistics *stats, + const char *table_name, + uint table_name_length, uint flag) +{ + char status_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; + FEDERATEDX_IO_RESULT *result= 0; + FEDERATEDX_IO_ROW *row; + String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin); + int error; + + status_query_string.length(0); + status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE ")); + append_ident(&status_query_string, table_name, + table_name_length, value_quote_char); + + if (query(status_query_string.ptr(), status_query_string.length())) + goto error; + + status_query_string.length(0); + + result= store_result(); + + /* + We're going to use fields num. 4, 12 and 13 of the resultset, + so make sure we have these fields. + */ + if (!result || (get_num_fields(result) < 14)) + goto error; + + if (!get_num_rows(result)) + goto error; + + if (!(row= fetch_row(result))) + goto error; + + /* + deleted is set in ha_federatedx::info + */ + /* + need to figure out what this means as far as federatedx is concerned, + since we don't have a "file" + + data_file_length = ? + index_file_length = ? + delete_length = ? + */ + if (!is_column_null(row, 4)) + stats->records= (ha_rows) my_strtoll10(get_column_data(row, 4), + (char**) 0, &error); + if (!is_column_null(row, 5)) + stats->mean_rec_length= (ulong) my_strtoll10(get_column_data(row, 5), + (char**) 0, &error); + + stats->data_file_length= stats->records * stats->mean_rec_length; + + if (!is_column_null(row, 12)) + stats->update_time= (time_t) my_strtoll10(get_column_data(row, 12), + (char**) 0, &error); + if (!is_column_null(row, 13)) + stats->check_time= (time_t) my_strtoll10(get_column_data(row, 13), + (char**) 0, &error); + + free_result(result); + return 0; + +error: + if (!mysql_errno(&mysql)) + { + mysql.net.last_errno= ER_NO_SUCH_TABLE; + strmake_buf(mysql.net.last_error, "Remote table does not exist"); + } + free_result(result); + return 1; +} + + + +size_t federatedx_io_mysql::get_ref_length() const +{ + return sizeof(mysql_position); +} + + +void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref, FEDERATEDX_IO_ROWS *current) +{ + mysql_position& pos= *reinterpret_cast<mysql_position*>(ref); + pos.result= (MYSQL_RES *) io_result; + pos.offset= (MYSQL_ROW_OFFSET) current; +} + +int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref) +{ + const mysql_position& pos= *reinterpret_cast<const mysql_position*>(ref); + + if (!pos.result || !pos.offset) + return HA_ERR_END_OF_FILE; + + pos.result->current_row= 0; + pos.result->data_cursor= pos.offset; + *io_result= (FEDERATEDX_IO_RESULT*) pos.result; + + return 0; +} + +void federatedx_io_mysql::set_thd(void *thd) +{ + mysql.net.thd= thd; +} diff --git a/storage/federatedx/federatedx_io_null.cc b/storage/federatedx/federatedx_io_null.cc new file mode 100644 index 00000000..8a2394f2 --- /dev/null +++ b/storage/federatedx/federatedx_io_null.cc @@ -0,0 +1,299 @@ +/* +Copyright (c) 2007, Antony T Curtis +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + + * Neither the name of FederatedX nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + + +/*#define MYSQL_SERVER 1*/ +#include <my_global.h> +#include "sql_priv.h" + +#include "ha_federatedx.h" + +#include "m_string.h" + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + + +#define SAVEPOINT_REALIZED 1 +#define SAVEPOINT_RESTRICT 2 +#define SAVEPOINT_EMITTED 4 + + +typedef struct federatedx_savepoint +{ + ulong level; + uint flags; +} SAVEPT; + + +class federatedx_io_null :public federatedx_io +{ +public: + federatedx_io_null(FEDERATEDX_SERVER *); + ~federatedx_io_null(); + + int query(const char *buffer, size_t length); + virtual FEDERATEDX_IO_RESULT *store_result(); + + virtual size_t max_query_size() const; + + virtual my_ulonglong affected_rows() const; + virtual my_ulonglong last_insert_id() const; + + virtual int error_code(); + virtual const char *error_str(); + + void reset(); + int commit(); + int rollback(); + + int savepoint_set(ulong sp); + ulong savepoint_release(ulong sp); + ulong savepoint_rollback(ulong sp); + void savepoint_restrict(ulong sp); + + ulong last_savepoint() const; + ulong actual_savepoint() const; + bool is_autocommit() const; + + bool table_metadata(ha_statistics *stats, const char *table_name, + uint table_name_length, uint flag); + + /* resultset operations */ + + virtual void free_result(FEDERATEDX_IO_RESULT *io_result); + virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result); + virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result); + virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result, + FEDERATEDX_IO_ROWS **current= NULL); + virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result); + virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, + unsigned int column); + virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, + unsigned int column) const; + virtual size_t get_ref_length() const; + virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref, FEDERATEDX_IO_ROWS *current); + virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref); +}; + + +federatedx_io *instantiate_io_null(MEM_ROOT *server_root, + FEDERATEDX_SERVER *server) +{ + return new (server_root) federatedx_io_null(server); +} + + +federatedx_io_null::federatedx_io_null(FEDERATEDX_SERVER *aserver) + : federatedx_io(aserver) +{ +} + + +federatedx_io_null::~federatedx_io_null() = default; + + +void federatedx_io_null::reset() +{ +} + + +int federatedx_io_null::commit() +{ + return 0; +} + +int federatedx_io_null::rollback() +{ + return 0; +} + + +ulong federatedx_io_null::last_savepoint() const +{ + return 0; +} + + +ulong federatedx_io_null::actual_savepoint() const +{ + return 0; +} + +bool federatedx_io_null::is_autocommit() const +{ + return 0; +} + + +int federatedx_io_null::savepoint_set(ulong sp) +{ + return 0; +} + + +ulong federatedx_io_null::savepoint_release(ulong sp) +{ + return 0; +} + + +ulong federatedx_io_null::savepoint_rollback(ulong sp) +{ + return 0; +} + + +void federatedx_io_null::savepoint_restrict(ulong sp) +{ +} + + +int federatedx_io_null::query(const char *buffer, size_t length) +{ + return 0; +} + + +size_t federatedx_io_null::max_query_size() const +{ + return INT_MAX; +} + + +my_ulonglong federatedx_io_null::affected_rows() const +{ + return 0; +} + + +my_ulonglong federatedx_io_null::last_insert_id() const +{ + return 0; +} + + +int federatedx_io_null::error_code() +{ + return 0; +} + + +const char *federatedx_io_null::error_str() +{ + return ""; +} + + +FEDERATEDX_IO_RESULT *federatedx_io_null::store_result() +{ + FEDERATEDX_IO_RESULT *result; + DBUG_ENTER("federatedx_io_null::store_result"); + + result= NULL; + + DBUG_RETURN(result); +} + + +void federatedx_io_null::free_result(FEDERATEDX_IO_RESULT *) +{ +} + + +unsigned int federatedx_io_null::get_num_fields(FEDERATEDX_IO_RESULT *) +{ + return 0; +} + + +my_ulonglong federatedx_io_null::get_num_rows(FEDERATEDX_IO_RESULT *) +{ + return 0; +} + + +FEDERATEDX_IO_ROW *federatedx_io_null::fetch_row(FEDERATEDX_IO_RESULT *, + FEDERATEDX_IO_ROWS **current) +{ + return NULL; +} + + +ulong *federatedx_io_null::fetch_lengths(FEDERATEDX_IO_RESULT *) +{ + return NULL; +} + + +const char *federatedx_io_null::get_column_data(FEDERATEDX_IO_ROW *, + unsigned int) +{ + return ""; +} + + +bool federatedx_io_null::is_column_null(const FEDERATEDX_IO_ROW *, + unsigned int) const +{ + return true; +} + +bool federatedx_io_null::table_metadata(ha_statistics *stats, + const char *table_name, + uint table_name_length, uint flag) +{ + stats->records= (ha_rows) 0; + stats->mean_rec_length= (ulong) 0; + stats->data_file_length= 0; + + stats->update_time= (time_t) 0; + stats->check_time= (time_t) 0; + + return 0; +} + +size_t federatedx_io_null::get_ref_length() const +{ + return sizeof(int); +} + + +void federatedx_io_null::mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref, FEDERATEDX_IO_ROWS *current) +{ +} + +int federatedx_io_null::seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref) +{ + return 0; +} diff --git a/storage/federatedx/federatedx_probes.h b/storage/federatedx/federatedx_probes.h new file mode 100644 index 00000000..62041951 --- /dev/null +++ b/storage/federatedx/federatedx_probes.h @@ -0,0 +1,45 @@ +/* + * Generated by dtrace(1M). + */ + +#ifndef _FEDERATED_PROBES_H +#define _FEDERATED_PROBES_H + + + +#ifdef __cplusplus +extern "C" { +#endif + +#if _DTRACE_VERSION + +#define FEDERATED_CLOSE() \ + __dtrace_federated___close() +#define FEDERATED_CLOSE_ENABLED() \ + __dtraceenabled_federated___close() +#define FEDERATED_OPEN() \ + __dtrace_federated___open() +#define FEDERATED_OPEN_ENABLED() \ + __dtraceenabled_federated___open() + + +extern void __dtrace_federated___close(void); +extern int __dtraceenabled_federated___close(void); +extern void __dtrace_federated___open(void); +extern int __dtraceenabled_federated___open(void); + +#else + +#define FEDERATED_CLOSE() +#define FEDERATED_CLOSE_ENABLED() (0) +#define FEDERATED_OPEN() +#define FEDERATED_OPEN_ENABLED() (0) + +#endif + + +#ifdef __cplusplus +} +#endif + +#endif /* _FEDERATED_PROBES_H */ diff --git a/storage/federatedx/federatedx_pushdown.cc b/storage/federatedx/federatedx_pushdown.cc new file mode 100644 index 00000000..e9a9791a --- /dev/null +++ b/storage/federatedx/federatedx_pushdown.cc @@ -0,0 +1,373 @@ +/* + Copyright (c) 2019, 2020, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +/* !!! For inclusion into ha_federatedx.cc */ + + +/* + This is a quick a dirty implemention of the derived_handler and select_handler + interfaces to be used to push select queries and the queries specifying + derived tables into FEDERATEDX engine. + The functions + create_federatedx_derived_handler and + create_federatedx_select_handler + that return the corresponding interfaces for pushdown capabilities do + not check a lot of things. In particular they do not check that the tables + of the pushed queries belong to the same foreign server. + + The implementation is provided purely for testing purposes. + The pushdown capabilities are enabled by turning on the plugin system + variable federated_pushdown: + set global federated_pushdown=1; +*/ + + +/* + Check if table and database names are equal on local and remote servers + + SYNOPSIS + local_and_remote_names_match() + tbl_share Pointer to current table TABLE_SHARE structure + fshare Pointer to current table FEDERATEDX_SHARE structure + + DESCRIPTION + FederatedX table on the local server may refer to a table having another + name on the remote server. The remote table may even reside in a different + database. For example: + + -- Remote server + CREATE TABLE t1 (id int(32)); + + -- Local server + CREATE TABLE t2 ENGINE="FEDERATEDX" + CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federatedx/t1"; + + It's not a problem while the federated_pushdown is disabled 'cause + the CONNECTION strings are being parsed for every table during + the execution, so the table names are translated from local to remote. + But in case of the federated_pushdown the whole query is pushed down + to the engine without any translation, so the remote server may try + to select data from a nonexistent table (for example, query + "SELECT * FROM t2" will try to retrieve data from nonexistent "t2"). + + This function checks whether there is a mismatch between local and remote + table/database names + + RETURN VALUE + false names are equal + true names are not equal + +*/ +bool local_and_remote_names_mismatch(const TABLE_SHARE *tbl_share, + const FEDERATEDX_SHARE *fshare) +{ + + if (lower_case_table_names) + { + if (strcasecmp(fshare->database, tbl_share->db.str) != 0) + return true; + } + else + { + if (strncmp(fshare->database, tbl_share->db.str, tbl_share->db.length) != 0) + return true; + } + + return my_strnncoll(system_charset_info, (uchar *) fshare->table_name, + strlen(fshare->table_name), + (uchar *) tbl_share->table_name.str, + tbl_share->table_name.length) != 0; +} + + +static derived_handler* +create_federatedx_derived_handler(THD* thd, TABLE_LIST *derived) +{ + if (!use_pushdown) + return 0; + + ha_federatedx_derived_handler* handler = NULL; + + SELECT_LEX_UNIT *unit= derived->derived; + + for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select()) + { + if (!(sl->join)) + return 0; + for (TABLE_LIST *tbl= sl->join->tables_list; tbl; tbl= tbl->next_local) + { + if (!tbl->table) + return 0; + /* + We intentionally don't support partitioned federatedx tables here, so + use file->ht and not file->partition_ht(). + */ + if (tbl->table->file->ht != federatedx_hton) + return 0; + + const FEDERATEDX_SHARE *fshare= + ((ha_federatedx*)tbl->table->file)->get_federatedx_share(); + if (local_and_remote_names_mismatch(tbl->table->s, fshare)) + return 0; + } + } + + handler= new ha_federatedx_derived_handler(thd, derived); + + return handler; +} + + +/* + Implementation class of the derived_handler interface for FEDERATEDX: + class implementation +*/ + +ha_federatedx_derived_handler::ha_federatedx_derived_handler(THD *thd, + TABLE_LIST *dt) + : derived_handler(thd, federatedx_hton), + share(NULL), txn(NULL), iop(NULL), stored_result(NULL) +{ + derived= dt; +} + +ha_federatedx_derived_handler::~ha_federatedx_derived_handler() = default; + +int ha_federatedx_derived_handler::init_scan() +{ + THD *thd; + int rc= 0; + + DBUG_ENTER("ha_federatedx_derived_handler::init_scan"); + + TABLE *table= derived->get_first_table()->table; + ha_federatedx *h= (ha_federatedx *) table->file; + iop= &h->io; + share= get_share(table->s->table_name.str, table); + thd= table->in_use; + txn= h->get_txn(thd); + if ((rc= txn->acquire(share, thd, TRUE, iop))) + DBUG_RETURN(rc); + + if ((*iop)->query(derived->derived_spec.str, derived->derived_spec.length)) + goto err; + + stored_result= (*iop)->store_result(); + if (!stored_result) + goto err; + + DBUG_RETURN(0); + +err: + DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM); +} + +int ha_federatedx_derived_handler::next_row() +{ + int rc; + FEDERATEDX_IO_ROW *row; + ulong *lengths; + Field **field; + int column= 0; + Time_zone *saved_time_zone= table->in_use->variables.time_zone; + DBUG_ENTER("ha_federatedx_derived_handler::next_row"); + + if ((rc= txn->acquire(share, table->in_use, TRUE, iop))) + DBUG_RETURN(rc); + + if (!(row= (*iop)->fetch_row(stored_result))) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + /* Convert row to internal format */ + table->in_use->variables.time_zone= UTC; + lengths= (*iop)->fetch_lengths(stored_result); + + for (field= table->field; *field; field++, column++) + { + if ((*iop)->is_column_null(row, column)) + (*field)->set_null(); + else + { + (*field)->set_notnull(); + (*field)->store((*iop)->get_column_data(row, column), + lengths[column], &my_charset_bin); + } + } + table->in_use->variables.time_zone= saved_time_zone; + + DBUG_RETURN(rc); +} + +int ha_federatedx_derived_handler::end_scan() +{ + DBUG_ENTER("ha_federatedx_derived_handler::end_scan"); + + (*iop)->free_result(stored_result); + + free_share(txn, share); + + DBUG_RETURN(0); +} + +void ha_federatedx_derived_handler::print_error(int, unsigned long) +{ +} + + +static select_handler* +create_federatedx_select_handler(THD* thd, SELECT_LEX *sel) +{ + if (!use_pushdown) + return 0; + + ha_federatedx_select_handler* handler = NULL; + + for (TABLE_LIST *tbl= thd->lex->query_tables; tbl; tbl= tbl->next_global) + { + if (!tbl->table) + return 0; + /* + We intentionally don't support partitioned federatedx tables here, so + use file->ht and not file->partition_ht(). + */ + if (tbl->table->file->ht != federatedx_hton) + return 0; + + const FEDERATEDX_SHARE *fshare= + ((ha_federatedx*)tbl->table->file)->get_federatedx_share(); + + if (local_and_remote_names_mismatch(tbl->table->s, fshare)) + return 0; + } + + /* + Currently, ha_federatedx_select_handler::init_scan just takes the + thd->query and sends it to the backend. + This obviously won't work if the SELECT uses an "INTO @var" or + "INTO OUTFILE". It is also unlikely to work if the select has some + other kind of side effect. + */ + if (sel->uncacheable & UNCACHEABLE_SIDEEFFECT) + return NULL; + + handler= new ha_federatedx_select_handler(thd, sel); + + return handler; +} + +/* + Implementation class of the select_handler interface for FEDERATEDX: + class implementation +*/ + +ha_federatedx_select_handler::ha_federatedx_select_handler(THD *thd, + SELECT_LEX *sel) + : select_handler(thd, federatedx_hton), + share(NULL), txn(NULL), iop(NULL), stored_result(NULL) +{ + select= sel; +} + +ha_federatedx_select_handler::~ha_federatedx_select_handler() = default; + +int ha_federatedx_select_handler::init_scan() +{ + int rc= 0; + + DBUG_ENTER("ha_federatedx_select_handler::init_scan"); + + TABLE *table= 0; + for (TABLE_LIST *tbl= thd->lex->query_tables; tbl; tbl= tbl->next_global) + { + if (!tbl->table) + continue; + table= tbl->table; + break; + } + ha_federatedx *h= (ha_federatedx *) table->file; + iop= &h->io; + share= get_share(table->s->table_name.str, table); + txn= h->get_txn(thd); + if ((rc= txn->acquire(share, thd, TRUE, iop))) + DBUG_RETURN(rc); + + if ((*iop)->query(thd->query(), thd->query_length())) + goto err; + + stored_result= (*iop)->store_result(); + if (!stored_result) + goto err; + + DBUG_RETURN(0); + +err: + DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM); +} + +int ha_federatedx_select_handler::next_row() +{ + int rc= 0; + FEDERATEDX_IO_ROW *row; + ulong *lengths; + Field **field; + int column= 0; + Time_zone *saved_time_zone= table->in_use->variables.time_zone; + DBUG_ENTER("ha_federatedx_select_handler::next_row"); + + if ((rc= txn->acquire(share, table->in_use, TRUE, iop))) + DBUG_RETURN(rc); + + if (!(row= (*iop)->fetch_row(stored_result))) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + /* Convert row to internal format */ + table->in_use->variables.time_zone= UTC; + lengths= (*iop)->fetch_lengths(stored_result); + + for (field= table->field; *field; field++, column++) + { + if ((*iop)->is_column_null(row, column)) + (*field)->set_null(); + else + { + (*field)->set_notnull(); + (*field)->store((*iop)->get_column_data(row, column), + lengths[column], &my_charset_bin); + } + } + table->in_use->variables.time_zone= saved_time_zone; + + DBUG_RETURN(rc); +} + +int ha_federatedx_select_handler::end_scan() +{ + DBUG_ENTER("ha_federatedx_derived_handler::end_scan"); + + free_tmp_table(thd, table); + table= 0; + + (*iop)->free_result(stored_result); + + free_share(txn, share); + + DBUG_RETURN(0); +} + +void ha_federatedx_select_handler::print_error(int error, myf error_flag) +{ + select_handler::print_error(error, error_flag); +} diff --git a/storage/federatedx/federatedx_pushdown.h b/storage/federatedx/federatedx_pushdown.h new file mode 100644 index 00000000..673abcfc --- /dev/null +++ b/storage/federatedx/federatedx_pushdown.h @@ -0,0 +1,63 @@ +/* + Copyright (c) 2019 MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "derived_handler.h" +#include "select_handler.h" + +/* + Implementation class of the derived_handler interface for FEDERATEDX: + class declaration +*/ + +class ha_federatedx_derived_handler: public derived_handler +{ +private: + FEDERATEDX_SHARE *share; + federatedx_txn *txn; + federatedx_io **iop; + FEDERATEDX_IO_RESULT *stored_result; + +public: + ha_federatedx_derived_handler(THD* thd_arg, TABLE_LIST *tbl); + ~ha_federatedx_derived_handler(); + int init_scan(); + int next_row(); + int end_scan(); + void print_error(int, unsigned long); +}; + + +/* + Implementation class of the select_handler interface for FEDERATEDX: + class declaration +*/ + +class ha_federatedx_select_handler: public select_handler +{ +private: + FEDERATEDX_SHARE *share; + federatedx_txn *txn; + federatedx_io **iop; + FEDERATEDX_IO_RESULT *stored_result; + +public: + ha_federatedx_select_handler(THD* thd_arg, SELECT_LEX *sel); + ~ha_federatedx_select_handler(); + int init_scan(); + int next_row(); + int end_scan(); + void print_error(int, unsigned long); +}; diff --git a/storage/federatedx/federatedx_txn.cc b/storage/federatedx/federatedx_txn.cc new file mode 100644 index 00000000..c434a008 --- /dev/null +++ b/storage/federatedx/federatedx_txn.cc @@ -0,0 +1,439 @@ +/* +Copyright (c) 2007, Antony T Curtis +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + + * Neither the name of FederatedX nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + +#define MYSQL_SERVER 1 +#include <my_global.h> +#include "sql_priv.h" + +#include "ha_federatedx.h" + +#include "m_string.h" +#include "table.h" +#include "sql_servers.h" + +federatedx_txn::federatedx_txn() + : txn_list(0), savepoint_level(0), savepoint_stmt(0), savepoint_next(0) +{ + DBUG_ENTER("federatedx_txn::federatedx_txn"); + DBUG_VOID_RETURN; +} + +federatedx_txn::~federatedx_txn() +{ + DBUG_ENTER("federatedx_txn::~federatedx_txn"); + DBUG_ASSERT(!txn_list); + DBUG_VOID_RETURN; +} + + +void federatedx_txn::close(FEDERATEDX_SERVER *server) +{ +#ifdef DBUG_TRACE + uint count= 0; +#endif + federatedx_io *io, **iop; + DBUG_ENTER("federatedx_txn::close"); + + DBUG_ASSERT(!server->use_count); + DBUG_PRINT("info",("use count: %u connections: %u", + server->use_count, server->io_count)); + + for (iop= &txn_list; (io= *iop);) + { + if (io->server != server) + iop= &io->txn_next; + else + { + *iop= io->txn_next; + io->txn_next= NULL; + io->busy= FALSE; + + io->idle_next= server->idle_list; + server->idle_list= io; + } + } + + while ((io= server->idle_list)) + { + server->idle_list= io->idle_next; + delete io; +#ifdef DBUG_TRACE + count++; +#endif + } + + DBUG_PRINT("info",("closed %u connections, txn_list: %s", count, + txn_list ? "active": "empty")); + DBUG_VOID_RETURN; +} + + +int federatedx_txn::acquire(FEDERATEDX_SHARE *share, void *thd, + bool readonly, federatedx_io **ioptr) +{ + federatedx_io *io; + FEDERATEDX_SERVER *server= share->s; + DBUG_ENTER("federatedx_txn::acquire"); + DBUG_ASSERT(ioptr && server); + + if (!(io= *ioptr)) + { + /* check to see if we have an available IO connection */ + for (io= txn_list; io; io= io->txn_next) + if (io->server == server) + break; + + if (!io) + { + /* check to see if there are any unowned IO connections */ + mysql_mutex_lock(&server->mutex); + if ((io= server->idle_list)) + { + server->idle_list= io->idle_next; + io->idle_next= NULL; + } + else + io= federatedx_io::construct(&server->mem_root, server); + + io->txn_next= txn_list; + txn_list= io; + + mysql_mutex_unlock(&server->mutex); + } + + if (io->busy) + *io->owner_ptr= NULL; + + io->busy= TRUE; + io->owner_ptr= ioptr; + io->set_thd(thd); + } + + DBUG_ASSERT(io->busy && io->server == server); + + io->readonly&= readonly; + + DBUG_RETURN((*ioptr= io) ? 0 : -1); +} + + +void federatedx_txn::release(federatedx_io **ioptr) +{ + federatedx_io *io; + DBUG_ENTER("federatedx_txn::release"); + DBUG_ASSERT(ioptr); + + if ((io= *ioptr)) + { + /* mark as available for reuse in this transaction */ + io->busy= FALSE; + *ioptr= NULL; + + DBUG_PRINT("info", ("active: %d autocommit: %d", + io->active, io->is_autocommit())); + + if (io->is_autocommit()) + { + io->set_thd(NULL); + io->active= FALSE; + } + } + + release_scan(); + + DBUG_VOID_RETURN; +} + + +void federatedx_txn::release_scan() +{ +#ifdef DBUG_TRACE + uint count= 0, returned= 0; +#endif + federatedx_io *io, **pio; + DBUG_ENTER("federatedx_txn::release_scan"); + + /* return any inactive and idle connections to the server */ + for (pio= &txn_list; (io= *pio);) + { + if (io->active || io->busy) + pio= &io->txn_next; + else + { + FEDERATEDX_SERVER *server= io->server; + + /* unlink from list of connections bound to the transaction */ + *pio= io->txn_next; + io->txn_next= NULL; + + /* reset some values */ + io->readonly= TRUE; + + mysql_mutex_lock(&server->mutex); + io->idle_next= server->idle_list; + server->idle_list= io; + mysql_mutex_unlock(&server->mutex); +#ifdef DBUG_TRACE + returned++; +#endif + } +#ifdef DBUG_TRACE + count++; +#endif + } + DBUG_PRINT("info",("returned %u of %u connections(s)", returned, count)); + + DBUG_VOID_RETURN; +} + + +bool federatedx_txn::txn_begin() +{ + ulong level= 0; + DBUG_ENTER("federatedx_txn::txn_begin"); + + if (savepoint_next == 0) + { + savepoint_next++; + savepoint_level= savepoint_stmt= 0; + sp_acquire(&level); + } + + DBUG_RETURN(level == 1); +} + + +int federatedx_txn::txn_commit() +{ + int error= 0; + federatedx_io *io; + DBUG_ENTER("federatedx_txn::txn_commit"); + + if (savepoint_next) + { + DBUG_ASSERT(savepoint_stmt != 1); + + for (io= txn_list; io; io= io->txn_next) + { + int rc= 0; + + if (io->active) + rc= io->commit(); + else + io->rollback(); + + if (io->active && rc) + error= -1; + + io->reset(); + } + + release_scan(); + + savepoint_next= savepoint_stmt= savepoint_level= 0; + } + + DBUG_RETURN(error); +} + + +int federatedx_txn::txn_rollback() +{ + int error= 0; + federatedx_io *io; + DBUG_ENTER("federatedx_txn::txn_commit"); + + if (savepoint_next) + { + DBUG_ASSERT(savepoint_stmt != 1); + + for (io= txn_list; io; io= io->txn_next) + { + int rc= io->rollback(); + + if (io->active && rc) + error= -1; + + io->reset(); + } + + release_scan(); + + savepoint_next= savepoint_stmt= savepoint_level= 0; + } + + DBUG_RETURN(error); +} + + +bool federatedx_txn::sp_acquire(ulong *sp) +{ + bool rc= FALSE; + federatedx_io *io; + DBUG_ENTER("federatedx_txn::sp_acquire"); + DBUG_ASSERT(sp && savepoint_next); + + *sp= savepoint_level= savepoint_next++; + + for (io= txn_list; io; io= io->txn_next) + { + if (io->readonly) + continue; + + io->savepoint_set(savepoint_level); + rc= TRUE; + } + + DBUG_RETURN(rc); +} + + +int federatedx_txn::sp_rollback(ulong *sp) +{ + ulong level, new_level= savepoint_level; + federatedx_io *io; + DBUG_ENTER("federatedx_txn::sp_rollback"); + DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level); + + for (io= txn_list; io; io= io->txn_next) + { + if (io->readonly) + continue; + + if ((level= io->savepoint_rollback(*sp)) < new_level) + new_level= level; + } + + savepoint_level= new_level; + + DBUG_RETURN(0); +} + + +int federatedx_txn::sp_release(ulong *sp) +{ + ulong level, new_level= savepoint_level; + federatedx_io *io; + DBUG_ENTER("federatedx_txn::sp_release"); + DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level); + + for (io= txn_list; io; io= io->txn_next) + { + if (io->readonly) + continue; + + if ((level= io->savepoint_release(*sp)) < new_level) + new_level= level; + } + + savepoint_level= new_level; + *sp= 0; + + DBUG_RETURN(0); +} + + +bool federatedx_txn::stmt_begin() +{ + bool result= FALSE; + DBUG_ENTER("federatedx_txn::stmt_begin"); + + if (!savepoint_stmt) + { + if (!savepoint_next) + { + savepoint_next++; + savepoint_level= savepoint_stmt= 0; + } + result= sp_acquire(&savepoint_stmt); + } + + DBUG_RETURN(result); +} + + +int federatedx_txn::stmt_commit() +{ + int result= 0; + DBUG_ENTER("federatedx_txn::stmt_commit"); + + if (savepoint_stmt == 1) + { + savepoint_stmt= 0; + result= txn_commit(); + } + else + if (savepoint_stmt) + result= sp_release(&savepoint_stmt); + + DBUG_RETURN(result); +} + + +int federatedx_txn::stmt_rollback() +{ + int result= 0; + DBUG_ENTER("federated:txn::stmt_rollback"); + + if (savepoint_stmt == 1) + { + savepoint_stmt= 0; + result= txn_rollback(); + } + else + if (savepoint_stmt) + { + result= sp_rollback(&savepoint_stmt); + sp_release(&savepoint_stmt); + } + + DBUG_RETURN(result); +} + + +void federatedx_txn::stmt_autocommit() +{ + federatedx_io *io; + DBUG_ENTER("federatedx_txn::stmt_autocommit"); + + for (io= txn_list; savepoint_stmt && io; io= io->txn_next) + { + if (io->readonly) + continue; + + io->savepoint_restrict(savepoint_stmt); + } + + DBUG_VOID_RETURN; +} + + diff --git a/storage/federatedx/ha_federatedx.cc b/storage/federatedx/ha_federatedx.cc new file mode 100644 index 00000000..598886b8 --- /dev/null +++ b/storage/federatedx/ha_federatedx.cc @@ -0,0 +1,3730 @@ +/* +Copyright (c) 2008-2009, Patrick Galbraith & Antony Curtis +Copyright (c) 2020, 2022, MariaDB Corporation. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + + * Neither the name of Patrick Galbraith nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ +/* + + FederatedX Pluggable Storage Engine + + ha_federatedx.cc - FederatedX Pluggable Storage Engine + Patrick Galbraith, 2008 + + This is a handler which uses a foreign database as the data file, as + opposed to a handler like MyISAM, which uses .MYD files locally. + + How this handler works + ---------------------------------- + Normal database files are local and as such: You create a table called + 'users', a file such as 'users.MYD' is created. A handler reads, inserts, + deletes, updates data in this file. The data is stored in particular format, + so to read, that data has to be parsed into fields, to write, fields have to + be stored in this format to write to this data file. + + With FederatedX storage engine, there will be no local files + for each table's data (such as .MYD). A foreign database will store + the data that would normally be in this file. This will necessitate + the use of MySQL client API to read, delete, update, insert this + data. The data will have to be retrieve via an SQL call "SELECT * + FROM users". Then, to read this data, it will have to be retrieved + via mysql_fetch_row one row at a time, then converted from the + column in this select into the format that the handler expects. + + The create table will simply create the .frm file, and within the + "CREATE TABLE" SQL, there SHALL be any of the following : + + connection=scheme://username:password@hostname:port/database/tablename + connection=scheme://username@hostname/database/tablename + connection=scheme://username:password@hostname/database/tablename + connection=scheme://username:password@hostname/database/tablename + + - OR - + + As of 5.1 federatedx now allows you to use a non-url + format, taking advantage of mysql.servers: + + connection="connection_one" + connection="connection_one/table_foo" + + An example would be: + + connection=mysql://username:password@hostname:port/database/tablename + + or, if we had: + + create server 'server_one' foreign data wrapper 'mysql' options + (HOST '127.0.0.1', + DATABASE 'db1', + USER 'root', + PASSWORD '', + PORT 3306, + SOCKET '', + OWNER 'root'); + + CREATE TABLE federatedx.t1 ( + `id` int(20) NOT NULL, + `name` varchar(64) NOT NULL default '' + ) + ENGINE="FEDERATEDX" DEFAULT CHARSET=latin1 + CONNECTION='server_one'; + + So, this will have been the equivalent of + + CONNECTION="mysql://root@127.0.0.1:3306/db1/t1" + + Then, we can also change the server to point to a new schema: + + ALTER SERVER 'server_one' options(DATABASE 'db2'); + + All subsequent calls will now be against db2.t1! Guess what? You don't + have to perform an alter table! + + This connecton="connection string" is necessary for the handler to be + able to connect to the foreign server, either by URL, or by server + name. + + + The basic flow is this: + + SQL calls issues locally -> + mysql handler API (data in handler format) -> + mysql client API (data converted to SQL calls) -> + foreign database -> mysql client API -> + convert result sets (if any) to handler format -> + handler API -> results or rows affected to local + + What this handler does and doesn't support + ------------------------------------------ + * Tables MUST be created on the foreign server prior to any action on those + tables via the handler, first version. IMPORTANT: IF you MUST use the + federatedx storage engine type on the REMOTE end, MAKE SURE [ :) ] That + the table you connect to IS NOT a table pointing BACK to your ORIGNAL + table! You know and have heard the screaching of audio feedback? You + know putting two mirror in front of each other how the reflection + continues for eternity? Well, need I say more?! + * There will not be support for transactions. + * There is no way for the handler to know if the foreign database or table + has changed. The reason for this is that this database has to work like a + data file that would never be written to by anything other than the + database. The integrity of the data in the local table could be breached + if there was any change to the foreign database. + * Support for SELECT, INSERT, UPDATE , DELETE, indexes. + * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls. + * Prepared statements will not be used in the first implementation, it + remains to to be seen whether the limited subset of the client API for the + server supports this. + * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its + implementation. + * This will not work with the query cache. + + Method calls + + A two column table, with one record: + + (SELECT) + + "SELECT * FROM foo" + ha_federatedx::info + ha_federatedx::scan_time: + ha_federatedx::rnd_init: share->select_query SELECT * FROM foo + ha_federatedx::extra + + <for every row of data retrieved> + ha_federatedx::rnd_next + ha_federatedx::convert_row_to_internal_format + ha_federatedx::rnd_next + </for every row of data retrieved> + + ha_federatedx::rnd_end + ha_federatedx::extra + ha_federatedx::reset + + (INSERT) + + "INSERT INTO foo (id, ts) VALUES (2, now());" + + ha_federatedx::write_row + + ha_federatedx::reset + + (UPDATE) + + "UPDATE foo SET ts = now() WHERE id = 1;" + + ha_federatedx::index_init + ha_federatedx::index_read + ha_federatedx::index_read_idx + ha_federatedx::rnd_next + ha_federatedx::convert_row_to_internal_format + ha_federatedx::update_row + + ha_federatedx::extra + ha_federatedx::extra + ha_federatedx::extra + ha_federatedx::external_lock + ha_federatedx::reset + + + How do I use this handler? + -------------------------- + + <insert text about plugin storage engine> + + Next, to use this handler, it's very simple. You must + have two databases running, either both on the same host, or + on different hosts. + + One the server that will be connecting to the foreign + host (client), you create your table as such: + + CREATE TABLE test_table ( + id int(20) NOT NULL auto_increment, + name varchar(32) NOT NULL default '', + other int(20) NOT NULL default '0', + PRIMARY KEY (id), + KEY name (name), + KEY other_key (other)) + ENGINE="FEDERATEDX" + DEFAULT CHARSET=latin1 + CONNECTION='mysql://root@127.0.0.1:9306/federatedx/test_federatedx'; + + Notice the "COMMENT" and "ENGINE" field? This is where you + respectively set the engine type, "FEDERATEDX" and foreign + host information, this being the database your 'client' database + will connect to and use as the "data file". Obviously, the foreign + database is running on port 9306, so you want to start up your other + database so that it is indeed on port 9306, and your federatedx + database on a port other than that. In my setup, I use port 5554 + for federatedx, and port 5555 for the foreign database. + + Then, on the foreign database: + + CREATE TABLE test_table ( + id int(20) NOT NULL auto_increment, + name varchar(32) NOT NULL default '', + other int(20) NOT NULL default '0', + PRIMARY KEY (id), + KEY name (name), + KEY other_key (other)) + ENGINE="<NAME>" <-- whatever you want, or not specify + DEFAULT CHARSET=latin1 ; + + This table is exactly the same (and must be exactly the same), + except that it is not using the federatedx handler and does + not need the URL. + + + How to see the handler in action + -------------------------------- + + When developing this handler, I compiled the federatedx database with + debugging: + + ./configure --with-federatedx-storage-engine + --prefix=/home/mysql/mysql-build/federatedx/ --with-debug + + Once compiled, I did a 'make install' (not for the purpose of installing + the binary, but to install all the files the binary expects to see in the + diretory I specified in the build with --prefix, + "/home/mysql/mysql-build/federatedx". + + Then, I started the foreign server: + + /usr/local/mysql/bin/mysqld_safe + --user=mysql --log=/tmp/mysqld.5555.log -P 5555 + + Then, I went back to the directory containing the newly compiled mysqld, + <builddir>/sql/, started up gdb: + + gdb ./mysqld + + Then, withn the (gdb) prompt: + (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug-dbug + + Next, I open several windows for each: + + 1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed + 2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log + 3. A window with a client open to the federatedx server on port 5554 + 4. A window with a client open to the federatedx server on port 5555 + + I would create a table on the client to the foreign server on port + 5555, and then to the federatedx server on port 5554. At this point, + I would run whatever queries I wanted to on the federatedx server, + just always remembering that whatever changes I wanted to make on + the table, or if I created new tables, that I would have to do that + on the foreign server. + + Another thing to look for is 'show variables' to show you that you have + support for federatedx handler support: + + show variables like '%federat%' + + and: + + show storage engines; + + Both should display the federatedx storage handler. + + + Testing + ------- + + Testing for FederatedX as a pluggable storage engine for + now is a manual process that I intend to build a test + suite that works for all pluggable storage engines. + + How to test + + 1. cp fed.dat /tmp + (make sure you have access to "test". Use a user that has + super privileges for now) + 2. mysql -f -u root test < federated.test > federated.myresult 2>&1 + 3. diff federated.result federated.myresult (there _should_ be no differences) + + +*/ + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + +#define MYSQL_SERVER 1 +#include <my_global.h> +#include <mysql/plugin.h> +#include <mysql.h> +#include "ha_federatedx.h" +#include "sql_servers.h" +#include "sql_analyse.h" // append_escaped() +#include "sql_show.h" // append_identifier() +#include "tztime.h" // my_tz_find() +#include "sql_select.h" + +#ifdef I_AM_PARANOID +#define MIN_PORT 1023 +#else +#define MIN_PORT 0 +#endif + +/* Variables for federatedx share methods */ +static HASH federatedx_open_tables; // To track open tables +static HASH federatedx_open_servers; // To track open servers +mysql_mutex_t federatedx_mutex; // To init the hash +const char ident_quote_char= '`'; // Character for quoting + // identifiers +const char value_quote_char= '\''; // Character for quoting + // literals +static const int bulk_padding= 64; // bytes "overhead" in packet + +/* Variables used when chopping off trailing characters */ +static const uint sizeof_trailing_comma= sizeof(", ") - 1; +static const uint sizeof_trailing_and= sizeof(" AND ") - 1; +static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1; + +static Time_zone *UTC= 0; + +/* Static declaration for handerton */ +static handler *federatedx_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root); + +/* FederatedX storage engine handlerton */ + +static handler *federatedx_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_federatedx(hton, table); +} + + +/* Function we use in the creation of our hash to get key */ + +static uchar * +federatedx_share_get_key(FEDERATEDX_SHARE *share, size_t *length, + my_bool not_used __attribute__ ((unused))) +{ + *length= share->share_key_length; + return (uchar*) share->share_key; +} + + +static uchar * +federatedx_server_get_key(FEDERATEDX_SERVER *server, size_t *length, + my_bool not_used __attribute__ ((unused))) +{ + *length= server->key_length; + return server->key; +} + +#ifdef HAVE_PSI_INTERFACE +static PSI_mutex_key fe_key_mutex_federatedx, fe_key_mutex_FEDERATEDX_SERVER_mutex; + +static PSI_mutex_info all_federated_mutexes[]= +{ + { &fe_key_mutex_federatedx, "federatedx", PSI_FLAG_GLOBAL}, + { &fe_key_mutex_FEDERATEDX_SERVER_mutex, "FEDERATED_SERVER::mutex", 0} +}; + +static void init_federated_psi_keys(void) +{ + const char* category= "federated"; + int count; + + if (PSI_server == NULL) + return; + + count= array_elements(all_federated_mutexes); + PSI_server->register_mutex(category, all_federated_mutexes, count); +} +#else +#define init_federated_psi_keys() /* no-op */ +#endif /* HAVE_PSI_INTERFACE */ + +handlerton* federatedx_hton; + +static derived_handler* +create_federatedx_derived_handler(THD* thd, TABLE_LIST *derived); +static select_handler* +create_federatedx_select_handler(THD* thd, SELECT_LEX *sel); + +/* + Initialize the federatedx handler. + + SYNOPSIS + federatedx_db_init() + p Handlerton + + RETURN + FALSE OK + TRUE Error +*/ + +int federatedx_db_init(void *p) +{ + DBUG_ENTER("federatedx_db_init"); + init_federated_psi_keys(); + federatedx_hton= (handlerton *)p; + /* Needed to work with old .frm files */ + federatedx_hton->db_type= DB_TYPE_FEDERATED_DB; + federatedx_hton->savepoint_offset= sizeof(ulong); + federatedx_hton->close_connection= ha_federatedx::disconnect; + federatedx_hton->savepoint_set= ha_federatedx::savepoint_set; + federatedx_hton->savepoint_rollback= ha_federatedx::savepoint_rollback; + federatedx_hton->savepoint_release= ha_federatedx::savepoint_release; + federatedx_hton->commit= ha_federatedx::commit; + federatedx_hton->rollback= ha_federatedx::rollback; + federatedx_hton->discover_table_structure= ha_federatedx::discover_assisted; + federatedx_hton->create= federatedx_create_handler; + federatedx_hton->drop_table= [](handlerton *, const char*) { return -1; }; + federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED; + federatedx_hton->create_derived= create_federatedx_derived_handler; + federatedx_hton->create_select= create_federatedx_select_handler; + + if (mysql_mutex_init(fe_key_mutex_federatedx, + &federatedx_mutex, MY_MUTEX_INIT_FAST)) + goto error; + if (!my_hash_init(PSI_INSTRUMENT_ME, &federatedx_open_tables, &my_charset_bin, 32, 0, 0, + (my_hash_get_key) federatedx_share_get_key, 0, 0) && + !my_hash_init(PSI_INSTRUMENT_ME, &federatedx_open_servers, &my_charset_bin, 32, 0, 0, + (my_hash_get_key) federatedx_server_get_key, 0, 0)) + { + DBUG_RETURN(FALSE); + } + + mysql_mutex_destroy(&federatedx_mutex); +error: + DBUG_RETURN(TRUE); +} + + +/* + Release the federatedx handler. + + SYNOPSIS + federatedx_db_end() + + RETURN + FALSE OK +*/ + +int federatedx_done(void *p) +{ + my_hash_free(&federatedx_open_tables); + my_hash_free(&federatedx_open_servers); + mysql_mutex_destroy(&federatedx_mutex); + + return 0; +} + +/** + @brief Append identifiers to the string. + + @param[in,out] string The target string. + @param[in] name Identifier name + @param[in] length Length of identifier name in bytes + @param[in] quote_char Quote char to use for quoting identifier. + + @return Operation Status + @retval FALSE OK + @retval TRUE There was an error appending to the string. + + @note This function is based upon the append_identifier() function + in sql_show.cc except that quoting always occurs. +*/ + +bool append_ident(String *string, const char *name, size_t length, + const char quote_char) +{ + bool result; + uint clen; + const char *name_end; + DBUG_ENTER("append_ident"); + + if (quote_char) + { + string->reserve(length * 2 + 2); + if ((result= string->append("e_char, 1, system_charset_info))) + goto err; + + for (name_end= name+length; name < name_end; name+= clen) + { + uchar c= *(uchar *) name; + clen= system_charset_info->charlen_fix(name, name_end); + if (clen == 1 && c == (uchar) quote_char && + (result= string->append("e_char, 1, system_charset_info))) + goto err; + if ((result= string->append(name, clen, string->charset()))) + goto err; + } + result= string->append("e_char, 1, system_charset_info); + } + else + result= string->append(name, length, system_charset_info); + +err: + DBUG_RETURN(result); +} + + +static int parse_url_error(FEDERATEDX_SHARE *share, TABLE_SHARE *table_s, + int error_num) +{ + char buf[FEDERATEDX_QUERY_BUFFER_SIZE]; + size_t buf_len; + DBUG_ENTER("ha_federatedx parse_url_error"); + + buf_len= MY_MIN(table_s->connect_string.length, + FEDERATEDX_QUERY_BUFFER_SIZE-1); + strmake(buf, table_s->connect_string.str, buf_len); + my_error(error_num, MYF(0), buf, 14); + DBUG_RETURN(error_num); +} + +/* + retrieve server object which contains server meta-data + from the system table given a server's name, set share + connection parameter members +*/ +int get_connection(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share) +{ + int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST; + FOREIGN_SERVER *server, server_buffer; + DBUG_ENTER("ha_federatedx::get_connection"); + + /* + get_server_by_name() clones the server if exists and allocates + copies of strings in the supplied mem_root + */ + if (!(server= + get_server_by_name(mem_root, share->connection_string, &server_buffer))) + { + DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!")); + /* need to come up with error handling */ + error_num=1; + goto error; + } + DBUG_PRINT("info", ("get_server_by_name returned server at %p", + server)); + + /* + Most of these should never be empty strings, error handling will + need to be implemented. Also, is this the best way to set the share + members? Is there some allocation needed? In running this code, it works + except there are errors in the trace file of the share being overrun + at the address of the share. + */ + share->server_name_length= server->server_name_length; + share->server_name= const_cast<char*>(server->server_name); + share->username= const_cast<char*>(server->username); + share->password= const_cast<char*>(server->password); + share->database= const_cast<char*>(server->db); + share->port= server->port > MIN_PORT && server->port < 65536 ? + (ushort) server->port : MYSQL_PORT; + share->hostname= const_cast<char*>(server->host); + if (!(share->socket= const_cast<char*>(server->socket)) && + !strcmp(share->hostname, my_localhost)) + share->socket= (char *) MYSQL_UNIX_ADDR; + share->scheme= const_cast<char*>(server->scheme); + + DBUG_PRINT("info", ("share->username: %s", share->username)); + DBUG_PRINT("info", ("share->password: %s", share->password)); + DBUG_PRINT("info", ("share->hostname: %s", share->hostname)); + DBUG_PRINT("info", ("share->database: %s", share->database)); + DBUG_PRINT("info", ("share->port: %d", share->port)); + DBUG_PRINT("info", ("share->socket: %s", share->socket)); + DBUG_RETURN(0); + +error: + my_printf_error(error_num, "server name: '%s' doesn't exist!", + MYF(0), share->connection_string); + DBUG_RETURN(error_num); +} + +/* + Parse connection info from table->s->connect_string + + SYNOPSIS + parse_url() + mem_root MEM_ROOT pointer for memory allocation + share pointer to FEDERATEDX share + table_s pointer to current TABLE_SHARE class + table_create_flag determines what error to throw + + DESCRIPTION + Populates the share with information about the connection + to the foreign database that will serve as the data source. + This string must be specified (currently) in the "CONNECTION" field, + listed in the CREATE TABLE statement. + + This string MUST be in the format of any of these: + + CONNECTION="scheme://username:password@hostname:port/database/table" + CONNECTION="scheme://username@hostname/database/table" + CONNECTION="scheme://username@hostname:port/database/table" + CONNECTION="scheme://username:password@hostname/database/table" + + _OR_ + + CONNECTION="connection name" + + + + An Example: + + CREATE TABLE t1 (id int(32)) + ENGINE="FEDERATEDX" + CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federatedx/testtable"; + + CREATE TABLE t2 ( + id int(4) NOT NULL auto_increment, + name varchar(32) NOT NULL, + PRIMARY KEY(id) + ) ENGINE="FEDERATEDX" CONNECTION="my_conn"; + + ***IMPORTANT*** + Currently, the FederatedX Storage Engine only supports connecting to another + Database ("scheme" of "mysql"). Connections using JDBC as well as + other connectors are in the planning stage. + + + 'password' and 'port' are both optional. + + RETURN VALUE + 0 success + error_num particular error code + +*/ + +static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share, + TABLE_SHARE *table_s, uint table_create_flag) +{ + uint error_num= (table_create_flag ? + ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : + ER_FOREIGN_DATA_STRING_INVALID); + DBUG_ENTER("ha_federatedx::parse_url"); + + share->port= 0; + share->socket= 0; + DBUG_PRINT("info", ("share at %p", share)); + DBUG_PRINT("info", ("Length: %u", (uint) table_s->connect_string.length)); + DBUG_PRINT("info", ("String: '%.*s'", (int) table_s->connect_string.length, + table_s->connect_string.str)); + share->connection_string= strmake_root(mem_root, table_s->connect_string.str, + table_s->connect_string.length); + + DBUG_PRINT("info",("parse_url alloced share->connection_string %p", + share->connection_string)); + + DBUG_PRINT("info",("share->connection_string: %s",share->connection_string)); + /* + No :// or @ in connection string. Must be a straight connection name of + either "servername" or "servername/tablename" + */ + if ((!strstr(share->connection_string, "://") && + (!strchr(share->connection_string, '@')))) + { + + DBUG_PRINT("info", + ("share->connection_string: %s internal format " + "share->connection_string: %p", + share->connection_string, + share->connection_string)); + + /* ok, so we do a little parsing, but not completely! */ + share->parsed= FALSE; + /* + If there is a single '/' in the connection string, this means the user is + specifying a table name + */ + + if ((share->table_name= strchr(share->connection_string, '/'))) + { + *share->table_name++= '\0'; + share->table_name_length= strlen(share->table_name); + + DBUG_PRINT("info", + ("internal format, parsed table_name " + "share->connection_string: %s share->table_name: %s", + share->connection_string, share->table_name)); + + /* + there better not be any more '/'s ! + */ + if (strchr(share->table_name, '/')) + goto error; + } + /* + Otherwise, straight server name, use tablename of federatedx table + as remote table name + */ + else + { + /* + Connection specifies everything but, resort to + expecting remote and foreign table names to match + */ + share->table_name= strmake_root(mem_root, table_s->table_name.str, + (share->table_name_length= + table_s->table_name.length)); + DBUG_PRINT("info", + ("internal format, default table_name " + "share->connection_string: %s share->table_name: %s", + share->connection_string, share->table_name)); + } + + if ((error_num= get_connection(mem_root, share))) + goto error; + } + else + { + share->parsed= TRUE; + // Add a null for later termination of table name + share->connection_string[table_s->connect_string.length]= 0; + share->scheme= share->connection_string; + DBUG_PRINT("info",("parse_url alloced share->scheme: %p", + share->scheme)); + + /* + Remove addition of null terminator and store length + for each string in share + */ + if (!(share->username= strstr(share->scheme, "://"))) + goto error; + share->scheme[share->username - share->scheme]= '\0'; + + if (!federatedx_io::handles_scheme(share->scheme)) + goto error; + + share->username+= 3; + + if (!(share->hostname= strchr(share->username, '@'))) + goto error; + *share->hostname++= '\0'; // End username + + if ((share->password= strchr(share->username, ':'))) + { + *share->password++= '\0'; // End username + + /* make sure there isn't an extra / or @ */ + if ((strchr(share->password, '/') || strchr(share->hostname, '@'))) + goto error; + /* + Found that if the string is: + user:@hostname:port/db/table + Then password is a null string, so set to NULL + */ + if (share->password[0] == '\0') + share->password= NULL; + } + + /* make sure there isn't an extra / or @ */ + if ((strchr(share->username, '/')) || (strchr(share->hostname, '@'))) + goto error; + + if (!(share->database= strchr(share->hostname, '/'))) + goto error; + *share->database++= '\0'; + + if ((share->sport= strchr(share->hostname, ':'))) + { + *share->sport++= '\0'; + if (share->sport[0] == '\0') + share->sport= NULL; + else + share->port= atoi(share->sport); + } + + if (!(share->table_name= strchr(share->database, '/'))) + goto error; + *share->table_name++= '\0'; + + share->table_name_length= strlen(share->table_name); + + /* make sure there's not an extra / */ + if ((strchr(share->table_name, '/'))) + goto error; + + if (share->hostname[0] == '\0') + share->hostname= strdup_root(mem_root, my_localhost); + + } + if (!share->port) + { + if (0 == strcmp(share->hostname, my_localhost)) + share->socket= (char *) MYSQL_UNIX_ADDR; + else + share->port= MYSQL_PORT; + } + + DBUG_PRINT("info", + ("scheme: %s username: %s password: %s hostname: %s " + "port: %d db: %s tablename: %s", + share->scheme, share->username, share->password, + share->hostname, share->port, share->database, + share->table_name)); + + DBUG_RETURN(0); + +error: + DBUG_RETURN(parse_url_error(share, table_s, error_num)); +} + +/***************************************************************************** +** FEDERATEDX tables +*****************************************************************************/ + +ha_federatedx::ha_federatedx(handlerton *hton, + TABLE_SHARE *table_arg) + :handler(hton, table_arg), + txn(0), io(0), stored_result(0) +{ + bzero(&bulk_insert, sizeof(bulk_insert)); +} + + +/* + Convert MySQL result set row to handler internal format + + SYNOPSIS + convert_row_to_internal_format() + record Byte pointer to record + row MySQL result set row from fetchrow() + result Result set to use + + DESCRIPTION + This method simply iterates through a row returned via fetchrow with + values from a successful SELECT , and then stores each column's value + in the field object via the field object pointer (pointing to the table's + array of field object pointers). This is how the handler needs the data + to be stored to then return results back to the user + + RETURN VALUE + 0 After fields have had field values stored from record +*/ + +uint ha_federatedx::convert_row_to_internal_format(uchar *record, + FEDERATEDX_IO_ROW *row, + FEDERATEDX_IO_RESULT *result) +{ + ulong *lengths; + Field **field; + int column= 0; + MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->write_set); + Time_zone *saved_time_zone= table->in_use->variables.time_zone; + DBUG_ENTER("ha_federatedx::convert_row_to_internal_format"); + + table->in_use->variables.time_zone= UTC; + lengths= io->fetch_lengths(result); + + for (field= table->field; *field; field++, column++) + { + /* + index variable to move us through the row at the + same iterative step as the field + */ + my_ptrdiff_t old_ptr; + old_ptr= (my_ptrdiff_t) (record - table->record[0]); + (*field)->move_field_offset(old_ptr); + if (io->is_column_null(row, column)) + (*field)->set_null(); + else + { + if (bitmap_is_set(table->read_set, (*field)->field_index)) + { + (*field)->set_notnull(); + (*field)->store_text(io->get_column_data(row, column), lengths[column], + &my_charset_bin); + } + } + (*field)->move_field_offset(-old_ptr); + } + table->in_use->variables.time_zone= saved_time_zone; + dbug_tmp_restore_column_map(&table->write_set, old_map); + DBUG_RETURN(0); +} + +static bool emit_key_part_name(String *to, KEY_PART_INFO *part) +{ + DBUG_ENTER("emit_key_part_name"); + if (append_ident(to, part->field->field_name.str, + part->field->field_name.length, ident_quote_char)) + DBUG_RETURN(1); // Out of memory + DBUG_RETURN(0); +} + +static bool emit_key_part_element(String *to, KEY_PART_INFO *part, + bool needs_quotes, bool is_like, + const uchar *ptr, uint len) +{ + Field *field= part->field; + DBUG_ENTER("emit_key_part_element"); + + if (needs_quotes && to->append(STRING_WITH_LEN("'"))) + DBUG_RETURN(1); + + if (part->type == HA_KEYTYPE_BIT) + { + char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff; + + *buf++= '0'; + *buf++= 'x'; + buf= octet2hex(buf, (char*) ptr, len); + if (to->append((char*) buff, (uint)(buf - buff))) + DBUG_RETURN(1); + } + else if (part->key_part_flag & HA_BLOB_PART) + { + uint blob_length= uint2korr(ptr); + String blob((char*) ptr+HA_KEY_BLOB_LENGTH, + blob_length, &my_charset_bin); + if (to->append_for_single_quote(&blob)) + DBUG_RETURN(1); + } + else if (part->key_part_flag & HA_VAR_LENGTH_PART) + { + uint var_length= uint2korr(ptr); + String varchar((char*) ptr+HA_KEY_BLOB_LENGTH, + var_length, &my_charset_bin); + if (to->append_for_single_quote(&varchar)) + DBUG_RETURN(1); + } + else + { + char strbuff[MAX_FIELD_WIDTH]; + String str(strbuff, sizeof(strbuff), part->field->charset()), *res; + + res= field->val_str(&str, ptr); + + if (field->result_type() == STRING_RESULT) + { + if (to->append_for_single_quote(res)) + DBUG_RETURN(1); + } + else if (to->append(res->ptr(), res->length())) + DBUG_RETURN(1); + } + + if (is_like && to->append(STRING_WITH_LEN("%"))) + DBUG_RETURN(1); + + if (needs_quotes && to->append(STRING_WITH_LEN("'"))) + DBUG_RETURN(1); + + DBUG_RETURN(0); +} + +/* + Create a WHERE clause based off of values in keys + Note: This code was inspired by key_copy from key.cc + + SYNOPSIS + create_where_from_key () + to String object to store WHERE clause + key_info KEY struct pointer + key byte pointer containing key + key_length length of key + range_type 0 - no range, 1 - min range, 2 - max range + (see enum range_operation) + + DESCRIPTION + Using iteration through all the keys via a KEY_PART_INFO pointer, + This method 'extracts' the value of each key in the byte pointer + *key, and for each key found, constructs an appropriate WHERE clause + + RETURN VALUE + 0 After all keys have been accounted for to create the WHERE clause + 1 No keys found + + Range flags Table per Timour: + + ----------------- + - start_key: + * ">" -> HA_READ_AFTER_KEY + * ">=" -> HA_READ_KEY_OR_NEXT + * "=" -> HA_READ_KEY_EXACT + + - end_key: + * "<" -> HA_READ_BEFORE_KEY + * "<=" -> HA_READ_AFTER_KEY + + records_in_range: + ----------------- + - start_key: + * ">" -> HA_READ_AFTER_KEY + * ">=" -> HA_READ_KEY_EXACT + * "=" -> HA_READ_KEY_EXACT + + - end_key: + * "<" -> HA_READ_BEFORE_KEY + * "<=" -> HA_READ_AFTER_KEY + * "=" -> HA_READ_AFTER_KEY + +0 HA_READ_KEY_EXACT, Find first record else error +1 HA_READ_KEY_OR_NEXT, Record or next record +2 HA_READ_KEY_OR_PREV, Record or previous +3 HA_READ_AFTER_KEY, Find next rec. after key-record +4 HA_READ_BEFORE_KEY, Find next rec. before key-record +5 HA_READ_PREFIX, Key which as same prefix +6 HA_READ_PREFIX_LAST, Last key with the same prefix +7 HA_READ_PREFIX_LAST_OR_PREV, Last or prev key with the same prefix + +Flags that I've found: + +id, primary key, varchar + +id = 'ccccc' +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 0 end_key NULL + +id > 'ccccc' +records_in_range: start_key 3 end_key NULL +read_range_first: start_key 3 end_key NULL + +id < 'ccccc' +records_in_range: start_key NULL end_key 4 +read_range_first: start_key NULL end_key 4 + +id <= 'ccccc' +records_in_range: start_key NULL end_key 3 +read_range_first: start_key NULL end_key 3 + +id >= 'ccccc' +records_in_range: start_key 0 end_key NULL +read_range_first: start_key 1 end_key NULL + +id like 'cc%cc' +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 'aaaaa' and id < 'ccccc' +records_in_range: start_key 3 end_key 4 +read_range_first: start_key 3 end_key 4 + +id >= 'aaaaa' and id < 'ccccc'; +records_in_range: start_key 0 end_key 4 +read_range_first: start_key 1 end_key 4 + +id >= 'aaaaa' and id <= 'ccccc'; +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 'aaaaa' and id <= 'ccccc'; +records_in_range: start_key 3 end_key 3 +read_range_first: start_key 3 end_key 3 + +numeric keys: + +id = 4 +index_read_idx: start_key 0 end_key NULL + +id > 4 +records_in_range: start_key 3 end_key NULL +read_range_first: start_key 3 end_key NULL + +id >= 4 +records_in_range: start_key 0 end_key NULL +read_range_first: start_key 1 end_key NULL + +id < 4 +records_in_range: start_key NULL end_key 4 +read_range_first: start_key NULL end_key 4 + +id <= 4 +records_in_range: start_key NULL end_key 3 +read_range_first: start_key NULL end_key 3 + +id like 4 +full table scan, select * from + +id > 2 and id < 8 +records_in_range: start_key 3 end_key 4 +read_range_first: start_key 3 end_key 4 + +id >= 2 and id < 8 +records_in_range: start_key 0 end_key 4 +read_range_first: start_key 1 end_key 4 + +id >= 2 and id <= 8 +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 2 and id <= 8 +records_in_range: start_key 3 end_key 3 +read_range_first: start_key 3 end_key 3 + +multi keys (id int, name varchar, other varchar) + +id = 1; +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 0 end_key NULL + +id > 4; +id > 2 and name = '333'; remote: id > 2 +id > 2 and name > '333'; remote: id > 2 +id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results +id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result +id >= 4 and name = 'eric was here' and other > 'eeee'; +records_in_range: start_key 3 end_key NULL +read_range_first: start_key 3 end_key NULL + +id >= 4; +id >= 2 and name = '333' and other < 'ddd'; +remote: `id` >= 2 AND `name` >= '333'; +records_in_range: start_key 0 end_key NULL +read_range_first: start_key 1 end_key NULL + +id < 4; +id < 3 and name = '222' and other <= 'ccc'; remote: id < 3 +records_in_range: start_key NULL end_key 4 +read_range_first: start_key NULL end_key 4 + +id <= 4; +records_in_range: start_key NULL end_key 3 +read_range_first: start_key NULL end_key 3 + +id like 4; +full table scan + +id > 2 and id < 4; +records_in_range: start_key 3 end_key 4 +read_range_first: start_key 3 end_key 4 + +id >= 2 and id < 4; +records_in_range: start_key 0 end_key 4 +read_range_first: start_key 1 end_key 4 + +id >= 2 and id <= 4; +records_in_range: start_key 0 end_key 3 +read_range_first: start_key 1 end_key 3 + +id > 2 and id <= 4; +id = 6 and name = 'eric was here' and other > 'eeee'; +remote: (`id` > 6 AND `name` > 'eric was here' AND `other` > 'eeee') +AND (`id` <= 6) AND ( AND `name` <= 'eric was here') +no results +records_in_range: start_key 3 end_key 3 +read_range_first: start_key 3 end_key 3 + +Summary: + +* If the start key flag is 0 the max key flag shouldn't even be set, + and if it is, the query produced would be invalid. +* Multipart keys, even if containing some or all numeric columns, + are treated the same as non-numeric keys + + If the query is " = " (quotes or not): + - records in range start key flag HA_READ_KEY_EXACT, + end key flag HA_READ_AFTER_KEY (incorrect) + - any other: start key flag HA_READ_KEY_OR_NEXT, + end key flag HA_READ_AFTER_KEY (correct) + +* 'like' queries (of key) + - Numeric, full table scan + - Non-numeric + records_in_range: start_key 0 end_key 3 + other : start_key 1 end_key 3 + +* If the key flag is HA_READ_AFTER_KEY: + if start_key, append > + if end_key, append <= + +* If create_where_key was called by records_in_range: + + - if the key is numeric: + start key flag is 0 when end key is NULL, end key flag is 3 or 4 + - if create_where_key was called by any other function: + start key flag is 1 when end key is NULL, end key flag is 3 or 4 + - if the key is non-numeric, or multipart + When the query is an exact match, the start key flag is 0, + end key flag is 3 for what should be a no-range condition where + you should have 0 and max key NULL, which it is if called by + read_range_first + +Conclusion: + +1. Need logic to determin if a key is min or max when the flag is +HA_READ_AFTER_KEY, and handle appending correct operator accordingly + +2. Need a boolean flag to pass to create_where_from_key, used in the +switch statement. Add 1 to the flag if: + - start key flag is HA_READ_KEY_EXACT and the end key is NULL + +*/ + +bool ha_federatedx::create_where_from_key(String *to, + KEY *key_info, + const key_range *start_key, + const key_range *end_key, + bool eq_range) +{ + bool both_not_null= + (start_key != NULL && end_key != NULL) ? TRUE : FALSE; + const uchar *ptr; + uint remainder, length; + char tmpbuff[FEDERATEDX_QUERY_BUFFER_SIZE]; + String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info); + const key_range *ranges[2]= { start_key, end_key }; + Time_zone *saved_time_zone= table->in_use->variables.time_zone; + DBUG_ENTER("ha_federatedx::create_where_from_key"); + + tmp.length(0); + if (start_key == NULL && end_key == NULL) + DBUG_RETURN(1); + + table->in_use->variables.time_zone= UTC; + MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->write_set); + for (uint i= 0; i <= 1; i++) + { + KEY_PART_INFO *key_part; + if (ranges[i] == NULL) + continue; + + if (both_not_null) + { + if (i > 0) + tmp.append(STRING_WITH_LEN(") AND (")); + else + tmp.append(STRING_WITH_LEN(" (")); + } + + for (key_part= key_info->key_part, + remainder= key_info->user_defined_key_parts, + length= ranges[i]->length, + ptr= ranges[i]->key; ; + remainder--, + key_part++) + { + Field *field= key_part->field; + uint store_length= key_part->store_length; + uint part_length= MY_MIN(store_length, length); + bool needs_quotes= field->str_needs_quotes(); + bool reverse= key_part->key_part_flag & HA_REVERSE_SORT; + static const LEX_CSTRING lt={STRING_WITH_LEN(" < ") }; + static const LEX_CSTRING gt={STRING_WITH_LEN(" > ") }; + static const LEX_CSTRING le={STRING_WITH_LEN(" <= ") }; + static const LEX_CSTRING ge={STRING_WITH_LEN(" >= ") }; + DBUG_DUMP("key, start of loop", ptr, length); + + if (key_part->null_bit) + { + if (*ptr++) + { + LEX_CSTRING constraint; + if (ranges[i]->flag == HA_READ_KEY_EXACT) + constraint= {STRING_WITH_LEN(" IS NULL ") }; + else + constraint= {STRING_WITH_LEN(" IS NOT NULL ") }; + /* + We got "IS [NOT] NULL" condition against nullable column. We + distinguish between "IS NOT NULL" and "IS NULL" by flag. For + "IS NULL", flag is set to HA_READ_KEY_EXACT. + */ + if (emit_key_part_name(&tmp, key_part) || + tmp.append(constraint)) + goto err; + /* + We need to adjust pointer and length to be prepared for next + key part. As well as check if this was last key part. + */ + goto prepare_for_next_key_part; + } + } + + if (tmp.append(STRING_WITH_LEN(" ("))) + goto err; + + switch (ranges[i]->flag) { + case HA_READ_KEY_EXACT: + DBUG_PRINT("info", ("federatedx HA_READ_KEY_EXACT %d", i)); + if (store_length >= length || + !needs_quotes || + key_part->type == HA_KEYTYPE_BIT || + field->result_type() != STRING_RESULT) + { + if (emit_key_part_name(&tmp, key_part)) + goto err; + + if (tmp.append(STRING_WITH_LEN(" = "))) + goto err; + + if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + goto err; + } + else + { + /* LIKE */ + if (emit_key_part_name(&tmp, key_part) || + tmp.append(STRING_WITH_LEN(" LIKE ")) || + emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr, + part_length)) + goto err; + } + break; + case HA_READ_AFTER_KEY: + if (eq_range) + { + if (tmp.append(STRING_WITH_LEN("1=1"))) // Dummy + goto err; + break; + } + DBUG_PRINT("info", ("federatedx HA_READ_AFTER_KEY %d", i)); + if (store_length >= length || i > 0) /* end key */ + { + if (emit_key_part_name(&tmp, key_part)) + goto err; + + if (i > 0) /* end key */ + { + if (tmp.append(reverse ? ge : le)) + goto err; + } + else /* start key */ + { + if (tmp.append(reverse ? lt : gt)) + goto err; + } + + if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + { + goto err; + } + break; + } + /* fall through */ + case HA_READ_KEY_OR_NEXT: + DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_NEXT %d", i)); + if (emit_key_part_name(&tmp, key_part) || + tmp.append(reverse ? le : ge) || + emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + goto err; + break; + case HA_READ_BEFORE_KEY: + DBUG_PRINT("info", ("federatedx HA_READ_BEFORE_KEY %d", i)); + if (store_length >= length) + { + if (emit_key_part_name(&tmp, key_part) || + tmp.append(reverse ? gt : lt) || + emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + goto err; + break; + } + /* fall through */ + case HA_READ_KEY_OR_PREV: + DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_PREV %d", i)); + if (emit_key_part_name(&tmp, key_part) || + tmp.append(reverse ? ge : le) || + emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr, + part_length)) + goto err; + break; + default: + DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag)); + goto err; + } + if (tmp.append(STRING_WITH_LEN(") "))) + goto err; + +prepare_for_next_key_part: + if (store_length >= length) + break; + DBUG_PRINT("info", ("remainder %d", remainder)); + DBUG_ASSERT(remainder > 1); + length-= store_length; + /* + For nullable columns, null-byte is already skipped before, that is + ptr was incremented by 1. Since store_length still counts null-byte, + we need to subtract 1 from store_length. + */ + ptr+= store_length - MY_TEST(key_part->null_bit); + if (tmp.append(STRING_WITH_LEN(" AND "))) + goto err; + + DBUG_PRINT("info", + ("create_where_from_key WHERE clause: %s", + tmp.c_ptr_quick())); + } + } + dbug_tmp_restore_column_map(&table->write_set, old_map); + table->in_use->variables.time_zone= saved_time_zone; + + if (both_not_null) + if (tmp.append(STRING_WITH_LEN(") "))) + DBUG_RETURN(1); + + if (to->append(STRING_WITH_LEN(" WHERE "))) + DBUG_RETURN(1); + + if (to->append(tmp)) + DBUG_RETURN(1); + + DBUG_RETURN(0); + +err: + dbug_tmp_restore_column_map(&table->write_set, old_map); + table->in_use->variables.time_zone= saved_time_zone; + DBUG_RETURN(1); +} + +static void fill_server(MEM_ROOT *mem_root, FEDERATEDX_SERVER *server, + FEDERATEDX_SHARE *share, CHARSET_INFO *table_charset) +{ + char buffer[STRING_BUFFER_USUAL_SIZE]; + const char *socket_arg= share->socket ? share->socket : ""; + const char *password_arg= share->password ? share->password : ""; + + String key(buffer, sizeof(buffer), &my_charset_bin); + String scheme(share->scheme, strlen(share->scheme), &my_charset_latin1); + String hostname(share->hostname, strlen(share->hostname), &my_charset_latin1); + String database(share->database, strlen(share->database), system_charset_info); + String username(share->username, strlen(share->username), system_charset_info); + String socket(socket_arg, strlen(socket_arg), files_charset_info); + String password(password_arg, strlen(password_arg), &my_charset_bin); + DBUG_ENTER("fill_server"); + + /* Do some case conversions */ + scheme.reserve(scheme.length()); + scheme.length(my_casedn_str(&my_charset_latin1, scheme.c_ptr_safe())); + + hostname.reserve(hostname.length()); + hostname.length(my_casedn_str(&my_charset_latin1, hostname.c_ptr_safe())); + + if (lower_case_table_names) + { + database.reserve(database.length()); + database.length(my_casedn_str(system_charset_info, database.c_ptr_safe())); + } + +#ifndef _WIN32 + /* + TODO: there is no unix sockets under windows so the engine should be + revised about using sockets in such environment. + */ + if (lower_case_file_system && socket.length()) + { + socket.reserve(socket.length()); + socket.length(my_casedn_str(files_charset_info, socket.c_ptr_safe())); + } +#endif + + /* start with all bytes zeroed */ + bzero(server, sizeof(*server)); + + key.length(0); + key.reserve(scheme.length() + hostname.length() + database.length() + + socket.length() + username.length() + password.length() + + sizeof(int) + 8); + key.append(scheme); + key.q_append('\0'); + server->hostname= (const char *) (intptr) key.length(); + key.append(hostname); + key.q_append('\0'); + server->database= (const char *) (intptr) key.length(); + key.append(database); + key.q_append('\0'); + key.q_append((uint32) share->port); + server->socket= (const char *) (intptr) key.length(); + key.append(socket); + key.q_append('\0'); + server->username= (const char *) (intptr) key.length(); + key.append(username); + key.q_append('\0'); + server->password= (const char *) (intptr) key.length(); + key.append(password); + key.c_ptr_safe(); // Ensure we have end \0 + + server->key_length= key.length(); + /* Copy and add end \0 */ + server->key= (uchar *) strmake_root(mem_root, key.ptr(), key.length()); + + /* pointer magic */ + server->scheme+= (intptr) server->key; + server->hostname+= (intptr) server->key; + server->database+= (intptr) server->key; + server->username+= (intptr) server->key; + server->password+= (intptr) server->key; + server->socket+= (intptr) server->key; + server->port= share->port; + + if (!share->socket) + server->socket= NULL; + if (!share->password) + server->password= NULL; + + if (table_charset) + server->csname= strdup_root(mem_root, table_charset->cs_name.str); + + DBUG_VOID_RETURN; +} + + +static FEDERATEDX_SERVER *get_server(FEDERATEDX_SHARE *share, TABLE *table) +{ + FEDERATEDX_SERVER *server= NULL, tmp_server; + MEM_ROOT mem_root; + char buffer[STRING_BUFFER_USUAL_SIZE]; + const char *socket_arg= share->socket ? share->socket : ""; + const char *password_arg= share->password ? share->password : ""; + + String key(buffer, sizeof(buffer), &my_charset_bin); + String scheme(share->scheme, strlen(share->scheme), &my_charset_latin1); + String hostname(share->hostname, strlen(share->hostname), &my_charset_latin1); + String database(share->database, strlen(share->database), system_charset_info); + String username(share->username, strlen(share->username), system_charset_info); + String socket(socket_arg, strlen(socket_arg), files_charset_info); + String password(password_arg, strlen(password_arg), &my_charset_bin); + DBUG_ENTER("ha_federated.cc::get_server"); + + mysql_mutex_assert_owner(&federatedx_mutex); + + init_alloc_root(PSI_INSTRUMENT_ME, &mem_root, 4096, 4096, MYF(0)); + + fill_server(&mem_root, &tmp_server, share, table ? table->s->table_charset : 0); + + if (!(server= (FEDERATEDX_SERVER *) my_hash_search(&federatedx_open_servers, + tmp_server.key, + tmp_server.key_length))) + { + if (!table || !tmp_server.csname) + goto error; + + if (!(server= (FEDERATEDX_SERVER *) memdup_root(&mem_root, + (char *) &tmp_server, + sizeof(*server)))) + goto error; + + server->mem_root= mem_root; + + if (my_hash_insert(&federatedx_open_servers, (uchar*) server)) + goto error; + + mysql_mutex_init(fe_key_mutex_FEDERATEDX_SERVER_mutex, + &server->mutex, MY_MUTEX_INIT_FAST); + } + else + free_root(&mem_root, MYF(0)); /* prevents memory leak */ + + server->use_count++; + + DBUG_RETURN(server); +error: + free_root(&mem_root, MYF(0)); + DBUG_RETURN(NULL); +} + + +/* + Example of simple lock controls. The "share" it creates is structure we will + pass to each federatedx handler. Do you have to have one of these? Well, you + have pieces that are used for locking, and they are needed to function. +*/ + +static FEDERATEDX_SHARE *get_share(const char *table_name, TABLE *table) +{ + char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + Field **field; + String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + FEDERATEDX_SHARE *share= NULL, tmp_share; + MEM_ROOT mem_root; + DBUG_ENTER("ha_federatedx.cc::get_share"); + + /* + In order to use this string, we must first zero it's length, + or it will contain garbage + */ + query.length(0); + + bzero(&tmp_share, sizeof(tmp_share)); + init_alloc_root(PSI_INSTRUMENT_ME, &mem_root, 256, 0, MYF(0)); + + mysql_mutex_lock(&federatedx_mutex); + + if (unlikely(!UTC)) + { + String tz_00_name(STRING_WITH_LEN("+00:00"), &my_charset_bin); + UTC= my_tz_find(current_thd, &tz_00_name); + } + + tmp_share.share_key= table_name; + tmp_share.share_key_length= (int)strlen(table_name); + if (parse_url(&mem_root, &tmp_share, table->s, 0)) + goto error; + + /* TODO: change tmp_share.scheme to LEX_STRING object */ + if (!(share= (FEDERATEDX_SHARE *) my_hash_search(&federatedx_open_tables, + (uchar*) tmp_share.share_key, + tmp_share. + share_key_length))) + { + query.set_charset(system_charset_info); + query.append(STRING_WITH_LEN("SELECT ")); + for (field= table->field; *field; field++) + { + append_ident(&query, (*field)->field_name.str, + (*field)->field_name.length, ident_quote_char); + query.append(STRING_WITH_LEN(", ")); + } + /* chops off trailing comma */ + query.length(query.length() - sizeof_trailing_comma); + + query.append(STRING_WITH_LEN(" FROM ")); + + append_ident(&query, tmp_share.table_name, + tmp_share.table_name_length, ident_quote_char); + + if (!(share= (FEDERATEDX_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) || + !(share->share_key= (char*) memdup_root(&mem_root, tmp_share.share_key, tmp_share.share_key_length+1)) || + !(share->select_query.str= (char*) strmake_root(&mem_root, query.ptr(), query.length()))) + goto error; + share->select_query.length= query.length(); + + share->mem_root= mem_root; + + DBUG_PRINT("info", + ("share->select_query %s", share->select_query.str)); + + if (!(share->s= get_server(share, table))) + goto error; + + if (my_hash_insert(&federatedx_open_tables, (uchar*) share)) + goto error; + thr_lock_init(&share->lock); + } + else + free_root(&mem_root, MYF(0)); /* prevents memory leak */ + + share->use_count++; + mysql_mutex_unlock(&federatedx_mutex); + + DBUG_RETURN(share); + +error: + mysql_mutex_unlock(&federatedx_mutex); + free_root(&mem_root, MYF(0)); + DBUG_RETURN(NULL); +} + + +static federatedx_txn zero_txn; +static int free_server(federatedx_txn *txn, FEDERATEDX_SERVER *server) +{ + bool destroy; + DBUG_ENTER("free_server"); + + mysql_mutex_lock(&federatedx_mutex); + if ((destroy= !--server->use_count)) + my_hash_delete(&federatedx_open_servers, (uchar*) server); + mysql_mutex_unlock(&federatedx_mutex); + + if (destroy) + { + MEM_ROOT mem_root; + + if (!txn) + txn= &zero_txn; + + txn->close(server); + + DBUG_ASSERT(server->io_count == 0); + + mysql_mutex_destroy(&server->mutex); + mem_root= server->mem_root; + free_root(&mem_root, MYF(0)); + } + + DBUG_RETURN(0); +} + + +/* + Free lock controls. We call this whenever we close a table. + If the table had the last reference to the share then we + free memory associated with it. +*/ + +static void free_share(federatedx_txn *txn, FEDERATEDX_SHARE *share) +{ + bool destroy; + DBUG_ENTER("free_share"); + + mysql_mutex_lock(&federatedx_mutex); + if ((destroy= !--share->use_count)) + my_hash_delete(&federatedx_open_tables, (uchar*) share); + mysql_mutex_unlock(&federatedx_mutex); + + if (destroy) + { + MEM_ROOT mem_root; + FEDERATEDX_SERVER *server= share->s; + + thr_lock_delete(&share->lock); + + mem_root= share->mem_root; + free_root(&mem_root, MYF(0)); + + free_server(txn, server); + } + + DBUG_VOID_RETURN; +} + + +ha_rows ha_federatedx::records_in_range(uint inx, + const key_range *start_key, + const key_range *end_key, + page_range *pages) +{ + /* + + We really want indexes to be used as often as possible, therefore + we just need to hard-code the return value to a very low number to + force the issue + +*/ + DBUG_ENTER("ha_federatedx::records_in_range"); + DBUG_RETURN(FEDERATEDX_RECORDS_IN_RANGE); +} + +federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create) +{ + federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton); + if (!txn && !no_create) + { + txn= new federatedx_txn(); + thd_set_ha_data(thd, federatedx_hton, txn); + } + return txn; +} + + +int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd) +{ + federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); + delete txn; + return 0; +} + + +/* + Used for opening tables. The name will be the name of the file. + A table is opened when it needs to be opened. For instance + when a request comes in for a select on the table (tables are not + open and closed for each request, they are cached). + + Called from handler.cc by handler::ha_open(). The server opens + all tables by calling ha_open() which then calls the handler + specific open(). +*/ + +int ha_federatedx::open(const char *name, int mode, uint test_if_locked) +{ + int error; + THD *thd= ha_thd(); + DBUG_ENTER("ha_federatedx::open"); + + if (!(share= get_share(name, table))) + DBUG_RETURN(1); + thr_lock_data_init(&share->lock, &lock, NULL); + + DBUG_ASSERT(io == NULL); + + txn= get_txn(thd); + + if ((error= txn->acquire(share, thd, TRUE, &io))) + { + free_share(txn, share); + DBUG_RETURN(error); + } + + ref_length= (uint)io->get_ref_length(); + + txn->release(&io); + + DBUG_PRINT("info", ("ref_length: %u", ref_length)); + + my_init_dynamic_array(PSI_INSTRUMENT_ME, &results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4, MYF(0)); + + reset(); + + DBUG_RETURN(0); +} + +class Net_error_handler : public Internal_error_handler +{ +public: + Net_error_handler() = default; + +public: + bool handle_condition(THD *thd, uint sql_errno, const char* sqlstate, + Sql_condition::enum_warning_level *level, + const char* msg, Sql_condition ** cond_hdl) + { + return sql_errno >= ER_ABORTING_CONNECTION && + sql_errno <= ER_NET_WRITE_INTERRUPTED; + } +}; + +/* + Closes a table. We call the free_share() function to free any resources + that we have allocated in the "shared" structure. + + Called from sql_base.cc, sql_select.cc, and table.cc. + In sql_select.cc it is only used to close up temporary tables or during + the process where a temporary table is converted over to being a + myisam table. + For sql_base.cc look at close_data_tables(). +*/ + +int ha_federatedx::close(void) +{ + int retval= 0; + THD *thd= ha_thd(); + DBUG_ENTER("ha_federatedx::close"); + + /* free the result set */ + reset(); + + delete_dynamic(&results); + + /* Disconnect from mysql */ + if (!thd || !(txn= get_txn(thd, true))) + txn= &zero_txn; + + txn->release(&io); + DBUG_ASSERT(io == NULL); + + Net_error_handler err_handler; + if (thd) + thd->push_internal_handler(&err_handler); + free_share(txn, share); + if (thd) + thd->pop_internal_handler(); + + DBUG_RETURN(retval); +} + +/* + + Checks if a field in a record is SQL NULL. + + SYNOPSIS + field_in_record_is_null() + table TABLE pointer, MySQL table object + field Field pointer, MySQL field object + record char pointer, contains record + + DESCRIPTION + This method uses the record format information in table to track + the null bit in record. + + RETURN VALUE + 1 if NULL + 0 otherwise +*/ + +static inline uint field_in_record_is_null(TABLE *table, Field *field, + char *record) +{ + int null_offset; + DBUG_ENTER("ha_federatedx::field_in_record_is_null"); + + if (!field->null_ptr) + DBUG_RETURN(0); + + null_offset= (uint) ((char*)field->null_ptr - (char*)table->record[0]); + + if (record[null_offset] & field->null_bit) + DBUG_RETURN(1); + + DBUG_RETURN(0); +} + + +/** + @brief Construct the INSERT statement. + + @details This method will construct the INSERT statement and appends it to + the supplied query string buffer. + + @return + @retval FALSE No error + @retval TRUE Failure +*/ + +bool ha_federatedx::append_stmt_insert(String *query) +{ + char insert_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + Field **field; + uint tmp_length; + bool added_field= FALSE; + + /* The main insert query string */ + String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); + DBUG_ENTER("ha_federatedx::append_stmt_insert"); + + insert_string.length(0); + + if (replace_duplicates) + insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); + else if (ignore_duplicates && !insert_dup_update) + insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO ")); + else + insert_string.append(STRING_WITH_LEN("INSERT INTO ")); + append_ident(&insert_string, share->table_name, share->table_name_length, + ident_quote_char); + tmp_length= insert_string.length(); + insert_string.append(STRING_WITH_LEN(" (")); + + /* + loop through the field pointer array, add any fields to both the values + list and the fields list that match the current query id + */ + for (field= table->field; *field; field++) + { + if (bitmap_is_set(table->write_set, (*field)->field_index)) + { + /* append the field name */ + append_ident(&insert_string, (*field)->field_name.str, + (*field)->field_name.length, ident_quote_char); + + /* append commas between both fields and fieldnames */ + /* + unfortunately, we can't use the logic if *(fields + 1) to + make the following appends conditional as we don't know if the + next field is in the write set + */ + insert_string.append(STRING_WITH_LEN(", ")); + added_field= TRUE; + } + } + + if (added_field) + { + /* Remove trailing comma. */ + insert_string.length(insert_string.length() - sizeof_trailing_comma); + insert_string.append(STRING_WITH_LEN(") ")); + } + else + { + /* If there were no fields, we don't want to add a closing paren. */ + insert_string.length(tmp_length); + } + + insert_string.append(STRING_WITH_LEN(" VALUES ")); + + DBUG_RETURN(query->append(insert_string)); +} + + +/* + write_row() inserts a row. No extra() hint is given currently if a bulk load + is happeneding. buf() is a byte array of data. You can use the field + information to extract the data from the native byte array type. + Example of this would be: + for (Field **field=table->field ; *field ; field++) + { + ... + } + + Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc, + sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc. +*/ + +int ha_federatedx::write_row(const uchar *buf) +{ + char values_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; + Field **field; + uint tmp_length; + int error= 0; + bool use_bulk_insert; + bool auto_increment_update_required= (table->next_number_field != NULL); + + /* The string containing the values to be added to the insert */ + String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin); + /* The actual value of the field, to be added to the values_string */ + String insert_field_value_string(insert_field_value_buffer, + sizeof(insert_field_value_buffer), + &my_charset_bin); + Time_zone *saved_time_zone= table->in_use->variables.time_zone; + MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->read_set); + DBUG_ENTER("ha_federatedx::write_row"); + + table->in_use->variables.time_zone= UTC; + values_string.length(0); + insert_field_value_string.length(0); + + /* + start both our field and field values strings + We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE" + Ignore duplicates is always true when insert_dup_update is true. + When replace_duplicates == TRUE, we can safely enable multi-row insert. + When performing multi-row insert, we only collect the columns values for + the row. The start of the statement is only created when the first + row is copied in to the bulk_insert string. + */ + if (!(use_bulk_insert= bulk_insert.str && + (!insert_dup_update || replace_duplicates))) + append_stmt_insert(&values_string); + + values_string.append(STRING_WITH_LEN(" (")); + tmp_length= values_string.length(); + + /* + loop through the field pointer array, add any fields to both the values + list and the fields list that is part of the write set + */ + for (field= table->field; *field; field++) + { + if (bitmap_is_set(table->write_set, (*field)->field_index)) + { + if ((*field)->is_null()) + values_string.append(STRING_WITH_LEN(" NULL ")); + else + { + bool needs_quote= (*field)->str_needs_quotes(); + (*field)->val_str(&insert_field_value_string); + if (needs_quote) + values_string.append(value_quote_char); + insert_field_value_string.print(&values_string); + if (needs_quote) + values_string.append(value_quote_char); + + insert_field_value_string.length(0); + } + + /* append commas between both fields and fieldnames */ + /* + unfortunately, we can't use the logic if *(fields + 1) to + make the following appends conditional as we don't know if the + next field is in the write set + */ + values_string.append(STRING_WITH_LEN(", ")); + } + } + dbug_tmp_restore_column_map(&table->read_set, old_map); + table->in_use->variables.time_zone= saved_time_zone; + + /* + if there were no fields, we don't want to add a closing paren + AND, we don't want to chop off the last char '(' + insert will be "INSERT INTO t1 VALUES ();" + */ + if (values_string.length() > tmp_length) + { + /* chops off trailing comma */ + values_string.length(values_string.length() - sizeof_trailing_comma); + } + /* we always want to append this, even if there aren't any fields */ + values_string.append(STRING_WITH_LEN(") ")); + + if ((error= txn->acquire(share, ha_thd(), FALSE, &io))) + DBUG_RETURN(error); + + if (use_bulk_insert) + { + /* + Send the current bulk insert out if appending the current row would + cause the statement to overflow the packet size, otherwise set + auto_increment_update_required to FALSE as no query was executed. + */ + if (bulk_insert.length + values_string.length() + bulk_padding > + io->max_query_size() && bulk_insert.length) + { + error= io->query(bulk_insert.str, bulk_insert.length); + bulk_insert.length= 0; + } + else + auto_increment_update_required= FALSE; + + if (bulk_insert.length == 0) + { + char insert_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + String insert_string(insert_buffer, sizeof(insert_buffer), + &my_charset_bin); + insert_string.length(0); + append_stmt_insert(&insert_string); + dynstr_append_mem(&bulk_insert, insert_string.ptr(), + insert_string.length()); + } + else + dynstr_append_mem(&bulk_insert, ",", 1); + + dynstr_append_mem(&bulk_insert, values_string.ptr(), + values_string.length()); + } + else + { + error= io->query(values_string.ptr(), values_string.length()); + } + + if (error) + { + DBUG_RETURN(stash_remote_error()); + } + /* + If the table we've just written a record to contains an auto_increment + field, then store the last_insert_id() value from the foreign server + */ + if (auto_increment_update_required) + { + update_auto_increment(); + + /* mysql_insert() uses this for protocol return value */ + table->next_number_field->store(stats.auto_increment_value, 1); + } + + DBUG_RETURN(0); +} + + +/** + @brief Prepares the storage engine for bulk inserts. + + @param[in] rows estimated number of rows in bulk insert + or 0 if unknown. + + @details Initializes memory structures required for bulk insert. +*/ + +void ha_federatedx::start_bulk_insert(ha_rows rows, uint flags) +{ + uint page_size; + DBUG_ENTER("ha_federatedx::start_bulk_insert"); + + dynstr_free(&bulk_insert); + + /** + We don't bother with bulk-insert semantics when the estimated rows == 1 + The rows value will be 0 if the server does not know how many rows + would be inserted. This can occur when performing INSERT...SELECT + */ + + if (rows == 1) + DBUG_VOID_RETURN; + + /* + Make sure we have an open connection so that we know the + maximum packet size. + */ + if (txn->acquire(share, ha_thd(), FALSE, &io)) + DBUG_VOID_RETURN; + + page_size= (uint) my_getpagesize(); + + if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size)) + DBUG_VOID_RETURN; + + bulk_insert.length= 0; + DBUG_VOID_RETURN; +} + + +/** + @brief End bulk insert. + + @details This method will send any remaining rows to the remote server. + Finally, it will deinitialize the bulk insert data structure. + + @return Operation status + @retval 0 No error + @retval != 0 Error occurred at remote server. Also sets my_errno. +*/ + +int ha_federatedx::end_bulk_insert() +{ + int error= 0; + DBUG_ENTER("ha_federatedx::end_bulk_insert"); + + if (bulk_insert.str && bulk_insert.length && !table_will_be_deleted) + { + if ((error= txn->acquire(share, ha_thd(), FALSE, &io))) + DBUG_RETURN(error); + if (io->query(bulk_insert.str, bulk_insert.length)) + error= stash_remote_error(); + else + if (table->next_number_field) + update_auto_increment(); + } + + dynstr_free(&bulk_insert); + + DBUG_RETURN(my_errno= error); +} + + +/* + ha_federatedx::update_auto_increment + + This method ensures that last_insert_id() works properly. What it simply does + is calls last_insert_id() on the foreign database immediately after insert + (if the table has an auto_increment field) and sets the insert id via + thd->insert_id(ID)). +*/ +void ha_federatedx::update_auto_increment(void) +{ + THD *thd= ha_thd(); + DBUG_ENTER("ha_federatedx::update_auto_increment"); + + ha_federatedx::info(HA_STATUS_AUTO); + thd->first_successful_insert_id_in_cur_stmt= + stats.auto_increment_value; + DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value)); + + DBUG_VOID_RETURN; +} + +int ha_federatedx::optimize(THD* thd, HA_CHECK_OPT* check_opt) +{ + int error= 0; + char query_buffer[STRING_BUFFER_USUAL_SIZE]; + String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + DBUG_ENTER("ha_federatedx::optimize"); + + query.length(0); + + query.set_charset(system_charset_info); + query.append(STRING_WITH_LEN("OPTIMIZE TABLE ")); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); + + DBUG_ASSERT(txn == get_txn(thd)); + + if ((error= txn->acquire(share, thd, FALSE, &io))) + DBUG_RETURN(error); + + if (io->query(query.ptr(), query.length())) + error= stash_remote_error(); + + DBUG_RETURN(error); +} + + +int ha_federatedx::repair(THD* thd, HA_CHECK_OPT* check_opt) +{ + int error= 0; + char query_buffer[STRING_BUFFER_USUAL_SIZE]; + String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + DBUG_ENTER("ha_federatedx::repair"); + + query.length(0); + + query.set_charset(system_charset_info); + query.append(STRING_WITH_LEN("REPAIR TABLE ")); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); + if (check_opt->flags & T_QUICK) + query.append(STRING_WITH_LEN(" QUICK")); + if (check_opt->flags & T_EXTEND) + query.append(STRING_WITH_LEN(" EXTENDED")); + if (check_opt->sql_flags & TT_USEFRM) + query.append(STRING_WITH_LEN(" USE_FRM")); + + DBUG_ASSERT(txn == get_txn(thd)); + + if ((error= txn->acquire(share, thd, FALSE, &io))) + DBUG_RETURN(error); + + if (io->query(query.ptr(), query.length())) + error= stash_remote_error(); + + DBUG_RETURN(error); +} + + +/* + Yes, update_row() does what you expect, it updates a row. old_data will have + the previous row record in it, while new_data will have the newest data in + it. + + Keep in mind that the server can do updates based on ordering if an ORDER BY + clause was used. Consecutive ordering is not guaranteed. + Currently new_data will not have an updated auto_increament record, or + and updated timestamp field. You can do these for federatedx by doing these: + if (table->timestamp_on_update_now) + update_timestamp(new_row+table->timestamp_on_update_now-1); + if (table->next_number_field && record == table->record[0]) + update_auto_increment(); + + Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc. +*/ + +int ha_federatedx::update_row(const uchar *old_data, const uchar *new_data) +{ + /* + This used to control how the query was built. If there was a + primary key, the query would be built such that there was a where + clause with only that column as the condition. This is flawed, + because if we have a multi-part primary key, it would only use the + first part! We don't need to do this anyway, because + read_range_first will retrieve the correct record, which is what + is used to build the WHERE clause. We can however use this to + append a LIMIT to the end if there is NOT a primary key. Why do + this? Because we only are updating one record, and LIMIT enforces + this. + */ + bool has_a_primary_key= MY_TEST(table->s->primary_key != MAX_KEY); + + /* + buffers for following strings + */ + char field_value_buffer[STRING_BUFFER_USUAL_SIZE]; + char update_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + char where_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + + /* Work area for field values */ + String field_value(field_value_buffer, sizeof(field_value_buffer), + &my_charset_bin); + /* stores the update query */ + String update_string(update_buffer, + sizeof(update_buffer), + &my_charset_bin); + /* stores the WHERE clause */ + String where_string(where_buffer, + sizeof(where_buffer), + &my_charset_bin); + uchar *record= table->record[0]; + int error; + DBUG_ENTER("ha_federatedx::update_row"); + /* + set string lengths to 0 to avoid misc chars in string + */ + field_value.length(0); + update_string.length(0); + where_string.length(0); + + if (ignore_duplicates) + update_string.append(STRING_WITH_LEN("UPDATE IGNORE ")); + else + update_string.append(STRING_WITH_LEN("UPDATE ")); + append_ident(&update_string, share->table_name, + share->table_name_length, ident_quote_char); + update_string.append(STRING_WITH_LEN(" SET ")); + + /* + In this loop, we want to match column names to values being inserted + (while building INSERT statement). + + Iterate through table->field (new data) and share->old_field (old_data) + using the same index to create an SQL UPDATE statement. New data is + used to create SET field=value and old data is used to create WHERE + field=oldvalue + */ + + Time_zone *saved_time_zone= table->in_use->variables.time_zone; + table->in_use->variables.time_zone= UTC; + for (Field **field= table->field; *field; field++) + { + if (bitmap_is_set(table->write_set, (*field)->field_index)) + { + append_ident(&update_string, (*field)->field_name.str, + (*field)->field_name.length, + ident_quote_char); + update_string.append(STRING_WITH_LEN(" = ")); + + if ((*field)->is_null()) + update_string.append(STRING_WITH_LEN(" NULL ")); + else + { + /* otherwise = */ + MY_BITMAP *old_map= tmp_use_all_columns(table, &table->read_set); + bool needs_quote= (*field)->str_needs_quotes(); + (*field)->val_str(&field_value); + if (needs_quote) + update_string.append(value_quote_char); + field_value.print(&update_string); + if (needs_quote) + update_string.append(value_quote_char); + field_value.length(0); + tmp_restore_column_map(&table->read_set, old_map); + } + update_string.append(STRING_WITH_LEN(", ")); + } + + if (bitmap_is_set(table->read_set, (*field)->field_index)) + { + append_ident(&where_string, (*field)->field_name.str, + (*field)->field_name.length, + ident_quote_char); + if (field_in_record_is_null(table, *field, (char*) old_data)) + where_string.append(STRING_WITH_LEN(" IS NULL ")); + else + { + bool needs_quote= (*field)->str_needs_quotes(); + where_string.append(STRING_WITH_LEN(" = ")); + (*field)->val_str(&field_value, + (old_data + (*field)->offset(record))); + if (needs_quote) + where_string.append(value_quote_char); + field_value.print(&where_string); + if (needs_quote) + where_string.append(value_quote_char); + field_value.length(0); + } + where_string.append(STRING_WITH_LEN(" AND ")); + } + } + table->in_use->variables.time_zone= saved_time_zone; + + /* Remove last ', '. This works as there must be at least on updated field */ + update_string.length(update_string.length() - sizeof_trailing_comma); + + if (where_string.length()) + { + /* chop off trailing AND */ + where_string.length(where_string.length() - sizeof_trailing_and); + update_string.append(STRING_WITH_LEN(" WHERE ")); + update_string.append(where_string); + } + + /* + If this table has not a primary key, then we could possibly + update multiple rows. We want to make sure to only update one! + */ + if (!has_a_primary_key) + update_string.append(STRING_WITH_LEN(" LIMIT 1")); + + if ((error= txn->acquire(share, ha_thd(), FALSE, &io))) + DBUG_RETURN(error); + + if (io->query(update_string.ptr(), update_string.length())) + { + DBUG_RETURN(stash_remote_error()); + } + DBUG_RETURN(0); +} + +/* + This will delete a row. 'buf' will contain a copy of the row to be =deleted. + The server will call this right after the current row has been called (from + either a previous rnd_next() or index call). + If you keep a pointer to the last row or can access a primary key it will + make doing the deletion quite a bit easier. + Keep in mind that the server does no guarentee consecutive deletions. + ORDER BY clauses can be used. + + Called in sql_acl.cc and sql_udf.cc to manage internal table information. + Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select + it is used for removing duplicates while in insert it is used for REPLACE + calls. +*/ + +int ha_federatedx::delete_row(const uchar *buf) +{ + char delete_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + char data_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin); + String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin); + uint found= 0; + int error; + DBUG_ENTER("ha_federatedx::delete_row"); + + delete_string.length(0); + delete_string.append(STRING_WITH_LEN("DELETE FROM ")); + append_ident(&delete_string, share->table_name, + share->table_name_length, ident_quote_char); + delete_string.append(STRING_WITH_LEN(" WHERE ")); + + Time_zone *saved_time_zone= table->in_use->variables.time_zone; + table->in_use->variables.time_zone= UTC; + for (Field **field= table->field; *field; field++) + { + Field *cur_field= *field; + found++; + if (bitmap_is_set(table->read_set, cur_field->field_index)) + { + append_ident(&delete_string, (*field)->field_name.str, + (*field)->field_name.length, ident_quote_char); + data_string.length(0); + if (cur_field->is_null()) + { + delete_string.append(STRING_WITH_LEN(" IS NULL ")); + } + else + { + bool needs_quote= cur_field->str_needs_quotes(); + delete_string.append(STRING_WITH_LEN(" = ")); + cur_field->val_str(&data_string); + if (needs_quote) + delete_string.append(value_quote_char); + data_string.print(&delete_string); + if (needs_quote) + delete_string.append(value_quote_char); + } + delete_string.append(STRING_WITH_LEN(" AND ")); + } + } + table->in_use->variables.time_zone= saved_time_zone; + + // Remove trailing AND + delete_string.length(delete_string.length() - sizeof_trailing_and); + if (!found) + delete_string.length(delete_string.length() - sizeof_trailing_where); + + delete_string.append(STRING_WITH_LEN(" LIMIT 1")); + DBUG_PRINT("info", + ("Delete sql: %s", delete_string.c_ptr_quick())); + + if ((error= txn->acquire(share, ha_thd(), FALSE, &io))) + DBUG_RETURN(error); + + if (io->query(delete_string.ptr(), delete_string.length())) + { + DBUG_RETURN(stash_remote_error()); + } + stats.deleted+= (ha_rows) io->affected_rows(); + stats.records-= (ha_rows) io->affected_rows(); + DBUG_PRINT("info", + ("rows deleted %ld rows deleted for all time %ld", + (long) io->affected_rows(), (long) stats.deleted)); + + DBUG_RETURN(0); +} + + +/* + Positions an index cursor to the index specified in the handle. Fetches the + row if available. If the key value is null, begin at the first key of the + index. This method, which is called in the case of an SQL statement having + a WHERE clause on a non-primary key index, simply calls index_read_idx. +*/ + +int ha_federatedx::index_read(uchar *buf, const uchar *key, + uint key_len, ha_rkey_function find_flag) +{ + DBUG_ENTER("ha_federatedx::index_read"); + + if (stored_result) + (void) free_result(); + DBUG_RETURN(index_read_idx_with_result_set(buf, active_index, key, + key_len, find_flag, + &stored_result)); +} + + +/* + Positions an index cursor to the index specified in key. Fetches the + row if any. This is only used to read whole keys. + + This method is called via index_read in the case of a WHERE clause using + a primary key index OR is called DIRECTLY when the WHERE clause + uses a PRIMARY KEY index. + + NOTES + This uses an internal result set that is deleted before function + returns. We need to be able to be callable from ha_rnd_pos() +*/ + +int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key, + uint key_len, enum ha_rkey_function find_flag) +{ + int retval; + FEDERATEDX_IO_RESULT *io_result= 0; + DBUG_ENTER("ha_federatedx::index_read_idx"); + + if ((retval= index_read_idx_with_result_set(buf, index, key, + key_len, find_flag, + &io_result))) + DBUG_RETURN(retval); + /* io is correct, as index_read_idx_with_result_set was ok */ + io->free_result(io_result); + DBUG_RETURN(retval); +} + + +/* + Create result set for rows matching query and return first row + + RESULT + 0 ok In this case *result will contain the result set + # error In this case *result will contain 0 +*/ + +int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index, + const uchar *key, + uint key_len, + ha_rkey_function find_flag, + FEDERATEDX_IO_RESULT **result) +{ + int retval; + char error_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + char index_value[STRING_BUFFER_USUAL_SIZE]; + char sql_query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + String index_string(index_value, + sizeof(index_value), + &my_charset_bin); + String sql_query(sql_query_buffer, + sizeof(sql_query_buffer), + &my_charset_bin); + key_range range; + DBUG_ENTER("ha_federatedx::index_read_idx_with_result_set"); + + *result= 0; // In case of errors + index_string.length(0); + sql_query.length(0); + + sql_query.append(share->select_query); + + range.key= key; + range.length= key_len; + range.flag= find_flag; + create_where_from_key(&index_string, &table->key_info[index], &range, 0, 0); + sql_query.append(index_string); + + if ((retval= txn->acquire(share, ha_thd(), TRUE, &io))) + DBUG_RETURN(retval); + + if (io->query(sql_query.ptr(), sql_query.length())) + { + snprintf(error_buffer, sizeof(error_buffer), "error: %d '%s'", + io->error_code(), io->error_str()); + retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + goto error; + } + if (!(*result= io->store_result())) + { + retval= HA_ERR_END_OF_FILE; + goto error; + } + if (!(retval= read_next(buf, *result))) + DBUG_RETURN(retval); + + insert_dynamic(&results, (uchar*) result); + *result= 0; + DBUG_RETURN(retval); + +error: + my_error(retval, MYF(0), error_buffer); + DBUG_RETURN(retval); +} + + +/* + This method is used exlusevely by filesort() to check if we + can create sorting buffers of necessary size. + If the handler returns more records that it declares + here server can just crash on filesort(). + We cannot guarantee that's not going to happen with + the FEDERATEDX engine, as we have records==0 always if the + client is a VIEW, and for the table the number of + records can inpredictably change during execution. + So we return maximum possible value here. +*/ + +ha_rows ha_federatedx::estimate_rows_upper_bound() +{ + return HA_POS_ERROR; +} + + +/* Initialized at each key walk (called multiple times unlike rnd_init()) */ + +int ha_federatedx::index_init(uint keynr, bool sorted) +{ + DBUG_ENTER("ha_federatedx::index_init"); + DBUG_PRINT("info", ("table: '%s' key: %u", table->s->table_name.str, keynr)); + active_index= keynr; + DBUG_RETURN(0); +} + + +/* + Read first range +*/ + +int ha_federatedx::read_range_first(const key_range *start_key, + const key_range *end_key, + bool eq_range_arg, bool sorted) +{ + char sql_query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + int retval; + String sql_query(sql_query_buffer, + sizeof(sql_query_buffer), + &my_charset_bin); + DBUG_ENTER("ha_federatedx::read_range_first"); + + DBUG_ASSERT(!(start_key == NULL && end_key == NULL)); + + sql_query.length(0); + sql_query.append(share->select_query); + create_where_from_key(&sql_query, &table->key_info[active_index], + start_key, end_key, eq_range_arg); + + if ((retval= txn->acquire(share, ha_thd(), TRUE, &io))) + DBUG_RETURN(retval); + + if (stored_result) + (void) free_result(); + + if (io->query(sql_query.ptr(), sql_query.length())) + { + retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + goto error; + } + sql_query.length(0); + + if (!(stored_result= io->store_result())) + { + retval= HA_ERR_END_OF_FILE; + goto error; + } + + retval= read_next(table->record[0], stored_result); + DBUG_RETURN(retval); + +error: + DBUG_RETURN(retval); +} + + +int ha_federatedx::read_range_next() +{ + int retval; + DBUG_ENTER("ha_federatedx::read_range_next"); + retval= rnd_next(table->record[0]); + DBUG_RETURN(retval); +} + + +/* Used to read forward through the index. */ +int ha_federatedx::index_next(uchar *buf) +{ + DBUG_ENTER("ha_federatedx::index_next"); + int retval=read_next(buf, stored_result); + DBUG_RETURN(retval); +} + + +/* + rnd_init() is called when the system wants the storage engine to do a table + scan. + + This is the method that gets data for the SELECT calls. + + See the federatedx in the introduction at the top of this file to see when + rnd_init() is called. + + Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc, + sql_table.cc, and sql_update.cc. +*/ + +int ha_federatedx::rnd_init(bool scan) +{ + DBUG_ENTER("ha_federatedx::rnd_init"); + /* + The use of the 'scan' flag is incredibly important for this handler + to work properly, especially with updates containing WHERE clauses + using indexed columns. + + When the initial query contains a WHERE clause of the query using an + indexed column, it's index_read_idx that selects the exact record from + the foreign database. + + When there is NO index in the query, either due to not having a WHERE + clause, or the WHERE clause is using columns that are not indexed, a + 'full table scan' done by rnd_init, which in this situation simply means + a 'select * from ...' on the foreign table. + + In other words, this 'scan' flag gives us the means to ensure that if + there is an index involved in the query, we want index_read_idx to + retrieve the exact record (scan flag is 0), and do not want rnd_init + to do a 'full table scan' and wipe out that result set. + + Prior to using this flag, the problem was most apparent with updates. + + An initial query like 'UPDATE tablename SET anything = whatever WHERE + indexedcol = someval', index_read_idx would get called, using a query + constructed with a WHERE clause built from the values of index ('indexcol' + in this case, having a value of 'someval'). mysql_store_result would + then get called (this would be the result set we want to use). + + After this rnd_init (from sql_update.cc) would be called, it would then + unecessarily call "select * from table" on the foreign table, then call + mysql_store_result, which would wipe out the correct previous result set + from the previous call of index_read_idx's that had the result set + containing the correct record, hence update the wrong row! + + */ + + if (scan) + { + int error; + + if ((error= txn->acquire(share, ha_thd(), TRUE, &io))) + DBUG_RETURN(error); + + if (stored_result) + (void) free_result(); + + if (io->query(share->select_query.str, share->select_query.length)) + goto error; + + stored_result= io->store_result(); + if (!stored_result) + goto error; + } + DBUG_RETURN(0); + +error: + DBUG_RETURN(stash_remote_error()); +} + + +int ha_federatedx::rnd_end() +{ + DBUG_ENTER("ha_federatedx::rnd_end"); + DBUG_RETURN(index_end()); +} + + +int ha_federatedx::free_result() +{ + int error; + DBUG_ENTER("ha_federatedx::free_result"); + DBUG_ASSERT(stored_result); + for (uint i= 0; i < results.elements; ++i) + { + FEDERATEDX_IO_RESULT *result= 0; + get_dynamic(&results, (uchar*) &result, i); + if (result == stored_result) + goto end; + } + if (position_called) + { + insert_dynamic(&results, (uchar*) &stored_result); + } + else + { + federatedx_io *tmp_io= 0, **iop; + if (!*(iop= &io) && (error= txn->acquire(share, ha_thd(), TRUE, (iop= &tmp_io)))) + { + DBUG_ASSERT(0); // Fail when testing + insert_dynamic(&results, (uchar*) &stored_result); + goto end; + } + (*iop)->free_result(stored_result); + txn->release(&tmp_io); + } +end: + stored_result= 0; + position_called= FALSE; + DBUG_RETURN(0); +} + +int ha_federatedx::index_end(void) +{ + int error= 0; + DBUG_ENTER("ha_federatedx::index_end"); + if (stored_result) + error= free_result(); + active_index= MAX_KEY; + DBUG_RETURN(error); +} + + +/* + This is called for each row of the table scan. When you run out of records + you should return HA_ERR_END_OF_FILE. Fill buff up with the row information. + The Field structure for the table is the key to getting data into buf + in a manner that will allow the server to understand it. + + Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc, + sql_table.cc, and sql_update.cc. +*/ + +int ha_federatedx::rnd_next(uchar *buf) +{ + DBUG_ENTER("ha_federatedx::rnd_next"); + + if (stored_result == 0) + { + /* + Return value of rnd_init is not always checked (see records.cc), + so we can get here _even_ if there is _no_ pre-fetched result-set! + TODO: fix it. We can delete this in 5.1 when rnd_init() is checked. + */ + DBUG_RETURN(1); + } + int retval=read_next(buf, stored_result); + DBUG_RETURN(retval); +} + + +/* + ha_federatedx::read_next + + reads from a result set and converts to mysql internal + format + + SYNOPSIS + field_in_record_is_null() + buf byte pointer to record + result mysql result set + + DESCRIPTION + This method is a wrapper method that reads one record from a result + set and converts it to the internal table format + + RETURN VALUE + 1 error + 0 no error +*/ + +int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result) +{ + int retval; + FEDERATEDX_IO_ROW *row; + DBUG_ENTER("ha_federatedx::read_next"); + + if ((retval= txn->acquire(share, ha_thd(), TRUE, &io))) + DBUG_RETURN(retval); + + /* Fetch a row, insert it back in a row format. */ + if (!(row= io->fetch_row(result, ¤t))) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + if (!(retval= convert_row_to_internal_format(buf, row, result))) + table->status= 0; + + DBUG_RETURN(retval); +} + + +/** + @brief Store a reference to current row. + + @details During a query execution we may have different result sets (RS), + e.g. for different ranges. All the RS's used are stored in + memory and placed in @c results dynamic array. At the end of + execution all stored RS's are freed at once in the + @c ha_federated::reset(). + So, in case of federated, a reference to current row is a + stored result address and current data cursor position. + As we keep all RS in memory during a query execution, + we can get any record using the reference any time until + @c ha_federated::reset() is called. + TODO: we don't have to store all RS's rows but only those + we call @c ha_federated::position() for, so we can free memory + where we store other rows in the @c ha_federated::index_end(). + + @param[in] record record data (unused) + +*/ + +void ha_federatedx::position(const uchar *record __attribute__ ((unused))) +{ + DBUG_ENTER("ha_federatedx::position"); + + if (!stored_result) + { + bzero(ref, ref_length); + DBUG_VOID_RETURN; + } + + if (txn->acquire(share, ha_thd(), TRUE, &io)) + DBUG_VOID_RETURN; + + io->mark_position(stored_result, ref, current); + + position_called= TRUE; + + DBUG_VOID_RETURN; +} + + +/* + This is like rnd_next, but you are given a position to use to determine the + row. The position will be of the type that you stored in ref. + + This method is required for an ORDER BY + + Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc. +*/ + +int ha_federatedx::rnd_pos(uchar *buf, uchar *pos) +{ + int retval; + FEDERATEDX_IO_RESULT *result= stored_result; + DBUG_ENTER("ha_federatedx::rnd_pos"); + + /* We have to move this to 'ref' to get things aligned */ + bmove(ref, pos, ref_length); + + if ((retval= txn->acquire(share, ha_thd(), TRUE, &io))) + goto error; + + if ((retval= io->seek_position(&result, ref))) + goto error; + + retval= read_next(buf, result); + DBUG_RETURN(retval); + +error: + DBUG_RETURN(retval); +} + + +/* + ::info() is used to return information to the optimizer. + Currently this table handler doesn't implement most of the fields + really needed. SHOW also makes use of this data + Another note, you will probably want to have the following in your + code: + if (records < 2) + records = 2; + The reason is that the server will optimize for cases of only a single + record. If in a table scan you don't know the number of records + it will probably be better to set records to two so you can return + as many records as you need. + Along with records a few more variables you may wish to set are: + records + deleted + data_file_length + index_file_length + delete_length + check_time + Take a look at the public variables in handler.h for more information. + + Called in: + filesort.cc + ha_heap.cc + item_sum.cc + opt_sum.cc + sql_delete.cc + sql_delete.cc + sql_derived.cc + sql_select.cc + sql_select.cc + sql_select.cc + sql_select.cc + sql_select.cc + sql_show.cc + sql_show.cc + sql_show.cc + sql_show.cc + sql_table.cc + sql_union.cc + sql_update.cc + +*/ + +int ha_federatedx::info(uint flag) +{ + uint error_code; + THD *thd= ha_thd(); + federatedx_txn *tmp_txn; + federatedx_io *tmp_io= 0, **iop= 0; + DBUG_ENTER("ha_federatedx::info"); + + error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; + + // external_lock may not have been called so txn may not be set + tmp_txn= get_txn(thd); + + /* we want not to show table status if not needed to do so */ + if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO)) + { + if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io)))) + goto fail; + } + + if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST)) + { + /* + size of IO operations (This is based on a good guess, no high science + involved) + */ + if (flag & HA_STATUS_CONST) + stats.block_size= 4096; + + if ((*iop)->table_metadata(&stats, share->table_name, + (uint)share->table_name_length, flag)) + goto error; + } + + if (flag & HA_STATUS_AUTO) + stats.auto_increment_value= (*iop)->last_insert_id(); + + /* + If ::info created it's own transaction, close it. This happens in case + of show table status; + */ + tmp_txn->release(&tmp_io); + + DBUG_RETURN(0); + +error: + if (iop && *iop) + { + my_printf_error((*iop)->error_code(), "Received error: %d : %s", MYF(0), + (*iop)->error_code(), (*iop)->error_str()); + } + else if (remote_error_number != -1 /* error already reported */) + { + error_code= remote_error_number; + my_error(error_code, MYF(0), ER_THD(thd, error_code)); + } +fail: + tmp_txn->release(&tmp_io); + DBUG_RETURN(error_code); +} + + +/** + @brief Handles extra signals from MySQL server + + @param[in] operation Hint for storage engine + + @return Operation Status + @retval 0 OK + */ +int ha_federatedx::extra(ha_extra_function operation) +{ + DBUG_ENTER("ha_federatedx::extra"); + switch (operation) { + case HA_EXTRA_IGNORE_DUP_KEY: + ignore_duplicates= TRUE; + break; + case HA_EXTRA_NO_IGNORE_DUP_KEY: + insert_dup_update= FALSE; + ignore_duplicates= FALSE; + break; + case HA_EXTRA_WRITE_CAN_REPLACE: + replace_duplicates= TRUE; + break; + case HA_EXTRA_WRITE_CANNOT_REPLACE: + /* + We use this flag to ensure that we do not create an "INSERT IGNORE" + statement when inserting new rows into the remote table. + */ + replace_duplicates= FALSE; + break; + case HA_EXTRA_INSERT_WITH_UPDATE: + insert_dup_update= TRUE; + break; + case HA_EXTRA_PREPARE_FOR_DROP: + table_will_be_deleted = TRUE; + break; + default: + /* do nothing */ + DBUG_PRINT("info",("unhandled operation: %d", (uint) operation)); + } + DBUG_RETURN(0); +} + + +/** + @brief Reset state of file to after 'open'. + + @detail This function is called after every statement for all tables + used by that statement. + + @return Operation status + @retval 0 OK +*/ + +int ha_federatedx::reset(void) +{ + THD *thd= ha_thd(); + int error = 0; + + insert_dup_update= FALSE; + ignore_duplicates= FALSE; + replace_duplicates= FALSE; + position_called= FALSE; + + if (stored_result) + insert_dynamic(&results, (uchar*) &stored_result); + stored_result= 0; + + if (results.elements) + { + federatedx_txn *tmp_txn; + federatedx_io *tmp_io= 0, **iop; + + // external_lock may not have been called so txn may not be set + tmp_txn= get_txn(thd); + + if (!*(iop= &io) && (error= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io)))) + { + DBUG_ASSERT(0); // Fail when testing + return error; + } + + for (uint i= 0; i < results.elements; ++i) + { + FEDERATEDX_IO_RESULT *result= 0; + get_dynamic(&results, (uchar*) &result, i); + (*iop)->free_result(result); + } + tmp_txn->release(&tmp_io); + reset_dynamic(&results); + } + + return error; + +} + +/* + Used to delete all rows in a table. Both for cases of truncate and + for cases where the optimizer realizes that all rows will be + removed as a result of a SQL statement. + + Called from item_sum.cc by Item_func_group_concat::clear(), + Item_sum_count_distinct::clear(), and Item_func_group_concat::clear(). + Called from sql_delete.cc by mysql_delete(). + Called from sql_select.cc by JOIN::reinit(). + Called from sql_union.cc by st_select_lex_unit::exec(). +*/ + +int ha_federatedx::delete_all_rows() +{ + THD *thd= ha_thd(); + char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + String query(query_buffer, sizeof(query_buffer), &my_charset_bin); + int error; + DBUG_ENTER("ha_federatedx::delete_all_rows"); + + query.length(0); + + query.set_charset(system_charset_info); + if (thd->lex->sql_command == SQLCOM_TRUNCATE) + query.append(STRING_WITH_LEN("TRUNCATE ")); + else + query.append(STRING_WITH_LEN("DELETE FROM ")); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); + + /* no need for savepoint in autocommit mode */ + if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) + txn->stmt_autocommit(); + + /* + TRUNCATE won't return anything in mysql_affected_rows + */ + + if ((error= txn->acquire(share, thd, FALSE, &io))) + DBUG_RETURN(error); + + if (io->query(query.ptr(), query.length())) + { + DBUG_RETURN(stash_remote_error()); + } + stats.deleted+= stats.records; + stats.records= 0; + DBUG_RETURN(0); +} + + +/* + The idea with handler::store_lock() is the following: + + The statement decided which locks we should need for the table + for updates/deletes/inserts we get WRITE locks, for SELECT... we get + read locks. + + Before adding the lock into the table lock handler (see thr_lock.c) + mysqld calls store lock with the requested locks. Store lock can now + modify a write lock to a read lock (or some other lock), ignore the + lock (if we don't want to use MySQL table locks at all) or add locks + for many tables (like we do when we are using a MERGE handler). + + Berkeley DB for federatedx changes all WRITE locks to TL_WRITE_ALLOW_WRITE + (which signals that we are doing WRITES, but we are still allowing other + reader's and writer's. + + When releasing locks, store_lock() are also called. In this case one + usually doesn't have to do anything. + + In some exceptional cases MySQL may send a request for a TL_IGNORE; + This means that we are requesting the same lock as last time and this + should also be ignored. (This may happen when someone does a flush + table when we have opened a part of the tables, in which case mysqld + closes and reopens the tables and tries to get the same locks at last + time). In the future we will probably try to remove this. + + Called from lock.cc by get_lock_data(). +*/ + +THR_LOCK_DATA **ha_federatedx::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + DBUG_ENTER("ha_federatedx::store_lock"); + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) + { + /* + Here is where we get into the guts of a row level lock. + If TL_UNLOCK is set + If we are not doing a LOCK TABLE or DISCARD/IMPORT + TABLESPACE, then allow multiple writers + */ + + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && !thd->in_lock_tables) + lock_type= TL_WRITE_ALLOW_WRITE; + + /* + In queries of type INSERT INTO t1 SELECT ... FROM t2 ... + MySQL would use the lock TL_READ_NO_INSERT on t2, and that + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts + to t2. Convert the lock to a normal read lock to allow + concurrent inserts to t2. + */ + + if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables) + lock_type= TL_READ; + + lock.type= lock_type; + } + + *to++= &lock; + + DBUG_RETURN(to); +} + + +static int test_connection(MYSQL_THD thd, federatedx_io *io, + FEDERATEDX_SHARE *share) +{ + char buffer[FEDERATEDX_QUERY_BUFFER_SIZE]; + String str(buffer, sizeof(buffer), &my_charset_bin); + FEDERATEDX_IO_RESULT *resultset= NULL; + int retval; + + str.length(0); + str.append(STRING_WITH_LEN("SELECT * FROM ")); + append_identifier(thd, &str, share->table_name, + share->table_name_length); + str.append(STRING_WITH_LEN(" WHERE 1=0")); + + if ((retval= io->query(str.ptr(), str.length()))) + { + snprintf(buffer, sizeof(buffer), "database: '%s' username: '%s' hostname: '%s'", + share->database, share->username, share->hostname); + DBUG_PRINT("info", ("error-code: %d", io->error_code())); + my_error(ER_CANT_CREATE_FEDERATED_TABLE, MYF(0), buffer); + } + else + resultset= io->store_result(); + + io->free_result(resultset); + + return retval; +} + +/* + create() does nothing, since we have no local setup of our own. + FUTURE: We should potentially connect to the foreign database and +*/ + +int ha_federatedx::create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *create_info) +{ + int retval; + THD *thd= ha_thd(); + FEDERATEDX_SHARE tmp_share; // Only a temporary share, to test the url + federatedx_txn *tmp_txn; + federatedx_io *tmp_io= NULL; + DBUG_ENTER("ha_federatedx::create"); + + if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg->s, 1))) + goto error; + + /* loopback socket connections hang due to LOCK_open mutex */ + if (0 == strcmp(tmp_share.hostname, my_localhost) && !tmp_share.port) + goto error; + + /* + If possible, we try to use an existing network connection to + the remote server. To ensure that no new FEDERATEDX_SERVER + instance is created, we pass NULL in get_server() TABLE arg. + */ + mysql_mutex_lock(&federatedx_mutex); + tmp_share.s= get_server(&tmp_share, NULL); + mysql_mutex_unlock(&federatedx_mutex); + + if (tmp_share.s) + { + tmp_txn= get_txn(thd); + if (!(retval= tmp_txn->acquire(&tmp_share, thd, TRUE, &tmp_io))) + { + retval= test_connection(thd, tmp_io, &tmp_share); + tmp_txn->release(&tmp_io); + } + free_server(tmp_txn, tmp_share.s); + } + else + { + FEDERATEDX_SERVER server; + + // It's possibly wrong to use alter_table_convert_to_charset here. + fill_server(thd->mem_root, &server, &tmp_share, + create_info->alter_table_convert_to_charset); + +#ifndef DBUG_OFF + mysql_mutex_init(fe_key_mutex_FEDERATEDX_SERVER_mutex, + &server.mutex, MY_MUTEX_INIT_FAST); + mysql_mutex_lock(&server.mutex); +#endif + + tmp_io= federatedx_io::construct(thd->mem_root, &server); + + retval= test_connection(thd, tmp_io, &tmp_share); + +#ifndef DBUG_OFF + mysql_mutex_unlock(&server.mutex); + mysql_mutex_destroy(&server.mutex); +#endif + + delete tmp_io; + } + +error: + DBUG_RETURN(retval); + +} + + +int ha_federatedx::stash_remote_error() +{ + DBUG_ENTER("ha_federatedx::stash_remote_error()"); + if (!io) + DBUG_RETURN(remote_error_number); + remote_error_number= io->error_code(); + strmake_buf(remote_error_buf, io->error_str()); + if (remote_error_number == ER_DUP_ENTRY || + remote_error_number == ER_DUP_KEY) + DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); + DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM); +} + + +bool ha_federatedx::get_error_message(int error, String* buf) +{ + DBUG_ENTER("ha_federatedx::get_error_message"); + DBUG_PRINT("enter", ("error: %d", error)); + if (error == HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM) + { + buf->append(STRING_WITH_LEN("Error on remote system: ")); + buf->qs_append(remote_error_number); + buf->append(STRING_WITH_LEN(": ")); + buf->append(remote_error_buf, strlen(remote_error_buf)); + /* Ensure string ends with \0 */ + (void) buf->c_ptr_safe(); + + remote_error_number= 0; + remote_error_buf[0]= '\0'; + } + DBUG_PRINT("exit", ("message: %s", buf->c_ptr_safe())); + DBUG_RETURN(FALSE); +} + + +int ha_federatedx::start_stmt(MYSQL_THD thd, thr_lock_type lock_type) +{ + DBUG_ENTER("ha_federatedx::start_stmt"); + DBUG_ASSERT(txn == get_txn(thd)); + + if (!txn->in_transaction()) + { + txn->stmt_begin(); + trans_register_ha(thd, FALSE, ht, 0); + } + DBUG_RETURN(0); +} + + +int ha_federatedx::external_lock(MYSQL_THD thd, int lock_type) +{ + int error= 0; + DBUG_ENTER("ha_federatedx::external_lock"); + + if (lock_type == F_UNLCK) + txn->release(&io); + else + { + table_will_be_deleted = FALSE; + txn= get_txn(thd); + if (!(error= txn->acquire(share, ha_thd(), lock_type == F_RDLCK, &io)) && + (lock_type == F_WRLCK || !io->is_autocommit())) + { + if (!thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) + { + txn->stmt_begin(); + trans_register_ha(thd, FALSE, ht, 0); + } + else + { + txn->txn_begin(); + trans_register_ha(thd, TRUE, ht, 0); + } + } + } + + DBUG_RETURN(error); +} + + +int ha_federatedx::savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv) +{ + int error= 0; + federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); + DBUG_ENTER("ha_federatedx::savepoint_set"); + + if (txn && txn->has_connections()) + { + if (txn->txn_begin()) + trans_register_ha(thd, TRUE, hton, 0); + + txn->sp_acquire((ulong *) sv); + + DBUG_ASSERT(1 < *(ulong *) sv); + } + + DBUG_RETURN(error); +} + + +int ha_federatedx::savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv) + { + int error= 0; + federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); + DBUG_ENTER("ha_federatedx::savepoint_rollback"); + + if (txn) + error= txn->sp_rollback((ulong *) sv); + + DBUG_RETURN(error); +} + + +int ha_federatedx::savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv) +{ + int error= 0; + federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); + DBUG_ENTER("ha_federatedx::savepoint_release"); + + if (txn) + error= txn->sp_release((ulong *) sv); + + DBUG_RETURN(error); +} + + +int ha_federatedx::commit(handlerton *hton, MYSQL_THD thd, bool all) +{ + int return_val; + federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); + DBUG_ENTER("ha_federatedx::commit"); + + if (all) + return_val= txn->txn_commit(); + else + return_val= txn->stmt_commit(); + + DBUG_PRINT("info", ("error val: %d", return_val)); + DBUG_RETURN(return_val); +} + + +int ha_federatedx::rollback(handlerton *hton, MYSQL_THD thd, bool all) +{ + int return_val; + federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); + DBUG_ENTER("ha_federatedx::rollback"); + + if (all) + return_val= txn->txn_rollback(); + else + return_val= txn->stmt_rollback(); + + DBUG_PRINT("info", ("error val: %d", return_val)); + DBUG_RETURN(return_val); +} + + +/* + Federated supports assisted discovery, like + CREATE TABLE t1 CONNECTION="mysql://joe:pass@192.168.1.111/federated/t1"; + but not a fully automatic discovery where a table magically appear + on any use (like, on SELECT * from t1). +*/ +int ha_federatedx::discover_assisted(handlerton *hton, THD* thd, + TABLE_SHARE *table_s, HA_CREATE_INFO *info) +{ + int error= HA_ERR_NO_CONNECTION; + FEDERATEDX_SHARE tmp_share; + CHARSET_INFO *cs= system_charset_info; + MYSQL mysql; + char buf[1024]; + String query(buf, sizeof(buf), cs); + static LEX_CSTRING cut_clause={STRING_WITH_LEN(" WITH SYSTEM VERSIONING")}; + static LEX_CSTRING cut_start={STRING_WITH_LEN("GENERATED ALWAYS AS ROW START")}; + static LEX_CSTRING cut_end={STRING_WITH_LEN("GENERATED ALWAYS AS ROW END")}; + static LEX_CSTRING set_ts={STRING_WITH_LEN("DEFAULT TIMESTAMP'1971-01-01 00:00:00'")}; + int cut_offset; + MYSQL_RES *res; + MYSQL_ROW rdata; + ulong *rlen; + my_bool my_true= 1; + + if (parse_url(thd->mem_root, &tmp_share, table_s, 1)) + return HA_WRONG_CREATE_OPTION; + + mysql_init(&mysql); + mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, cs->cs_name.str); + mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*)&my_true); + + if (!mysql_real_connect(&mysql, tmp_share.hostname, tmp_share.username, + tmp_share.password, tmp_share.database, + tmp_share.port, tmp_share.socket, 0)) + goto err1; + + if (mysql_real_query(&mysql, STRING_WITH_LEN("SET SQL_MODE=NO_TABLE_OPTIONS"))) + goto err1; + + query.copy(STRING_WITH_LEN("SHOW CREATE TABLE "), cs); + append_ident(&query, tmp_share.table_name, + tmp_share.table_name_length, ident_quote_char); + + if (mysql_real_query(&mysql, query.ptr(), query.length())) + goto err1; + + if (!((res= mysql_store_result(&mysql)))) + goto err1; + + if (!(rdata= mysql_fetch_row(res)) || !((rlen= mysql_fetch_lengths(res)))) + goto err2; + + query.copy(rdata[1], rlen[1], cs); + cut_offset= (int)query.length() - (int)cut_clause.length; + if (cut_offset > 0 && !memcmp(query.ptr() + cut_offset, + cut_clause.str, cut_clause.length)) + { + query.length(cut_offset); + const char *ptr= strstr(query.ptr(), cut_start.str); + if (ptr) + { + query.replace((uint32) (ptr - query.ptr()), (uint32) cut_start.length, + set_ts.str, (uint32) set_ts.length); + } + ptr= strstr(query.ptr(), cut_end.str); + if (ptr) + { + query.replace((uint32) (ptr - query.ptr()), (uint32) cut_end.length, + set_ts.str, (uint32) set_ts.length); + } + } + query.append(STRING_WITH_LEN(" CONNECTION='"), cs); + query.append_for_single_quote(table_s->connect_string.str, + table_s->connect_string.length); + query.append('\''); + + error= table_s->init_from_sql_statement_string(thd, true, + query.ptr(), query.length()); + +err2: + mysql_free_result(res); +err1: + if (error) + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), mysql_error(&mysql)); + mysql_close(&mysql); + return error; +} + + +struct st_mysql_storage_engine federatedx_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + +my_bool use_pushdown; +static MYSQL_SYSVAR_BOOL(pushdown, use_pushdown, 0, + "Use query fragments pushdown capabilities", NULL, NULL, FALSE); +static struct st_mysql_sys_var* sysvars[]= { MYSQL_SYSVAR(pushdown), NULL }; + +#include "federatedx_pushdown.cc" + +maria_declare_plugin(federatedx) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &federatedx_storage_engine, + "FEDERATED", + "Patrick Galbraith", + "Allows one to access tables on other MariaDB servers, supports transactions and more", + PLUGIN_LICENSE_GPL, + federatedx_db_init, /* Plugin Init */ + federatedx_done, /* Plugin Deinit */ + 0x0201 /* 2.1 */, + NULL, /* status variables */ + sysvars, /* system variables */ + "2.1", /* string version */ + MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ +} +maria_declare_plugin_end; diff --git a/storage/federatedx/ha_federatedx.h b/storage/federatedx/ha_federatedx.h new file mode 100644 index 00000000..3573c658 --- /dev/null +++ b/storage/federatedx/ha_federatedx.h @@ -0,0 +1,487 @@ +#ifndef HA_FEDERATEDX_INCLUDED +#define HA_FEDERATEDX_INCLUDED +/* +Copyright (c) 2008, Patrick Galbraith +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + + * Neither the name of Patrick Galbraith nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + + +#ifdef USE_PRAGMA_INTERFACE +#pragma interface /* gcc class implementation */ +#endif + +//#include <mysql.h> +#include <my_global.h> +#include <thr_lock.h> +#include "handler.h" + +class federatedx_io; + +/* + FEDERATEDX_SERVER will eventually be a structure that will be shared among + all FEDERATEDX_SHARE instances so that the federated server can minimise + the number of open connections. This will eventually lead to the support + of reliable XA federated tables. +*/ +typedef struct st_fedrated_server { + MEM_ROOT mem_root; + uint use_count, io_count; + + uchar *key; + uint key_length; + + const char *scheme; + const char *hostname; + const char *username; + const char *password; + const char *database; + const char *socket; + ushort port; + + const char *csname; + + mysql_mutex_t mutex; + federatedx_io *idle_list; +} FEDERATEDX_SERVER; + +/* + Please read ha_exmple.cc before reading this file. + Please keep in mind that the federatedx storage engine implements all methods + that are required to be implemented. handler.h has a full list of methods + that you can implement. +*/ + +/* + handler::print_error has a case statement for error numbers. + This value is (10000) is far out of range and will envoke the + default: case. + (Current error range is 120-159 from include/my_base.h) +*/ +#define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000 + +#define FEDERATEDX_QUERY_BUFFER_SIZE STRING_BUFFER_USUAL_SIZE * 5 +#define FEDERATEDX_RECORDS_IN_RANGE 2 +#define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb + +/* + FEDERATEDX_SHARE is a structure that will be shared amoung all open handlers + The example implements the minimum of what you will probably need. +*/ +typedef struct st_federatedx_share { + MEM_ROOT mem_root; + + bool parsed; + /* this key is unique db/tablename */ + const char *share_key; + /* + the primary select query to be used in rnd_init + */ + LEX_CSTRING select_query; + /* + remote host info, parse_url supplies + */ + char *server_name; + char *connection_string; + char *scheme; + char *hostname; + char *username; + char *password; + char *database; + char *table_name; + char *table; + char *socket; + char *sport; + int share_key_length; + ushort port; + + size_t table_name_length, server_name_length, connect_string_length; + uint use_count; + THR_LOCK lock; + FEDERATEDX_SERVER *s; +} FEDERATEDX_SHARE; + + +typedef struct st_federatedx_result FEDERATEDX_IO_RESULT; +typedef struct st_federatedx_row FEDERATEDX_IO_ROW; +typedef struct st_federatedx_rows FEDERATEDX_IO_ROWS; +typedef ptrdiff_t FEDERATEDX_IO_OFFSET; + +class federatedx_io +{ + friend class federatedx_txn; + FEDERATEDX_SERVER * const server; + federatedx_io **owner_ptr; + federatedx_io *txn_next; + federatedx_io *idle_next; + bool active; /* currently participating in a transaction */ + bool busy; /* in use by a ha_federated instance */ + bool readonly;/* indicates that no updates have occurred */ + +protected: + void set_active(bool new_active) + { active= new_active; } +public: + federatedx_io(FEDERATEDX_SERVER *); + virtual ~federatedx_io(); + + bool is_readonly() const { return readonly; } + bool is_active() const { return active; } + + const char * get_charsetname() const + { return server->csname ? server->csname : "latin1"; } + + const char * get_hostname() const { return server->hostname; } + const char * get_username() const { return server->username; } + const char * get_password() const { return server->password; } + const char * get_database() const { return server->database; } + ushort get_port() const { return server->port; } + const char * get_socket() const { return server->socket; } + + static bool handles_scheme(const char *scheme); + static federatedx_io *construct(MEM_ROOT *server_root, + FEDERATEDX_SERVER *server); + + static void *operator new(size_t size, MEM_ROOT *mem_root) throw () + { return alloc_root(mem_root, size); } + static void operator delete(void *ptr, size_t size) + { TRASH_FREE(ptr, size); } + static void operator delete(void *, MEM_ROOT *) + { } + + virtual int query(const char *buffer, size_t length)=0; + virtual FEDERATEDX_IO_RESULT *store_result()=0; + + virtual size_t max_query_size() const=0; + + virtual my_ulonglong affected_rows() const=0; + virtual my_ulonglong last_insert_id() const=0; + + virtual int error_code()=0; + virtual const char *error_str()=0; + + virtual void reset()=0; + virtual int commit()=0; + virtual int rollback()=0; + + virtual int savepoint_set(ulong sp)=0; + virtual ulong savepoint_release(ulong sp)=0; + virtual ulong savepoint_rollback(ulong sp)=0; + virtual void savepoint_restrict(ulong sp)=0; + + virtual ulong last_savepoint() const=0; + virtual ulong actual_savepoint() const=0; + virtual bool is_autocommit() const=0; + + virtual bool table_metadata(ha_statistics *stats, const char *table_name, + uint table_name_length, uint flag) = 0; + + /* resultset operations */ + + virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; + virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; + virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; + virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result, + FEDERATEDX_IO_ROWS **current= NULL)=0; + virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0; + virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, + unsigned int column)=0; + virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, + unsigned int column) const=0; + + virtual size_t get_ref_length() const=0; + virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, + void *ref, FEDERATEDX_IO_ROWS *current)=0; + virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, + const void *ref)=0; + virtual void set_thd(void *thd) { } + +}; + + +class federatedx_txn +{ + federatedx_io *txn_list; + ulong savepoint_level; + ulong savepoint_stmt; + ulong savepoint_next; + + void release_scan(); +public: + federatedx_txn(); + ~federatedx_txn(); + + bool has_connections() const { return txn_list != NULL; } + bool in_transaction() const { return savepoint_next != 0; } + int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io); + void release(federatedx_io **io); + void close(FEDERATEDX_SERVER *); + + bool txn_begin(); + int txn_commit(); + int txn_rollback(); + + bool sp_acquire(ulong *save); + int sp_rollback(ulong *save); + int sp_release(ulong *save); + + bool stmt_begin(); + int stmt_commit(); + int stmt_rollback(); + void stmt_autocommit(); +}; + +/* + Class definition for the storage engine +*/ +class ha_federatedx final : public handler +{ + friend int federatedx_db_init(void *p); + + THR_LOCK_DATA lock; /* MySQL lock */ + FEDERATEDX_SHARE *share; /* Shared lock info */ + federatedx_txn *txn; + federatedx_io *io; + FEDERATEDX_IO_RESULT *stored_result; + FEDERATEDX_IO_ROWS *current; + /** + Array of all stored results we get during a query execution. + */ + DYNAMIC_ARRAY results; + bool position_called; + int remote_error_number; + char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; + bool ignore_duplicates, replace_duplicates; + bool insert_dup_update, table_will_be_deleted; + DYNAMIC_STRING bulk_insert; + +private: + /* + return 0 on success + return errorcode otherwise + */ + uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row, + FEDERATEDX_IO_RESULT *result); + bool create_where_from_key(String *to, KEY *key_info, + const key_range *start_key, + const key_range *end_key, bool eq_range); + int stash_remote_error(); + + static federatedx_txn *get_txn(THD *thd, bool no_create= FALSE); + static int disconnect(handlerton *hton, MYSQL_THD thd); + static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv); + static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv); + static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv); + static int commit(handlerton *hton, MYSQL_THD thd, bool all); + static int rollback(handlerton *hton, MYSQL_THD thd, bool all); + static int discover_assisted(handlerton *, THD*, TABLE_SHARE *, + HA_CREATE_INFO *); + + bool append_stmt_insert(String *query); + + int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result); + int index_read_idx_with_result_set(uchar *buf, uint index, + const uchar *key, + uint key_len, + ha_rkey_function find_flag, + FEDERATEDX_IO_RESULT **result); + int real_query(const char *query, uint length); + int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag); +public: + ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg); + ~ha_federatedx() = default; + /* + The name of the index type that will be used for display + don't implement this method unless you really have indexes + */ + // perhaps get index type + const char *index_type(uint inx) { return "REMOTE"; } + /* + This is a list of flags that says what the storage engine + implements. The current table flags are documented in + handler.h + */ + ulonglong table_flags() const + { + /* fix server to be able to get remote server table flags */ + return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED + | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS | + HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR | + HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | HA_CAN_ONLINE_BACKUPS | + HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY | HA_NON_COMPARABLE_ROWID); + } + /* + This is a bitmap of flags that says how the storage engine + implements indexes. The current index flags are documented in + handler.h. If you do not implement indexes, just return zero + here. + + part is the key part to check. First key part is 0 + If all_parts it's set, MySQL want to know the flags for the combined + index up to and including 'part'. + */ + /* fix server to be able to get remote server index flags */ + ulong index_flags(uint inx, uint part, bool all_parts) const + { + return (HA_READ_NEXT | HA_READ_RANGE); + } + uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; } + uint max_supported_keys() const { return MAX_KEY; } + uint max_supported_key_parts() const { return MAX_REF_PARTS; } + uint max_supported_key_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } + uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; } + /* + Called in test_quick_select to determine if indexes should be used. + Normally, we need to know number of blocks . For federatedx we need to + know number of blocks on remote side, and number of packets and blocks + on the network side (?) + Talk to Kostja about this - how to get the + number of rows * ... + disk scan time on other side (block size, size of the row) + network time ... + The reason for "records * 1000" is that such a large number forces + this to use indexes " + */ + double scan_time() + { + DBUG_PRINT("info", ("records %lu", (ulong) stats.records)); + return (double)(stats.records*1000); + } + /* + The next method will never be called if you do not implement indexes. + */ + double read_time(uint index, uint ranges, ha_rows rows) + { + /* + Per Brian, this number is bugus, but this method must be implemented, + and at a later date, he intends to document this issue for handler code + */ + return (double) rows / 20.0+1; + } + + const key_map *keys_to_use_for_scanning() { return &key_map_full; } + /* + Everything below are methods that we implment in ha_federatedx.cc. + + Most of these methods are not obligatory, skip them and + MySQL will treat them as not implemented + */ + int open(const char *name, int mode, uint test_if_locked); // required + int close(void); // required + + void start_bulk_insert(ha_rows rows, uint flags); + int end_bulk_insert(); + int write_row(const uchar *buf); + int update_row(const uchar *old_data, const uchar *new_data); + int delete_row(const uchar *buf); + int index_init(uint keynr, bool sorted); + ha_rows estimate_rows_upper_bound(); + int index_read(uchar *buf, const uchar *key, + uint key_len, enum ha_rkey_function find_flag); + int index_read_idx(uchar *buf, uint idx, const uchar *key, + uint key_len, enum ha_rkey_function find_flag); + int index_next(uchar *buf); + int index_end(); + int read_range_first(const key_range *start_key, + const key_range *end_key, + bool eq_range, bool sorted); + int read_range_next(); + /* + unlike index_init(), rnd_init() can be called two times + without rnd_end() in between (it only makes sense if scan=1). + then the second call should prepare for the new table scan + (e.g if rnd_init allocates the cursor, second call should + position it to the start of the table, no need to deallocate + and allocate it again + */ + int rnd_init(bool scan); //required + int rnd_end(); + int rnd_next(uchar *buf); //required + int rnd_pos(uchar *buf, uchar *pos); //required + void position(const uchar *record); //required + /* + A ref is a pointer inside a local buffer. It is not comparable to + other ref's. This is never called as HA_NON_COMPARABLE_ROWID is set. + */ + int cmp_ref(const uchar *ref1, const uchar *ref2) + { +#ifdef NOT_YET + DBUG_ASSERT(0); + return 0; +#else + return handler::cmp_ref(ref1,ref2); /* Works if table scan is used */ +#endif + } + int info(uint); //required + int extra(ha_extra_function operation); + + void update_auto_increment(void); + int repair(THD* thd, HA_CHECK_OPT* check_opt); + int optimize(THD* thd, HA_CHECK_OPT* check_opt); + int delete_table(const char *name) + { + return 0; + } + int delete_all_rows(void); + int create(const char *name, TABLE *form, + HA_CREATE_INFO *create_info); //required + ha_rows records_in_range(uint inx, const key_range *start_key, + const key_range *end_key, page_range *pages); + uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; } + + THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, + enum thr_lock_type lock_type); //required + bool get_error_message(int error, String *buf); + int start_stmt(THD *thd, thr_lock_type lock_type); + int external_lock(THD *thd, int lock_type); + int reset(void); + int free_result(void); + + const FEDERATEDX_SHARE *get_federatedx_share() const { return share; } + friend class ha_federatedx_derived_handler; + friend class ha_federatedx_select_handler; +}; + +extern const char ident_quote_char; // Character for quoting + // identifiers +extern const char value_quote_char; // Character for quoting + // literals + +extern bool append_ident(String *string, const char *name, size_t length, + const char quote_char); + + +extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root, + FEDERATEDX_SERVER *server); +extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root, + FEDERATEDX_SERVER *server); + +#include "federatedx_pushdown.h" + +#endif /* HA_FEDERATEDX_INCLUDED */ |