diff options
Diffstat (limited to 'tests/modules/blockonbackground.c')
-rw-r--r-- | tests/modules/blockonbackground.c | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/tests/modules/blockonbackground.c b/tests/modules/blockonbackground.c new file mode 100644 index 0000000..2e3b1a5 --- /dev/null +++ b/tests/modules/blockonbackground.c @@ -0,0 +1,295 @@ +#define _XOPEN_SOURCE 700 +#include "redismodule.h" +#include <stdio.h> +#include <stdlib.h> +#include <pthread.h> +#include <time.h> + +#define UNUSED(x) (void)(x) + +/* Reply callback for blocking command BLOCK.DEBUG */ +int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + int *myint = RedisModule_GetBlockedClientPrivateData(ctx); + return RedisModule_ReplyWithLongLong(ctx,*myint); +} + +/* Timeout callback for blocking command BLOCK.DEBUG */ +int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx); + RedisModule_BlockedClientMeasureTimeEnd(bc); + return RedisModule_ReplyWithSimpleString(ctx,"Request timedout"); +} + +/* Private data freeing callback for BLOCK.DEBUG command. */ +void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) { + UNUSED(ctx); + RedisModule_Free(privdata); +} + +/* Private data freeing callback for BLOCK.BLOCK command. */ +void HelloBlock_FreeStringData(RedisModuleCtx *ctx, void *privdata) { + RedisModule_FreeString(ctx, (RedisModuleString*)privdata); +} + +/* The thread entry point that actually executes the blocking part + * of the command BLOCK.DEBUG. */ +void *BlockDebug_ThreadMain(void *arg) { + void **targ = arg; + RedisModuleBlockedClient *bc = targ[0]; + long long delay = (unsigned long)targ[1]; + long long enable_time_track = (unsigned long)targ[2]; + if (enable_time_track) + RedisModule_BlockedClientMeasureTimeStart(bc); + RedisModule_Free(targ); + + struct timespec ts; + ts.tv_sec = delay / 1000; + ts.tv_nsec = (delay % 1000) * 1000000; + nanosleep(&ts, NULL); + int *r = RedisModule_Alloc(sizeof(int)); + *r = rand(); + if (enable_time_track) + RedisModule_BlockedClientMeasureTimeEnd(bc); + RedisModule_UnblockClient(bc,r); + return NULL; +} + +/* The thread entry point that actually executes the blocking part + * of the command BLOCK.DOUBLE_DEBUG. */ +void *DoubleBlock_ThreadMain(void *arg) { + void **targ = arg; + RedisModuleBlockedClient *bc = targ[0]; + long long delay = (unsigned long)targ[1]; + RedisModule_BlockedClientMeasureTimeStart(bc); + RedisModule_Free(targ); + struct timespec ts; + ts.tv_sec = delay / 1000; + ts.tv_nsec = (delay % 1000) * 1000000; + nanosleep(&ts, NULL); + int *r = RedisModule_Alloc(sizeof(int)); + *r = rand(); + RedisModule_BlockedClientMeasureTimeEnd(bc); + /* call again RedisModule_BlockedClientMeasureTimeStart() and + * RedisModule_BlockedClientMeasureTimeEnd and ensure that the + * total execution time is 2x the delay. */ + RedisModule_BlockedClientMeasureTimeStart(bc); + nanosleep(&ts, NULL); + RedisModule_BlockedClientMeasureTimeEnd(bc); + + RedisModule_UnblockClient(bc,r); + return NULL; +} + +void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) { + RedisModule_Log(ctx,"warning","Blocked client %p disconnected!", + (void*)bc); +} + +/* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with + * a random number. Timeout is the command timeout, so that you can test + * what happens when the delay is greater than the timeout. */ +int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) return RedisModule_WrongArity(ctx); + long long delay; + long long timeout; + + if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx,"ERR invalid count"); + } + + if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx,"ERR invalid count"); + } + + pthread_t tid; + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); + + /* Here we set a disconnection handler, however since this module will + * block in sleep() in a thread, there is not much we can do in the + * callback, so this is just to show you the API. */ + RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected); + + /* Now that we setup a blocking client, we need to pass the control + * to the thread. However we need to pass arguments to the thread: + * the delay and a reference to the blocked client handle. */ + void **targ = RedisModule_Alloc(sizeof(void*)*3); + targ[0] = bc; + targ[1] = (void*)(unsigned long) delay; + // pass 1 as flag to enable time tracking + targ[2] = (void*)(unsigned long) 1; + + if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) { + RedisModule_AbortBlock(bc); + return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + } + return REDISMODULE_OK; +} + +/* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with + * a random number. Timeout is the command timeout, so that you can test + * what happens when the delay is greater than the timeout. + * this command does not track background time so the background time should no appear in stats*/ +int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) return RedisModule_WrongArity(ctx); + long long delay; + long long timeout; + + if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx,"ERR invalid count"); + } + + if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx,"ERR invalid count"); + } + + pthread_t tid; + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); + + /* Here we set a disconnection handler, however since this module will + * block in sleep() in a thread, there is not much we can do in the + * callback, so this is just to show you the API. */ + RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected); + + /* Now that we setup a blocking client, we need to pass the control + * to the thread. However we need to pass arguments to the thread: + * the delay and a reference to the blocked client handle. */ + void **targ = RedisModule_Alloc(sizeof(void*)*3); + targ[0] = bc; + targ[1] = (void*)(unsigned long) delay; + // pass 0 as flag to enable time tracking + targ[2] = (void*)(unsigned long) 0; + + if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) { + RedisModule_AbortBlock(bc); + return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + } + return REDISMODULE_OK; +} + +/* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds, + * then reply with a random number. + * This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart() + * and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */ +int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) return RedisModule_WrongArity(ctx); + long long delay; + + if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx,"ERR invalid count"); + } + + pthread_t tid; + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0); + + /* Now that we setup a blocking client, we need to pass the control + * to the thread. However we need to pass arguments to the thread: + * the delay and a reference to the blocked client handle. */ + void **targ = RedisModule_Alloc(sizeof(void*)*2); + targ[0] = bc; + targ[1] = (void*)(unsigned long) delay; + + if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) { + RedisModule_AbortBlock(bc); + return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + } + return REDISMODULE_OK; +} + +RedisModuleBlockedClient *blocked_client = NULL; + +/* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released + * or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is + * registered. + */ +int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (RedisModule_IsBlockedReplyRequest(ctx)) { + RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx); + return RedisModule_ReplyWithString(ctx, r); + } else if (RedisModule_IsBlockedTimeoutRequest(ctx)) { + RedisModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */ + blocked_client = NULL; + return RedisModule_ReplyWithSimpleString(ctx, "Timed out"); + } + + if (argc != 2) return RedisModule_WrongArity(ctx); + long long timeout; + + if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR invalid timeout"); + } + if (blocked_client) { + return RedisModule_ReplyWithError(ctx, "ERR another client already blocked"); + } + + /* Block client. We use this function as both a reply and optional timeout + * callback and differentiate the different code flows above. + */ + blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand, + timeout > 0 ? Block_RedisCommand : NULL, HelloBlock_FreeStringData, timeout); + return REDISMODULE_OK; +} + +/* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise. + */ +int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0); + return REDISMODULE_OK; +} + +/* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply. + */ +int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) return RedisModule_WrongArity(ctx); + if (!blocked_client) { + return RedisModule_ReplyWithError(ctx, "ERR No blocked client"); + } + + RedisModuleString *replystr = argv[1]; + RedisModule_RetainString(ctx, replystr); + RedisModule_UnblockClient(blocked_client, replystr); + blocked_client = NULL; + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + + if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"block.debug", + HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"block.double_debug", + HelloDoubleBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"block.debug_no_track", + HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "block.block", + Block_RedisCommand, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"block.is_blocked", + IsBlocked_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"block.release", + Release_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} |