/* * Copyright (c) 2021, Redis Ltd. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "functions.h" #include "sds.h" #include "dict.h" #include "adlist.h" #include "atomicvar.h" #define LOAD_TIMEOUT_MS 500 typedef enum { restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace } restorePolicy; static size_t engine_cache_memory = 0; /* Forward declaration */ static void engineFunctionDispose(dict *d, void *obj); static void engineStatsDispose(dict *d, void *obj); static void engineLibraryDispose(dict *d, void *obj); static int functionsVerifyName(sds name); typedef struct functionsLibEngineStats { size_t n_lib; size_t n_functions; } functionsLibEngineStats; struct functionsLibCtx { dict *libraries; /* Library name -> Library object */ dict *functions; /* Function name -> Function object that can be used to run the function */ size_t cache_memory; /* Overhead memory (structs, dictionaries, ..) used by all the functions */ dict *engines_stats; /* Per engine statistics */ }; typedef struct functionsLibMataData { sds engine; sds name; sds code; } functionsLibMataData; dictType engineDictType = { dictSdsCaseHash, /* hash function */ dictSdsDup, /* key dup */ NULL, /* val dup */ dictSdsKeyCaseCompare, /* key compare */ dictSdsDestructor, /* key destructor */ NULL, /* val destructor */ NULL /* allow to expand */ }; dictType functionDictType = { dictSdsCaseHash, /* hash function */ dictSdsDup, /* key dup */ NULL, /* val dup */ dictSdsKeyCaseCompare,/* key compare */ dictSdsDestructor, /* key destructor */ NULL, /* val destructor */ NULL /* allow to expand */ }; dictType engineStatsDictType = { dictSdsCaseHash, /* hash function */ dictSdsDup, /* key dup */ NULL, /* val dup */ dictSdsKeyCaseCompare,/* key compare */ dictSdsDestructor, /* key destructor */ engineStatsDispose, /* val destructor */ NULL /* allow to expand */ }; dictType libraryFunctionDictType = { dictSdsHash, /* hash function */ dictSdsDup, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ dictSdsDestructor, /* key destructor */ engineFunctionDispose,/* val destructor */ NULL /* allow to expand */ }; dictType librariesDictType = { dictSdsHash, /* hash function */ dictSdsDup, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ dictSdsDestructor, /* key destructor */ engineLibraryDispose, /* val destructor */ NULL /* allow to expand */ }; /* Dictionary of engines */ static dict *engines = NULL; /* Libraries Ctx. * Contains the dictionary that map a library name to library object, * Contains the dictionary that map a function name to function object, * and the cache memory used by all the functions */ static functionsLibCtx *curr_functions_lib_ctx = NULL; static size_t functionMallocSize(functionInfo *fi) { return zmalloc_size(fi) + sdsZmallocSize(fi->name) + (fi->desc ? sdsZmallocSize(fi->desc) : 0) + fi->li->ei->engine->get_function_memory_overhead(fi->function); } static size_t libraryMallocSize(functionLibInfo *li) { return zmalloc_size(li) + sdsZmallocSize(li->name) + sdsZmallocSize(li->code); } static void engineStatsDispose(dict *d, void *obj) { UNUSED(d); functionsLibEngineStats *stats = obj; zfree(stats); } /* Dispose function memory */ static void engineFunctionDispose(dict *d, void *obj) { UNUSED(d); if (!obj) { return; } functionInfo *fi = obj; sdsfree(fi->name); if (fi->desc) { sdsfree(fi->desc); } engine *engine = fi->li->ei->engine; engine->free_function(engine->engine_ctx, fi->function); zfree(fi); } static void engineLibraryFree(functionLibInfo* li) { if (!li) { return; } dictRelease(li->functions); sdsfree(li->name); sdsfree(li->code); zfree(li); } static void engineLibraryDispose(dict *d, void *obj) { UNUSED(d); engineLibraryFree(obj); } /* Clear all the functions from the given library ctx */ void functionsLibCtxClear(functionsLibCtx *lib_ctx) { dictEmpty(lib_ctx->functions, NULL); dictEmpty(lib_ctx->libraries, NULL); dictIterator *iter = dictGetIterator(lib_ctx->engines_stats); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { functionsLibEngineStats *stats = dictGetVal(entry); stats->n_functions = 0; stats->n_lib = 0; } dictReleaseIterator(iter); curr_functions_lib_ctx->cache_memory = 0; } void functionsLibCtxClearCurrent(int async) { if (async) { functionsLibCtx *old_l_ctx = curr_functions_lib_ctx; curr_functions_lib_ctx = functionsLibCtxCreate(); freeFunctionsAsync(old_l_ctx); } else { functionsLibCtxClear(curr_functions_lib_ctx); } } /* Free the given functions ctx */ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) { functionsLibCtxClear(functions_lib_ctx); dictRelease(functions_lib_ctx->functions); dictRelease(functions_lib_ctx->libraries); dictRelease(functions_lib_ctx->engines_stats); zfree(functions_lib_ctx); } /* Swap the current functions ctx with the given one. * Free the old functions ctx. */ void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx) { functionsLibCtxFree(curr_functions_lib_ctx); curr_functions_lib_ctx = new_lib_ctx; } /* return the current functions ctx */ functionsLibCtx* functionsLibCtxGetCurrent(void) { return curr_functions_lib_ctx; } /* Create a new functions ctx */ functionsLibCtx* functionsLibCtxCreate(void) { functionsLibCtx *ret = zmalloc(sizeof(functionsLibCtx)); ret->libraries = dictCreate(&librariesDictType); ret->functions = dictCreate(&functionDictType); ret->engines_stats = dictCreate(&engineStatsDictType); dictIterator *iter = dictGetIterator(engines); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { engineInfo *ei = dictGetVal(entry); functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); dictAdd(ret->engines_stats, ei->name, stats); } dictReleaseIterator(iter); ret->cache_memory = 0; return ret; } /* * Creating a function inside the given library. * On success, return C_OK. * On error, return C_ERR and set err output parameter with a relevant error message. * * Note: the code assumes 'name' is NULL terminated but not require it to be binary safe. * the function will verify that the given name is following the naming format * and return an error if its not. */ int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err) { if (functionsVerifyName(name) != C_OK) { *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long"); return C_ERR; } if (dictFetchValue(li->functions, name)) { *err = sdsnew("Function already exists in the library"); return C_ERR; } functionInfo *fi = zmalloc(sizeof(*fi)); *fi = (functionInfo) { .name = name, .function = function, .li = li, .desc = desc, .f_flags = f_flags, }; int res = dictAdd(li->functions, fi->name, fi); serverAssert(res == DICT_OK); return C_OK; } static functionLibInfo* engineLibraryCreate(sds name, engineInfo *ei, sds code) { functionLibInfo *li = zmalloc(sizeof(*li)); *li = (functionLibInfo) { .name = sdsdup(name), .functions = dictCreate(&libraryFunctionDictType), .ei = ei, .code = sdsdup(code), }; return li; } static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo* li) { dictIterator *iter = dictGetIterator(li->functions); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { functionInfo *fi = dictGetVal(entry); int ret = dictDelete(lib_ctx->functions, fi->name); serverAssert(ret == DICT_OK); lib_ctx->cache_memory -= functionMallocSize(fi); } dictReleaseIterator(iter); entry = dictUnlink(lib_ctx->libraries, li->name); dictSetVal(lib_ctx->libraries, entry, NULL); dictFreeUnlinkedEntry(lib_ctx->libraries, entry); lib_ctx->cache_memory -= libraryMallocSize(li); /* update stats */ functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); serverAssert(stats); stats->n_lib--; stats->n_functions -= dictSize(li->functions); } static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo* li) { dictIterator *iter = dictGetIterator(li->functions); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { functionInfo *fi = dictGetVal(entry); dictAdd(lib_ctx->functions, fi->name, fi); lib_ctx->cache_memory += functionMallocSize(fi); } dictReleaseIterator(iter); dictAdd(lib_ctx->libraries, li->name, li); lib_ctx->cache_memory += libraryMallocSize(li); /* update stats */ functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); serverAssert(stats); stats->n_lib++; stats->n_functions += dictSize(li->functions); } /* Takes all libraries from lib_ctx_src and add to lib_ctx_dst. * On collision, if 'replace' argument is true, replace the existing library with the new one. * Otherwise abort and leave 'lib_ctx_dst' and 'lib_ctx_src' untouched. * Return C_OK on success and C_ERR if aborted. If C_ERR is returned, set a relevant * error message on the 'err' out parameter. * */ static int libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_lib_ctx_src, int replace, sds *err) { int ret = C_ERR; dictIterator *iter = NULL; /* Stores the libraries we need to replace in case a revert is required. * Only initialized when needed */ list *old_libraries_list = NULL; dictEntry *entry = NULL; iter = dictGetIterator(functions_lib_ctx_src->libraries); while ((entry = dictNext(iter))) { functionLibInfo *li = dictGetVal(entry); functionLibInfo *old_li = dictFetchValue(functions_lib_ctx_dst->libraries, li->name); if (old_li) { if (!replace) { /* library already exists, failed the restore. */ *err = sdscatfmt(sdsempty(), "Library %s already exists", li->name); goto done; } else { if (!old_libraries_list) { old_libraries_list = listCreate(); listSetFreeMethod(old_libraries_list, (void (*)(void*))engineLibraryFree); } libraryUnlink(functions_lib_ctx_dst, old_li); listAddNodeTail(old_libraries_list, old_li); } } } dictReleaseIterator(iter); iter = NULL; /* Make sure no functions collision */ iter = dictGetIterator(functions_lib_ctx_src->functions); while ((entry = dictNext(iter))) { functionInfo *fi = dictGetVal(entry); if (dictFetchValue(functions_lib_ctx_dst->functions, fi->name)) { *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); goto done; } } dictReleaseIterator(iter); iter = NULL; /* No collision, it is safe to link all the new libraries. */ iter = dictGetIterator(functions_lib_ctx_src->libraries); while ((entry = dictNext(iter))) { functionLibInfo *li = dictGetVal(entry); libraryLink(functions_lib_ctx_dst, li); dictSetVal(functions_lib_ctx_src->libraries, entry, NULL); } dictReleaseIterator(iter); iter = NULL; functionsLibCtxClear(functions_lib_ctx_src); if (old_libraries_list) { listRelease(old_libraries_list); old_libraries_list = NULL; } ret = C_OK; done: if (iter) dictReleaseIterator(iter); if (old_libraries_list) { /* Link back all libraries on tmp_l_ctx */ while (listLength(old_libraries_list) > 0) { listNode *head = listFirst(old_libraries_list); functionLibInfo *li = listNodeValue(head); listNodeValue(head) = NULL; libraryLink(functions_lib_ctx_dst, li); listDelNode(old_libraries_list, head); } listRelease(old_libraries_list); } return ret; } /* Register an engine, should be called once by the engine on startup and give the following: * * - engine_name - name of the engine to register * - engine_ctx - the engine ctx that should be used by Redis to interact with the engine */ int functionsRegisterEngine(const char *engine_name, engine *engine) { sds engine_name_sds = sdsnew(engine_name); if (dictFetchValue(engines, engine_name_sds)) { serverLog(LL_WARNING, "Same engine was registered twice"); sdsfree(engine_name_sds); return C_ERR; } client *c = createClient(NULL); c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT); engineInfo *ei = zmalloc(sizeof(*ei)); *ei = (engineInfo ) { .name = engine_name_sds, .engine = engine, .c = c,}; dictAdd(engines, engine_name_sds, ei); engine_cache_memory += zmalloc_size(ei) + sdsZmallocSize(ei->name) + zmalloc_size(engine) + engine->get_engine_memory_overhead(engine->engine_ctx); return C_OK; } /* * FUNCTION STATS */ void functionStatsCommand(client *c) { if (scriptIsRunning() && scriptIsEval()) { addReplyErrorObject(c, shared.slowevalerr); return; } addReplyMapLen(c, 2); addReplyBulkCString(c, "running_script"); if (!scriptIsRunning()) { addReplyNull(c); } else { addReplyMapLen(c, 3); addReplyBulkCString(c, "name"); addReplyBulkCString(c, scriptCurrFunction()); addReplyBulkCString(c, "command"); client *script_client = scriptGetCaller(); addReplyArrayLen(c, script_client->argc); for (int i = 0 ; i < script_client->argc ; ++i) { addReplyBulkCBuffer(c, script_client->argv[i]->ptr, sdslen(script_client->argv[i]->ptr)); } addReplyBulkCString(c, "duration_ms"); addReplyLongLong(c, scriptRunDuration()); } addReplyBulkCString(c, "engines"); addReplyMapLen(c, dictSize(engines)); dictIterator *iter = dictGetIterator(engines); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { engineInfo *ei = dictGetVal(entry); addReplyBulkCString(c, ei->name); addReplyMapLen(c, 2); functionsLibEngineStats *e_stats = dictFetchValue(curr_functions_lib_ctx->engines_stats, ei->name); addReplyBulkCString(c, "libraries_count"); addReplyLongLong(c, e_stats->n_lib); addReplyBulkCString(c, "functions_count"); addReplyLongLong(c, e_stats->n_functions); } dictReleaseIterator(iter); } static void functionListReplyFlags(client *c, functionInfo *fi) { /* First count the number of flags we have */ int flagcount = 0; for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) { if (fi->f_flags & flag->flag) { ++flagcount; } } addReplySetLen(c, flagcount); for (scriptFlag *flag = scripts_flags_def; flag->str ; ++flag) { if (fi->f_flags & flag->flag) { addReplyStatus(c, flag->str); } } } /* * FUNCTION LIST [LIBRARYNAME PATTERN] [WITHCODE] * * Return general information about all the libraries: * * Library name * * The engine used to run the Library * * Library description * * Functions list * * Library code (if WITHCODE is given) * * It is also possible to given library name pattern using * LIBRARYNAME argument, if given, return only libraries * that matches the given pattern. */ void functionListCommand(client *c) { int with_code = 0; sds library_name = NULL; for (int i = 2 ; i < c->argc ; ++i) { robj *next_arg = c->argv[i]; if (!with_code && !strcasecmp(next_arg->ptr, "withcode")) { with_code = 1; continue; } if (!library_name && !strcasecmp(next_arg->ptr, "libraryname")) { if (i >= c->argc - 1) { addReplyError(c, "library name argument was not given"); return; } library_name = c->argv[++i]->ptr; continue; } addReplyErrorSds(c, sdscatfmt(sdsempty(), "Unknown argument %s", next_arg->ptr)); return; } size_t reply_len = 0; void *len_ptr = NULL; if (library_name) { len_ptr = addReplyDeferredLen(c); } else { /* If no pattern is asked we know the reply len and we can just set it */ addReplyArrayLen(c, dictSize(curr_functions_lib_ctx->libraries)); } dictIterator *iter = dictGetIterator(curr_functions_lib_ctx->libraries); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { functionLibInfo *li = dictGetVal(entry); if (library_name) { if (!stringmatchlen(library_name, sdslen(library_name), li->name, sdslen(li->name), 1)) { continue; } } ++reply_len; addReplyMapLen(c, with_code? 4 : 3); addReplyBulkCString(c, "library_name"); addReplyBulkCBuffer(c, li->name, sdslen(li->name)); addReplyBulkCString(c, "engine"); addReplyBulkCBuffer(c, li->ei->name, sdslen(li->ei->name)); addReplyBulkCString(c, "functions"); addReplyArrayLen(c, dictSize(li->functions)); dictIterator *functions_iter = dictGetIterator(li->functions); dictEntry *function_entry = NULL; while ((function_entry = dictNext(functions_iter))) { functionInfo *fi = dictGetVal(function_entry); addReplyMapLen(c, 3); addReplyBulkCString(c, "name"); addReplyBulkCBuffer(c, fi->name, sdslen(fi->name)); addReplyBulkCString(c, "description"); if (fi->desc) { addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc)); } else { addReplyNull(c); } addReplyBulkCString(c, "flags"); functionListReplyFlags(c, fi); } dictReleaseIterator(functions_iter); if (with_code) { addReplyBulkCString(c, "library_code"); addReplyBulkCBuffer(c, li->code, sdslen(li->code)); } } dictReleaseIterator(iter); if (len_ptr) { setDeferredArrayLen(c, len_ptr, reply_len); } } /* * FUNCTION DELETE */ void functionDeleteCommand(client *c) { robj *function_name = c->argv[2]; functionLibInfo *li = dictFetchValue(curr_functions_lib_ctx->libraries, function_name->ptr); if (!li) { addReplyError(c, "Library not found"); return; } libraryUnlink(curr_functions_lib_ctx, li); engineLibraryFree(li); /* Indicate that the command changed the data so it will be replicated and * counted as a data change (for persistence configuration) */ server.dirty++; addReply(c, shared.ok); } /* FUNCTION KILL */ void functionKillCommand(client *c) { scriptKill(c, 0); } /* Try to extract command flags if we can, returns the modified flags. * Note that it does not guarantee the command arguments are right. */ uint64_t fcallGetCommandFlags(client *c, uint64_t cmd_flags) { robj *function_name = c->argv[1]; c->cur_script = dictFind(curr_functions_lib_ctx->functions, function_name->ptr); if (!c->cur_script) return cmd_flags; functionInfo *fi = dictGetVal(c->cur_script); uint64_t script_flags = fi->f_flags; return scriptFlagsToCmdFlags(cmd_flags, script_flags); } static void fcallCommandGeneric(client *c, int ro) { /* Functions need to be fed to monitors before the commands they execute. */ replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); robj *function_name = c->argv[1]; dictEntry *de = c->cur_script; if (!de) de = dictFind(curr_functions_lib_ctx->functions, function_name->ptr); if (!de) { addReplyError(c, "Function not found"); return; } functionInfo *fi = dictGetVal(de); engine *engine = fi->li->ei->engine; long long numkeys; /* Get the number of arguments that are keys */ if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) { addReplyError(c, "Bad number of keys provided"); return; } if (numkeys > (c->argc - 3)) { addReplyError(c, "Number of keys can't be greater than number of args"); return; } else if (numkeys < 0) { addReplyError(c, "Number of keys can't be negative"); return; } scriptRunCtx run_ctx; if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) return; engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys, c->argv + 3 + numkeys, c->argc - 3 - numkeys); scriptResetRun(&run_ctx); } /* * FCALL nkeys */ void fcallCommand(client *c) { fcallCommandGeneric(c, 0); } /* * FCALL_RO nkeys */ void fcallroCommand(client *c) { fcallCommandGeneric(c, 1); } /* * FUNCTION DUMP * * Returns a binary payload representing all the libraries. * Can be loaded using FUNCTION RESTORE * * The payload structure is the same as on RDB. Each library * is saved separately with the following information: * * Library name * * Engine name * * Library description * * Library code * RDB_OPCODE_FUNCTION2 is saved before each library to present * that the payload is a library. * RDB version and crc64 is saved at the end of the payload. * The RDB version is saved for backward compatibility. * crc64 is saved so we can verify the payload content. */ void functionDumpCommand(client *c) { unsigned char buf[2]; uint64_t crc; rio payload; rioInitWithBuffer(&payload, sdsempty()); rdbSaveFunctions(&payload); /* RDB version */ buf[0] = RDB_VERSION & 0xff; buf[1] = (RDB_VERSION >> 8) & 0xff; payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, buf, 2); /* CRC64 */ crc = crc64(0, (unsigned char*) payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr)); memrev64ifbe(&crc); payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, &crc, 8); addReplyBulkSds(c, payload.io.buffer.ptr); } /* * FUNCTION RESTORE [FLUSH|APPEND|REPLACE] * * Restore the libraries represented by the give payload. * Restore policy to can be given to control how to handle existing libraries (default APPEND): * * FLUSH: delete all existing libraries. * * APPEND: appends the restored libraries to the existing libraries. On collision, abort. * * REPLACE: appends the restored libraries to the existing libraries. * On collision, replace the old libraries with the new libraries. */ void functionRestoreCommand(client *c) { if (c->argc > 4) { addReplySubcommandSyntaxError(c); return; } restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */ sds data = c->argv[2]->ptr; size_t data_len = sdslen(data); rio payload; sds err = NULL; if (c->argc == 4) { const char *restore_policy_str = c->argv[3]->ptr; if (!strcasecmp(restore_policy_str, "append")) { restore_replicy = restorePolicy_Append; } else if (!strcasecmp(restore_policy_str, "replace")) { restore_replicy = restorePolicy_Replace; } else if (!strcasecmp(restore_policy_str, "flush")) { restore_replicy = restorePolicy_Flush; } else { addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE."); return; } } uint16_t rdbver; if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) { addReplyError(c, "DUMP payload version or checksum are wrong"); return; } functionsLibCtx *functions_lib_ctx = functionsLibCtxCreate(); rioInitWithBuffer(&payload, data); /* Read until reaching last 10 bytes that should contain RDB version and checksum. */ while (data_len - payload.io.buffer.pos > 10) { int type; if ((type = rdbLoadType(&payload)) == -1) { err = sdsnew("can not read data type"); goto load_error; } if (type == RDB_OPCODE_FUNCTION_PRE_GA) { err = sdsnew("Pre-GA function format not supported"); goto load_error; } if (type != RDB_OPCODE_FUNCTION2) { err = sdsnew("given type is not a function"); goto load_error; } if (rdbFunctionLoad(&payload, rdbver, functions_lib_ctx, RDBFLAGS_NONE, &err) != C_OK) { if (!err) { err = sdsnew("failed loading the given functions payload"); } goto load_error; } } if (restore_replicy == restorePolicy_Flush) { functionsLibCtxSwapWithCurrent(functions_lib_ctx); functions_lib_ctx = NULL; /* avoid releasing the f_ctx in the end */ } else { if (libraryJoin(curr_functions_lib_ctx, functions_lib_ctx, restore_replicy == restorePolicy_Replace, &err) != C_OK) { goto load_error; } } /* Indicate that the command changed the data so it will be replicated and * counted as a data change (for persistence configuration) */ server.dirty++; load_error: if (err) { addReplyErrorSds(c, err); } else { addReply(c, shared.ok); } if (functions_lib_ctx) { functionsLibCtxFree(functions_lib_ctx); } } /* FUNCTION FLUSH [ASYNC | SYNC] */ void functionFlushCommand(client *c) { if (c->argc > 3) { addReplySubcommandSyntaxError(c); return; } int async = 0; if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) { async = 0; } else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) { async = 1; } else if (c->argc == 2) { async = server.lazyfree_lazy_user_flush ? 1 : 0; } else { addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option"); return; } functionsLibCtxClearCurrent(async); /* Indicate that the command changed the data so it will be replicated and * counted as a data change (for persistence configuration) */ server.dirty++; addReply(c,shared.ok); } /* FUNCTION HELP */ void functionHelpCommand(client *c) { const char *help[] = { "LOAD [REPLACE] ", " Create a new library with the given library name and code.", "DELETE ", " Delete the given library.", "LIST [LIBRARYNAME PATTERN] [WITHCODE]", " Return general information on all the libraries:", " * Library name", " * The engine used to run the Library", " * Library description", " * Functions list", " * Library code (if WITHCODE is given)", " It also possible to get only function that matches a pattern using LIBRARYNAME argument.", "STATS", " Return information about the current function running:", " * Function name", " * Command used to run the function", " * Duration in MS that the function is running", " If no function is running, return nil", " In addition, returns a list of available engines.", "KILL", " Kill the current running function.", "FLUSH [ASYNC|SYNC]", " Delete all the libraries.", " When called without the optional mode argument, the behavior is determined by the", " lazyfree-lazy-user-flush configuration directive. Valid modes are:", " * ASYNC: Asynchronously flush the libraries.", " * SYNC: Synchronously flush the libraries.", "DUMP", " Return a serialized payload representing the current libraries, can be restored using FUNCTION RESTORE command", "RESTORE [FLUSH|APPEND|REPLACE]", " Restore the libraries represented by the given payload, it is possible to give a restore policy to", " control how to handle existing libraries (default APPEND):", " * FLUSH: delete all existing libraries.", " * APPEND: appends the restored libraries to the existing libraries. On collision, abort.", " * REPLACE: appends the restored libraries to the existing libraries, On collision, replace the old", " libraries with the new libraries (notice that even on this option there is a chance of failure", " in case of functions name collision with another library).", NULL }; addReplyHelp(c, help); } /* Verify that the function name is of the format: [a-zA-Z0-9_][a-zA-Z0-9_]? */ static int functionsVerifyName(sds name) { if (sdslen(name) == 0) { return C_ERR; } for (size_t i = 0 ; i < sdslen(name) ; ++i) { char curr_char = name[i]; if ((curr_char >= 'a' && curr_char <= 'z') || (curr_char >= 'A' && curr_char <= 'Z') || (curr_char >= '0' && curr_char <= '9') || (curr_char == '_')) { continue; } return C_ERR; } return C_OK; } int functionExtractLibMetaData(sds payload, functionsLibMataData *md, sds *err) { sds name = NULL; sds desc = NULL; sds engine = NULL; sds code = NULL; if (strncmp(payload, "#!", 2) != 0) { *err = sdsnew("Missing library metadata"); return C_ERR; } char *shebang_end = strchr(payload, '\n'); if (shebang_end == NULL) { *err = sdsnew("Invalid library metadata"); return C_ERR; } size_t shebang_len = shebang_end - payload; sds shebang = sdsnewlen(payload, shebang_len); int numparts; sds *parts = sdssplitargs(shebang, &numparts); sdsfree(shebang); if (!parts || numparts == 0) { *err = sdsnew("Invalid library metadata"); sdsfreesplitres(parts, numparts); return C_ERR; } engine = sdsdup(parts[0]); sdsrange(engine, 2, -1); for (int i = 1 ; i < numparts ; ++i) { sds part = parts[i]; if (strncasecmp(part, "name=", 5) == 0) { if (name) { *err = sdscatfmt(sdsempty(), "Invalid metadata value, name argument was given multiple times"); goto error; } name = sdsdup(part); sdsrange(name, 5, -1); continue; } *err = sdscatfmt(sdsempty(), "Invalid metadata value given: %s", part); goto error; } if (!name) { *err = sdsnew("Library name was not given"); goto error; } sdsfreesplitres(parts, numparts); md->name = name; md->code = sdsnewlen(shebang_end, sdslen(payload) - shebang_len); md->engine = engine; return C_OK; error: if (name) sdsfree(name); if (desc) sdsfree(desc); if (engine) sdsfree(engine); if (code) sdsfree(code); sdsfreesplitres(parts, numparts); return C_ERR; } void functionFreeLibMetaData(functionsLibMataData *md) { if (md->code) sdsfree(md->code); if (md->name) sdsfree(md->name); if (md->engine) sdsfree(md->engine); } /* Compile and save the given library, return the loaded library name on success * and NULL on failure. In case on failure the err out param is set with relevant error message */ sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx, size_t timeout) { dictIterator *iter = NULL; dictEntry *entry = NULL; functionLibInfo *new_li = NULL; functionLibInfo *old_li = NULL; functionsLibMataData md = {0}; if (functionExtractLibMetaData(code, &md, err) != C_OK) { return NULL; } if (functionsVerifyName(md.name)) { *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one character long"); goto error; } engineInfo *ei = dictFetchValue(engines, md.engine); if (!ei) { *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); goto error; } engine *engine = ei->engine; old_li = dictFetchValue(lib_ctx->libraries, md.name); if (old_li && !replace) { old_li = NULL; *err = sdscatfmt(sdsempty(), "Library '%S' already exists", md.name); goto error; } if (old_li) { libraryUnlink(lib_ctx, old_li); } new_li = engineLibraryCreate(md.name, ei, code); if (engine->create(engine->engine_ctx, new_li, md.code, timeout, err) != C_OK) { goto error; } if (dictSize(new_li->functions) == 0) { *err = sdsnew("No functions registered"); goto error; } /* Verify no duplicate functions */ iter = dictGetIterator(new_li->functions); while ((entry = dictNext(iter))) { functionInfo *fi = dictGetVal(entry); if (dictFetchValue(lib_ctx->functions, fi->name)) { /* functions name collision, abort. */ *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); goto error; } } dictReleaseIterator(iter); iter = NULL; libraryLink(lib_ctx, new_li); if (old_li) { engineLibraryFree(old_li); } sds loaded_lib_name = md.name; md.name = NULL; functionFreeLibMetaData(&md); return loaded_lib_name; error: if (iter) dictReleaseIterator(iter); if (new_li) engineLibraryFree(new_li); if (old_li) libraryLink(lib_ctx, old_li); functionFreeLibMetaData(&md); return NULL; } /* * FUNCTION LOAD [REPLACE] * REPLACE - optional, replace existing library * LIBRARY CODE - library code to pass to the engine */ void functionLoadCommand(client *c) { int replace = 0; int argc_pos = 2; while (argc_pos < c->argc - 1) { robj *next_arg = c->argv[argc_pos++]; if (!strcasecmp(next_arg->ptr, "replace")) { replace = 1; continue; } addReplyErrorFormat(c, "Unknown option given: %s", (char*)next_arg->ptr); return; } if (argc_pos >= c->argc) { addReplyError(c, "Function code is missing"); return; } robj *code = c->argv[argc_pos]; sds err = NULL; sds library_name = NULL; size_t timeout = LOAD_TIMEOUT_MS; if (mustObeyClient(c)) { timeout = 0; } if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx, timeout))) { addReplyErrorSds(c, err); return; } /* Indicate that the command changed the data so it will be replicated and * counted as a data change (for persistence configuration) */ server.dirty++; addReplyBulkSds(c, library_name); } /* Return memory usage of all the engines combine */ unsigned long functionsMemory(void) { dictIterator *iter = dictGetIterator(engines); dictEntry *entry = NULL; size_t engines_nemory = 0; while ((entry = dictNext(iter))) { engineInfo *ei = dictGetVal(entry); engine *engine = ei->engine; engines_nemory += engine->get_used_memory(engine->engine_ctx); } dictReleaseIterator(iter); return engines_nemory; } /* Return memory overhead of all the engines combine */ unsigned long functionsMemoryOverhead(void) { size_t memory_overhead = dictMemUsage(engines); memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions); memory_overhead += sizeof(functionsLibCtx); memory_overhead += curr_functions_lib_ctx->cache_memory; memory_overhead += engine_cache_memory; return memory_overhead; } /* Returns the number of functions */ unsigned long functionsNum(void) { return dictSize(curr_functions_lib_ctx->functions); } unsigned long functionsLibNum(void) { return dictSize(curr_functions_lib_ctx->libraries); } dict* functionsLibGet(void) { return curr_functions_lib_ctx->libraries; } size_t functionsLibCtxfunctionsLen(functionsLibCtx *functions_ctx) { return dictSize(functions_ctx->functions); } /* Initialize engine data structures. * Should be called once on server initialization */ int functionsInit(void) { engines = dictCreate(&engineDictType); if (luaEngineInitEngine() != C_OK) { return C_ERR; } /* Must be initialized after engines initialization */ curr_functions_lib_ctx = functionsLibCtxCreate(); return C_OK; }