summaryrefslogtreecommitdiffstats
path: root/storage/federatedx
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
commit3f619478f796eddbba6e39502fe941b285dd97b1 (patch)
treee2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/federatedx
parentInitial commit. (diff)
downloadmariadb-upstream.tar.xz
mariadb-upstream.zip
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--storage/federatedx/AUTHORS11
-rw-r--r--storage/federatedx/CMakeLists.txt3
-rw-r--r--storage/federatedx/ChangeLog18
-rw-r--r--storage/federatedx/FAQ40
-rw-r--r--storage/federatedx/README33
-rw-r--r--storage/federatedx/README.windows23
-rw-r--r--storage/federatedx/federatedx_io.cc101
-rw-r--r--storage/federatedx/federatedx_io_mysql.cc659
-rw-r--r--storage/federatedx/federatedx_io_null.cc299
-rw-r--r--storage/federatedx/federatedx_probes.h45
-rw-r--r--storage/federatedx/federatedx_pushdown.cc373
-rw-r--r--storage/federatedx/federatedx_pushdown.h63
-rw-r--r--storage/federatedx/federatedx_txn.cc439
-rw-r--r--storage/federatedx/ha_federatedx.cc3730
-rw-r--r--storage/federatedx/ha_federatedx.h487
15 files changed, 6324 insertions, 0 deletions
diff --git a/storage/federatedx/AUTHORS b/storage/federatedx/AUTHORS
new file mode 100644
index 00000000..cdd6600f
--- /dev/null
+++ b/storage/federatedx/AUTHORS
@@ -0,0 +1,11 @@
+FederatedX
+
+Patrick Galbraith <patg@patg.net> - Federated
+
+Pluggable Storage Engine Skeleton setup
+
+Brian Aker <brian@mysql.com> | <brian@tangent.org> - Original Design
+Calvin Sun - Windows Support
+Brian Miezejewski - Bug fixes
+Antony T Curtis - Help in initial development, transactions and various help
+Michael Widenius - Bug fixes and some simple early optimizations
diff --git a/storage/federatedx/CMakeLists.txt b/storage/federatedx/CMakeLists.txt
new file mode 100644
index 00000000..6a9b12ee
--- /dev/null
+++ b/storage/federatedx/CMakeLists.txt
@@ -0,0 +1,3 @@
+SET(FEDERATEDX_SOURCES ha_federatedx.cc federatedx_txn.cc federatedx_io.cc federatedx_io_null.cc federatedx_io_mysql.cc)
+MYSQL_ADD_PLUGIN(federatedx ${FEDERATEDX_SOURCES} STORAGE_ENGINE
+ RECOMPILE_FOR_EMBEDDED)
diff --git a/storage/federatedx/ChangeLog b/storage/federatedx/ChangeLog
new file mode 100644
index 00000000..170321cc
--- /dev/null
+++ b/storage/federatedx/ChangeLog
@@ -0,0 +1,18 @@
+0.2 - Thu March 8 00:00:00 EST 2008
+
+ - Fixed bug #30051 "CREATE TABLE does not connect and check existence of remote table"
+ Modified "real_connect" to take a share and create flag to in order to not rely
+ on any settings that are later instantiated and/or set by get_share
+ Also, put logic in the code to not attempt this if a localhost. There's an annoying
+ functionality that if federated tries to connect to itself during creater table, you
+ get 1159 error (timeout) - only when local. This prevents having this functionality
+ and is probably part of the reason it was removed.
+
+0.1 - Thu Feb 1 00:00:00 EST 2008
+
+ - This is the FederatedX Storage Engine,
+ first release.
+ - Added documentation
+ - Added simple test and README file to explain
+ how to run the test
+ - Added FAQ
diff --git a/storage/federatedx/FAQ b/storage/federatedx/FAQ
new file mode 100644
index 00000000..50def432
--- /dev/null
+++ b/storage/federatedx/FAQ
@@ -0,0 +1,40 @@
+Q. What is the FederatedX pluggable storage engine?
+
+A. It is a fork of the Federated Storage Engine that Brian Aker and I
+(Patrick Galbraith) developed originally . It is a storage engine that
+uses a client connection to a remote MySQL data source as its data
+source instead of a local file on disk.
+
+Q. Why did you fork from Federated?
+
+A. To enhance the storage engine independently of the
+MySQL Server release schedule. Many people have been
+mentioning their dissatisfaction with the limitations
+of Federated. I think the engine is a great concept and
+have a sense of obligation to continue to improve it.
+There are some patches already that are in dire need
+of being applied and tested.
+
+Q. What do you plan to do with FederatedX?
+
+A. Many things need addressing:
+
+- Outstanding bugs
+- How do deal with huge result sets
+- Pushdown conditions (being able to pass things like LIMIT
+ to the remote connection to keep from returning huge
+ result sets).
+- Better transactional support
+- Other connection mechanisms (ODBC, JDBC, native drivers
+ of other RDBMSs)
+
+Q. What FederatedX is and is not?
+
+A. FederatedX is not yet a complete "federated" solution in
+ the sense that other venders have developed (IBM, etc). It
+ is essentially a networked storage engine. It is my hope
+ to make it a real federated solution.
+
+Q. In which MySQL distributions/forks/branches can I find FederateX
+
+A. MariaDB (http://www.mariadb.com)
diff --git a/storage/federatedx/README b/storage/federatedx/README
new file mode 100644
index 00000000..6618527c
--- /dev/null
+++ b/storage/federatedx/README
@@ -0,0 +1,33 @@
+This is the FederatedX Storage Engine, developed as an external storage engine.
+
+NOTE:
+
+The following is only relevant if you use it for MySQL. MariaDB already comes
+with the latest version of FederatedX.
+
+To install, grab a copy of the mysql source code and run this:
+
+./configure --with-mysql=/path/to/src/mysql-5.x --libdir=/usr/local/lib/mysql/
+
+make install
+
+And then inside of MySQL:
+
+mysql> INSTALL PLUGIN federatedx SONAME 'libfederatedx_engine.so';
+
+mysql> CREATE TABLE `d` (`a` varchar(125), b text, primary key(a)) ENGINE=FEDERATEDX CONNECTION="mysql://root@host/schema/table"
+
+or
+
+mysql> CREATE TABLE `d` (`a` varchar(125), b text, primary key(a)) ENGINE=FEDERATEDX CONNECTION="server" CHARSET=latin1;
+
+You will probably need to edit the Makefile.am in the src/ tree if you want
+to build on anything other then Linux (and the Makefile assumes that the
+server was not compiled for debug). The reason for the two possible
+configure lines is that libdir is dependent on where MySQL was installed. If
+you run the "INSTALL PLUGIN ..." and you get a file not found, check that
+your configured this directory correctly.
+
+For Solaris you can enable DTrace probes by adding to configure
+--enable-dtrace
+
diff --git a/storage/federatedx/README.windows b/storage/federatedx/README.windows
new file mode 100644
index 00000000..74de15c6
--- /dev/null
+++ b/storage/federatedx/README.windows
@@ -0,0 +1,23 @@
+The following files are changed in order to build a new engine on Windows:
+
+- Update win\configure.js with
+case "WITH_FEDERATEDX_STORAGE_ENGINE":
+to make sure it will pass WITH_FEDERATEDX_STORAGE_ENGINE in.
+
+- Update CMakeFiles.txt under mysql root:
+ IF(WITH_FEDERATEDX_STORAGE_ENGINE)
+ ADD_DEFINITIONS(-D WITH_FEDERATEDX_STORAGE_ENGINE)
+ SET (mysql_plugin_defs
+ "${mysql_plugin_defs},builtin_skeleton_plugin")
+ ENDIF(WITH_FEDERATEDX_STORAGE_ENGINE)
+
+ and,
+
+ IF(WITH_FEDERATEDX_STORAGE_ENGINE)
+ ADD_SUBDIRECTORY(storage/skeleton/src)
+ ENDIF(WITH_FEDERATEDX_STORAGE_ENGINE)
+
+ - Update CMakeFiles.txt under sql:
+ IF(WITH_FEDERATEDX_STORAGE_ENGINE)
+ TARGET_LINK_LIBRARIES(mysqld skeleton)
+ ENDIF(WITH_FEDERATEDX_STORAGE_ENGINE)
diff --git a/storage/federatedx/federatedx_io.cc b/storage/federatedx/federatedx_io.cc
new file mode 100644
index 00000000..5baec617
--- /dev/null
+++ b/storage/federatedx/federatedx_io.cc
@@ -0,0 +1,101 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+ * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+
+/*#define MYSQL_SERVER 1*/
+#include <my_global.h>
+#include "sql_priv.h"
+
+#include "ha_federatedx.h"
+
+#include "m_string.h"
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+typedef federatedx_io *(*instantiate_io_type)(MEM_ROOT *server_root,
+ FEDERATEDX_SERVER *server);
+struct io_schemes_st
+{
+ const char *scheme;
+ instantiate_io_type instantiate;
+};
+
+
+static const io_schemes_st federated_io_schemes[] =
+{
+ { "mysql", &instantiate_io_mysql },
+ { "null", instantiate_io_null } /* must be last element */
+};
+
+federatedx_io::federatedx_io(FEDERATEDX_SERVER *aserver)
+ : server(aserver), owner_ptr(0), txn_next(0), idle_next(0),
+ active(FALSE), busy(FALSE), readonly(TRUE)
+{
+ DBUG_ENTER("federatedx_io::federatedx_io");
+ DBUG_ASSERT(server);
+
+ mysql_mutex_assert_owner(&server->mutex);
+ server->io_count++;
+
+ DBUG_VOID_RETURN;
+}
+
+
+federatedx_io::~federatedx_io()
+{
+ DBUG_ENTER("federatedx_io::~federatedx_io");
+
+ server->io_count--;
+
+ DBUG_VOID_RETURN;
+}
+
+
+bool federatedx_io::handles_scheme(const char *scheme)
+{
+ const io_schemes_st *ptr = federated_io_schemes;
+ const io_schemes_st *end = ptr + array_elements(federated_io_schemes);
+ while (ptr != end && strcasecmp(scheme, ptr->scheme))
+ ++ptr;
+ return ptr != end;
+}
+
+
+federatedx_io *federatedx_io::construct(MEM_ROOT *server_root,
+ FEDERATEDX_SERVER *server)
+{
+ const io_schemes_st *ptr = federated_io_schemes;
+ const io_schemes_st *end = ptr + (array_elements(federated_io_schemes) - 1);
+ while (ptr != end && strcasecmp(server->scheme, ptr->scheme))
+ ++ptr;
+ return ptr->instantiate(server_root, server);
+}
+
+
diff --git a/storage/federatedx/federatedx_io_mysql.cc b/storage/federatedx/federatedx_io_mysql.cc
new file mode 100644
index 00000000..fc32146b
--- /dev/null
+++ b/storage/federatedx/federatedx_io_mysql.cc
@@ -0,0 +1,659 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+ * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+
+#define MYSQL_SERVER 1
+#include <my_global.h>
+#include "sql_priv.h"
+#include <mysqld_error.h>
+#include <mysql.h>
+
+#include "ha_federatedx.h"
+
+#include "m_string.h"
+#include "mysqld_error.h"
+#include "sql_servers.h"
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+
+#define SAVEPOINT_REALIZED 1
+#define SAVEPOINT_RESTRICT 2
+#define SAVEPOINT_EMITTED 4
+
+
+typedef struct federatedx_savepoint
+{
+ ulong level;
+ uint flags;
+} SAVEPT;
+
+struct mysql_position
+{
+ MYSQL_RES* result;
+ MYSQL_ROW_OFFSET offset;
+};
+
+
+class federatedx_io_mysql :public federatedx_io
+{
+ MYSQL mysql; /* MySQL connection */
+ DYNAMIC_ARRAY savepoints;
+ bool requested_autocommit;
+ bool actual_autocommit;
+
+ int actual_query(const char *buffer, size_t length);
+ bool test_all_restrict() const;
+public:
+ federatedx_io_mysql(FEDERATEDX_SERVER *);
+ ~federatedx_io_mysql();
+
+ int simple_query(const char *fmt, ...);
+ int query(const char *buffer, size_t length);
+ virtual FEDERATEDX_IO_RESULT *store_result();
+
+ virtual size_t max_query_size() const;
+
+ virtual my_ulonglong affected_rows() const;
+ virtual my_ulonglong last_insert_id() const;
+
+ virtual int error_code();
+ virtual const char *error_str();
+
+ void reset();
+ int commit();
+ int rollback();
+
+ int savepoint_set(ulong sp);
+ ulong savepoint_release(ulong sp);
+ ulong savepoint_rollback(ulong sp);
+ void savepoint_restrict(ulong sp);
+
+ ulong last_savepoint() const;
+ ulong actual_savepoint() const;
+ bool is_autocommit() const;
+
+ bool table_metadata(ha_statistics *stats, const char *table_name,
+ uint table_name_length, uint flag);
+
+ /* resultset operations */
+
+ virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
+ virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
+ virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
+ virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
+ FEDERATEDX_IO_ROWS **current= NULL);
+ virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
+ virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
+ unsigned int column);
+ virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
+ unsigned int column) const;
+
+ virtual size_t get_ref_length() const;
+ virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref, FEDERATEDX_IO_ROWS *current);
+ virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref);
+ virtual void set_thd(void *thd);
+};
+
+
+federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
+ FEDERATEDX_SERVER *server)
+{
+ return new (server_root) federatedx_io_mysql(server);
+}
+
+
+federatedx_io_mysql::federatedx_io_mysql(FEDERATEDX_SERVER *aserver)
+ : federatedx_io(aserver),
+ requested_autocommit(TRUE), actual_autocommit(TRUE)
+{
+ DBUG_ENTER("federatedx_io_mysql::federatedx_io_mysql");
+
+ bzero(&mysql, sizeof(MYSQL));
+ bzero(&savepoints, sizeof(DYNAMIC_ARRAY));
+
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &savepoints, sizeof(SAVEPT), 16, 16, MYF(0));
+
+ DBUG_VOID_RETURN;
+}
+
+
+federatedx_io_mysql::~federatedx_io_mysql()
+{
+ DBUG_ENTER("federatedx_io_mysql::~federatedx_io_mysql");
+
+ mysql_close(&mysql);
+ delete_dynamic(&savepoints);
+
+ DBUG_VOID_RETURN;
+}
+
+
+void federatedx_io_mysql::reset()
+{
+ reset_dynamic(&savepoints);
+ set_active(FALSE);
+
+ requested_autocommit= TRUE;
+ mysql.reconnect= 1;
+}
+
+
+int federatedx_io_mysql::commit()
+{
+ int error= 0;
+ DBUG_ENTER("federatedx_io_mysql::commit");
+
+ if (!actual_autocommit && (error= actual_query("COMMIT", 6)))
+ rollback();
+
+ reset();
+
+ DBUG_RETURN(error);
+}
+
+int federatedx_io_mysql::rollback()
+{
+ int error= 0;
+ DBUG_ENTER("federatedx_io_mysql::rollback");
+
+ if (!actual_autocommit)
+ error= actual_query("ROLLBACK", 8);
+ else
+ error= ER_WARNING_NOT_COMPLETE_ROLLBACK;
+
+ reset();
+
+ DBUG_RETURN(error);
+}
+
+
+ulong federatedx_io_mysql::last_savepoint() const
+{
+ SAVEPT *savept= NULL;
+ DBUG_ENTER("federatedx_io_mysql::last_savepoint");
+
+ if (savepoints.elements)
+ savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
+
+ DBUG_RETURN(savept ? savept->level : 0);
+}
+
+
+ulong federatedx_io_mysql::actual_savepoint() const
+{
+ SAVEPT *savept= NULL;
+ size_t index= savepoints.elements;
+ DBUG_ENTER("federatedx_io_mysql::last_savepoint");
+
+ while (index)
+ {
+ savept= dynamic_element(&savepoints, --index, SAVEPT *);
+ if (savept->flags & SAVEPOINT_REALIZED)
+ break;
+ savept= NULL;
+ }
+
+ DBUG_RETURN(savept ? savept->level : 0);
+}
+
+bool federatedx_io_mysql::is_autocommit() const
+{
+ return actual_autocommit;
+}
+
+
+int federatedx_io_mysql::savepoint_set(ulong sp)
+{
+ int error;
+ SAVEPT savept;
+ DBUG_ENTER("federatedx_io_mysql::savepoint_set");
+ DBUG_PRINT("info",("savepoint=%lu", sp));
+ DBUG_ASSERT(sp > last_savepoint());
+
+ savept.level= sp;
+ savept.flags= 0;
+
+ if ((error= insert_dynamic(&savepoints, (uchar*) &savept) ? -1 : 0))
+ goto err;
+
+ set_active(TRUE);
+ mysql.reconnect= 0;
+ requested_autocommit= FALSE;
+
+err:
+ DBUG_RETURN(error);
+}
+
+
+ulong federatedx_io_mysql::savepoint_release(ulong sp)
+{
+ SAVEPT *savept, *last= NULL;
+ DBUG_ENTER("federatedx_io_mysql::savepoint_release");
+ DBUG_PRINT("info",("savepoint=%lu", sp));
+
+ while (savepoints.elements)
+ {
+ savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
+ if (savept->level < sp)
+ break;
+ if ((savept->flags & (SAVEPOINT_REALIZED | SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED)
+ last= savept;
+ savepoints.elements--;
+ }
+
+ if (last)
+ {
+ char buffer[STRING_BUFFER_USUAL_SIZE];
+ size_t length= my_snprintf(buffer, sizeof(buffer),
+ "RELEASE SAVEPOINT save%lu", last->level);
+ actual_query(buffer, length);
+ }
+
+ DBUG_RETURN(last_savepoint());
+}
+
+
+ulong federatedx_io_mysql::savepoint_rollback(ulong sp)
+{
+ SAVEPT *savept;
+ size_t index;
+ DBUG_ENTER("federatedx_io_mysql::savepoint_release");
+ DBUG_PRINT("info",("savepoint=%lu", sp));
+
+ while (savepoints.elements)
+ {
+ savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
+ if (savept->level <= sp)
+ break;
+ savepoints.elements--;
+ }
+
+ for (index= savepoints.elements, savept= NULL; index;)
+ {
+ savept= dynamic_element(&savepoints, --index, SAVEPT *);
+ if (savept->flags & SAVEPOINT_REALIZED)
+ break;
+ savept= NULL;
+ }
+
+ if (savept && !(savept->flags & SAVEPOINT_RESTRICT))
+ {
+ char buffer[STRING_BUFFER_USUAL_SIZE];
+ size_t length= my_snprintf(buffer, sizeof(buffer),
+ "ROLLBACK TO SAVEPOINT save%lu", savept->level);
+ actual_query(buffer, length);
+ }
+
+ DBUG_RETURN(last_savepoint());
+}
+
+
+void federatedx_io_mysql::savepoint_restrict(ulong sp)
+{
+ SAVEPT *savept;
+ size_t index= savepoints.elements;
+ DBUG_ENTER("federatedx_io_mysql::savepoint_restrict");
+
+ while (index)
+ {
+ savept= dynamic_element(&savepoints, --index, SAVEPT *);
+ if (savept->level > sp)
+ continue;
+ if (savept->level < sp)
+ break;
+ savept->flags|= SAVEPOINT_RESTRICT;
+ break;
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+
+int federatedx_io_mysql::simple_query(const char *fmt, ...)
+{
+ char buffer[STRING_BUFFER_USUAL_SIZE];
+ size_t length;
+ int error;
+ va_list arg;
+ DBUG_ENTER("federatedx_io_mysql::simple_query");
+
+ va_start(arg, fmt);
+ length= my_vsnprintf(buffer, sizeof(buffer), fmt, arg);
+ va_end(arg);
+
+ error= query(buffer, length);
+
+ DBUG_RETURN(error);
+}
+
+
+bool federatedx_io_mysql::test_all_restrict() const
+{
+ bool result= FALSE;
+ SAVEPT *savept;
+ size_t index= savepoints.elements;
+ DBUG_ENTER("federatedx_io_mysql::test_all_restrict");
+
+ while (index)
+ {
+ savept= dynamic_element(&savepoints, --index, SAVEPT *);
+ if ((savept->flags & (SAVEPOINT_REALIZED |
+ SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED ||
+ (savept->flags & SAVEPOINT_EMITTED))
+ DBUG_RETURN(FALSE);
+ if (savept->flags & SAVEPOINT_RESTRICT)
+ result= TRUE;
+ }
+
+ DBUG_RETURN(result);
+}
+
+
+int federatedx_io_mysql::query(const char *buffer, size_t length)
+{
+ int error;
+ bool wants_autocommit= requested_autocommit | is_readonly();
+ DBUG_ENTER("federatedx_io_mysql::query");
+
+ if (!wants_autocommit && test_all_restrict())
+ wants_autocommit= TRUE;
+
+ if (wants_autocommit != actual_autocommit)
+ {
+ if ((error= actual_query(wants_autocommit ? "SET AUTOCOMMIT=1"
+ : "SET AUTOCOMMIT=0", 16)))
+ DBUG_RETURN(error);
+ mysql.reconnect= wants_autocommit ? 1 : 0;
+ actual_autocommit= wants_autocommit;
+ }
+
+ if (!actual_autocommit && last_savepoint() != actual_savepoint())
+ {
+ SAVEPT *savept= dynamic_element(&savepoints, savepoints.elements - 1,
+ SAVEPT *);
+ if (!(savept->flags & SAVEPOINT_RESTRICT))
+ {
+ char buf[STRING_BUFFER_USUAL_SIZE];
+ size_t len= my_snprintf(buf, sizeof(buf),
+ "SAVEPOINT save%lu", savept->level);
+ if ((error= actual_query(buf, len)))
+ DBUG_RETURN(error);
+ set_active(TRUE);
+ savept->flags|= SAVEPOINT_EMITTED;
+ }
+ savept->flags|= SAVEPOINT_REALIZED;
+ }
+
+ if (!(error= actual_query(buffer, length)))
+ set_active(is_active() || !actual_autocommit);
+
+ DBUG_RETURN(error);
+}
+
+
+int federatedx_io_mysql::actual_query(const char *buffer, size_t length)
+{
+ int error;
+ DBUG_ENTER("federatedx_io_mysql::actual_query");
+
+ if (!mysql.net.vio)
+ {
+ my_bool my_true= 1;
+
+ if (!(mysql_init(&mysql)))
+ DBUG_RETURN(-1);
+
+ /*
+ BUG# 17044 Federated Storage Engine is not UTF8 clean
+ Add set names to whatever charset the table is at open
+ of table
+ */
+ /* this sets the csname like 'set names utf8' */
+ mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, get_charsetname());
+ mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
+ (char*) &my_true);
+
+ if (!mysql_real_connect(&mysql,
+ get_hostname(),
+ get_username(),
+ get_password(),
+ get_database(),
+ get_port(),
+ get_socket(), 0))
+ DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
+ mysql.reconnect= 1;
+ }
+
+ if (!(error= mysql_real_query(&mysql, STRING_WITH_LEN("set time_zone='+00:00'"))))
+ error= mysql_real_query(&mysql, buffer, (ulong)length);
+
+ DBUG_RETURN(error);
+}
+
+size_t federatedx_io_mysql::max_query_size() const
+{
+ return mysql.net.max_packet_size;
+}
+
+
+my_ulonglong federatedx_io_mysql::affected_rows() const
+{
+ return mysql.affected_rows;
+}
+
+
+my_ulonglong federatedx_io_mysql::last_insert_id() const
+{
+ return mysql.insert_id;
+}
+
+
+int federatedx_io_mysql::error_code()
+{
+ return mysql_errno(&mysql);
+}
+
+
+const char *federatedx_io_mysql::error_str()
+{
+ return mysql_error(&mysql);
+}
+
+FEDERATEDX_IO_RESULT *federatedx_io_mysql::store_result()
+{
+ FEDERATEDX_IO_RESULT *result;
+ DBUG_ENTER("federatedx_io_mysql::store_result");
+
+ result= (FEDERATEDX_IO_RESULT *) mysql_store_result(&mysql);
+
+ DBUG_RETURN(result);
+}
+
+
+void federatedx_io_mysql::free_result(FEDERATEDX_IO_RESULT *io_result)
+{
+ mysql_free_result((MYSQL_RES *) io_result);
+}
+
+
+unsigned int federatedx_io_mysql::get_num_fields(FEDERATEDX_IO_RESULT *io_result)
+{
+ return mysql_num_fields((MYSQL_RES *) io_result);
+}
+
+
+my_ulonglong federatedx_io_mysql::get_num_rows(FEDERATEDX_IO_RESULT *io_result)
+{
+ return mysql_num_rows((MYSQL_RES *) io_result);
+}
+
+
+FEDERATEDX_IO_ROW *federatedx_io_mysql::fetch_row(FEDERATEDX_IO_RESULT *io_result,
+ FEDERATEDX_IO_ROWS **current)
+{
+ MYSQL_RES *result= (MYSQL_RES*)io_result;
+ if (current)
+ *current= (FEDERATEDX_IO_ROWS *) result->data_cursor;
+ return (FEDERATEDX_IO_ROW *) mysql_fetch_row(result);
+}
+
+
+ulong *federatedx_io_mysql::fetch_lengths(FEDERATEDX_IO_RESULT *io_result)
+{
+ return mysql_fetch_lengths((MYSQL_RES *) io_result);
+}
+
+
+const char *federatedx_io_mysql::get_column_data(FEDERATEDX_IO_ROW *row,
+ unsigned int column)
+{
+ return ((MYSQL_ROW)row)[column];
+}
+
+
+bool federatedx_io_mysql::is_column_null(const FEDERATEDX_IO_ROW *row,
+ unsigned int column) const
+{
+ return !((MYSQL_ROW)row)[column];
+}
+
+bool federatedx_io_mysql::table_metadata(ha_statistics *stats,
+ const char *table_name,
+ uint table_name_length, uint flag)
+{
+ char status_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
+ FEDERATEDX_IO_RESULT *result= 0;
+ FEDERATEDX_IO_ROW *row;
+ String status_query_string(status_buf, sizeof(status_buf), &my_charset_bin);
+ int error;
+
+ status_query_string.length(0);
+ status_query_string.append(STRING_WITH_LEN("SHOW TABLE STATUS LIKE "));
+ append_ident(&status_query_string, table_name,
+ table_name_length, value_quote_char);
+
+ if (query(status_query_string.ptr(), status_query_string.length()))
+ goto error;
+
+ status_query_string.length(0);
+
+ result= store_result();
+
+ /*
+ We're going to use fields num. 4, 12 and 13 of the resultset,
+ so make sure we have these fields.
+ */
+ if (!result || (get_num_fields(result) < 14))
+ goto error;
+
+ if (!get_num_rows(result))
+ goto error;
+
+ if (!(row= fetch_row(result)))
+ goto error;
+
+ /*
+ deleted is set in ha_federatedx::info
+ */
+ /*
+ need to figure out what this means as far as federatedx is concerned,
+ since we don't have a "file"
+
+ data_file_length = ?
+ index_file_length = ?
+ delete_length = ?
+ */
+ if (!is_column_null(row, 4))
+ stats->records= (ha_rows) my_strtoll10(get_column_data(row, 4),
+ (char**) 0, &error);
+ if (!is_column_null(row, 5))
+ stats->mean_rec_length= (ulong) my_strtoll10(get_column_data(row, 5),
+ (char**) 0, &error);
+
+ stats->data_file_length= stats->records * stats->mean_rec_length;
+
+ if (!is_column_null(row, 12))
+ stats->update_time= (time_t) my_strtoll10(get_column_data(row, 12),
+ (char**) 0, &error);
+ if (!is_column_null(row, 13))
+ stats->check_time= (time_t) my_strtoll10(get_column_data(row, 13),
+ (char**) 0, &error);
+
+ free_result(result);
+ return 0;
+
+error:
+ if (!mysql_errno(&mysql))
+ {
+ mysql.net.last_errno= ER_NO_SUCH_TABLE;
+ strmake_buf(mysql.net.last_error, "Remote table does not exist");
+ }
+ free_result(result);
+ return 1;
+}
+
+
+
+size_t federatedx_io_mysql::get_ref_length() const
+{
+ return sizeof(mysql_position);
+}
+
+
+void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref, FEDERATEDX_IO_ROWS *current)
+{
+ mysql_position& pos= *reinterpret_cast<mysql_position*>(ref);
+ pos.result= (MYSQL_RES *) io_result;
+ pos.offset= (MYSQL_ROW_OFFSET) current;
+}
+
+int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref)
+{
+ const mysql_position& pos= *reinterpret_cast<const mysql_position*>(ref);
+
+ if (!pos.result || !pos.offset)
+ return HA_ERR_END_OF_FILE;
+
+ pos.result->current_row= 0;
+ pos.result->data_cursor= pos.offset;
+ *io_result= (FEDERATEDX_IO_RESULT*) pos.result;
+
+ return 0;
+}
+
+void federatedx_io_mysql::set_thd(void *thd)
+{
+ mysql.net.thd= thd;
+}
diff --git a/storage/federatedx/federatedx_io_null.cc b/storage/federatedx/federatedx_io_null.cc
new file mode 100644
index 00000000..8a2394f2
--- /dev/null
+++ b/storage/federatedx/federatedx_io_null.cc
@@ -0,0 +1,299 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+ * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+
+/*#define MYSQL_SERVER 1*/
+#include <my_global.h>
+#include "sql_priv.h"
+
+#include "ha_federatedx.h"
+
+#include "m_string.h"
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+
+#define SAVEPOINT_REALIZED 1
+#define SAVEPOINT_RESTRICT 2
+#define SAVEPOINT_EMITTED 4
+
+
+typedef struct federatedx_savepoint
+{
+ ulong level;
+ uint flags;
+} SAVEPT;
+
+
+class federatedx_io_null :public federatedx_io
+{
+public:
+ federatedx_io_null(FEDERATEDX_SERVER *);
+ ~federatedx_io_null();
+
+ int query(const char *buffer, size_t length);
+ virtual FEDERATEDX_IO_RESULT *store_result();
+
+ virtual size_t max_query_size() const;
+
+ virtual my_ulonglong affected_rows() const;
+ virtual my_ulonglong last_insert_id() const;
+
+ virtual int error_code();
+ virtual const char *error_str();
+
+ void reset();
+ int commit();
+ int rollback();
+
+ int savepoint_set(ulong sp);
+ ulong savepoint_release(ulong sp);
+ ulong savepoint_rollback(ulong sp);
+ void savepoint_restrict(ulong sp);
+
+ ulong last_savepoint() const;
+ ulong actual_savepoint() const;
+ bool is_autocommit() const;
+
+ bool table_metadata(ha_statistics *stats, const char *table_name,
+ uint table_name_length, uint flag);
+
+ /* resultset operations */
+
+ virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
+ virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
+ virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
+ virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
+ FEDERATEDX_IO_ROWS **current= NULL);
+ virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
+ virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
+ unsigned int column);
+ virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
+ unsigned int column) const;
+ virtual size_t get_ref_length() const;
+ virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref, FEDERATEDX_IO_ROWS *current);
+ virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref);
+};
+
+
+federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
+ FEDERATEDX_SERVER *server)
+{
+ return new (server_root) federatedx_io_null(server);
+}
+
+
+federatedx_io_null::federatedx_io_null(FEDERATEDX_SERVER *aserver)
+ : federatedx_io(aserver)
+{
+}
+
+
+federatedx_io_null::~federatedx_io_null() = default;
+
+
+void federatedx_io_null::reset()
+{
+}
+
+
+int federatedx_io_null::commit()
+{
+ return 0;
+}
+
+int federatedx_io_null::rollback()
+{
+ return 0;
+}
+
+
+ulong federatedx_io_null::last_savepoint() const
+{
+ return 0;
+}
+
+
+ulong federatedx_io_null::actual_savepoint() const
+{
+ return 0;
+}
+
+bool federatedx_io_null::is_autocommit() const
+{
+ return 0;
+}
+
+
+int federatedx_io_null::savepoint_set(ulong sp)
+{
+ return 0;
+}
+
+
+ulong federatedx_io_null::savepoint_release(ulong sp)
+{
+ return 0;
+}
+
+
+ulong federatedx_io_null::savepoint_rollback(ulong sp)
+{
+ return 0;
+}
+
+
+void federatedx_io_null::savepoint_restrict(ulong sp)
+{
+}
+
+
+int federatedx_io_null::query(const char *buffer, size_t length)
+{
+ return 0;
+}
+
+
+size_t federatedx_io_null::max_query_size() const
+{
+ return INT_MAX;
+}
+
+
+my_ulonglong federatedx_io_null::affected_rows() const
+{
+ return 0;
+}
+
+
+my_ulonglong federatedx_io_null::last_insert_id() const
+{
+ return 0;
+}
+
+
+int federatedx_io_null::error_code()
+{
+ return 0;
+}
+
+
+const char *federatedx_io_null::error_str()
+{
+ return "";
+}
+
+
+FEDERATEDX_IO_RESULT *federatedx_io_null::store_result()
+{
+ FEDERATEDX_IO_RESULT *result;
+ DBUG_ENTER("federatedx_io_null::store_result");
+
+ result= NULL;
+
+ DBUG_RETURN(result);
+}
+
+
+void federatedx_io_null::free_result(FEDERATEDX_IO_RESULT *)
+{
+}
+
+
+unsigned int federatedx_io_null::get_num_fields(FEDERATEDX_IO_RESULT *)
+{
+ return 0;
+}
+
+
+my_ulonglong federatedx_io_null::get_num_rows(FEDERATEDX_IO_RESULT *)
+{
+ return 0;
+}
+
+
+FEDERATEDX_IO_ROW *federatedx_io_null::fetch_row(FEDERATEDX_IO_RESULT *,
+ FEDERATEDX_IO_ROWS **current)
+{
+ return NULL;
+}
+
+
+ulong *federatedx_io_null::fetch_lengths(FEDERATEDX_IO_RESULT *)
+{
+ return NULL;
+}
+
+
+const char *federatedx_io_null::get_column_data(FEDERATEDX_IO_ROW *,
+ unsigned int)
+{
+ return "";
+}
+
+
+bool federatedx_io_null::is_column_null(const FEDERATEDX_IO_ROW *,
+ unsigned int) const
+{
+ return true;
+}
+
+bool federatedx_io_null::table_metadata(ha_statistics *stats,
+ const char *table_name,
+ uint table_name_length, uint flag)
+{
+ stats->records= (ha_rows) 0;
+ stats->mean_rec_length= (ulong) 0;
+ stats->data_file_length= 0;
+
+ stats->update_time= (time_t) 0;
+ stats->check_time= (time_t) 0;
+
+ return 0;
+}
+
+size_t federatedx_io_null::get_ref_length() const
+{
+ return sizeof(int);
+}
+
+
+void federatedx_io_null::mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref, FEDERATEDX_IO_ROWS *current)
+{
+}
+
+int federatedx_io_null::seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref)
+{
+ return 0;
+}
diff --git a/storage/federatedx/federatedx_probes.h b/storage/federatedx/federatedx_probes.h
new file mode 100644
index 00000000..62041951
--- /dev/null
+++ b/storage/federatedx/federatedx_probes.h
@@ -0,0 +1,45 @@
+/*
+ * Generated by dtrace(1M).
+ */
+
+#ifndef _FEDERATED_PROBES_H
+#define _FEDERATED_PROBES_H
+
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#if _DTRACE_VERSION
+
+#define FEDERATED_CLOSE() \
+ __dtrace_federated___close()
+#define FEDERATED_CLOSE_ENABLED() \
+ __dtraceenabled_federated___close()
+#define FEDERATED_OPEN() \
+ __dtrace_federated___open()
+#define FEDERATED_OPEN_ENABLED() \
+ __dtraceenabled_federated___open()
+
+
+extern void __dtrace_federated___close(void);
+extern int __dtraceenabled_federated___close(void);
+extern void __dtrace_federated___open(void);
+extern int __dtraceenabled_federated___open(void);
+
+#else
+
+#define FEDERATED_CLOSE()
+#define FEDERATED_CLOSE_ENABLED() (0)
+#define FEDERATED_OPEN()
+#define FEDERATED_OPEN_ENABLED() (0)
+
+#endif
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _FEDERATED_PROBES_H */
diff --git a/storage/federatedx/federatedx_pushdown.cc b/storage/federatedx/federatedx_pushdown.cc
new file mode 100644
index 00000000..e9a9791a
--- /dev/null
+++ b/storage/federatedx/federatedx_pushdown.cc
@@ -0,0 +1,373 @@
+/*
+ Copyright (c) 2019, 2020, MariaDB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+/* !!! For inclusion into ha_federatedx.cc */
+
+
+/*
+ This is a quick a dirty implemention of the derived_handler and select_handler
+ interfaces to be used to push select queries and the queries specifying
+ derived tables into FEDERATEDX engine.
+ The functions
+ create_federatedx_derived_handler and
+ create_federatedx_select_handler
+ that return the corresponding interfaces for pushdown capabilities do
+ not check a lot of things. In particular they do not check that the tables
+ of the pushed queries belong to the same foreign server.
+
+ The implementation is provided purely for testing purposes.
+ The pushdown capabilities are enabled by turning on the plugin system
+ variable federated_pushdown:
+ set global federated_pushdown=1;
+*/
+
+
+/*
+ Check if table and database names are equal on local and remote servers
+
+ SYNOPSIS
+ local_and_remote_names_match()
+ tbl_share Pointer to current table TABLE_SHARE structure
+ fshare Pointer to current table FEDERATEDX_SHARE structure
+
+ DESCRIPTION
+ FederatedX table on the local server may refer to a table having another
+ name on the remote server. The remote table may even reside in a different
+ database. For example:
+
+ -- Remote server
+ CREATE TABLE t1 (id int(32));
+
+ -- Local server
+ CREATE TABLE t2 ENGINE="FEDERATEDX"
+ CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federatedx/t1";
+
+ It's not a problem while the federated_pushdown is disabled 'cause
+ the CONNECTION strings are being parsed for every table during
+ the execution, so the table names are translated from local to remote.
+ But in case of the federated_pushdown the whole query is pushed down
+ to the engine without any translation, so the remote server may try
+ to select data from a nonexistent table (for example, query
+ "SELECT * FROM t2" will try to retrieve data from nonexistent "t2").
+
+ This function checks whether there is a mismatch between local and remote
+ table/database names
+
+ RETURN VALUE
+ false names are equal
+ true names are not equal
+
+*/
+bool local_and_remote_names_mismatch(const TABLE_SHARE *tbl_share,
+ const FEDERATEDX_SHARE *fshare)
+{
+
+ if (lower_case_table_names)
+ {
+ if (strcasecmp(fshare->database, tbl_share->db.str) != 0)
+ return true;
+ }
+ else
+ {
+ if (strncmp(fshare->database, tbl_share->db.str, tbl_share->db.length) != 0)
+ return true;
+ }
+
+ return my_strnncoll(system_charset_info, (uchar *) fshare->table_name,
+ strlen(fshare->table_name),
+ (uchar *) tbl_share->table_name.str,
+ tbl_share->table_name.length) != 0;
+}
+
+
+static derived_handler*
+create_federatedx_derived_handler(THD* thd, TABLE_LIST *derived)
+{
+ if (!use_pushdown)
+ return 0;
+
+ ha_federatedx_derived_handler* handler = NULL;
+
+ SELECT_LEX_UNIT *unit= derived->derived;
+
+ for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select())
+ {
+ if (!(sl->join))
+ return 0;
+ for (TABLE_LIST *tbl= sl->join->tables_list; tbl; tbl= tbl->next_local)
+ {
+ if (!tbl->table)
+ return 0;
+ /*
+ We intentionally don't support partitioned federatedx tables here, so
+ use file->ht and not file->partition_ht().
+ */
+ if (tbl->table->file->ht != federatedx_hton)
+ return 0;
+
+ const FEDERATEDX_SHARE *fshare=
+ ((ha_federatedx*)tbl->table->file)->get_federatedx_share();
+ if (local_and_remote_names_mismatch(tbl->table->s, fshare))
+ return 0;
+ }
+ }
+
+ handler= new ha_federatedx_derived_handler(thd, derived);
+
+ return handler;
+}
+
+
+/*
+ Implementation class of the derived_handler interface for FEDERATEDX:
+ class implementation
+*/
+
+ha_federatedx_derived_handler::ha_federatedx_derived_handler(THD *thd,
+ TABLE_LIST *dt)
+ : derived_handler(thd, federatedx_hton),
+ share(NULL), txn(NULL), iop(NULL), stored_result(NULL)
+{
+ derived= dt;
+}
+
+ha_federatedx_derived_handler::~ha_federatedx_derived_handler() = default;
+
+int ha_federatedx_derived_handler::init_scan()
+{
+ THD *thd;
+ int rc= 0;
+
+ DBUG_ENTER("ha_federatedx_derived_handler::init_scan");
+
+ TABLE *table= derived->get_first_table()->table;
+ ha_federatedx *h= (ha_federatedx *) table->file;
+ iop= &h->io;
+ share= get_share(table->s->table_name.str, table);
+ thd= table->in_use;
+ txn= h->get_txn(thd);
+ if ((rc= txn->acquire(share, thd, TRUE, iop)))
+ DBUG_RETURN(rc);
+
+ if ((*iop)->query(derived->derived_spec.str, derived->derived_spec.length))
+ goto err;
+
+ stored_result= (*iop)->store_result();
+ if (!stored_result)
+ goto err;
+
+ DBUG_RETURN(0);
+
+err:
+ DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM);
+}
+
+int ha_federatedx_derived_handler::next_row()
+{
+ int rc;
+ FEDERATEDX_IO_ROW *row;
+ ulong *lengths;
+ Field **field;
+ int column= 0;
+ Time_zone *saved_time_zone= table->in_use->variables.time_zone;
+ DBUG_ENTER("ha_federatedx_derived_handler::next_row");
+
+ if ((rc= txn->acquire(share, table->in_use, TRUE, iop)))
+ DBUG_RETURN(rc);
+
+ if (!(row= (*iop)->fetch_row(stored_result)))
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+ /* Convert row to internal format */
+ table->in_use->variables.time_zone= UTC;
+ lengths= (*iop)->fetch_lengths(stored_result);
+
+ for (field= table->field; *field; field++, column++)
+ {
+ if ((*iop)->is_column_null(row, column))
+ (*field)->set_null();
+ else
+ {
+ (*field)->set_notnull();
+ (*field)->store((*iop)->get_column_data(row, column),
+ lengths[column], &my_charset_bin);
+ }
+ }
+ table->in_use->variables.time_zone= saved_time_zone;
+
+ DBUG_RETURN(rc);
+}
+
+int ha_federatedx_derived_handler::end_scan()
+{
+ DBUG_ENTER("ha_federatedx_derived_handler::end_scan");
+
+ (*iop)->free_result(stored_result);
+
+ free_share(txn, share);
+
+ DBUG_RETURN(0);
+}
+
+void ha_federatedx_derived_handler::print_error(int, unsigned long)
+{
+}
+
+
+static select_handler*
+create_federatedx_select_handler(THD* thd, SELECT_LEX *sel)
+{
+ if (!use_pushdown)
+ return 0;
+
+ ha_federatedx_select_handler* handler = NULL;
+
+ for (TABLE_LIST *tbl= thd->lex->query_tables; tbl; tbl= tbl->next_global)
+ {
+ if (!tbl->table)
+ return 0;
+ /*
+ We intentionally don't support partitioned federatedx tables here, so
+ use file->ht and not file->partition_ht().
+ */
+ if (tbl->table->file->ht != federatedx_hton)
+ return 0;
+
+ const FEDERATEDX_SHARE *fshare=
+ ((ha_federatedx*)tbl->table->file)->get_federatedx_share();
+
+ if (local_and_remote_names_mismatch(tbl->table->s, fshare))
+ return 0;
+ }
+
+ /*
+ Currently, ha_federatedx_select_handler::init_scan just takes the
+ thd->query and sends it to the backend.
+ This obviously won't work if the SELECT uses an "INTO @var" or
+ "INTO OUTFILE". It is also unlikely to work if the select has some
+ other kind of side effect.
+ */
+ if (sel->uncacheable & UNCACHEABLE_SIDEEFFECT)
+ return NULL;
+
+ handler= new ha_federatedx_select_handler(thd, sel);
+
+ return handler;
+}
+
+/*
+ Implementation class of the select_handler interface for FEDERATEDX:
+ class implementation
+*/
+
+ha_federatedx_select_handler::ha_federatedx_select_handler(THD *thd,
+ SELECT_LEX *sel)
+ : select_handler(thd, federatedx_hton),
+ share(NULL), txn(NULL), iop(NULL), stored_result(NULL)
+{
+ select= sel;
+}
+
+ha_federatedx_select_handler::~ha_federatedx_select_handler() = default;
+
+int ha_federatedx_select_handler::init_scan()
+{
+ int rc= 0;
+
+ DBUG_ENTER("ha_federatedx_select_handler::init_scan");
+
+ TABLE *table= 0;
+ for (TABLE_LIST *tbl= thd->lex->query_tables; tbl; tbl= tbl->next_global)
+ {
+ if (!tbl->table)
+ continue;
+ table= tbl->table;
+ break;
+ }
+ ha_federatedx *h= (ha_federatedx *) table->file;
+ iop= &h->io;
+ share= get_share(table->s->table_name.str, table);
+ txn= h->get_txn(thd);
+ if ((rc= txn->acquire(share, thd, TRUE, iop)))
+ DBUG_RETURN(rc);
+
+ if ((*iop)->query(thd->query(), thd->query_length()))
+ goto err;
+
+ stored_result= (*iop)->store_result();
+ if (!stored_result)
+ goto err;
+
+ DBUG_RETURN(0);
+
+err:
+ DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM);
+}
+
+int ha_federatedx_select_handler::next_row()
+{
+ int rc= 0;
+ FEDERATEDX_IO_ROW *row;
+ ulong *lengths;
+ Field **field;
+ int column= 0;
+ Time_zone *saved_time_zone= table->in_use->variables.time_zone;
+ DBUG_ENTER("ha_federatedx_select_handler::next_row");
+
+ if ((rc= txn->acquire(share, table->in_use, TRUE, iop)))
+ DBUG_RETURN(rc);
+
+ if (!(row= (*iop)->fetch_row(stored_result)))
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+ /* Convert row to internal format */
+ table->in_use->variables.time_zone= UTC;
+ lengths= (*iop)->fetch_lengths(stored_result);
+
+ for (field= table->field; *field; field++, column++)
+ {
+ if ((*iop)->is_column_null(row, column))
+ (*field)->set_null();
+ else
+ {
+ (*field)->set_notnull();
+ (*field)->store((*iop)->get_column_data(row, column),
+ lengths[column], &my_charset_bin);
+ }
+ }
+ table->in_use->variables.time_zone= saved_time_zone;
+
+ DBUG_RETURN(rc);
+}
+
+int ha_federatedx_select_handler::end_scan()
+{
+ DBUG_ENTER("ha_federatedx_derived_handler::end_scan");
+
+ free_tmp_table(thd, table);
+ table= 0;
+
+ (*iop)->free_result(stored_result);
+
+ free_share(txn, share);
+
+ DBUG_RETURN(0);
+}
+
+void ha_federatedx_select_handler::print_error(int error, myf error_flag)
+{
+ select_handler::print_error(error, error_flag);
+}
diff --git a/storage/federatedx/federatedx_pushdown.h b/storage/federatedx/federatedx_pushdown.h
new file mode 100644
index 00000000..673abcfc
--- /dev/null
+++ b/storage/federatedx/federatedx_pushdown.h
@@ -0,0 +1,63 @@
+/*
+ Copyright (c) 2019 MariaDB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include "derived_handler.h"
+#include "select_handler.h"
+
+/*
+ Implementation class of the derived_handler interface for FEDERATEDX:
+ class declaration
+*/
+
+class ha_federatedx_derived_handler: public derived_handler
+{
+private:
+ FEDERATEDX_SHARE *share;
+ federatedx_txn *txn;
+ federatedx_io **iop;
+ FEDERATEDX_IO_RESULT *stored_result;
+
+public:
+ ha_federatedx_derived_handler(THD* thd_arg, TABLE_LIST *tbl);
+ ~ha_federatedx_derived_handler();
+ int init_scan();
+ int next_row();
+ int end_scan();
+ void print_error(int, unsigned long);
+};
+
+
+/*
+ Implementation class of the select_handler interface for FEDERATEDX:
+ class declaration
+*/
+
+class ha_federatedx_select_handler: public select_handler
+{
+private:
+ FEDERATEDX_SHARE *share;
+ federatedx_txn *txn;
+ federatedx_io **iop;
+ FEDERATEDX_IO_RESULT *stored_result;
+
+public:
+ ha_federatedx_select_handler(THD* thd_arg, SELECT_LEX *sel);
+ ~ha_federatedx_select_handler();
+ int init_scan();
+ int next_row();
+ int end_scan();
+ void print_error(int, unsigned long);
+};
diff --git a/storage/federatedx/federatedx_txn.cc b/storage/federatedx/federatedx_txn.cc
new file mode 100644
index 00000000..c434a008
--- /dev/null
+++ b/storage/federatedx/federatedx_txn.cc
@@ -0,0 +1,439 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+ * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+#define MYSQL_SERVER 1
+#include <my_global.h>
+#include "sql_priv.h"
+
+#include "ha_federatedx.h"
+
+#include "m_string.h"
+#include "table.h"
+#include "sql_servers.h"
+
+federatedx_txn::federatedx_txn()
+ : txn_list(0), savepoint_level(0), savepoint_stmt(0), savepoint_next(0)
+{
+ DBUG_ENTER("federatedx_txn::federatedx_txn");
+ DBUG_VOID_RETURN;
+}
+
+federatedx_txn::~federatedx_txn()
+{
+ DBUG_ENTER("federatedx_txn::~federatedx_txn");
+ DBUG_ASSERT(!txn_list);
+ DBUG_VOID_RETURN;
+}
+
+
+void federatedx_txn::close(FEDERATEDX_SERVER *server)
+{
+#ifdef DBUG_TRACE
+ uint count= 0;
+#endif
+ federatedx_io *io, **iop;
+ DBUG_ENTER("federatedx_txn::close");
+
+ DBUG_ASSERT(!server->use_count);
+ DBUG_PRINT("info",("use count: %u connections: %u",
+ server->use_count, server->io_count));
+
+ for (iop= &txn_list; (io= *iop);)
+ {
+ if (io->server != server)
+ iop= &io->txn_next;
+ else
+ {
+ *iop= io->txn_next;
+ io->txn_next= NULL;
+ io->busy= FALSE;
+
+ io->idle_next= server->idle_list;
+ server->idle_list= io;
+ }
+ }
+
+ while ((io= server->idle_list))
+ {
+ server->idle_list= io->idle_next;
+ delete io;
+#ifdef DBUG_TRACE
+ count++;
+#endif
+ }
+
+ DBUG_PRINT("info",("closed %u connections, txn_list: %s", count,
+ txn_list ? "active": "empty"));
+ DBUG_VOID_RETURN;
+}
+
+
+int federatedx_txn::acquire(FEDERATEDX_SHARE *share, void *thd,
+ bool readonly, federatedx_io **ioptr)
+{
+ federatedx_io *io;
+ FEDERATEDX_SERVER *server= share->s;
+ DBUG_ENTER("federatedx_txn::acquire");
+ DBUG_ASSERT(ioptr && server);
+
+ if (!(io= *ioptr))
+ {
+ /* check to see if we have an available IO connection */
+ for (io= txn_list; io; io= io->txn_next)
+ if (io->server == server)
+ break;
+
+ if (!io)
+ {
+ /* check to see if there are any unowned IO connections */
+ mysql_mutex_lock(&server->mutex);
+ if ((io= server->idle_list))
+ {
+ server->idle_list= io->idle_next;
+ io->idle_next= NULL;
+ }
+ else
+ io= federatedx_io::construct(&server->mem_root, server);
+
+ io->txn_next= txn_list;
+ txn_list= io;
+
+ mysql_mutex_unlock(&server->mutex);
+ }
+
+ if (io->busy)
+ *io->owner_ptr= NULL;
+
+ io->busy= TRUE;
+ io->owner_ptr= ioptr;
+ io->set_thd(thd);
+ }
+
+ DBUG_ASSERT(io->busy && io->server == server);
+
+ io->readonly&= readonly;
+
+ DBUG_RETURN((*ioptr= io) ? 0 : -1);
+}
+
+
+void federatedx_txn::release(federatedx_io **ioptr)
+{
+ federatedx_io *io;
+ DBUG_ENTER("federatedx_txn::release");
+ DBUG_ASSERT(ioptr);
+
+ if ((io= *ioptr))
+ {
+ /* mark as available for reuse in this transaction */
+ io->busy= FALSE;
+ *ioptr= NULL;
+
+ DBUG_PRINT("info", ("active: %d autocommit: %d",
+ io->active, io->is_autocommit()));
+
+ if (io->is_autocommit())
+ {
+ io->set_thd(NULL);
+ io->active= FALSE;
+ }
+ }
+
+ release_scan();
+
+ DBUG_VOID_RETURN;
+}
+
+
+void federatedx_txn::release_scan()
+{
+#ifdef DBUG_TRACE
+ uint count= 0, returned= 0;
+#endif
+ federatedx_io *io, **pio;
+ DBUG_ENTER("federatedx_txn::release_scan");
+
+ /* return any inactive and idle connections to the server */
+ for (pio= &txn_list; (io= *pio);)
+ {
+ if (io->active || io->busy)
+ pio= &io->txn_next;
+ else
+ {
+ FEDERATEDX_SERVER *server= io->server;
+
+ /* unlink from list of connections bound to the transaction */
+ *pio= io->txn_next;
+ io->txn_next= NULL;
+
+ /* reset some values */
+ io->readonly= TRUE;
+
+ mysql_mutex_lock(&server->mutex);
+ io->idle_next= server->idle_list;
+ server->idle_list= io;
+ mysql_mutex_unlock(&server->mutex);
+#ifdef DBUG_TRACE
+ returned++;
+#endif
+ }
+#ifdef DBUG_TRACE
+ count++;
+#endif
+ }
+ DBUG_PRINT("info",("returned %u of %u connections(s)", returned, count));
+
+ DBUG_VOID_RETURN;
+}
+
+
+bool federatedx_txn::txn_begin()
+{
+ ulong level= 0;
+ DBUG_ENTER("federatedx_txn::txn_begin");
+
+ if (savepoint_next == 0)
+ {
+ savepoint_next++;
+ savepoint_level= savepoint_stmt= 0;
+ sp_acquire(&level);
+ }
+
+ DBUG_RETURN(level == 1);
+}
+
+
+int federatedx_txn::txn_commit()
+{
+ int error= 0;
+ federatedx_io *io;
+ DBUG_ENTER("federatedx_txn::txn_commit");
+
+ if (savepoint_next)
+ {
+ DBUG_ASSERT(savepoint_stmt != 1);
+
+ for (io= txn_list; io; io= io->txn_next)
+ {
+ int rc= 0;
+
+ if (io->active)
+ rc= io->commit();
+ else
+ io->rollback();
+
+ if (io->active && rc)
+ error= -1;
+
+ io->reset();
+ }
+
+ release_scan();
+
+ savepoint_next= savepoint_stmt= savepoint_level= 0;
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+int federatedx_txn::txn_rollback()
+{
+ int error= 0;
+ federatedx_io *io;
+ DBUG_ENTER("federatedx_txn::txn_commit");
+
+ if (savepoint_next)
+ {
+ DBUG_ASSERT(savepoint_stmt != 1);
+
+ for (io= txn_list; io; io= io->txn_next)
+ {
+ int rc= io->rollback();
+
+ if (io->active && rc)
+ error= -1;
+
+ io->reset();
+ }
+
+ release_scan();
+
+ savepoint_next= savepoint_stmt= savepoint_level= 0;
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+bool federatedx_txn::sp_acquire(ulong *sp)
+{
+ bool rc= FALSE;
+ federatedx_io *io;
+ DBUG_ENTER("federatedx_txn::sp_acquire");
+ DBUG_ASSERT(sp && savepoint_next);
+
+ *sp= savepoint_level= savepoint_next++;
+
+ for (io= txn_list; io; io= io->txn_next)
+ {
+ if (io->readonly)
+ continue;
+
+ io->savepoint_set(savepoint_level);
+ rc= TRUE;
+ }
+
+ DBUG_RETURN(rc);
+}
+
+
+int federatedx_txn::sp_rollback(ulong *sp)
+{
+ ulong level, new_level= savepoint_level;
+ federatedx_io *io;
+ DBUG_ENTER("federatedx_txn::sp_rollback");
+ DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level);
+
+ for (io= txn_list; io; io= io->txn_next)
+ {
+ if (io->readonly)
+ continue;
+
+ if ((level= io->savepoint_rollback(*sp)) < new_level)
+ new_level= level;
+ }
+
+ savepoint_level= new_level;
+
+ DBUG_RETURN(0);
+}
+
+
+int federatedx_txn::sp_release(ulong *sp)
+{
+ ulong level, new_level= savepoint_level;
+ federatedx_io *io;
+ DBUG_ENTER("federatedx_txn::sp_release");
+ DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level);
+
+ for (io= txn_list; io; io= io->txn_next)
+ {
+ if (io->readonly)
+ continue;
+
+ if ((level= io->savepoint_release(*sp)) < new_level)
+ new_level= level;
+ }
+
+ savepoint_level= new_level;
+ *sp= 0;
+
+ DBUG_RETURN(0);
+}
+
+
+bool federatedx_txn::stmt_begin()
+{
+ bool result= FALSE;
+ DBUG_ENTER("federatedx_txn::stmt_begin");
+
+ if (!savepoint_stmt)
+ {
+ if (!savepoint_next)
+ {
+ savepoint_next++;
+ savepoint_level= savepoint_stmt= 0;
+ }
+ result= sp_acquire(&savepoint_stmt);
+ }
+
+ DBUG_RETURN(result);
+}
+
+
+int federatedx_txn::stmt_commit()
+{
+ int result= 0;
+ DBUG_ENTER("federatedx_txn::stmt_commit");
+
+ if (savepoint_stmt == 1)
+ {
+ savepoint_stmt= 0;
+ result= txn_commit();
+ }
+ else
+ if (savepoint_stmt)
+ result= sp_release(&savepoint_stmt);
+
+ DBUG_RETURN(result);
+}
+
+
+int federatedx_txn::stmt_rollback()
+{
+ int result= 0;
+ DBUG_ENTER("federated:txn::stmt_rollback");
+
+ if (savepoint_stmt == 1)
+ {
+ savepoint_stmt= 0;
+ result= txn_rollback();
+ }
+ else
+ if (savepoint_stmt)
+ {
+ result= sp_rollback(&savepoint_stmt);
+ sp_release(&savepoint_stmt);
+ }
+
+ DBUG_RETURN(result);
+}
+
+
+void federatedx_txn::stmt_autocommit()
+{
+ federatedx_io *io;
+ DBUG_ENTER("federatedx_txn::stmt_autocommit");
+
+ for (io= txn_list; savepoint_stmt && io; io= io->txn_next)
+ {
+ if (io->readonly)
+ continue;
+
+ io->savepoint_restrict(savepoint_stmt);
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+
diff --git a/storage/federatedx/ha_federatedx.cc b/storage/federatedx/ha_federatedx.cc
new file mode 100644
index 00000000..598886b8
--- /dev/null
+++ b/storage/federatedx/ha_federatedx.cc
@@ -0,0 +1,3730 @@
+/*
+Copyright (c) 2008-2009, Patrick Galbraith & Antony Curtis
+Copyright (c) 2020, 2022, MariaDB Corporation.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+ * Neither the name of Patrick Galbraith nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+/*
+
+ FederatedX Pluggable Storage Engine
+
+ ha_federatedx.cc - FederatedX Pluggable Storage Engine
+ Patrick Galbraith, 2008
+
+ This is a handler which uses a foreign database as the data file, as
+ opposed to a handler like MyISAM, which uses .MYD files locally.
+
+ How this handler works
+ ----------------------------------
+ Normal database files are local and as such: You create a table called
+ 'users', a file such as 'users.MYD' is created. A handler reads, inserts,
+ deletes, updates data in this file. The data is stored in particular format,
+ so to read, that data has to be parsed into fields, to write, fields have to
+ be stored in this format to write to this data file.
+
+ With FederatedX storage engine, there will be no local files
+ for each table's data (such as .MYD). A foreign database will store
+ the data that would normally be in this file. This will necessitate
+ the use of MySQL client API to read, delete, update, insert this
+ data. The data will have to be retrieve via an SQL call "SELECT *
+ FROM users". Then, to read this data, it will have to be retrieved
+ via mysql_fetch_row one row at a time, then converted from the
+ column in this select into the format that the handler expects.
+
+ The create table will simply create the .frm file, and within the
+ "CREATE TABLE" SQL, there SHALL be any of the following :
+
+ connection=scheme://username:password@hostname:port/database/tablename
+ connection=scheme://username@hostname/database/tablename
+ connection=scheme://username:password@hostname/database/tablename
+ connection=scheme://username:password@hostname/database/tablename
+
+ - OR -
+
+ As of 5.1 federatedx now allows you to use a non-url
+ format, taking advantage of mysql.servers:
+
+ connection="connection_one"
+ connection="connection_one/table_foo"
+
+ An example would be:
+
+ connection=mysql://username:password@hostname:port/database/tablename
+
+ or, if we had:
+
+ create server 'server_one' foreign data wrapper 'mysql' options
+ (HOST '127.0.0.1',
+ DATABASE 'db1',
+ USER 'root',
+ PASSWORD '',
+ PORT 3306,
+ SOCKET '',
+ OWNER 'root');
+
+ CREATE TABLE federatedx.t1 (
+ `id` int(20) NOT NULL,
+ `name` varchar(64) NOT NULL default ''
+ )
+ ENGINE="FEDERATEDX" DEFAULT CHARSET=latin1
+ CONNECTION='server_one';
+
+ So, this will have been the equivalent of
+
+ CONNECTION="mysql://root@127.0.0.1:3306/db1/t1"
+
+ Then, we can also change the server to point to a new schema:
+
+ ALTER SERVER 'server_one' options(DATABASE 'db2');
+
+ All subsequent calls will now be against db2.t1! Guess what? You don't
+ have to perform an alter table!
+
+ This connecton="connection string" is necessary for the handler to be
+ able to connect to the foreign server, either by URL, or by server
+ name.
+
+
+ The basic flow is this:
+
+ SQL calls issues locally ->
+ mysql handler API (data in handler format) ->
+ mysql client API (data converted to SQL calls) ->
+ foreign database -> mysql client API ->
+ convert result sets (if any) to handler format ->
+ handler API -> results or rows affected to local
+
+ What this handler does and doesn't support
+ ------------------------------------------
+ * Tables MUST be created on the foreign server prior to any action on those
+ tables via the handler, first version. IMPORTANT: IF you MUST use the
+ federatedx storage engine type on the REMOTE end, MAKE SURE [ :) ] That
+ the table you connect to IS NOT a table pointing BACK to your ORIGNAL
+ table! You know and have heard the screaching of audio feedback? You
+ know putting two mirror in front of each other how the reflection
+ continues for eternity? Well, need I say more?!
+ * There will not be support for transactions.
+ * There is no way for the handler to know if the foreign database or table
+ has changed. The reason for this is that this database has to work like a
+ data file that would never be written to by anything other than the
+ database. The integrity of the data in the local table could be breached
+ if there was any change to the foreign database.
+ * Support for SELECT, INSERT, UPDATE , DELETE, indexes.
+ * No ALTER TABLE, DROP TABLE or any other Data Definition Language calls.
+ * Prepared statements will not be used in the first implementation, it
+ remains to to be seen whether the limited subset of the client API for the
+ server supports this.
+ * This uses SELECT, INSERT, UPDATE, DELETE and not HANDLER for its
+ implementation.
+ * This will not work with the query cache.
+
+ Method calls
+
+ A two column table, with one record:
+
+ (SELECT)
+
+ "SELECT * FROM foo"
+ ha_federatedx::info
+ ha_federatedx::scan_time:
+ ha_federatedx::rnd_init: share->select_query SELECT * FROM foo
+ ha_federatedx::extra
+
+ <for every row of data retrieved>
+ ha_federatedx::rnd_next
+ ha_federatedx::convert_row_to_internal_format
+ ha_federatedx::rnd_next
+ </for every row of data retrieved>
+
+ ha_federatedx::rnd_end
+ ha_federatedx::extra
+ ha_federatedx::reset
+
+ (INSERT)
+
+ "INSERT INTO foo (id, ts) VALUES (2, now());"
+
+ ha_federatedx::write_row
+
+ ha_federatedx::reset
+
+ (UPDATE)
+
+ "UPDATE foo SET ts = now() WHERE id = 1;"
+
+ ha_federatedx::index_init
+ ha_federatedx::index_read
+ ha_federatedx::index_read_idx
+ ha_federatedx::rnd_next
+ ha_federatedx::convert_row_to_internal_format
+ ha_federatedx::update_row
+
+ ha_federatedx::extra
+ ha_federatedx::extra
+ ha_federatedx::extra
+ ha_federatedx::external_lock
+ ha_federatedx::reset
+
+
+ How do I use this handler?
+ --------------------------
+
+ <insert text about plugin storage engine>
+
+ Next, to use this handler, it's very simple. You must
+ have two databases running, either both on the same host, or
+ on different hosts.
+
+ One the server that will be connecting to the foreign
+ host (client), you create your table as such:
+
+ CREATE TABLE test_table (
+ id int(20) NOT NULL auto_increment,
+ name varchar(32) NOT NULL default '',
+ other int(20) NOT NULL default '0',
+ PRIMARY KEY (id),
+ KEY name (name),
+ KEY other_key (other))
+ ENGINE="FEDERATEDX"
+ DEFAULT CHARSET=latin1
+ CONNECTION='mysql://root@127.0.0.1:9306/federatedx/test_federatedx';
+
+ Notice the "COMMENT" and "ENGINE" field? This is where you
+ respectively set the engine type, "FEDERATEDX" and foreign
+ host information, this being the database your 'client' database
+ will connect to and use as the "data file". Obviously, the foreign
+ database is running on port 9306, so you want to start up your other
+ database so that it is indeed on port 9306, and your federatedx
+ database on a port other than that. In my setup, I use port 5554
+ for federatedx, and port 5555 for the foreign database.
+
+ Then, on the foreign database:
+
+ CREATE TABLE test_table (
+ id int(20) NOT NULL auto_increment,
+ name varchar(32) NOT NULL default '',
+ other int(20) NOT NULL default '0',
+ PRIMARY KEY (id),
+ KEY name (name),
+ KEY other_key (other))
+ ENGINE="<NAME>" <-- whatever you want, or not specify
+ DEFAULT CHARSET=latin1 ;
+
+ This table is exactly the same (and must be exactly the same),
+ except that it is not using the federatedx handler and does
+ not need the URL.
+
+
+ How to see the handler in action
+ --------------------------------
+
+ When developing this handler, I compiled the federatedx database with
+ debugging:
+
+ ./configure --with-federatedx-storage-engine
+ --prefix=/home/mysql/mysql-build/federatedx/ --with-debug
+
+ Once compiled, I did a 'make install' (not for the purpose of installing
+ the binary, but to install all the files the binary expects to see in the
+ diretory I specified in the build with --prefix,
+ "/home/mysql/mysql-build/federatedx".
+
+ Then, I started the foreign server:
+
+ /usr/local/mysql/bin/mysqld_safe
+ --user=mysql --log=/tmp/mysqld.5555.log -P 5555
+
+ Then, I went back to the directory containing the newly compiled mysqld,
+ <builddir>/sql/, started up gdb:
+
+ gdb ./mysqld
+
+ Then, withn the (gdb) prompt:
+ (gdb) run --gdb --port=5554 --socket=/tmp/mysqld.5554 --skip-innodb --debug-dbug
+
+ Next, I open several windows for each:
+
+ 1. Tail the debug trace: tail -f /tmp/mysqld.trace|grep ha_fed
+ 2. Tail the SQL calls to the foreign database: tail -f /tmp/mysqld.5555.log
+ 3. A window with a client open to the federatedx server on port 5554
+ 4. A window with a client open to the federatedx server on port 5555
+
+ I would create a table on the client to the foreign server on port
+ 5555, and then to the federatedx server on port 5554. At this point,
+ I would run whatever queries I wanted to on the federatedx server,
+ just always remembering that whatever changes I wanted to make on
+ the table, or if I created new tables, that I would have to do that
+ on the foreign server.
+
+ Another thing to look for is 'show variables' to show you that you have
+ support for federatedx handler support:
+
+ show variables like '%federat%'
+
+ and:
+
+ show storage engines;
+
+ Both should display the federatedx storage handler.
+
+
+ Testing
+ -------
+
+ Testing for FederatedX as a pluggable storage engine for
+ now is a manual process that I intend to build a test
+ suite that works for all pluggable storage engines.
+
+ How to test
+
+ 1. cp fed.dat /tmp
+ (make sure you have access to "test". Use a user that has
+ super privileges for now)
+ 2. mysql -f -u root test < federated.test > federated.myresult 2>&1
+ 3. diff federated.result federated.myresult (there _should_ be no differences)
+
+
+*/
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+#define MYSQL_SERVER 1
+#include <my_global.h>
+#include <mysql/plugin.h>
+#include <mysql.h>
+#include "ha_federatedx.h"
+#include "sql_servers.h"
+#include "sql_analyse.h" // append_escaped()
+#include "sql_show.h" // append_identifier()
+#include "tztime.h" // my_tz_find()
+#include "sql_select.h"
+
+#ifdef I_AM_PARANOID
+#define MIN_PORT 1023
+#else
+#define MIN_PORT 0
+#endif
+
+/* Variables for federatedx share methods */
+static HASH federatedx_open_tables; // To track open tables
+static HASH federatedx_open_servers; // To track open servers
+mysql_mutex_t federatedx_mutex; // To init the hash
+const char ident_quote_char= '`'; // Character for quoting
+ // identifiers
+const char value_quote_char= '\''; // Character for quoting
+ // literals
+static const int bulk_padding= 64; // bytes "overhead" in packet
+
+/* Variables used when chopping off trailing characters */
+static const uint sizeof_trailing_comma= sizeof(", ") - 1;
+static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
+static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;
+
+static Time_zone *UTC= 0;
+
+/* Static declaration for handerton */
+static handler *federatedx_create_handler(handlerton *hton,
+ TABLE_SHARE *table,
+ MEM_ROOT *mem_root);
+
+/* FederatedX storage engine handlerton */
+
+static handler *federatedx_create_handler(handlerton *hton,
+ TABLE_SHARE *table,
+ MEM_ROOT *mem_root)
+{
+ return new (mem_root) ha_federatedx(hton, table);
+}
+
+
+/* Function we use in the creation of our hash to get key */
+
+static uchar *
+federatedx_share_get_key(FEDERATEDX_SHARE *share, size_t *length,
+ my_bool not_used __attribute__ ((unused)))
+{
+ *length= share->share_key_length;
+ return (uchar*) share->share_key;
+}
+
+
+static uchar *
+federatedx_server_get_key(FEDERATEDX_SERVER *server, size_t *length,
+ my_bool not_used __attribute__ ((unused)))
+{
+ *length= server->key_length;
+ return server->key;
+}
+
+#ifdef HAVE_PSI_INTERFACE
+static PSI_mutex_key fe_key_mutex_federatedx, fe_key_mutex_FEDERATEDX_SERVER_mutex;
+
+static PSI_mutex_info all_federated_mutexes[]=
+{
+ { &fe_key_mutex_federatedx, "federatedx", PSI_FLAG_GLOBAL},
+ { &fe_key_mutex_FEDERATEDX_SERVER_mutex, "FEDERATED_SERVER::mutex", 0}
+};
+
+static void init_federated_psi_keys(void)
+{
+ const char* category= "federated";
+ int count;
+
+ if (PSI_server == NULL)
+ return;
+
+ count= array_elements(all_federated_mutexes);
+ PSI_server->register_mutex(category, all_federated_mutexes, count);
+}
+#else
+#define init_federated_psi_keys() /* no-op */
+#endif /* HAVE_PSI_INTERFACE */
+
+handlerton* federatedx_hton;
+
+static derived_handler*
+create_federatedx_derived_handler(THD* thd, TABLE_LIST *derived);
+static select_handler*
+create_federatedx_select_handler(THD* thd, SELECT_LEX *sel);
+
+/*
+ Initialize the federatedx handler.
+
+ SYNOPSIS
+ federatedx_db_init()
+ p Handlerton
+
+ RETURN
+ FALSE OK
+ TRUE Error
+*/
+
+int federatedx_db_init(void *p)
+{
+ DBUG_ENTER("federatedx_db_init");
+ init_federated_psi_keys();
+ federatedx_hton= (handlerton *)p;
+ /* Needed to work with old .frm files */
+ federatedx_hton->db_type= DB_TYPE_FEDERATED_DB;
+ federatedx_hton->savepoint_offset= sizeof(ulong);
+ federatedx_hton->close_connection= ha_federatedx::disconnect;
+ federatedx_hton->savepoint_set= ha_federatedx::savepoint_set;
+ federatedx_hton->savepoint_rollback= ha_federatedx::savepoint_rollback;
+ federatedx_hton->savepoint_release= ha_federatedx::savepoint_release;
+ federatedx_hton->commit= ha_federatedx::commit;
+ federatedx_hton->rollback= ha_federatedx::rollback;
+ federatedx_hton->discover_table_structure= ha_federatedx::discover_assisted;
+ federatedx_hton->create= federatedx_create_handler;
+ federatedx_hton->drop_table= [](handlerton *, const char*) { return -1; };
+ federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED;
+ federatedx_hton->create_derived= create_federatedx_derived_handler;
+ federatedx_hton->create_select= create_federatedx_select_handler;
+
+ if (mysql_mutex_init(fe_key_mutex_federatedx,
+ &federatedx_mutex, MY_MUTEX_INIT_FAST))
+ goto error;
+ if (!my_hash_init(PSI_INSTRUMENT_ME, &federatedx_open_tables, &my_charset_bin, 32, 0, 0,
+ (my_hash_get_key) federatedx_share_get_key, 0, 0) &&
+ !my_hash_init(PSI_INSTRUMENT_ME, &federatedx_open_servers, &my_charset_bin, 32, 0, 0,
+ (my_hash_get_key) federatedx_server_get_key, 0, 0))
+ {
+ DBUG_RETURN(FALSE);
+ }
+
+ mysql_mutex_destroy(&federatedx_mutex);
+error:
+ DBUG_RETURN(TRUE);
+}
+
+
+/*
+ Release the federatedx handler.
+
+ SYNOPSIS
+ federatedx_db_end()
+
+ RETURN
+ FALSE OK
+*/
+
+int federatedx_done(void *p)
+{
+ my_hash_free(&federatedx_open_tables);
+ my_hash_free(&federatedx_open_servers);
+ mysql_mutex_destroy(&federatedx_mutex);
+
+ return 0;
+}
+
+/**
+ @brief Append identifiers to the string.
+
+ @param[in,out] string The target string.
+ @param[in] name Identifier name
+ @param[in] length Length of identifier name in bytes
+ @param[in] quote_char Quote char to use for quoting identifier.
+
+ @return Operation Status
+ @retval FALSE OK
+ @retval TRUE There was an error appending to the string.
+
+ @note This function is based upon the append_identifier() function
+ in sql_show.cc except that quoting always occurs.
+*/
+
+bool append_ident(String *string, const char *name, size_t length,
+ const char quote_char)
+{
+ bool result;
+ uint clen;
+ const char *name_end;
+ DBUG_ENTER("append_ident");
+
+ if (quote_char)
+ {
+ string->reserve(length * 2 + 2);
+ if ((result= string->append(&quote_char, 1, system_charset_info)))
+ goto err;
+
+ for (name_end= name+length; name < name_end; name+= clen)
+ {
+ uchar c= *(uchar *) name;
+ clen= system_charset_info->charlen_fix(name, name_end);
+ if (clen == 1 && c == (uchar) quote_char &&
+ (result= string->append(&quote_char, 1, system_charset_info)))
+ goto err;
+ if ((result= string->append(name, clen, string->charset())))
+ goto err;
+ }
+ result= string->append(&quote_char, 1, system_charset_info);
+ }
+ else
+ result= string->append(name, length, system_charset_info);
+
+err:
+ DBUG_RETURN(result);
+}
+
+
+static int parse_url_error(FEDERATEDX_SHARE *share, TABLE_SHARE *table_s,
+ int error_num)
+{
+ char buf[FEDERATEDX_QUERY_BUFFER_SIZE];
+ size_t buf_len;
+ DBUG_ENTER("ha_federatedx parse_url_error");
+
+ buf_len= MY_MIN(table_s->connect_string.length,
+ FEDERATEDX_QUERY_BUFFER_SIZE-1);
+ strmake(buf, table_s->connect_string.str, buf_len);
+ my_error(error_num, MYF(0), buf, 14);
+ DBUG_RETURN(error_num);
+}
+
+/*
+ retrieve server object which contains server meta-data
+ from the system table given a server's name, set share
+ connection parameter members
+*/
+int get_connection(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share)
+{
+ int error_num= ER_FOREIGN_SERVER_DOESNT_EXIST;
+ FOREIGN_SERVER *server, server_buffer;
+ DBUG_ENTER("ha_federatedx::get_connection");
+
+ /*
+ get_server_by_name() clones the server if exists and allocates
+ copies of strings in the supplied mem_root
+ */
+ if (!(server=
+ get_server_by_name(mem_root, share->connection_string, &server_buffer)))
+ {
+ DBUG_PRINT("info", ("get_server_by_name returned > 0 error condition!"));
+ /* need to come up with error handling */
+ error_num=1;
+ goto error;
+ }
+ DBUG_PRINT("info", ("get_server_by_name returned server at %p",
+ server));
+
+ /*
+ Most of these should never be empty strings, error handling will
+ need to be implemented. Also, is this the best way to set the share
+ members? Is there some allocation needed? In running this code, it works
+ except there are errors in the trace file of the share being overrun
+ at the address of the share.
+ */
+ share->server_name_length= server->server_name_length;
+ share->server_name= const_cast<char*>(server->server_name);
+ share->username= const_cast<char*>(server->username);
+ share->password= const_cast<char*>(server->password);
+ share->database= const_cast<char*>(server->db);
+ share->port= server->port > MIN_PORT && server->port < 65536 ?
+ (ushort) server->port : MYSQL_PORT;
+ share->hostname= const_cast<char*>(server->host);
+ if (!(share->socket= const_cast<char*>(server->socket)) &&
+ !strcmp(share->hostname, my_localhost))
+ share->socket= (char *) MYSQL_UNIX_ADDR;
+ share->scheme= const_cast<char*>(server->scheme);
+
+ DBUG_PRINT("info", ("share->username: %s", share->username));
+ DBUG_PRINT("info", ("share->password: %s", share->password));
+ DBUG_PRINT("info", ("share->hostname: %s", share->hostname));
+ DBUG_PRINT("info", ("share->database: %s", share->database));
+ DBUG_PRINT("info", ("share->port: %d", share->port));
+ DBUG_PRINT("info", ("share->socket: %s", share->socket));
+ DBUG_RETURN(0);
+
+error:
+ my_printf_error(error_num, "server name: '%s' doesn't exist!",
+ MYF(0), share->connection_string);
+ DBUG_RETURN(error_num);
+}
+
+/*
+ Parse connection info from table->s->connect_string
+
+ SYNOPSIS
+ parse_url()
+ mem_root MEM_ROOT pointer for memory allocation
+ share pointer to FEDERATEDX share
+ table_s pointer to current TABLE_SHARE class
+ table_create_flag determines what error to throw
+
+ DESCRIPTION
+ Populates the share with information about the connection
+ to the foreign database that will serve as the data source.
+ This string must be specified (currently) in the "CONNECTION" field,
+ listed in the CREATE TABLE statement.
+
+ This string MUST be in the format of any of these:
+
+ CONNECTION="scheme://username:password@hostname:port/database/table"
+ CONNECTION="scheme://username@hostname/database/table"
+ CONNECTION="scheme://username@hostname:port/database/table"
+ CONNECTION="scheme://username:password@hostname/database/table"
+
+ _OR_
+
+ CONNECTION="connection name"
+
+
+
+ An Example:
+
+ CREATE TABLE t1 (id int(32))
+ ENGINE="FEDERATEDX"
+ CONNECTION="mysql://joe:joespass@192.168.1.111:9308/federatedx/testtable";
+
+ CREATE TABLE t2 (
+ id int(4) NOT NULL auto_increment,
+ name varchar(32) NOT NULL,
+ PRIMARY KEY(id)
+ ) ENGINE="FEDERATEDX" CONNECTION="my_conn";
+
+ ***IMPORTANT***
+ Currently, the FederatedX Storage Engine only supports connecting to another
+ Database ("scheme" of "mysql"). Connections using JDBC as well as
+ other connectors are in the planning stage.
+
+
+ 'password' and 'port' are both optional.
+
+ RETURN VALUE
+ 0 success
+ error_num particular error code
+
+*/
+
+static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share,
+ TABLE_SHARE *table_s, uint table_create_flag)
+{
+ uint error_num= (table_create_flag ?
+ ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
+ ER_FOREIGN_DATA_STRING_INVALID);
+ DBUG_ENTER("ha_federatedx::parse_url");
+
+ share->port= 0;
+ share->socket= 0;
+ DBUG_PRINT("info", ("share at %p", share));
+ DBUG_PRINT("info", ("Length: %u", (uint) table_s->connect_string.length));
+ DBUG_PRINT("info", ("String: '%.*s'", (int) table_s->connect_string.length,
+ table_s->connect_string.str));
+ share->connection_string= strmake_root(mem_root, table_s->connect_string.str,
+ table_s->connect_string.length);
+
+ DBUG_PRINT("info",("parse_url alloced share->connection_string %p",
+ share->connection_string));
+
+ DBUG_PRINT("info",("share->connection_string: %s",share->connection_string));
+ /*
+ No :// or @ in connection string. Must be a straight connection name of
+ either "servername" or "servername/tablename"
+ */
+ if ((!strstr(share->connection_string, "://") &&
+ (!strchr(share->connection_string, '@'))))
+ {
+
+ DBUG_PRINT("info",
+ ("share->connection_string: %s internal format "
+ "share->connection_string: %p",
+ share->connection_string,
+ share->connection_string));
+
+ /* ok, so we do a little parsing, but not completely! */
+ share->parsed= FALSE;
+ /*
+ If there is a single '/' in the connection string, this means the user is
+ specifying a table name
+ */
+
+ if ((share->table_name= strchr(share->connection_string, '/')))
+ {
+ *share->table_name++= '\0';
+ share->table_name_length= strlen(share->table_name);
+
+ DBUG_PRINT("info",
+ ("internal format, parsed table_name "
+ "share->connection_string: %s share->table_name: %s",
+ share->connection_string, share->table_name));
+
+ /*
+ there better not be any more '/'s !
+ */
+ if (strchr(share->table_name, '/'))
+ goto error;
+ }
+ /*
+ Otherwise, straight server name, use tablename of federatedx table
+ as remote table name
+ */
+ else
+ {
+ /*
+ Connection specifies everything but, resort to
+ expecting remote and foreign table names to match
+ */
+ share->table_name= strmake_root(mem_root, table_s->table_name.str,
+ (share->table_name_length=
+ table_s->table_name.length));
+ DBUG_PRINT("info",
+ ("internal format, default table_name "
+ "share->connection_string: %s share->table_name: %s",
+ share->connection_string, share->table_name));
+ }
+
+ if ((error_num= get_connection(mem_root, share)))
+ goto error;
+ }
+ else
+ {
+ share->parsed= TRUE;
+ // Add a null for later termination of table name
+ share->connection_string[table_s->connect_string.length]= 0;
+ share->scheme= share->connection_string;
+ DBUG_PRINT("info",("parse_url alloced share->scheme: %p",
+ share->scheme));
+
+ /*
+ Remove addition of null terminator and store length
+ for each string in share
+ */
+ if (!(share->username= strstr(share->scheme, "://")))
+ goto error;
+ share->scheme[share->username - share->scheme]= '\0';
+
+ if (!federatedx_io::handles_scheme(share->scheme))
+ goto error;
+
+ share->username+= 3;
+
+ if (!(share->hostname= strchr(share->username, '@')))
+ goto error;
+ *share->hostname++= '\0'; // End username
+
+ if ((share->password= strchr(share->username, ':')))
+ {
+ *share->password++= '\0'; // End username
+
+ /* make sure there isn't an extra / or @ */
+ if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
+ goto error;
+ /*
+ Found that if the string is:
+ user:@hostname:port/db/table
+ Then password is a null string, so set to NULL
+ */
+ if (share->password[0] == '\0')
+ share->password= NULL;
+ }
+
+ /* make sure there isn't an extra / or @ */
+ if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
+ goto error;
+
+ if (!(share->database= strchr(share->hostname, '/')))
+ goto error;
+ *share->database++= '\0';
+
+ if ((share->sport= strchr(share->hostname, ':')))
+ {
+ *share->sport++= '\0';
+ if (share->sport[0] == '\0')
+ share->sport= NULL;
+ else
+ share->port= atoi(share->sport);
+ }
+
+ if (!(share->table_name= strchr(share->database, '/')))
+ goto error;
+ *share->table_name++= '\0';
+
+ share->table_name_length= strlen(share->table_name);
+
+ /* make sure there's not an extra / */
+ if ((strchr(share->table_name, '/')))
+ goto error;
+
+ if (share->hostname[0] == '\0')
+ share->hostname= strdup_root(mem_root, my_localhost);
+
+ }
+ if (!share->port)
+ {
+ if (0 == strcmp(share->hostname, my_localhost))
+ share->socket= (char *) MYSQL_UNIX_ADDR;
+ else
+ share->port= MYSQL_PORT;
+ }
+
+ DBUG_PRINT("info",
+ ("scheme: %s username: %s password: %s hostname: %s "
+ "port: %d db: %s tablename: %s",
+ share->scheme, share->username, share->password,
+ share->hostname, share->port, share->database,
+ share->table_name));
+
+ DBUG_RETURN(0);
+
+error:
+ DBUG_RETURN(parse_url_error(share, table_s, error_num));
+}
+
+/*****************************************************************************
+** FEDERATEDX tables
+*****************************************************************************/
+
+ha_federatedx::ha_federatedx(handlerton *hton,
+ TABLE_SHARE *table_arg)
+ :handler(hton, table_arg),
+ txn(0), io(0), stored_result(0)
+{
+ bzero(&bulk_insert, sizeof(bulk_insert));
+}
+
+
+/*
+ Convert MySQL result set row to handler internal format
+
+ SYNOPSIS
+ convert_row_to_internal_format()
+ record Byte pointer to record
+ row MySQL result set row from fetchrow()
+ result Result set to use
+
+ DESCRIPTION
+ This method simply iterates through a row returned via fetchrow with
+ values from a successful SELECT , and then stores each column's value
+ in the field object via the field object pointer (pointing to the table's
+ array of field object pointers). This is how the handler needs the data
+ to be stored to then return results back to the user
+
+ RETURN VALUE
+ 0 After fields have had field values stored from record
+*/
+
+uint ha_federatedx::convert_row_to_internal_format(uchar *record,
+ FEDERATEDX_IO_ROW *row,
+ FEDERATEDX_IO_RESULT *result)
+{
+ ulong *lengths;
+ Field **field;
+ int column= 0;
+ MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->write_set);
+ Time_zone *saved_time_zone= table->in_use->variables.time_zone;
+ DBUG_ENTER("ha_federatedx::convert_row_to_internal_format");
+
+ table->in_use->variables.time_zone= UTC;
+ lengths= io->fetch_lengths(result);
+
+ for (field= table->field; *field; field++, column++)
+ {
+ /*
+ index variable to move us through the row at the
+ same iterative step as the field
+ */
+ my_ptrdiff_t old_ptr;
+ old_ptr= (my_ptrdiff_t) (record - table->record[0]);
+ (*field)->move_field_offset(old_ptr);
+ if (io->is_column_null(row, column))
+ (*field)->set_null();
+ else
+ {
+ if (bitmap_is_set(table->read_set, (*field)->field_index))
+ {
+ (*field)->set_notnull();
+ (*field)->store_text(io->get_column_data(row, column), lengths[column],
+ &my_charset_bin);
+ }
+ }
+ (*field)->move_field_offset(-old_ptr);
+ }
+ table->in_use->variables.time_zone= saved_time_zone;
+ dbug_tmp_restore_column_map(&table->write_set, old_map);
+ DBUG_RETURN(0);
+}
+
+static bool emit_key_part_name(String *to, KEY_PART_INFO *part)
+{
+ DBUG_ENTER("emit_key_part_name");
+ if (append_ident(to, part->field->field_name.str,
+ part->field->field_name.length, ident_quote_char))
+ DBUG_RETURN(1); // Out of memory
+ DBUG_RETURN(0);
+}
+
+static bool emit_key_part_element(String *to, KEY_PART_INFO *part,
+ bool needs_quotes, bool is_like,
+ const uchar *ptr, uint len)
+{
+ Field *field= part->field;
+ DBUG_ENTER("emit_key_part_element");
+
+ if (needs_quotes && to->append(STRING_WITH_LEN("'")))
+ DBUG_RETURN(1);
+
+ if (part->type == HA_KEYTYPE_BIT)
+ {
+ char buff[STRING_BUFFER_USUAL_SIZE], *buf= buff;
+
+ *buf++= '0';
+ *buf++= 'x';
+ buf= octet2hex(buf, (char*) ptr, len);
+ if (to->append((char*) buff, (uint)(buf - buff)))
+ DBUG_RETURN(1);
+ }
+ else if (part->key_part_flag & HA_BLOB_PART)
+ {
+ uint blob_length= uint2korr(ptr);
+ String blob((char*) ptr+HA_KEY_BLOB_LENGTH,
+ blob_length, &my_charset_bin);
+ if (to->append_for_single_quote(&blob))
+ DBUG_RETURN(1);
+ }
+ else if (part->key_part_flag & HA_VAR_LENGTH_PART)
+ {
+ uint var_length= uint2korr(ptr);
+ String varchar((char*) ptr+HA_KEY_BLOB_LENGTH,
+ var_length, &my_charset_bin);
+ if (to->append_for_single_quote(&varchar))
+ DBUG_RETURN(1);
+ }
+ else
+ {
+ char strbuff[MAX_FIELD_WIDTH];
+ String str(strbuff, sizeof(strbuff), part->field->charset()), *res;
+
+ res= field->val_str(&str, ptr);
+
+ if (field->result_type() == STRING_RESULT)
+ {
+ if (to->append_for_single_quote(res))
+ DBUG_RETURN(1);
+ }
+ else if (to->append(res->ptr(), res->length()))
+ DBUG_RETURN(1);
+ }
+
+ if (is_like && to->append(STRING_WITH_LEN("%")))
+ DBUG_RETURN(1);
+
+ if (needs_quotes && to->append(STRING_WITH_LEN("'")))
+ DBUG_RETURN(1);
+
+ DBUG_RETURN(0);
+}
+
+/*
+ Create a WHERE clause based off of values in keys
+ Note: This code was inspired by key_copy from key.cc
+
+ SYNOPSIS
+ create_where_from_key ()
+ to String object to store WHERE clause
+ key_info KEY struct pointer
+ key byte pointer containing key
+ key_length length of key
+ range_type 0 - no range, 1 - min range, 2 - max range
+ (see enum range_operation)
+
+ DESCRIPTION
+ Using iteration through all the keys via a KEY_PART_INFO pointer,
+ This method 'extracts' the value of each key in the byte pointer
+ *key, and for each key found, constructs an appropriate WHERE clause
+
+ RETURN VALUE
+ 0 After all keys have been accounted for to create the WHERE clause
+ 1 No keys found
+
+ Range flags Table per Timour:
+
+ -----------------
+ - start_key:
+ * ">" -> HA_READ_AFTER_KEY
+ * ">=" -> HA_READ_KEY_OR_NEXT
+ * "=" -> HA_READ_KEY_EXACT
+
+ - end_key:
+ * "<" -> HA_READ_BEFORE_KEY
+ * "<=" -> HA_READ_AFTER_KEY
+
+ records_in_range:
+ -----------------
+ - start_key:
+ * ">" -> HA_READ_AFTER_KEY
+ * ">=" -> HA_READ_KEY_EXACT
+ * "=" -> HA_READ_KEY_EXACT
+
+ - end_key:
+ * "<" -> HA_READ_BEFORE_KEY
+ * "<=" -> HA_READ_AFTER_KEY
+ * "=" -> HA_READ_AFTER_KEY
+
+0 HA_READ_KEY_EXACT, Find first record else error
+1 HA_READ_KEY_OR_NEXT, Record or next record
+2 HA_READ_KEY_OR_PREV, Record or previous
+3 HA_READ_AFTER_KEY, Find next rec. after key-record
+4 HA_READ_BEFORE_KEY, Find next rec. before key-record
+5 HA_READ_PREFIX, Key which as same prefix
+6 HA_READ_PREFIX_LAST, Last key with the same prefix
+7 HA_READ_PREFIX_LAST_OR_PREV, Last or prev key with the same prefix
+
+Flags that I've found:
+
+id, primary key, varchar
+
+id = 'ccccc'
+records_in_range: start_key 0 end_key 3
+read_range_first: start_key 0 end_key NULL
+
+id > 'ccccc'
+records_in_range: start_key 3 end_key NULL
+read_range_first: start_key 3 end_key NULL
+
+id < 'ccccc'
+records_in_range: start_key NULL end_key 4
+read_range_first: start_key NULL end_key 4
+
+id <= 'ccccc'
+records_in_range: start_key NULL end_key 3
+read_range_first: start_key NULL end_key 3
+
+id >= 'ccccc'
+records_in_range: start_key 0 end_key NULL
+read_range_first: start_key 1 end_key NULL
+
+id like 'cc%cc'
+records_in_range: start_key 0 end_key 3
+read_range_first: start_key 1 end_key 3
+
+id > 'aaaaa' and id < 'ccccc'
+records_in_range: start_key 3 end_key 4
+read_range_first: start_key 3 end_key 4
+
+id >= 'aaaaa' and id < 'ccccc';
+records_in_range: start_key 0 end_key 4
+read_range_first: start_key 1 end_key 4
+
+id >= 'aaaaa' and id <= 'ccccc';
+records_in_range: start_key 0 end_key 3
+read_range_first: start_key 1 end_key 3
+
+id > 'aaaaa' and id <= 'ccccc';
+records_in_range: start_key 3 end_key 3
+read_range_first: start_key 3 end_key 3
+
+numeric keys:
+
+id = 4
+index_read_idx: start_key 0 end_key NULL
+
+id > 4
+records_in_range: start_key 3 end_key NULL
+read_range_first: start_key 3 end_key NULL
+
+id >= 4
+records_in_range: start_key 0 end_key NULL
+read_range_first: start_key 1 end_key NULL
+
+id < 4
+records_in_range: start_key NULL end_key 4
+read_range_first: start_key NULL end_key 4
+
+id <= 4
+records_in_range: start_key NULL end_key 3
+read_range_first: start_key NULL end_key 3
+
+id like 4
+full table scan, select * from
+
+id > 2 and id < 8
+records_in_range: start_key 3 end_key 4
+read_range_first: start_key 3 end_key 4
+
+id >= 2 and id < 8
+records_in_range: start_key 0 end_key 4
+read_range_first: start_key 1 end_key 4
+
+id >= 2 and id <= 8
+records_in_range: start_key 0 end_key 3
+read_range_first: start_key 1 end_key 3
+
+id > 2 and id <= 8
+records_in_range: start_key 3 end_key 3
+read_range_first: start_key 3 end_key 3
+
+multi keys (id int, name varchar, other varchar)
+
+id = 1;
+records_in_range: start_key 0 end_key 3
+read_range_first: start_key 0 end_key NULL
+
+id > 4;
+id > 2 and name = '333'; remote: id > 2
+id > 2 and name > '333'; remote: id > 2
+id > 2 and name > '333' and other < 'ddd'; remote: id > 2 no results
+id > 2 and name >= '333' and other < 'ddd'; remote: id > 2 1 result
+id >= 4 and name = 'eric was here' and other > 'eeee';
+records_in_range: start_key 3 end_key NULL
+read_range_first: start_key 3 end_key NULL
+
+id >= 4;
+id >= 2 and name = '333' and other < 'ddd';
+remote: `id` >= 2 AND `name` >= '333';
+records_in_range: start_key 0 end_key NULL
+read_range_first: start_key 1 end_key NULL
+
+id < 4;
+id < 3 and name = '222' and other <= 'ccc'; remote: id < 3
+records_in_range: start_key NULL end_key 4
+read_range_first: start_key NULL end_key 4
+
+id <= 4;
+records_in_range: start_key NULL end_key 3
+read_range_first: start_key NULL end_key 3
+
+id like 4;
+full table scan
+
+id > 2 and id < 4;
+records_in_range: start_key 3 end_key 4
+read_range_first: start_key 3 end_key 4
+
+id >= 2 and id < 4;
+records_in_range: start_key 0 end_key 4
+read_range_first: start_key 1 end_key 4
+
+id >= 2 and id <= 4;
+records_in_range: start_key 0 end_key 3
+read_range_first: start_key 1 end_key 3
+
+id > 2 and id <= 4;
+id = 6 and name = 'eric was here' and other > 'eeee';
+remote: (`id` > 6 AND `name` > 'eric was here' AND `other` > 'eeee')
+AND (`id` <= 6) AND ( AND `name` <= 'eric was here')
+no results
+records_in_range: start_key 3 end_key 3
+read_range_first: start_key 3 end_key 3
+
+Summary:
+
+* If the start key flag is 0 the max key flag shouldn't even be set,
+ and if it is, the query produced would be invalid.
+* Multipart keys, even if containing some or all numeric columns,
+ are treated the same as non-numeric keys
+
+ If the query is " = " (quotes or not):
+ - records in range start key flag HA_READ_KEY_EXACT,
+ end key flag HA_READ_AFTER_KEY (incorrect)
+ - any other: start key flag HA_READ_KEY_OR_NEXT,
+ end key flag HA_READ_AFTER_KEY (correct)
+
+* 'like' queries (of key)
+ - Numeric, full table scan
+ - Non-numeric
+ records_in_range: start_key 0 end_key 3
+ other : start_key 1 end_key 3
+
+* If the key flag is HA_READ_AFTER_KEY:
+ if start_key, append >
+ if end_key, append <=
+
+* If create_where_key was called by records_in_range:
+
+ - if the key is numeric:
+ start key flag is 0 when end key is NULL, end key flag is 3 or 4
+ - if create_where_key was called by any other function:
+ start key flag is 1 when end key is NULL, end key flag is 3 or 4
+ - if the key is non-numeric, or multipart
+ When the query is an exact match, the start key flag is 0,
+ end key flag is 3 for what should be a no-range condition where
+ you should have 0 and max key NULL, which it is if called by
+ read_range_first
+
+Conclusion:
+
+1. Need logic to determin if a key is min or max when the flag is
+HA_READ_AFTER_KEY, and handle appending correct operator accordingly
+
+2. Need a boolean flag to pass to create_where_from_key, used in the
+switch statement. Add 1 to the flag if:
+ - start key flag is HA_READ_KEY_EXACT and the end key is NULL
+
+*/
+
+bool ha_federatedx::create_where_from_key(String *to,
+ KEY *key_info,
+ const key_range *start_key,
+ const key_range *end_key,
+ bool eq_range)
+{
+ bool both_not_null=
+ (start_key != NULL && end_key != NULL) ? TRUE : FALSE;
+ const uchar *ptr;
+ uint remainder, length;
+ char tmpbuff[FEDERATEDX_QUERY_BUFFER_SIZE];
+ String tmp(tmpbuff, sizeof(tmpbuff), system_charset_info);
+ const key_range *ranges[2]= { start_key, end_key };
+ Time_zone *saved_time_zone= table->in_use->variables.time_zone;
+ DBUG_ENTER("ha_federatedx::create_where_from_key");
+
+ tmp.length(0);
+ if (start_key == NULL && end_key == NULL)
+ DBUG_RETURN(1);
+
+ table->in_use->variables.time_zone= UTC;
+ MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->write_set);
+ for (uint i= 0; i <= 1; i++)
+ {
+ KEY_PART_INFO *key_part;
+ if (ranges[i] == NULL)
+ continue;
+
+ if (both_not_null)
+ {
+ if (i > 0)
+ tmp.append(STRING_WITH_LEN(") AND ("));
+ else
+ tmp.append(STRING_WITH_LEN(" ("));
+ }
+
+ for (key_part= key_info->key_part,
+ remainder= key_info->user_defined_key_parts,
+ length= ranges[i]->length,
+ ptr= ranges[i]->key; ;
+ remainder--,
+ key_part++)
+ {
+ Field *field= key_part->field;
+ uint store_length= key_part->store_length;
+ uint part_length= MY_MIN(store_length, length);
+ bool needs_quotes= field->str_needs_quotes();
+ bool reverse= key_part->key_part_flag & HA_REVERSE_SORT;
+ static const LEX_CSTRING lt={STRING_WITH_LEN(" < ") };
+ static const LEX_CSTRING gt={STRING_WITH_LEN(" > ") };
+ static const LEX_CSTRING le={STRING_WITH_LEN(" <= ") };
+ static const LEX_CSTRING ge={STRING_WITH_LEN(" >= ") };
+ DBUG_DUMP("key, start of loop", ptr, length);
+
+ if (key_part->null_bit)
+ {
+ if (*ptr++)
+ {
+ LEX_CSTRING constraint;
+ if (ranges[i]->flag == HA_READ_KEY_EXACT)
+ constraint= {STRING_WITH_LEN(" IS NULL ") };
+ else
+ constraint= {STRING_WITH_LEN(" IS NOT NULL ") };
+ /*
+ We got "IS [NOT] NULL" condition against nullable column. We
+ distinguish between "IS NOT NULL" and "IS NULL" by flag. For
+ "IS NULL", flag is set to HA_READ_KEY_EXACT.
+ */
+ if (emit_key_part_name(&tmp, key_part) ||
+ tmp.append(constraint))
+ goto err;
+ /*
+ We need to adjust pointer and length to be prepared for next
+ key part. As well as check if this was last key part.
+ */
+ goto prepare_for_next_key_part;
+ }
+ }
+
+ if (tmp.append(STRING_WITH_LEN(" (")))
+ goto err;
+
+ switch (ranges[i]->flag) {
+ case HA_READ_KEY_EXACT:
+ DBUG_PRINT("info", ("federatedx HA_READ_KEY_EXACT %d", i));
+ if (store_length >= length ||
+ !needs_quotes ||
+ key_part->type == HA_KEYTYPE_BIT ||
+ field->result_type() != STRING_RESULT)
+ {
+ if (emit_key_part_name(&tmp, key_part))
+ goto err;
+
+ if (tmp.append(STRING_WITH_LEN(" = ")))
+ goto err;
+
+ if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
+ part_length))
+ goto err;
+ }
+ else
+ {
+ /* LIKE */
+ if (emit_key_part_name(&tmp, key_part) ||
+ tmp.append(STRING_WITH_LEN(" LIKE ")) ||
+ emit_key_part_element(&tmp, key_part, needs_quotes, 1, ptr,
+ part_length))
+ goto err;
+ }
+ break;
+ case HA_READ_AFTER_KEY:
+ if (eq_range)
+ {
+ if (tmp.append(STRING_WITH_LEN("1=1"))) // Dummy
+ goto err;
+ break;
+ }
+ DBUG_PRINT("info", ("federatedx HA_READ_AFTER_KEY %d", i));
+ if (store_length >= length || i > 0) /* end key */
+ {
+ if (emit_key_part_name(&tmp, key_part))
+ goto err;
+
+ if (i > 0) /* end key */
+ {
+ if (tmp.append(reverse ? ge : le))
+ goto err;
+ }
+ else /* start key */
+ {
+ if (tmp.append(reverse ? lt : gt))
+ goto err;
+ }
+
+ if (emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
+ part_length))
+ {
+ goto err;
+ }
+ break;
+ }
+ /* fall through */
+ case HA_READ_KEY_OR_NEXT:
+ DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_NEXT %d", i));
+ if (emit_key_part_name(&tmp, key_part) ||
+ tmp.append(reverse ? le : ge) ||
+ emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
+ part_length))
+ goto err;
+ break;
+ case HA_READ_BEFORE_KEY:
+ DBUG_PRINT("info", ("federatedx HA_READ_BEFORE_KEY %d", i));
+ if (store_length >= length)
+ {
+ if (emit_key_part_name(&tmp, key_part) ||
+ tmp.append(reverse ? gt : lt) ||
+ emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
+ part_length))
+ goto err;
+ break;
+ }
+ /* fall through */
+ case HA_READ_KEY_OR_PREV:
+ DBUG_PRINT("info", ("federatedx HA_READ_KEY_OR_PREV %d", i));
+ if (emit_key_part_name(&tmp, key_part) ||
+ tmp.append(reverse ? ge : le) ||
+ emit_key_part_element(&tmp, key_part, needs_quotes, 0, ptr,
+ part_length))
+ goto err;
+ break;
+ default:
+ DBUG_PRINT("info",("cannot handle flag %d", ranges[i]->flag));
+ goto err;
+ }
+ if (tmp.append(STRING_WITH_LEN(") ")))
+ goto err;
+
+prepare_for_next_key_part:
+ if (store_length >= length)
+ break;
+ DBUG_PRINT("info", ("remainder %d", remainder));
+ DBUG_ASSERT(remainder > 1);
+ length-= store_length;
+ /*
+ For nullable columns, null-byte is already skipped before, that is
+ ptr was incremented by 1. Since store_length still counts null-byte,
+ we need to subtract 1 from store_length.
+ */
+ ptr+= store_length - MY_TEST(key_part->null_bit);
+ if (tmp.append(STRING_WITH_LEN(" AND ")))
+ goto err;
+
+ DBUG_PRINT("info",
+ ("create_where_from_key WHERE clause: %s",
+ tmp.c_ptr_quick()));
+ }
+ }
+ dbug_tmp_restore_column_map(&table->write_set, old_map);
+ table->in_use->variables.time_zone= saved_time_zone;
+
+ if (both_not_null)
+ if (tmp.append(STRING_WITH_LEN(") ")))
+ DBUG_RETURN(1);
+
+ if (to->append(STRING_WITH_LEN(" WHERE ")))
+ DBUG_RETURN(1);
+
+ if (to->append(tmp))
+ DBUG_RETURN(1);
+
+ DBUG_RETURN(0);
+
+err:
+ dbug_tmp_restore_column_map(&table->write_set, old_map);
+ table->in_use->variables.time_zone= saved_time_zone;
+ DBUG_RETURN(1);
+}
+
+static void fill_server(MEM_ROOT *mem_root, FEDERATEDX_SERVER *server,
+ FEDERATEDX_SHARE *share, CHARSET_INFO *table_charset)
+{
+ char buffer[STRING_BUFFER_USUAL_SIZE];
+ const char *socket_arg= share->socket ? share->socket : "";
+ const char *password_arg= share->password ? share->password : "";
+
+ String key(buffer, sizeof(buffer), &my_charset_bin);
+ String scheme(share->scheme, strlen(share->scheme), &my_charset_latin1);
+ String hostname(share->hostname, strlen(share->hostname), &my_charset_latin1);
+ String database(share->database, strlen(share->database), system_charset_info);
+ String username(share->username, strlen(share->username), system_charset_info);
+ String socket(socket_arg, strlen(socket_arg), files_charset_info);
+ String password(password_arg, strlen(password_arg), &my_charset_bin);
+ DBUG_ENTER("fill_server");
+
+ /* Do some case conversions */
+ scheme.reserve(scheme.length());
+ scheme.length(my_casedn_str(&my_charset_latin1, scheme.c_ptr_safe()));
+
+ hostname.reserve(hostname.length());
+ hostname.length(my_casedn_str(&my_charset_latin1, hostname.c_ptr_safe()));
+
+ if (lower_case_table_names)
+ {
+ database.reserve(database.length());
+ database.length(my_casedn_str(system_charset_info, database.c_ptr_safe()));
+ }
+
+#ifndef _WIN32
+ /*
+ TODO: there is no unix sockets under windows so the engine should be
+ revised about using sockets in such environment.
+ */
+ if (lower_case_file_system && socket.length())
+ {
+ socket.reserve(socket.length());
+ socket.length(my_casedn_str(files_charset_info, socket.c_ptr_safe()));
+ }
+#endif
+
+ /* start with all bytes zeroed */
+ bzero(server, sizeof(*server));
+
+ key.length(0);
+ key.reserve(scheme.length() + hostname.length() + database.length() +
+ socket.length() + username.length() + password.length() +
+ sizeof(int) + 8);
+ key.append(scheme);
+ key.q_append('\0');
+ server->hostname= (const char *) (intptr) key.length();
+ key.append(hostname);
+ key.q_append('\0');
+ server->database= (const char *) (intptr) key.length();
+ key.append(database);
+ key.q_append('\0');
+ key.q_append((uint32) share->port);
+ server->socket= (const char *) (intptr) key.length();
+ key.append(socket);
+ key.q_append('\0');
+ server->username= (const char *) (intptr) key.length();
+ key.append(username);
+ key.q_append('\0');
+ server->password= (const char *) (intptr) key.length();
+ key.append(password);
+ key.c_ptr_safe(); // Ensure we have end \0
+
+ server->key_length= key.length();
+ /* Copy and add end \0 */
+ server->key= (uchar *) strmake_root(mem_root, key.ptr(), key.length());
+
+ /* pointer magic */
+ server->scheme+= (intptr) server->key;
+ server->hostname+= (intptr) server->key;
+ server->database+= (intptr) server->key;
+ server->username+= (intptr) server->key;
+ server->password+= (intptr) server->key;
+ server->socket+= (intptr) server->key;
+ server->port= share->port;
+
+ if (!share->socket)
+ server->socket= NULL;
+ if (!share->password)
+ server->password= NULL;
+
+ if (table_charset)
+ server->csname= strdup_root(mem_root, table_charset->cs_name.str);
+
+ DBUG_VOID_RETURN;
+}
+
+
+static FEDERATEDX_SERVER *get_server(FEDERATEDX_SHARE *share, TABLE *table)
+{
+ FEDERATEDX_SERVER *server= NULL, tmp_server;
+ MEM_ROOT mem_root;
+ char buffer[STRING_BUFFER_USUAL_SIZE];
+ const char *socket_arg= share->socket ? share->socket : "";
+ const char *password_arg= share->password ? share->password : "";
+
+ String key(buffer, sizeof(buffer), &my_charset_bin);
+ String scheme(share->scheme, strlen(share->scheme), &my_charset_latin1);
+ String hostname(share->hostname, strlen(share->hostname), &my_charset_latin1);
+ String database(share->database, strlen(share->database), system_charset_info);
+ String username(share->username, strlen(share->username), system_charset_info);
+ String socket(socket_arg, strlen(socket_arg), files_charset_info);
+ String password(password_arg, strlen(password_arg), &my_charset_bin);
+ DBUG_ENTER("ha_federated.cc::get_server");
+
+ mysql_mutex_assert_owner(&federatedx_mutex);
+
+ init_alloc_root(PSI_INSTRUMENT_ME, &mem_root, 4096, 4096, MYF(0));
+
+ fill_server(&mem_root, &tmp_server, share, table ? table->s->table_charset : 0);
+
+ if (!(server= (FEDERATEDX_SERVER *) my_hash_search(&federatedx_open_servers,
+ tmp_server.key,
+ tmp_server.key_length)))
+ {
+ if (!table || !tmp_server.csname)
+ goto error;
+
+ if (!(server= (FEDERATEDX_SERVER *) memdup_root(&mem_root,
+ (char *) &tmp_server,
+ sizeof(*server))))
+ goto error;
+
+ server->mem_root= mem_root;
+
+ if (my_hash_insert(&federatedx_open_servers, (uchar*) server))
+ goto error;
+
+ mysql_mutex_init(fe_key_mutex_FEDERATEDX_SERVER_mutex,
+ &server->mutex, MY_MUTEX_INIT_FAST);
+ }
+ else
+ free_root(&mem_root, MYF(0)); /* prevents memory leak */
+
+ server->use_count++;
+
+ DBUG_RETURN(server);
+error:
+ free_root(&mem_root, MYF(0));
+ DBUG_RETURN(NULL);
+}
+
+
+/*
+ Example of simple lock controls. The "share" it creates is structure we will
+ pass to each federatedx handler. Do you have to have one of these? Well, you
+ have pieces that are used for locking, and they are needed to function.
+*/
+
+static FEDERATEDX_SHARE *get_share(const char *table_name, TABLE *table)
+{
+ char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ Field **field;
+ String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
+ FEDERATEDX_SHARE *share= NULL, tmp_share;
+ MEM_ROOT mem_root;
+ DBUG_ENTER("ha_federatedx.cc::get_share");
+
+ /*
+ In order to use this string, we must first zero it's length,
+ or it will contain garbage
+ */
+ query.length(0);
+
+ bzero(&tmp_share, sizeof(tmp_share));
+ init_alloc_root(PSI_INSTRUMENT_ME, &mem_root, 256, 0, MYF(0));
+
+ mysql_mutex_lock(&federatedx_mutex);
+
+ if (unlikely(!UTC))
+ {
+ String tz_00_name(STRING_WITH_LEN("+00:00"), &my_charset_bin);
+ UTC= my_tz_find(current_thd, &tz_00_name);
+ }
+
+ tmp_share.share_key= table_name;
+ tmp_share.share_key_length= (int)strlen(table_name);
+ if (parse_url(&mem_root, &tmp_share, table->s, 0))
+ goto error;
+
+ /* TODO: change tmp_share.scheme to LEX_STRING object */
+ if (!(share= (FEDERATEDX_SHARE *) my_hash_search(&federatedx_open_tables,
+ (uchar*) tmp_share.share_key,
+ tmp_share.
+ share_key_length)))
+ {
+ query.set_charset(system_charset_info);
+ query.append(STRING_WITH_LEN("SELECT "));
+ for (field= table->field; *field; field++)
+ {
+ append_ident(&query, (*field)->field_name.str,
+ (*field)->field_name.length, ident_quote_char);
+ query.append(STRING_WITH_LEN(", "));
+ }
+ /* chops off trailing comma */
+ query.length(query.length() - sizeof_trailing_comma);
+
+ query.append(STRING_WITH_LEN(" FROM "));
+
+ append_ident(&query, tmp_share.table_name,
+ tmp_share.table_name_length, ident_quote_char);
+
+ if (!(share= (FEDERATEDX_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
+ !(share->share_key= (char*) memdup_root(&mem_root, tmp_share.share_key, tmp_share.share_key_length+1)) ||
+ !(share->select_query.str= (char*) strmake_root(&mem_root, query.ptr(), query.length())))
+ goto error;
+ share->select_query.length= query.length();
+
+ share->mem_root= mem_root;
+
+ DBUG_PRINT("info",
+ ("share->select_query %s", share->select_query.str));
+
+ if (!(share->s= get_server(share, table)))
+ goto error;
+
+ if (my_hash_insert(&federatedx_open_tables, (uchar*) share))
+ goto error;
+ thr_lock_init(&share->lock);
+ }
+ else
+ free_root(&mem_root, MYF(0)); /* prevents memory leak */
+
+ share->use_count++;
+ mysql_mutex_unlock(&federatedx_mutex);
+
+ DBUG_RETURN(share);
+
+error:
+ mysql_mutex_unlock(&federatedx_mutex);
+ free_root(&mem_root, MYF(0));
+ DBUG_RETURN(NULL);
+}
+
+
+static federatedx_txn zero_txn;
+static int free_server(federatedx_txn *txn, FEDERATEDX_SERVER *server)
+{
+ bool destroy;
+ DBUG_ENTER("free_server");
+
+ mysql_mutex_lock(&federatedx_mutex);
+ if ((destroy= !--server->use_count))
+ my_hash_delete(&federatedx_open_servers, (uchar*) server);
+ mysql_mutex_unlock(&federatedx_mutex);
+
+ if (destroy)
+ {
+ MEM_ROOT mem_root;
+
+ if (!txn)
+ txn= &zero_txn;
+
+ txn->close(server);
+
+ DBUG_ASSERT(server->io_count == 0);
+
+ mysql_mutex_destroy(&server->mutex);
+ mem_root= server->mem_root;
+ free_root(&mem_root, MYF(0));
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Free lock controls. We call this whenever we close a table.
+ If the table had the last reference to the share then we
+ free memory associated with it.
+*/
+
+static void free_share(federatedx_txn *txn, FEDERATEDX_SHARE *share)
+{
+ bool destroy;
+ DBUG_ENTER("free_share");
+
+ mysql_mutex_lock(&federatedx_mutex);
+ if ((destroy= !--share->use_count))
+ my_hash_delete(&federatedx_open_tables, (uchar*) share);
+ mysql_mutex_unlock(&federatedx_mutex);
+
+ if (destroy)
+ {
+ MEM_ROOT mem_root;
+ FEDERATEDX_SERVER *server= share->s;
+
+ thr_lock_delete(&share->lock);
+
+ mem_root= share->mem_root;
+ free_root(&mem_root, MYF(0));
+
+ free_server(txn, server);
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+
+ha_rows ha_federatedx::records_in_range(uint inx,
+ const key_range *start_key,
+ const key_range *end_key,
+ page_range *pages)
+{
+ /*
+
+ We really want indexes to be used as often as possible, therefore
+ we just need to hard-code the return value to a very low number to
+ force the issue
+
+*/
+ DBUG_ENTER("ha_federatedx::records_in_range");
+ DBUG_RETURN(FEDERATEDX_RECORDS_IN_RANGE);
+}
+
+federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create)
+{
+ federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton);
+ if (!txn && !no_create)
+ {
+ txn= new federatedx_txn();
+ thd_set_ha_data(thd, federatedx_hton, txn);
+ }
+ return txn;
+}
+
+
+int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd)
+{
+ federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
+ delete txn;
+ return 0;
+}
+
+
+/*
+ Used for opening tables. The name will be the name of the file.
+ A table is opened when it needs to be opened. For instance
+ when a request comes in for a select on the table (tables are not
+ open and closed for each request, they are cached).
+
+ Called from handler.cc by handler::ha_open(). The server opens
+ all tables by calling ha_open() which then calls the handler
+ specific open().
+*/
+
+int ha_federatedx::open(const char *name, int mode, uint test_if_locked)
+{
+ int error;
+ THD *thd= ha_thd();
+ DBUG_ENTER("ha_federatedx::open");
+
+ if (!(share= get_share(name, table)))
+ DBUG_RETURN(1);
+ thr_lock_data_init(&share->lock, &lock, NULL);
+
+ DBUG_ASSERT(io == NULL);
+
+ txn= get_txn(thd);
+
+ if ((error= txn->acquire(share, thd, TRUE, &io)))
+ {
+ free_share(txn, share);
+ DBUG_RETURN(error);
+ }
+
+ ref_length= (uint)io->get_ref_length();
+
+ txn->release(&io);
+
+ DBUG_PRINT("info", ("ref_length: %u", ref_length));
+
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4, MYF(0));
+
+ reset();
+
+ DBUG_RETURN(0);
+}
+
+class Net_error_handler : public Internal_error_handler
+{
+public:
+ Net_error_handler() = default;
+
+public:
+ bool handle_condition(THD *thd, uint sql_errno, const char* sqlstate,
+ Sql_condition::enum_warning_level *level,
+ const char* msg, Sql_condition ** cond_hdl)
+ {
+ return sql_errno >= ER_ABORTING_CONNECTION &&
+ sql_errno <= ER_NET_WRITE_INTERRUPTED;
+ }
+};
+
+/*
+ Closes a table. We call the free_share() function to free any resources
+ that we have allocated in the "shared" structure.
+
+ Called from sql_base.cc, sql_select.cc, and table.cc.
+ In sql_select.cc it is only used to close up temporary tables or during
+ the process where a temporary table is converted over to being a
+ myisam table.
+ For sql_base.cc look at close_data_tables().
+*/
+
+int ha_federatedx::close(void)
+{
+ int retval= 0;
+ THD *thd= ha_thd();
+ DBUG_ENTER("ha_federatedx::close");
+
+ /* free the result set */
+ reset();
+
+ delete_dynamic(&results);
+
+ /* Disconnect from mysql */
+ if (!thd || !(txn= get_txn(thd, true)))
+ txn= &zero_txn;
+
+ txn->release(&io);
+ DBUG_ASSERT(io == NULL);
+
+ Net_error_handler err_handler;
+ if (thd)
+ thd->push_internal_handler(&err_handler);
+ free_share(txn, share);
+ if (thd)
+ thd->pop_internal_handler();
+
+ DBUG_RETURN(retval);
+}
+
+/*
+
+ Checks if a field in a record is SQL NULL.
+
+ SYNOPSIS
+ field_in_record_is_null()
+ table TABLE pointer, MySQL table object
+ field Field pointer, MySQL field object
+ record char pointer, contains record
+
+ DESCRIPTION
+ This method uses the record format information in table to track
+ the null bit in record.
+
+ RETURN VALUE
+ 1 if NULL
+ 0 otherwise
+*/
+
+static inline uint field_in_record_is_null(TABLE *table, Field *field,
+ char *record)
+{
+ int null_offset;
+ DBUG_ENTER("ha_federatedx::field_in_record_is_null");
+
+ if (!field->null_ptr)
+ DBUG_RETURN(0);
+
+ null_offset= (uint) ((char*)field->null_ptr - (char*)table->record[0]);
+
+ if (record[null_offset] & field->null_bit)
+ DBUG_RETURN(1);
+
+ DBUG_RETURN(0);
+}
+
+
+/**
+ @brief Construct the INSERT statement.
+
+ @details This method will construct the INSERT statement and appends it to
+ the supplied query string buffer.
+
+ @return
+ @retval FALSE No error
+ @retval TRUE Failure
+*/
+
+bool ha_federatedx::append_stmt_insert(String *query)
+{
+ char insert_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ Field **field;
+ uint tmp_length;
+ bool added_field= FALSE;
+
+ /* The main insert query string */
+ String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+ DBUG_ENTER("ha_federatedx::append_stmt_insert");
+
+ insert_string.length(0);
+
+ if (replace_duplicates)
+ insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
+ else if (ignore_duplicates && !insert_dup_update)
+ insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
+ else
+ insert_string.append(STRING_WITH_LEN("INSERT INTO "));
+ append_ident(&insert_string, share->table_name, share->table_name_length,
+ ident_quote_char);
+ tmp_length= insert_string.length();
+ insert_string.append(STRING_WITH_LEN(" ("));
+
+ /*
+ loop through the field pointer array, add any fields to both the values
+ list and the fields list that match the current query id
+ */
+ for (field= table->field; *field; field++)
+ {
+ if (bitmap_is_set(table->write_set, (*field)->field_index))
+ {
+ /* append the field name */
+ append_ident(&insert_string, (*field)->field_name.str,
+ (*field)->field_name.length, ident_quote_char);
+
+ /* append commas between both fields and fieldnames */
+ /*
+ unfortunately, we can't use the logic if *(fields + 1) to
+ make the following appends conditional as we don't know if the
+ next field is in the write set
+ */
+ insert_string.append(STRING_WITH_LEN(", "));
+ added_field= TRUE;
+ }
+ }
+
+ if (added_field)
+ {
+ /* Remove trailing comma. */
+ insert_string.length(insert_string.length() - sizeof_trailing_comma);
+ insert_string.append(STRING_WITH_LEN(") "));
+ }
+ else
+ {
+ /* If there were no fields, we don't want to add a closing paren. */
+ insert_string.length(tmp_length);
+ }
+
+ insert_string.append(STRING_WITH_LEN(" VALUES "));
+
+ DBUG_RETURN(query->append(insert_string));
+}
+
+
+/*
+ write_row() inserts a row. No extra() hint is given currently if a bulk load
+ is happeneding. buf() is a byte array of data. You can use the field
+ information to extract the data from the native byte array type.
+ Example of this would be:
+ for (Field **field=table->field ; *field ; field++)
+ {
+ ...
+ }
+
+ Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
+ sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
+*/
+
+int ha_federatedx::write_row(const uchar *buf)
+{
+ char values_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
+ Field **field;
+ uint tmp_length;
+ int error= 0;
+ bool use_bulk_insert;
+ bool auto_increment_update_required= (table->next_number_field != NULL);
+
+ /* The string containing the values to be added to the insert */
+ String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
+ /* The actual value of the field, to be added to the values_string */
+ String insert_field_value_string(insert_field_value_buffer,
+ sizeof(insert_field_value_buffer),
+ &my_charset_bin);
+ Time_zone *saved_time_zone= table->in_use->variables.time_zone;
+ MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->read_set);
+ DBUG_ENTER("ha_federatedx::write_row");
+
+ table->in_use->variables.time_zone= UTC;
+ values_string.length(0);
+ insert_field_value_string.length(0);
+
+ /*
+ start both our field and field values strings
+ We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
+ Ignore duplicates is always true when insert_dup_update is true.
+ When replace_duplicates == TRUE, we can safely enable multi-row insert.
+ When performing multi-row insert, we only collect the columns values for
+ the row. The start of the statement is only created when the first
+ row is copied in to the bulk_insert string.
+ */
+ if (!(use_bulk_insert= bulk_insert.str &&
+ (!insert_dup_update || replace_duplicates)))
+ append_stmt_insert(&values_string);
+
+ values_string.append(STRING_WITH_LEN(" ("));
+ tmp_length= values_string.length();
+
+ /*
+ loop through the field pointer array, add any fields to both the values
+ list and the fields list that is part of the write set
+ */
+ for (field= table->field; *field; field++)
+ {
+ if (bitmap_is_set(table->write_set, (*field)->field_index))
+ {
+ if ((*field)->is_null())
+ values_string.append(STRING_WITH_LEN(" NULL "));
+ else
+ {
+ bool needs_quote= (*field)->str_needs_quotes();
+ (*field)->val_str(&insert_field_value_string);
+ if (needs_quote)
+ values_string.append(value_quote_char);
+ insert_field_value_string.print(&values_string);
+ if (needs_quote)
+ values_string.append(value_quote_char);
+
+ insert_field_value_string.length(0);
+ }
+
+ /* append commas between both fields and fieldnames */
+ /*
+ unfortunately, we can't use the logic if *(fields + 1) to
+ make the following appends conditional as we don't know if the
+ next field is in the write set
+ */
+ values_string.append(STRING_WITH_LEN(", "));
+ }
+ }
+ dbug_tmp_restore_column_map(&table->read_set, old_map);
+ table->in_use->variables.time_zone= saved_time_zone;
+
+ /*
+ if there were no fields, we don't want to add a closing paren
+ AND, we don't want to chop off the last char '('
+ insert will be "INSERT INTO t1 VALUES ();"
+ */
+ if (values_string.length() > tmp_length)
+ {
+ /* chops off trailing comma */
+ values_string.length(values_string.length() - sizeof_trailing_comma);
+ }
+ /* we always want to append this, even if there aren't any fields */
+ values_string.append(STRING_WITH_LEN(") "));
+
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
+ DBUG_RETURN(error);
+
+ if (use_bulk_insert)
+ {
+ /*
+ Send the current bulk insert out if appending the current row would
+ cause the statement to overflow the packet size, otherwise set
+ auto_increment_update_required to FALSE as no query was executed.
+ */
+ if (bulk_insert.length + values_string.length() + bulk_padding >
+ io->max_query_size() && bulk_insert.length)
+ {
+ error= io->query(bulk_insert.str, bulk_insert.length);
+ bulk_insert.length= 0;
+ }
+ else
+ auto_increment_update_required= FALSE;
+
+ if (bulk_insert.length == 0)
+ {
+ char insert_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ String insert_string(insert_buffer, sizeof(insert_buffer),
+ &my_charset_bin);
+ insert_string.length(0);
+ append_stmt_insert(&insert_string);
+ dynstr_append_mem(&bulk_insert, insert_string.ptr(),
+ insert_string.length());
+ }
+ else
+ dynstr_append_mem(&bulk_insert, ",", 1);
+
+ dynstr_append_mem(&bulk_insert, values_string.ptr(),
+ values_string.length());
+ }
+ else
+ {
+ error= io->query(values_string.ptr(), values_string.length());
+ }
+
+ if (error)
+ {
+ DBUG_RETURN(stash_remote_error());
+ }
+ /*
+ If the table we've just written a record to contains an auto_increment
+ field, then store the last_insert_id() value from the foreign server
+ */
+ if (auto_increment_update_required)
+ {
+ update_auto_increment();
+
+ /* mysql_insert() uses this for protocol return value */
+ table->next_number_field->store(stats.auto_increment_value, 1);
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+/**
+ @brief Prepares the storage engine for bulk inserts.
+
+ @param[in] rows estimated number of rows in bulk insert
+ or 0 if unknown.
+
+ @details Initializes memory structures required for bulk insert.
+*/
+
+void ha_federatedx::start_bulk_insert(ha_rows rows, uint flags)
+{
+ uint page_size;
+ DBUG_ENTER("ha_federatedx::start_bulk_insert");
+
+ dynstr_free(&bulk_insert);
+
+ /**
+ We don't bother with bulk-insert semantics when the estimated rows == 1
+ The rows value will be 0 if the server does not know how many rows
+ would be inserted. This can occur when performing INSERT...SELECT
+ */
+
+ if (rows == 1)
+ DBUG_VOID_RETURN;
+
+ /*
+ Make sure we have an open connection so that we know the
+ maximum packet size.
+ */
+ if (txn->acquire(share, ha_thd(), FALSE, &io))
+ DBUG_VOID_RETURN;
+
+ page_size= (uint) my_getpagesize();
+
+ if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
+ DBUG_VOID_RETURN;
+
+ bulk_insert.length= 0;
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief End bulk insert.
+
+ @details This method will send any remaining rows to the remote server.
+ Finally, it will deinitialize the bulk insert data structure.
+
+ @return Operation status
+ @retval 0 No error
+ @retval != 0 Error occurred at remote server. Also sets my_errno.
+*/
+
+int ha_federatedx::end_bulk_insert()
+{
+ int error= 0;
+ DBUG_ENTER("ha_federatedx::end_bulk_insert");
+
+ if (bulk_insert.str && bulk_insert.length && !table_will_be_deleted)
+ {
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
+ DBUG_RETURN(error);
+ if (io->query(bulk_insert.str, bulk_insert.length))
+ error= stash_remote_error();
+ else
+ if (table->next_number_field)
+ update_auto_increment();
+ }
+
+ dynstr_free(&bulk_insert);
+
+ DBUG_RETURN(my_errno= error);
+}
+
+
+/*
+ ha_federatedx::update_auto_increment
+
+ This method ensures that last_insert_id() works properly. What it simply does
+ is calls last_insert_id() on the foreign database immediately after insert
+ (if the table has an auto_increment field) and sets the insert id via
+ thd->insert_id(ID)).
+*/
+void ha_federatedx::update_auto_increment(void)
+{
+ THD *thd= ha_thd();
+ DBUG_ENTER("ha_federatedx::update_auto_increment");
+
+ ha_federatedx::info(HA_STATUS_AUTO);
+ thd->first_successful_insert_id_in_cur_stmt=
+ stats.auto_increment_value;
+ DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
+
+ DBUG_VOID_RETURN;
+}
+
+int ha_federatedx::optimize(THD* thd, HA_CHECK_OPT* check_opt)
+{
+ int error= 0;
+ char query_buffer[STRING_BUFFER_USUAL_SIZE];
+ String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
+ DBUG_ENTER("ha_federatedx::optimize");
+
+ query.length(0);
+
+ query.set_charset(system_charset_info);
+ query.append(STRING_WITH_LEN("OPTIMIZE TABLE "));
+ append_ident(&query, share->table_name, share->table_name_length,
+ ident_quote_char);
+
+ DBUG_ASSERT(txn == get_txn(thd));
+
+ if ((error= txn->acquire(share, thd, FALSE, &io)))
+ DBUG_RETURN(error);
+
+ if (io->query(query.ptr(), query.length()))
+ error= stash_remote_error();
+
+ DBUG_RETURN(error);
+}
+
+
+int ha_federatedx::repair(THD* thd, HA_CHECK_OPT* check_opt)
+{
+ int error= 0;
+ char query_buffer[STRING_BUFFER_USUAL_SIZE];
+ String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
+ DBUG_ENTER("ha_federatedx::repair");
+
+ query.length(0);
+
+ query.set_charset(system_charset_info);
+ query.append(STRING_WITH_LEN("REPAIR TABLE "));
+ append_ident(&query, share->table_name, share->table_name_length,
+ ident_quote_char);
+ if (check_opt->flags & T_QUICK)
+ query.append(STRING_WITH_LEN(" QUICK"));
+ if (check_opt->flags & T_EXTEND)
+ query.append(STRING_WITH_LEN(" EXTENDED"));
+ if (check_opt->sql_flags & TT_USEFRM)
+ query.append(STRING_WITH_LEN(" USE_FRM"));
+
+ DBUG_ASSERT(txn == get_txn(thd));
+
+ if ((error= txn->acquire(share, thd, FALSE, &io)))
+ DBUG_RETURN(error);
+
+ if (io->query(query.ptr(), query.length()))
+ error= stash_remote_error();
+
+ DBUG_RETURN(error);
+}
+
+
+/*
+ Yes, update_row() does what you expect, it updates a row. old_data will have
+ the previous row record in it, while new_data will have the newest data in
+ it.
+
+ Keep in mind that the server can do updates based on ordering if an ORDER BY
+ clause was used. Consecutive ordering is not guaranteed.
+ Currently new_data will not have an updated auto_increament record, or
+ and updated timestamp field. You can do these for federatedx by doing these:
+ if (table->timestamp_on_update_now)
+ update_timestamp(new_row+table->timestamp_on_update_now-1);
+ if (table->next_number_field && record == table->record[0])
+ update_auto_increment();
+
+ Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
+*/
+
+int ha_federatedx::update_row(const uchar *old_data, const uchar *new_data)
+{
+ /*
+ This used to control how the query was built. If there was a
+ primary key, the query would be built such that there was a where
+ clause with only that column as the condition. This is flawed,
+ because if we have a multi-part primary key, it would only use the
+ first part! We don't need to do this anyway, because
+ read_range_first will retrieve the correct record, which is what
+ is used to build the WHERE clause. We can however use this to
+ append a LIMIT to the end if there is NOT a primary key. Why do
+ this? Because we only are updating one record, and LIMIT enforces
+ this.
+ */
+ bool has_a_primary_key= MY_TEST(table->s->primary_key != MAX_KEY);
+
+ /*
+ buffers for following strings
+ */
+ char field_value_buffer[STRING_BUFFER_USUAL_SIZE];
+ char update_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ char where_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+
+ /* Work area for field values */
+ String field_value(field_value_buffer, sizeof(field_value_buffer),
+ &my_charset_bin);
+ /* stores the update query */
+ String update_string(update_buffer,
+ sizeof(update_buffer),
+ &my_charset_bin);
+ /* stores the WHERE clause */
+ String where_string(where_buffer,
+ sizeof(where_buffer),
+ &my_charset_bin);
+ uchar *record= table->record[0];
+ int error;
+ DBUG_ENTER("ha_federatedx::update_row");
+ /*
+ set string lengths to 0 to avoid misc chars in string
+ */
+ field_value.length(0);
+ update_string.length(0);
+ where_string.length(0);
+
+ if (ignore_duplicates)
+ update_string.append(STRING_WITH_LEN("UPDATE IGNORE "));
+ else
+ update_string.append(STRING_WITH_LEN("UPDATE "));
+ append_ident(&update_string, share->table_name,
+ share->table_name_length, ident_quote_char);
+ update_string.append(STRING_WITH_LEN(" SET "));
+
+ /*
+ In this loop, we want to match column names to values being inserted
+ (while building INSERT statement).
+
+ Iterate through table->field (new data) and share->old_field (old_data)
+ using the same index to create an SQL UPDATE statement. New data is
+ used to create SET field=value and old data is used to create WHERE
+ field=oldvalue
+ */
+
+ Time_zone *saved_time_zone= table->in_use->variables.time_zone;
+ table->in_use->variables.time_zone= UTC;
+ for (Field **field= table->field; *field; field++)
+ {
+ if (bitmap_is_set(table->write_set, (*field)->field_index))
+ {
+ append_ident(&update_string, (*field)->field_name.str,
+ (*field)->field_name.length,
+ ident_quote_char);
+ update_string.append(STRING_WITH_LEN(" = "));
+
+ if ((*field)->is_null())
+ update_string.append(STRING_WITH_LEN(" NULL "));
+ else
+ {
+ /* otherwise = */
+ MY_BITMAP *old_map= tmp_use_all_columns(table, &table->read_set);
+ bool needs_quote= (*field)->str_needs_quotes();
+ (*field)->val_str(&field_value);
+ if (needs_quote)
+ update_string.append(value_quote_char);
+ field_value.print(&update_string);
+ if (needs_quote)
+ update_string.append(value_quote_char);
+ field_value.length(0);
+ tmp_restore_column_map(&table->read_set, old_map);
+ }
+ update_string.append(STRING_WITH_LEN(", "));
+ }
+
+ if (bitmap_is_set(table->read_set, (*field)->field_index))
+ {
+ append_ident(&where_string, (*field)->field_name.str,
+ (*field)->field_name.length,
+ ident_quote_char);
+ if (field_in_record_is_null(table, *field, (char*) old_data))
+ where_string.append(STRING_WITH_LEN(" IS NULL "));
+ else
+ {
+ bool needs_quote= (*field)->str_needs_quotes();
+ where_string.append(STRING_WITH_LEN(" = "));
+ (*field)->val_str(&field_value,
+ (old_data + (*field)->offset(record)));
+ if (needs_quote)
+ where_string.append(value_quote_char);
+ field_value.print(&where_string);
+ if (needs_quote)
+ where_string.append(value_quote_char);
+ field_value.length(0);
+ }
+ where_string.append(STRING_WITH_LEN(" AND "));
+ }
+ }
+ table->in_use->variables.time_zone= saved_time_zone;
+
+ /* Remove last ', '. This works as there must be at least on updated field */
+ update_string.length(update_string.length() - sizeof_trailing_comma);
+
+ if (where_string.length())
+ {
+ /* chop off trailing AND */
+ where_string.length(where_string.length() - sizeof_trailing_and);
+ update_string.append(STRING_WITH_LEN(" WHERE "));
+ update_string.append(where_string);
+ }
+
+ /*
+ If this table has not a primary key, then we could possibly
+ update multiple rows. We want to make sure to only update one!
+ */
+ if (!has_a_primary_key)
+ update_string.append(STRING_WITH_LEN(" LIMIT 1"));
+
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
+ DBUG_RETURN(error);
+
+ if (io->query(update_string.ptr(), update_string.length()))
+ {
+ DBUG_RETURN(stash_remote_error());
+ }
+ DBUG_RETURN(0);
+}
+
+/*
+ This will delete a row. 'buf' will contain a copy of the row to be =deleted.
+ The server will call this right after the current row has been called (from
+ either a previous rnd_next() or index call).
+ If you keep a pointer to the last row or can access a primary key it will
+ make doing the deletion quite a bit easier.
+ Keep in mind that the server does no guarentee consecutive deletions.
+ ORDER BY clauses can be used.
+
+ Called in sql_acl.cc and sql_udf.cc to manage internal table information.
+ Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select
+ it is used for removing duplicates while in insert it is used for REPLACE
+ calls.
+*/
+
+int ha_federatedx::delete_row(const uchar *buf)
+{
+ char delete_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ char data_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
+ String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
+ uint found= 0;
+ int error;
+ DBUG_ENTER("ha_federatedx::delete_row");
+
+ delete_string.length(0);
+ delete_string.append(STRING_WITH_LEN("DELETE FROM "));
+ append_ident(&delete_string, share->table_name,
+ share->table_name_length, ident_quote_char);
+ delete_string.append(STRING_WITH_LEN(" WHERE "));
+
+ Time_zone *saved_time_zone= table->in_use->variables.time_zone;
+ table->in_use->variables.time_zone= UTC;
+ for (Field **field= table->field; *field; field++)
+ {
+ Field *cur_field= *field;
+ found++;
+ if (bitmap_is_set(table->read_set, cur_field->field_index))
+ {
+ append_ident(&delete_string, (*field)->field_name.str,
+ (*field)->field_name.length, ident_quote_char);
+ data_string.length(0);
+ if (cur_field->is_null())
+ {
+ delete_string.append(STRING_WITH_LEN(" IS NULL "));
+ }
+ else
+ {
+ bool needs_quote= cur_field->str_needs_quotes();
+ delete_string.append(STRING_WITH_LEN(" = "));
+ cur_field->val_str(&data_string);
+ if (needs_quote)
+ delete_string.append(value_quote_char);
+ data_string.print(&delete_string);
+ if (needs_quote)
+ delete_string.append(value_quote_char);
+ }
+ delete_string.append(STRING_WITH_LEN(" AND "));
+ }
+ }
+ table->in_use->variables.time_zone= saved_time_zone;
+
+ // Remove trailing AND
+ delete_string.length(delete_string.length() - sizeof_trailing_and);
+ if (!found)
+ delete_string.length(delete_string.length() - sizeof_trailing_where);
+
+ delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
+ DBUG_PRINT("info",
+ ("Delete sql: %s", delete_string.c_ptr_quick()));
+
+ if ((error= txn->acquire(share, ha_thd(), FALSE, &io)))
+ DBUG_RETURN(error);
+
+ if (io->query(delete_string.ptr(), delete_string.length()))
+ {
+ DBUG_RETURN(stash_remote_error());
+ }
+ stats.deleted+= (ha_rows) io->affected_rows();
+ stats.records-= (ha_rows) io->affected_rows();
+ DBUG_PRINT("info",
+ ("rows deleted %ld rows deleted for all time %ld",
+ (long) io->affected_rows(), (long) stats.deleted));
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Positions an index cursor to the index specified in the handle. Fetches the
+ row if available. If the key value is null, begin at the first key of the
+ index. This method, which is called in the case of an SQL statement having
+ a WHERE clause on a non-primary key index, simply calls index_read_idx.
+*/
+
+int ha_federatedx::index_read(uchar *buf, const uchar *key,
+ uint key_len, ha_rkey_function find_flag)
+{
+ DBUG_ENTER("ha_federatedx::index_read");
+
+ if (stored_result)
+ (void) free_result();
+ DBUG_RETURN(index_read_idx_with_result_set(buf, active_index, key,
+ key_len, find_flag,
+ &stored_result));
+}
+
+
+/*
+ Positions an index cursor to the index specified in key. Fetches the
+ row if any. This is only used to read whole keys.
+
+ This method is called via index_read in the case of a WHERE clause using
+ a primary key index OR is called DIRECTLY when the WHERE clause
+ uses a PRIMARY KEY index.
+
+ NOTES
+ This uses an internal result set that is deleted before function
+ returns. We need to be able to be callable from ha_rnd_pos()
+*/
+
+int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key,
+ uint key_len, enum ha_rkey_function find_flag)
+{
+ int retval;
+ FEDERATEDX_IO_RESULT *io_result= 0;
+ DBUG_ENTER("ha_federatedx::index_read_idx");
+
+ if ((retval= index_read_idx_with_result_set(buf, index, key,
+ key_len, find_flag,
+ &io_result)))
+ DBUG_RETURN(retval);
+ /* io is correct, as index_read_idx_with_result_set was ok */
+ io->free_result(io_result);
+ DBUG_RETURN(retval);
+}
+
+
+/*
+ Create result set for rows matching query and return first row
+
+ RESULT
+ 0 ok In this case *result will contain the result set
+ # error In this case *result will contain 0
+*/
+
+int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index,
+ const uchar *key,
+ uint key_len,
+ ha_rkey_function find_flag,
+ FEDERATEDX_IO_RESULT **result)
+{
+ int retval;
+ char error_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ char index_value[STRING_BUFFER_USUAL_SIZE];
+ char sql_query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ String index_string(index_value,
+ sizeof(index_value),
+ &my_charset_bin);
+ String sql_query(sql_query_buffer,
+ sizeof(sql_query_buffer),
+ &my_charset_bin);
+ key_range range;
+ DBUG_ENTER("ha_federatedx::index_read_idx_with_result_set");
+
+ *result= 0; // In case of errors
+ index_string.length(0);
+ sql_query.length(0);
+
+ sql_query.append(share->select_query);
+
+ range.key= key;
+ range.length= key_len;
+ range.flag= find_flag;
+ create_where_from_key(&index_string, &table->key_info[index], &range, 0, 0);
+ sql_query.append(index_string);
+
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
+ DBUG_RETURN(retval);
+
+ if (io->query(sql_query.ptr(), sql_query.length()))
+ {
+ snprintf(error_buffer, sizeof(error_buffer), "error: %d '%s'",
+ io->error_code(), io->error_str());
+ retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
+ goto error;
+ }
+ if (!(*result= io->store_result()))
+ {
+ retval= HA_ERR_END_OF_FILE;
+ goto error;
+ }
+ if (!(retval= read_next(buf, *result)))
+ DBUG_RETURN(retval);
+
+ insert_dynamic(&results, (uchar*) result);
+ *result= 0;
+ DBUG_RETURN(retval);
+
+error:
+ my_error(retval, MYF(0), error_buffer);
+ DBUG_RETURN(retval);
+}
+
+
+/*
+ This method is used exlusevely by filesort() to check if we
+ can create sorting buffers of necessary size.
+ If the handler returns more records that it declares
+ here server can just crash on filesort().
+ We cannot guarantee that's not going to happen with
+ the FEDERATEDX engine, as we have records==0 always if the
+ client is a VIEW, and for the table the number of
+ records can inpredictably change during execution.
+ So we return maximum possible value here.
+*/
+
+ha_rows ha_federatedx::estimate_rows_upper_bound()
+{
+ return HA_POS_ERROR;
+}
+
+
+/* Initialized at each key walk (called multiple times unlike rnd_init()) */
+
+int ha_federatedx::index_init(uint keynr, bool sorted)
+{
+ DBUG_ENTER("ha_federatedx::index_init");
+ DBUG_PRINT("info", ("table: '%s' key: %u", table->s->table_name.str, keynr));
+ active_index= keynr;
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Read first range
+*/
+
+int ha_federatedx::read_range_first(const key_range *start_key,
+ const key_range *end_key,
+ bool eq_range_arg, bool sorted)
+{
+ char sql_query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ int retval;
+ String sql_query(sql_query_buffer,
+ sizeof(sql_query_buffer),
+ &my_charset_bin);
+ DBUG_ENTER("ha_federatedx::read_range_first");
+
+ DBUG_ASSERT(!(start_key == NULL && end_key == NULL));
+
+ sql_query.length(0);
+ sql_query.append(share->select_query);
+ create_where_from_key(&sql_query, &table->key_info[active_index],
+ start_key, end_key, eq_range_arg);
+
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
+ DBUG_RETURN(retval);
+
+ if (stored_result)
+ (void) free_result();
+
+ if (io->query(sql_query.ptr(), sql_query.length()))
+ {
+ retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
+ goto error;
+ }
+ sql_query.length(0);
+
+ if (!(stored_result= io->store_result()))
+ {
+ retval= HA_ERR_END_OF_FILE;
+ goto error;
+ }
+
+ retval= read_next(table->record[0], stored_result);
+ DBUG_RETURN(retval);
+
+error:
+ DBUG_RETURN(retval);
+}
+
+
+int ha_federatedx::read_range_next()
+{
+ int retval;
+ DBUG_ENTER("ha_federatedx::read_range_next");
+ retval= rnd_next(table->record[0]);
+ DBUG_RETURN(retval);
+}
+
+
+/* Used to read forward through the index. */
+int ha_federatedx::index_next(uchar *buf)
+{
+ DBUG_ENTER("ha_federatedx::index_next");
+ int retval=read_next(buf, stored_result);
+ DBUG_RETURN(retval);
+}
+
+
+/*
+ rnd_init() is called when the system wants the storage engine to do a table
+ scan.
+
+ This is the method that gets data for the SELECT calls.
+
+ See the federatedx in the introduction at the top of this file to see when
+ rnd_init() is called.
+
+ Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
+ sql_table.cc, and sql_update.cc.
+*/
+
+int ha_federatedx::rnd_init(bool scan)
+{
+ DBUG_ENTER("ha_federatedx::rnd_init");
+ /*
+ The use of the 'scan' flag is incredibly important for this handler
+ to work properly, especially with updates containing WHERE clauses
+ using indexed columns.
+
+ When the initial query contains a WHERE clause of the query using an
+ indexed column, it's index_read_idx that selects the exact record from
+ the foreign database.
+
+ When there is NO index in the query, either due to not having a WHERE
+ clause, or the WHERE clause is using columns that are not indexed, a
+ 'full table scan' done by rnd_init, which in this situation simply means
+ a 'select * from ...' on the foreign table.
+
+ In other words, this 'scan' flag gives us the means to ensure that if
+ there is an index involved in the query, we want index_read_idx to
+ retrieve the exact record (scan flag is 0), and do not want rnd_init
+ to do a 'full table scan' and wipe out that result set.
+
+ Prior to using this flag, the problem was most apparent with updates.
+
+ An initial query like 'UPDATE tablename SET anything = whatever WHERE
+ indexedcol = someval', index_read_idx would get called, using a query
+ constructed with a WHERE clause built from the values of index ('indexcol'
+ in this case, having a value of 'someval'). mysql_store_result would
+ then get called (this would be the result set we want to use).
+
+ After this rnd_init (from sql_update.cc) would be called, it would then
+ unecessarily call "select * from table" on the foreign table, then call
+ mysql_store_result, which would wipe out the correct previous result set
+ from the previous call of index_read_idx's that had the result set
+ containing the correct record, hence update the wrong row!
+
+ */
+
+ if (scan)
+ {
+ int error;
+
+ if ((error= txn->acquire(share, ha_thd(), TRUE, &io)))
+ DBUG_RETURN(error);
+
+ if (stored_result)
+ (void) free_result();
+
+ if (io->query(share->select_query.str, share->select_query.length))
+ goto error;
+
+ stored_result= io->store_result();
+ if (!stored_result)
+ goto error;
+ }
+ DBUG_RETURN(0);
+
+error:
+ DBUG_RETURN(stash_remote_error());
+}
+
+
+int ha_federatedx::rnd_end()
+{
+ DBUG_ENTER("ha_federatedx::rnd_end");
+ DBUG_RETURN(index_end());
+}
+
+
+int ha_federatedx::free_result()
+{
+ int error;
+ DBUG_ENTER("ha_federatedx::free_result");
+ DBUG_ASSERT(stored_result);
+ for (uint i= 0; i < results.elements; ++i)
+ {
+ FEDERATEDX_IO_RESULT *result= 0;
+ get_dynamic(&results, (uchar*) &result, i);
+ if (result == stored_result)
+ goto end;
+ }
+ if (position_called)
+ {
+ insert_dynamic(&results, (uchar*) &stored_result);
+ }
+ else
+ {
+ federatedx_io *tmp_io= 0, **iop;
+ if (!*(iop= &io) && (error= txn->acquire(share, ha_thd(), TRUE, (iop= &tmp_io))))
+ {
+ DBUG_ASSERT(0); // Fail when testing
+ insert_dynamic(&results, (uchar*) &stored_result);
+ goto end;
+ }
+ (*iop)->free_result(stored_result);
+ txn->release(&tmp_io);
+ }
+end:
+ stored_result= 0;
+ position_called= FALSE;
+ DBUG_RETURN(0);
+}
+
+int ha_federatedx::index_end(void)
+{
+ int error= 0;
+ DBUG_ENTER("ha_federatedx::index_end");
+ if (stored_result)
+ error= free_result();
+ active_index= MAX_KEY;
+ DBUG_RETURN(error);
+}
+
+
+/*
+ This is called for each row of the table scan. When you run out of records
+ you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
+ The Field structure for the table is the key to getting data into buf
+ in a manner that will allow the server to understand it.
+
+ Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
+ sql_table.cc, and sql_update.cc.
+*/
+
+int ha_federatedx::rnd_next(uchar *buf)
+{
+ DBUG_ENTER("ha_federatedx::rnd_next");
+
+ if (stored_result == 0)
+ {
+ /*
+ Return value of rnd_init is not always checked (see records.cc),
+ so we can get here _even_ if there is _no_ pre-fetched result-set!
+ TODO: fix it. We can delete this in 5.1 when rnd_init() is checked.
+ */
+ DBUG_RETURN(1);
+ }
+ int retval=read_next(buf, stored_result);
+ DBUG_RETURN(retval);
+}
+
+
+/*
+ ha_federatedx::read_next
+
+ reads from a result set and converts to mysql internal
+ format
+
+ SYNOPSIS
+ field_in_record_is_null()
+ buf byte pointer to record
+ result mysql result set
+
+ DESCRIPTION
+ This method is a wrapper method that reads one record from a result
+ set and converts it to the internal table format
+
+ RETURN VALUE
+ 1 error
+ 0 no error
+*/
+
+int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result)
+{
+ int retval;
+ FEDERATEDX_IO_ROW *row;
+ DBUG_ENTER("ha_federatedx::read_next");
+
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
+ DBUG_RETURN(retval);
+
+ /* Fetch a row, insert it back in a row format. */
+ if (!(row= io->fetch_row(result, &current)))
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+ if (!(retval= convert_row_to_internal_format(buf, row, result)))
+ table->status= 0;
+
+ DBUG_RETURN(retval);
+}
+
+
+/**
+ @brief Store a reference to current row.
+
+ @details During a query execution we may have different result sets (RS),
+ e.g. for different ranges. All the RS's used are stored in
+ memory and placed in @c results dynamic array. At the end of
+ execution all stored RS's are freed at once in the
+ @c ha_federated::reset().
+ So, in case of federated, a reference to current row is a
+ stored result address and current data cursor position.
+ As we keep all RS in memory during a query execution,
+ we can get any record using the reference any time until
+ @c ha_federated::reset() is called.
+ TODO: we don't have to store all RS's rows but only those
+ we call @c ha_federated::position() for, so we can free memory
+ where we store other rows in the @c ha_federated::index_end().
+
+ @param[in] record record data (unused)
+
+*/
+
+void ha_federatedx::position(const uchar *record __attribute__ ((unused)))
+{
+ DBUG_ENTER("ha_federatedx::position");
+
+ if (!stored_result)
+ {
+ bzero(ref, ref_length);
+ DBUG_VOID_RETURN;
+ }
+
+ if (txn->acquire(share, ha_thd(), TRUE, &io))
+ DBUG_VOID_RETURN;
+
+ io->mark_position(stored_result, ref, current);
+
+ position_called= TRUE;
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ This is like rnd_next, but you are given a position to use to determine the
+ row. The position will be of the type that you stored in ref.
+
+ This method is required for an ORDER BY
+
+ Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
+*/
+
+int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
+{
+ int retval;
+ FEDERATEDX_IO_RESULT *result= stored_result;
+ DBUG_ENTER("ha_federatedx::rnd_pos");
+
+ /* We have to move this to 'ref' to get things aligned */
+ bmove(ref, pos, ref_length);
+
+ if ((retval= txn->acquire(share, ha_thd(), TRUE, &io)))
+ goto error;
+
+ if ((retval= io->seek_position(&result, ref)))
+ goto error;
+
+ retval= read_next(buf, result);
+ DBUG_RETURN(retval);
+
+error:
+ DBUG_RETURN(retval);
+}
+
+
+/*
+ ::info() is used to return information to the optimizer.
+ Currently this table handler doesn't implement most of the fields
+ really needed. SHOW also makes use of this data
+ Another note, you will probably want to have the following in your
+ code:
+ if (records < 2)
+ records = 2;
+ The reason is that the server will optimize for cases of only a single
+ record. If in a table scan you don't know the number of records
+ it will probably be better to set records to two so you can return
+ as many records as you need.
+ Along with records a few more variables you may wish to set are:
+ records
+ deleted
+ data_file_length
+ index_file_length
+ delete_length
+ check_time
+ Take a look at the public variables in handler.h for more information.
+
+ Called in:
+ filesort.cc
+ ha_heap.cc
+ item_sum.cc
+ opt_sum.cc
+ sql_delete.cc
+ sql_delete.cc
+ sql_derived.cc
+ sql_select.cc
+ sql_select.cc
+ sql_select.cc
+ sql_select.cc
+ sql_select.cc
+ sql_show.cc
+ sql_show.cc
+ sql_show.cc
+ sql_show.cc
+ sql_table.cc
+ sql_union.cc
+ sql_update.cc
+
+*/
+
+int ha_federatedx::info(uint flag)
+{
+ uint error_code;
+ THD *thd= ha_thd();
+ federatedx_txn *tmp_txn;
+ federatedx_io *tmp_io= 0, **iop= 0;
+ DBUG_ENTER("ha_federatedx::info");
+
+ error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
+
+ // external_lock may not have been called so txn may not be set
+ tmp_txn= get_txn(thd);
+
+ /* we want not to show table status if not needed to do so */
+ if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO))
+ {
+ if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
+ goto fail;
+ }
+
+ if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST))
+ {
+ /*
+ size of IO operations (This is based on a good guess, no high science
+ involved)
+ */
+ if (flag & HA_STATUS_CONST)
+ stats.block_size= 4096;
+
+ if ((*iop)->table_metadata(&stats, share->table_name,
+ (uint)share->table_name_length, flag))
+ goto error;
+ }
+
+ if (flag & HA_STATUS_AUTO)
+ stats.auto_increment_value= (*iop)->last_insert_id();
+
+ /*
+ If ::info created it's own transaction, close it. This happens in case
+ of show table status;
+ */
+ tmp_txn->release(&tmp_io);
+
+ DBUG_RETURN(0);
+
+error:
+ if (iop && *iop)
+ {
+ my_printf_error((*iop)->error_code(), "Received error: %d : %s", MYF(0),
+ (*iop)->error_code(), (*iop)->error_str());
+ }
+ else if (remote_error_number != -1 /* error already reported */)
+ {
+ error_code= remote_error_number;
+ my_error(error_code, MYF(0), ER_THD(thd, error_code));
+ }
+fail:
+ tmp_txn->release(&tmp_io);
+ DBUG_RETURN(error_code);
+}
+
+
+/**
+ @brief Handles extra signals from MySQL server
+
+ @param[in] operation Hint for storage engine
+
+ @return Operation Status
+ @retval 0 OK
+ */
+int ha_federatedx::extra(ha_extra_function operation)
+{
+ DBUG_ENTER("ha_federatedx::extra");
+ switch (operation) {
+ case HA_EXTRA_IGNORE_DUP_KEY:
+ ignore_duplicates= TRUE;
+ break;
+ case HA_EXTRA_NO_IGNORE_DUP_KEY:
+ insert_dup_update= FALSE;
+ ignore_duplicates= FALSE;
+ break;
+ case HA_EXTRA_WRITE_CAN_REPLACE:
+ replace_duplicates= TRUE;
+ break;
+ case HA_EXTRA_WRITE_CANNOT_REPLACE:
+ /*
+ We use this flag to ensure that we do not create an "INSERT IGNORE"
+ statement when inserting new rows into the remote table.
+ */
+ replace_duplicates= FALSE;
+ break;
+ case HA_EXTRA_INSERT_WITH_UPDATE:
+ insert_dup_update= TRUE;
+ break;
+ case HA_EXTRA_PREPARE_FOR_DROP:
+ table_will_be_deleted = TRUE;
+ break;
+ default:
+ /* do nothing */
+ DBUG_PRINT("info",("unhandled operation: %d", (uint) operation));
+ }
+ DBUG_RETURN(0);
+}
+
+
+/**
+ @brief Reset state of file to after 'open'.
+
+ @detail This function is called after every statement for all tables
+ used by that statement.
+
+ @return Operation status
+ @retval 0 OK
+*/
+
+int ha_federatedx::reset(void)
+{
+ THD *thd= ha_thd();
+ int error = 0;
+
+ insert_dup_update= FALSE;
+ ignore_duplicates= FALSE;
+ replace_duplicates= FALSE;
+ position_called= FALSE;
+
+ if (stored_result)
+ insert_dynamic(&results, (uchar*) &stored_result);
+ stored_result= 0;
+
+ if (results.elements)
+ {
+ federatedx_txn *tmp_txn;
+ federatedx_io *tmp_io= 0, **iop;
+
+ // external_lock may not have been called so txn may not be set
+ tmp_txn= get_txn(thd);
+
+ if (!*(iop= &io) && (error= tmp_txn->acquire(share, thd, TRUE, (iop= &tmp_io))))
+ {
+ DBUG_ASSERT(0); // Fail when testing
+ return error;
+ }
+
+ for (uint i= 0; i < results.elements; ++i)
+ {
+ FEDERATEDX_IO_RESULT *result= 0;
+ get_dynamic(&results, (uchar*) &result, i);
+ (*iop)->free_result(result);
+ }
+ tmp_txn->release(&tmp_io);
+ reset_dynamic(&results);
+ }
+
+ return error;
+
+}
+
+/*
+ Used to delete all rows in a table. Both for cases of truncate and
+ for cases where the optimizer realizes that all rows will be
+ removed as a result of a SQL statement.
+
+ Called from item_sum.cc by Item_func_group_concat::clear(),
+ Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
+ Called from sql_delete.cc by mysql_delete().
+ Called from sql_select.cc by JOIN::reinit().
+ Called from sql_union.cc by st_select_lex_unit::exec().
+*/
+
+int ha_federatedx::delete_all_rows()
+{
+ THD *thd= ha_thd();
+ char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
+ int error;
+ DBUG_ENTER("ha_federatedx::delete_all_rows");
+
+ query.length(0);
+
+ query.set_charset(system_charset_info);
+ if (thd->lex->sql_command == SQLCOM_TRUNCATE)
+ query.append(STRING_WITH_LEN("TRUNCATE "));
+ else
+ query.append(STRING_WITH_LEN("DELETE FROM "));
+ append_ident(&query, share->table_name, share->table_name_length,
+ ident_quote_char);
+
+ /* no need for savepoint in autocommit mode */
+ if (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+ txn->stmt_autocommit();
+
+ /*
+ TRUNCATE won't return anything in mysql_affected_rows
+ */
+
+ if ((error= txn->acquire(share, thd, FALSE, &io)))
+ DBUG_RETURN(error);
+
+ if (io->query(query.ptr(), query.length()))
+ {
+ DBUG_RETURN(stash_remote_error());
+ }
+ stats.deleted+= stats.records;
+ stats.records= 0;
+ DBUG_RETURN(0);
+}
+
+
+/*
+ The idea with handler::store_lock() is the following:
+
+ The statement decided which locks we should need for the table
+ for updates/deletes/inserts we get WRITE locks, for SELECT... we get
+ read locks.
+
+ Before adding the lock into the table lock handler (see thr_lock.c)
+ mysqld calls store lock with the requested locks. Store lock can now
+ modify a write lock to a read lock (or some other lock), ignore the
+ lock (if we don't want to use MySQL table locks at all) or add locks
+ for many tables (like we do when we are using a MERGE handler).
+
+ Berkeley DB for federatedx changes all WRITE locks to TL_WRITE_ALLOW_WRITE
+ (which signals that we are doing WRITES, but we are still allowing other
+ reader's and writer's.
+
+ When releasing locks, store_lock() are also called. In this case one
+ usually doesn't have to do anything.
+
+ In some exceptional cases MySQL may send a request for a TL_IGNORE;
+ This means that we are requesting the same lock as last time and this
+ should also be ignored. (This may happen when someone does a flush
+ table when we have opened a part of the tables, in which case mysqld
+ closes and reopens the tables and tries to get the same locks at last
+ time). In the future we will probably try to remove this.
+
+ Called from lock.cc by get_lock_data().
+*/
+
+THR_LOCK_DATA **ha_federatedx::store_lock(THD *thd,
+ THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type)
+{
+ DBUG_ENTER("ha_federatedx::store_lock");
+ if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
+ {
+ /*
+ Here is where we get into the guts of a row level lock.
+ If TL_UNLOCK is set
+ If we are not doing a LOCK TABLE or DISCARD/IMPORT
+ TABLESPACE, then allow multiple writers
+ */
+
+ if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
+ lock_type <= TL_WRITE) && !thd->in_lock_tables)
+ lock_type= TL_WRITE_ALLOW_WRITE;
+
+ /*
+ In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
+ MySQL would use the lock TL_READ_NO_INSERT on t2, and that
+ would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
+ to t2. Convert the lock to a normal read lock to allow
+ concurrent inserts to t2.
+ */
+
+ if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
+ lock_type= TL_READ;
+
+ lock.type= lock_type;
+ }
+
+ *to++= &lock;
+
+ DBUG_RETURN(to);
+}
+
+
+static int test_connection(MYSQL_THD thd, federatedx_io *io,
+ FEDERATEDX_SHARE *share)
+{
+ char buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+ String str(buffer, sizeof(buffer), &my_charset_bin);
+ FEDERATEDX_IO_RESULT *resultset= NULL;
+ int retval;
+
+ str.length(0);
+ str.append(STRING_WITH_LEN("SELECT * FROM "));
+ append_identifier(thd, &str, share->table_name,
+ share->table_name_length);
+ str.append(STRING_WITH_LEN(" WHERE 1=0"));
+
+ if ((retval= io->query(str.ptr(), str.length())))
+ {
+ snprintf(buffer, sizeof(buffer), "database: '%s' username: '%s' hostname: '%s'",
+ share->database, share->username, share->hostname);
+ DBUG_PRINT("info", ("error-code: %d", io->error_code()));
+ my_error(ER_CANT_CREATE_FEDERATED_TABLE, MYF(0), buffer);
+ }
+ else
+ resultset= io->store_result();
+
+ io->free_result(resultset);
+
+ return retval;
+}
+
+/*
+ create() does nothing, since we have no local setup of our own.
+ FUTURE: We should potentially connect to the foreign database and
+*/
+
+int ha_federatedx::create(const char *name, TABLE *table_arg,
+ HA_CREATE_INFO *create_info)
+{
+ int retval;
+ THD *thd= ha_thd();
+ FEDERATEDX_SHARE tmp_share; // Only a temporary share, to test the url
+ federatedx_txn *tmp_txn;
+ federatedx_io *tmp_io= NULL;
+ DBUG_ENTER("ha_federatedx::create");
+
+ if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg->s, 1)))
+ goto error;
+
+ /* loopback socket connections hang due to LOCK_open mutex */
+ if (0 == strcmp(tmp_share.hostname, my_localhost) && !tmp_share.port)
+ goto error;
+
+ /*
+ If possible, we try to use an existing network connection to
+ the remote server. To ensure that no new FEDERATEDX_SERVER
+ instance is created, we pass NULL in get_server() TABLE arg.
+ */
+ mysql_mutex_lock(&federatedx_mutex);
+ tmp_share.s= get_server(&tmp_share, NULL);
+ mysql_mutex_unlock(&federatedx_mutex);
+
+ if (tmp_share.s)
+ {
+ tmp_txn= get_txn(thd);
+ if (!(retval= tmp_txn->acquire(&tmp_share, thd, TRUE, &tmp_io)))
+ {
+ retval= test_connection(thd, tmp_io, &tmp_share);
+ tmp_txn->release(&tmp_io);
+ }
+ free_server(tmp_txn, tmp_share.s);
+ }
+ else
+ {
+ FEDERATEDX_SERVER server;
+
+ // It's possibly wrong to use alter_table_convert_to_charset here.
+ fill_server(thd->mem_root, &server, &tmp_share,
+ create_info->alter_table_convert_to_charset);
+
+#ifndef DBUG_OFF
+ mysql_mutex_init(fe_key_mutex_FEDERATEDX_SERVER_mutex,
+ &server.mutex, MY_MUTEX_INIT_FAST);
+ mysql_mutex_lock(&server.mutex);
+#endif
+
+ tmp_io= federatedx_io::construct(thd->mem_root, &server);
+
+ retval= test_connection(thd, tmp_io, &tmp_share);
+
+#ifndef DBUG_OFF
+ mysql_mutex_unlock(&server.mutex);
+ mysql_mutex_destroy(&server.mutex);
+#endif
+
+ delete tmp_io;
+ }
+
+error:
+ DBUG_RETURN(retval);
+
+}
+
+
+int ha_federatedx::stash_remote_error()
+{
+ DBUG_ENTER("ha_federatedx::stash_remote_error()");
+ if (!io)
+ DBUG_RETURN(remote_error_number);
+ remote_error_number= io->error_code();
+ strmake_buf(remote_error_buf, io->error_str());
+ if (remote_error_number == ER_DUP_ENTRY ||
+ remote_error_number == ER_DUP_KEY)
+ DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+ DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM);
+}
+
+
+bool ha_federatedx::get_error_message(int error, String* buf)
+{
+ DBUG_ENTER("ha_federatedx::get_error_message");
+ DBUG_PRINT("enter", ("error: %d", error));
+ if (error == HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM)
+ {
+ buf->append(STRING_WITH_LEN("Error on remote system: "));
+ buf->qs_append(remote_error_number);
+ buf->append(STRING_WITH_LEN(": "));
+ buf->append(remote_error_buf, strlen(remote_error_buf));
+ /* Ensure string ends with \0 */
+ (void) buf->c_ptr_safe();
+
+ remote_error_number= 0;
+ remote_error_buf[0]= '\0';
+ }
+ DBUG_PRINT("exit", ("message: %s", buf->c_ptr_safe()));
+ DBUG_RETURN(FALSE);
+}
+
+
+int ha_federatedx::start_stmt(MYSQL_THD thd, thr_lock_type lock_type)
+{
+ DBUG_ENTER("ha_federatedx::start_stmt");
+ DBUG_ASSERT(txn == get_txn(thd));
+
+ if (!txn->in_transaction())
+ {
+ txn->stmt_begin();
+ trans_register_ha(thd, FALSE, ht, 0);
+ }
+ DBUG_RETURN(0);
+}
+
+
+int ha_federatedx::external_lock(MYSQL_THD thd, int lock_type)
+{
+ int error= 0;
+ DBUG_ENTER("ha_federatedx::external_lock");
+
+ if (lock_type == F_UNLCK)
+ txn->release(&io);
+ else
+ {
+ table_will_be_deleted = FALSE;
+ txn= get_txn(thd);
+ if (!(error= txn->acquire(share, ha_thd(), lock_type == F_RDLCK, &io)) &&
+ (lock_type == F_WRLCK || !io->is_autocommit()))
+ {
+ if (!thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+ {
+ txn->stmt_begin();
+ trans_register_ha(thd, FALSE, ht, 0);
+ }
+ else
+ {
+ txn->txn_begin();
+ trans_register_ha(thd, TRUE, ht, 0);
+ }
+ }
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+int ha_federatedx::savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv)
+{
+ int error= 0;
+ federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
+ DBUG_ENTER("ha_federatedx::savepoint_set");
+
+ if (txn && txn->has_connections())
+ {
+ if (txn->txn_begin())
+ trans_register_ha(thd, TRUE, hton, 0);
+
+ txn->sp_acquire((ulong *) sv);
+
+ DBUG_ASSERT(1 < *(ulong *) sv);
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+int ha_federatedx::savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv)
+ {
+ int error= 0;
+ federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
+ DBUG_ENTER("ha_federatedx::savepoint_rollback");
+
+ if (txn)
+ error= txn->sp_rollback((ulong *) sv);
+
+ DBUG_RETURN(error);
+}
+
+
+int ha_federatedx::savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv)
+{
+ int error= 0;
+ federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
+ DBUG_ENTER("ha_federatedx::savepoint_release");
+
+ if (txn)
+ error= txn->sp_release((ulong *) sv);
+
+ DBUG_RETURN(error);
+}
+
+
+int ha_federatedx::commit(handlerton *hton, MYSQL_THD thd, bool all)
+{
+ int return_val;
+ federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
+ DBUG_ENTER("ha_federatedx::commit");
+
+ if (all)
+ return_val= txn->txn_commit();
+ else
+ return_val= txn->stmt_commit();
+
+ DBUG_PRINT("info", ("error val: %d", return_val));
+ DBUG_RETURN(return_val);
+}
+
+
+int ha_federatedx::rollback(handlerton *hton, MYSQL_THD thd, bool all)
+{
+ int return_val;
+ federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
+ DBUG_ENTER("ha_federatedx::rollback");
+
+ if (all)
+ return_val= txn->txn_rollback();
+ else
+ return_val= txn->stmt_rollback();
+
+ DBUG_PRINT("info", ("error val: %d", return_val));
+ DBUG_RETURN(return_val);
+}
+
+
+/*
+ Federated supports assisted discovery, like
+ CREATE TABLE t1 CONNECTION="mysql://joe:pass@192.168.1.111/federated/t1";
+ but not a fully automatic discovery where a table magically appear
+ on any use (like, on SELECT * from t1).
+*/
+int ha_federatedx::discover_assisted(handlerton *hton, THD* thd,
+ TABLE_SHARE *table_s, HA_CREATE_INFO *info)
+{
+ int error= HA_ERR_NO_CONNECTION;
+ FEDERATEDX_SHARE tmp_share;
+ CHARSET_INFO *cs= system_charset_info;
+ MYSQL mysql;
+ char buf[1024];
+ String query(buf, sizeof(buf), cs);
+ static LEX_CSTRING cut_clause={STRING_WITH_LEN(" WITH SYSTEM VERSIONING")};
+ static LEX_CSTRING cut_start={STRING_WITH_LEN("GENERATED ALWAYS AS ROW START")};
+ static LEX_CSTRING cut_end={STRING_WITH_LEN("GENERATED ALWAYS AS ROW END")};
+ static LEX_CSTRING set_ts={STRING_WITH_LEN("DEFAULT TIMESTAMP'1971-01-01 00:00:00'")};
+ int cut_offset;
+ MYSQL_RES *res;
+ MYSQL_ROW rdata;
+ ulong *rlen;
+ my_bool my_true= 1;
+
+ if (parse_url(thd->mem_root, &tmp_share, table_s, 1))
+ return HA_WRONG_CREATE_OPTION;
+
+ mysql_init(&mysql);
+ mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, cs->cs_name.str);
+ mysql_options(&mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*)&my_true);
+
+ if (!mysql_real_connect(&mysql, tmp_share.hostname, tmp_share.username,
+ tmp_share.password, tmp_share.database,
+ tmp_share.port, tmp_share.socket, 0))
+ goto err1;
+
+ if (mysql_real_query(&mysql, STRING_WITH_LEN("SET SQL_MODE=NO_TABLE_OPTIONS")))
+ goto err1;
+
+ query.copy(STRING_WITH_LEN("SHOW CREATE TABLE "), cs);
+ append_ident(&query, tmp_share.table_name,
+ tmp_share.table_name_length, ident_quote_char);
+
+ if (mysql_real_query(&mysql, query.ptr(), query.length()))
+ goto err1;
+
+ if (!((res= mysql_store_result(&mysql))))
+ goto err1;
+
+ if (!(rdata= mysql_fetch_row(res)) || !((rlen= mysql_fetch_lengths(res))))
+ goto err2;
+
+ query.copy(rdata[1], rlen[1], cs);
+ cut_offset= (int)query.length() - (int)cut_clause.length;
+ if (cut_offset > 0 && !memcmp(query.ptr() + cut_offset,
+ cut_clause.str, cut_clause.length))
+ {
+ query.length(cut_offset);
+ const char *ptr= strstr(query.ptr(), cut_start.str);
+ if (ptr)
+ {
+ query.replace((uint32) (ptr - query.ptr()), (uint32) cut_start.length,
+ set_ts.str, (uint32) set_ts.length);
+ }
+ ptr= strstr(query.ptr(), cut_end.str);
+ if (ptr)
+ {
+ query.replace((uint32) (ptr - query.ptr()), (uint32) cut_end.length,
+ set_ts.str, (uint32) set_ts.length);
+ }
+ }
+ query.append(STRING_WITH_LEN(" CONNECTION='"), cs);
+ query.append_for_single_quote(table_s->connect_string.str,
+ table_s->connect_string.length);
+ query.append('\'');
+
+ error= table_s->init_from_sql_statement_string(thd, true,
+ query.ptr(), query.length());
+
+err2:
+ mysql_free_result(res);
+err1:
+ if (error)
+ my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), mysql_error(&mysql));
+ mysql_close(&mysql);
+ return error;
+}
+
+
+struct st_mysql_storage_engine federatedx_storage_engine=
+{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+
+my_bool use_pushdown;
+static MYSQL_SYSVAR_BOOL(pushdown, use_pushdown, 0,
+ "Use query fragments pushdown capabilities", NULL, NULL, FALSE);
+static struct st_mysql_sys_var* sysvars[]= { MYSQL_SYSVAR(pushdown), NULL };
+
+#include "federatedx_pushdown.cc"
+
+maria_declare_plugin(federatedx)
+{
+ MYSQL_STORAGE_ENGINE_PLUGIN,
+ &federatedx_storage_engine,
+ "FEDERATED",
+ "Patrick Galbraith",
+ "Allows one to access tables on other MariaDB servers, supports transactions and more",
+ PLUGIN_LICENSE_GPL,
+ federatedx_db_init, /* Plugin Init */
+ federatedx_done, /* Plugin Deinit */
+ 0x0201 /* 2.1 */,
+ NULL, /* status variables */
+ sysvars, /* system variables */
+ "2.1", /* string version */
+ MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
+}
+maria_declare_plugin_end;
diff --git a/storage/federatedx/ha_federatedx.h b/storage/federatedx/ha_federatedx.h
new file mode 100644
index 00000000..3573c658
--- /dev/null
+++ b/storage/federatedx/ha_federatedx.h
@@ -0,0 +1,487 @@
+#ifndef HA_FEDERATEDX_INCLUDED
+#define HA_FEDERATEDX_INCLUDED
+/*
+Copyright (c) 2008, Patrick Galbraith
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+ * Neither the name of Patrick Galbraith nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface /* gcc class implementation */
+#endif
+
+//#include <mysql.h>
+#include <my_global.h>
+#include <thr_lock.h>
+#include "handler.h"
+
+class federatedx_io;
+
+/*
+ FEDERATEDX_SERVER will eventually be a structure that will be shared among
+ all FEDERATEDX_SHARE instances so that the federated server can minimise
+ the number of open connections. This will eventually lead to the support
+ of reliable XA federated tables.
+*/
+typedef struct st_fedrated_server {
+ MEM_ROOT mem_root;
+ uint use_count, io_count;
+
+ uchar *key;
+ uint key_length;
+
+ const char *scheme;
+ const char *hostname;
+ const char *username;
+ const char *password;
+ const char *database;
+ const char *socket;
+ ushort port;
+
+ const char *csname;
+
+ mysql_mutex_t mutex;
+ federatedx_io *idle_list;
+} FEDERATEDX_SERVER;
+
+/*
+ Please read ha_exmple.cc before reading this file.
+ Please keep in mind that the federatedx storage engine implements all methods
+ that are required to be implemented. handler.h has a full list of methods
+ that you can implement.
+*/
+
+/*
+ handler::print_error has a case statement for error numbers.
+ This value is (10000) is far out of range and will envoke the
+ default: case.
+ (Current error range is 120-159 from include/my_base.h)
+*/
+#define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000
+
+#define FEDERATEDX_QUERY_BUFFER_SIZE STRING_BUFFER_USUAL_SIZE * 5
+#define FEDERATEDX_RECORDS_IN_RANGE 2
+#define FEDERATEDX_MAX_KEY_LENGTH 3500 // Same as innodb
+
+/*
+ FEDERATEDX_SHARE is a structure that will be shared amoung all open handlers
+ The example implements the minimum of what you will probably need.
+*/
+typedef struct st_federatedx_share {
+ MEM_ROOT mem_root;
+
+ bool parsed;
+ /* this key is unique db/tablename */
+ const char *share_key;
+ /*
+ the primary select query to be used in rnd_init
+ */
+ LEX_CSTRING select_query;
+ /*
+ remote host info, parse_url supplies
+ */
+ char *server_name;
+ char *connection_string;
+ char *scheme;
+ char *hostname;
+ char *username;
+ char *password;
+ char *database;
+ char *table_name;
+ char *table;
+ char *socket;
+ char *sport;
+ int share_key_length;
+ ushort port;
+
+ size_t table_name_length, server_name_length, connect_string_length;
+ uint use_count;
+ THR_LOCK lock;
+ FEDERATEDX_SERVER *s;
+} FEDERATEDX_SHARE;
+
+
+typedef struct st_federatedx_result FEDERATEDX_IO_RESULT;
+typedef struct st_federatedx_row FEDERATEDX_IO_ROW;
+typedef struct st_federatedx_rows FEDERATEDX_IO_ROWS;
+typedef ptrdiff_t FEDERATEDX_IO_OFFSET;
+
+class federatedx_io
+{
+ friend class federatedx_txn;
+ FEDERATEDX_SERVER * const server;
+ federatedx_io **owner_ptr;
+ federatedx_io *txn_next;
+ federatedx_io *idle_next;
+ bool active; /* currently participating in a transaction */
+ bool busy; /* in use by a ha_federated instance */
+ bool readonly;/* indicates that no updates have occurred */
+
+protected:
+ void set_active(bool new_active)
+ { active= new_active; }
+public:
+ federatedx_io(FEDERATEDX_SERVER *);
+ virtual ~federatedx_io();
+
+ bool is_readonly() const { return readonly; }
+ bool is_active() const { return active; }
+
+ const char * get_charsetname() const
+ { return server->csname ? server->csname : "latin1"; }
+
+ const char * get_hostname() const { return server->hostname; }
+ const char * get_username() const { return server->username; }
+ const char * get_password() const { return server->password; }
+ const char * get_database() const { return server->database; }
+ ushort get_port() const { return server->port; }
+ const char * get_socket() const { return server->socket; }
+
+ static bool handles_scheme(const char *scheme);
+ static federatedx_io *construct(MEM_ROOT *server_root,
+ FEDERATEDX_SERVER *server);
+
+ static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
+ { return alloc_root(mem_root, size); }
+ static void operator delete(void *ptr, size_t size)
+ { TRASH_FREE(ptr, size); }
+ static void operator delete(void *, MEM_ROOT *)
+ { }
+
+ virtual int query(const char *buffer, size_t length)=0;
+ virtual FEDERATEDX_IO_RESULT *store_result()=0;
+
+ virtual size_t max_query_size() const=0;
+
+ virtual my_ulonglong affected_rows() const=0;
+ virtual my_ulonglong last_insert_id() const=0;
+
+ virtual int error_code()=0;
+ virtual const char *error_str()=0;
+
+ virtual void reset()=0;
+ virtual int commit()=0;
+ virtual int rollback()=0;
+
+ virtual int savepoint_set(ulong sp)=0;
+ virtual ulong savepoint_release(ulong sp)=0;
+ virtual ulong savepoint_rollback(ulong sp)=0;
+ virtual void savepoint_restrict(ulong sp)=0;
+
+ virtual ulong last_savepoint() const=0;
+ virtual ulong actual_savepoint() const=0;
+ virtual bool is_autocommit() const=0;
+
+ virtual bool table_metadata(ha_statistics *stats, const char *table_name,
+ uint table_name_length, uint flag) = 0;
+
+ /* resultset operations */
+
+ virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
+ virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
+ virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
+ virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
+ FEDERATEDX_IO_ROWS **current= NULL)=0;
+ virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0;
+ virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
+ unsigned int column)=0;
+ virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
+ unsigned int column) const=0;
+
+ virtual size_t get_ref_length() const=0;
+ virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
+ void *ref, FEDERATEDX_IO_ROWS *current)=0;
+ virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
+ const void *ref)=0;
+ virtual void set_thd(void *thd) { }
+
+};
+
+
+class federatedx_txn
+{
+ federatedx_io *txn_list;
+ ulong savepoint_level;
+ ulong savepoint_stmt;
+ ulong savepoint_next;
+
+ void release_scan();
+public:
+ federatedx_txn();
+ ~federatedx_txn();
+
+ bool has_connections() const { return txn_list != NULL; }
+ bool in_transaction() const { return savepoint_next != 0; }
+ int acquire(FEDERATEDX_SHARE *share, void *thd, bool readonly, federatedx_io **io);
+ void release(federatedx_io **io);
+ void close(FEDERATEDX_SERVER *);
+
+ bool txn_begin();
+ int txn_commit();
+ int txn_rollback();
+
+ bool sp_acquire(ulong *save);
+ int sp_rollback(ulong *save);
+ int sp_release(ulong *save);
+
+ bool stmt_begin();
+ int stmt_commit();
+ int stmt_rollback();
+ void stmt_autocommit();
+};
+
+/*
+ Class definition for the storage engine
+*/
+class ha_federatedx final : public handler
+{
+ friend int federatedx_db_init(void *p);
+
+ THR_LOCK_DATA lock; /* MySQL lock */
+ FEDERATEDX_SHARE *share; /* Shared lock info */
+ federatedx_txn *txn;
+ federatedx_io *io;
+ FEDERATEDX_IO_RESULT *stored_result;
+ FEDERATEDX_IO_ROWS *current;
+ /**
+ Array of all stored results we get during a query execution.
+ */
+ DYNAMIC_ARRAY results;
+ bool position_called;
+ int remote_error_number;
+ char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
+ bool ignore_duplicates, replace_duplicates;
+ bool insert_dup_update, table_will_be_deleted;
+ DYNAMIC_STRING bulk_insert;
+
+private:
+ /*
+ return 0 on success
+ return errorcode otherwise
+ */
+ uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row,
+ FEDERATEDX_IO_RESULT *result);
+ bool create_where_from_key(String *to, KEY *key_info,
+ const key_range *start_key,
+ const key_range *end_key, bool eq_range);
+ int stash_remote_error();
+
+ static federatedx_txn *get_txn(THD *thd, bool no_create= FALSE);
+ static int disconnect(handlerton *hton, MYSQL_THD thd);
+ static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv);
+ static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv);
+ static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv);
+ static int commit(handlerton *hton, MYSQL_THD thd, bool all);
+ static int rollback(handlerton *hton, MYSQL_THD thd, bool all);
+ static int discover_assisted(handlerton *, THD*, TABLE_SHARE *,
+ HA_CREATE_INFO *);
+
+ bool append_stmt_insert(String *query);
+
+ int read_next(uchar *buf, FEDERATEDX_IO_RESULT *result);
+ int index_read_idx_with_result_set(uchar *buf, uint index,
+ const uchar *key,
+ uint key_len,
+ ha_rkey_function find_flag,
+ FEDERATEDX_IO_RESULT **result);
+ int real_query(const char *query, uint length);
+ int real_connect(FEDERATEDX_SHARE *my_share, uint create_flag);
+public:
+ ha_federatedx(handlerton *hton, TABLE_SHARE *table_arg);
+ ~ha_federatedx() = default;
+ /*
+ The name of the index type that will be used for display
+ don't implement this method unless you really have indexes
+ */
+ // perhaps get index type
+ const char *index_type(uint inx) { return "REMOTE"; }
+ /*
+ This is a list of flags that says what the storage engine
+ implements. The current table flags are documented in
+ handler.h
+ */
+ ulonglong table_flags() const
+ {
+ /* fix server to be able to get remote server table flags */
+ return (HA_PRIMARY_KEY_IN_READ_INDEX | HA_FILE_BASED
+ | HA_REC_NOT_IN_SEQ | HA_AUTO_PART_KEY | HA_CAN_INDEX_BLOBS |
+ HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_CAN_REPAIR |
+ HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | HA_CAN_ONLINE_BACKUPS |
+ HA_PARTIAL_COLUMN_READ | HA_NULL_IN_KEY | HA_NON_COMPARABLE_ROWID);
+ }
+ /*
+ This is a bitmap of flags that says how the storage engine
+ implements indexes. The current index flags are documented in
+ handler.h. If you do not implement indexes, just return zero
+ here.
+
+ part is the key part to check. First key part is 0
+ If all_parts it's set, MySQL want to know the flags for the combined
+ index up to and including 'part'.
+ */
+ /* fix server to be able to get remote server index flags */
+ ulong index_flags(uint inx, uint part, bool all_parts) const
+ {
+ return (HA_READ_NEXT | HA_READ_RANGE);
+ }
+ uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
+ uint max_supported_keys() const { return MAX_KEY; }
+ uint max_supported_key_parts() const { return MAX_REF_PARTS; }
+ uint max_supported_key_length() const { return FEDERATEDX_MAX_KEY_LENGTH; }
+ uint max_supported_key_part_length() const { return FEDERATEDX_MAX_KEY_LENGTH; }
+ /*
+ Called in test_quick_select to determine if indexes should be used.
+ Normally, we need to know number of blocks . For federatedx we need to
+ know number of blocks on remote side, and number of packets and blocks
+ on the network side (?)
+ Talk to Kostja about this - how to get the
+ number of rows * ...
+ disk scan time on other side (block size, size of the row) + network time ...
+ The reason for "records * 1000" is that such a large number forces
+ this to use indexes "
+ */
+ double scan_time()
+ {
+ DBUG_PRINT("info", ("records %lu", (ulong) stats.records));
+ return (double)(stats.records*1000);
+ }
+ /*
+ The next method will never be called if you do not implement indexes.
+ */
+ double read_time(uint index, uint ranges, ha_rows rows)
+ {
+ /*
+ Per Brian, this number is bugus, but this method must be implemented,
+ and at a later date, he intends to document this issue for handler code
+ */
+ return (double) rows / 20.0+1;
+ }
+
+ const key_map *keys_to_use_for_scanning() { return &key_map_full; }
+ /*
+ Everything below are methods that we implment in ha_federatedx.cc.
+
+ Most of these methods are not obligatory, skip them and
+ MySQL will treat them as not implemented
+ */
+ int open(const char *name, int mode, uint test_if_locked); // required
+ int close(void); // required
+
+ void start_bulk_insert(ha_rows rows, uint flags);
+ int end_bulk_insert();
+ int write_row(const uchar *buf);
+ int update_row(const uchar *old_data, const uchar *new_data);
+ int delete_row(const uchar *buf);
+ int index_init(uint keynr, bool sorted);
+ ha_rows estimate_rows_upper_bound();
+ int index_read(uchar *buf, const uchar *key,
+ uint key_len, enum ha_rkey_function find_flag);
+ int index_read_idx(uchar *buf, uint idx, const uchar *key,
+ uint key_len, enum ha_rkey_function find_flag);
+ int index_next(uchar *buf);
+ int index_end();
+ int read_range_first(const key_range *start_key,
+ const key_range *end_key,
+ bool eq_range, bool sorted);
+ int read_range_next();
+ /*
+ unlike index_init(), rnd_init() can be called two times
+ without rnd_end() in between (it only makes sense if scan=1).
+ then the second call should prepare for the new table scan
+ (e.g if rnd_init allocates the cursor, second call should
+ position it to the start of the table, no need to deallocate
+ and allocate it again
+ */
+ int rnd_init(bool scan); //required
+ int rnd_end();
+ int rnd_next(uchar *buf); //required
+ int rnd_pos(uchar *buf, uchar *pos); //required
+ void position(const uchar *record); //required
+ /*
+ A ref is a pointer inside a local buffer. It is not comparable to
+ other ref's. This is never called as HA_NON_COMPARABLE_ROWID is set.
+ */
+ int cmp_ref(const uchar *ref1, const uchar *ref2)
+ {
+#ifdef NOT_YET
+ DBUG_ASSERT(0);
+ return 0;
+#else
+ return handler::cmp_ref(ref1,ref2); /* Works if table scan is used */
+#endif
+ }
+ int info(uint); //required
+ int extra(ha_extra_function operation);
+
+ void update_auto_increment(void);
+ int repair(THD* thd, HA_CHECK_OPT* check_opt);
+ int optimize(THD* thd, HA_CHECK_OPT* check_opt);
+ int delete_table(const char *name)
+ {
+ return 0;
+ }
+ int delete_all_rows(void);
+ int create(const char *name, TABLE *form,
+ HA_CREATE_INFO *create_info); //required
+ ha_rows records_in_range(uint inx, const key_range *start_key,
+ const key_range *end_key, page_range *pages);
+ uint8 table_cache_type() { return HA_CACHE_TBL_NOCACHE; }
+
+ THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type); //required
+ bool get_error_message(int error, String *buf);
+ int start_stmt(THD *thd, thr_lock_type lock_type);
+ int external_lock(THD *thd, int lock_type);
+ int reset(void);
+ int free_result(void);
+
+ const FEDERATEDX_SHARE *get_federatedx_share() const { return share; }
+ friend class ha_federatedx_derived_handler;
+ friend class ha_federatedx_select_handler;
+};
+
+extern const char ident_quote_char; // Character for quoting
+ // identifiers
+extern const char value_quote_char; // Character for quoting
+ // literals
+
+extern bool append_ident(String *string, const char *name, size_t length,
+ const char quote_char);
+
+
+extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
+ FEDERATEDX_SERVER *server);
+extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
+ FEDERATEDX_SERVER *server);
+
+#include "federatedx_pushdown.h"
+
+#endif /* HA_FEDERATEDX_INCLUDED */