summaryrefslogtreecommitdiffstats
path: root/storage/sphinx
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--storage/sphinx/CMakeLists.txt22
-rw-r--r--storage/sphinx/gen_data.php37
-rw-r--r--storage/sphinx/ha_sphinx.cc3698
-rw-r--r--storage/sphinx/ha_sphinx.h175
-rwxr-xr-xstorage/sphinx/make-patch.sh36
-rw-r--r--storage/sphinx/mysql-test/sphinx/my.cnf29
-rw-r--r--storage/sphinx/mysql-test/sphinx/sphinx.result97
-rw-r--r--storage/sphinx/mysql-test/sphinx/sphinx.test56
-rw-r--r--storage/sphinx/mysql-test/sphinx/suite.opt1
-rw-r--r--storage/sphinx/mysql-test/sphinx/suite.pm153
-rw-r--r--storage/sphinx/mysql-test/sphinx/testdata.xml44
-rw-r--r--storage/sphinx/mysql-test/sphinx/union-5539.result16
-rw-r--r--storage/sphinx/mysql-test/sphinx/union-5539.test11
-rw-r--r--storage/sphinx/snippets_udf.cc825
14 files changed, 5200 insertions, 0 deletions
diff --git a/storage/sphinx/CMakeLists.txt b/storage/sphinx/CMakeLists.txt
new file mode 100644
index 00000000..185ffdaa
--- /dev/null
+++ b/storage/sphinx/CMakeLists.txt
@@ -0,0 +1,22 @@
+INCLUDE(CheckCCompilerFlag)
+
+ADD_DEFINITIONS(-DMYSQL_SERVER)
+
+MY_CHECK_AND_SET_COMPILER_FLAG("-Wno-write-strings")
+
+IF(MSVC)
+ # Temporarily disable "conversion from size_t .." warnings
+ IF(CMAKE_SIZEOF_VOID_P EQUAL 8)
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4267")
+ ENDIF()
+ # Disable warning about deprecated functions, inet_aton
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4996")
+ STRING(REPLACE "/permissive-" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}" )
+ENDIF()
+
+IF(MSVC)
+ LINK_LIBRARIES(ws2_32)
+ENDIF(MSVC)
+
+SET(SPHINX_SOURCES ha_sphinx.cc snippets_udf.cc)
+MYSQL_ADD_PLUGIN(sphinx ${SPHINX_SOURCES} STORAGE_ENGINE RECOMPILE_FOR_EMBEDDED)
diff --git a/storage/sphinx/gen_data.php b/storage/sphinx/gen_data.php
new file mode 100644
index 00000000..dac374f0
--- /dev/null
+++ b/storage/sphinx/gen_data.php
@@ -0,0 +1,37 @@
+<?php
+
+$file_name= $argv[1];
+
+//echo $file_name;
+
+$cont= file_get_contents($file_name);
+
+$words= explode(" ", $cont);
+
+//echo "words: ".(count($words))."\n";
+
+$cw = count($words);
+
+echo "REPLACE INTO test.documents ( id, group_id, date_added, title, content ) VALUES\n";
+
+
+for ($i=1; $i<=100000; $i++)
+{
+ $count_words= mt_rand(10,30);
+ $pred = "";
+ for ($j=0; $j<$count_words; $j++)
+ {
+ $pred .= chop($words[mt_rand(1, $cw-1)])." ";
+ }
+ $count_words= mt_rand(3,5);
+ $tit = "";
+ for ($j=0; $j<$count_words; $j++)
+ {
+ $tit .= chop($words[mt_rand(1, $cw-1)])." ";
+ }
+ echo "($i,".mt_rand(1,20).",NOW(),'".addslashes($tit)."','".addslashes($pred)."'),\n";
+}
+ echo "(0,1,now(),'end','eND');\n";
+
+
+?>
diff --git a/storage/sphinx/ha_sphinx.cc b/storage/sphinx/ha_sphinx.cc
new file mode 100644
index 00000000..3c5b23f5
--- /dev/null
+++ b/storage/sphinx/ha_sphinx.cc
@@ -0,0 +1,3698 @@
+//
+// $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
+//
+
+//
+// Copyright (c) 2001-2014, Andrew Aksyonoff
+// Copyright (c) 2008-2014, Sphinx Technologies Inc
+// All rights reserved
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License. You should have
+// received a copy of the GPL license along with this program; if you
+// did not, you can find it at http://www.gnu.org/
+//
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+#if defined(_MSC_VER) && _MSC_VER>=1400
+#define _CRT_SECURE_NO_DEPRECATE 1
+#define _CRT_NONSTDC_NO_DEPRECATE 1
+#endif
+
+#include <my_global.h>
+#include <mysql_version.h>
+
+#if MYSQL_VERSION_ID>=50515
+#include "sql_class.h"
+#include "sql_array.h"
+#elif MYSQL_VERSION_ID>50100
+#include "mysql_priv.h"
+#include <mysql/plugin.h>
+#else
+#include "../mysql_priv.h"
+#endif
+
+#include <mysys_err.h>
+#include <my_sys.h>
+#include <mysql.h> // include client for INSERT table (sort of redoing federated..)
+
+#ifndef _WIN32
+ // UNIX-specific
+ #include <my_net.h>
+ #include <netdb.h>
+ #include <sys/un.h>
+
+ #define RECV_FLAGS MSG_WAITALL
+
+ #define sphSockClose(_sock) ::close(_sock)
+#else
+ // Windows-specific
+ #include <io.h>
+ #define snprintf _snprintf
+
+ #define RECV_FLAGS 0
+
+ #define sphSockClose(_sock) ::closesocket(_sock)
+#endif
+
+#include <ctype.h>
+#include "ha_sphinx.h"
+
+#ifndef MSG_WAITALL
+#define MSG_WAITALL 0
+#endif
+
+#if defined(_MSC_VER) && _MSC_VER>=1400
+#pragma warning(push,4)
+#endif
+
+/////////////////////////////////////////////////////////////////////////////
+
+/// there might be issues with min() on different platforms (eg. Gentoo, they say)
+#define Min(a,b) ((a)<(b)?(a):(b))
+
+/// unaligned RAM accesses are forbidden on SPARC
+#if defined(sparc) || defined(__sparc__)
+#define UNALIGNED_RAM_ACCESS 0
+#else
+#define UNALIGNED_RAM_ACCESS 1
+#endif
+
+
+#if UNALIGNED_RAM_ACCESS
+
+/// pass-through wrapper
+template < typename T > inline T sphUnalignedRead ( const T & tRef )
+{
+ return tRef;
+}
+
+/// pass-through wrapper
+template < typename T > void sphUnalignedWrite ( void * pPtr, const T & tVal )
+{
+ *(T*)pPtr = tVal;
+}
+
+#else
+
+/// unaligned read wrapper for some architectures (eg. SPARC)
+template < typename T >
+inline T sphUnalignedRead ( const T & tRef )
+{
+ T uTmp;
+ byte * pSrc = (byte *) &tRef;
+ byte * pDst = (byte *) &uTmp;
+ for ( int i=0; i<(int)sizeof(T); i++ )
+ *pDst++ = *pSrc++;
+ return uTmp;
+}
+
+/// unaligned write wrapper for some architectures (eg. SPARC)
+template < typename T >
+void sphUnalignedWrite ( void * pPtr, const T & tVal )
+{
+ byte * pDst = (byte *) pPtr;
+ byte * pSrc = (byte *) &tVal;
+ for ( int i=0; i<(int)sizeof(T); i++ )
+ *pDst++ = *pSrc++;
+}
+
+#endif
+
+#if MYSQL_VERSION_ID>=50515
+
+#define sphinx_hash_init my_hash_init
+#define sphinx_hash_free my_hash_free
+#define sphinx_hash_search my_hash_search
+#define sphinx_hash_delete my_hash_delete
+
+#else
+
+#define sphinx_hash_init hash_init
+#define sphinx_hash_free hash_free
+#define sphinx_hash_search hash_search
+#define sphinx_hash_delete hash_delete
+
+#endif
+
+/////////////////////////////////////////////////////////////////////////////
+
+// FIXME! make this all dynamic
+#define SPHINXSE_MAX_FILTERS 32
+
+#define SPHINXAPI_DEFAULT_HOST "127.0.0.1"
+#define SPHINXAPI_DEFAULT_PORT 9312
+#define SPHINXAPI_DEFAULT_INDEX "*"
+
+#define SPHINXQL_DEFAULT_PORT 9306
+
+#define SPHINXSE_SYSTEM_COLUMNS 3
+
+#define SPHINXSE_MAX_ALLOC (16*1024*1024)
+#define SPHINXSE_MAX_KEYWORDSTATS 4096
+
+#define SPHINXSE_VERSION "2.2.6-release"
+
+// FIXME? the following is cut-n-paste from sphinx.h and searchd.cpp
+// cut-n-paste is somewhat simpler that adding dependencies however..
+
+enum
+{
+ SPHINX_SEARCHD_PROTO = 1,
+ SEARCHD_COMMAND_SEARCH = 0,
+ VER_COMMAND_SEARCH = 0x119,
+};
+
+/// search query sorting orders
+enum ESphSortOrder
+{
+ SPH_SORT_RELEVANCE = 0, ///< sort by document relevance desc, then by date
+ SPH_SORT_ATTR_DESC = 1, ///< sort by document date desc, then by relevance desc
+ SPH_SORT_ATTR_ASC = 2, ///< sort by document date asc, then by relevance desc
+ SPH_SORT_TIME_SEGMENTS = 3, ///< sort by time segments (hour/day/week/etc) desc, then by relevance desc
+ SPH_SORT_EXTENDED = 4, ///< sort by SQL-like expression (eg. "@relevance DESC, price ASC, @id DESC")
+ SPH_SORT_EXPR = 5, ///< sort by expression
+
+ SPH_SORT_TOTAL
+};
+
+/// search query matching mode
+enum ESphMatchMode
+{
+ SPH_MATCH_ALL = 0, ///< match all query words
+ SPH_MATCH_ANY, ///< match any query word
+ SPH_MATCH_PHRASE, ///< match this exact phrase
+ SPH_MATCH_BOOLEAN, ///< match this boolean query
+ SPH_MATCH_EXTENDED, ///< match this extended query
+ SPH_MATCH_FULLSCAN, ///< match all document IDs w/o fulltext query, apply filters
+ SPH_MATCH_EXTENDED2, ///< extended engine V2
+
+ SPH_MATCH_TOTAL
+};
+
+/// search query relevance ranking mode
+enum ESphRankMode
+{
+ SPH_RANK_PROXIMITY_BM25 = 0, ///< default mode, phrase proximity major factor and BM25 minor one
+ SPH_RANK_BM25 = 1, ///< statistical mode, BM25 ranking only (faster but worse quality)
+ SPH_RANK_NONE = 2, ///< no ranking, all matches get a weight of 1
+ SPH_RANK_WORDCOUNT = 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurrence counts
+ SPH_RANK_PROXIMITY = 4, ///< phrase proximity
+ SPH_RANK_MATCHANY = 5, ///< emulate old match-any weighting
+ SPH_RANK_FIELDMASK = 6, ///< sets bits where there were matches
+ SPH_RANK_SPH04 = 7, ///< codename SPH04, phrase proximity + bm25 + head/exact boost
+ SPH_RANK_EXPR = 8, ///< expression based ranker
+
+ SPH_RANK_TOTAL,
+ SPH_RANK_DEFAULT = SPH_RANK_PROXIMITY_BM25
+};
+
+/// search query grouping mode
+enum ESphGroupBy
+{
+ SPH_GROUPBY_DAY = 0, ///< group by day
+ SPH_GROUPBY_WEEK = 1, ///< group by week
+ SPH_GROUPBY_MONTH = 2, ///< group by month
+ SPH_GROUPBY_YEAR = 3, ///< group by year
+ SPH_GROUPBY_ATTR = 4, ///< group by attribute value
+ SPH_GROUPBY_ATTRPAIR = 5, ///< group by sequential attrs pair (rendered redundant by 64bit attrs support; removed)
+ SPH_GROUPBY_MULTIPLE = 6 ///< group by on multiple attribute values
+};
+
+/// known attribute types
+enum
+{
+ SPH_ATTR_NONE = 0, ///< not an attribute at all
+ SPH_ATTR_INTEGER = 1, ///< this attr is just an integer
+ SPH_ATTR_TIMESTAMP = 2, ///< this attr is a timestamp
+ SPH_ATTR_ORDINAL = 3, ///< this attr is an ordinal string number (integer at search time, specially handled at indexing time)
+ SPH_ATTR_BOOL = 4, ///< this attr is a boolean bit field
+ SPH_ATTR_FLOAT = 5,
+ SPH_ATTR_BIGINT = 6,
+ SPH_ATTR_STRING = 7, ///< string (binary; in-memory)
+
+ SPH_ATTR_UINT32SET = 0x40000001UL, ///< this attr is multiple int32 values (0 or more)
+ SPH_ATTR_UINT64SET = 0x40000002UL ///< this attr is multiple int64 values (0 or more)
+};
+
+/// known answers
+enum
+{
+ SEARCHD_OK = 0, ///< general success, command-specific reply follows
+ SEARCHD_ERROR = 1, ///< general failure, error message follows
+ SEARCHD_RETRY = 2, ///< temporary failure, error message follows, client should retry later
+ SEARCHD_WARNING = 3 ///< general success, warning message and command-specific reply follow
+};
+
+//////////////////////////////////////////////////////////////////////////////
+
+#define SPHINX_DEBUG_OUTPUT 0
+#define SPHINX_DEBUG_CALLS 0
+
+#include <stdarg.h>
+
+#if SPHINX_DEBUG_OUTPUT
+inline void SPH_DEBUG ( const char * format, ... )
+{
+ va_list ap;
+ va_start ( ap, format );
+ fprintf ( stderr, "SphinxSE: " );
+ vfprintf ( stderr, format, ap );
+ fprintf ( stderr, "\n" );
+ va_end ( ap );
+}
+#else
+inline void SPH_DEBUG ( const char *, ... ) {}
+#endif
+
+#if SPHINX_DEBUG_CALLS
+
+#define SPH_ENTER_FUNC() { SPH_DEBUG ( "enter %s", __FUNCTION__ ); }
+#define SPH_ENTER_METHOD() { SPH_DEBUG ( "enter %s(this=%08x)", __FUNCTION__, this ); }
+#define SPH_RET(_arg) { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return _arg; }
+#define SPH_VOID_RET() { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return; }
+
+#else
+
+#define SPH_ENTER_FUNC()
+#define SPH_ENTER_METHOD()
+#define SPH_RET(_arg) { return(_arg); }
+#define SPH_VOID_RET() { return; }
+
+#endif
+
+
+#define SafeDelete(_arg) { delete ( _arg ); (_arg) = NULL; }
+#define SafeDeleteArray(_arg) { if ( _arg ) { delete [] ( _arg ); (_arg) = NULL; } }
+
+//////////////////////////////////////////////////////////////////////////////
+
+/// per-table structure that will be shared among all open Sphinx SE handlers
+struct CSphSEShare
+{
+ pthread_mutex_t m_tMutex;
+ THR_LOCK m_tLock;
+
+ char * m_sTable;
+ char * m_sScheme; ///< our connection string
+ char * m_sHost; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
+ char * m_sSocket; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
+ char * m_sIndex; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
+ ushort m_iPort;
+ bool m_bSphinxQL; ///< is this read-only SphinxAPI table, or write-only SphinxQL table?
+ uint m_iTableNameLen;
+ uint m_iUseCount;
+#if MYSQL_VERSION_ID<50610
+ CHARSET_INFO * m_pTableQueryCharset;
+#else
+ const CHARSET_INFO * m_pTableQueryCharset;
+#endif
+
+ int m_iTableFields;
+ char ** m_sTableField;
+ enum_field_types * m_eTableFieldType;
+
+ CSphSEShare ()
+ : m_sTable ( NULL )
+ , m_sScheme ( NULL )
+ , m_sHost ( NULL )
+ , m_sSocket ( NULL )
+ , m_sIndex ( NULL )
+ , m_iPort ( 0 )
+ , m_bSphinxQL ( false )
+ , m_iTableNameLen ( 0 )
+ , m_iUseCount ( 1 )
+ , m_pTableQueryCharset ( NULL )
+
+ , m_iTableFields ( 0 )
+ , m_sTableField ( NULL )
+ , m_eTableFieldType ( NULL )
+ {
+ thr_lock_init ( &m_tLock );
+ pthread_mutex_init ( &m_tMutex, MY_MUTEX_INIT_FAST );
+ }
+
+ ~CSphSEShare ()
+ {
+ pthread_mutex_destroy ( &m_tMutex );
+ thr_lock_delete ( &m_tLock );
+
+ SafeDeleteArray ( m_sTable );
+ SafeDeleteArray ( m_sScheme );
+ ResetTable ();
+ }
+
+ void ResetTable ()
+ {
+ for ( int i=0; i<m_iTableFields; i++ )
+ SafeDeleteArray ( m_sTableField[i] );
+ SafeDeleteArray ( m_sTableField );
+ SafeDeleteArray ( m_eTableFieldType );
+ }
+};
+
+/// schema attribute
+struct CSphSEAttr
+{
+ char * m_sName; ///< attribute name (received from Sphinx)
+ uint32 m_uType; ///< attribute type (received from Sphinx)
+ int m_iField; ///< field index in current table (-1 if none)
+
+ CSphSEAttr()
+ : m_sName ( NULL )
+ , m_uType ( SPH_ATTR_NONE )
+ , m_iField ( -1 )
+ {}
+
+ ~CSphSEAttr ()
+ {
+ SafeDeleteArray ( m_sName );
+ }
+};
+
+/// word stats
+struct CSphSEWordStats
+{
+ char * m_sWord;
+ int m_iDocs;
+ int m_iHits;
+
+ CSphSEWordStats ()
+ : m_sWord ( NULL )
+ , m_iDocs ( 0 )
+ , m_iHits ( 0 )
+ {}
+
+ ~CSphSEWordStats ()
+ {
+ SafeDeleteArray ( m_sWord );
+ }
+};
+
+/// request stats
+struct CSphSEStats
+{
+public:
+ int m_iMatchesTotal;
+ int m_iMatchesFound;
+ int m_iQueryMsec;
+ int m_iWords;
+ CSphSEWordStats * m_dWords;
+ bool m_bLastError;
+ char m_sLastMessage[1024];
+
+ CSphSEStats()
+ : m_dWords ( NULL )
+ {
+ Reset ();
+ }
+
+ void Reset ()
+ {
+ m_iMatchesTotal = 0;
+ m_iMatchesFound = 0;
+ m_iQueryMsec = 0;
+ m_iWords = 0;
+ m_bLastError = false;
+ m_sLastMessage[0] = '\0';
+ SafeDeleteArray ( m_dWords );
+ }
+
+ ~CSphSEStats()
+ {
+ SafeDeleteArray ( m_dWords );
+ }
+};
+
+/// thread local storage
+struct CSphSEThreadTable
+{
+ static const int MAX_QUERY_LEN = 262144; // 256k should be enough, right?
+
+ bool m_bStats;
+ CSphSEStats m_tStats;
+
+ bool m_bQuery;
+ char m_sQuery[MAX_QUERY_LEN];
+
+#if MYSQL_VERSION_ID<50610
+ CHARSET_INFO * m_pQueryCharset;
+#else
+ const CHARSET_INFO * m_pQueryCharset;
+#endif
+
+ bool m_bReplace; ///< are we doing an INSERT or REPLACE
+
+ bool m_bCondId; ///< got a value from condition pushdown
+ longlong m_iCondId; ///< value acquired from id=value condition pushdown
+ bool m_bCondDone; ///< index_read() is now over
+
+ const ha_sphinx * m_pHandler;
+ CSphSEThreadTable * m_pTableNext;
+
+ CSphSEThreadTable ( const ha_sphinx * pHandler )
+ : m_bStats ( false )
+ , m_bQuery ( false )
+ , m_pQueryCharset ( NULL )
+ , m_bReplace ( false )
+ , m_bCondId ( false )
+ , m_iCondId ( 0 )
+ , m_bCondDone ( false )
+ , m_pHandler ( pHandler )
+ , m_pTableNext ( NULL )
+ {}
+};
+
+
+struct CSphTLS
+{
+ CSphSEThreadTable * m_pHeadTable;
+
+ explicit CSphTLS ( const ha_sphinx * pHandler )
+ {
+ m_pHeadTable = new CSphSEThreadTable ( pHandler );
+ }
+
+ ~CSphTLS()
+ {
+ CSphSEThreadTable * pCur = m_pHeadTable;
+ while ( pCur )
+ {
+ CSphSEThreadTable * pNext = pCur->m_pTableNext;
+ SafeDelete ( pCur );
+ pCur = pNext;
+ }
+ }
+};
+
+
+/// filter types
+enum ESphFilter
+{
+ SPH_FILTER_VALUES = 0, ///< filter by integer values set
+ SPH_FILTER_RANGE = 1, ///< filter by integer range
+ SPH_FILTER_FLOATRANGE = 2 ///< filter by float range
+};
+
+
+/// search query filter
+struct CSphSEFilter
+{
+public:
+ ESphFilter m_eType;
+ char * m_sAttrName;
+ longlong m_uMinValue;
+ longlong m_uMaxValue;
+ float m_fMinValue;
+ float m_fMaxValue;
+ int m_iValues;
+ longlong * m_pValues;
+ int m_bExclude;
+
+public:
+ CSphSEFilter ()
+ : m_eType ( SPH_FILTER_VALUES )
+ , m_sAttrName ( NULL )
+ , m_uMinValue ( 0 )
+ , m_uMaxValue ( UINT_MAX )
+ , m_fMinValue ( 0.0f )
+ , m_fMaxValue ( 0.0f )
+ , m_iValues ( 0 )
+ , m_pValues ( NULL )
+ , m_bExclude ( 0 )
+ {
+ }
+
+ ~CSphSEFilter ()
+ {
+ SafeDeleteArray ( m_pValues );
+ }
+};
+
+
+/// float vs dword conversion
+inline uint32 sphF2DW ( float f ) { union { float f; uint32 d; } u; u.f = f; return u.d; }
+
+/// dword vs float conversion
+inline float sphDW2F ( uint32 d ) { union { float f; uint32 d; } u; u.d = d; return u.f; }
+
+
+/// client-side search query
+struct CSphSEQuery
+{
+public:
+ const char * m_sHost;
+ int m_iPort;
+
+private:
+ char * m_sQueryBuffer;
+
+ const char * m_sIndex;
+ int m_iOffset;
+ int m_iLimit;
+
+ bool m_bQuery;
+ const char * m_sQuery;
+ uint32 * m_pWeights;
+ int m_iWeights;
+ ESphMatchMode m_eMode;
+ ESphRankMode m_eRanker;
+ char * m_sRankExpr;
+ ESphSortOrder m_eSort;
+ const char * m_sSortBy;
+ int m_iMaxMatches;
+ int m_iMaxQueryTime;
+ uint32 m_iMinID;
+ uint32 m_iMaxID;
+
+ int m_iFilters;
+ CSphSEFilter m_dFilters[SPHINXSE_MAX_FILTERS];
+
+ ESphGroupBy m_eGroupFunc;
+ const char * m_sGroupBy;
+ const char * m_sGroupSortBy;
+ int m_iCutoff;
+ int m_iRetryCount;
+ int m_iRetryDelay;
+ const char * m_sGroupDistinct; ///< points to query buffer; do NOT delete
+ int m_iIndexWeights;
+ char * m_sIndexWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
+ int m_iIndexWeight[SPHINXSE_MAX_FILTERS];
+ int m_iFieldWeights;
+ char * m_sFieldWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
+ int m_iFieldWeight[SPHINXSE_MAX_FILTERS];
+
+ bool m_bGeoAnchor;
+ const char * m_sGeoLatAttr;
+ const char * m_sGeoLongAttr;
+ float m_fGeoLatitude;
+ float m_fGeoLongitude;
+
+ char * m_sComment;
+ char * m_sSelect;
+
+ struct Override_t
+ {
+ Override_t() : m_dIds(PSI_INSTRUMENT_MEM), m_dValues(PSI_INSTRUMENT_MEM) {}
+ union Value_t
+ {
+ uint32 m_uValue;
+ longlong m_iValue64;
+ float m_fValue;
+ };
+ char * m_sName; ///< points to query buffer
+ int m_iType;
+ Dynamic_array<ulonglong> m_dIds;
+ Dynamic_array<Value_t> m_dValues;
+ };
+ Dynamic_array<Override_t *> m_dOverrides;
+
+public:
+ char m_sParseError[256];
+
+public:
+ CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex );
+ ~CSphSEQuery ();
+
+ bool Parse ();
+ int BuildRequest ( char ** ppBuffer );
+
+protected:
+ char * m_pBuf;
+ char * m_pCur;
+ int m_iBufLeft;
+ bool m_bBufOverrun;
+
+ template < typename T > int ParseArray ( T ** ppValues, const char * sValue );
+ bool ParseField ( char * sField );
+
+ void SendBytes ( const void * pBytes, int iBytes );
+ void SendWord ( short int v ) { v = ntohs(v); SendBytes ( &v, sizeof(v) ); }
+ void SendInt ( int v ) { v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
+ void SendDword ( uint v ) { v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
+ void SendUint64 ( ulonglong v ) { SendDword ( (uint)(v>>32) ); SendDword ( (uint)(v&0xFFFFFFFFUL) ); }
+ void SendString ( const char * v ) { int iLen = strlen(v); SendDword(iLen); SendBytes ( v, iLen ); }
+ void SendFloat ( float v ) { SendDword ( sphF2DW(v) ); }
+};
+
+#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
+template int CSphSEQuery::ParseArray<uint32> ( uint32 **, const char * );
+template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
+#endif
+
+//////////////////////////////////////////////////////////////////////////////
+
+#if MYSQL_VERSION_ID>50100
+
+#if MYSQL_VERSION_ID<50114
+#error Sphinx SE requires MySQL 5.1.14 or higher if compiling for 5.1.x series!
+#endif
+
+static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
+static int sphinx_init_func ( void * p );
+static int sphinx_close_connection ( handlerton * hton, THD * thd );
+static int sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
+static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );
+
+#else
+
+static bool sphinx_init_func_for_handlerton ();
+static int sphinx_close_connection ( THD * thd );
+bool sphinx_show_status ( THD * thd );
+
+#endif // >50100
+
+//////////////////////////////////////////////////////////////////////////////
+
+static const char sphinx_hton_name[] = "SPHINX";
+static const char sphinx_hton_comment[] = "Sphinx storage engine " SPHINXSE_VERSION;
+
+#if MYSQL_VERSION_ID<50100
+handlerton sphinx_hton =
+{
+ #ifdef MYSQL_HANDLERTON_INTERFACE_VERSION
+ MYSQL_HANDLERTON_INTERFACE_VERSION,
+ #endif
+ sphinx_hton_name,
+ SHOW_OPTION_YES,
+ sphinx_hton_comment,
+ DB_TYPE_SPHINX_DB,
+ sphinx_init_func_for_handlerton,
+ 0, // slot
+ 0, // savepoint size
+ sphinx_close_connection, // close_connection
+ NULL, // savepoint
+ NULL, // rollback to savepoint
+ NULL, // release savepoint
+ NULL, // commit
+ NULL, // rollback
+ NULL, // prepare
+ NULL, // recover
+ NULL, // commit_by_xid
+ NULL, // rollback_by_xid
+ NULL, // create_cursor_read_view
+ NULL, // set_cursor_read_view
+ NULL, // close_cursor_read_view
+ HTON_CAN_RECREATE | HTON_AUTOMATIC_DELETE_TABLE
+};
+#else
+static handlerton * sphinx_hton_ptr = NULL;
+#endif
+
+//////////////////////////////////////////////////////////////////////////////
+
+// variables for Sphinx shared methods
+pthread_mutex_t sphinx_mutex; // mutex to init the hash
+static int sphinx_init = 0; // flag whether the hash was initialized
+static HASH sphinx_open_tables; // hash used to track open tables
+
+//////////////////////////////////////////////////////////////////////////////
+// INITIALIZATION AND SHUTDOWN
+//////////////////////////////////////////////////////////////////////////////
+
+// hashing function
+#if MYSQL_VERSION_ID>=50120
+typedef size_t GetKeyLength_t;
+#else
+typedef uint GetKeyLength_t;
+#endif
+
+static byte * sphinx_get_key ( const byte * pSharePtr, GetKeyLength_t * pLength, my_bool )
+{
+ CSphSEShare * pShare = (CSphSEShare *) pSharePtr;
+ *pLength = (size_t) pShare->m_iTableNameLen;
+ return (byte*) pShare->m_sTable;
+}
+
+#if MYSQL_VERSION_ID<50100
+static int sphinx_init_func ( void * ) // to avoid unused arg warning
+#else
+static int sphinx_init_func ( void * p )
+#endif
+{
+ SPH_ENTER_FUNC();
+ if ( !sphinx_init )
+ {
+ sphinx_init = 1;
+ void ( pthread_mutex_init ( &sphinx_mutex, MY_MUTEX_INIT_FAST ) );
+ sphinx_hash_init ( PSI_NOT_INSTRUMENTED, &sphinx_open_tables,
+ system_charset_info, 32, 0, 0,
+ sphinx_get_key, 0, 0 );
+
+ #if MYSQL_VERSION_ID > 50100
+ handlerton * hton = (handlerton*) p;
+ hton->db_type = DB_TYPE_AUTOASSIGN;
+ hton->create = sphinx_create_handler;
+ hton->close_connection = sphinx_close_connection;
+ hton->show_status = sphinx_show_status;
+ hton->panic = sphinx_panic;
+ hton->drop_table= [](handlerton *, const char*) { return -1; };
+ hton->flags = HTON_CAN_RECREATE;
+ #endif
+ }
+ SPH_RET(0);
+}
+
+
+#if MYSQL_VERSION_ID<50100
+static bool sphinx_init_func_for_handlerton ()
+{
+ return sphinx_init_func ( &sphinx_hton );
+}
+#endif
+
+
+#if MYSQL_VERSION_ID>50100
+
+static int sphinx_close_connection ( handlerton * hton, THD * thd )
+{
+ // deallocate common handler data
+ SPH_ENTER_FUNC();
+ CSphTLS * pTls = (CSphTLS *) thd_get_ha_data ( thd, hton );
+ SafeDelete ( pTls );
+ SPH_RET(0);
+}
+
+
+static int sphinx_done_func ( void * )
+{
+ SPH_ENTER_FUNC();
+
+ int error __attribute__ ((unused)) = 0;
+ if ( sphinx_init )
+ {
+ sphinx_init = 0;
+ if ( sphinx_open_tables.records )
+ error = 1;
+ sphinx_hash_free ( &sphinx_open_tables );
+ pthread_mutex_destroy ( &sphinx_mutex );
+ }
+
+ SPH_RET(0);
+}
+
+
+static int sphinx_panic ( handlerton * hton, enum ha_panic_function )
+{
+ return sphinx_done_func ( hton );
+}
+
+#else
+
+static int sphinx_close_connection ( THD * thd )
+{
+ // deallocate common handler data
+ SPH_ENTER_FUNC();
+ CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
+ SafeDelete ( pTls );
+ thd->ha_data[sphinx_hton.slot] = NULL;
+ SPH_RET(0);
+}
+
+#endif // >50100
+
+//////////////////////////////////////////////////////////////////////////////
+// SHOW STATUS
+//////////////////////////////////////////////////////////////////////////////
+
+#if MYSQL_VERSION_ID>50100
+static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print,
+ enum ha_stat_type )
+#else
+bool sphinx_show_status ( THD * thd )
+#endif
+{
+ SPH_ENTER_FUNC();
+
+#if MYSQL_VERSION_ID<50100
+ Protocol * protocol = thd->protocol;
+ List<Item> field_list;
+#endif
+
+ char buf1[IO_SIZE];
+ uint buf1len;
+ char buf2[IO_SIZE];
+ uint buf2len = 0;
+ String words;
+
+ buf1[0] = '\0';
+ buf2[0] = '\0';
+
+
+#if MYSQL_VERSION_ID>50100
+ // 5.1.x style stats
+ CSphTLS * pTls = (CSphTLS*) ( thd_get_ha_data ( thd, hton ) );
+
+#define LOC_STATS(_key,_keylen,_val,_vallen) \
+ stat_print ( thd, sphinx_hton_name, strlen(sphinx_hton_name), _key, _keylen, _val, _vallen );
+
+#else
+ // 5.0.x style stats
+ if ( have_sphinx_db!=SHOW_OPTION_YES )
+ {
+ my_message ( ER_NOT_SUPPORTED_YET,
+ "failed to call SHOW SPHINX STATUS: --skip-sphinx was specified",
+ MYF(0) );
+ SPH_RET(TRUE);
+ }
+ CSphTLS * pTls = (CSphTLS*) thd->ha_data[sphinx_hton.slot];
+
+ field_list.push_back ( new Item_empty_string ( thd, "Type", 10 ) );
+ field_list.push_back ( new Item_empty_string ( thd, "Name", FN_REFLEN ) );
+ field_list.push_back ( new Item_empty_string ( thd, "Status", 10 ) );
+ if ( protocol->send_fields ( &field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF ) )
+ SPH_RET(TRUE);
+
+#define LOC_STATS(_key,_keylen,_val,_vallen) \
+ protocol->prepare_for_resend (); \
+ protocol->store ( "SPHINX", 6, system_charset_info ); \
+ protocol->store ( _key, _keylen, system_charset_info ); \
+ protocol->store ( _val, _vallen, system_charset_info ); \
+ if ( protocol->write() ) \
+ SPH_RET(TRUE);
+
+#endif
+
+
+ // show query stats
+ if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
+ {
+ const CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
+ buf1len = my_snprintf ( buf1, sizeof(buf1),
+ "total: %d, total found: %d, time: %d, words: %d",
+ pStats->m_iMatchesTotal, pStats->m_iMatchesFound, pStats->m_iQueryMsec, pStats->m_iWords );
+
+ LOC_STATS ( "stats", 5, buf1, buf1len );
+
+ if ( pStats->m_iWords )
+ {
+ for ( int i=0; i<pStats->m_iWords; i++ )
+ {
+ CSphSEWordStats & tWord = pStats->m_dWords[i];
+ buf2len = my_snprintf ( buf2, sizeof(buf2), "%s%s:%d:%d ",
+ buf2, tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
+ }
+
+ // convert it if we can
+ const char * sWord = buf2;
+ int iWord = buf2len;
+
+ String sBuf3;
+ if ( pTls->m_pHeadTable->m_pQueryCharset )
+ {
+ uint iErrors;
+ sBuf3.copy ( buf2, buf2len, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
+ sWord = sBuf3.c_ptr();
+ iWord = sBuf3.length();
+ }
+
+ LOC_STATS ( "words", 5, sWord, iWord );
+ }
+ }
+
+ // show last error or warning (either in addition to stats, or on their own)
+ if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_tStats.m_sLastMessage[0] )
+ {
+ const char * sMessageType = pTls->m_pHeadTable->m_tStats.m_bLastError ? "error" : "warning";
+
+ LOC_STATS (
+ sMessageType, strlen ( sMessageType ),
+ pTls->m_pHeadTable->m_tStats.m_sLastMessage, strlen ( pTls->m_pHeadTable->m_tStats.m_sLastMessage ) );
+
+ } else
+ {
+ // well, nothing to show just yet
+#if MYSQL_VERSION_ID < 50100
+ LOC_STATS ( "stats", 5, "no query has been executed yet", sizeof("no query has been executed yet")-1 );
+#endif
+ }
+
+#if MYSQL_VERSION_ID < 50100
+ send_eof(thd);
+#endif
+
+ SPH_RET(FALSE);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// HELPERS
+//////////////////////////////////////////////////////////////////////////////
+
+static char * sphDup ( const char * sSrc, int iLen=-1 )
+{
+ if ( !sSrc )
+ return NULL;
+
+ if ( iLen<0 )
+ iLen = strlen(sSrc);
+
+ char * sRes = new char [ 1+iLen ];
+ memcpy ( sRes, sSrc, iLen );
+ sRes[iLen] = '\0';
+ return sRes;
+}
+
+
+static void sphLogError ( const char * sFmt, ... )
+{
+ // emit timestamp
+#ifdef _WIN32
+ SYSTEMTIME t;
+ GetLocalTime ( &t );
+
+ fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
+ (int)t.wYear % 100, (int)t.wMonth, (int)t.wDay,
+ (int)t.wHour, (int)t.wMinute, (int)t.wSecond );
+#else
+ // Unix version
+ time_t tStamp;
+ time ( &tStamp );
+
+ struct tm * pParsed;
+#ifdef HAVE_LOCALTIME_R
+ struct tm tParsed;
+ localtime_r ( &tStamp, &tParsed );
+ pParsed = &tParsed;
+#else
+ pParsed = localtime ( &tStamp );
+#endif // HAVE_LOCALTIME_R
+
+ fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
+ pParsed->tm_year % 100, pParsed->tm_mon + 1, pParsed->tm_mday,
+ pParsed->tm_hour, pParsed->tm_min, pParsed->tm_sec);
+#endif // _WIN32
+
+ // emit message
+ va_list ap;
+ va_start ( ap, sFmt );
+ vfprintf ( stderr, sFmt, ap );
+ va_end ( ap );
+
+ // emit newline
+ fprintf ( stderr, "\n" );
+}
+
+
+
+// the following scheme variants are recognized
+//
+// sphinx://host[:port]/index
+// sphinxql://host[:port]/index
+// unix://unix/domain/socket[:index]
+static bool ParseUrl ( CSphSEShare * share, TABLE * table, bool bCreate )
+{
+ SPH_ENTER_FUNC();
+
+ if ( share )
+ {
+ // check incoming stuff
+ if ( !table )
+ {
+ sphLogError ( "table==NULL in ParseUrl()" );
+ return false;
+ }
+ if ( !table->s )
+ {
+ sphLogError ( "(table->s)==NULL in ParseUrl()" );
+ return false;
+ }
+
+ // free old stuff
+ share->ResetTable ();
+
+ // fill new stuff
+ share->m_iTableFields = table->s->fields;
+ if ( share->m_iTableFields )
+ {
+ share->m_sTableField = new char * [ share->m_iTableFields ];
+ share->m_eTableFieldType = new enum_field_types [ share->m_iTableFields ];
+
+ for ( int i=0; i<share->m_iTableFields; i++ )
+ {
+ share->m_sTableField[i] = sphDup ( table->field[i]->field_name.str );
+ share->m_eTableFieldType[i] = table->field[i]->type();
+ }
+ }
+ }
+
+ // defaults
+ bool bOk = true;
+ bool bQL = false;
+ char * sScheme = NULL;
+ char * sHost = (char*) SPHINXAPI_DEFAULT_HOST;
+ char * sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
+ int iPort = SPHINXAPI_DEFAULT_PORT;
+
+ // parse connection string, if any
+ while ( table->s->connect_string.length!=0 )
+ {
+ sScheme = sphDup ( table->s->connect_string.str, table->s->connect_string.length );
+
+ sHost = strstr ( sScheme, "://" );
+ if ( !sHost )
+ {
+ bOk = false;
+ break;
+ }
+ sHost[0] = '\0';
+ sHost += 3;
+
+ /////////////////////////////
+ // sphinxapi via unix socket
+ /////////////////////////////
+
+ if ( !strcmp ( sScheme, "unix" ) )
+ {
+ sHost--; // reuse last slash
+ iPort = 0;
+ if (!( sIndex = strrchr ( sHost, ':' ) ))
+ sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
+ else
+ {
+ *sIndex++ = '\0';
+ if ( !*sIndex )
+ sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
+ }
+ bOk = true;
+ break;
+ }
+
+ /////////////////////
+ // sphinxapi via tcp
+ /////////////////////
+
+ if ( !strcmp ( sScheme, "sphinx" ) )
+ {
+ char * sPort = strchr ( sHost, ':' );
+ if ( sPort )
+ {
+ *sPort++ = '\0';
+ if ( *sPort )
+ {
+ sIndex = strchr ( sPort, '/' );
+ if ( sIndex )
+ *sIndex++ = '\0';
+ else
+ sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
+
+ iPort = atoi(sPort);
+ if ( !iPort )
+ iPort = SPHINXAPI_DEFAULT_PORT;
+ }
+ } else
+ {
+ sIndex = strchr ( sHost, '/' );
+ if ( sIndex )
+ *sIndex++ = '\0';
+ else
+ sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
+ }
+ bOk = true;
+ break;
+ }
+
+ ////////////
+ // sphinxql
+ ////////////
+
+ if ( !strcmp ( sScheme, "sphinxql" ) )
+ {
+ bQL = true;
+ iPort = SPHINXQL_DEFAULT_PORT;
+
+ // handle port
+ char * sPort = strchr ( sHost, ':' );
+ sIndex = sHost; // starting point for index name search
+
+ if ( sPort )
+ {
+ *sPort++ = '\0';
+ sIndex = sPort;
+
+ iPort = atoi(sPort);
+ if ( !iPort )
+ {
+ bOk = false; // invalid port; can report ER_FOREIGN_DATA_STRING_INVALID
+ break;
+ }
+ }
+
+ // find index
+ sIndex = strchr ( sIndex, '/' );
+ if ( sIndex )
+ *sIndex++ = '\0';
+
+ // final checks
+ // host and index names are required
+ bOk = ( sHost && *sHost && sIndex && *sIndex );
+ break;
+ }
+
+ // unknown case
+ bOk = false;
+ break;
+ }
+
+ if ( !bOk )
+ {
+ my_error ( bCreate ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID,
+ MYF(0), table->s->connect_string.str);
+ } else
+ {
+ if ( share )
+ {
+ SafeDeleteArray ( share->m_sScheme );
+ share->m_sScheme = sScheme;
+ share->m_sHost = sHost;
+ share->m_sIndex = sIndex;
+ share->m_iPort = (ushort)iPort;
+ share->m_bSphinxQL = bQL;
+ }
+ }
+ if ( !bOk && !share )
+ SafeDeleteArray ( sScheme );
+
+ SPH_RET(bOk);
+}
+
+
+// Example of simple lock controls. The "share" it creates is structure we will
+// pass to each sphinx handler. Do you have to have one of these? Well, you have
+// pieces that are used for locking, and they are needed to function.
+static CSphSEShare * get_share ( const char * table_name, TABLE * table )
+{
+ SPH_ENTER_FUNC();
+ pthread_mutex_lock ( &sphinx_mutex );
+
+ CSphSEShare * pShare = NULL;
+ for ( ;; )
+ {
+ // check if we already have this share
+#if MYSQL_VERSION_ID>=50120
+ pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const uchar *) table_name, strlen(table_name) );
+#else
+#ifdef _WIN32
+ pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const byte *) table_name, strlen(table_name) );
+#else
+ pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, table_name, strlen(table_name) );
+#endif // win
+#endif // pre-5.1.20
+
+ if ( pShare )
+ {
+ pShare->m_iUseCount++;
+ break;
+ }
+
+ // try to allocate new share
+ pShare = new CSphSEShare ();
+ if ( !pShare )
+ break;
+
+ // try to setup it
+ if ( !ParseUrl ( pShare, table, false ) )
+ {
+ SafeDelete ( pShare );
+ break;
+ }
+
+ if ( !pShare->m_bSphinxQL )
+ pShare->m_pTableQueryCharset = table->field[2]->charset();
+
+ // try to hash it
+ pShare->m_iTableNameLen = strlen(table_name);
+ pShare->m_sTable = sphDup ( table_name );
+ if ( my_hash_insert ( &sphinx_open_tables, (const byte *)pShare ) )
+ {
+ SafeDelete ( pShare );
+ break;
+ }
+
+ // all seems fine
+ break;
+ }
+
+ pthread_mutex_unlock ( &sphinx_mutex );
+ SPH_RET(pShare);
+}
+
+
+// Free lock controls. We call this whenever we close a table. If the table had
+// the last reference to the share then we free memory associated with it.
+static int free_share ( CSphSEShare * pShare )
+{
+ SPH_ENTER_FUNC();
+ pthread_mutex_lock ( &sphinx_mutex );
+
+ if ( !--pShare->m_iUseCount )
+ {
+ sphinx_hash_delete ( &sphinx_open_tables, (byte *)pShare );
+ SafeDelete ( pShare );
+ }
+
+ pthread_mutex_unlock ( &sphinx_mutex );
+ SPH_RET(0);
+}
+
+
+#if MYSQL_VERSION_ID>50100
+static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root )
+{
+ sphinx_hton_ptr = hton;
+ return new ( mem_root ) ha_sphinx ( hton, table );
+}
+#endif
+
+//////////////////////////////////////////////////////////////////////////////
+// CLIENT-SIDE REQUEST STUFF
+//////////////////////////////////////////////////////////////////////////////
+
+CSphSEQuery::CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex )
+ : m_sHost ( "" )
+ , m_iPort ( 0 )
+ , m_sIndex ( sIndex ? sIndex : "*" )
+ , m_iOffset ( 0 )
+ , m_iLimit ( 20 )
+ , m_bQuery ( false )
+ , m_sQuery ( "" )
+ , m_pWeights ( NULL )
+ , m_iWeights ( 0 )
+ , m_eMode ( SPH_MATCH_ALL )
+ , m_eRanker ( SPH_RANK_PROXIMITY_BM25 )
+ , m_sRankExpr ( NULL )
+ , m_eSort ( SPH_SORT_RELEVANCE )
+ , m_sSortBy ( "" )
+ , m_iMaxMatches ( 1000 )
+ , m_iMaxQueryTime ( 0 )
+ , m_iMinID ( 0 )
+ , m_iMaxID ( 0 )
+ , m_iFilters ( 0 )
+ , m_eGroupFunc ( SPH_GROUPBY_DAY )
+ , m_sGroupBy ( "" )
+ , m_sGroupSortBy ( "@group desc" )
+ , m_iCutoff ( 0 )
+ , m_iRetryCount ( 0 )
+ , m_iRetryDelay ( 0 )
+ , m_sGroupDistinct ( "" )
+ , m_iIndexWeights ( 0 )
+ , m_iFieldWeights ( 0 )
+ , m_bGeoAnchor ( false )
+ , m_sGeoLatAttr ( "" )
+ , m_sGeoLongAttr ( "" )
+ , m_fGeoLatitude ( 0.0f )
+ , m_fGeoLongitude ( 0.0f )
+ , m_sComment ( (char*) "" )
+ , m_sSelect ( (char*) "*" )
+ , m_dOverrides (PSI_INSTRUMENT_MEM)
+
+ , m_pBuf ( NULL )
+ , m_pCur ( NULL )
+ , m_iBufLeft ( 0 )
+ , m_bBufOverrun ( false )
+{
+ m_sQueryBuffer = new char [ iLength+2 ];
+ memcpy ( m_sQueryBuffer, sQuery, iLength );
+ m_sQueryBuffer[iLength] = ';';
+ m_sQueryBuffer[iLength+1] = '\0';
+}
+
+
+CSphSEQuery::~CSphSEQuery ()
+{
+ SPH_ENTER_METHOD();
+ SafeDeleteArray ( m_sQueryBuffer );
+ SafeDeleteArray ( m_pWeights );
+ SafeDeleteArray ( m_pBuf );
+ for ( size_t i=0; i<m_dOverrides.elements(); i++ )
+ SafeDelete ( m_dOverrides.at(i) );
+ SPH_VOID_RET();
+}
+
+
+template < typename T >
+int CSphSEQuery::ParseArray ( T ** ppValues, const char * sValue )
+{
+ SPH_ENTER_METHOD();
+
+ assert ( ppValues );
+ assert ( !(*ppValues) );
+
+ const char * pValue;
+ bool bPrevDigit = false;
+ int iValues = 0;
+
+ // count the values
+ for ( pValue=sValue; *pValue; pValue++ )
+ {
+ bool bDigit = (*pValue)>='0' && (*pValue)<='9';
+ if ( bDigit && !bPrevDigit )
+ iValues++;
+ bPrevDigit = bDigit;
+ }
+ if ( !iValues )
+ SPH_RET(0);
+
+ // extract the values
+ T * pValues = new T [ iValues ];
+ *ppValues = pValues;
+
+ int iIndex = 0, iSign = 1;
+ T uValue = 0;
+
+ bPrevDigit = false;
+ for ( pValue=sValue ;; pValue++ )
+ {
+ bool bDigit = (*pValue)>='0' && (*pValue)<='9';
+
+ if ( bDigit )
+ {
+ if ( !bPrevDigit )
+ uValue = 0;
+ uValue = uValue*10 + ( (*pValue)-'0' );
+ } else if ( bPrevDigit )
+ {
+ assert ( iIndex<iValues );
+ pValues [ iIndex++ ] = uValue * iSign;
+ iSign = 1;
+ } else if ( *pValue=='-' )
+ iSign = -1;
+
+ bPrevDigit = bDigit;
+ if ( !*pValue )
+ break;
+ }
+
+ SPH_RET ( iValues );
+}
+
+
+static char * chop ( char * s )
+{
+ while ( *s && isspace(*s) )
+ s++;
+
+ char * p = s + strlen(s);
+ while ( p>s && isspace ( p[-1] ) )
+ p--;
+ *p = '\0';
+
+ return s;
+}
+
+
+static bool myisattr ( char c )
+{
+ return
+ ( c>='0' && c<='9' ) ||
+ ( c>='a' && c<='z' ) ||
+ ( c>='A' && c<='Z' ) ||
+ c=='_';
+}
+
+static bool myismagic ( char c )
+{
+ return c=='@';
+}
+
+static bool myisjson ( char c )
+{
+ return
+ c=='.' ||
+ c=='[' ||
+ c==']';
+}
+
+
+bool CSphSEQuery::ParseField ( char * sField )
+{
+ SPH_ENTER_METHOD();
+
+ // look for option name/value separator
+ char * sValue = strchr ( sField, '=' );
+ if ( !sValue || sValue==sField || sValue[-1]=='\\' )
+ {
+ // by default let's assume it's just query
+ if ( sField[0] )
+ {
+ if ( m_bQuery )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "search query already specified; '%s' is redundant", sField );
+ SPH_RET(false);
+ } else
+ {
+ m_sQuery = sField;
+ m_bQuery = true;
+
+ // unescape only 1st one
+ char *s = sField, *d = sField;
+ int iSlashes = 0;
+ while ( *s )
+ {
+ iSlashes = ( *s=='\\' ) ? iSlashes+1 : 0;
+ if ( ( iSlashes%2 )==0 ) *d++ = *s;
+ s++;
+ }
+ *d = '\0';
+ }
+ }
+ SPH_RET(true);
+ }
+
+ // split
+ *sValue++ = '\0';
+ sValue = chop ( sValue );
+ int iValue = atoi ( sValue );
+
+ // handle options
+ char * sName = chop ( sField );
+
+ if ( !strcmp ( sName, "query" ) ) m_sQuery = sValue;
+ else if ( !strcmp ( sName, "host" ) ) m_sHost = sValue;
+ else if ( !strcmp ( sName, "port" ) ) m_iPort = iValue;
+ else if ( !strcmp ( sName, "index" ) ) m_sIndex = sValue;
+ else if ( !strcmp ( sName, "offset" ) ) m_iOffset = iValue;
+ else if ( !strcmp ( sName, "limit" ) ) m_iLimit = iValue;
+ else if ( !strcmp ( sName, "weights" ) ) m_iWeights = ParseArray<uint32> ( &m_pWeights, sValue );
+ else if ( !strcmp ( sName, "minid" ) ) m_iMinID = iValue;
+ else if ( !strcmp ( sName, "maxid" ) ) m_iMaxID = iValue;
+ else if ( !strcmp ( sName, "maxmatches" ) ) m_iMaxMatches = iValue;
+ else if ( !strcmp ( sName, "maxquerytime" ) ) m_iMaxQueryTime = iValue;
+ else if ( !strcmp ( sName, "groupsort" ) ) m_sGroupSortBy = sValue;
+ else if ( !strcmp ( sName, "distinct" ) ) m_sGroupDistinct = sValue;
+ else if ( !strcmp ( sName, "cutoff" ) ) m_iCutoff = iValue;
+ else if ( !strcmp ( sName, "comment" ) ) m_sComment = sValue;
+ else if ( !strcmp ( sName, "select" ) ) m_sSelect = sValue;
+
+ else if ( !strcmp ( sName, "mode" ) )
+ {
+ m_eMode = SPH_MATCH_ALL;
+ if ( !strcmp ( sValue, "any" ) ) m_eMode = SPH_MATCH_ANY;
+ else if ( !strcmp ( sValue, "phrase" ) ) m_eMode = SPH_MATCH_PHRASE;
+ else if ( !strcmp ( sValue, "boolean" ) ) m_eMode = SPH_MATCH_BOOLEAN;
+ else if ( !strcmp ( sValue, "ext" ) ) m_eMode = SPH_MATCH_EXTENDED;
+ else if ( !strcmp ( sValue, "extended" ) ) m_eMode = SPH_MATCH_EXTENDED;
+ else if ( !strcmp ( sValue, "ext2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
+ else if ( !strcmp ( sValue, "extended2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
+ else if ( !strcmp ( sValue, "all" ) ) m_eMode = SPH_MATCH_ALL;
+ else if ( !strcmp ( sValue, "fullscan" ) ) m_eMode = SPH_MATCH_FULLSCAN;
+ else
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "unknown matching mode '%s'", sValue );
+ SPH_RET(false);
+ }
+ } else if ( !strcmp ( sName, "ranker" ) )
+ {
+ m_eRanker = SPH_RANK_PROXIMITY_BM25;
+ if ( !strcmp ( sValue, "proximity_bm25" ) ) m_eRanker = SPH_RANK_PROXIMITY_BM25;
+ else if ( !strcmp ( sValue, "bm25" ) ) m_eRanker = SPH_RANK_BM25;
+ else if ( !strcmp ( sValue, "none" ) ) m_eRanker = SPH_RANK_NONE;
+ else if ( !strcmp ( sValue, "wordcount" ) ) m_eRanker = SPH_RANK_WORDCOUNT;
+ else if ( !strcmp ( sValue, "proximity" ) ) m_eRanker = SPH_RANK_PROXIMITY;
+ else if ( !strcmp ( sValue, "matchany" ) ) m_eRanker = SPH_RANK_MATCHANY;
+ else if ( !strcmp ( sValue, "fieldmask" ) ) m_eRanker = SPH_RANK_FIELDMASK;
+ else if ( !strcmp ( sValue, "sph04" ) ) m_eRanker = SPH_RANK_SPH04;
+ else if ( !strncmp ( sValue, "expr:", 5 ) )
+ {
+ m_eRanker = SPH_RANK_EXPR;
+ m_sRankExpr = sValue+5;
+ } else
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "unknown ranking mode '%s'", sValue );
+ SPH_RET(false);
+ }
+ } else if ( !strcmp ( sName, "sort" ) )
+ {
+ static const struct
+ {
+ const char * m_sName;
+ ESphSortOrder m_eSort;
+ } dSortModes[] =
+ {
+ { "relevance", SPH_SORT_RELEVANCE },
+ { "attr_desc:", SPH_SORT_ATTR_DESC },
+ { "attr_asc:", SPH_SORT_ATTR_ASC },
+ { "time_segments:", SPH_SORT_TIME_SEGMENTS },
+ { "extended:", SPH_SORT_EXTENDED },
+ { "expr:", SPH_SORT_EXPR }
+ };
+
+ int i;
+ const int nModes = sizeof(dSortModes)/sizeof(dSortModes[0]);
+ for ( i=0; i<nModes; i++ )
+ if ( !strncmp ( sValue, dSortModes[i].m_sName, strlen ( dSortModes[i].m_sName ) ) )
+ {
+ m_eSort = dSortModes[i].m_eSort;
+ m_sSortBy = sValue + strlen ( dSortModes[i].m_sName );
+ break;
+ }
+ if ( i==nModes )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "unknown sorting mode '%s'", sValue );
+ SPH_RET(false);
+ }
+
+ } else if ( !strcmp ( sName, "groupby" ) )
+ {
+ static const struct
+ {
+ const char * m_sName;
+ ESphGroupBy m_eFunc;
+ } dGroupModes[] =
+ {
+ { "day:", SPH_GROUPBY_DAY },
+ { "week:", SPH_GROUPBY_WEEK },
+ { "month:", SPH_GROUPBY_MONTH },
+ { "year:", SPH_GROUPBY_YEAR },
+ { "attr:", SPH_GROUPBY_ATTR },
+ { "multi:", SPH_GROUPBY_MULTIPLE }
+ };
+
+ int i;
+ const int nModes = sizeof(dGroupModes)/sizeof(dGroupModes[0]);
+ for ( i=0; i<nModes; i++ )
+ if ( !strncmp ( sValue, dGroupModes[i].m_sName, strlen ( dGroupModes[i].m_sName ) ) )
+ {
+ m_eGroupFunc = dGroupModes[i].m_eFunc;
+ m_sGroupBy = sValue + strlen ( dGroupModes[i].m_sName );
+ break;
+ }
+ if ( i==nModes )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "unknown groupby mode '%s'", sValue );
+ SPH_RET(false);
+ }
+
+ } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
+ ( !strcmp ( sName, "range" ) || !strcmp ( sName, "!range" ) || !strcmp ( sName, "floatrange" ) || !strcmp ( sName, "!floatrange" ) ) )
+ {
+ for ( ;; )
+ {
+ char * p = sName;
+ CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
+ tFilter.m_bExclude = ( *p=='!' ); if ( tFilter.m_bExclude ) p++;
+ tFilter.m_eType = ( *p=='f' ) ? SPH_FILTER_FLOATRANGE : SPH_FILTER_RANGE;
+
+ if (!( p = strchr ( sValue, ',' ) ))
+ break;
+ *p++ = '\0';
+
+ tFilter.m_sAttrName = chop ( sValue );
+ sValue = p;
+
+ if (!( p = strchr ( sValue, ',' ) ))
+ break;
+ *p++ = '\0';
+
+ if ( tFilter.m_eType==SPH_FILTER_RANGE )
+ {
+ tFilter.m_uMinValue = strtoll ( sValue, NULL, 10 );
+ tFilter.m_uMaxValue = strtoll ( p, NULL, 10 );
+ } else
+ {
+ tFilter.m_fMinValue = (float)atof(sValue);
+ tFilter.m_fMaxValue = (float)atof(p);
+ }
+
+ // all ok
+ m_iFilters++;
+ break;
+ }
+
+ } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
+ ( !strcmp ( sName, "filter" ) || !strcmp ( sName, "!filter" ) ) )
+ {
+ for ( ;; )
+ {
+ CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
+ tFilter.m_eType = SPH_FILTER_VALUES;
+ tFilter.m_bExclude = ( strcmp ( sName, "!filter" )==0 );
+
+ // get the attr name
+ while ( (*sValue) && !( myisattr(*sValue) || myismagic(*sValue) ) )
+ sValue++;
+ if ( !*sValue )
+ break;
+
+ tFilter.m_sAttrName = sValue;
+ while ( (*sValue) && ( myisattr(*sValue) || myismagic(*sValue) || myisjson(*sValue) ) )
+ sValue++;
+ if ( !*sValue )
+ break;
+ *sValue++ = '\0';
+
+ // get the values
+ tFilter.m_iValues = ParseArray<longlong> ( &tFilter.m_pValues, sValue );
+ if ( !tFilter.m_iValues )
+ {
+ assert ( !tFilter.m_pValues );
+ break;
+ }
+
+ // all ok
+ m_iFilters++;
+ break;
+ }
+
+ } else if ( !strcmp ( sName, "indexweights" ) || !strcmp ( sName, "fieldweights" ) )
+ {
+ bool bIndex = !strcmp ( sName, "indexweights" );
+ int * pCount = bIndex ? &m_iIndexWeights : &m_iFieldWeights;
+ char ** pNames = bIndex ? &m_sIndexWeight[0] : &m_sFieldWeight[0];
+ int * pWeights = bIndex ? &m_iIndexWeight[0] : &m_iFieldWeight[0];
+
+ *pCount = 0;
+
+ char * p = sValue;
+ while ( *p && *pCount<SPHINXSE_MAX_FILTERS )
+ {
+ // extract attr name
+ if ( !myisattr(*p) )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "%s: index name expected near '%s'", sName, p );
+ SPH_RET(false);
+ }
+
+ pNames[*pCount] = p;
+ while ( myisattr(*p) ) p++;
+
+ if ( *p!=',' )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
+ SPH_RET(false);
+ }
+ *p++ = '\0';
+
+ // extract attr value
+ char * sVal = p;
+ while ( isdigit(*p) ) p++;
+ if ( p==sVal )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "%s: integer weight expected near '%s'", sName, sVal );
+ SPH_RET(false);
+ }
+ pWeights[*pCount] = atoi(sVal);
+ (*pCount)++;
+
+ if ( !*p )
+ break;
+ if ( *p!=',' )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
+ SPH_RET(false);
+ }
+ p++;
+ }
+
+ } else if ( !strcmp ( sName, "geoanchor" ) )
+ {
+ m_bGeoAnchor = false;
+ for ( ;; )
+ {
+ char * sLat = sValue;
+ char * p = sValue;
+
+ if (!( p = strchr ( p, ',' ) )) break;
+ *p++ = '\0';
+ char * sLong = p;
+
+ if (!( p = strchr ( p, ',' ) )) break;
+ *p++ = '\0';
+ char * sLatVal = p;
+
+ if (!( p = strchr ( p, ',' ) )) break;
+ *p++ = '\0';
+ char * sLongVal = p;
+
+ m_sGeoLatAttr = chop(sLat);
+ m_sGeoLongAttr = chop(sLong);
+ m_fGeoLatitude = (float)atof ( sLatVal );
+ m_fGeoLongitude = (float)atof ( sLongVal );
+ m_bGeoAnchor = true;
+ break;
+ }
+ if ( !m_bGeoAnchor )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "geoanchor: parse error, not enough comma-separated arguments" );
+ SPH_RET(false);
+ }
+ } else if ( !strcmp ( sName, "override" ) ) // name,type,id:value,id:value,...
+ {
+ sName = NULL;
+ int iType = 0;
+ CSphSEQuery::Override_t * pOverride = NULL;
+
+ // get name and type
+ char * sRest = sValue;
+ for ( ;; )
+ {
+ sName = sRest;
+ if ( !*sName )
+ break;
+ if (!( sRest = strchr ( sRest, ',' ) ))
+ break;
+ *sRest++ = '\0';
+ char * sType = sRest;
+ if (!( sRest = strchr ( sRest, ',' ) ))
+ break;
+
+ static const struct
+ {
+ const char * m_sName;
+ int m_iType;
+ }
+ dAttrTypes[] =
+ {
+ { "int", SPH_ATTR_INTEGER },
+ { "timestamp", SPH_ATTR_TIMESTAMP },
+ { "bool", SPH_ATTR_BOOL },
+ { "float", SPH_ATTR_FLOAT },
+ { "bigint", SPH_ATTR_BIGINT }
+ };
+ for ( uint i=0; i<sizeof(dAttrTypes)/sizeof(*dAttrTypes); i++ )
+ if ( !strncmp ( sType, dAttrTypes[i].m_sName, sRest - sType ) )
+ {
+ iType = dAttrTypes[i].m_iType;
+ break;
+ }
+ break;
+ }
+
+ // fail
+ if ( !sName || !*sName || !iType )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "override: malformed query" );
+ SPH_RET(false);
+ }
+
+ // grab id:value pairs
+ sRest++;
+ while ( sRest )
+ {
+ char * sId = sRest;
+ if (!( sRest = strchr ( sRest, ':' ) )) break;
+ *sRest++ = '\0';
+ if (!( sRest - sId )) break;
+
+ sValue = sRest;
+ if ( ( sRest = strchr ( sRest, ',' ) )!=NULL )
+ *sRest++ = '\0';
+ if ( !*sValue )
+ break;
+
+ if ( !pOverride )
+ {
+ pOverride = new CSphSEQuery::Override_t;
+ pOverride->m_sName = chop(sName);
+ pOverride->m_iType = iType;
+ m_dOverrides.append ( pOverride );
+ }
+
+ ulonglong uId = strtoull ( sId, NULL, 10 );
+ CSphSEQuery::Override_t::Value_t tValue;
+ if ( iType==SPH_ATTR_FLOAT )
+ tValue.m_fValue = (float)atof(sValue);
+ else if ( iType==SPH_ATTR_BIGINT )
+ tValue.m_iValue64 = strtoll ( sValue, NULL, 10 );
+ else
+ tValue.m_uValue = (uint32)strtoul ( sValue, NULL, 10 );
+
+ pOverride->m_dIds.append ( uId );
+ pOverride->m_dValues.append ( tValue );
+ }
+
+ if ( !pOverride )
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "override: id:value mapping expected" );
+ SPH_RET(false);
+ }
+ SPH_RET(true);
+ } else
+ {
+ snprintf ( m_sParseError, sizeof(m_sParseError), "unknown parameter '%s'", sName );
+ SPH_RET(false);
+ }
+
+ // !COMMIT handle syntax errors
+
+ SPH_RET(true);
+}
+
+
+bool CSphSEQuery::Parse ()
+{
+ SPH_ENTER_METHOD();
+ SPH_DEBUG ( "query [[ %s ]]", m_sQueryBuffer );
+
+ m_bQuery = false;
+ char * pCur = m_sQueryBuffer;
+ char * pNext = pCur;
+
+ while ( ( pNext = strchr ( pNext, ';' ) )!=NULL )
+ {
+ // handle escaped semicolons
+ if ( pNext>m_sQueryBuffer && pNext[-1]=='\\' && pNext[1]!='\0' )
+ {
+ pNext++;
+ continue;
+ }
+
+ // handle semicolon-separated clauses
+ *pNext++ = '\0';
+ if ( !ParseField ( pCur ) )
+ SPH_RET(false);
+ pCur = pNext;
+ }
+
+ SPH_DEBUG ( "q [[ %s ]]", m_sQuery );
+
+ SPH_RET(true);
+}
+
+
+void CSphSEQuery::SendBytes ( const void * pBytes, int iBytes )
+{
+ SPH_ENTER_METHOD();
+ if ( m_iBufLeft<iBytes )
+ {
+ m_bBufOverrun = true;
+ SPH_VOID_RET();
+ }
+
+ memcpy ( m_pCur, pBytes, iBytes );
+
+ m_pCur += iBytes;
+ m_iBufLeft -= iBytes;
+ SPH_VOID_RET();
+}
+
+
+int CSphSEQuery::BuildRequest ( char ** ppBuffer )
+{
+ SPH_ENTER_METHOD();
+
+ // calc request length
+ int iReqSize = 128 + 4*m_iWeights
+ + strlen ( m_sSortBy )
+ + strlen ( m_sQuery )
+ + strlen ( m_sIndex )
+ + strlen ( m_sGroupBy )
+ + strlen ( m_sGroupSortBy )
+ + strlen ( m_sGroupDistinct )
+ + strlen ( m_sComment )
+ + strlen ( m_sSelect );
+ if ( m_eRanker==SPH_RANK_EXPR )
+ iReqSize += 4 + strlen(m_sRankExpr);
+ for ( int i=0; i<m_iFilters; i++ )
+ {
+ const CSphSEFilter & tFilter = m_dFilters[i];
+ iReqSize += 12 + strlen ( tFilter.m_sAttrName ); // string attr-name; int type; int exclude-flag
+ switch ( tFilter.m_eType )
+ {
+ case SPH_FILTER_VALUES: iReqSize += 4 + 8*tFilter.m_iValues; break;
+ case SPH_FILTER_RANGE: iReqSize += 16; break;
+ case SPH_FILTER_FLOATRANGE: iReqSize += 8; break;
+ }
+ }
+ if ( m_bGeoAnchor ) // 1.14+
+ iReqSize += 16 + strlen ( m_sGeoLatAttr ) + strlen ( m_sGeoLongAttr );
+ for ( int i=0; i<m_iIndexWeights; i++ ) // 1.15+
+ iReqSize += 8 + strlen(m_sIndexWeight[i] );
+ for ( int i=0; i<m_iFieldWeights; i++ ) // 1.18+
+ iReqSize += 8 + strlen(m_sFieldWeight[i] );
+ // overrides
+ iReqSize += 4;
+ for ( size_t i=0; i<m_dOverrides.elements(); i++ )
+ {
+ CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
+ const uint32 uSize = pOverride->m_iType==SPH_ATTR_BIGINT ? 16 : 12; // id64 + value
+ iReqSize += strlen ( pOverride->m_sName ) + 12 + uSize*pOverride->m_dIds.elements();
+ }
+ // select
+ iReqSize += 4;
+
+ m_iBufLeft = 0;
+ SafeDeleteArray ( m_pBuf );
+
+ m_pBuf = new char [ iReqSize ];
+ if ( !m_pBuf )
+ SPH_RET(-1);
+
+ m_pCur = m_pBuf;
+ m_iBufLeft = iReqSize;
+ m_bBufOverrun = false;
+ (*ppBuffer) = m_pBuf;
+
+ // build request
+ SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
+ SendWord ( VER_COMMAND_SEARCH ); // command version
+ SendInt ( iReqSize-8 ); // packet body length
+ SendInt ( 0 ); // its a client
+
+ SendInt ( 1 ); // number of queries
+ SendInt ( m_iOffset );
+ SendInt ( m_iLimit );
+ SendInt ( m_eMode );
+ SendInt ( m_eRanker ); // 1.16+
+ if ( m_eRanker==SPH_RANK_EXPR )
+ SendString ( m_sRankExpr );
+ SendInt ( m_eSort );
+ SendString ( m_sSortBy ); // sort attr
+ SendString ( m_sQuery ); // query
+ SendInt ( m_iWeights );
+ for ( int j=0; j<m_iWeights; j++ )
+ SendInt ( m_pWeights[j] ); // weights
+ SendString ( m_sIndex ); // indexes
+ SendInt ( 1 ); // id64 range follows
+ SendUint64 ( m_iMinID ); // id/ts ranges
+ SendUint64 ( m_iMaxID );
+
+ SendInt ( m_iFilters );
+ for ( int j=0; j<m_iFilters; j++ )
+ {
+ const CSphSEFilter & tFilter = m_dFilters[j];
+ SendString ( tFilter.m_sAttrName );
+ SendInt ( tFilter.m_eType );
+
+ switch ( tFilter.m_eType )
+ {
+ case SPH_FILTER_VALUES:
+ SendInt ( tFilter.m_iValues );
+ for ( int k=0; k<tFilter.m_iValues; k++ )
+ SendUint64 ( tFilter.m_pValues[k] );
+ break;
+
+ case SPH_FILTER_RANGE:
+ SendUint64 ( tFilter.m_uMinValue );
+ SendUint64 ( tFilter.m_uMaxValue );
+ break;
+
+ case SPH_FILTER_FLOATRANGE:
+ SendFloat ( tFilter.m_fMinValue );
+ SendFloat ( tFilter.m_fMaxValue );
+ break;
+ }
+
+ SendInt ( tFilter.m_bExclude );
+ }
+
+ SendInt ( m_eGroupFunc );
+ SendString ( m_sGroupBy );
+ SendInt ( m_iMaxMatches );
+ SendString ( m_sGroupSortBy );
+ SendInt ( m_iCutoff ); // 1.9+
+ SendInt ( m_iRetryCount ); // 1.10+
+ SendInt ( m_iRetryDelay );
+ SendString ( m_sGroupDistinct ); // 1.11+
+ SendInt ( m_bGeoAnchor ); // 1.14+
+ if ( m_bGeoAnchor )
+ {
+ SendString ( m_sGeoLatAttr );
+ SendString ( m_sGeoLongAttr );
+ SendFloat ( m_fGeoLatitude );
+ SendFloat ( m_fGeoLongitude );
+ }
+ SendInt ( m_iIndexWeights ); // 1.15+
+ for ( int i=0; i<m_iIndexWeights; i++ )
+ {
+ SendString ( m_sIndexWeight[i] );
+ SendInt ( m_iIndexWeight[i] );
+ }
+ SendInt ( m_iMaxQueryTime ); // 1.17+
+ SendInt ( m_iFieldWeights ); // 1.18+
+ for ( int i=0; i<m_iFieldWeights; i++ )
+ {
+ SendString ( m_sFieldWeight[i] );
+ SendInt ( m_iFieldWeight[i] );
+ }
+ SendString ( m_sComment );
+
+ // overrides
+ SendInt ( m_dOverrides.elements() );
+ for ( size_t i=0; i<m_dOverrides.elements(); i++ )
+ {
+ CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
+ SendString ( pOverride->m_sName );
+ SendDword ( pOverride->m_iType );
+ SendInt ( pOverride->m_dIds.elements() );
+ for ( size_t j=0; j<pOverride->m_dIds.elements(); j++ )
+ {
+ SendUint64 ( pOverride->m_dIds.at(j) );
+ if ( pOverride->m_iType==SPH_ATTR_FLOAT )
+ SendFloat ( pOverride->m_dValues.at(j).m_fValue );
+ else if ( pOverride->m_iType==SPH_ATTR_BIGINT )
+ SendUint64 ( pOverride->m_dValues.at(j).m_iValue64 );
+ else
+ SendDword ( pOverride->m_dValues.at(j).m_uValue );
+ }
+ }
+
+ // select
+ SendString ( m_sSelect );
+
+ // detect buffer overruns and underruns, and report internal error
+ if ( m_bBufOverrun || m_iBufLeft!=0 || m_pCur-m_pBuf!=iReqSize )
+ SPH_RET(-1);
+
+ // all fine
+ SPH_RET ( iReqSize );
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// SPHINX HANDLER
+//////////////////////////////////////////////////////////////////////////////
+
+#if MYSQL_VERSION_ID<50100
+ha_sphinx::ha_sphinx ( TABLE_ARG * table )
+ : handler ( &sphinx_hton, table )
+#else
+ha_sphinx::ha_sphinx ( handlerton * hton, TABLE_ARG * table )
+ : handler ( hton, table )
+#endif
+ , m_pShare ( NULL )
+ , m_iMatchesTotal ( 0 )
+ , m_iCurrentPos ( 0 )
+ , m_pCurrentKey ( NULL )
+ , m_iCurrentKeyLen ( 0 )
+ , m_pResponse ( NULL )
+ , m_pResponseEnd ( NULL )
+ , m_pCur ( NULL )
+ , m_bUnpackError ( false )
+ , m_iFields ( 0 )
+ , m_dFields ( NULL )
+ , m_iAttrs ( 0 )
+ , m_dAttrs ( NULL )
+ , m_bId64 ( 0 )
+ , m_dUnboundFields ( NULL )
+{
+ SPH_ENTER_METHOD();
+ SPH_VOID_RET();
+}
+
+ha_sphinx::~ha_sphinx()
+{
+ SafeDeleteArray ( m_dAttrs );
+ SafeDeleteArray ( m_dUnboundFields );
+ if ( m_dFields )
+ {
+ for (uint32 i=0; i< m_iFields; i++ )
+ SafeDeleteArray ( m_dFields[i] );
+ delete [] m_dFields;
+ }
+}
+
+// Used for opening tables. The name will be the name of the file.
+// A table is opened when it needs to be opened. For instance
+// when a request comes in for a select on the table (tables are not
+// open and closed for each request, they are cached).
+//
+// Called from handler.cc by handler::ha_open(). The server opens all tables by
+// calling ha_open() which then calls the handler specific open().
+int ha_sphinx::open ( const char * name, int, uint )
+{
+ SPH_ENTER_METHOD();
+ m_pShare = get_share ( name, table );
+ if ( !m_pShare )
+ SPH_RET(1);
+
+ thr_lock_data_init ( &m_pShare->m_tLock, &m_tLock, NULL );
+
+ thd_set_ha_data ( table->in_use, ht, 0 );
+
+ SPH_RET(0);
+}
+
+
+int ha_sphinx::Connect ( const char * sHost, ushort uPort )
+{
+ struct sockaddr_in sin;
+#ifndef _WIN32
+ struct sockaddr_un saun;
+#endif
+
+ int iDomain = 0;
+ int iSockaddrSize = 0;
+ struct sockaddr * pSockaddr = NULL;
+
+ in_addr_t ip_addr;
+
+ if ( uPort )
+ {
+ iDomain = AF_INET;
+ iSockaddrSize = sizeof(sin);
+ pSockaddr = (struct sockaddr *) &sin;
+
+ memset ( &sin, 0, sizeof(sin) );
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(uPort);
+
+ // prepare host address
+ if ( (int)( ip_addr = inet_addr(sHost) )!=(int)INADDR_NONE )
+ {
+ memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
+ } else
+ {
+ int tmp_errno;
+ bool bError = false;
+
+#if MYSQL_VERSION_ID>=50515
+ struct addrinfo *hp = NULL;
+ tmp_errno = getaddrinfo ( sHost, NULL, NULL, &hp );
+ if ( tmp_errno || !hp || !hp->ai_addr )
+ {
+ bError = true;
+ if ( hp )
+ freeaddrinfo ( hp );
+ }
+#else
+ struct hostent tmp_hostent, *hp;
+ char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
+ hp = my_gethostbyname_r ( sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
+ if ( !hp )
+ {
+ my_gethostbyname_r_free();
+ bError = true;
+ }
+#endif
+
+ if ( bError )
+ {
+ char sError[256];
+ my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", sHost );
+
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ SPH_RET(-1);
+ }
+
+#if MYSQL_VERSION_ID>=50515
+ struct sockaddr_in *in = (sockaddr_in *)hp->ai_addr;
+ memcpy ( &sin.sin_addr, &in->sin_addr, Min ( sizeof(sin.sin_addr), sizeof(in->sin_addr) ) );
+ freeaddrinfo ( hp );
+#else
+ memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
+ my_gethostbyname_r_free();
+#endif
+ }
+ } else
+ {
+#ifndef _WIN32
+ iDomain = AF_UNIX;
+ iSockaddrSize = sizeof(saun);
+ pSockaddr = (struct sockaddr *) &saun;
+
+ memset ( &saun, 0, sizeof(saun) );
+ saun.sun_family = AF_UNIX;
+ strncpy ( saun.sun_path, sHost, sizeof(saun.sun_path)-1 );
+#else
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "UNIX sockets are not supported on Windows" );
+ SPH_RET(-1);
+#endif
+ }
+
+ char sError[512];
+ int iSocket = (int) socket ( iDomain, SOCK_STREAM, 0 );
+
+ if ( iSocket<0 )
+ {
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "failed to create client socket" );
+ SPH_RET(-1);
+ }
+
+ if ( connect ( iSocket, pSockaddr, iSockaddrSize )<0 )
+ {
+ sphSockClose ( iSocket );
+ my_snprintf ( sError, sizeof(sError), "failed to connect to searchd (host=%s, errno=%d, port=%d)",
+ sHost, errno, (int)uPort );
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ SPH_RET(-1);
+ }
+
+ return iSocket;
+}
+
+
+int ha_sphinx::ConnectAPI ( const char * sQueryHost, int iQueryPort )
+{
+ SPH_ENTER_METHOD();
+
+ const char * sHost = ( sQueryHost && *sQueryHost ) ? sQueryHost : m_pShare->m_sHost;
+ ushort uPort = iQueryPort ? (ushort)iQueryPort : m_pShare->m_iPort;
+
+ int iSocket = Connect ( sHost, uPort );
+ if ( iSocket<0 )
+ SPH_RET ( iSocket );
+
+ char sError[512];
+
+ int version;
+ if ( ::recv ( iSocket, (char *)&version, sizeof(version), 0 )!=sizeof(version) )
+ {
+ sphSockClose ( iSocket );
+ my_snprintf ( sError, sizeof(sError), "failed to receive searchd version (host=%s, port=%d)",
+ sHost, (int)uPort );
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ SPH_RET(-1);
+ }
+
+ uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
+ if ( ::send ( iSocket, (char*)&uClientVersion, sizeof(uClientVersion), 0 )!=sizeof(uClientVersion) )
+ {
+ sphSockClose ( iSocket );
+ my_snprintf ( sError, sizeof(sError), "failed to send client version (host=%s, port=%d)",
+ sHost, (int)uPort );
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ SPH_RET(-1);
+ }
+
+ SPH_RET ( iSocket );
+}
+
+
+// Closes a table. We call the free_share() function to free any resources
+// that we have allocated in the "shared" structure.
+//
+// Called from sql_base.cc, sql_select.cc, and table.cc.
+// In sql_select.cc it is only used to close up temporary tables or during
+// the process where a temporary table is converted over to being a
+// myisam table.
+// For sql_base.cc look at close_data_tables().
+int ha_sphinx::close()
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( free_share ( m_pShare ) );
+}
+
+
+int ha_sphinx::HandleMysqlError ( MYSQL * pConn, int iErrCode )
+{
+ CSphSEThreadTable * pTable = GetTls ();
+ if ( pTable )
+ {
+ strncpy ( pTable->m_tStats.m_sLastMessage, mysql_error ( pConn ), sizeof pTable->m_tStats.m_sLastMessage - 1 );
+ pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
+ pTable->m_tStats.m_bLastError = true;
+ }
+
+ mysql_close ( pConn );
+
+ my_error ( iErrCode, MYF(0), pTable->m_tStats.m_sLastMessage );
+ return -1;
+}
+
+
+int ha_sphinx::extra ( enum ha_extra_function op )
+{
+ CSphSEThreadTable * pTable = GetTls();
+ if ( pTable )
+ {
+ if ( op==HA_EXTRA_WRITE_CAN_REPLACE )
+ pTable->m_bReplace = true;
+ else if ( op==HA_EXTRA_WRITE_CANNOT_REPLACE )
+ pTable->m_bReplace = false;
+ }
+ return 0;
+}
+
+
+int ha_sphinx::write_row ( const byte * )
+{
+ SPH_ENTER_METHOD();
+ if ( !m_pShare || !m_pShare->m_bSphinxQL )
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+
+ // SphinxQL inserts only, pretty much similar to abandoned federated
+ char sQueryBuf[1024];
+ char sValueBuf[1024];
+ String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
+ String sValue ( sValueBuf, sizeof(sQueryBuf), &my_charset_bin );
+ const char *query;
+ sQuery.length ( 0 );
+ sValue.length ( 0 );
+
+ CSphSEThreadTable * pTable = GetTls ();
+ query= pTable && pTable->m_bReplace ? "REPLACE INTO " : "INSERT INTO ";
+ sQuery.append (query, strlen(query));
+ sQuery.append ( m_pShare->m_sIndex, strlen(m_pShare->m_sIndex ));
+ sQuery.append (STRING_WITH_LEN(" (" ));
+
+ for ( Field ** ppField = table->field; *ppField; ppField++ )
+ {
+ sQuery.append ( (*ppField)->field_name.str,
+ strlen((*ppField)->field_name.str));
+ if ( ppField[1] )
+ sQuery.append (STRING_WITH_LEN(", "));
+ }
+ sQuery.append (STRING_WITH_LEN( ") VALUES (" ));
+
+ for ( Field ** ppField = table->field; *ppField; ppField++ )
+ {
+ if ( (*ppField)->is_null() )
+ {
+ sQuery.append (STRING_WITH_LEN( "''" ));
+
+ } else
+ {
+ THD *thd= ha_thd();
+ if ( (*ppField)->type()==MYSQL_TYPE_TIMESTAMP )
+ {
+ Item_field * pWrap = new (thd->mem_root) Item_field(thd, *ppField); // autofreed by query arena, I assume
+ Item_func_unix_timestamp * pConv = new (thd->mem_root) Item_func_unix_timestamp(thd, pWrap);
+ pConv->quick_fix_field();
+ unsigned int uTs = (unsigned int) pConv->val_int();
+
+ uint len= my_snprintf ( sValueBuf, sizeof(sValueBuf), "'%u'", uTs );
+ sQuery.append ( sValueBuf, len );
+
+ } else
+ {
+ (*ppField)->val_str ( &sValue );
+ sQuery.append ( '\'' );
+ sValue.print ( &sQuery );
+ sQuery.append ( '\'' );
+ sValue.length(0);
+ }
+ }
+
+ if ( ppField[1] )
+ sQuery.append (STRING_WITH_LEN(", "));
+ }
+ sQuery.append ( ')' );
+
+ // FIXME? pretty inefficient to reconnect every time under high load,
+ // but this was intentionally written for a low load scenario..
+ MYSQL * pConn = mysql_init ( NULL );
+ if ( !pConn )
+ SPH_RET ( ER_OUT_OF_RESOURCES );
+
+ unsigned int uTimeout = 1;
+ mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
+
+ my_bool my_true= 1;
+ mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);
+
+ if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
+ SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
+
+ if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
+ SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
+
+ // all ok!
+ mysql_close ( pConn );
+ SPH_RET(0);
+}
+
+
+static inline bool IsIntegerFieldType ( enum_field_types eType )
+{
+ return eType==MYSQL_TYPE_LONG || eType==MYSQL_TYPE_LONGLONG;
+}
+
+
+static inline bool IsIDField ( Field * pField )
+{
+ enum_field_types eType = pField->type();
+
+ if ( eType==MYSQL_TYPE_LONGLONG )
+ return true;
+
+ if ( eType==MYSQL_TYPE_LONG && ((Field_num*)pField)->unsigned_flag )
+ return true;
+
+ return false;
+}
+
+
+int ha_sphinx::delete_row ( const byte * )
+{
+ SPH_ENTER_METHOD();
+ if ( !m_pShare || !m_pShare->m_bSphinxQL )
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+
+ char sQueryBuf[1024];
+ String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
+ sQuery.length ( 0 );
+
+ sQuery.append (STRING_WITH_LEN( "DELETE FROM " ));
+ sQuery.append ( m_pShare->m_sIndex, strlen(m_pShare->m_sIndex));
+ sQuery.append (STRING_WITH_LEN( " WHERE id=" ));
+
+ char sValue[32];
+ uint length= my_snprintf ( sValue, sizeof(sValue), "%lld",
+ table->field[0]->val_int() );
+ sQuery.append ( sValue, length );
+
+ // FIXME? pretty inefficient to reconnect every time under high load,
+ // but this was intentionally written for a low load scenario..
+ MYSQL * pConn = mysql_init ( NULL );
+ if ( !pConn )
+ SPH_RET ( ER_OUT_OF_RESOURCES );
+
+ unsigned int uTimeout = 1;
+ mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
+
+ my_bool my_true= 1;
+ mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);
+
+ if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
+ SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
+
+ if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
+ SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
+
+ // all ok!
+ mysql_close ( pConn );
+ SPH_RET(0);
+}
+
+
+int ha_sphinx::update_row ( const byte *, const byte * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+}
+
+
+// keynr is key (index) number
+// sorted is 1 if result MUST be sorted according to index
+int ha_sphinx::index_init ( uint keynr, bool )
+{
+ SPH_ENTER_METHOD();
+ active_index = keynr;
+
+ CSphSEThreadTable * pTable = GetTls();
+ if ( pTable )
+ pTable->m_bCondDone = false;
+
+ SPH_RET(0);
+}
+
+
+int ha_sphinx::index_end()
+{
+ SPH_ENTER_METHOD();
+ SPH_RET(0);
+}
+
+
+bool ha_sphinx::CheckResponcePtr ( int iLen )
+{
+ if ( m_pCur+iLen>m_pResponseEnd )
+ {
+ m_pCur = m_pResponseEnd;
+ m_bUnpackError = true;
+ return false;
+ }
+
+ return true;
+}
+
+
+uint32 ha_sphinx::UnpackDword ()
+{
+ if ( !CheckResponcePtr ( sizeof(uint32) ) ) // NOLINT
+ {
+ return 0;
+ }
+
+ uint32 uRes = ntohl ( sphUnalignedRead ( *(uint32*)m_pCur ) );
+ m_pCur += sizeof(uint32); // NOLINT
+ return uRes;
+}
+
+
+char * ha_sphinx::UnpackString ()
+{
+ uint32 iLen = UnpackDword ();
+ if ( !iLen )
+ return NULL;
+
+ if ( !CheckResponcePtr ( iLen ) )
+ {
+ return NULL;
+ }
+
+ char * sRes = new char [ 1+iLen ];
+ memcpy ( sRes, m_pCur, iLen );
+ sRes[iLen] = '\0';
+ m_pCur += iLen;
+ return sRes;
+}
+
+
+bool ha_sphinx::UnpackSchema ()
+{
+ SPH_ENTER_METHOD();
+
+ // cleanup
+ if ( m_dFields )
+ for ( int i=0; i<(int)m_iFields; i++ )
+ SafeDeleteArray ( m_dFields[i] );
+ SafeDeleteArray ( m_dFields );
+
+ // unpack network packet
+ uint32 uStatus = UnpackDword ();
+ char * sMessage = NULL;
+
+ if ( uStatus!=SEARCHD_OK )
+ {
+ sMessage = UnpackString ();
+ CSphSEThreadTable * pTable = GetTls ();
+ if ( pTable )
+ {
+ strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
+ pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
+ pTable->m_tStats.m_bLastError = ( uStatus==SEARCHD_ERROR );
+ }
+
+ if ( uStatus==SEARCHD_ERROR )
+ {
+ char sError[1024];
+ my_snprintf ( sError, sizeof(sError), "searchd error: %s", sMessage );
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ SafeDeleteArray ( sMessage );
+ SPH_RET ( false );
+ }
+ }
+
+ m_iFields = UnpackDword ();
+ m_dFields = new char * [ m_iFields ];
+ if ( !m_dFields )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (fields alloc error)" );
+ SPH_RET(false);
+ }
+
+ for ( uint32 i=0; i<m_iFields; i++ )
+ m_dFields[i] = UnpackString ();
+
+ SafeDeleteArray ( m_dAttrs );
+ m_iAttrs = UnpackDword ();
+ m_dAttrs = new CSphSEAttr [ m_iAttrs ];
+ if ( !m_dAttrs )
+ {
+ for ( int i=0; i<(int)m_iFields; i++ )
+ SafeDeleteArray ( m_dFields[i] );
+ SafeDeleteArray ( m_dFields );
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (attrs alloc error)" );
+ SPH_RET(false);
+ }
+
+ for ( uint32 i=0; i<m_iAttrs; i++ )
+ {
+ m_dAttrs[i].m_sName = UnpackString ();
+ m_dAttrs[i].m_uType = UnpackDword ();
+ if ( m_bUnpackError ) // m_sName may be null
+ break;
+
+ m_dAttrs[i].m_iField = -1;
+ for ( int j=SPHINXSE_SYSTEM_COLUMNS; j<m_pShare->m_iTableFields; j++ )
+ {
+ const char * sTableField = m_pShare->m_sTableField[j];
+ const char * sAttrField = m_dAttrs[i].m_sName;
+ if ( m_dAttrs[i].m_sName[0]=='@' )
+ {
+ const char * sAtPrefix = "_sph_";
+ if ( strncmp ( sTableField, sAtPrefix, strlen(sAtPrefix) ) )
+ continue;
+ sTableField += strlen(sAtPrefix);
+ sAttrField++;
+ }
+
+ if ( !strcasecmp ( sAttrField, sTableField ) )
+ {
+ // we're almost good, but
+ // let's enforce that timestamp columns can only receive timestamp attributes
+ if ( m_pShare->m_eTableFieldType[j]!=MYSQL_TYPE_TIMESTAMP || m_dAttrs[i].m_uType==SPH_ATTR_TIMESTAMP )
+ m_dAttrs[i].m_iField = j;
+ break;
+ }
+ }
+ }
+
+ m_iMatchesTotal = UnpackDword ();
+
+ m_bId64 = UnpackDword ();
+ if ( m_bId64 && m_pShare->m_eTableFieldType[0]!=MYSQL_TYPE_LONGLONG )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: 1st column must be bigint to accept 64-bit DOCIDs" );
+ SPH_RET(false);
+ }
+
+ // network packet unpacked; build unbound fields map
+ SafeDeleteArray ( m_dUnboundFields );
+ m_dUnboundFields = new int [ m_pShare->m_iTableFields ];
+
+ for ( int i=0; i<m_pShare->m_iTableFields; i++ )
+ {
+ if ( i<SPHINXSE_SYSTEM_COLUMNS )
+ m_dUnboundFields[i] = SPH_ATTR_NONE;
+
+ else if ( m_pShare->m_eTableFieldType[i]==MYSQL_TYPE_TIMESTAMP )
+ m_dUnboundFields[i] = SPH_ATTR_TIMESTAMP;
+
+ else
+ m_dUnboundFields[i] = SPH_ATTR_INTEGER;
+ }
+
+ for ( uint32 i=0; i<m_iAttrs; i++ )
+ if ( m_dAttrs[i].m_iField>=0 )
+ m_dUnboundFields [ m_dAttrs[i].m_iField ] = SPH_ATTR_NONE;
+
+ if ( m_bUnpackError )
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (unpack error)" );
+
+ SPH_RET ( !m_bUnpackError );
+}
+
+
+bool ha_sphinx::UnpackStats ( CSphSEStats * pStats )
+{
+ assert ( pStats );
+
+ char * pCurSave = m_pCur;
+ for ( uint m=0; m<m_iMatchesTotal && m_pCur<m_pResponseEnd-sizeof(uint32); m++ ) // NOLINT
+ {
+ m_pCur += m_bId64 ? 12 : 8; // skip id+weight
+ for ( uint32 i=0; i<m_iAttrs && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
+ {
+ if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
+ {
+ // skip MVA list
+ uint32 uCount = UnpackDword ();
+ m_pCur += uCount*4;
+ } else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING )
+ {
+ uint32 iLen = UnpackDword();
+ m_pCur += iLen;
+ } else // skip normal value
+ m_pCur += m_dAttrs[i].m_uType==SPH_ATTR_BIGINT ? 8 : 4;
+ }
+ }
+
+ pStats->m_iMatchesTotal = UnpackDword ();
+ pStats->m_iMatchesFound = UnpackDword ();
+ pStats->m_iQueryMsec = UnpackDword ();
+ pStats->m_iWords = UnpackDword ();
+
+ if ( m_bUnpackError )
+ return false;
+
+ if ( pStats->m_iWords<0 || pStats->m_iWords>=SPHINXSE_MAX_KEYWORDSTATS )
+ return false;
+
+ SafeDeleteArray ( pStats->m_dWords );
+ pStats->m_dWords = new CSphSEWordStats [ pStats->m_iWords ];
+ if ( !pStats->m_dWords )
+ return false;
+
+ for ( int i=0; i<pStats->m_iWords; i++ )
+ {
+ CSphSEWordStats & tWord = pStats->m_dWords[i];
+ tWord.m_sWord = UnpackString ();
+ tWord.m_iDocs = UnpackDword ();
+ tWord.m_iHits = UnpackDword ();
+ }
+
+ if ( m_bUnpackError )
+ return false;
+
+ m_pCur = pCurSave;
+ return true;
+}
+
+
+/// condition pushdown implementation, to properly intercept WHERE clauses on my columns
+#if MYSQL_VERSION_ID<50610
+const COND * ha_sphinx::cond_push ( const COND * cond )
+#else
+const Item * ha_sphinx::cond_push ( const Item *cond )
+#endif
+{
+ // catch the simplest case: query_column="some text"
+ for ( ;; )
+ {
+ if ( cond->type()!=Item::FUNC_ITEM )
+ break;
+
+ Item_func * condf = (Item_func *)cond;
+ if ( condf->functype()!=Item_func::EQ_FUNC || condf->argument_count()!=2 )
+ break;
+
+ // get my tls
+ CSphSEThreadTable * pTable = GetTls ();
+ if ( !pTable )
+ break;
+
+ Item ** args = condf->arguments();
+ if ( !m_pShare->m_bSphinxQL )
+ {
+ // on non-QL tables, intercept query=value condition for SELECT
+ if (!( args[0]->type()==Item::FIELD_ITEM &&
+ args[1]->is_of_type(Item::CONST_ITEM,
+ STRING_RESULT)))
+ break;
+
+ Item_field * pField = (Item_field *) args[0];
+ if ( pField->field->field_index!=2 ) // FIXME! magic key index
+ break;
+
+ // copy the query, and let know that we intercepted this condition
+ String *pString= args[1]->val_str(NULL);
+ pTable->m_bQuery = true;
+ strncpy ( pTable->m_sQuery, pString->c_ptr(), sizeof(pTable->m_sQuery) );
+ pTable->m_sQuery[sizeof(pTable->m_sQuery)-1] = '\0';
+ pTable->m_pQueryCharset = pString->charset();
+
+ } else
+ {
+ if (!( args[0]->type()==Item::FIELD_ITEM &&
+ args[1]->is_of_type(Item::CONST_ITEM,
+ INT_RESULT)))
+ break;
+
+ // on QL tables, intercept id=value condition for DELETE
+ Item_field * pField = (Item_field *) args[0];
+ if ( pField->field->field_index!=0 ) // FIXME! magic key index
+ break;
+
+ Item_int * pVal = (Item_int *) args[1];
+ pTable->m_iCondId = pVal->val_int();
+ pTable->m_bCondId = true;
+ }
+
+ // we intercepted this condition
+ return NULL;
+ }
+
+ // don't change anything
+ return cond;
+}
+
+
+/// condition popup
+void ha_sphinx::cond_pop ()
+{
+ CSphSEThreadTable * pTable = GetTls ();
+ if ( pTable )
+ pTable->m_bQuery = false;
+}
+
+
+/// get TLS (maybe allocate it, too)
+CSphSEThreadTable * ha_sphinx::GetTls()
+{
+ SPH_ENTER_METHOD()
+ // where do we store that pointer in today's version?
+ CSphTLS * pTls = (CSphTLS*) thd_get_ha_data ( table->in_use, ht );
+
+ CSphSEThreadTable * pTable = NULL;
+ // allocate if needed
+ if ( !pTls )
+ {
+ pTls = new CSphTLS ( this );
+ thd_set_ha_data(table->in_use, ht, pTls);
+ }
+ pTable = pTls->m_pHeadTable;
+
+ while ( pTable && pTable->m_pHandler!=this )
+ pTable = pTable->m_pTableNext;
+
+ if ( !pTable )
+ {
+ pTable = new CSphSEThreadTable ( this );
+ pTable->m_pTableNext = pTls->m_pHeadTable;
+ pTls->m_pHeadTable = pTable;
+ }
+
+ // errors will be handled by caller
+ return pTable;
+}
+
+
+// Positions an index cursor to the index specified in the handle. Fetches the
+// row if available. If the key value is null, begin at the first key of the
+// index.
+int ha_sphinx::index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function )
+{
+ SPH_ENTER_METHOD();
+ char sError[256];
+
+ // set new data for thd->ha_data, it is used in show_status
+ CSphSEThreadTable * pTable = GetTls();
+ if ( !pTable )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: TLS malloc() failed" );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+ pTable->m_tStats.Reset ();
+
+ // sphinxql table, just return the key once
+ if ( m_pShare->m_bSphinxQL )
+ {
+ // over and out
+ if ( pTable->m_bCondDone )
+ SPH_RET ( HA_ERR_END_OF_FILE );
+
+ // return a value from pushdown, if any
+ if ( pTable->m_bCondId )
+ {
+ table->field[0]->store ( pTable->m_iCondId, 1 );
+ pTable->m_bCondDone = true;
+ SPH_RET(0);
+ }
+
+ // return a value from key
+ longlong iRef = 0;
+ if ( key_len==4 )
+ iRef = uint4korr ( key );
+ else if ( key_len==8 )
+ iRef = uint8korr ( key );
+ else
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unexpected key length" );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ table->field[0]->store ( iRef, 1 );
+ pTable->m_bCondDone = true;
+ SPH_RET(0);
+ }
+
+ // parse query
+ if ( pTable->m_bQuery )
+ {
+ // we have a query from condition pushdown
+ m_pCurrentKey = (const byte *) pTable->m_sQuery;
+ m_iCurrentKeyLen = strlen(pTable->m_sQuery);
+ } else
+ {
+ // just use the key (might be truncated)
+ m_pCurrentKey = key+HA_KEY_BLOB_LENGTH;
+ m_iCurrentKeyLen = uint2korr(key); // or maybe key_len?
+ pTable->m_pQueryCharset = m_pShare ? m_pShare->m_pTableQueryCharset : NULL;
+ }
+
+ CSphSEQuery q ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, m_pShare->m_sIndex );
+ if ( !q.Parse () )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), q.m_sParseError );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ // do connect
+ int iSocket = ConnectAPI ( q.m_sHost, q.m_iPort );
+ if ( iSocket<0 )
+ SPH_RET ( HA_ERR_END_OF_FILE );
+
+ // my buffer
+ char * pBuffer; // will be free by CSphSEQuery dtor; do NOT free manually
+ int iReqLen = q.BuildRequest ( &pBuffer );
+
+ if ( iReqLen<=0 )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: q.BuildRequest() failed" );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ // send request
+ ::send ( iSocket, pBuffer, iReqLen, 0 );
+
+ // receive reply
+ char sHeader[8];
+ int iGot = ::recv ( iSocket, sHeader, sizeof(sHeader), RECV_FLAGS );
+ if ( iGot!=sizeof(sHeader) )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "failed to receive response header (searchd went away?)" );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ short int uRespStatus = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[0] ) ) );
+ short int uRespVersion = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[2] ) ) );
+ uint uRespLength = ntohl ( sphUnalignedRead ( *(uint *)( &sHeader[4] ) ) );
+ SPH_DEBUG ( "got response header (status=%d version=%d length=%d)",
+ uRespStatus, uRespVersion, uRespLength );
+
+ SafeDeleteArray ( m_pResponse );
+ if ( uRespLength<=SPHINXSE_MAX_ALLOC )
+ m_pResponse = new char [ uRespLength+1 ];
+
+ if ( !m_pResponse )
+ {
+ my_snprintf ( sError, sizeof(sError), "bad searchd response length (length=%u)", uRespLength );
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ int iRecvLength = 0;
+ while ( iRecvLength<(int)uRespLength )
+ {
+ int iRecv = ::recv ( iSocket, m_pResponse+iRecvLength, uRespLength-iRecvLength, RECV_FLAGS );
+ if ( iRecv<0 )
+ break;
+ iRecvLength += iRecv;
+ }
+
+ ::closesocket ( iSocket );
+ iSocket = -1;
+
+ if ( iRecvLength!=(int)uRespLength )
+ {
+ my_snprintf ( sError, sizeof(sError), "net read error (expected=%d, got=%d)", uRespLength, iRecvLength );
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ // we'll have a message, at least
+ pTable->m_bStats = true;
+
+ // parse reply
+ m_iCurrentPos = 0;
+ m_pCur = m_pResponse;
+ m_pResponseEnd = m_pResponse + uRespLength;
+ m_bUnpackError = false;
+
+ if ( uRespStatus!=SEARCHD_OK )
+ {
+ char * sMessage = UnpackString ();
+ if ( !sMessage )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "no valid response from searchd (status=%d, resplen=%d)",
+ uRespStatus, uRespLength );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
+ pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
+ SafeDeleteArray ( sMessage );
+
+ if ( uRespStatus!=SEARCHD_WARNING )
+ {
+ my_snprintf ( sError, sizeof(sError), "searchd error: %s", pTable->m_tStats.m_sLastMessage );
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
+
+ pTable->m_tStats.m_bLastError = true;
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+ }
+
+ if ( !UnpackSchema () )
+ SPH_RET ( HA_ERR_END_OF_FILE );
+
+ if ( !UnpackStats ( &pTable->m_tStats ) )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackStats() failed" );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ SPH_RET ( get_rec ( buf, key, key_len ) );
+}
+
+
+// Positions an index cursor to the index specified in key. Fetches the
+// row if any. This is only used to read whole keys.
+int ha_sphinx::index_read_idx ( byte *, uint, const byte *, uint, enum ha_rkey_function )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+}
+
+
+// Used to read forward through the index.
+int ha_sphinx::index_next ( byte * buf )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( get_rec ( buf, m_pCurrentKey, m_iCurrentKeyLen ) );
+}
+
+
+int ha_sphinx::index_next_same ( byte * buf, const byte * key, uint keylen )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( get_rec ( buf, key, keylen ) );
+}
+
+
+int ha_sphinx::get_rec ( byte * buf, const byte *, uint )
+{
+ SPH_ENTER_METHOD();
+
+ if ( m_iCurrentPos>=m_iMatchesTotal )
+ {
+ SafeDeleteArray ( m_pResponse );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ #if MYSQL_VERSION_ID>50100
+ MY_BITMAP * org_bitmap = dbug_tmp_use_all_columns ( table, &table->write_set );
+ #endif
+ Field ** field = table->field;
+
+ // unpack and return the match
+ longlong uMatchID = UnpackDword ();
+ if ( m_bId64 )
+ uMatchID = ( uMatchID<<32 ) + UnpackDword();
+ uint32 uMatchWeight = UnpackDword ();
+
+ field[0]->store ( uMatchID, 1 );
+ field[1]->store ( uMatchWeight, 1 );
+ field[2]->store ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, &my_charset_bin );
+
+ for ( uint32 i=0; i<m_iAttrs; i++ )
+ {
+ longlong iValue64 = 0;
+ uint32 uValue = UnpackDword ();
+ if ( m_dAttrs[i].m_uType==SPH_ATTR_BIGINT )
+ iValue64 = ( (longlong)uValue<<32 ) | UnpackDword();
+ if ( m_dAttrs[i].m_iField<0 )
+ {
+ // skip MVA or String
+ if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
+ {
+ for ( ; uValue>0 && !m_bUnpackError; uValue-- )
+ UnpackDword();
+ } else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING && CheckResponcePtr ( uValue ) )
+ {
+ m_pCur += uValue;
+ }
+ continue;
+ }
+
+ Field * af = field [ m_dAttrs[i].m_iField ];
+ switch ( m_dAttrs[i].m_uType )
+ {
+ case SPH_ATTR_INTEGER:
+ case SPH_ATTR_ORDINAL:
+ case SPH_ATTR_BOOL:
+ af->store ( uValue, 1 );
+ break;
+
+ case SPH_ATTR_FLOAT:
+ af->store ( sphDW2F(uValue) );
+ break;
+
+ case SPH_ATTR_TIMESTAMP:
+ if ( af->type()==MYSQL_TYPE_TIMESTAMP )
+ longstore ( af->ptr, uValue ); // because store() does not accept timestamps
+ else
+ af->store ( uValue, 1 );
+ break;
+
+ case SPH_ATTR_BIGINT:
+ af->store ( iValue64, 0 );
+ break;
+
+ case SPH_ATTR_STRING:
+ if ( !uValue )
+ af->store ( "", 0, &my_charset_bin );
+ else if ( CheckResponcePtr ( uValue ) )
+ {
+ af->store ( m_pCur, uValue, &my_charset_bin );
+ m_pCur += uValue;
+ }
+ break;
+
+ case SPH_ATTR_UINT64SET:
+ case SPH_ATTR_UINT32SET :
+ if ( uValue<=0 )
+ {
+ // shortcut, empty MVA set
+ af->store ( "", 0, &my_charset_bin );
+
+ } else
+ {
+ // convert MVA set to comma-separated string
+ char sBuf[1024]; // FIXME! magic size
+ char * pCur = sBuf;
+
+ if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET )
+ {
+ for ( ; uValue>0 && !m_bUnpackError; uValue-- )
+ {
+ uint32 uEntry = UnpackDword ();
+ if ( pCur < sBuf+sizeof(sBuf)-16 ) // 10 chars per 32bit value plus some safety bytes
+ {
+ snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u", uEntry );
+ while ( *pCur ) pCur++;
+ if ( uValue>1 )
+ *pCur++ = ','; // non-trailing commas
+ }
+ }
+ } else
+ {
+ for ( ; uValue>0 && !m_bUnpackError; uValue-=2 )
+ {
+ uint32 uEntryLo = UnpackDword ();
+ uint32 uEntryHi = UnpackDword();
+ if ( pCur < sBuf+sizeof(sBuf)-24 ) // 20 chars per 64bit value plus some safety bytes
+ {
+ snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u%u", uEntryHi, uEntryLo );
+ while ( *pCur ) pCur++;
+ if ( uValue>2 )
+ *pCur++ = ','; // non-trailing commas
+ }
+ }
+ }
+
+ af->store ( sBuf, uint(pCur-sBuf), &my_charset_bin );
+ }
+ break;
+
+ default:
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unhandled attr type" );
+ SafeDeleteArray ( m_pResponse );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+ }
+
+ if ( m_bUnpackError )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: response unpacker failed" );
+ SafeDeleteArray ( m_pResponse );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ // zero out unmapped fields
+ for ( int i=SPHINXSE_SYSTEM_COLUMNS; i<(int)table->s->fields; i++ )
+ if ( m_dUnboundFields[i]!=SPH_ATTR_NONE )
+ switch ( m_dUnboundFields[i] )
+ {
+ case SPH_ATTR_INTEGER: table->field[i]->store ( 0, 1 ); break;
+ case SPH_ATTR_TIMESTAMP: longstore ( table->field[i]->ptr, 0 ); break;
+ default:
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0),
+ "INTERNAL ERROR: unhandled unbound field type %d", m_dUnboundFields[i] );
+ SafeDeleteArray ( m_pResponse );
+ SPH_RET ( HA_ERR_END_OF_FILE );
+ }
+
+ memset ( buf, 0, table->s->null_bytes );
+ m_iCurrentPos++;
+
+ #if MYSQL_VERSION_ID > 50100
+ dbug_tmp_restore_column_map ( &table->write_set, org_bitmap );
+ #endif
+
+ SPH_RET(0);
+}
+
+
+// Used to read backwards through the index.
+int ha_sphinx::index_prev ( byte * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+}
+
+
+// index_first() asks for the first key in the index.
+//
+// Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
+// and sql_select.cc.
+int ha_sphinx::index_first ( byte * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_END_OF_FILE );
+}
+
+// index_last() asks for the last key in the index.
+//
+// Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
+// and sql_select.cc.
+int ha_sphinx::index_last ( byte * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+}
+
+
+int ha_sphinx::rnd_init ( bool )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET(0);
+}
+
+
+int ha_sphinx::rnd_end()
+{
+ SPH_ENTER_METHOD();
+ SPH_RET(0);
+}
+
+
+int ha_sphinx::rnd_next ( byte * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_END_OF_FILE );
+}
+
+
+void ha_sphinx::position ( const byte * )
+{
+ SPH_ENTER_METHOD();
+ SPH_VOID_RET();
+}
+
+
+// This is like rnd_next, but you are given a position to use
+// to determine the row. The position will be of the type that you stored in
+// ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
+// or position you saved when position() was called.
+// Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
+int ha_sphinx::rnd_pos ( byte *, byte * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+}
+
+
+#if MYSQL_VERSION_ID>=50030
+int ha_sphinx::info ( uint )
+#else
+void ha_sphinx::info ( uint )
+#endif
+{
+ SPH_ENTER_METHOD();
+
+ if ( table->s->keys>0 )
+ table->key_info[0].rec_per_key[0] = 1;
+
+ #if MYSQL_VERSION_ID>50100
+ stats.records = 20;
+ #else
+ records = 20;
+ #endif
+
+#if MYSQL_VERSION_ID>=50030
+ SPH_RET(0);
+#else
+ SPH_VOID_RET();
+#endif
+}
+
+
+int ha_sphinx::reset ()
+{
+ SPH_ENTER_METHOD();
+ CSphSEThreadTable * pTable = GetTls ();
+ if ( pTable )
+ pTable->m_bQuery = false;
+ SPH_RET(0);
+}
+
+
+int ha_sphinx::delete_all_rows()
+{
+ SPH_ENTER_METHOD();
+ SPH_RET ( HA_ERR_WRONG_COMMAND );
+}
+
+
+// First you should go read the section "locking functions for mysql" in
+// lock.cc to understand this.
+// This create a lock on the table. If you are implementing a storage engine
+// that can handle transacations look at ha_berkely.cc to see how you will
+// want to go about doing this. Otherwise you should consider calling flock()
+// here.
+//
+// Called from lock.cc by lock_external() and unlock_external(). Also called
+// from sql_table.cc by copy_data_between_tables().
+int ha_sphinx::external_lock ( THD *, int )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET(0);
+}
+
+
+THR_LOCK_DATA ** ha_sphinx::store_lock ( THD *, THR_LOCK_DATA ** to,
+ enum thr_lock_type lock_type )
+{
+ SPH_ENTER_METHOD();
+
+ if ( lock_type!=TL_IGNORE && m_tLock.type==TL_UNLOCK )
+ m_tLock.type = lock_type;
+
+ *to++ = &m_tLock;
+ SPH_RET(to);
+}
+
+
+int ha_sphinx::delete_table ( const char * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET(0);
+}
+
+
+// Renames a table from one name to another from alter table call.
+//
+// If you do not implement this, the default rename_table() is called from
+// handler.cc and it will delete all files with the file extensions returned
+// by bas_ext().
+//
+// Called from sql_table.cc by mysql_rename_table().
+int ha_sphinx::rename_table ( const char *, const char * )
+{
+ SPH_ENTER_METHOD();
+ SPH_RET(0);
+}
+
+
+// Given a starting key, and an ending key estimate the number of rows that
+// will exist between the two. end_key may be empty which in case determine
+// if start_key matches any rows.
+//
+// Called from opt_range.cc by check_quick_keys().
+ha_rows ha_sphinx::records_in_range ( uint, const key_range *, const key_range *, page_range *)
+{
+ SPH_ENTER_METHOD();
+ SPH_RET(3); // low number to force index usage
+}
+
+#if MYSQL_VERSION_ID < 50610
+#define user_defined_key_parts key_parts
+#endif
+
+// create() is called to create a database. The variable name will have the name
+// of the table. When create() is called you do not need to worry about opening
+// the table. Also, the FRM file will have already been created so adjusting
+// create_info will not do you any good. You can overwrite the frm file at this
+// point if you wish to change the table definition, but there are no methods
+// currently provided for doing that.
+//
+// Called from handle.cc by ha_create_table().
+int ha_sphinx::create ( const char * name, TABLE * table_arg, HA_CREATE_INFO * )
+{
+ SPH_ENTER_METHOD();
+ char sError[256];
+
+ CSphSEShare tInfo;
+ if ( !ParseUrl ( &tInfo, table_arg, true ) )
+ SPH_RET(-1);
+
+ // check SphinxAPI table
+ for ( ; !tInfo.m_bSphinxQL; )
+ {
+ // check system fields (count and types)
+ if ( table_arg->s->fields<SPHINXSE_SYSTEM_COLUMNS )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: there MUST be at least %d columns",
+ name, SPHINXSE_SYSTEM_COLUMNS );
+ break;
+ }
+
+ if ( !IsIDField ( table_arg->field[0] ) )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: 1st column (docid) MUST be unsigned integer or bigint", name );
+ break;
+ }
+
+ if ( !IsIntegerFieldType ( table_arg->field[1]->type() ) )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: 2nd column (weight) MUST be integer or bigint", name );
+ break;
+ }
+
+ enum_field_types f2 = table_arg->field[2]->type();
+ if ( f2!=MYSQL_TYPE_VARCHAR
+ && f2!=MYSQL_TYPE_BLOB && f2!=MYSQL_TYPE_MEDIUM_BLOB && f2!=MYSQL_TYPE_LONG_BLOB && f2!=MYSQL_TYPE_TINY_BLOB )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: 3rd column (search query) MUST be varchar or text", name );
+ break;
+ }
+
+ // check attributes
+ int i;
+ for ( i=3; i<(int)table_arg->s->fields; i++ )
+ {
+ enum_field_types eType = table_arg->field[i]->type();
+ if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: %dth column (attribute %s) MUST be integer, bigint, timestamp, varchar, or float",
+ name, i+1, table_arg->field[i]->field_name.str );
+ break;
+ }
+ }
+
+ if ( i!=(int)table_arg->s->fields )
+ break;
+
+ // check index
+ if (
+ table_arg->s->keys!=1 ||
+ table_arg->key_info[0].user_defined_key_parts!=1 ||
+ strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, table_arg->field[2]->field_name.str ) )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: there must be an index on '%s' column",
+ name, table_arg->field[2]->field_name.str );
+ break;
+ }
+
+ // all good
+ sError[0] = '\0';
+ break;
+ }
+
+ // check SphinxQL table
+ for ( ; tInfo.m_bSphinxQL; )
+ {
+ sError[0] = '\0';
+
+ // check that 1st column is id, is of int type, and has an index
+ if ( strcmp ( table_arg->field[0]->field_name.str, "id" ) )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: 1st column must be called 'id'", name );
+ break;
+ }
+
+ if ( !IsIDField ( table_arg->field[0] ) )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be INT UNSIGNED or BIGINT", name );
+ break;
+ }
+
+ // check index
+ if (
+ table_arg->s->keys!=1 ||
+ table_arg->key_info[0].user_defined_key_parts!=1 ||
+ strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, "id" ) )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be indexed", name );
+ break;
+ }
+
+ // check column types
+ for ( int i=1; i<(int)table_arg->s->fields; i++ )
+ {
+ enum_field_types eType = table_arg->field[i]->type();
+ if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
+ {
+ my_snprintf ( sError, sizeof(sError), "%s: column %d(%s) is of unsupported type (use int/bigint/timestamp/varchar/float)",
+ name, i+1, table_arg->field[i]->field_name.str );
+ break;
+ }
+ }
+ if ( sError[0] )
+ break;
+
+ // all good
+ break;
+ }
+
+ // report and bail
+ if ( sError[0] )
+ {
+ my_printf_error(ER_CANT_CREATE_TABLE,
+ "Can\'t create table %s.%s (Error: %s)",
+ MYF(0),
+ table_arg->s->db.str,
+ table_arg->s->table_name.str, sError);
+ SPH_RET(-1);
+ }
+
+ SPH_RET(0);
+}
+
+// show functions
+
+#if MYSQL_VERSION_ID<50100
+#define SHOW_VAR_FUNC_BUFF_SIZE 1024
+#endif
+
+CSphSEStats * sphinx_get_stats ( THD * thd, SHOW_VAR * out )
+{
+#if MYSQL_VERSION_ID>50100
+ if ( sphinx_hton_ptr )
+ {
+ CSphTLS * pTls = (CSphTLS *) thd_get_ha_data ( thd, sphinx_hton_ptr );
+
+ if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
+ return &pTls->m_pHeadTable->m_tStats;
+ }
+#else
+ CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
+ if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
+ return &pTls->m_pHeadTable->m_tStats;
+#endif
+
+ out->type = SHOW_CHAR;
+ out->value = (char*) "";
+ return 0;
+}
+
+int sphinx_showfunc_total ( THD * thd, SHOW_VAR * out, char * )
+{
+ CSphSEStats * pStats = sphinx_get_stats ( thd, out );
+ if ( pStats )
+ {
+ out->type = SHOW_INT;
+ out->value = (char *) &pStats->m_iMatchesTotal;
+ }
+ return 0;
+}
+
+int sphinx_showfunc_total_found ( THD * thd, SHOW_VAR * out, char * )
+{
+ CSphSEStats * pStats = sphinx_get_stats ( thd, out );
+ if ( pStats )
+ {
+ out->type = SHOW_INT;
+ out->value = (char *) &pStats->m_iMatchesFound;
+ }
+ return 0;
+}
+
+int sphinx_showfunc_time ( THD * thd, SHOW_VAR * out, char * )
+{
+ CSphSEStats * pStats = sphinx_get_stats ( thd, out );
+ if ( pStats )
+ {
+ out->type = SHOW_INT;
+ out->value = (char *) &pStats->m_iQueryMsec;
+ }
+ return 0;
+}
+
+int sphinx_showfunc_word_count ( THD * thd, SHOW_VAR * out, char * )
+{
+ CSphSEStats * pStats = sphinx_get_stats ( thd, out );
+ if ( pStats )
+ {
+ out->type = SHOW_INT;
+ out->value = (char *) &pStats->m_iWords;
+ }
+ return 0;
+}
+
+int sphinx_showfunc_words ( THD * thd, SHOW_VAR * out, char * sBuffer )
+{
+#if MYSQL_VERSION_ID>50100
+ if ( sphinx_hton_ptr )
+ {
+ CSphTLS * pTls = (CSphTLS *) thd_get_ha_data ( thd, sphinx_hton_ptr );
+#else
+ {
+ CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
+#endif
+ if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
+ {
+ CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
+ if ( pStats && pStats->m_iWords )
+ {
+ uint uBuffLen = 0;
+
+ out->type = SHOW_CHAR;
+ out->value = sBuffer;
+
+ // the following is partially based on code in sphinx_show_status()
+ sBuffer[0] = 0;
+ for ( int i=0; i<pStats->m_iWords; i++ )
+ {
+ CSphSEWordStats & tWord = pStats->m_dWords[i];
+ uBuffLen = my_snprintf ( sBuffer, SHOW_VAR_FUNC_BUFF_SIZE, "%s%s:%d:%d ", sBuffer,
+ tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
+ }
+
+ if ( uBuffLen > 0 )
+ {
+ // trim last space
+ sBuffer [ --uBuffLen ] = 0;
+
+ if ( pTls->m_pHeadTable->m_pQueryCharset )
+ {
+ // String::c_ptr() will nul-terminate the buffer.
+ //
+ // NOTE: It's not entirely clear whether this conversion is necessary at all.
+
+ String sConvert;
+ uint iErrors;
+ sConvert.copy ( sBuffer, uBuffLen, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
+ memcpy ( sBuffer, sConvert.c_ptr(), sConvert.length() + 1 );
+ }
+ }
+
+ return 0;
+ }
+ }
+ }
+
+ out->type = SHOW_CHAR;
+ out->value = (char*) "";
+ return 0;
+}
+
+int sphinx_showfunc_error ( THD * thd, SHOW_VAR * out, char * )
+{
+ CSphSEStats * pStats = sphinx_get_stats ( thd, out );
+ out->type = SHOW_CHAR;
+ if ( pStats && pStats->m_bLastError )
+ {
+ out->value = pStats->m_sLastMessage;
+ }
+ else
+ out->value = (char*)"";
+ return 0;
+}
+
+#if MYSQL_VERSION_ID>50100
+struct st_mysql_storage_engine sphinx_storage_engine =
+{
+ MYSQL_HANDLERTON_INTERFACE_VERSION
+};
+
+struct st_mysql_show_var sphinx_status_vars[] =
+{
+ {"Sphinx_total", (char *)sphinx_showfunc_total, SHOW_SIMPLE_FUNC},
+ {"Sphinx_total_found", (char *)sphinx_showfunc_total_found, SHOW_SIMPLE_FUNC},
+ {"Sphinx_time", (char *)sphinx_showfunc_time, SHOW_SIMPLE_FUNC},
+ {"Sphinx_word_count", (char *)sphinx_showfunc_word_count, SHOW_SIMPLE_FUNC},
+ {"Sphinx_words", (char *)sphinx_showfunc_words, SHOW_SIMPLE_FUNC},
+ {"Sphinx_error", (char *)sphinx_showfunc_error, SHOW_SIMPLE_FUNC},
+ {0, 0, (enum_mysql_show_type)0}
+};
+
+
+maria_declare_plugin(sphinx)
+{
+ MYSQL_STORAGE_ENGINE_PLUGIN,
+ &sphinx_storage_engine,
+ sphinx_hton_name,
+ "Sphinx developers",
+ sphinx_hton_comment,
+ PLUGIN_LICENSE_GPL,
+ sphinx_init_func, // Plugin Init
+ sphinx_done_func, // Plugin Deinit
+ 0x0202, // 2.2
+ sphinx_status_vars,
+ NULL,
+ SPHINXSE_VERSION, // string version
+MariaDB_PLUGIN_MATURITY_GAMMA
+}
+maria_declare_plugin_end;
+
+#endif // >50100
+
+//
+// $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
+//
diff --git a/storage/sphinx/ha_sphinx.h b/storage/sphinx/ha_sphinx.h
new file mode 100644
index 00000000..f03e9d8c
--- /dev/null
+++ b/storage/sphinx/ha_sphinx.h
@@ -0,0 +1,175 @@
+//
+// $Id: ha_sphinx.h 4818 2014-09-24 08:53:38Z tomat $
+//
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface // gcc class implementation
+#endif
+
+
+#if MYSQL_VERSION_ID>=50515
+#define TABLE_ARG TABLE_SHARE
+#elif MYSQL_VERSION_ID>50100
+#define TABLE_ARG st_table_share
+#else
+#define TABLE_ARG st_table
+#endif
+
+
+#if MYSQL_VERSION_ID>=50120
+typedef uchar byte;
+#endif
+
+
+/// forward decls
+class THD;
+struct CSphReqQuery;
+struct CSphSEShare;
+struct CSphSEAttr;
+struct CSphSEStats;
+struct CSphSEThreadTable;
+
+/// Sphinx SE handler class
+class ha_sphinx final : public handler
+{
+protected:
+ THR_LOCK_DATA m_tLock; ///< MySQL lock
+
+ CSphSEShare * m_pShare; ///< shared lock info
+
+ uint m_iMatchesTotal;
+ uint m_iCurrentPos;
+ const byte * m_pCurrentKey;
+ uint m_iCurrentKeyLen;
+
+ char * m_pResponse; ///< searchd response storage
+ char * m_pResponseEnd; ///< searchd response storage end (points to wilderness!)
+ char * m_pCur; ///< current position into response
+ bool m_bUnpackError; ///< any errors while unpacking response
+
+public:
+#if MYSQL_VERSION_ID<50100
+ ha_sphinx ( TABLE_ARG * table_arg ); // NOLINT
+#else
+ ha_sphinx ( handlerton * hton, TABLE_ARG * table_arg );
+#endif
+ ~ha_sphinx ();
+
+ const char * table_type () const { return "SPHINX"; } ///< SE name for display purposes
+ const char * index_type ( uint ) { return "HASH"; } ///< index type name for display purposes
+
+ #if MYSQL_VERSION_ID>50100
+ ulonglong table_flags () const { return HA_CAN_INDEX_BLOBS |
+ HA_CAN_TABLE_CONDITION_PUSHDOWN; } ///< bitmap of implemented flags (see handler.h for more info)
+ #else
+ ulong table_flags () const { return HA_CAN_INDEX_BLOBS; } ///< bitmap of implemented flags (see handler.h for more info)
+ #endif
+
+ ulong index_flags ( uint, uint, bool ) const { return 0; } ///< bitmap of flags that says how SE implements indexes
+ uint max_supported_record_length () const { return HA_MAX_REC_LENGTH; }
+ uint max_supported_keys () const { return 1; }
+ uint max_supported_key_parts () const { return 1; }
+ uint max_supported_key_length () const { return MAX_KEY_LENGTH; }
+ uint max_supported_key_part_length () const { return MAX_KEY_LENGTH; }
+
+ #if MYSQL_VERSION_ID>50100
+ virtual double scan_time () { return (double)( stats.records+stats.deleted )/20.0 + 10; } ///< called in test_quick_select to determine if indexes should be used
+ #else
+ virtual double scan_time () { return (double)( records+deleted )/20.0 + 10; } ///< called in test_quick_select to determine if indexes should be used
+ #endif
+
+ virtual double read_time(uint index, uint ranges, ha_rows rows)
+ { return ranges + (double)rows/20.0 + 1; } ///< index read time estimate
+
+public:
+ int open ( const char * name, int mode, uint test_if_locked );
+ int close ();
+
+ int write_row ( const byte * buf );
+ int update_row ( const byte * old_data, const byte * new_data );
+ int delete_row ( const byte * buf );
+ int extra ( enum ha_extra_function op );
+
+ int index_init ( uint keynr, bool sorted ); // 5.1.x
+ int index_init ( uint keynr ) { return index_init ( keynr, false ); } // 5.0.x
+
+ int index_end ();
+ int index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function find_flag );
+ int index_read_idx ( byte * buf, uint idx, const byte * key, uint key_len, enum ha_rkey_function find_flag );
+ int index_next ( byte * buf );
+ int index_next_same ( byte * buf, const byte * key, uint keylen );
+ int index_prev ( byte * buf );
+ int index_first ( byte * buf );
+ int index_last ( byte * buf );
+
+ int get_rec ( byte * buf, const byte * key, uint keylen );
+
+ int rnd_init ( bool scan );
+ int rnd_end ();
+ int rnd_next ( byte * buf );
+ int rnd_pos ( byte * buf, byte * pos );
+ void position ( const byte * record );
+
+#if MYSQL_VERSION_ID>=50030
+ int info ( uint );
+#else
+ void info ( uint );
+#endif
+
+ int reset();
+ int external_lock ( THD * thd, int lock_type );
+ int delete_all_rows ();
+ ha_rows records_in_range ( uint inx, const key_range * min_key, const key_range * max_key, page_range *pages);
+
+ int delete_table ( const char * from );
+ int rename_table ( const char * from, const char * to );
+ int create ( const char * name, TABLE * form, HA_CREATE_INFO * create_info );
+
+ THR_LOCK_DATA ** store_lock ( THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type );
+
+public:
+#if MYSQL_VERSION_ID<50610
+ virtual const COND * cond_push ( const COND *cond );
+#else
+ virtual const Item * cond_push ( const Item *cond );
+#endif
+ virtual void cond_pop ();
+
+private:
+ uint32 m_iFields;
+ char ** m_dFields;
+
+ uint32 m_iAttrs;
+ CSphSEAttr * m_dAttrs;
+ int m_bId64;
+
+ int * m_dUnboundFields;
+
+private:
+ int Connect ( const char * sQueryHost, ushort uPort );
+ int ConnectAPI ( const char * sQueryHost, int iQueryPort );
+ int HandleMysqlError ( struct st_mysql * pConn, int iErrCode );
+
+ uint32 UnpackDword ();
+ char * UnpackString ();
+ bool UnpackSchema ();
+ bool UnpackStats ( CSphSEStats * pStats );
+ bool CheckResponcePtr ( int iLen );
+
+ CSphSEThreadTable * GetTls ();
+};
+
+
+#if MYSQL_VERSION_ID < 50100
+bool sphinx_show_status ( THD * thd );
+#endif
+
+int sphinx_showfunc_total_found ( THD *, SHOW_VAR *, char * );
+int sphinx_showfunc_total ( THD *, SHOW_VAR *, char * );
+int sphinx_showfunc_time ( THD *, SHOW_VAR *, char * );
+int sphinx_showfunc_word_count ( THD *, SHOW_VAR *, char * );
+int sphinx_showfunc_words ( THD *, SHOW_VAR *, char * );
+
+//
+// $Id: ha_sphinx.h 4818 2014-09-24 08:53:38Z tomat $
+//
diff --git a/storage/sphinx/make-patch.sh b/storage/sphinx/make-patch.sh
new file mode 100755
index 00000000..6fca5838
--- /dev/null
+++ b/storage/sphinx/make-patch.sh
@@ -0,0 +1,36 @@
+#!/bin/sh
+
+OUT=$1
+ORIG=$2
+NEW=$3
+
+if [ ! \( "$1" -a "$2" -a "$3" \) ]; then
+ echo "$0 <patch> <original> <new>"
+ exit 1
+fi
+
+FILES='
+/config/ac-macros/ha_sphinx.m4
+/configure.in
+/libmysqld/Makefile.am
+/sql/handler.cc
+/sql/handler.h
+/sql/Makefile.am
+/sql/mysqld.cc
+/sql/mysql_priv.h
+/sql/set_var.cc
+/sql/sql_lex.h
+/sql/sql_parse.cc
+/sql/sql_yacc.yy
+/sql/structs.h
+/sql/sql_show.cc
+'
+
+rm -f $OUT
+if [ -e $OUT ]; then
+ exit 1
+fi
+
+for name in $FILES; do
+ diff -BNru "$ORIG$name" "$NEW$name" >> $OUT
+done
diff --git a/storage/sphinx/mysql-test/sphinx/my.cnf b/storage/sphinx/mysql-test/sphinx/my.cnf
new file mode 100644
index 00000000..f60380b7
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/my.cnf
@@ -0,0 +1,29 @@
+!include include/default_my.cnf
+
+[source src1]
+type = xmlpipe2
+xmlpipe_command = cat @ENV.MTR_SUITE_DIR/testdata.xml
+
+[index test1]
+source = src1
+docinfo = extern
+charset_type = utf-8
+path = @ENV.MYSQLTEST_VARDIR/searchd/test1
+
+[indexer]
+mem_limit = 32M
+
+[searchd]
+read_timeout = 5
+max_children = 30
+seamless_rotate = 1
+preopen_indexes = 0
+unlink_old = 1
+log = @ENV.MYSQLTEST_VARDIR/searchd/sphinx-searchd.log
+query_log = @ENV.MYSQLTEST_VARDIR/searchd/sphinx-query.log
+#log-error = @ENV.MYSQLTEST_VARDIR/searchd/sphinx.log
+pid_file = @ENV.MYSQLTEST_VARDIR/run/searchd.pid
+listen = @ENV.SPHINXSEARCH_PORT
+
+[ENV]
+SPHINXSEARCH_PORT = @OPT.port
diff --git a/storage/sphinx/mysql-test/sphinx/sphinx.result b/storage/sphinx/mysql-test/sphinx/sphinx.result
new file mode 100644
index 00000000..c462d0cc
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/sphinx.result
@@ -0,0 +1,97 @@
+create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:SPHINXSEARCH_PORT/*";
+select * from ts where q='test';
+id w q
+1 2 test
+2 2 test
+4 1 test
+5 1 test
+drop table ts;
+create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:SPHINXSEARCH_PORT/*";
+select * from ts where q='test;filter=gid,1;mode=extended';
+id w q
+1 2379 test;filter=gid,1;mode=extended
+2 2379 test;filter=gid,1;mode=extended
+5 1412 test;filter=gid,1;mode=extended
+select * from ts where q='test|one;mode=extended';
+id w q
+1 3579 test|one;mode=extended
+2 2439 test|one;mode=extended
+4 1456 test|one;mode=extended
+5 1456 test|one;mode=extended
+select * from ts where q='test;offset=1;limit=1';
+id w q
+2 2 test;offset=1;limit=1
+alter table ts connection="sphinx://127.0.0.1:SPHINXSEARCH_PORT/test1";
+select id, w from ts where q='one';
+id w
+1 2
+drop table ts;
+create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, gid int not null, _sph_count int not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:SPHINXSEARCH_PORT/test1";
+select * from ts;
+id w q gid _sph_count
+select * from ts where q='';
+id w q gid _sph_count
+1 1 1 0
+2 1 1 0
+3 1 2 0
+4 1 2 0
+5 1 1 0
+select * from ts where q=';groupby=attr:gid';
+id w q gid _sph_count
+3 1 ;groupby=attr:gid 2 2
+1 1 ;groupby=attr:gid 1 3
+explain select * from ts where q=';groupby=attr:gid';
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE ts ref q q 257 const 3 Using where with pushed condition
+SET @save_optimizer_switch=@@optimizer_switch;
+SET optimizer_switch='index_condition_pushdown=off';
+explain select * from ts where q=';groupby=attr:gid';
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE ts ref q q 257 const 3 Using where with pushed condition
+SET optimizer_switch=@save_optimizer_switch;
+drop table ts;
+show status like "sphinx_total%";
+Variable_name Value
+Sphinx_total 2
+Sphinx_total_found 2
+show status like "sphinx_word%";
+Variable_name Value
+Sphinx_word_count 0
+Sphinx_words
+create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:SPHINXSEARCH_PORT/*";
+select * from ts where q=';filter=meta.foo_count,100';
+id w q
+1 1 ;filter=meta.foo_count,100
+select * from ts where q='test;filter=meta.sub.int,7';
+id w q
+5 1 test;filter=meta.sub.int,7
+select * from ts where q=';filter=meta.sub.list[0],4';
+id w q
+select * from ts where q=';filter=meta.sub.list[1],4';
+id w q
+5 1 ;filter=meta.sub.list[1],4
+select * from ts where q='test;range=meta.foo_count,100,500';
+id w q
+1 2 test;range=meta.foo_count,100,500
+5 1 test;range=meta.foo_count,100,500
+drop table ts;
+#
+# MDEV-19205: Sphinx unable to connect using a host name
+#
+create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://localhost:SPHINXSEARCH_PORT/*";
+select * from ts where q=';filter=meta.foo_count,100';
+id w q
+1 1 ;filter=meta.foo_count,100
+select * from ts where q='test;filter=meta.sub.int,7';
+id w q
+5 1 test;filter=meta.sub.int,7
+select * from ts where q=';filter=meta.sub.list[0],4';
+id w q
+select * from ts where q=';filter=meta.sub.list[1],4';
+id w q
+5 1 ;filter=meta.sub.list[1],4
+select * from ts where q='test;range=meta.foo_count,100,500';
+id w q
+1 2 test;range=meta.foo_count,100,500
+5 1 test;range=meta.foo_count,100,500
+drop table ts;
diff --git a/storage/sphinx/mysql-test/sphinx/sphinx.test b/storage/sphinx/mysql-test/sphinx/sphinx.test
new file mode 100644
index 00000000..b733a3fc
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/sphinx.test
@@ -0,0 +1,56 @@
+
+--replace_result $SPHINXSEARCH_PORT SPHINXSEARCH_PORT
+eval create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:$SPHINXSEARCH_PORT/*";
+select * from ts where q='test';
+drop table ts;
+
+--replace_result $SPHINXSEARCH_PORT SPHINXSEARCH_PORT
+eval create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:$SPHINXSEARCH_PORT/*";
+select * from ts where q='test;filter=gid,1;mode=extended';
+select * from ts where q='test|one;mode=extended';
+select * from ts where q='test;offset=1;limit=1';
+--replace_result $SPHINXSEARCH_PORT SPHINXSEARCH_PORT
+eval alter table ts connection="sphinx://127.0.0.1:$SPHINXSEARCH_PORT/test1";
+select id, w from ts where q='one';
+drop table ts;
+
+--replace_result $SPHINXSEARCH_PORT SPHINXSEARCH_PORT
+eval create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, gid int not null, _sph_count int not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:$SPHINXSEARCH_PORT/test1";
+select * from ts;
+select * from ts where q='';
+select * from ts where q=';groupby=attr:gid';
+explain select * from ts where q=';groupby=attr:gid';
+SET @save_optimizer_switch=@@optimizer_switch;
+SET optimizer_switch='index_condition_pushdown=off';
+explain select * from ts where q=';groupby=attr:gid';
+SET optimizer_switch=@save_optimizer_switch;
+drop table ts;
+
+#
+# Don't show sphinx error as this is different between sphinx versions
+# show status like "sphinx_error%";
+
+show status like "sphinx_total%";
+show status like "sphinx_word%";
+
+--replace_result $SPHINXSEARCH_PORT SPHINXSEARCH_PORT
+eval create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://127.0.0.1:$SPHINXSEARCH_PORT/*";
+select * from ts where q=';filter=meta.foo_count,100';
+select * from ts where q='test;filter=meta.sub.int,7';
+select * from ts where q=';filter=meta.sub.list[0],4';
+select * from ts where q=';filter=meta.sub.list[1],4';
+select * from ts where q='test;range=meta.foo_count,100,500';
+drop table ts;
+
+--echo #
+--echo # MDEV-19205: Sphinx unable to connect using a host name
+--echo #
+
+--replace_result $SPHINXSEARCH_PORT SPHINXSEARCH_PORT
+eval create table ts ( id bigint unsigned not null, w int not null, q varchar(255) not null, index(q) ) engine=sphinx connection="sphinx://localhost:$SPHINXSEARCH_PORT/*";
+select * from ts where q=';filter=meta.foo_count,100';
+select * from ts where q='test;filter=meta.sub.int,7';
+select * from ts where q=';filter=meta.sub.list[0],4';
+select * from ts where q=';filter=meta.sub.list[1],4';
+select * from ts where q='test;range=meta.foo_count,100,500';
+drop table ts;
diff --git a/storage/sphinx/mysql-test/sphinx/suite.opt b/storage/sphinx/mysql-test/sphinx/suite.opt
new file mode 100644
index 00000000..7b425b04
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/suite.opt
@@ -0,0 +1 @@
+--plugin-load-add=$HA_SPHINX_SO --sphinx
diff --git a/storage/sphinx/mysql-test/sphinx/suite.pm b/storage/sphinx/mysql-test/sphinx/suite.pm
new file mode 100644
index 00000000..e44a8e62
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/suite.pm
@@ -0,0 +1,153 @@
+package My::Suite::Sphinx;
+
+use My::SafeProcess;
+use My::File::Path;
+use mtr_report;
+
+@ISA = qw(My::Suite);
+
+############# initialization ######################
+sub locate_sphinx_binary {
+ my ($name)= @_;
+ my $res;
+ my @list= map "$_/$name", split /:/, $ENV{PATH};
+ my $env_override= $ENV{"SPHINXSEARCH_\U$name"};
+ @list= ($env_override) if $env_override;
+ for (@list) { return $_ if -x $_; }
+}
+
+# Look for Sphinx binaries
+my $exe_sphinx_indexer = &locate_sphinx_binary('indexer');
+return "'indexer' binary not found" unless $exe_sphinx_indexer;
+
+my $exe_sphinx_searchd = &locate_sphinx_binary('searchd');
+return "'searchd' binary not found" unless $exe_sphinx_searchd;
+
+my $sphinx_config= "$::opt_vardir/my_sphinx.conf";
+
+# Check for Sphinx engine
+
+return "SphinxSE not found" unless $ENV{HA_SPHINX_SO} or $::mysqld_variables{'sphinx'} eq "ON";
+
+{
+ local $_ = `"$exe_sphinx_searchd" --help`;
+ mtr_verbose("tool: $exe_sphinx_searchd\n$_");
+ my $ver = sprintf "%04d.%04d.%04d", (/([0-9]+)\.([0-9]+)(?:\.([0-9]+))?/);
+ return "Sphinx 2.0.4 or later is needed (found $ver) " unless $ver ge '0002.0000.0004';
+}
+
+############# action methods ######################
+
+sub write_sphinx_conf {
+ my ($config) = @_; # My::Config
+ my $res;
+
+ foreach my $group ($config->groups()) {
+ my $name= $group->{name};
+ # Only the ones relevant to Sphinx search.
+ next unless ($name eq 'indexer' or $name eq 'searchd' or
+ $name =~ /^(source|index) \w+$/);
+ $res .= "$name\n{\n";
+ foreach my $option ($group->options()) {
+ $res .= $option->name();
+ my $value= $option->value();
+ if (defined $value) {
+ $res .= "=$value";
+ }
+ $res .= "\n";
+ }
+ $res .= "}\n\n";
+ }
+ $res;
+}
+
+sub searchd_start {
+ my ($sphinx, $test) = @_; # My::Config::Group, My::Test
+
+ return unless $exe_sphinx_indexer and $exe_sphinx_searchd;
+ return if $sphinx->{proc}; # Already started
+
+ # First we must run the indexer to create the data.
+ my $sphinx_data_dir= "$::opt_vardir/" . $sphinx->name();
+ mkpath($sphinx_data_dir);
+ my $sphinx_log= $sphinx->value('#log-error');
+ my $sphinx_config= "$::opt_vardir/my_sphinx.conf";
+ my $cmd= "\"$exe_sphinx_indexer\" --config \"$sphinx_config\" test1 > \"$sphinx_log\" 2>&1";
+ &::mtr_verbose("cmd: $cmd");
+ system $cmd;
+
+ # Then start the searchd daemon.
+ my $args;
+ &::mtr_init_args(\$args);
+ &::mtr_add_arg($args, "--config");
+ &::mtr_add_arg($args, $sphinx_config);
+ &::mtr_add_arg($args, "--console");
+ &::mtr_add_arg($args, "--pidfile");
+
+ $sphinx->{'proc'}= My::SafeProcess->new
+ (
+ name => 'sphinx-' . $sphinx->name(),
+ path => $exe_sphinx_searchd,
+ args => \$args,
+ output => $sphinx_log,
+ error => $sphinx_log,
+ append => 1,
+ nocore => 1,
+ );
+ &::mtr_verbose("Started $sphinx->{proc}");
+}
+
+sub wait_exp_backoff {
+ my $timeout= shift; # Seconds
+ my $start_wait= shift; # Seconds
+ my $scale_factor= shift;
+
+ $searchd_status= "$exe_sphinx_searchd --status" .
+ " --config $sphinx_config > /dev/null 2>&1";
+
+ my $scale= $start_wait;
+ my $total_sleep= 0;
+ while (1) {
+ my $status = system($searchd_status);
+ if (not $status) {
+ return 0;
+ }
+ if ($total_sleep >= $timeout) {
+ last;
+ }
+
+ &::mtr_milli_sleep($scale * 1000);
+ $total_sleep+= $scale;
+ $scale*= $scale_factor;
+ }
+
+ &::mtr_warning("Getting a response from searchd timed out");
+ return 1
+}
+
+sub searchd_wait {
+ my ($sphinx) = @_; # My::Config::Group
+
+ return wait_exp_backoff(30, 0.1, 2)
+}
+
+############# declaration methods ######################
+
+sub config_files() {
+ ( 'my_sphinx.conf' => \&write_sphinx_conf )
+}
+
+sub servers {
+ ( qr/^searchd$/ => {
+ SORT => 400,
+ START => \&searchd_start,
+ WAIT => \&searchd_wait,
+ }
+ )
+}
+
+sub is_default { 0 }
+
+############# return an object ######################
+bless { };
+
diff --git a/storage/sphinx/mysql-test/sphinx/testdata.xml b/storage/sphinx/mysql-test/sphinx/testdata.xml
new file mode 100644
index 00000000..7ef05bce
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/testdata.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="utf-8"?>
+<sphinx:docset>
+
+<sphinx:schema>
+<sphinx:field name="title"/>
+<sphinx:field name="content"/>
+<sphinx:attr name="gid" type="int"/>
+<sphinx:attr name="meta" type="json"/>
+</sphinx:schema>
+
+<sphinx:document id="1">
+<title>test one</title>
+<content>this is my test document number one. also checking search within phrases.</content>
+<gid>1</gid>
+<meta>{ "foo_count": 100 }</meta>
+</sphinx:document>
+
+<sphinx:document id="2">
+<title>test two</title>
+<content>this is my test document number two</content>
+<gid>1</gid>
+</sphinx:document>
+
+<sphinx:document id="3">
+<title>another doc</title>
+<content>this is another group</content>
+<gid>2</gid>
+</sphinx:document>
+
+<sphinx:document id="4">
+<title>doc number four</title>
+<content>this is to test groups</content>
+<gid>2</gid>
+</sphinx:document>
+
+<sphinx:document id="5">
+<title>doc number five</title>
+<content>this is to test json filtering</content>
+<gid>1</gid>
+<meta>{ "foo_count": 200, "sub": { "list": [ 3, 4 ], "int": 7 } }</meta>
+</sphinx:document>
+
+</sphinx:docset>
+
diff --git a/storage/sphinx/mysql-test/sphinx/union-5539.result b/storage/sphinx/mysql-test/sphinx/union-5539.result
new file mode 100644
index 00000000..945e0141
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/union-5539.result
@@ -0,0 +1,16 @@
+create table ts (id bigint unsigned not null, w int not null, query varchar(255) not null, index(query)) engine=sphinx connection="sphinx://127.0.0.1:PORT/*";
+SELECT a.* FROM (SELECT * FROM ts si WHERE si.query=';mode=extended2;limit=1000000;maxmatches=500') AS a UNION SELECT b.* FROM (SELECT * FROM ts si WHERE si.query='@* 123nothingtofind123;mode=extended2;limit=1000000;maxmatches=500') AS b;
+id w query
+1 1 ;mode=extended2;limit=1000000;maxmatches=500
+2 1 ;mode=extended2;limit=1000000;maxmatches=500
+3 1 ;mode=extended2;limit=1000000;maxmatches=500
+4 1 ;mode=extended2;limit=1000000;maxmatches=500
+5 1 ;mode=extended2;limit=1000000;maxmatches=500
+SELECT a.* FROM (SELECT * FROM ts si WHERE si.query='@* 123nothingtofind123;mode=extended2;limit=1000000;maxmatches=500') AS a UNION SELECT b.* FROM (SELECT * FROM ts si WHERE si.query=';mode=extended2;limit=1000000;maxmatches=500') AS b;
+id w query
+1 1 ;mode=extended2;limit=1000000;maxmatches=500
+2 1 ;mode=extended2;limit=1000000;maxmatches=500
+3 1 ;mode=extended2;limit=1000000;maxmatches=500
+4 1 ;mode=extended2;limit=1000000;maxmatches=500
+5 1 ;mode=extended2;limit=1000000;maxmatches=500
+drop table ts;
diff --git a/storage/sphinx/mysql-test/sphinx/union-5539.test b/storage/sphinx/mysql-test/sphinx/union-5539.test
new file mode 100644
index 00000000..94cc2c02
--- /dev/null
+++ b/storage/sphinx/mysql-test/sphinx/union-5539.test
@@ -0,0 +1,11 @@
+#
+# MDEV-5539 Empty results in UNION with Sphinx engine
+#
+--replace_result $SPHINXSEARCH_PORT PORT
+eval create table ts (id bigint unsigned not null, w int not null, query varchar(255) not null, index(query)) engine=sphinx connection="sphinx://127.0.0.1:$SPHINXSEARCH_PORT/*";
+let $q1=SELECT * FROM ts si WHERE si.query=';mode=extended2;limit=1000000;maxmatches=500';
+let $q2=SELECT * FROM ts si WHERE si.query='@* 123nothingtofind123;mode=extended2;limit=1000000;maxmatches=500';
+eval SELECT a.* FROM ($q1) AS a UNION SELECT b.* FROM ($q2) AS b;
+eval SELECT a.* FROM ($q2) AS a UNION SELECT b.* FROM ($q1) AS b;
+drop table ts;
+
diff --git a/storage/sphinx/snippets_udf.cc b/storage/sphinx/snippets_udf.cc
new file mode 100644
index 00000000..8b87d9dc
--- /dev/null
+++ b/storage/sphinx/snippets_udf.cc
@@ -0,0 +1,825 @@
+//
+// $Id: snippets_udf.cc 4522 2014-01-30 11:00:18Z tomat $
+//
+
+//
+// Copyright (c) 2001-2014, Andrew Aksyonoff
+// Copyright (c) 2008-2014, Sphinx Technologies Inc
+// All rights reserved
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License. You should have
+// received a copy of the GPL license along with this program; if you
+// did not, you can find it at http://www.gnu.org/
+//
+
+#include <my_global.h>
+#include <string.h>
+#include <assert.h>
+
+#ifndef _WIN32
+#include <sys/un.h>
+#include <netdb.h>
+#else
+#include <winsock2.h>
+#endif
+
+#include <mysql_version.h>
+
+#if MYSQL_VERSION_ID>=50515
+#include "sql_class.h"
+#include "sql_array.h"
+#elif MYSQL_VERSION_ID>50100
+#include "mysql_priv.h"
+#include <mysql/plugin.h>
+#else
+#include "../mysql_priv.h"
+#endif
+
+#include <mysys_err.h>
+#include <my_sys.h>
+
+#if MYSQL_VERSION_ID>=50120
+typedef uchar byte;
+#endif
+
+/// partially copy-pasted stuff that should be moved elsewhere
+
+#ifdef UNALIGNED_RAM_ACCESS
+
+/// pass-through wrapper
+template < typename T > inline T sphUnalignedRead ( const T & tRef )
+{
+ return tRef;
+}
+
+/// pass-through wrapper
+template < typename T > void sphUnalignedWrite ( void * pPtr, const T & tVal )
+{
+ *(T*)pPtr = tVal;
+}
+
+#else
+
+/// unaligned read wrapper for some architectures (eg. SPARC)
+template < typename T >
+inline T sphUnalignedRead ( const T & tRef )
+{
+ T uTmp;
+ byte * pSrc = (byte *) &tRef;
+ byte * pDst = (byte *) &uTmp;
+ for ( int i=0; i<(int)sizeof(T); i++ )
+ *pDst++ = *pSrc++;
+ return uTmp;
+}
+
+/// unaligned write wrapper for some architectures (eg. SPARC)
+template < typename T >
+void sphUnalignedWrite ( void * pPtr, const T & tVal )
+{
+ byte * pDst = (byte *) pPtr;
+ byte * pSrc = (byte *) &tVal;
+ for ( int i=0; i<(int)sizeof(T); i++ )
+ *pDst++ = *pSrc++;
+}
+
+#endif /* UNALIGNED_RAM_ACCESS */
+
+#define SPHINXSE_MAX_ALLOC (16*1024*1024)
+
+#define SafeDelete(_arg) { if ( _arg ) delete ( _arg ); (_arg) = NULL; }
+#define SafeDeleteArray(_arg) { if ( _arg ) delete [] ( _arg ); (_arg) = NULL; }
+
+#define Min(a,b) ((a)<(b)?(a):(b))
+#ifndef _WIN32
+typedef unsigned int DWORD;
+#endif
+inline DWORD sphF2DW ( float f ) { union { float f; uint32 d; } u; u.f = f; return u.d; }
+
+static char * sphDup ( const char * sSrc, int iLen=-1 )
+{
+ if ( !sSrc )
+ return NULL;
+
+ if ( iLen<0 )
+ iLen = strlen(sSrc);
+
+ char * sRes = new char [ 1+iLen ];
+ memcpy ( sRes, sSrc, iLen );
+ sRes[iLen] = '\0';
+ return sRes;
+}
+
+static inline void sphShowErrno ( const char * sCall )
+{
+ char sError[256];
+ snprintf ( sError, sizeof(sError), "%s() failed: [%d] %s", sCall, errno, strerror(errno) );
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
+}
+
+static const bool sphReportErrors = true;
+
+static bool sphSend ( int iFd, const char * pBuffer, int iSize, bool bReportErrors = false )
+{
+ assert ( pBuffer );
+ assert ( iSize > 0 );
+
+ const int iResult = send ( iFd, pBuffer, iSize, 0 );
+ if ( iResult!=iSize )
+ {
+ if ( bReportErrors ) sphShowErrno("send");
+ return false;
+ }
+ return true;
+}
+
+static bool sphRecv ( int iFd, char * pBuffer, int iSize, bool bReportErrors = false )
+{
+ assert ( pBuffer );
+ assert ( iSize > 0 );
+
+ while ( iSize )
+ {
+ const int iResult = recv ( iFd, pBuffer, iSize, 0 );
+ if ( iResult > 0 )
+ {
+ iSize -= iResult;
+ pBuffer += iSize;
+ } else if ( iResult==0 )
+ {
+ if ( bReportErrors )
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "recv() failed: disconnected" );
+ return false;
+ } else
+ {
+ if ( bReportErrors ) sphShowErrno("recv");
+ return false;
+ }
+ }
+ return true;
+}
+
+enum
+{
+ SPHINX_SEARCHD_PROTO = 1,
+
+ SEARCHD_COMMAND_EXCERPT = 1,
+
+ VER_COMMAND_EXCERPT = 0x104,
+};
+
+/// known answers
+enum
+{
+ SEARCHD_OK = 0, ///< general success, command-specific reply follows
+ SEARCHD_ERROR = 1, ///< general failure, error message follows
+ SEARCHD_RETRY = 2, ///< temporary failure, error message follows, client should retry later
+ SEARCHD_WARNING = 3 ///< general success, warning message and command-specific reply follow
+};
+
+#define SPHINXSE_DEFAULT_SCHEME (char*) "sphinx"
+#define SPHINXSE_DEFAULT_HOST (char*) "127.0.0.1"
+#define SPHINXSE_DEFAULT_PORT 9312
+#define SPHINXSE_DEFAULT_INDEX (char*) "*"
+
+class CSphBuffer
+{
+private:
+ bool m_bOverrun;
+ int m_iSize;
+ int m_iLeft;
+ char * m_pBuffer;
+ char * m_pCurrent;
+
+public:
+ explicit CSphBuffer ( const int iSize )
+ : m_bOverrun ( false )
+ , m_iSize ( iSize )
+ , m_iLeft ( iSize )
+ {
+ assert ( iSize > 0 );
+ m_pBuffer = new char[iSize];
+ m_pCurrent = m_pBuffer;
+ }
+
+ ~CSphBuffer ()
+ {
+ SafeDeleteArray ( m_pBuffer );
+ }
+
+ const char * Ptr() const { return m_pBuffer; }
+
+ bool Finalize()
+ {
+ return !( m_bOverrun || m_iLeft!=0 || ( m_pCurrent - m_pBuffer )!=m_iSize );
+ }
+
+ void SendBytes ( const void * pBytes, int iBytes );
+
+ void SendWord ( short int v ) { v = ntohs(v); SendBytes ( &v, sizeof(v) ); } // NOLINT
+ void SendInt ( int v ) { v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
+ void SendDword ( DWORD v ) { v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
+ void SendUint64 ( ulonglong v ) { SendDword ( uint ( v>>32 ) ); SendDword ( uint ( v&0xFFFFFFFFUL ) ); }
+ void SendString ( const char * v ) { SendString ( v, strlen(v) ); }
+ void SendString ( const char * v, int iLen ) { SendDword(iLen); SendBytes ( v, iLen ); }
+ void SendFloat ( float v ) { SendDword ( sphF2DW(v) ); }
+};
+
+void CSphBuffer::SendBytes ( const void * pBytes, int iBytes )
+{
+ if ( m_iLeft < iBytes )
+ {
+ m_bOverrun = true;
+ return;
+ }
+
+ memcpy ( m_pCurrent, pBytes, iBytes );
+
+ m_pCurrent += iBytes;
+ m_iLeft -= iBytes;
+}
+
+struct CSphUrl
+{
+ char * m_sBuffer;
+ char * m_sFormatted;
+
+ char * m_sScheme;
+ char * m_sHost;
+ char * m_sIndex;
+
+ int m_iPort;
+
+ CSphUrl()
+ : m_sBuffer ( NULL )
+ , m_sFormatted ( NULL )
+ , m_sScheme ( SPHINXSE_DEFAULT_SCHEME )
+ , m_sHost ( SPHINXSE_DEFAULT_HOST )
+ , m_sIndex ( SPHINXSE_DEFAULT_INDEX )
+ , m_iPort ( SPHINXSE_DEFAULT_PORT )
+ {}
+
+ ~CSphUrl()
+ {
+ SafeDeleteArray ( m_sFormatted );
+ SafeDeleteArray ( m_sBuffer );
+ }
+
+ bool Parse ( const char * sUrl, int iLen );
+ int Connect();
+ const char * Format();
+};
+
+const char * CSphUrl::Format()
+{
+ if ( !m_sFormatted )
+ {
+ int iSize = 15 + strlen(m_sHost) + strlen(m_sIndex);
+ m_sFormatted = new char [ iSize ];
+ if ( m_iPort )
+ snprintf ( m_sFormatted, iSize, "inet://%s:%d/%s", m_sHost, m_iPort, m_sIndex );
+ else
+ snprintf ( m_sFormatted, iSize, "unix://%s/%s", m_sHost, m_sIndex );
+ }
+ return m_sFormatted;
+}
+
+// the following scheme variants are recognized
+//
+// inet://host/index
+// inet://host:port/index
+// unix://unix/domain/socket:index
+// unix://unix/domain/socket
+bool CSphUrl::Parse ( const char * sUrl, int iLen )
+{
+ bool bOk = true;
+ while ( iLen )
+ {
+ bOk = false;
+
+ m_sBuffer = sphDup ( sUrl, iLen );
+ m_sScheme = m_sBuffer;
+
+ m_sHost = strstr ( m_sBuffer, "://" );
+ if ( !m_sHost )
+ break;
+ m_sHost[0] = '\0';
+ m_sHost += 2;
+
+ if ( !strcmp ( m_sScheme, "unix" ) )
+ {
+ // unix-domain socket
+ m_iPort = 0;
+ if (!( m_sIndex = strrchr ( m_sHost, ':' ) ))
+ m_sIndex = SPHINXSE_DEFAULT_INDEX;
+ else
+ {
+ *m_sIndex++ = '\0';
+ if ( !*m_sIndex )
+ m_sIndex = SPHINXSE_DEFAULT_INDEX;
+ }
+ bOk = true;
+ break;
+ }
+ if ( strcmp ( m_sScheme, "sphinx" )!=0 && strcmp ( m_sScheme, "inet" )!=0 )
+ break;
+
+ // inet
+ m_sHost++;
+ char * sPort = strchr ( m_sHost, ':' );
+ if ( sPort )
+ {
+ *sPort++ = '\0';
+ if ( *sPort )
+ {
+ m_sIndex = strchr ( sPort, '/' );
+ if ( m_sIndex )
+ *m_sIndex++ = '\0';
+ else
+ m_sIndex = SPHINXSE_DEFAULT_INDEX;
+
+ m_iPort = atoi(sPort);
+ if ( !m_iPort )
+ m_iPort = SPHINXSE_DEFAULT_PORT;
+ }
+ } else
+ {
+ m_sIndex = strchr ( m_sHost, '/' );
+ if ( m_sIndex )
+ *m_sIndex++ = '\0';
+ else
+ m_sIndex = SPHINXSE_DEFAULT_INDEX;
+ }
+
+ bOk = true;
+ break;
+ }
+
+ return bOk;
+}
+
+int CSphUrl::Connect()
+{
+ struct sockaddr_in sin;
+#ifndef _WIN32
+ struct sockaddr_un saun;
+#endif
+
+ int iDomain = 0;
+ int iSockaddrSize = 0;
+ struct sockaddr * pSockaddr = NULL;
+
+ in_addr_t ip_addr;
+
+ if ( m_iPort )
+ {
+ iDomain = AF_INET;
+ iSockaddrSize = sizeof(sin);
+ pSockaddr = (struct sockaddr *) &sin;
+
+ memset ( &sin, 0, sizeof(sin) );
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons ( m_iPort );
+
+ // resolve address
+ if ( (int)( ip_addr = inet_addr ( m_sHost ) )!=(int)INADDR_NONE )
+ memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
+ else
+ {
+ int tmp_errno;
+ bool bError = false;
+
+#if MYSQL_VERSION_ID>=50515
+ struct addrinfo *hp = NULL;
+ tmp_errno = getaddrinfo ( m_sHost, NULL, NULL, &hp );
+ if ( !tmp_errno || !hp || !hp->ai_addr )
+ {
+ bError = true;
+ if ( hp )
+ freeaddrinfo ( hp );
+ }
+#else
+ struct hostent tmp_hostent, *hp;
+ char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
+ hp = my_gethostbyname_r ( m_sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
+ if ( !hp )
+ {
+ my_gethostbyname_r_free();
+ bError = true;
+ }
+#endif
+
+ if ( bError )
+ {
+ char sError[256];
+ my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", m_sHost );
+
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
+ return -1;
+ }
+
+#if MYSQL_VERSION_ID>=50515
+ memcpy ( &sin.sin_addr, hp->ai_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->ai_addrlen ) );
+ freeaddrinfo ( hp );
+#else
+ memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
+ my_gethostbyname_r_free();
+#endif
+ }
+ } else
+ {
+#ifndef _WIN32
+ iDomain = AF_UNIX;
+ iSockaddrSize = sizeof(saun);
+ pSockaddr = (struct sockaddr *) &saun;
+
+ memset ( &saun, 0, sizeof(saun) );
+ saun.sun_family = AF_UNIX;
+ strncpy ( saun.sun_path, m_sHost, sizeof(saun.sun_path)-1 );
+#else
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "Unix-domain sockets are not supported on Windows" );
+ return -1;
+#endif
+ }
+
+ // connect to searchd and exchange versions
+ uint uServerVersion;
+ uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
+ int iSocket = -1;
+ const char * pError = NULL;
+ do
+ {
+ iSocket = (int)socket ( iDomain, SOCK_STREAM, 0 );
+ if ( iSocket==-1 )
+ {
+ pError = "Failed to create client socket";
+ break;
+ }
+
+ if ( connect ( iSocket, pSockaddr, iSockaddrSize )==-1 )
+ {
+ pError = "Failed to connect to searchd";
+ break;
+ }
+
+ if ( !sphRecv ( iSocket, (char *)&uServerVersion, sizeof(uServerVersion) ) )
+ {
+ pError = "Failed to receive searchd version";
+ break;
+ }
+
+ if ( !sphSend ( iSocket, (char *)&uClientVersion, sizeof(uClientVersion) ) )
+ {
+ pError = "Failed to send client version";
+ break;
+ }
+ }
+ while(0);
+
+ // fixme: compare versions?
+
+ if ( pError )
+ {
+ char sError[1024];
+ snprintf ( sError, sizeof(sError), "%s [%d] %s", Format(), errno, strerror(errno) );
+ my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
+
+ if ( iSocket!=-1 )
+ close ( iSocket );
+
+ return -1;
+ }
+
+ return iSocket;
+}
+
+struct CSphResponse
+{
+ char * m_pBuffer;
+ char * m_pBody;
+
+ CSphResponse ()
+ : m_pBuffer ( NULL )
+ , m_pBody ( NULL )
+ {}
+
+ explicit CSphResponse ( DWORD uSize )
+ : m_pBody ( NULL )
+ {
+ m_pBuffer = new char[uSize];
+ }
+
+ ~CSphResponse ()
+ {
+ SafeDeleteArray ( m_pBuffer );
+ }
+
+ static CSphResponse * Read ( int iSocket, int iClientVersion );
+};
+
+CSphResponse *
+CSphResponse::Read ( int iSocket, int iClientVersion )
+{
+ char sHeader[8];
+ if ( !sphRecv ( iSocket, sHeader, sizeof(sHeader) ) )
+ return NULL;
+
+ int iStatus = ntohs ( sphUnalignedRead ( *(short int *) &sHeader[0] ) );
+ int iVersion = ntohs ( sphUnalignedRead ( *(short int *) &sHeader[2] ) );
+ DWORD uLength = ntohl ( sphUnalignedRead ( *(DWORD *) &sHeader[4] ) );
+
+ if ( iVersion<iClientVersion )
+ return NULL;
+
+ if ( uLength<=SPHINXSE_MAX_ALLOC )
+ {
+ CSphResponse * pResponse = new CSphResponse ( uLength );
+ if ( !sphRecv ( iSocket, pResponse->m_pBuffer, uLength ) )
+ {
+ SafeDelete ( pResponse );
+ return NULL;
+ }
+
+ pResponse->m_pBody = pResponse->m_pBuffer;
+ if ( iStatus!=SEARCHD_OK )
+ {
+ DWORD uSize = ntohl ( *(DWORD *)pResponse->m_pBuffer );
+ if ( iStatus==SEARCHD_WARNING )
+ {
+ pResponse->m_pBody += uSize; // fixme: report the warning somehow
+ } else
+ {
+ char * sMessage = sphDup ( pResponse->m_pBuffer + sizeof(DWORD), uSize );
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sMessage );
+ SafeDeleteArray ( sMessage );
+ SafeDelete ( pResponse );
+ return NULL;
+ }
+ }
+ return pResponse;
+ }
+ return NULL;
+}
+
+/// udf
+#ifdef _MSC_VER
+#define DLLEXPORT __declspec(dllexport)
+#else
+#define DLLEXPORT
+#endif
+
+extern "C"
+{
+ DLLEXPORT my_bool sphinx_snippets_init ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sMessage );
+ DLLEXPORT void sphinx_snippets_deinit ( UDF_INIT * pUDF );
+ DLLEXPORT char * sphinx_snippets ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sResult, unsigned long * pLength, char * pIsNull, char * sError );
+};
+
+#define MAX_MESSAGE_LENGTH 255
+#define MAX_RESULT_LENGTH 255
+
+struct CSphSnippets
+{
+ CSphUrl m_tUrl;
+ CSphResponse * m_pResponse;
+
+ int m_iBeforeMatch;
+ int m_iAfterMatch;
+ int m_iChunkSeparator;
+ int m_iStripMode;
+ int m_iPassageBoundary;
+ int m_iLimit;
+ int m_iLimitWords;
+ int m_iLimitPassages;
+ int m_iAround;
+ int m_iPassageId;
+ int m_iFlags;
+
+ CSphSnippets()
+ : m_pResponse(NULL)
+ , m_iBeforeMatch(0)
+ , m_iAfterMatch(0)
+ , m_iChunkSeparator(0)
+ , m_iStripMode(0)
+ , m_iPassageBoundary(0)
+ // defaults
+ , m_iLimit(256)
+ , m_iLimitWords(0)
+ , m_iLimitPassages(0)
+ , m_iAround(5)
+ , m_iPassageId(1)
+ , m_iFlags(1)
+ {
+ }
+
+ ~CSphSnippets()
+ {
+ SafeDelete ( m_pResponse );
+ }
+};
+
+#define KEYWORD(NAME) else if ( strncmp ( NAME, pArgs->attributes[i], pArgs->attribute_lengths[i] )==0 )
+
+#define CHECK_TYPE(TYPE) \
+ if ( pArgs->arg_type[i]!=TYPE ) \
+ { \
+ snprintf ( sMessage, MAX_MESSAGE_LENGTH, \
+ "%.*s argument must be a string", \
+ (int)pArgs->attribute_lengths[i], \
+ pArgs->attributes[i] ); \
+ bFail = true; \
+ break; \
+ } \
+ if ( TYPE==STRING_RESULT && !pArgs->args[i] ) \
+ { \
+ snprintf ( sMessage, MAX_MESSAGE_LENGTH, \
+ "%.*s argument must be constant (and not NULL)", \
+ (int)pArgs->attribute_lengths[i], \
+ pArgs->attributes[i] ); \
+ bFail = true; \
+ break; \
+ }
+
+#define STRING CHECK_TYPE(STRING_RESULT)
+#define INT CHECK_TYPE(INT_RESULT); int iValue =(int)*(long long *)pArgs->args[i]
+
+my_bool sphinx_snippets_init ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sMessage )
+{
+ if ( pArgs->arg_count < 3 )
+ {
+ strncpy ( sMessage, "insufficient arguments", MAX_MESSAGE_LENGTH );
+ return 1;
+ }
+
+ bool bFail = false;
+ CSphSnippets * pOpts = new CSphSnippets;
+ for ( uint i = 0; i < pArgs->arg_count; i++ )
+ {
+ if ( i < 3 )
+ {
+ if ( pArgs->arg_type[i]!=STRING_RESULT )
+ {
+ strncpy ( sMessage, "first three arguments must be of string type", MAX_MESSAGE_LENGTH );
+ bFail = true;
+ break;
+ }
+ }
+ KEYWORD("sphinx")
+ {
+ STRING;
+ if ( !pOpts->m_tUrl.Parse ( pArgs->args[i], pArgs->lengths[i] ) )
+ {
+ strncpy ( sMessage, "failed to parse connection string", MAX_MESSAGE_LENGTH );
+ bFail = true;
+ break;
+ }
+ }
+ KEYWORD("before_match") { STRING; pOpts->m_iBeforeMatch = i; }
+ KEYWORD("after_match") { STRING; pOpts->m_iAfterMatch = i; }
+ KEYWORD("chunk_separator") { STRING; pOpts->m_iChunkSeparator = i; }
+ KEYWORD("html_strip_mode") { STRING; pOpts->m_iStripMode = i; }
+ KEYWORD("passage_boundary") { STRING; pOpts->m_iPassageBoundary = i; }
+
+ KEYWORD("limit") { INT; pOpts->m_iLimit = iValue; }
+ KEYWORD("limit_words") { INT; pOpts->m_iLimitWords = iValue; }
+ KEYWORD("limit_passages") { INT; pOpts->m_iLimitPassages = iValue; }
+ KEYWORD("around") { INT; pOpts->m_iAround = iValue; }
+ KEYWORD("start_passage_id") { INT; pOpts->m_iPassageId = iValue; }
+
+ KEYWORD("exact_phrase") { INT; if ( iValue ) pOpts->m_iFlags |= 2; }
+ KEYWORD("single_passage") { INT; if ( iValue ) pOpts->m_iFlags |= 4; }
+ KEYWORD("use_boundaries") { INT; if ( iValue ) pOpts->m_iFlags |= 8; }
+ KEYWORD("weight_order") { INT; if ( iValue ) pOpts->m_iFlags |= 16; }
+ KEYWORD("query_mode") { INT; if ( iValue ) pOpts->m_iFlags |= 32; }
+ KEYWORD("force_all_words") { INT; if ( iValue ) pOpts->m_iFlags |= 64; }
+ KEYWORD("load_files") { INT; if ( iValue ) pOpts->m_iFlags |= 128; }
+ KEYWORD("allow_empty") { INT; if ( iValue ) pOpts->m_iFlags |= 256; }
+ KEYWORD("emit_zones") { INT; if ( iValue ) pOpts->m_iFlags |= 512; }
+ KEYWORD("load_files_scattered") { INT; if ( iValue ) pOpts->m_iFlags |= 1024; }
+ else
+ {
+ snprintf ( sMessage, MAX_MESSAGE_LENGTH, "unrecognized argument: %.*s",
+ (int)pArgs->attribute_lengths[i], pArgs->attributes[i] );
+ bFail = true;
+ break;
+ }
+ }
+
+ if ( bFail )
+ {
+ SafeDelete ( pOpts );
+ return 1;
+ }
+ pUDF->ptr = (char *)pOpts;
+ return 0;
+}
+
+#undef STRING
+#undef INT
+#undef KEYWORD
+#undef CHECK_TYPE
+
+#define ARG(i) pArgs->args[i], pArgs->lengths[i]
+#define ARG_LEN(VAR, LEN) ( VAR ? pArgs->lengths[VAR] : LEN )
+
+#define SEND_STRING(INDEX, DEFAULT) \
+ if ( INDEX ) \
+ tBuffer.SendString ( ARG(INDEX) ); \
+ else \
+ tBuffer.SendString ( DEFAULT, sizeof(DEFAULT) - 1 );
+
+
+char * sphinx_snippets ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sResult, unsigned long * pLength, char * pIsNull, char * pError )
+{
+ CSphSnippets * pOpts = (CSphSnippets *)pUDF->ptr;
+ assert ( pOpts );
+
+ if ( !pArgs->args[0] || !pArgs->args[1] || !pArgs->args[2] )
+ {
+ *pIsNull = 1;
+ return sResult;
+ }
+
+ const int iSize = 68 +
+ pArgs->lengths[1] + // index
+ pArgs->lengths[2] + // words
+ ARG_LEN ( pOpts->m_iBeforeMatch, 3 ) +
+ ARG_LEN ( pOpts->m_iAfterMatch, 4 ) +
+ ARG_LEN ( pOpts->m_iChunkSeparator, 5 ) +
+ ARG_LEN ( pOpts->m_iStripMode, 5 ) +
+ ARG_LEN ( pOpts->m_iPassageBoundary, 0 ) +
+ 4 + pArgs->lengths[0]; // document
+
+ CSphBuffer tBuffer(iSize);
+
+ tBuffer.SendWord ( SEARCHD_COMMAND_EXCERPT );
+ tBuffer.SendWord ( VER_COMMAND_EXCERPT );
+ tBuffer.SendDword ( iSize - 8 );
+
+ tBuffer.SendDword ( 0 );
+ tBuffer.SendDword ( pOpts->m_iFlags );
+
+ tBuffer.SendString ( ARG(1) ); // index
+ tBuffer.SendString ( ARG(2) ); // words
+
+ SEND_STRING ( pOpts->m_iBeforeMatch, "<b>" );
+ SEND_STRING ( pOpts->m_iAfterMatch, "</b>" );
+ SEND_STRING ( pOpts->m_iChunkSeparator, " ... " );
+
+ tBuffer.SendInt ( pOpts->m_iLimit );
+ tBuffer.SendInt ( pOpts->m_iAround );
+
+ tBuffer.SendInt ( pOpts->m_iLimitPassages );
+ tBuffer.SendInt ( pOpts->m_iLimitWords );
+ tBuffer.SendInt ( pOpts->m_iPassageId );
+
+ SEND_STRING ( pOpts->m_iStripMode, "index" );
+ SEND_STRING ( pOpts->m_iPassageBoundary, "" );
+
+ // single document
+ tBuffer.SendInt ( 1 );
+ tBuffer.SendString ( ARG(0) );
+
+ int iSocket = -1;
+ do
+ {
+ if ( !tBuffer.Finalize() )
+ {
+ my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: failed to build request" );
+ break;
+ }
+
+ iSocket = pOpts->m_tUrl.Connect();
+ if ( iSocket==-1 ) break;
+ if ( !sphSend ( iSocket, tBuffer.Ptr(), iSize, sphReportErrors ) ) break;
+
+ CSphResponse * pResponse = CSphResponse::Read ( iSocket, VER_COMMAND_EXCERPT );
+ if ( !pResponse ) break;
+
+ close ( iSocket );
+ pOpts->m_pResponse = pResponse;
+ *pLength = ntohl ( *(DWORD *)pResponse->m_pBody );
+ return pResponse->m_pBody + sizeof(DWORD);
+ }
+ while(0);
+
+ if ( iSocket!=-1 )
+ close ( iSocket );
+
+ *pError = 1;
+ return sResult;
+}
+
+#undef SEND_STRING
+#undef ARG_LEN
+#undef ARG
+
+void sphinx_snippets_deinit ( UDF_INIT * pUDF )
+{
+ CSphSnippets * pOpts = (CSphSnippets *)pUDF->ptr;
+ SafeDelete ( pOpts );
+}
+
+//
+// $Id: snippets_udf.cc 4522 2014-01-30 11:00:18Z tomat $
+//