diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
commit | 3f619478f796eddbba6e39502fe941b285dd97b1 (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/connect/cmgoconn.cpp | |
parent | Initial commit. (diff) | |
download | mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.tar.xz mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/connect/cmgoconn.cpp')
-rw-r--r-- | storage/connect/cmgoconn.cpp | 1120 |
1 files changed, 1120 insertions, 0 deletions
diff --git a/storage/connect/cmgoconn.cpp b/storage/connect/cmgoconn.cpp new file mode 100644 index 00000000..e61806d4 --- /dev/null +++ b/storage/connect/cmgoconn.cpp @@ -0,0 +1,1120 @@ +/************ CMgoConn C++ Functions Source Code File (.CPP) ***********/ +/* Name: CMgoConn.CPP Version 1.1 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2017 - 2021 */ +/* */ +/* This file contains the MongoDB C connection classes functions. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include relevant MariaDB header file. */ +/***********************************************************************/ +#include <my_global.h> + +/***********************************************************************/ +/* Required objects includes. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "colblk.h" +#include "xobject.h" +#include "xtable.h" +#include "filter.h" +#include "cmgoconn.h" + +bool CMgoConn::IsInit = false; + +bool IsArray(PSZ s); +bool MakeSelector(PGLOBAL g, PFIL fp, PSTRG s); +int GetDefaultPrec(void); + +/* --------------------------- Class INCOL --------------------------- */ + +/***********************************************************************/ +/* Add a column in the column list. */ +/***********************************************************************/ +void INCOL::AddCol(PGLOBAL g, PCOL colp, char *jp) +{ + char *p; + PKC kp, kcp; + + if ((p = strchr(jp, '.'))) { + PINCOL icp; + + *p++ = 0; + + for (kp = Klist; kp; kp = kp->Next) + if (kp->Incolp && !strcmp(jp, kp->Key)) + break; + + if (!kp) { + icp = new(g) INCOL(); + kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL)); + kcp->Next = NULL; + kcp->Incolp = icp; + kcp->Colp = NULL; + kcp->Key = PlugDup(g, jp); + kcp->Array = IsArray(p); + + if (Klist) { + for (kp = Klist; kp->Next; kp = kp->Next); + + kp->Next = kcp; + } else + Klist = kcp; + + } else + icp = kp->Incolp; + + *(p - 1) = '.'; + icp->AddCol(g, colp, p); + } else { + kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL)); + + kcp->Next = NULL; + kcp->Incolp = NULL; + kcp->Colp = colp; + kcp->Key = jp; + kcp->Array = IsArray(jp); + + if (Klist) { + for (kp = Klist; kp->Next; kp = kp->Next); + + kp->Next = kcp; + } else + Klist = kcp; + + } // endif jp + +} // end of AddCol + +/***********************************************************************/ +/* Clear. */ +/***********************************************************************/ +void INCOL::Init(void) +{ + bson_init(Child); + + for (PKC kp = Klist; kp; kp = kp->Next) + if (kp->Incolp) + kp->Incolp->Init(); + +} // end of init + +/***********************************************************************/ +/* Destroy. */ +/***********************************************************************/ +void INCOL::Destroy(void) +{ + bson_destroy(Child); + + for (PKC kp = Klist; kp; kp = kp->Next) + if (kp->Incolp) + kp->Incolp->Destroy(); + +} // end of Destroy + +/* -------------------------- Class CMgoConn ------------------------- */ + +/***********************************************************************/ +/* Implementation of the CMgoConn class. */ +/***********************************************************************/ +CMgoConn::CMgoConn(PGLOBAL g, PCPARM pcg) +{ + Pcg = pcg; + Uri = NULL; +//Pool = NULL; + Client = NULL; + Database = NULL; + Collection = NULL; + Cursor = NULL; + Document = NULL; + Query = NULL; + Opts = NULL; + Fpc = NULL; + fp = NULL; + m_Connected = false; +} // end of CMgoConn standard constructor + +/***********************************************************************/ +/* Required to initialize libmongoc's internals. */ +/***********************************************************************/ +void CMgoConn::mongo_init(bool init) +{ + if (init) + mongoc_init(); + else if (IsInit) + mongoc_cleanup(); + + IsInit = init; +} // end of mongo_init + +/***********************************************************************/ +/* Connect to the MongoDB server and get the collection. */ +/***********************************************************************/ +bool CMgoConn::Connect(PGLOBAL g) +{ + if (!Pcg->Db_name || !Pcg->Coll_name) { + // This would crash in mongoc_client_get_collection + snprintf(g->Message, sizeof(g->Message), "Missing DB or collection name"); + return true; + } // endif name + + if (!IsInit) +#if defined(_WIN32) + __try { + mongo_init(true); + } __except (EXCEPTION_EXECUTE_HANDLER) { + snprintf(g->Message, sizeof(g->Message), "Cannot load MongoDB C driver"); + return true; + } // end try/except +#else // !_WIN32 + mongo_init(true); +#endif // !_WIN32 + + Uri = mongoc_uri_new_with_error(Pcg->Uristr, &Error); + + if (!Uri) { + snprintf(g->Message, sizeof(g->Message), "Failed to parse URI: \"%s\" Msg: %s", + Pcg->Uristr, Error.message); + return true; + } // endif Uri + +#if 0 + // Create a new client pool instance + Pool = mongoc_client_pool_new(Uri); + mongoc_client_pool_set_error_api(Pool, 2); + + // Register the application name so we can track it in the profile logs + // on the server. This can also be done from the URI. + mongoc_client_pool_set_appname(Pool, "Connect"); + + // Create a new client instance + Client = mongoc_client_pool_pop(Pool); +#else + // Create a new client instance + Client = mongoc_client_new_from_uri (Uri); + + if (!Client) { + snprintf(g->Message, sizeof(g->Message), "Failed to get Client"); + return true; + } // endif Client + + // Register the application name so we can track it in the profile logs + // on the server. This can also be done from the URI (see other examples). + mongoc_client_set_appname (Client, "Connect"); + + // Get a handle on the database + // Database = mongoc_client_get_database (Client, Pcg->Db_name); +#endif // 0 + + // Get a handle on the collection + Collection = mongoc_client_get_collection(Client, Pcg->Db_name, Pcg->Coll_name); + + if (!Collection) { + snprintf(g->Message, sizeof(g->Message), "Failed to get Collection %s.%s", + Pcg->Db_name, Pcg->Coll_name); + return true; + } // endif Collection + + /*********************************************************************/ + /* Link a Fblock. This make possible to automatically close it */ + /* in case of error (throw). */ + /*********************************************************************/ + PDBUSER dbuserp = (PDBUSER)g->Activityp->Aptr; + + fp = (PFBLOCK)PlugSubAlloc(g, NULL, sizeof(FBLOCK)); + fp->Type = TYPE_FB_MONGO; + fp->Fname = NULL; + fp->Next = dbuserp->Openlist; + dbuserp->Openlist = fp; + fp->Count = 1; + fp->Length = 0; + fp->Memory = NULL; + fp->Mode = MODE_ANY; + fp->File = this; + fp->Handle = 0; + + m_Connected = true; + return false; +} // end of Connect + +/***********************************************************************/ +/* CollSize: returns the number of documents in the collection. */ +/***********************************************************************/ +int CMgoConn::CollSize(PGLOBAL g) +{ + int cnt; + bson_t* query; + const char* jf = NULL; + + if (Pcg->Pipe) + return 10; + else if (Pcg->Filter) + jf = Pcg->Filter; + + if (jf) { + query = bson_new_from_json((const uint8_t*)jf, -1, &Error); + + if (!query) { + htrc("Wrong filter: %s", Error.message); + return 10; + } // endif Query + + } else + query = bson_new(); + +#if defined(DEVELOPMENT) + if (jf) + cnt = (int)mongoc_collection_count_documents(Collection, + query, NULL, NULL, NULL, &Error); + else + cnt = (int)mongoc_collection_estimated_document_count( + Collection, NULL, NULL, NULL, &Error); +#else + cnt = (int)mongoc_collection_count(Collection, + MONGOC_QUERY_NONE, query, 0, 0, NULL, &Error); +#endif + + if (cnt < 0) { + htrc("Collection count: %s", Error.message); + cnt = 2; + } // endif Cardinal + + bson_destroy(query); + return cnt; +} // end of CollSize + +/***********************************************************************/ +/* Project: make the projection avoid path collision. */ +/***********************************************************************/ +void CMgoConn::Project(PGLOBAL g, PSTRG s) +{ + bool m, b = false; + size_t n; + PSZ path; + PCOL cp; + PTDB tp = Pcg->Tdbp; + PTHP hp, php = NULL, * nphp = &php; + + for (cp = tp->GetColumns(); cp; cp = cp->GetNext()) { + path = cp->GetJpath(g, true); + + // Resolve path collision + for (hp = php; hp; hp = hp->Next) { + if (strlen(path) < strlen(hp->Path)) { + n = strlen(path); + m = true; + } else { + n = strlen(hp->Path); + m = false; + } // endif path + + if (!strncmp(path, hp->Path, n)) + break; + + } // endfor hp + + if (!hp) { + // New path + hp = (PTHP)PlugSubAlloc(g, NULL, sizeof(PTH)); + hp->Path = path; + hp->Name = cp->GetName(); + hp->Next = NULL; + *nphp = hp; + nphp = &hp->Next; + } else if (m) // Smaller path must replace longer one + hp->Path = path; + + } // endfor cp + + for (hp = php; hp; hp = hp->Next) { + if (b) + s->Append(",\""); + else + b = true; + + if (*hp->Path == '{') { + // This is a Mongo defined column + s->Append(hp->Name); + s->Append("\":"); + s->Append(hp->Path); + } else { + s->Append(hp->Path); + s->Append("\":1"); + } // endif Path + + } // endfor hp + +} // end of Project + +/***********************************************************************/ +/* MakeCursor: make the cursor used to retrieve documents. */ +/***********************************************************************/ +bool CMgoConn::MakeCursor(PGLOBAL g) +{ + const char *p; + bool id, all = false; + PCSZ options = Pcg->Options; + PTDB tp = Pcg->Tdbp; + PCOL cp; + PSTRG s = NULL; + PFIL filp = tp->GetFilter(); + + id = (tp->GetMode() == MODE_UPDATE || tp->GetMode() == MODE_DELETE); + + if (options && !stricmp(options, "all")) { + options = NULL; + all = true; + } else for (cp = tp->GetColumns(); cp && !all; cp = cp->GetNext()) + if (cp->GetFmt() && !strcmp(cp->GetFmt(), "*") && !options) + all = true; + else if (!id) + id = !strcmp(cp->GetFmt() ? cp->GetFmt() : cp->GetName(), "_id"); + + if (Pcg->Pipe) { + if (trace(1)) + htrc("Pipeline: %s\n", options); + + p = strrchr(options, ']'); + + if (!p) { + snprintf(g->Message, sizeof(g->Message), "Missing ] in pipeline"); + return true; + } else + *(char*)p = 0; + + s = new(g) STRING(g, 1023, (PSZ)options); + + if (filp) { + s->Append(",{\"$match\":"); + + if (MakeSelector(g, filp, s)) { + snprintf(g->Message, sizeof(g->Message), "Failed making selector"); + return true; + } else + s->Append('}'); + + tp->SetFilter(NULL); // Not needed anymore + } // endif To_Filter + + if (tp->GetColumns() && !strstr(s->GetStr(), "$project")) { + // Project list + s->Append(",{\"$project\":{\""); + + if (!id) + s->Append("_id\":0,\""); + + Project(g, s); + s->Append("}}"); + } // endif all + + s->Append("]}"); + s->Resize(s->GetLength() + 1); + *(char*)p = ']'; // Restore Colist for discovery + p = s->GetStr(); + + if (trace(33)) + htrc("New Pipeline: %s\n", p); + + Query = bson_new_from_json((const uint8_t *)p, -1, &Error); + + if (!Query) { + snprintf(g->Message, sizeof(g->Message), "Wrong pipeline: %s", Error.message); + return true; + } // endif Query + + Cursor = mongoc_collection_aggregate(Collection, MONGOC_QUERY_NONE, + Query, NULL, NULL); + + if (mongoc_cursor_error(Cursor, &Error)) { + snprintf(g->Message, sizeof(g->Message), "Mongo aggregate Failure: %s", Error.message); + return true; + } // endif error + + } else { + if (Pcg->Filter || filp) { + if (trace(1)) { + if (Pcg->Filter) + htrc("Filter: %s\n", Pcg->Filter); + + if (filp) { + char buf[512]; + + filp->Prints(g, buf, 511); + htrc("To_Filter: %s\n", buf); + } // endif To_Filter + + } // endif trace + + s = new(g) STRING(g, 1023, (PSZ)Pcg->Filter); + + if (filp) { + if (Pcg->Filter) + s->Append(','); + + if (MakeSelector(g, filp, s)) { + snprintf(g->Message, sizeof(g->Message), "Failed making selector"); + return true; + } // endif Selector + + tp->SetFilter(NULL); // Not needed anymore + } // endif To_Filter + + if (trace(33)) + htrc("selector: %s\n", s->GetStr()); + + s->Resize(s->GetLength() + 1); + Query = bson_new_from_json((const uint8_t *)s->GetStr(), -1, &Error); + + if (!Query) { + snprintf(g->Message, sizeof(g->Message), "Wrong filter: %s", Error.message); + return true; + } // endif Query + + } else + Query = bson_new(); + + if (!all) { + if (options && *options) { + if (trace(1)) + htrc("options=%s\n", options); + + p = options; + } else if (tp->GetColumns()) { + // Projection list + if (s) + s->Set("{\"projection\":{\""); + else + s = new(g) STRING(g, 511, "{\"projection\":{\""); + + if (!id) + s->Append("_id\":0,\""); + + Project(g, s); + s->Append("}}"); + s->Resize(s->GetLength() + 1); + p = s->GetStr(); + } else { + // count(*) ? + p = "{\"projection\":{\"_id\":1}}"; + } // endif Options + + Opts = bson_new_from_json((const uint8_t *)p, -1, &Error); + + if (!Opts) { + snprintf(g->Message, sizeof(g->Message), "Wrong options: %s", Error.message); + return true; + } // endif Opts + + } // endif all + + Cursor = mongoc_collection_find_with_opts(Collection, Query, Opts, NULL); + } // endif Pipe + + return false; +} // end of MakeCursor + +/***********************************************************************/ +/* Fetch next document. */ +/***********************************************************************/ +int CMgoConn::ReadNext(PGLOBAL g) +{ + int rc = RC_OK; + + if (!Cursor && MakeCursor(g)) { + rc = RC_FX; + } else if (mongoc_cursor_next(Cursor, &Document)) { + if (trace(512)) { + bson_iter_t iter; + ShowDocument(&iter, Document, ""); + } else if (trace(1)) + htrc("%s\n", GetDocument(g)); + + } else if (mongoc_cursor_error(Cursor, &Error)) { + snprintf(g->Message, sizeof(g->Message), "Mongo Cursor Failure: %s", Error.message); + rc = RC_FX; + } else + rc = RC_EF; + + return rc; +} // end of Fetch + +/***********************************************************************/ +/* Get the Json string of the current document. */ +/***********************************************************************/ +PSZ CMgoConn::GetDocument(PGLOBAL g) +{ + char *str = bson_as_json(Document, NULL); + PSZ doc = PlugDup(g, str); + + bson_free(str); + return doc; +} // end of GetDocument + +/***********************************************************************/ +/* Use to trace restaurants document contains. */ +/***********************************************************************/ +void CMgoConn::ShowDocument(bson_iter_t *iter, const bson_t *doc, const char *k) +{ + if (!doc || bson_iter_init(iter, doc)) { + const char *key; + + while (bson_iter_next(iter)) { + key = bson_iter_key(iter); + htrc("Found element key: \"%s\"\n", key); + + switch (bson_iter_type(iter)) { + case BSON_TYPE_UTF8: + htrc("%s.%s=\"%s\"\n", k, key, bson_iter_utf8(iter, NULL)); + break; + case BSON_TYPE_INT32: + htrc("%s.%s=%d\n", k, key, bson_iter_int32(iter)); + break; + case BSON_TYPE_INT64: + htrc("%s.%s=%lld\n", k, key, bson_iter_int64(iter)); + break; + case BSON_TYPE_DOUBLE: + htrc("%s.%s=%g\n", k, key, bson_iter_double(iter)); + break; + case BSON_TYPE_DATE_TIME: + htrc("%s.%s=date(%lld)\n", k, key, bson_iter_date_time(iter)); + break; + case BSON_TYPE_OID: { + char str[25]; + + bson_oid_to_string(bson_iter_oid(iter), str); + htrc("%s.%s=%s\n", k, key, str); + } break; + case BSON_TYPE_DECIMAL128: { + char str[BSON_DECIMAL128_STRING]; + bson_decimal128_t dec; + + bson_iter_decimal128(iter, &dec); + bson_decimal128_to_string(&dec, str); + htrc("%s.%s=%s\n", k, key, str); + } break; + case BSON_TYPE_DOCUMENT: { + bson_iter_t child; + + if (bson_iter_recurse(iter, &child)) + ShowDocument(&child, NULL, key); + + } break; + case BSON_TYPE_ARRAY: { + bson_t* arr; + bson_iter_t itar; + const uint8_t* data = NULL; + uint32_t len = 0; + + bson_iter_array(iter, &len, &data); + arr = bson_new_from_data(data, len); + ShowDocument(&itar, arr, key); + } break; + } // endswitch iter + + } // endwhile bson_iter_next + + } // endif bson_iter_init + +} // end of ShowDocument + +/***********************************************************************/ +/* Group columns for inserting or updating. */ +/***********************************************************************/ +void CMgoConn::MakeColumnGroups(PGLOBAL g) +{ + Fpc = new(g) INCOL(); + + for (PCOL colp = Pcg->Tdbp->GetColumns(); colp; colp = colp->GetNext()) + if (!colp->IsSpecial()) + Fpc->AddCol(g, colp, colp->GetJpath(g, false)); + +} // end of MakeColumnGroups + +/***********************************************************************/ +/* DocWrite. */ +/***********************************************************************/ +bool CMgoConn::DocWrite(PGLOBAL g, PINCOL icp) +{ + for (PKC kp = icp->Klist; kp; kp = kp->Next) + if (kp->Incolp) { + bool isdoc = !kp->Array; + + if (isdoc) + BSON_APPEND_DOCUMENT_BEGIN(icp->Child, kp->Key, kp->Incolp->Child); + else + BSON_APPEND_ARRAY_BEGIN(icp->Child, kp->Key, kp->Incolp->Child); + + if (DocWrite(g, kp->Incolp)) + return true; + + if (isdoc) + bson_append_document_end(icp->Child, kp->Incolp->Child); + else + bson_append_array_end(icp->Child, kp->Incolp->Child); + + } else if (AddValue(g, kp->Colp, icp->Child, kp->Key, false)) + return true; + + return false; +} // end of DocWrite + +/***********************************************************************/ +/* WriteDB: Data Base write routine for CMGO access method. */ +/***********************************************************************/ +int CMgoConn::Write(PGLOBAL g) +{ + int rc = RC_OK; + PTDB tp = Pcg->Tdbp; + + if (tp->GetMode() == MODE_INSERT) { + if (!Pcg->Line) { + Fpc->Init(); + + if (DocWrite(g, Fpc)) + return RC_FX; + + if (trace(2)) { + char* str = bson_as_json(Fpc->Child, NULL); + htrc("Inserting: %s\n", str); + bson_free(str); + } // endif trace + + if (!mongoc_collection_insert(Collection, MONGOC_INSERT_NONE, + Fpc->Child, NULL, &Error)) { + snprintf(g->Message, sizeof(g->Message), "Mongo insert: %s", Error.message); + rc = RC_FX; + } // endif insert + + } else { + const uint8_t* val = (const uint8_t*)Pcg->Line; + bson_t* doc = bson_new_from_json(val, -1, &Error); + + if (doc && trace(2)) { + char* str = bson_as_json(doc, NULL); + htrc("Inserting: %s\n", str); + bson_free(str); + } // endif trace + + if (!doc) { + snprintf(g->Message, sizeof(g->Message), "bson_new_from_json: %s", Error.message); + rc = RC_FX; + } else if (!mongoc_collection_insert(Collection, + MONGOC_INSERT_NONE, doc, NULL, &Error)) { + snprintf(g->Message, sizeof(g->Message), "Mongo insert: %s", Error.message); + bson_destroy(doc); + rc = RC_FX; + } // endif insert + + } // endif Line + + } else { + bool b = false; + bson_iter_t iter; + bson_t *query = bson_new(); + + bson_iter_init(&iter, Document); + + if (bson_iter_find(&iter, "_id")) + switch (bson_iter_type(&iter)) { + case BSON_TYPE_OID: + b = BSON_APPEND_OID(query, "_id", bson_iter_oid(&iter)); + break; + case BSON_TYPE_UTF8: + b = BSON_APPEND_UTF8(query, "_id", bson_iter_utf8(&iter, NULL)); + break; + case BSON_TYPE_INT32: + b = BSON_APPEND_INT32(query, "_id", bson_iter_int32(&iter)); + break; + case BSON_TYPE_INT64: + b = BSON_APPEND_INT64(query, "_id", bson_iter_int64(&iter)); + break; + case BSON_TYPE_DOUBLE: + b = BSON_APPEND_DOUBLE(query, "_id", bson_iter_double(&iter)); + break; + default: + break; + } // endswitch iter + + if (b) { + if (trace(2)) { + char *str = bson_as_json(query, NULL); + htrc("update query: %s\n", str); + bson_free(str); + } // endif trace + + if (tp->GetMode() == MODE_UPDATE) { + bson_t child; + bson_t *update = bson_new(); + + BSON_APPEND_DOCUMENT_BEGIN(update, "$set", &child); + + for (PCOL colp = tp->GetSetCols(); colp; colp = colp->GetNext()) + if (AddValue(g, colp, &child, colp->GetJpath(g, false), true)) + rc = RC_FX; + + bson_append_document_end(update, &child); + + if (rc == RC_OK) + if (!mongoc_collection_update(Collection, MONGOC_UPDATE_NONE, + query, update, NULL, &Error)) { + snprintf(g->Message, sizeof(g->Message), "Mongo update: %s", Error.message); + rc = RC_FX; + } // endif update + + bson_destroy(update); + } else if (!mongoc_collection_remove(Collection, + MONGOC_REMOVE_SINGLE_REMOVE, query, NULL, &Error)) { + snprintf(g->Message, sizeof(g->Message), "Mongo delete: %s", Error.message); + rc = RC_FX; + } // endif remove + + } else { + snprintf(g->Message, sizeof(g->Message), "Mongo update: cannot find _id"); + rc = RC_FX; + } // endif b + + bson_destroy(query); + } // endif Mode + + return rc; +} // end of Write + +/***********************************************************************/ +/* Remove all documents from the collection. */ +/***********************************************************************/ +bool CMgoConn::DocDelete(PGLOBAL g) +{ + Query = bson_new(); + + if (!mongoc_collection_remove(Collection, MONGOC_REMOVE_NONE, + Query, NULL, &Error)) { + snprintf(g->Message, sizeof(g->Message), "Mongo remove all: %s", Error.message); + return true; + } // endif remove + + return false; +} // end of DocDelete + +/***********************************************************************/ +/* Rewind the collection. */ +/***********************************************************************/ +void CMgoConn::Rewind(void) +{ + mongoc_cursor_t *cursor = mongoc_cursor_clone(Cursor); + + mongoc_cursor_destroy(Cursor); + Cursor = cursor; +} // end of Rewind + +/***********************************************************************/ +/* Table close routine for MONGO tables. */ +/***********************************************************************/ +void CMgoConn::Close(void) +{ + if (Query) bson_destroy(Query); + if (Opts) bson_destroy(Opts); + if (Cursor) mongoc_cursor_destroy(Cursor); + if (Collection) mongoc_collection_destroy(Collection); +//if (Client) mongoc_client_pool_push(Pool, Client); +//if (Pool) mongoc_client_pool_destroy(Pool); + if (Client) mongoc_client_destroy(Client); + if (Uri) mongoc_uri_destroy(Uri); + if (Fpc) Fpc->Destroy(); + if (fp) fp->Count = 0; +} // end of Close + +/***********************************************************************/ +/* Mini: used to suppress blanks to json strings. */ +/***********************************************************************/ +char *CMgoConn::Mini(PGLOBAL g, PCOL colp, const bson_t *bson, bool b) +{ + char *s, *str = NULL; + char *Mbuf = (char*)PlugSubAlloc(g, NULL, (size_t)colp->GetLength() + 1); + int i, j = 0, k = 0, n = 0, m = GetDefaultPrec(); + bool ok = true, dbl = false; + double d; + size_t len; + + if (b) + s = str = bson_array_as_json(bson, &len); + else + s = str = bson_as_json(bson, &len); + + if (len > (size_t)colp->GetLength()) { + snprintf(g->Message, sizeof(g->Message), "Value too long for column %s", colp->GetName()); + bson_free(str); + throw (int)TYPE_AM_MGO; + } // endif len + + for (i = 0; i < colp->GetLength() && s[i]; i++) { + switch (s[i]) { + case ' ': + if (ok) continue; + break; + case '"': + ok = !ok; + break; + case '.': + if (j) dbl = true; + break; + default: + if (ok) { + if (isdigit(s[i])) { + if (!j) j = k; + if (dbl) n++; + } else if (dbl && n > m) { + Mbuf[k] = 0; + d = atof(Mbuf + j); + n = sprintf(Mbuf + j, "%.*f", m, d); + k = j + n; + j = n = 0; + } else if (j) + j = n = 0; + + } // endif ok + + break; + } // endswitch s[i] + + Mbuf[k++] = s[i]; + } // endfor i + + bson_free(str); + + Mbuf[k] = 0; + return Mbuf; +} // end of Mini + +/***********************************************************************/ +/* Retrieve the column value from the document. */ +/***********************************************************************/ +void CMgoConn::GetColumnValue(PGLOBAL g, PCOL colp) +{ + char *jpath = colp->GetJpath(g, false); + bool b = false; + PVAL value = colp->GetValue(); + bson_iter_t Iter; // Used to retrieve column value + bson_iter_t Desc; // Descendant iter + + if (*jpath == '{') + jpath = colp->GetName(); // This is a Mongo defined column + + if (!*jpath || !strcmp(jpath, "*")) { + value->SetValue_psz(Mini(g, colp, Document, false)); + } else if (bson_iter_init(&Iter, Document) && + bson_iter_find_descendant(&Iter, jpath, &Desc)) { + switch (bson_iter_type(&Desc)) { + case BSON_TYPE_UTF8: + value->SetValue_psz((PSZ)bson_iter_utf8(&Desc, NULL)); + break; + case BSON_TYPE_INT32: + value->SetValue(bson_iter_int32(&Desc)); + break; + case BSON_TYPE_INT64: + value->SetValue(bson_iter_int64(&Desc)); + break; + case BSON_TYPE_DOUBLE: + value->SetValue(bson_iter_double(&Desc)); + break; + case BSON_TYPE_DATE_TIME: + value->SetValue(bson_iter_date_time(&Desc) / 1000); + break; + case BSON_TYPE_BOOL: + b = bson_iter_bool(&Desc); + + if (value->IsTypeNum()) + value->SetValue(b ? 1 : 0); + else + value->SetValue_psz(b ? "true" : "false"); + + break; + case BSON_TYPE_OID: { + char str[25]; + + bson_oid_to_string(bson_iter_oid(&Desc), str); + value->SetValue_psz(str); + } break; + case BSON_TYPE_ARRAY: + b = true; + // passthru + case BSON_TYPE_DOCUMENT: + { // All this because MongoDB can return the wrong type + int i = 0; + const uint8_t *data = NULL; + uint32_t len = 0; + + for (; i < 2; i++) { + if (b) // Try array first + bson_iter_array(&Desc, &len, &data); + else + bson_iter_document(&Desc, &len, &data); + + if (!data) { + len = 0; + b = !b; + } else + break; + + } // endfor i + + if (data) { + bson_t *doc = bson_new_from_data(data, len); + + value->SetValue_psz(Mini(g, colp, doc, b)); + bson_destroy(doc); + } else { + // ... or we can also come here in case of NULL! + value->Reset(); + value->SetNull(true); + } // endif data + + } break; + case BSON_TYPE_NULL: + // Apparently this does not work... + value->Reset(); + value->SetNull(true); + break; + case BSON_TYPE_DECIMAL128: { + char str[BSON_DECIMAL128_STRING]; + bson_decimal128_t dec; + + bson_iter_decimal128(&Desc, &dec); + bson_decimal128_to_string(&dec, str); + value->SetValue_psz(str); +// bson_free(str); + } break; + default: + value->Reset(); + break; + } // endswitch Desc + + } else { + // Field does not exist + value->Reset(); + value->SetNull(true); + } // endif Iter + +} // end of GetColumnValue + +/***********************************************************************/ +/* AddValue: Add column value to the document to insert or update. */ +/***********************************************************************/ +bool CMgoConn::AddValue(PGLOBAL g, PCOL colp, bson_t *doc, char *key, bool upd) +{ + bool rc = false; + PVAL value = colp->GetValue(); + + if (value->IsNull()) { +// if (upd) + rc = BSON_APPEND_NULL(doc, key); +// else +// return false; + + } else switch (colp->GetResultType()) { + case TYPE_STRING: + if (colp->Stringify()) { + const uint8_t *val = (const uint8_t*)value->GetCharValue(); + bson_t *bsn = bson_new_from_json(val, -1, &Error); + + if (!bsn) { + sprintf (g->Message, "AddValue: %s", Error.message); + return true; + } else if (*key) { + if (*val == '[') + rc = BSON_APPEND_ARRAY(doc, key, bsn); + else + rc = BSON_APPEND_DOCUMENT(doc, key, bsn); + + } else { + bson_copy_to (bsn, doc); + rc = true; + } // endif's + + bson_free(bsn); + } else + rc = BSON_APPEND_UTF8(doc, key, value->GetCharValue()); + + break; + case TYPE_INT: + case TYPE_SHORT: + rc = BSON_APPEND_INT32(doc, key, value->GetIntValue()); + break; + case TYPE_TINY: + rc = BSON_APPEND_BOOL(doc, key, value->GetIntValue()); + break; + case TYPE_BIGINT: + rc = BSON_APPEND_INT64(doc, key, value->GetBigintValue()); + break; + case TYPE_DOUBLE: + rc = BSON_APPEND_DOUBLE(doc, key, value->GetFloatValue()); + break; + case TYPE_DECIM: + {bson_decimal128_t dec; + + if (bson_decimal128_from_string(value->GetCharValue(), &dec)) + rc = BSON_APPEND_DECIMAL128(doc, key, &dec); + + } break; + case TYPE_DATE: + rc = BSON_APPEND_DATE_TIME(doc, key, value->GetBigintValue() * 1000); + break; + default: + snprintf(g->Message, sizeof(g->Message), "Type %d not supported yet", colp->GetResultType()); + return true; + } // endswitch Buf_Type + + if (!rc) { + snprintf(g->Message, sizeof(g->Message), "Adding value failed"); + return true; + } else + return false; + +} // end of AddValue + +#if 0 +void *CMgoConn::mgo_alloc(size_t n) +{ + char *mst = (char*)PlgDBSubAlloc(G, NULL, n + sizeof(size_t)); + + if (mst) { + *(size_t*)mst = n; + return mst + sizeof(size_t); + } // endif mst + + return NULL; +} // end of mgo_alloc + +void *CMgoConn::mgo_calloc(size_t n, size_t sz) +{ + void *m = mgo_alloc(n * sz); + + if (m) + memset(m, 0, n * sz); + + return m; +} // end of mgo_calloc + +void *CMgoConn::mgo_realloc(void *m, size_t n) +{ + if (!m) + return n ? mgo_alloc(n) : NULL; + + size_t *osz = (size_t*)((char*)m - sizeof(size_t)); + + if (n > *osz) { + void *nwm = mgo_alloc(n); + + if (nwm) + memcpy(nwm, m, *osz); + + return nwm; + } else { + *osz = n; + return m; + } // endif n + +} // end of mgo_realloc +#endif // 0 + |