//
// $Id: snippets_udf.cc 4522 2014-01-30 11:00:18Z tomat $
//

//
// Copyright (c) 2001-2014, Andrew Aksyonoff
// Copyright (c) 2008-2014, Sphinx Technologies Inc
// All rights reserved
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License. You should have
// received a copy of the GPL license along with this program; if you
// did not, you can find it at http://www.gnu.org/
//

#include <my_global.h>
#include <string.h>
#include <assert.h>

#ifndef _WIN32
#include <sys/un.h>
#include <netdb.h>
#else
#include <winsock2.h>
#endif

#include <mysql_version.h>

#if MYSQL_VERSION_ID>=50515
#include "sql_class.h"
#include "sql_array.h"
#elif MYSQL_VERSION_ID>50100
#include "mysql_priv.h"
#include <mysql/plugin.h>
#else
#include "../mysql_priv.h"
#endif

#include <mysys_err.h>
#include <my_sys.h>

#if MYSQL_VERSION_ID>=50120
typedef uchar byte;
#endif

/// partially copy-pasted stuff that should be moved elsewhere

#ifdef UNALIGNED_RAM_ACCESS

/// pass-through wrapper
template < typename T > inline T sphUnalignedRead ( const T & tRef )
{
	return tRef;
}

/// pass-through wrapper
template < typename T > void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	*(T*)pPtr = tVal;
}

#else

/// unaligned read wrapper for some architectures (eg. SPARC)
template < typename T >
inline T sphUnalignedRead ( const T & tRef )
{
	T uTmp;
	byte * pSrc = (byte *) &tRef;
	byte * pDst = (byte *) &uTmp;
	for ( int i=0; i<(int)sizeof(T); i++ )
		*pDst++ = *pSrc++;
	return uTmp;
}

/// unaligned write wrapper for some architectures (eg. SPARC)
template < typename T >
void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	byte * pDst = (byte *) pPtr;
	byte * pSrc = (byte *) &tVal;
	for ( int i=0; i<(int)sizeof(T); i++ )
		*pDst++ = *pSrc++;
}

#endif /* UNALIGNED_RAM_ACCESS */

#define SPHINXSE_MAX_ALLOC			(16*1024*1024)

#define SafeDelete(_arg)		{ if ( _arg ) delete ( _arg );		(_arg) = NULL; }
#define SafeDeleteArray(_arg)	{ if ( _arg ) delete [] ( _arg );	(_arg) = NULL; }

#define Min(a,b) ((a)<(b)?(a):(b))
#ifndef _WIN32
typedef unsigned int DWORD;
#endif
inline DWORD sphF2DW ( float f ) { union { float f; uint32 d; } u; u.f = f; return u.d; }

static char * sphDup ( const char * sSrc, int iLen=-1 )
{
	if ( !sSrc )
		return NULL;

	if ( iLen<0 )
		iLen = strlen(sSrc);

	char * sRes = new char [ 1+iLen ];
	memcpy ( sRes, sSrc, iLen );
	sRes[iLen] = '\0';
	return sRes;
}

static inline void sphShowErrno ( const char * sCall )
{
	char sError[256];
	snprintf ( sError, sizeof(sError), "%s() failed: [%d] %s", sCall, errno, strerror(errno) );
	my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
}

static const bool sphReportErrors = true;

static bool sphSend ( int iFd, const char * pBuffer, int iSize, bool bReportErrors = false )
{
	assert ( pBuffer );
	assert ( iSize > 0 );

	const int iResult = send ( iFd, pBuffer, iSize, 0 );
	if ( iResult!=iSize )
	{
		if ( bReportErrors ) sphShowErrno("send");
		return false;
	}
	return true;
}

static bool sphRecv ( int iFd, char * pBuffer, int iSize, bool bReportErrors = false )
{
	assert ( pBuffer );
	assert ( iSize > 0 );

	while ( iSize )
	{
		const int iResult = recv ( iFd, pBuffer, iSize, 0 );
		if ( iResult > 0 )
		{
			iSize -= iResult;
			pBuffer += iSize;
		} else if ( iResult==0 )
		{
			if ( bReportErrors )
				my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "recv() failed: disconnected" );
			return false;
		} else
		{
			if ( bReportErrors ) sphShowErrno("recv");
			return false;
		}
	}
	return true;
}

enum
{
	SPHINX_SEARCHD_PROTO		= 1,

	SEARCHD_COMMAND_EXCERPT		= 1,

	VER_COMMAND_EXCERPT		= 0x104,
};

/// known answers
enum
{
	SEARCHD_OK		= 0,	///< general success, command-specific reply follows
	SEARCHD_ERROR	= 1,	///< general failure, error message follows
	SEARCHD_RETRY	= 2,	///< temporary failure, error message follows, client should retry later
	SEARCHD_WARNING	= 3		///< general success, warning message and command-specific reply follow
};

#define SPHINXSE_DEFAULT_SCHEME		(char*) "sphinx"
#define SPHINXSE_DEFAULT_HOST		(char*) "127.0.0.1"
#define SPHINXSE_DEFAULT_PORT		9312
#define SPHINXSE_DEFAULT_INDEX		(char*) "*"

class CSphBuffer
{
private:
	bool m_bOverrun;
	int m_iSize;
	int m_iLeft;
	char * m_pBuffer;
	char * m_pCurrent;

public:
	explicit CSphBuffer ( const int iSize )
		: m_bOverrun ( false )
		, m_iSize ( iSize )
		, m_iLeft ( iSize )
	{
		assert ( iSize > 0 );
		m_pBuffer = new char[iSize];
		m_pCurrent = m_pBuffer;
	}

	~CSphBuffer ()
	{
		SafeDeleteArray ( m_pBuffer );
	}

	const char * Ptr() const { return m_pBuffer; }

	bool Finalize()
	{
		return !( m_bOverrun || m_iLeft!=0 || ( m_pCurrent - m_pBuffer )!=m_iSize );
	}

	void SendBytes ( const void * pBytes, int iBytes );

	void SendWord ( short int v )					{ v = ntohs(v); SendBytes ( &v, sizeof(v) ); } // NOLINT
	void SendInt ( int v )							{ v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
	void SendDword ( DWORD v )						{ v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
	void SendUint64 ( ulonglong v )					{ SendDword ( uint ( v>>32 ) ); SendDword ( uint ( v&0xFFFFFFFFUL ) ); }
	void SendString ( const char * v )				{ SendString ( v, strlen(v) ); }
	void SendString ( const char * v, int iLen )	{ SendDword(iLen); SendBytes ( v, iLen ); }
	void SendFloat ( float v )						{ SendDword ( sphF2DW(v) ); }
};

void CSphBuffer::SendBytes ( const void * pBytes, int iBytes )
{
	if ( m_iLeft < iBytes )
	{
		m_bOverrun = true;
		return;
	}

	memcpy ( m_pCurrent, pBytes, iBytes );

	m_pCurrent += iBytes;
	m_iLeft -= iBytes;
}

struct CSphUrl
{
	char * m_sBuffer;
	char * m_sFormatted;

	char * m_sScheme;
	char * m_sHost;
	char * m_sIndex;

	int m_iPort;

	CSphUrl()
		: m_sBuffer ( NULL )
		, m_sFormatted ( NULL )
		, m_sScheme ( SPHINXSE_DEFAULT_SCHEME )
		, m_sHost ( SPHINXSE_DEFAULT_HOST )
		, m_sIndex ( SPHINXSE_DEFAULT_INDEX )
		, m_iPort ( SPHINXSE_DEFAULT_PORT )
	{}

	~CSphUrl()
	{
		SafeDeleteArray ( m_sFormatted );
		SafeDeleteArray ( m_sBuffer );
	}

	bool Parse ( const char * sUrl, int iLen );
	int Connect();
	const char * Format();
};

const char * CSphUrl::Format()
{
	if ( !m_sFormatted )
	{
		int iSize = 15 + strlen(m_sHost) + strlen(m_sIndex);
		m_sFormatted = new char [ iSize ];
		if ( m_iPort )
			snprintf ( m_sFormatted, iSize, "inet://%s:%d/%s", m_sHost, m_iPort, m_sIndex );
		else
			snprintf ( m_sFormatted, iSize, "unix://%s/%s", m_sHost, m_sIndex );
	}
	return m_sFormatted;
}

// the following scheme variants are recognized
//
// inet://host/index
// inet://host:port/index
// unix://unix/domain/socket:index
// unix://unix/domain/socket
bool CSphUrl::Parse ( const char * sUrl, int iLen )
{
	bool bOk = true;
	while ( iLen )
	{
		bOk = false;

		m_sBuffer = sphDup ( sUrl, iLen );
		m_sScheme = m_sBuffer;

		m_sHost = strstr ( m_sBuffer, "://" );
		if ( !m_sHost )
			break;
		m_sHost[0] = '\0';
		m_sHost += 2;

		if ( !strcmp ( m_sScheme, "unix" ) )
		{
			// unix-domain socket
			m_iPort = 0;
			if (!( m_sIndex = strrchr ( m_sHost, ':' ) ))
				m_sIndex = SPHINXSE_DEFAULT_INDEX;
			else
			{
				*m_sIndex++ = '\0';
				if ( !*m_sIndex )
					m_sIndex = SPHINXSE_DEFAULT_INDEX;
			}
			bOk = true;
			break;
		}
		if ( strcmp ( m_sScheme, "sphinx" )!=0 && strcmp ( m_sScheme, "inet" )!=0 )
			break;

		// inet
		m_sHost++;
		char * sPort = strchr ( m_sHost, ':' );
		if ( sPort )
		{
			*sPort++ = '\0';
			if ( *sPort )
			{
				m_sIndex = strchr ( sPort, '/' );
				if ( m_sIndex )
					*m_sIndex++ = '\0';
				else
					m_sIndex = SPHINXSE_DEFAULT_INDEX;

				m_iPort = atoi(sPort);
				if ( !m_iPort )
					m_iPort = SPHINXSE_DEFAULT_PORT;
			}
		} else
		{
			m_sIndex = strchr ( m_sHost, '/' );
			if ( m_sIndex )
				*m_sIndex++ = '\0';
			else
				m_sIndex = SPHINXSE_DEFAULT_INDEX;
		}

		bOk = true;
		break;
	}

	return bOk;
}

int CSphUrl::Connect()
{
	struct sockaddr_in sin;
#ifndef _WIN32
	struct sockaddr_un saun;
#endif

	int iDomain = 0;
	int iSockaddrSize = 0;
	struct sockaddr * pSockaddr = NULL;

	in_addr_t ip_addr;

	if ( m_iPort )
	{
		iDomain = AF_INET;
		iSockaddrSize = sizeof(sin);
		pSockaddr = (struct sockaddr *) &sin;

		memset ( &sin, 0, sizeof(sin) );
		sin.sin_family = AF_INET;
		sin.sin_port = htons ( m_iPort );

		// resolve address
		if ( (int)( ip_addr = inet_addr ( m_sHost ) )!=(int)INADDR_NONE )
			memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
		else
		{
			int tmp_errno;
			bool bError = false;

#if MYSQL_VERSION_ID>=50515
			struct addrinfo *hp = NULL;
			tmp_errno = getaddrinfo ( m_sHost, NULL, NULL, &hp );
			if ( !tmp_errno || !hp || !hp->ai_addr )
			{
				bError = true;
				if ( hp )
					freeaddrinfo ( hp );
			}
#else
			struct hostent tmp_hostent, *hp;
			char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
			hp = my_gethostbyname_r ( m_sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
			if ( !hp )
			{
				my_gethostbyname_r_free();
				bError = true;
			}
#endif

			if ( bError )
			{
				char sError[256];
				my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", m_sHost );

				my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
				return -1;
			}

#if MYSQL_VERSION_ID>=50515
			memcpy ( &sin.sin_addr, hp->ai_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->ai_addrlen ) );
			freeaddrinfo ( hp );
#else
			memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
			my_gethostbyname_r_free();
#endif
		}
	} else
	{
#ifndef _WIN32
		iDomain = AF_UNIX;
		iSockaddrSize = sizeof(saun);
		pSockaddr = (struct sockaddr *) &saun;

		memset ( &saun, 0, sizeof(saun) );
		saun.sun_family = AF_UNIX;
		strncpy ( saun.sun_path, m_sHost, sizeof(saun.sun_path)-1 );
#else
		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "Unix-domain sockets are not supported on Windows" );
		return -1;
#endif
	}

	// connect to searchd and exchange versions
	uint uServerVersion;
	uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
	int iSocket = -1;
	const char * pError = NULL;
	do
	{
		iSocket = (int)socket ( iDomain, SOCK_STREAM, 0 );
		if ( iSocket==-1 )
		{
			pError = "Failed to create client socket";
			break;
		}

		if ( connect ( iSocket, pSockaddr, iSockaddrSize )==-1 )
		{
			pError = "Failed to connect to searchd";
			break;
		}

		if ( !sphRecv ( iSocket, (char *)&uServerVersion, sizeof(uServerVersion) ) )
		{
			pError = "Failed to receive searchd version";
			break;
		}

		if ( !sphSend ( iSocket, (char *)&uClientVersion, sizeof(uClientVersion) ) )
		{
			pError = "Failed to send client version";
			break;
		}
	}
	while(0);

	// fixme: compare versions?

	if ( pError )
	{
		char sError[1024];
		snprintf ( sError, sizeof(sError), "%s [%d] %s", Format(), errno, strerror(errno) );
		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );

		if ( iSocket!=-1 )
			close ( iSocket );

		return -1;
	}

	return iSocket;
}

struct CSphResponse
{
	char * m_pBuffer;
	char * m_pBody;

	CSphResponse ()
		: m_pBuffer ( NULL )
		, m_pBody ( NULL )
	{}

	explicit CSphResponse ( DWORD uSize )
		: m_pBody ( NULL )
	{
		m_pBuffer = new char[uSize];
	}

	~CSphResponse ()
	{
		SafeDeleteArray ( m_pBuffer );
	}

	static CSphResponse * Read ( int iSocket, int iClientVersion );
};

CSphResponse *
CSphResponse::Read ( int iSocket, int iClientVersion )
{
	char sHeader[8];
	if ( !sphRecv ( iSocket, sHeader, sizeof(sHeader) ) )
		return NULL;

	int iStatus = ntohs ( sphUnalignedRead ( *(short int *) &sHeader[0] ) );
	int iVersion = ntohs ( sphUnalignedRead ( *(short int *) &sHeader[2] ) );
	DWORD uLength = ntohl ( sphUnalignedRead ( *(DWORD *)		&sHeader[4] ) );

	if ( iVersion<iClientVersion )
		return NULL;

	if ( uLength<=SPHINXSE_MAX_ALLOC )
	{
		CSphResponse * pResponse = new CSphResponse ( uLength );
		if ( !sphRecv ( iSocket, pResponse->m_pBuffer, uLength ) )
		{
			SafeDelete ( pResponse );
			return NULL;
		}

		pResponse->m_pBody = pResponse->m_pBuffer;
		if ( iStatus!=SEARCHD_OK )
		{
			DWORD uSize = ntohl ( *(DWORD *)pResponse->m_pBuffer );
			if ( iStatus==SEARCHD_WARNING )
			{
				pResponse->m_pBody += uSize; // fixme: report the warning somehow
			} else
			{
				char * sMessage = sphDup ( pResponse->m_pBuffer + sizeof(DWORD), uSize );
				my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sMessage );
				SafeDeleteArray ( sMessage );
				SafeDelete ( pResponse );
				return NULL;
			}
		}
		return pResponse;
	}
	return NULL;
}

/// udf
#ifdef _MSC_VER
#define DLLEXPORT __declspec(dllexport)
#else
#define DLLEXPORT 
#endif

extern "C"
{
	DLLEXPORT my_bool sphinx_snippets_init ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sMessage );
	DLLEXPORT void sphinx_snippets_deinit ( UDF_INIT * pUDF );
	DLLEXPORT char * sphinx_snippets ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sResult, unsigned long * pLength, char * pIsNull, char * sError );
};

#define MAX_MESSAGE_LENGTH 255
#define MAX_RESULT_LENGTH 255

struct CSphSnippets
{
	CSphUrl m_tUrl;
	CSphResponse * m_pResponse;

	int m_iBeforeMatch;
	int m_iAfterMatch;
	int m_iChunkSeparator;
	int m_iStripMode;
	int m_iPassageBoundary;
	int m_iLimit;
	int m_iLimitWords;
	int m_iLimitPassages;
	int m_iAround;
	int m_iPassageId;
	int m_iFlags;

	CSphSnippets()
		: m_pResponse(NULL)
		, m_iBeforeMatch(0)
		, m_iAfterMatch(0)
		, m_iChunkSeparator(0)
		, m_iStripMode(0)
		, m_iPassageBoundary(0)
		// defaults
		, m_iLimit(256)
		, m_iLimitWords(0)
		, m_iLimitPassages(0)
		, m_iAround(5)
		, m_iPassageId(1)
		, m_iFlags(1)
	{
	}

	~CSphSnippets()
	{
		SafeDelete ( m_pResponse );
	}
};

#define KEYWORD(NAME) else if ( strncmp ( NAME, pArgs->attributes[i], pArgs->attribute_lengths[i] )==0 )

#define CHECK_TYPE(TYPE)											\
	if ( pArgs->arg_type[i]!=TYPE )									\
	{																\
		snprintf ( sMessage, MAX_MESSAGE_LENGTH,					\
					"%.*s argument must be a string",				\
					(int)pArgs->attribute_lengths[i],				\
					pArgs->attributes[i] );							\
		bFail = true;												\
		break;														\
	}																\
	if ( TYPE==STRING_RESULT && !pArgs->args[i] )					\
	{																\
		snprintf ( sMessage, MAX_MESSAGE_LENGTH,					\
					"%.*s argument must be constant (and not NULL)",	\
					(int)pArgs->attribute_lengths[i],				\
					pArgs->attributes[i] );							\
		bFail = true;												\
		break;														\
	}

#define STRING CHECK_TYPE(STRING_RESULT)
#define INT CHECK_TYPE(INT_RESULT); int iValue =(int)*(long long *)pArgs->args[i]

my_bool sphinx_snippets_init ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sMessage )
{
	if ( pArgs->arg_count < 3 )
	{
		strncpy ( sMessage, "insufficient arguments", MAX_MESSAGE_LENGTH );
		return 1;
	}

	bool bFail = false;
	CSphSnippets * pOpts = new CSphSnippets;
	for ( uint i = 0; i < pArgs->arg_count; i++ )
	{
		if ( i < 3 )
		{
			if ( pArgs->arg_type[i]!=STRING_RESULT )
			{
				strncpy ( sMessage, "first three arguments must be of string type", MAX_MESSAGE_LENGTH );
				bFail = true;
				break;
			}
		}
		KEYWORD("sphinx")
		{
			STRING;
			if ( !pOpts->m_tUrl.Parse ( pArgs->args[i], pArgs->lengths[i] ) )
			{
				strncpy ( sMessage, "failed to parse connection string", MAX_MESSAGE_LENGTH );
				bFail = true;
				break;
			}
		}
		KEYWORD("before_match")		{ STRING; pOpts->m_iBeforeMatch = i; }
		KEYWORD("after_match")		{ STRING; pOpts->m_iAfterMatch = i; }
		KEYWORD("chunk_separator")	{ STRING; pOpts->m_iChunkSeparator = i; }
		KEYWORD("html_strip_mode")	{ STRING; pOpts->m_iStripMode = i; }
		KEYWORD("passage_boundary") { STRING; pOpts->m_iPassageBoundary = i; }

		KEYWORD("limit")			{ INT; pOpts->m_iLimit = iValue; }
		KEYWORD("limit_words")		{ INT; pOpts->m_iLimitWords = iValue; }
		KEYWORD("limit_passages")	{ INT; pOpts->m_iLimitPassages = iValue; }
		KEYWORD("around")			{ INT; pOpts->m_iAround = iValue; }
		KEYWORD("start_passage_id") { INT; pOpts->m_iPassageId = iValue; }

		KEYWORD("exact_phrase")		{ INT; if ( iValue ) pOpts->m_iFlags |= 2; }
		KEYWORD("single_passage")	{ INT; if ( iValue ) pOpts->m_iFlags |= 4; }
		KEYWORD("use_boundaries")	{ INT; if ( iValue ) pOpts->m_iFlags |= 8; }
		KEYWORD("weight_order")		{ INT; if ( iValue ) pOpts->m_iFlags |= 16; }
		KEYWORD("query_mode")		{ INT; if ( iValue ) pOpts->m_iFlags |= 32; }
		KEYWORD("force_all_words")	{ INT; if ( iValue ) pOpts->m_iFlags |= 64; }
		KEYWORD("load_files")		{ INT; if ( iValue ) pOpts->m_iFlags |= 128; }
		KEYWORD("allow_empty")		{ INT; if ( iValue ) pOpts->m_iFlags |= 256; }
		KEYWORD("emit_zones")		{ INT; if ( iValue ) pOpts->m_iFlags |= 512; }
		KEYWORD("load_files_scattered") { INT; if ( iValue ) pOpts->m_iFlags |= 1024; }
		else
		{
			snprintf ( sMessage, MAX_MESSAGE_LENGTH, "unrecognized argument: %.*s",
						(int)pArgs->attribute_lengths[i], pArgs->attributes[i] );
			bFail = true;
			break;
		}
	}

	if ( bFail )
	{
		SafeDelete ( pOpts );
		return 1;
	}
	pUDF->ptr = (char *)pOpts;
	return 0;
}

#undef STRING
#undef INT
#undef KEYWORD
#undef CHECK_TYPE

#define ARG(i) pArgs->args[i], pArgs->lengths[i]
#define ARG_LEN(VAR, LEN) ( VAR ? pArgs->lengths[VAR] : LEN )

#define SEND_STRING(INDEX, DEFAULT)							\
	if ( INDEX )											\
		tBuffer.SendString ( ARG(INDEX) );					\
	else													\
		tBuffer.SendString ( DEFAULT, sizeof(DEFAULT) - 1 );


char * sphinx_snippets ( UDF_INIT * pUDF, UDF_ARGS * pArgs, char * sResult, unsigned long * pLength, char * pIsNull, char * pError )
{
	CSphSnippets * pOpts = (CSphSnippets *)pUDF->ptr;
	assert ( pOpts );

	if ( !pArgs->args[0] || !pArgs->args[1] || !pArgs->args[2] )
	{
		*pIsNull = 1;
		return sResult;
	}

	const int iSize = 68 +
		pArgs->lengths[1] + // index
		pArgs->lengths[2] + // words
		ARG_LEN ( pOpts->m_iBeforeMatch, 3 ) +
		ARG_LEN ( pOpts->m_iAfterMatch, 4 ) +
		ARG_LEN ( pOpts->m_iChunkSeparator, 5 ) +
		ARG_LEN ( pOpts->m_iStripMode, 5 ) +
		ARG_LEN ( pOpts->m_iPassageBoundary, 0 ) +
		4 + pArgs->lengths[0]; // document

	CSphBuffer tBuffer(iSize);

	tBuffer.SendWord ( SEARCHD_COMMAND_EXCERPT );
	tBuffer.SendWord ( VER_COMMAND_EXCERPT );
	tBuffer.SendDword ( iSize - 8 );

	tBuffer.SendDword ( 0 );
	tBuffer.SendDword ( pOpts->m_iFlags );

	tBuffer.SendString ( ARG(1) ); // index
	tBuffer.SendString ( ARG(2) ); // words

	SEND_STRING ( pOpts->m_iBeforeMatch, "<b>" );
	SEND_STRING ( pOpts->m_iAfterMatch, "</b>" );
	SEND_STRING ( pOpts->m_iChunkSeparator, " ... " );

	tBuffer.SendInt ( pOpts->m_iLimit );
	tBuffer.SendInt ( pOpts->m_iAround );

	tBuffer.SendInt ( pOpts->m_iLimitPassages );
	tBuffer.SendInt ( pOpts->m_iLimitWords );
	tBuffer.SendInt ( pOpts->m_iPassageId );

	SEND_STRING ( pOpts->m_iStripMode, "index" );
	SEND_STRING ( pOpts->m_iPassageBoundary, "" );

	// single document
	tBuffer.SendInt ( 1 );
	tBuffer.SendString ( ARG(0) );

	int iSocket = -1;
	do
	{
		if ( !tBuffer.Finalize() )
		{
			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: failed to build request" );
			break;
		}

		iSocket = pOpts->m_tUrl.Connect();
		if ( iSocket==-1 ) break;
		if ( !sphSend ( iSocket, tBuffer.Ptr(), iSize, sphReportErrors ) ) break;

		CSphResponse * pResponse = CSphResponse::Read ( iSocket, VER_COMMAND_EXCERPT );
		if ( !pResponse ) break;

		close ( iSocket );
		pOpts->m_pResponse = pResponse;
		*pLength = ntohl ( *(DWORD *)pResponse->m_pBody );
		return pResponse->m_pBody + sizeof(DWORD);
	}
	while(0);

	if ( iSocket!=-1 )
		close ( iSocket );

	*pError = 1;
	return sResult;
}

#undef SEND_STRING
#undef ARG_LEN
#undef ARG

void sphinx_snippets_deinit ( UDF_INIT * pUDF )
{
	CSphSnippets * pOpts = (CSphSnippets *)pUDF->ptr;
	SafeDelete ( pOpts );
}

//
// $Id: snippets_udf.cc 4522 2014-01-30 11:00:18Z tomat $
//