diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
commit | 06eaf7232e9a920468c0f8d74dcf2fe8b555501c (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/connect/array.cpp | |
parent | Initial commit. (diff) | |
download | mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.tar.xz mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/connect/array.cpp')
-rw-r--r-- | storage/connect/array.cpp | 1151 |
1 files changed, 1151 insertions, 0 deletions
diff --git a/storage/connect/array.cpp b/storage/connect/array.cpp new file mode 100644 index 00000000..1eeb4ac0 --- /dev/null +++ b/storage/connect/array.cpp @@ -0,0 +1,1151 @@ +/************* Array C++ Functions Source Code File (.CPP) *************/ +/* Name: ARRAY.CPP Version 2.3 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2005-2019 */ +/* */ +/* This file contains the XOBJECT derived class ARRAY functions. */ +/* ARRAY is used for elaborate type of processing, such as sorting */ +/* and dichotomic search (Find). This new version does not use sub */ +/* classes anymore for the different types but relies entirely on the */ +/* functionalities provided by the VALUE and VALBLK classes. */ +/* Currently the only supported types are STRING, SHORT, int, DATE, */ +/* TOKEN, DOUBLE, and Compressed Strings. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include relevant MariaDB header file. */ +/***********************************************************************/ +#include "my_global.h" +#include "sql_class.h" +//#include "sql_time.h" + +#if defined(_WIN32) +//#include <windows.h> +#else // !_WIN32 +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <stdint.h> // for uintprt_h +#endif // !_WIN32 + +/***********************************************************************/ +/* Include required application header files */ +/* global.h is header containing all global Plug declarations. */ +/* plgdbsem.h is header containing the DB applic. declarations. */ +/* xobject.h is header containing XOBJECT derived classes declares. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "xtable.h" +#include "array.h" +//#include "select.h" +//#include "query.h" +//#include "token.h" + +/***********************************************************************/ +/* Macro definitions. */ +/***********************************************************************/ +#if defined(_DEBUG) +#define ASSERT(B) assert(B); +#else +#define ASSERT(B) +#endif + +/***********************************************************************/ +/* DB static external variables. */ +/***********************************************************************/ +extern MBLOCK Nmblk; /* Used to initialize MBLOCK's */ + +/***********************************************************************/ +/* External functions. */ +/***********************************************************************/ +BYTE OpBmp(PGLOBAL g, OPVAL opc); +void EncodeValue(int *lp, char *strp, int n); +PARRAY MakeValueArray(PGLOBAL g, PPARM pp); // avoid gcc warning + +/***********************************************************************/ +/* MakeValueArray: Makes a value array from a value list. */ +/***********************************************************************/ +PARRAY MakeValueArray(PGLOBAL g, PPARM pp) +{ + int n, valtyp = 0; + size_t len = 0; + PARRAY par; + PPARM parmp; + + if (!pp) + return NULL; + + /*********************************************************************/ + /* New version with values coming in a list. */ + /*********************************************************************/ + if ((valtyp = pp->Type) != TYPE_STRING) + len = 1; + + xtrc(1, "valtyp=%d len=%d\n", valtyp, len); + + /*********************************************************************/ + /* Firstly check the list and count the number of values in it. */ + /*********************************************************************/ + for (n = 0, parmp = pp; parmp; n++, parmp = parmp->Next) + if (parmp->Type != valtyp) { + snprintf(g->Message, sizeof(g->Message), MSG(BAD_PARAM_TYPE), "MakeValueArray", parmp->Type); + return NULL; + } else if (valtyp == TYPE_STRING) + len = MY_MAX(len, strlen((char*)parmp->Value)); + + /*********************************************************************/ + /* Make an array object with one block of the proper size. */ + /*********************************************************************/ + par = new(g) ARRAY(g, valtyp, n, (int)len); + + if (par->GetResultType() == TYPE_ERROR) + return NULL; // Memory allocation error in ARRAY + + /*********************************************************************/ + /* All is right now, fill the array block. */ + /*********************************************************************/ + for (parmp = pp; parmp; parmp = parmp->Next) + switch (valtyp) { + case TYPE_STRING: + par->AddValue(g, (PSZ)parmp->Value); + break; + case TYPE_SHORT: + par->AddValue(g, *(short*)parmp->Value); + break; + case TYPE_INT: + par->AddValue(g, *(int*)parmp->Value); + break; + case TYPE_DOUBLE: + par->AddValue(g, *(double*)parmp->Value); + break; + case TYPE_PCHAR: + par->AddValue(g, parmp->Value); + break; + case TYPE_VOID: + // Integer stored inside pp->Value + par->AddValue(g, parmp->Intval); + break; + } // endswitch valtyp + + /*********************************************************************/ + /* Send back resulting array. */ + /*********************************************************************/ + return par; +} // end of MakeValueArray + +/* -------------------------- Class ARRAY ---------------------------- */ + +/***********************************************************************/ +/* ARRAY public constructor. */ +/***********************************************************************/ +ARRAY::ARRAY(PGLOBAL g, int type, int size, int length, int prec) + : CSORT(false) + { + Nval = 0; + Ndif = 0; + Bot = 0; + Top = 0; + Size = size; + Type = type; + Xsize = -1; + Len = 1; + X = 0; + Inf = 0; + Sup = 0; + + switch (type) { + case TYPE_STRING: + Len = length; + /* fall through */ + case TYPE_SHORT: + case TYPE_INT: + case TYPE_DOUBLE: + case TYPE_PCHAR: + Type = type; + break; + case TYPE_VOID: + Type = TYPE_INT; + break; +#if 0 + case TYPE_TOKEN: + break; + case TYPE_LIST: + Len = 0; + prec = length; + break; +#endif // 0 + default: // This is illegal an causes an ill formed array building + snprintf(g->Message, sizeof(g->Message), MSG(BAD_ARRAY_TYPE), type); + Type = TYPE_ERROR; + return; + } // endswitch type + + Valblk = new(g) MBVALS; + + if (!(Vblp = Valblk->Allocate(g, Type, Len, prec, Size))) + Type = TYPE_ERROR; + else if (!Valblk->GetMemp() && Type != TYPE_LIST) + // The error message was built by PlgDBalloc + Type = TYPE_ERROR; + else if (type != TYPE_PCHAR) + Value = AllocateValue(g, type, Len, prec); + + Constant = true; + } // end of ARRAY constructor + +#if 0 +/***********************************************************************/ +/* ARRAY public constructor from a QUERY. */ +/***********************************************************************/ +ARRAY::ARRAY(PGLOBAL g, PQUERY qryp) : CSORT(false) + { + Type = qryp->GetColType(0); + Nval = qryp->GetNblin(); + Ndif = 0; + Bot = 0; + Top = 0; + Size = Nval; + Xsize = -1; + Len = qryp->GetColLength(0); + X = Inf = Sup = 0; + Correlated = false; + + switch (Type) { + case TYPE_STRING: + case TYPE_SHORT: + case TYPE_INT: + case TYPE_DATE: + case TYPE_DOUBLE: +// case TYPE_TOKEN: +// case TYPE_LIST: +// Valblk = qryp->GetCol(0)->Result; +// Vblp = qryp->GetColBlk(0); +// Value = qryp->GetColValue(0); +// break; + default: // This is illegal an causes an ill formed array building + snprintf(g->Message, sizeof(g->Message), MSG(BAD_ARRAY_TYPE), Type); + Type = TYPE_ERROR; + } // endswitch type + + if (!Valblk || (!Valblk->GetMemp() && Type != TYPE_LIST)) + // The error message was built by ??? + Type = TYPE_ERROR; + + Constant = true; + } // end of ARRAY constructor + +/***********************************************************************/ +/* ARRAY constructor from a TYPE_LIST subarray. */ +/***********************************************************************/ +ARRAY::ARRAY(PGLOBAL g, PARRAY par, int k) : CSORT(false) + { + int prec; + LSTBLK *lp; + + if (par->Type != TYPE_LIST) { + Type = TYPE_ERROR; + return; + } // endif Type + + lp = (LSTBLK*)par->Vblp; + + Nval = par->Nval; + Ndif = 0; + Bot = 0; + Top = 0; + Size = par->Size; + Xsize = -1; + + Valblk = lp->Mbvk[k]; + Vblp = Valblk->Vblk; + Type = Vblp->GetType(); + Len = (Type == TYPE_STRING) ? Vblp->GetVlen() : 0; + prec = (Type == TYPE_FLOAT) ? 2 : 0; + Value = AllocateValue(g, Type, Len, prec, NULL); + Constant = true; + } // end of ARRAY constructor + +/***********************************************************************/ +/* Empty: reset the array for a new use (correlated queries). */ +/* Note: this is temporary as correlated queries will not use arrays */ +/* anymore with future optimized algorithms. */ +/***********************************************************************/ +void ARRAY::Empty(void) + { + assert(Correlated); + Nval = Ndif = 0; + Bot = Top = X = Inf = Sup = 0; + } // end of Empty +#endif // 0 + +/***********************************************************************/ +/* Add a string element to an array. */ +/***********************************************************************/ +bool ARRAY::AddValue(PGLOBAL g, PSZ strp) +{ + if (Type != TYPE_STRING) { + snprintf(g->Message, sizeof(g->Message), MSG(ADD_BAD_TYPE), GetTypeName(Type), "CHAR"); + return true; + } // endif Type + + xtrc(1, " adding string(%d): '%s'\n", Nval, strp); + Vblp->SetValue(strp, Nval++); + return false; +} // end of AddValue + +/***********************************************************************/ +/* Add a char pointer element to an array. */ +/***********************************************************************/ +bool ARRAY::AddValue(PGLOBAL g, void *p) +{ + if (Type != TYPE_PCHAR) { + snprintf(g->Message, sizeof(g->Message), MSG(ADD_BAD_TYPE), GetTypeName(Type), "PCHAR"); + return true; + } // endif Type + + xtrc(1, " adding pointer(%d): %p\n", Nval, p); + Vblp->SetValue((PSZ)p, Nval++); + return false; +} // end of AddValue + +/***********************************************************************/ +/* Add a short integer element to an array. */ +/***********************************************************************/ +bool ARRAY::AddValue(PGLOBAL g, short n) +{ + if (Type != TYPE_SHORT) { + snprintf(g->Message, sizeof(g->Message), MSG(ADD_BAD_TYPE), GetTypeName(Type), "SHORT"); + return true; + } // endif Type + + xtrc(1, " adding SHORT(%d): %hd\n", Nval, n); + Vblp->SetValue(n, Nval++); + return false; +} // end of AddValue + +/***********************************************************************/ +/* Add an integer element to an array. */ +/***********************************************************************/ +bool ARRAY::AddValue(PGLOBAL g, int n) +{ + if (Type != TYPE_INT) { + snprintf(g->Message, sizeof(g->Message), MSG(ADD_BAD_TYPE), GetTypeName(Type), "INTEGER"); + return true; + } // endif Type + + xtrc(1, " adding int(%d): %d\n", Nval, n); + Vblp->SetValue(n, Nval++); + return false; +} // end of AddValue + +/***********************************************************************/ +/* Add a double float element to an array. */ +/***********************************************************************/ +bool ARRAY::AddValue(PGLOBAL g, double d) +{ + if (Type != TYPE_DOUBLE) { + snprintf(g->Message, sizeof(g->Message), MSG(ADD_BAD_TYPE), GetTypeName(Type), "DOUBLE"); + return true; + } // endif Type + + xtrc(1, " adding float(%d): %lf\n", Nval, d); + Value->SetValue(d); + Vblp->SetValue(Value, Nval++); + return false; +} // end of AddValue + +/***********************************************************************/ +/* Add the value of a XOBJECT block to an array. */ +/***********************************************************************/ +bool ARRAY::AddValue(PGLOBAL g, PXOB xp) +{ + if (Type != xp->GetResultType()) { + snprintf(g->Message, sizeof(g->Message), MSG(ADD_BAD_TYPE), + GetTypeName(xp->GetResultType()), GetTypeName(Type)); + return true; + } // endif Type + + xtrc(1, " adding (%d) from xp=%p\n", Nval, xp); + Vblp->SetValue(xp->GetValue(), Nval++); + return false; +} // end of AddValue + +/***********************************************************************/ +/* Add a value to an array. */ +/***********************************************************************/ +bool ARRAY::AddValue(PGLOBAL g, PVAL vp) +{ + if (Type != vp->GetType()) { + snprintf(g->Message, sizeof(g->Message), MSG(ADD_BAD_TYPE), + GetTypeName(vp->GetType()), GetTypeName(Type)); + return true; + } // endif Type + + xtrc(1, " adding (%d) from vp=%p\n", Nval, vp); + Vblp->SetValue(vp, Nval++); + return false; +} // end of AddValue + +/***********************************************************************/ +/* Retrieve the nth value of the array. */ +/***********************************************************************/ +void ARRAY::GetNthValue(PVAL valp, int n) + { + valp->SetValue_pvblk(Vblp, n); + } // end of GetNthValue + +#if 0 +/***********************************************************************/ +/* Retrieve the nth subvalue of a list array. */ +/***********************************************************************/ +bool ARRAY::GetSubValue(PGLOBAL g, PVAL valp, int *kp) + { + PVBLK vblp; + + if (Type != TYPE_LIST) { + snprintf(g->Message, sizeof(g->Message), MSG(NO_SUB_VAL), Type); + return true; + } // endif Type + + vblp = ((LSTBLK*)Vblp)->Mbvk[kp[0]]->Vblk; + valp->SetValue_pvblk(vblp, kp[1]); + return false; + } // end of GetSubValue +#endif // 0 + +/***********************************************************************/ +/* Return the nth value of an integer array. */ +/***********************************************************************/ +int ARRAY::GetIntValue(int n) + { + assert (Type == TYPE_INT); + return Vblp->GetIntValue(n); + } // end of GetIntValue + +/***********************************************************************/ +/* Return the nth value of a STRING array. */ +/***********************************************************************/ +char *ARRAY::GetStringValue(int n) + { + assert (Type == TYPE_STRING || Type == TYPE_PCHAR); + return Vblp->GetCharValue(n); + } // end of GetStringValue + +/***********************************************************************/ +/* Find whether a value is in an array. */ +/* Provide a conversion limited to the Value limitation. */ +/***********************************************************************/ +bool ARRAY::Find(PVAL valp) + { + int n; + PVAL vp; + + if (Type != valp->GetType()) { + Value->SetValue_pval(valp); + vp = Value; + } else + vp = valp; + + Inf = Bot, Sup = Top; + + while (Sup - Inf > 1) { + X = (Inf + Sup) >> 1; + n = Vblp->CompVal(vp, X); + + if (n < 0) + Sup = X; + else if (n > 0) + Inf = X; + else + return true; + + } // endwhile + + return false; + } // end of Find + +/***********************************************************************/ +/* ARRAY: Compare routine for a list of values. */ +/***********************************************************************/ +BYTE ARRAY::Vcompare(PVAL vp, int n) + { + Value->SetValue_pvblk(Vblp, n); + return vp->TestValue(Value); + } // end of Vcompare + +/***********************************************************************/ +/* Test a filter condition on an array depending on operator and mod. */ +/* Modificator values are 1: ANY (or SOME) and 2: ALL. */ +/***********************************************************************/ +bool ARRAY::FilTest(PGLOBAL g, PVAL valp, OPVAL opc, int opm) + { + int i; + PVAL vp; + BYTE bt = OpBmp(g, opc); + int top = Nval - 1; + + if (top < 0) // Array is empty + // Return true for ALL because it means that there are no item that + // does not verify the condition, which is true indeed. + // Return false for ANY because true means that there is at least + // one item that verifies the condition, which is false. + return opm == 2; + + if (valp) { + if (Type != valp->GetType()) { + Value->SetValue_pval(valp); + vp = Value; + } else + vp = valp; + + } else if (opc != OP_EXIST) { + snprintf(g->Message, sizeof(g->Message), MSG(MISSING_ARG), opc); + throw (int)TYPE_ARRAY; + } else // OP_EXIST + return Nval > 0; + + if (opc == OP_IN || (opc == OP_EQ && opm == 1)) + return Find(vp); + else if (opc == OP_NE && opm == 2) + return !Find(vp); + else if (opc == OP_EQ && opm == 2) + return (Ndif == 1) ? !(Vcompare(vp, 0) & bt) : false; + else if (opc == OP_NE && opm == 1) + return (Ndif == 1) ? !(Vcompare(vp, 0) & bt) : true; + + if (Type != TYPE_LIST) { + if (opc == OP_GT || opc == OP_GE) + return !(Vcompare(vp, (opm == 1) ? 0 : top) & bt); + else + return !(Vcompare(vp, (opm == 2) ? 0 : top) & bt); + + } // endif Type + + // Case of TYPE_LIST + if (opm == 2) { + for (i = 0; i < Nval; i++) + if (Vcompare(vp, i) & bt) + return false; + + return true; + } else { // opm == 1 + for (i = 0; i < Nval; i++) + if (!(Vcompare(vp, i) & bt)) + return true; + + return false; + } // endif opm + + } // end of FilTest + +/***********************************************************************/ +/* Test whether this array can be converted to TYPE_SHORT. */ +/* Must be called after the array is sorted. */ +/***********************************************************************/ +bool ARRAY::CanBeShort(void) + { + int* To_Val = (int*)Valblk->GetMemp(); + + if (Type != TYPE_INT || !Ndif) + return false; + + // Because the array is sorted, this is true if all the array + // int values are in the range of SHORT values + return (To_Val[0] >= -32768 && To_Val[Nval-1] < 32768); + } // end of CanBeShort + +/***********************************************************************/ +/* Convert an array to new numeric type k. */ +/* Note: conversion is always made in ascending order from STRING to */ +/* short to int to double so no precision is lost in the conversion. */ +/* One exception is converting from int to short compatible arrays. */ +/***********************************************************************/ +int ARRAY::Convert(PGLOBAL g, int k, PVAL vp) + { + int i, prec = 0; + bool b = false; + PMBV ovblk = Valblk; + PVBLK ovblp = Vblp; + + Type = k; // k is the new type + Valblk = new(g) MBVALS; + + switch (Type) { + case TYPE_DOUBLE: + prec = 2; + /* fall through */ + case TYPE_SHORT: + case TYPE_INT: + case TYPE_DATE: + Len = 1; + break; + default: + snprintf(g->Message, sizeof(g->Message), MSG(BAD_CONV_TYPE), Type); + return TYPE_ERROR; + } // endswitch k + + Size = Nval; + Nval = 0; + Vblp = Valblk->Allocate(g, Type, Len, prec, Size); + + if (!Valblk->GetMemp()) + // The error message was built by PlgDBalloc + return TYPE_ERROR; + else + Value = AllocateValue(g, Type, Len, prec); + + /*********************************************************************/ + /* Converting STRING to DATE can be done according to date format. */ + /*********************************************************************/ + if (Type == TYPE_DATE && ovblp->GetType() == TYPE_STRING && vp) + { + if (((DTVAL*)Value)->SetFormat(g, vp)) + return TYPE_ERROR; + else + b = true; // Sort the new array on date internal values + } + + /*********************************************************************/ + /* Do the actual conversion. */ + /*********************************************************************/ + for (i = 0; i < Size; i++) { + Value->SetValue_pvblk(ovblp, i); + + if (AddValue(g, Value)) + return TYPE_ERROR; + + } // endfor i + + /*********************************************************************/ + /* For sorted arrays, get the initial find values. */ + /*********************************************************************/ + if (b) + Sort(g); + + ovblk->Free(); + return Type; + } // end of Convert + +/***********************************************************************/ +/* ARRAY Save: save value at i (used while rordering). */ +/***********************************************************************/ +void ARRAY::Save(int i) + { + Value->SetValue_pvblk(Vblp, i); + } // end of Save + +/***********************************************************************/ +/* ARRAY Restore: restore value to j (used while rordering). */ +/***********************************************************************/ +void ARRAY::Restore(int j) + { + Vblp->SetValue(Value, j); + } // end of Restore + +/***********************************************************************/ +/* ARRAY Move: move value from k to j (used while rordering). */ +/***********************************************************************/ +void ARRAY::Move(int j, int k) + { + Vblp->Move(k, j); // VALBLK does the opposite !!! + } // end of Move + +/***********************************************************************/ +/* ARRAY: Compare routine for one LIST value (ascending only). */ +/***********************************************************************/ +int ARRAY::Qcompare(int *i1, int *i2) + { + return Vblp->CompVal(*i1, *i2); + } // end of Qcompare + +/***********************************************************************/ +/* Mainly meant to set the character arrays case sensitiveness. */ +/***********************************************************************/ +void ARRAY::SetPrecision(PGLOBAL g, int p) + { + if (Vblp == NULL) { + strcpy(g->Message, MSG(PREC_VBLP_NULL)); + throw (int)TYPE_ARRAY; + } // endif Vblp + + bool was = Vblp->IsCi(); + + if (was && !p) { + strcpy(g->Message, MSG(BAD_SET_CASE)); + throw (int)TYPE_ARRAY; + } // endif Vblp + + if (was || !p) + return; + else + Vblp->SetPrec(p); + + if (!was && Type == TYPE_STRING) + // Must be resorted to eliminate duplicate strings + if (Sort(g)) + throw (int)TYPE_ARRAY; + + } // end of SetPrecision + +/***********************************************************************/ +/* Sort and eliminate distinct values from an array. */ +/* Note: this is done by making a sorted index on distinct values. */ +/* Returns false if Ok or true in case of error. */ +/***********************************************************************/ +bool ARRAY::Sort(PGLOBAL g) + { + int i, j, k; + + // This is to avoid multiply allocating for correlated subqueries + if (Nval > Xsize) { + if (Xsize >= 0) { + // Was already allocated + PlgDBfree(Index); + PlgDBfree(Offset); + } // endif Xsize + + // Prepare non conservative sort with offet values + Index.Size = Nval * sizeof(int); + + if (!PlgDBalloc(g, NULL, Index)) + goto error; + + Offset.Size = (Nval + 1) * sizeof(int); + + if (!PlgDBalloc(g, NULL, Offset)) + goto error; + + Xsize = Nval; + } // endif Nval + + // Call the sort program, it returns the number of distinct values + Ndif = Qsort(g, Nval); + + if (Ndif < 0) + goto error; + + // Use the sort index to reorder the data in storage so it will + // be physically sorted and Index can be removed. + for (i = 0; i < Nval; i++) { + if (Pex[i] == i || Pex[i] == Nval) + // Already placed or already moved + continue; + + Save(i); + + for (j = i;; j = k) { + k = Pex[j]; + Pex[j] = Nval; // Mark position as set + + if (k == i) { + Restore(j); + break; // end of loop + } else + Move(j, k); + + } // endfor j + + } // endfor i + + // Reduce the size of the To_Val array if Ndif < Nval + if (Ndif < Nval) { + for (i = 1; i < Ndif; i++) + if (i != Pof[i]) + break; + + for (; i < Ndif; i++) + Move(i, Pof[i]); + + Nval = Ndif; + } // endif ndif + +//if (!Correlated) { + if (Size > Nval) { + Size = Nval; + Valblk->ReAllocate(g, Size); + } // endif Size + + // Index and Offset are not used anymore + PlgDBfree(Index); + PlgDBfree(Offset); + Xsize = -1; +// } // endif Correlated + + Bot = -1; // For non optimized search + Top = Ndif; // Find searches the whole array. + return false; + + error: + Nval = Ndif = 0; + Valblk->Free(); + PlgDBfree(Index); + PlgDBfree(Offset); + return true; + } // end of Sort + +/***********************************************************************/ +/* Sort and return the sort index. */ +/* Note: This is meant if the array contains unique values. */ +/* Returns Index.Memp if Ok or NULL in case of error. */ +/***********************************************************************/ +void *ARRAY::GetSortIndex(PGLOBAL g) + { + // Prepare non conservative sort with offet values + Index.Size = Nval * sizeof(int); + + if (!PlgDBalloc(g, NULL, Index)) + goto error; + + Offset.Size = (Nval + 1) * sizeof(int); + + if (!PlgDBalloc(g, NULL, Offset)) + goto error; + + // Call the sort program, it returns the number of distinct values + Ndif = Qsort(g, Nval); + + if (Ndif < 0) + goto error; + + if (Ndif < Nval) + goto error; + + PlgDBfree(Offset); + return Index.Memp; + + error: + Nval = Ndif = 0; + Valblk->Free(); + PlgDBfree(Index); + PlgDBfree(Offset); + return NULL; + } // end of GetSortIndex + +/***********************************************************************/ +/* Block filter testing for IN operator on Column/Array operands. */ +/* Here we call Find that returns true if the value is in the array */ +/* with X equal to the index of the found value in the array, or */ +/* false if the value is not in the array with Inf and Sup being the */ +/* indexes of the array values that are immediately below and over */ +/* the not found value. This enables to restrict the array to the */ +/* values that are between the min and max block values and to return */ +/* the indication of whether the Find will be always true, always not */ +/* true or other. */ +/***********************************************************************/ +int ARRAY::BlockTest(PGLOBAL, int opc, int opm, + void *minp, void *maxp, bool s) + { + bool bin, bax, pin, pax, veq, all = (opm == 2); + + if (Ndif == 0) // Array is empty + // Return true for ALL because it means that there are no item that + // does not verify the condition, which is true indeed. + // Return false for ANY because true means that there is at least + // one item that verifies the condition, which is false. + return (all) ? 2 : -2; + else if (opc == OP_EQ && all && Ndif > 1) + return -2; + else if (opc == OP_NE && !all && Ndif > 1) + return 2; +// else if (Ndif == 1) +// all = false; + + // veq is true when all values in the block are equal + switch (Type) { + case TYPE_STRING: veq = (Vblp->IsCi()) + ? !stricmp((char*)minp, (char*)maxp) + : !strcmp((char*)minp, (char*)maxp); break; + case TYPE_SHORT: veq = *(short*)minp == *(short*)maxp; break; + case TYPE_INT: veq = *(int*)minp == *(int*)maxp; break; + case TYPE_DOUBLE: veq = *(double*)minp == *(double*)maxp; break; + default: veq = false; // Error ? + } // endswitch type + + if (!s) + Bot = -1; + + Top = Ndif; // Reset Top at top of list + Value->SetBinValue(maxp); + Top = (bax = Find(Value)) ? X + 1 : Sup; + + if (bax) { + if (opc == OP_EQ) + return (veq) ? 1 : 0; + else if (opc == OP_NE) + return (veq) ? -1 : 0; + + if (X == 0) switch (opc) { + // Max value is equal to min list value + case OP_LE: return 1; break; + case OP_LT: return (veq) ? -1 : 0; break; + case OP_GE: return (veq) ? 1 : 0; break; + case OP_GT: return -1; break; + } // endswitch opc + + pax = (opc == OP_GE) ? (X < Ndif - 1) : true; + } else if (Inf == Bot) { + // Max value is smaller than min list value + return (opc == OP_LT || opc == OP_LE || opc == OP_NE) ? 1 : -1; + } else + pax = (Sup < Ndif); // True if max value is inside the list value + + if (!veq) { + Value->SetBinValue(minp); + bin = Find(Value); + } else + bin = bax; + + Bot = (bin) ? X - 1 : Inf; + + if (bin) { + if (opc == OP_EQ || opc == OP_NE) + return 0; + + if (X == Ndif - 1) switch (opc) { + case OP_GE: return (s) ? 2 : 1; break; + case OP_GT: return (veq) ? -1 : 0; break; + case OP_LE: return (veq) ? 1 : 0; break; + case OP_LT: return (s) ? -2 : -1; break; + } // endswitch opc + + pin = (opc == OP_LE) ? (X > 0) : true; + } else if (Sup == Ndif) { + // Min value is greater than max list value + if (opc == OP_GT || opc == OP_GE || opc == OP_NE) + return (s) ? 2 : 1; + else + return (s) ? -2 : -1; + + } else + pin = (Inf >= 0); // True if min value is inside the list value + + if (Top - Bot <= 1) { + // No list item between min and max value +#if defined(_DEBUG) + assert (!bin && !bax); +#endif + switch (opc) { + case OP_EQ: return -1; break; + case OP_NE: return 1; break; + default: return (all) ? -1 : 1; break; + } // endswitch opc + + } // endif + +#if defined(_DEBUG) + assert (Ndif > 1); // if Ndif = 1 we should have returned already +#endif + + // At this point, if there are no logical errors in the algorithm, + // the only possible overlaps between the array and the block are: + // Array: +-------+ +-------+ +-------+ +-----+ + // Block: +-----+ +---+ +------+ +--------+ + // true: pax pin pax pin + if (all) switch (opc) { + case OP_GT: + case OP_GE: return (pax) ? -1 : 0; break; + case OP_LT: + case OP_LE: return (pin) ? -1 : 0; break; + } // endswitch opc + + return 0; + } // end of BlockTest + +/***********************************************************************/ +/* MakeArrayList: Makes a value list from an SQL IN array (in work). */ +/***********************************************************************/ +PSZ ARRAY::MakeArrayList(PGLOBAL g) +{ + char *p, *tp; + int i; + size_t z, len = 2; + + if (Type == TYPE_LIST) + return (PSZ)("(?" "?" "?)"); // To be implemented + + z = MY_MAX(24, GetTypeSize(Type, Len) + 4); + tp = (char*)PlugSubAlloc(g, NULL, z); + + for (i = 0; i < Nval; i++) { + Value->SetValue_pvblk(Vblp, i); + Value->Prints(g, tp, z); + len += strlen(tp); + } // enfor i + + xtrc(1, "Arraylist: len=%d\n", len); + p = (char *)PlugSubAlloc(g, NULL, len); + safe_strcpy(p, len, "("); + + for (i = 0; i < Nval;) { + Value->SetValue_pvblk(Vblp, i); + Value->Prints(g, tp, z); + safe_strcat(p, len, tp); + safe_strcat(p, len, (++i == Nval) ? ")" : ","); + } // enfor i + + xtrc(1, "Arraylist: newlen=%d\n", strlen(p)); + return p; +} // end of MakeArrayList + +/***********************************************************************/ +/* Make file output of ARRAY contents. */ +/***********************************************************************/ +void ARRAY::Printf(PGLOBAL g, FILE *f, uint n) +{ + char m[64]; + int lim = MY_MIN(Nval,10); + + memset(m, ' ', n); // Make margin string + m[n] = '\0'; + fprintf(f, "%sARRAY: type=%d\n", m, Type); + memset(m, ' ', n + 2); // Make margin string + m[n] = '\0'; + + if (Type != TYPE_LIST) { + fprintf(f, "%sblock=%p numval=%d\n", m, Valblk->GetMemp(), Nval); + + if (Vblp) + for (int i = 0; i < lim; i++) { + Value->SetValue_pvblk(Vblp, i); + Value->Printf(g, f, n+4); + } // endfor i + + } else + fprintf(f, "%sVALLST: numval=%d\n", m, Nval); + +} // end of Printf + +/***********************************************************************/ +/* Make string output of ARRAY contents. */ +/***********************************************************************/ +void ARRAY::Prints(PGLOBAL, char *ps, uint z) +{ + if (z < 16) + return; + + sprintf(ps, "ARRAY: type=%d\n", Type); + // More to be implemented later +} // end of Prints + +/* -------------------------- Class MULAR ---------------------------- */ + +/***********************************************************************/ +/* MULAR public constructor. */ +/***********************************************************************/ +MULAR::MULAR(PGLOBAL g, int n) : CSORT(false) + { + Narray = n; + Pars = (PARRAY*)PlugSubAlloc(g, NULL, n * sizeof(PARRAY)); + } // end of MULAR constructor + +/***********************************************************************/ +/* MULAR: Compare routine multiple arrays. */ +/***********************************************************************/ +int MULAR::Qcompare(int *i1, int *i2) + { + int i, n = 0; + + for (i = 0; i < Narray; i++) + if ((n = Pars[i]->Qcompare(i1, i2))) + break; + + return n; + } // end of Qcompare + +/***********************************************************************/ +/* Sort and eliminate distinct values from multiple arrays. */ +/* Note: this is done by making a sorted index on distinct values. */ +/* Returns false if Ok or true in case of error. */ +/***********************************************************************/ +bool MULAR::Sort(PGLOBAL g) + { + int i, j, k, n, nval, ndif; + + // All arrays must have the same number of values + nval = Pars[0]->Nval; + + for (n = 1; n < Narray; n++) + if (Pars[n]->Nval != nval) { + strcpy(g->Message, MSG(BAD_ARRAY_VAL)); + return true; + } // endif nval + + // Prepare non conservative sort with offet values + Index.Size = nval * sizeof(int); + + if (!PlgDBalloc(g, NULL, Index)) + goto error; + + Offset.Size = (nval + 1) * sizeof(int); + + if (!PlgDBalloc(g, NULL, Offset)) + goto error; + + // Call the sort program, it returns the number of distinct values + ndif = Qsort(g, nval); + + if (ndif < 0) + goto error; + + // Use the sort index to reorder the data in storage so it will + // be physically sorted and Index can be removed. + for (i = 0; i < nval; i++) { + if (Pex[i] == i || Pex[i] == nval) + // Already placed or already moved + continue; + + for (n = 0; n < Narray; n++) + Pars[n]->Save(i); + + for (j = i;; j = k) { + k = Pex[j]; + Pex[j] = nval; // Mark position as set + + if (k == i) { + for (n = 0; n < Narray; n++) + Pars[n]->Restore(j); + + break; // end of loop + } else + for (n = 0; n < Narray; n++) + Pars[n]->Move(j, k); + + } // endfor j + + } // endfor i + + // Reduce the size of the To_Val array if ndif < nval + if (ndif < nval) { + for (i = 1; i < ndif; i++) + if (i != Pof[i]) + break; + + for (; i < ndif; i++) + for (n = 0; n < Narray; n++) + Pars[n]->Move(i, Pof[i]); + + for (n = 0; n < Narray; n++) { + Pars[n]->Nval = ndif; + Pars[n]->Size = ndif; + Pars[n]->Valblk->ReAllocate(g, ndif); + } // endfor n + + } // endif ndif + + // Index and Offset are not used anymore + PlgDBfree(Index); + PlgDBfree(Offset); + + for (n = 0; n < Narray; n++) { + Pars[n]->Bot = -1; // For non optimized search + Pars[n]->Top = ndif; // Find searches the whole array. + } // endfor n + + return false; + + error: + PlgDBfree(Index); + PlgDBfree(Offset); + return true; + } // end of Sort |