summaryrefslogtreecommitdiffstats
path: root/tests/modules/propagate.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/modules/propagate.c')
-rw-r--r--tests/modules/propagate.c403
1 files changed, 403 insertions, 0 deletions
diff --git a/tests/modules/propagate.c b/tests/modules/propagate.c
new file mode 100644
index 0000000..d5132a5
--- /dev/null
+++ b/tests/modules/propagate.c
@@ -0,0 +1,403 @@
+/* This module is used to test the propagation (replication + AOF) of
+ * commands, via the RedisModule_Replicate() interface, in asynchronous
+ * contexts, such as callbacks not implementing commands, and thread safe
+ * contexts.
+ *
+ * We create a timer callback and a threads using a thread safe context.
+ * Using both we try to propagate counters increments, and later we check
+ * if the replica contains the changes as expected.
+ *
+ * -----------------------------------------------------------------------------
+ *
+ * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "redismodule.h"
+#include <pthread.h>
+#include <errno.h>
+
+#define UNUSED(V) ((void) V)
+
+RedisModuleCtx *detached_ctx = NULL;
+
+static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+ REDISMODULE_NOT_USED(key);
+
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "c!", "notifications");
+ RedisModule_FreeCallReply(rep);
+
+ return REDISMODULE_OK;
+}
+
+/* Timer callback. */
+void timerHandler(RedisModuleCtx *ctx, void *data) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(data);
+
+ static int times = 0;
+
+ RedisModule_Replicate(ctx,"INCR","c","timer");
+ times++;
+
+ if (times < 3)
+ RedisModule_CreateTimer(ctx,100,timerHandler,NULL);
+ else
+ times = 0;
+}
+
+int propagateTestTimerCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModuleTimerID timer_id =
+ RedisModule_CreateTimer(ctx,100,timerHandler,NULL);
+ REDISMODULE_NOT_USED(timer_id);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+/* Timer callback. */
+void timerNestedHandler(RedisModuleCtx *ctx, void *data) {
+ int repl = (long long)data;
+
+ /* The goal is the trigger a module command that calls RM_Replicate
+ * in order to test MULTI/EXEC structure */
+ RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-start","1");
+ RedisModuleCallReply *reply = RedisModule_Call(ctx,"propagate-test.nested", repl? "!" : "");
+ RedisModule_FreeCallReply(reply);
+ reply = RedisModule_Call(ctx, "INCR", repl? "c!" : "c", "timer-nested-middle");
+ RedisModule_FreeCallReply(reply);
+ RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-end","1");
+}
+
+int propagateTestTimerNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModuleTimerID timer_id =
+ RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)0);
+ REDISMODULE_NOT_USED(timer_id);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+int propagateTestTimerNestedReplCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModuleTimerID timer_id =
+ RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)1);
+ REDISMODULE_NOT_USED(timer_id);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+void timerHandlerMaxmemory(RedisModuleCtx *ctx, void *data) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(data);
+
+ RedisModuleCallReply *reply = RedisModule_Call(ctx,"SETEX","ccc!","timer-maxmemory-volatile-start","100","1");
+ RedisModule_FreeCallReply(reply);
+ reply = RedisModule_Call(ctx, "CONFIG", "ccc!", "SET", "maxmemory", "1");
+ RedisModule_FreeCallReply(reply);
+
+ RedisModule_Replicate(ctx, "INCR", "c", "timer-maxmemory-middle");
+
+ reply = RedisModule_Call(ctx,"SETEX","ccc!","timer-maxmemory-volatile-end","100","1");
+ RedisModule_FreeCallReply(reply);
+}
+
+int propagateTestTimerMaxmemoryCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModuleTimerID timer_id =
+ RedisModule_CreateTimer(ctx,100,timerHandlerMaxmemory,(void*)1);
+ REDISMODULE_NOT_USED(timer_id);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+void timerHandlerEval(RedisModuleCtx *ctx, void *data) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(data);
+
+ RedisModuleCallReply *reply = RedisModule_Call(ctx,"INCRBY","cc!","timer-eval-start","1");
+ RedisModule_FreeCallReply(reply);
+ reply = RedisModule_Call(ctx, "EVAL", "cccc!", "redis.call('set',KEYS[1],ARGV[1])", "1", "foo", "bar");
+ RedisModule_FreeCallReply(reply);
+
+ RedisModule_Replicate(ctx, "INCR", "c", "timer-eval-middle");
+
+ reply = RedisModule_Call(ctx,"INCRBY","cc!","timer-eval-end","1");
+ RedisModule_FreeCallReply(reply);
+}
+
+int propagateTestTimerEvalCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModuleTimerID timer_id =
+ RedisModule_CreateTimer(ctx,100,timerHandlerEval,(void*)1);
+ REDISMODULE_NOT_USED(timer_id);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+/* The thread entry point. */
+void *threadMain(void *arg) {
+ REDISMODULE_NOT_USED(arg);
+ RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(NULL);
+ RedisModule_SelectDb(ctx,9); /* Tests ran in database number 9. */
+ for (int i = 0; i < 3; i++) {
+ RedisModule_ThreadSafeContextLock(ctx);
+ RedisModule_Replicate(ctx,"INCR","c","a-from-thread");
+ RedisModuleCallReply *reply = RedisModule_Call(ctx,"INCR","c!","thread-call");
+ RedisModule_FreeCallReply(reply);
+ RedisModule_Replicate(ctx,"INCR","c","b-from-thread");
+ RedisModule_ThreadSafeContextUnlock(ctx);
+ }
+ RedisModule_FreeThreadSafeContext(ctx);
+ return NULL;
+}
+
+int propagateTestThreadCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ pthread_t tid;
+ if (pthread_create(&tid,NULL,threadMain,NULL) != 0)
+ return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+ REDISMODULE_NOT_USED(tid);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+/* The thread entry point. */
+void *threadDetachedMain(void *arg) {
+ REDISMODULE_NOT_USED(arg);
+ RedisModule_SelectDb(detached_ctx,9); /* Tests ran in database number 9. */
+
+ RedisModule_ThreadSafeContextLock(detached_ctx);
+ RedisModule_Replicate(detached_ctx,"INCR","c","thread-detached-before");
+ RedisModuleCallReply *reply = RedisModule_Call(detached_ctx,"INCR","c!","thread-detached-1");
+ RedisModule_FreeCallReply(reply);
+ reply = RedisModule_Call(detached_ctx,"INCR","c!","thread-detached-2");
+ RedisModule_FreeCallReply(reply);
+ RedisModule_Replicate(detached_ctx,"INCR","c","thread-detached-after");
+ RedisModule_ThreadSafeContextUnlock(detached_ctx);
+
+ return NULL;
+}
+
+int propagateTestDetachedThreadCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ pthread_t tid;
+ if (pthread_create(&tid,NULL,threadDetachedMain,NULL) != 0)
+ return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+ REDISMODULE_NOT_USED(tid);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+int propagateTestSimpleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ /* Replicate two commands to test MULTI/EXEC wrapping. */
+ RedisModule_Replicate(ctx,"INCR","c","counter-1");
+ RedisModule_Replicate(ctx,"INCR","c","counter-2");
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleCallReply *reply;
+
+ /* This test mixes multiple propagation systems. */
+ reply = RedisModule_Call(ctx, "INCR", "c!", "using-call");
+ RedisModule_FreeCallReply(reply);
+
+ RedisModule_Replicate(ctx,"INCR","c","counter-1");
+ RedisModule_Replicate(ctx,"INCR","c","counter-2");
+
+ reply = RedisModule_Call(ctx, "INCR", "c!", "after-call");
+ RedisModule_FreeCallReply(reply);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+int propagateTestNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleCallReply *reply;
+
+ /* This test mixes multiple propagation systems. */
+ reply = RedisModule_Call(ctx, "INCR", "c!", "using-call");
+ RedisModule_FreeCallReply(reply);
+
+ reply = RedisModule_Call(ctx,"propagate-test.simple", "!");
+ RedisModule_FreeCallReply(reply);
+
+ RedisModule_Replicate(ctx,"INCR","c","counter-3");
+ RedisModule_Replicate(ctx,"INCR","c","counter-4");
+
+ reply = RedisModule_Call(ctx, "INCR", "c!", "after-call");
+ RedisModule_FreeCallReply(reply);
+
+ reply = RedisModule_Call(ctx, "INCR", "c!", "before-call-2");
+ RedisModule_FreeCallReply(reply);
+
+ reply = RedisModule_Call(ctx, "keyspace.incr_case1", "c!", "asdf"); /* Propagates INCR */
+ RedisModule_FreeCallReply(reply);
+
+ reply = RedisModule_Call(ctx, "keyspace.del_key_copy", "c!", "asdf"); /* Propagates DEL */
+ RedisModule_FreeCallReply(reply);
+
+ reply = RedisModule_Call(ctx, "INCR", "c!", "after-call-2");
+ RedisModule_FreeCallReply(reply);
+
+ RedisModule_ReplyWithSimpleString(ctx,"OK");
+ return REDISMODULE_OK;
+}
+
+int propagateTestIncr(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleCallReply *reply;
+
+ /* This test propagates the module command, not the INCR it executes. */
+ reply = RedisModule_Call(ctx, "INCR", "s", argv[1]);
+ RedisModule_ReplyWithCallReply(ctx,reply);
+ RedisModule_FreeCallReply(reply);
+ RedisModule_ReplicateVerbatim(ctx);
+ return REDISMODULE_OK;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx,"propagate-test",1,REDISMODULE_APIVER_1)
+ == REDISMODULE_ERR) return REDISMODULE_ERR;
+
+ detached_ctx = RedisModule_GetDetachedThreadSafeContext(ctx);
+
+ if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_ALL, KeySpace_NotificationGeneric) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.timer",
+ propagateTestTimerCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested",
+ propagateTestTimerNestedCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested-repl",
+ propagateTestTimerNestedReplCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.timer-maxmemory",
+ propagateTestTimerMaxmemoryCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.timer-eval",
+ propagateTestTimerEvalCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.thread",
+ propagateTestThreadCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.detached-thread",
+ propagateTestDetachedThreadCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.simple",
+ propagateTestSimpleCommand,
+ "",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.mixed",
+ propagateTestMixedCommand,
+ "write",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.nested",
+ propagateTestNestedCommand,
+ "write",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"propagate-test.incr",
+ propagateTestIncr,
+ "write",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
+
+int RedisModule_OnUnload(RedisModuleCtx *ctx) {
+ UNUSED(ctx);
+
+ if (detached_ctx)
+ RedisModule_FreeThreadSafeContext(detached_ctx);
+
+ return REDISMODULE_OK;
+}