diff options
Diffstat (limited to '')
-rw-r--r-- | src/functions.c | 1136 |
1 files changed, 1136 insertions, 0 deletions
diff --git a/src/functions.c b/src/functions.c new file mode 100644 index 0000000..2bdb586 --- /dev/null +++ b/src/functions.c @@ -0,0 +1,1136 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "functions.h" +#include "sds.h" +#include "dict.h" +#include "adlist.h" +#include "atomicvar.h" + +#define LOAD_TIMEOUT_MS 500 + +typedef enum { + restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace +} restorePolicy; + +static size_t engine_cache_memory = 0; + +/* Forward declaration */ +static void engineFunctionDispose(dict *d, void *obj); +static void engineStatsDispose(dict *d, void *obj); +static void engineLibraryDispose(dict *d, void *obj); +static int functionsVerifyName(sds name); + +typedef struct functionsLibEngineStats { + size_t n_lib; + size_t n_functions; +} functionsLibEngineStats; + +struct functionsLibCtx { + dict *libraries; /* Library name -> Library object */ + dict *functions; /* Function name -> Function object that can be used to run the function */ + size_t cache_memory; /* Overhead memory (structs, dictionaries, ..) used by all the functions */ + dict *engines_stats; /* Per engine statistics */ +}; + +typedef struct functionsLibMataData { + sds engine; + sds name; + sds code; +} functionsLibMataData; + +dictType engineDictType = { + dictSdsCaseHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +dictType functionDictType = { + dictSdsCaseHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare,/* key compare */ + dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +dictType engineStatsDictType = { + dictSdsCaseHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare,/* key compare */ + dictSdsDestructor, /* key destructor */ + engineStatsDispose, /* val destructor */ + NULL /* allow to expand */ +}; + +dictType libraryFunctionDictType = { + dictSdsHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + engineFunctionDispose,/* val destructor */ + NULL /* allow to expand */ +}; + +dictType librariesDictType = { + dictSdsHash, /* hash function */ + dictSdsDup, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + engineLibraryDispose, /* val destructor */ + NULL /* allow to expand */ +}; + +/* Dictionary of engines */ +static dict *engines = NULL; + +/* Libraries Ctx. + * Contains the dictionary that map a library name to library object, + * Contains the dictionary that map a function name to function object, + * and the cache memory used by all the functions */ +static functionsLibCtx *curr_functions_lib_ctx = NULL; + +static size_t functionMallocSize(functionInfo *fi) { + return zmalloc_size(fi) + sdsZmallocSize(fi->name) + + (fi->desc ? sdsZmallocSize(fi->desc) : 0) + + fi->li->ei->engine->get_function_memory_overhead(fi->function); +} + +static size_t libraryMallocSize(functionLibInfo *li) { + return zmalloc_size(li) + sdsZmallocSize(li->name) + + sdsZmallocSize(li->code); +} + +static void engineStatsDispose(dict *d, void *obj) { + UNUSED(d); + functionsLibEngineStats *stats = obj; + zfree(stats); +} + +/* Dispose function memory */ +static void engineFunctionDispose(dict *d, void *obj) { + UNUSED(d); + if (!obj) { + return; + } + functionInfo *fi = obj; + sdsfree(fi->name); + if (fi->desc) { + sdsfree(fi->desc); + } + engine *engine = fi->li->ei->engine; + engine->free_function(engine->engine_ctx, fi->function); + zfree(fi); +} + +static void engineLibraryFree(functionLibInfo* li) { + if (!li) { + return; + } + dictRelease(li->functions); + sdsfree(li->name); + sdsfree(li->code); + zfree(li); +} + +static void engineLibraryDispose(dict *d, void *obj) { + UNUSED(d); + engineLibraryFree(obj); +} + +/* Clear all the functions from the given library ctx */ +void functionsLibCtxClear(functionsLibCtx *lib_ctx) { + dictEmpty(lib_ctx->functions, NULL); + dictEmpty(lib_ctx->libraries, NULL); + dictIterator *iter = dictGetIterator(lib_ctx->engines_stats); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionsLibEngineStats *stats = dictGetVal(entry); + stats->n_functions = 0; + stats->n_lib = 0; + } + dictReleaseIterator(iter); + curr_functions_lib_ctx->cache_memory = 0; +} + +void functionsLibCtxClearCurrent(int async) { + if (async) { + functionsLibCtx *old_l_ctx = curr_functions_lib_ctx; + curr_functions_lib_ctx = functionsLibCtxCreate(); + freeFunctionsAsync(old_l_ctx); + } else { + functionsLibCtxClear(curr_functions_lib_ctx); + } +} + +/* Free the given functions ctx */ +void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) { + functionsLibCtxClear(functions_lib_ctx); + dictRelease(functions_lib_ctx->functions); + dictRelease(functions_lib_ctx->libraries); + dictRelease(functions_lib_ctx->engines_stats); + zfree(functions_lib_ctx); +} + +/* Swap the current functions ctx with the given one. + * Free the old functions ctx. */ +void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx) { + functionsLibCtxFree(curr_functions_lib_ctx); + curr_functions_lib_ctx = new_lib_ctx; +} + +/* return the current functions ctx */ +functionsLibCtx* functionsLibCtxGetCurrent() { + return curr_functions_lib_ctx; +} + +/* Create a new functions ctx */ +functionsLibCtx* functionsLibCtxCreate() { + functionsLibCtx *ret = zmalloc(sizeof(functionsLibCtx)); + ret->libraries = dictCreate(&librariesDictType); + ret->functions = dictCreate(&functionDictType); + ret->engines_stats = dictCreate(&engineStatsDictType); + dictIterator *iter = dictGetIterator(engines); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + engineInfo *ei = dictGetVal(entry); + functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); + dictAdd(ret->engines_stats, ei->name, stats); + } + dictReleaseIterator(iter); + ret->cache_memory = 0; + return ret; +} + +/* + * Creating a function inside the given library. + * On success, return C_OK. + * On error, return C_ERR and set err output parameter with a relevant error message. + * + * Note: the code assumes 'name' is NULL terminated but not require it to be binary safe. + * the function will verify that the given name is following the naming format + * and return an error if its not. + */ +int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err) { + if (functionsVerifyName(name) != C_OK) { + *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long"); + return C_ERR; + } + + if (dictFetchValue(li->functions, name)) { + *err = sdsnew("Function already exists in the library"); + return C_ERR; + } + + functionInfo *fi = zmalloc(sizeof(*fi)); + *fi = (functionInfo) { + .name = name, + .function = function, + .li = li, + .desc = desc, + .f_flags = f_flags, + }; + + int res = dictAdd(li->functions, fi->name, fi); + serverAssert(res == DICT_OK); + + return C_OK; +} + +static functionLibInfo* engineLibraryCreate(sds name, engineInfo *ei, sds code) { + functionLibInfo *li = zmalloc(sizeof(*li)); + *li = (functionLibInfo) { + .name = sdsdup(name), + .functions = dictCreate(&libraryFunctionDictType), + .ei = ei, + .code = sdsdup(code), + }; + return li; +} + +static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo* li) { + dictIterator *iter = dictGetIterator(li->functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + int ret = dictDelete(lib_ctx->functions, fi->name); + serverAssert(ret == DICT_OK); + lib_ctx->cache_memory -= functionMallocSize(fi); + } + dictReleaseIterator(iter); + entry = dictUnlink(lib_ctx->libraries, li->name); + dictSetVal(lib_ctx->libraries, entry, NULL); + dictFreeUnlinkedEntry(lib_ctx->libraries, entry); + lib_ctx->cache_memory -= libraryMallocSize(li); + + /* update stats */ + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); + serverAssert(stats); + stats->n_lib--; + stats->n_functions -= dictSize(li->functions); +} + +static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo* li) { + dictIterator *iter = dictGetIterator(li->functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + dictAdd(lib_ctx->functions, fi->name, fi); + lib_ctx->cache_memory += functionMallocSize(fi); + } + dictReleaseIterator(iter); + + dictAdd(lib_ctx->libraries, li->name, li); + lib_ctx->cache_memory += libraryMallocSize(li); + + /* update stats */ + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); + serverAssert(stats); + stats->n_lib++; + stats->n_functions += dictSize(li->functions); +} + +/* Takes all libraries from lib_ctx_src and add to lib_ctx_dst. + * On collision, if 'replace' argument is true, replace the existing library with the new one. + * Otherwise abort and leave 'lib_ctx_dst' and 'lib_ctx_src' untouched. + * Return C_OK on success and C_ERR if aborted. If C_ERR is returned, set a relevant + * error message on the 'err' out parameter. + * */ +static int libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_lib_ctx_src, int replace, sds *err) { + int ret = C_ERR; + dictIterator *iter = NULL; + /* Stores the libraries we need to replace in case a revert is required. + * Only initialized when needed */ + list *old_libraries_list = NULL; + dictEntry *entry = NULL; + iter = dictGetIterator(functions_lib_ctx_src->libraries); + while ((entry = dictNext(iter))) { + functionLibInfo *li = dictGetVal(entry); + functionLibInfo *old_li = dictFetchValue(functions_lib_ctx_dst->libraries, li->name); + if (old_li) { + if (!replace) { + /* library already exists, failed the restore. */ + *err = sdscatfmt(sdsempty(), "Library %s already exists", li->name); + goto done; + } else { + if (!old_libraries_list) { + old_libraries_list = listCreate(); + listSetFreeMethod(old_libraries_list, (void (*)(void*))engineLibraryFree); + } + libraryUnlink(functions_lib_ctx_dst, old_li); + listAddNodeTail(old_libraries_list, old_li); + } + } + } + dictReleaseIterator(iter); + iter = NULL; + + /* Make sure no functions collision */ + iter = dictGetIterator(functions_lib_ctx_src->functions); + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + if (dictFetchValue(functions_lib_ctx_dst->functions, fi->name)) { + *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); + goto done; + } + } + dictReleaseIterator(iter); + iter = NULL; + + /* No collision, it is safe to link all the new libraries. */ + iter = dictGetIterator(functions_lib_ctx_src->libraries); + while ((entry = dictNext(iter))) { + functionLibInfo *li = dictGetVal(entry); + libraryLink(functions_lib_ctx_dst, li); + dictSetVal(functions_lib_ctx_src->libraries, entry, NULL); + } + dictReleaseIterator(iter); + iter = NULL; + + functionsLibCtxClear(functions_lib_ctx_src); + if (old_libraries_list) { + listRelease(old_libraries_list); + old_libraries_list = NULL; + } + ret = C_OK; + +done: + if (iter) dictReleaseIterator(iter); + if (old_libraries_list) { + /* Link back all libraries on tmp_l_ctx */ + while (listLength(old_libraries_list) > 0) { + listNode *head = listFirst(old_libraries_list); + functionLibInfo *li = listNodeValue(head); + listNodeValue(head) = NULL; + libraryLink(functions_lib_ctx_dst, li); + listDelNode(old_libraries_list, head); + } + listRelease(old_libraries_list); + } + return ret; +} + +/* Register an engine, should be called once by the engine on startup and give the following: + * + * - engine_name - name of the engine to register + * - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */ +int functionsRegisterEngine(const char *engine_name, engine *engine) { + sds engine_name_sds = sdsnew(engine_name); + if (dictFetchValue(engines, engine_name_sds)) { + serverLog(LL_WARNING, "Same engine was registered twice"); + sdsfree(engine_name_sds); + return C_ERR; + } + + client *c = createClient(NULL); + c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT); + engineInfo *ei = zmalloc(sizeof(*ei)); + *ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,}; + + dictAdd(engines, engine_name_sds, ei); + + engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) + + zmalloc_size(engine) + + engine->get_engine_memory_overhead(engine->engine_ctx); + + return C_OK; +} + +/* + * FUNCTION STATS + */ +void functionStatsCommand(client *c) { + if (scriptIsRunning() && scriptIsEval()) { + addReplyErrorObject(c, shared.slowevalerr); + return; + } + + addReplyMapLen(c, 2); + + addReplyBulkCString(c, "running_script"); + if (!scriptIsRunning()) { + addReplyNull(c); + } else { + addReplyMapLen(c, 3); + addReplyBulkCString(c, "name"); + addReplyBulkCString(c, scriptCurrFunction()); + addReplyBulkCString(c, "command"); + client *script_client = scriptGetCaller(); + addReplyArrayLen(c, script_client->argc); + for (int i = 0 ; i < script_client->argc ; ++i) { + addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr)); + } + addReplyBulkCString(c, "duration_ms"); + addReplyLongLong(c, scriptRunDuration()); + } + + addReplyBulkCString(c, "engines"); + addReplyMapLen(c, dictSize(engines)); + dictIterator *iter = dictGetIterator(engines); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + engineInfo *ei = dictGetVal(entry); + addReplyBulkCString(c, ei->name); + addReplyMapLen(c, 2); + functionsLibEngineStats *e_stats = dictFetchValue(curr_functions_lib_ctx->engines_stats, ei->name); + addReplyBulkCString(c, "libraries_count"); + addReplyLongLong(c, e_stats->n_lib); + addReplyBulkCString(c, "functions_count"); + addReplyLongLong(c, e_stats->n_functions); + } + dictReleaseIterator(iter); +} + +static void functionListReplyFlags(client *c, functionInfo *fi) { + /* First count the number of flags we have */ + int flagcount = 0; + for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) { + if (fi->f_flags & flag->flag) { + ++flagcount; + } + } + + addReplySetLen(c, flagcount); + + for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) { + if (fi->f_flags & flag->flag) { + addReplyStatus(c, flag->str); + } + } +} + +/* + * FUNCTION LIST [LIBRARYNAME PATTERN] [WITHCODE] + * + * Return general information about all the libraries: + * * Library name + * * The engine used to run the Library + * * Library description + * * Functions list + * * Library code (if WITHCODE is given) + * + * It is also possible to given library name pattern using + * LIBRARYNAME argument, if given, return only libraries + * that matches the given pattern. + */ +void functionListCommand(client *c) { + int with_code = 0; + sds library_name = NULL; + for (int i = 2 ; i < c->argc ; ++i) { + robj *next_arg = c->argv[i]; + if (!with_code && !strcasecmp(next_arg->ptr, "withcode")) { + with_code = 1; + continue; + } + if (!library_name && !strcasecmp(next_arg->ptr, "libraryname")) { + if (i >= c->argc - 1) { + addReplyError(c, "library name argument was not given"); + return; + } + library_name = c->argv[++i]->ptr; + continue; + } + addReplyErrorSds(c, sdscatfmt(sdsempty(), "Unknown argument %s", next_arg->ptr)); + return; + } + size_t reply_len = 0; + void *len_ptr = NULL; + if (library_name) { + len_ptr = addReplyDeferredLen(c); + } else { + /* If no pattern is asked we know the reply len and we can just set it */ + addReplyArrayLen(c, dictSize(curr_functions_lib_ctx->libraries)); + } + dictIterator *iter = dictGetIterator(curr_functions_lib_ctx->libraries); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionLibInfo *li = dictGetVal(entry); + if (library_name) { + if (!stringmatchlen(library_name, sdslen(library_name), li->name, sdslen(li->name), 1)) { + continue; + } + } + ++reply_len; + addReplyMapLen(c, with_code? 4 : 3); + addReplyBulkCString(c, "library_name"); + addReplyBulkCBuffer(c, li->name, sdslen(li->name)); + addReplyBulkCString(c, "engine"); + addReplyBulkCBuffer(c, li->ei->name, sdslen(li->ei->name)); + + addReplyBulkCString(c, "functions"); + addReplyArrayLen(c, dictSize(li->functions)); + dictIterator *functions_iter = dictGetIterator(li->functions); + dictEntry *function_entry = NULL; + while ((function_entry = dictNext(functions_iter))) { + functionInfo *fi = dictGetVal(function_entry); + addReplyMapLen(c, 3); + addReplyBulkCString(c, "name"); + addReplyBulkCBuffer(c, fi->name, sdslen(fi->name)); + addReplyBulkCString(c, "description"); + if (fi->desc) { + addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc)); + } else { + addReplyNull(c); + } + addReplyBulkCString(c, "flags"); + functionListReplyFlags(c, fi); + } + dictReleaseIterator(functions_iter); + + if (with_code) { + addReplyBulkCString(c, "library_code"); + addReplyBulkCBuffer(c, li->code, sdslen(li->code)); + } + } + dictReleaseIterator(iter); + if (len_ptr) { + setDeferredArrayLen(c, len_ptr, reply_len); + } +} + +/* + * FUNCTION DELETE <LIBRARY NAME> + */ +void functionDeleteCommand(client *c) { + robj *function_name = c->argv[2]; + functionLibInfo *li = dictFetchValue(curr_functions_lib_ctx->libraries, function_name->ptr); + if (!li) { + addReplyError(c, "Library not found"); + return; + } + + libraryUnlink(curr_functions_lib_ctx, li); + engineLibraryFree(li); + /* Indicate that the command changed the data so it will be replicated and + * counted as a data change (for persistence configuration) */ + server.dirty++; + addReply(c, shared.ok); +} + +/* FUNCTION KILL */ +void functionKillCommand(client *c) { + scriptKill(c, 0); +} + +/* Try to extract command flags if we can, returns the modified flags. + * Note that it does not guarantee the command arguments are right. */ +uint64_t fcallGetCommandFlags(client *c, uint64_t cmd_flags) { + robj *function_name = c->argv[1]; + c->cur_script = dictFind(curr_functions_lib_ctx->functions, function_name->ptr); + if (!c->cur_script) + return cmd_flags; + functionInfo *fi = dictGetVal(c->cur_script); + uint64_t script_flags = fi->f_flags; + return scriptFlagsToCmdFlags(cmd_flags, script_flags); +} + +static void fcallCommandGeneric(client *c, int ro) { + /* Functions need to be fed to monitors before the commands they execute. */ + replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); + + robj *function_name = c->argv[1]; + dictEntry *de = c->cur_script; + if (!de) + de = dictFind(curr_functions_lib_ctx->functions, function_name->ptr); + if (!de) { + addReplyError(c, "Function not found"); + return; + } + functionInfo *fi = dictGetVal(de); + engine *engine = fi->li->ei->engine; + + long long numkeys; + /* Get the number of arguments that are keys */ + if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) { + addReplyError(c, "Bad number of keys provided"); + return; + } + if (numkeys > (c->argc - 3)) { + addReplyError(c, "Number of keys can't be greater than number of args"); + return; + } else if (numkeys < 0) { + addReplyError(c, "Number of keys can't be negative"); + return; + } + + scriptRunCtx run_ctx; + + if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) + return; + + engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys, + c->argv + 3 + numkeys, c->argc - 3 - numkeys); + scriptResetRun(&run_ctx); +} + +/* + * FCALL <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> + */ +void fcallCommand(client *c) { + fcallCommandGeneric(c, 0); +} + +/* + * FCALL_RO <FUNCTION NAME> nkeys <key1 .. keyn> <arg1 .. argn> + */ +void fcallroCommand(client *c) { + fcallCommandGeneric(c, 1); +} + +/* + * FUNCTION DUMP + * + * Returns a binary payload representing all the libraries. + * Can be loaded using FUNCTION RESTORE + * + * The payload structure is the same as on RDB. Each library + * is saved separately with the following information: + * * Library name + * * Engine name + * * Library description + * * Library code + * RDB_OPCODE_FUNCTION2 is saved before each library to present + * that the payload is a library. + * RDB version and crc64 is saved at the end of the payload. + * The RDB version is saved for backward compatibility. + * crc64 is saved so we can verify the payload content. + */ +void functionDumpCommand(client *c) { + unsigned char buf[2]; + uint64_t crc; + rio payload; + rioInitWithBuffer(&payload, sdsempty()); + + rdbSaveFunctions(&payload); + + /* RDB version */ + buf[0] = RDB_VERSION & 0xff; + buf[1] = (RDB_VERSION >> 8) & 0xff; + payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, buf, 2); + + /* CRC64 */ + crc = crc64(0, (unsigned char*) payload.io.buffer.ptr, + sdslen(payload.io.buffer.ptr)); + memrev64ifbe(&crc); + payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, &crc, 8); + + addReplyBulkSds(c, payload.io.buffer.ptr); +} + +/* + * FUNCTION RESTORE <payload> [FLUSH|APPEND|REPLACE] + * + * Restore the libraries represented by the give payload. + * Restore policy to can be given to control how to handle existing libraries (default APPEND): + * * FLUSH: delete all existing libraries. + * * APPEND: appends the restored libraries to the existing libraries. On collision, abort. + * * REPLACE: appends the restored libraries to the existing libraries. + * On collision, replace the old libraries with the new libraries. + */ +void functionRestoreCommand(client *c) { + if (c->argc > 4) { + addReplySubcommandSyntaxError(c); + return; + } + + restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */ + sds data = c->argv[2]->ptr; + size_t data_len = sdslen(data); + rio payload; + sds err = NULL; + + if (c->argc == 4) { + const char *restore_policy_str = c->argv[3]->ptr; + if (!strcasecmp(restore_policy_str, "append")) { + restore_replicy = restorePolicy_Append; + } else if (!strcasecmp(restore_policy_str, "replace")) { + restore_replicy = restorePolicy_Replace; + } else if (!strcasecmp(restore_policy_str, "flush")) { + restore_replicy = restorePolicy_Flush; + } else { + addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE."); + return; + } + } + + uint16_t rdbver; + if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) { + addReplyError(c, "DUMP payload version or checksum are wrong"); + return; + } + + functionsLibCtx *functions_lib_ctx = functionsLibCtxCreate(); + rioInitWithBuffer(&payload, data); + + /* Read until reaching last 10 bytes that should contain RDB version and checksum. */ + while (data_len - payload.io.buffer.pos > 10) { + int type; + if ((type = rdbLoadType(&payload)) == -1) { + err = sdsnew("can not read data type"); + goto load_error; + } + if (type != RDB_OPCODE_FUNCTION && type != RDB_OPCODE_FUNCTION2) { + err = sdsnew("given type is not a function"); + goto load_error; + } + if (rdbFunctionLoad(&payload, rdbver, functions_lib_ctx, type, RDBFLAGS_NONE, &err) != C_OK) { + if (!err) { + err = sdsnew("failed loading the given functions payload"); + } + goto load_error; + } + } + + if (restore_replicy == restorePolicy_Flush) { + functionsLibCtxSwapWithCurrent(functions_lib_ctx); + functions_lib_ctx = NULL; /* avoid releasing the f_ctx in the end */ + } else { + if (libraryJoin(curr_functions_lib_ctx, functions_lib_ctx, restore_replicy == restorePolicy_Replace, &err) != C_OK) { + goto load_error; + } + } + + /* Indicate that the command changed the data so it will be replicated and + * counted as a data change (for persistence configuration) */ + server.dirty++; + +load_error: + if (err) { + addReplyErrorSds(c, err); + } else { + addReply(c, shared.ok); + } + if (functions_lib_ctx) { + functionsLibCtxFree(functions_lib_ctx); + } +} + +/* FUNCTION FLUSH [ASYNC | SYNC] */ +void functionFlushCommand(client *c) { + if (c->argc > 3) { + addReplySubcommandSyntaxError(c); + return; + } + int async = 0; + if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) { + async = 0; + } else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) { + async = 1; + } else if (c->argc == 2) { + async = server.lazyfree_lazy_user_flush ? 1 : 0; + } else { + addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option"); + return; + } + + functionsLibCtxClearCurrent(async); + + /* Indicate that the command changed the data so it will be replicated and + * counted as a data change (for persistence configuration) */ + server.dirty++; + addReply(c,shared.ok); +} + +/* FUNCTION HELP */ +void functionHelpCommand(client *c) { + const char *help[] = { +"LOAD [REPLACE] <FUNCTION CODE>", +" Create a new library with the given library name and code.", +"DELETE <LIBRARY NAME>", +" Delete the given library.", +"LIST [LIBRARYNAME PATTERN] [WITHCODE]", +" Return general information on all the libraries:", +" * Library name", +" * The engine used to run the Library", +" * Library description", +" * Functions list", +" * Library code (if WITHCODE is given)", +" It also possible to get only function that matches a pattern using LIBRARYNAME argument.", +"STATS", +" Return information about the current function running:", +" * Function name", +" * Command used to run the function", +" * Duration in MS that the function is running", +" If no function is running, return nil", +" In addition, returns a list of available engines.", +"KILL", +" Kill the current running function.", +"FLUSH [ASYNC|SYNC]", +" Delete all the libraries.", +" When called without the optional mode argument, the behavior is determined by the", +" lazyfree-lazy-user-flush configuration directive. Valid modes are:", +" * ASYNC: Asynchronously flush the libraries.", +" * SYNC: Synchronously flush the libraries.", +"DUMP", +" Return a serialized payload representing the current libraries, can be restored using FUNCTION RESTORE command", +"RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]", +" Restore the libraries represented by the given payload, it is possible to give a restore policy to", +" control how to handle existing libraries (default APPEND):", +" * FLUSH: delete all existing libraries.", +" * APPEND: appends the restored libraries to the existing libraries. On collision, abort.", +" * REPLACE: appends the restored libraries to the existing libraries, On collision, replace the old", +" libraries with the new libraries (notice that even on this option there is a chance of failure", +" in case of functions name collision with another library).", +NULL }; + addReplyHelp(c, help); +} + +/* Verify that the function name is of the format: [a-zA-Z0-9_][a-zA-Z0-9_]? */ +static int functionsVerifyName(sds name) { + if (sdslen(name) == 0) { + return C_ERR; + } + for (size_t i = 0 ; i < sdslen(name) ; ++i) { + char curr_char = name[i]; + if ((curr_char >= 'a' && curr_char <= 'z') || + (curr_char >= 'A' && curr_char <= 'Z') || + (curr_char >= '0' && curr_char <= '9') || + (curr_char == '_')) + { + continue; + } + return C_ERR; + } + return C_OK; +} + +int functionExtractLibMetaData(sds payload, functionsLibMataData *md, sds *err) { + sds name = NULL; + sds desc = NULL; + sds engine = NULL; + sds code = NULL; + if (strncmp(payload, "#!", 2) != 0) { + *err = sdsnew("Missing library metadata"); + return C_ERR; + } + char *shebang_end = strchr(payload, '\n'); + if (shebang_end == NULL) { + *err = sdsnew("Invalid library metadata"); + return C_ERR; + } + size_t shebang_len = shebang_end - payload; + sds shebang = sdsnewlen(payload, shebang_len); + int numparts; + sds *parts = sdssplitargs(shebang, &numparts); + sdsfree(shebang); + if (!parts || numparts == 0) { + *err = sdsnew("Invalid library metadata"); + sdsfreesplitres(parts, numparts); + return C_ERR; + } + engine = sdsdup(parts[0]); + sdsrange(engine, 2, -1); + for (int i = 1 ; i < numparts ; ++i) { + sds part = parts[i]; + if (strncasecmp(part, "name=", 5) == 0) { + if (name) { + *err = sdscatfmt(sdsempty(), "Invalid metadata value, name argument was given multiple times"); + goto error; + } + name = sdsdup(part); + sdsrange(name, 5, -1); + continue; + } + *err = sdscatfmt(sdsempty(), "Invalid metadata value given: %s", part); + goto error; + } + + if (!name) { + *err = sdsnew("Library name was not given"); + goto error; + } + + sdsfreesplitres(parts, numparts); + + md->name = name; + md->code = sdsnewlen(shebang_end, sdslen(payload) - shebang_len); + md->engine = engine; + + return C_OK; + +error: + if (name) sdsfree(name); + if (desc) sdsfree(desc); + if (engine) sdsfree(engine); + if (code) sdsfree(code); + sdsfreesplitres(parts, numparts); + return C_ERR; +} + +void functionFreeLibMetaData(functionsLibMataData *md) { + if (md->code) sdsfree(md->code); + if (md->name) sdsfree(md->name); + if (md->engine) sdsfree(md->engine); +} + +/* Compile and save the given library, return the loaded library name on success + * and NULL on failure. In case on failure the err out param is set with relevant error message */ +sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx, size_t timeout) { + dictIterator *iter = NULL; + dictEntry *entry = NULL; + functionLibInfo *new_li = NULL; + functionLibInfo *old_li = NULL; + functionsLibMataData md = {0}; + if (functionExtractLibMetaData(code, &md, err) != C_OK) { + return NULL; + } + + if (functionsVerifyName(md.name)) { + *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long"); + goto error; + } + + engineInfo *ei = dictFetchValue(engines, md.engine); + if (!ei) { + *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); + goto error; + } + engine *engine = ei->engine; + + old_li = dictFetchValue(lib_ctx->libraries, md.name); + if (old_li && !replace) { + old_li = NULL; + *err = sdscatfmt(sdsempty(), "Library '%S' already exists", md.name); + goto error; + } + + if (old_li) { + libraryUnlink(lib_ctx, old_li); + } + + new_li = engineLibraryCreate(md.name, ei, code); + if (engine->create(engine->engine_ctx, new_li, md.code, timeout, err) != C_OK) { + goto error; + } + + if (dictSize(new_li->functions) == 0) { + *err = sdsnew("No functions registered"); + goto error; + } + + /* Verify no duplicate functions */ + iter = dictGetIterator(new_li->functions); + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + if (dictFetchValue(lib_ctx->functions, fi->name)) { + /* functions name collision, abort. */ + *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); + goto error; + } + } + dictReleaseIterator(iter); + iter = NULL; + + libraryLink(lib_ctx, new_li); + + if (old_li) { + engineLibraryFree(old_li); + } + + sds loaded_lib_name = md.name; + md.name = NULL; + functionFreeLibMetaData(&md); + + return loaded_lib_name; + +error: + if (iter) dictReleaseIterator(iter); + if (new_li) engineLibraryFree(new_li); + if (old_li) libraryLink(lib_ctx, old_li); + functionFreeLibMetaData(&md); + return NULL; +} + +/* + * FUNCTION LOAD [REPLACE] <LIBRARY CODE> + * REPLACE - optional, replace existing library + * LIBRARY CODE - library code to pass to the engine + */ +void functionLoadCommand(client *c) { + int replace = 0; + int argc_pos = 2; + while (argc_pos < c->argc - 1) { + robj *next_arg = c->argv[argc_pos++]; + if (!strcasecmp(next_arg->ptr, "replace")) { + replace = 1; + continue; + } + addReplyErrorFormat(c, "Unknown option given: %s", (char*)next_arg->ptr); + return; + } + + if (argc_pos >= c->argc) { + addReplyError(c, "Function code is missing"); + return; + } + + robj *code = c->argv[argc_pos]; + sds err = NULL; + sds library_name = NULL; + size_t timeout = LOAD_TIMEOUT_MS; + if (mustObeyClient(c)) { + timeout = 0; + } + if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx, timeout))) + { + addReplyErrorSds(c, err); + return; + } + /* Indicate that the command changed the data so it will be replicated and + * counted as a data change (for persistence configuration) */ + server.dirty++; + addReplyBulkSds(c, library_name); +} + +/* Return memory usage of all the engines combine */ +unsigned long functionsMemory() { + dictIterator *iter = dictGetIterator(engines); + dictEntry *entry = NULL; + size_t engines_nemory = 0; + while ((entry = dictNext(iter))) { + engineInfo *ei = dictGetVal(entry); + engine *engine = ei->engine; + engines_nemory += engine->get_used_memory(engine->engine_ctx); + } + dictReleaseIterator(iter); + + return engines_nemory; +} + +/* Return memory overhead of all the engines combine */ +unsigned long functionsMemoryOverhead() { + size_t memory_overhead = dictSize(engines) * sizeof(dictEntry) + + dictSlots(engines) * sizeof(dictEntry*); + memory_overhead += dictSize(curr_functions_lib_ctx->functions) * sizeof(dictEntry) + + dictSlots(curr_functions_lib_ctx->functions) * sizeof(dictEntry*) + sizeof(functionsLibCtx); + memory_overhead += curr_functions_lib_ctx->cache_memory; + memory_overhead += engine_cache_memory; + + return memory_overhead; +} + +/* Returns the number of functions */ +unsigned long functionsNum() { + return dictSize(curr_functions_lib_ctx->functions); +} + +unsigned long functionsLibNum() { + return dictSize(curr_functions_lib_ctx->libraries); +} + +dict* functionsLibGet() { + return curr_functions_lib_ctx->libraries; +} + +size_t functionsLibCtxfunctionsLen(functionsLibCtx *functions_ctx) { + return dictSize(functions_ctx->functions); +} + +/* Initialize engine data structures. + * Should be called once on server initialization */ +int functionsInit() { + engines = dictCreate(&engineDictType); + + if (luaEngineInitEngine() != C_OK) { + return C_ERR; + } + + /* Must be initialized after engines initialization */ + curr_functions_lib_ctx = functionsLibCtxCreate(); + + return C_OK; +} |