summaryrefslogtreecommitdiffstats
path: root/src/logreqres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/logreqres.c')
-rw-r--r--src/logreqres.c315
1 files changed, 315 insertions, 0 deletions
diff --git a/src/logreqres.c b/src/logreqres.c
new file mode 100644
index 0000000..6e7621d
--- /dev/null
+++ b/src/logreqres.c
@@ -0,0 +1,315 @@
+/*
+ * Copyright (c) 2021, Redis Ltd.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* This file implements the interface of logging clients' requests and
+ * responses into a file.
+ * This feature needs the LOG_REQ_RES macro to be compiled and is turned
+ * on by the req-res-logfile config."
+ *
+ * Some examples:
+ *
+ * PING:
+ *
+ * 4
+ * ping
+ * 12
+ * __argv_end__
+ * +PONG
+ *
+ * LRANGE:
+ *
+ * 6
+ * lrange
+ * 4
+ * list
+ * 1
+ * 0
+ * 2
+ * -1
+ * 12
+ * __argv_end__
+ * *1
+ * $3
+ * ele
+ *
+ * The request is everything up until the __argv_end__ marker.
+ * The format is:
+ * <number of characters>
+ * <the argument>
+ *
+ * After __argv_end__ the response appears, and the format is
+ * RESP (2 or 3, depending on what the client has configured)
+ */
+
+#include "server.h"
+#include <ctype.h>
+
+#ifdef LOG_REQ_RES
+
+/* ----- Helpers ----- */
+
+static int reqresShouldLog(client *c) {
+ if (!server.req_res_logfile)
+ return 0;
+
+ /* Ignore client with streaming non-standard response */
+ if (c->flags & (CLIENT_PUBSUB|CLIENT_MONITOR|CLIENT_SLAVE))
+ return 0;
+
+ /* We only work on masters (didn't implement reqresAppendResponse to work on shared slave buffers) */
+ if (getClientType(c) == CLIENT_TYPE_MASTER)
+ return 0;
+
+ return 1;
+}
+
+static size_t reqresAppendBuffer(client *c, void *buf, size_t len) {
+ if (!c->reqres.buf) {
+ c->reqres.capacity = max(len, 1024);
+ c->reqres.buf = zmalloc(c->reqres.capacity);
+ } else if (c->reqres.capacity - c->reqres.used < len) {
+ c->reqres.capacity += len;
+ c->reqres.buf = zrealloc(c->reqres.buf, c->reqres.capacity);
+ }
+
+ memcpy(c->reqres.buf + c->reqres.used, buf, len);
+ c->reqres.used += len;
+ return len;
+}
+
+/* Functions for requests */
+
+static size_t reqresAppendArg(client *c, char *arg, size_t arg_len) {
+ char argv_len_buf[LONG_STR_SIZE];
+ size_t argv_len_buf_len = ll2string(argv_len_buf,sizeof(argv_len_buf),(long)arg_len);
+ size_t ret = reqresAppendBuffer(c, argv_len_buf, argv_len_buf_len);
+ ret += reqresAppendBuffer(c, "\r\n", 2);
+ ret += reqresAppendBuffer(c, arg, arg_len);
+ ret += reqresAppendBuffer(c, "\r\n", 2);
+ return ret;
+}
+
+/* ----- API ----- */
+
+
+/* Zero out the clientReqResInfo struct inside the client,
+ * and free the buffer if needed */
+void reqresReset(client *c, int free_buf) {
+ if (free_buf && c->reqres.buf)
+ zfree(c->reqres.buf);
+ memset(&c->reqres, 0, sizeof(c->reqres));
+}
+
+/* Save the offset of the reply buffer (or the reply list).
+ * Should be called when adding a reply (but it will only save the offset
+ * on the very first time it's called, because of c->reqres.offset.saved)
+ * The idea is:
+ * 1. When a client is executing a command, we save the reply offset.
+ * 2. During the execution, the reply offset may grow, as addReply* functions are called.
+ * 3. When client is done with the command (commandProcessed), reqresAppendResponse
+ * is called.
+ * 4. reqresAppendResponse will append the diff between the current offset and the one from step (1)
+ * 5. When client is reset before the next command, we clear c->reqres.offset.saved and start again
+ *
+ * We cannot reply on c->sentlen to keep track because it depends on the network
+ * (reqresAppendResponse will always write the whole buffer, unlike writeToClient)
+ *
+ * Ideally, we would just have this code inside reqresAppendRequest, which is called
+ * from processCommand, but we cannot save the reply offset inside processCommand
+ * because of the following pipe-lining scenario:
+ * set rd [redis_deferring_client]
+ * set buf ""
+ * append buf "SET key vale\r\n"
+ * append buf "BLPOP mylist 0\r\n"
+ * $rd write $buf
+ * $rd flush
+ *
+ * Let's assume we save the reply offset in processCommand
+ * When BLPOP is processed the offset is 5 (+OK\r\n from the SET)
+ * Then beforeSleep is called, the +OK is written to network, and bufpos is 0
+ * When the client is finally unblocked, the cached offset is 5, but bufpos is already
+ * 0, so we would miss the first 5 bytes of the reply.
+ **/
+void reqresSaveClientReplyOffset(client *c) {
+ if (!reqresShouldLog(c))
+ return;
+
+ if (c->reqres.offset.saved)
+ return;
+
+ c->reqres.offset.saved = 1;
+
+ c->reqres.offset.bufpos = c->bufpos;
+ if (listLength(c->reply) && listNodeValue(listLast(c->reply))) {
+ c->reqres.offset.last_node.index = listLength(c->reply) - 1;
+ c->reqres.offset.last_node.used = ((clientReplyBlock *)listNodeValue(listLast(c->reply)))->used;
+ } else {
+ c->reqres.offset.last_node.index = 0;
+ c->reqres.offset.last_node.used = 0;
+ }
+}
+
+size_t reqresAppendRequest(client *c) {
+ robj **argv = c->argv;
+ int argc = c->argc;
+
+ serverAssert(argc);
+
+ if (!reqresShouldLog(c))
+ return 0;
+
+ /* Ignore commands that have streaming non-standard response */
+ sds cmd = argv[0]->ptr;
+ if (!strcasecmp(cmd,"debug") || /* because of DEBUG SEGFAULT */
+ !strcasecmp(cmd,"sync") ||
+ !strcasecmp(cmd,"psync") ||
+ !strcasecmp(cmd,"monitor") ||
+ !strcasecmp(cmd,"subscribe") ||
+ !strcasecmp(cmd,"unsubscribe") ||
+ !strcasecmp(cmd,"ssubscribe") ||
+ !strcasecmp(cmd,"sunsubscribe") ||
+ !strcasecmp(cmd,"psubscribe") ||
+ !strcasecmp(cmd,"punsubscribe"))
+ {
+ return 0;
+ }
+
+ c->reqres.argv_logged = 1;
+
+ size_t ret = 0;
+ for (int i = 0; i < argc; i++) {
+ if (sdsEncodedObject(argv[i])) {
+ ret += reqresAppendArg(c, argv[i]->ptr, sdslen(argv[i]->ptr));
+ } else if (argv[i]->encoding == OBJ_ENCODING_INT) {
+ char buf[LONG_STR_SIZE];
+ size_t len = ll2string(buf,sizeof(buf),(long)argv[i]->ptr);
+ ret += reqresAppendArg(c, buf, len);
+ } else {
+ serverPanic("Wrong encoding in reqresAppendRequest()");
+ }
+ }
+ return ret + reqresAppendArg(c, "__argv_end__", 12);
+}
+
+size_t reqresAppendResponse(client *c) {
+ size_t ret = 0;
+
+ if (!reqresShouldLog(c))
+ return 0;
+
+ if (!c->reqres.argv_logged) /* Example: UNSUBSCRIBE */
+ return 0;
+
+ if (!c->reqres.offset.saved) /* Example: module client blocked on keys + CLIENT KILL */
+ return 0;
+
+ /* First append the static reply buffer */
+ if (c->bufpos > c->reqres.offset.bufpos) {
+ size_t written = reqresAppendBuffer(c, c->buf + c->reqres.offset.bufpos, c->bufpos - c->reqres.offset.bufpos);
+ ret += written;
+ }
+
+ int curr_index = 0;
+ size_t curr_used = 0;
+ if (listLength(c->reply)) {
+ curr_index = listLength(c->reply) - 1;
+ curr_used = ((clientReplyBlock *)listNodeValue(listLast(c->reply)))->used;
+ }
+
+ /* Now, append reply bytes from the reply list */
+ if (curr_index > c->reqres.offset.last_node.index ||
+ curr_used > c->reqres.offset.last_node.used)
+ {
+ int i = 0;
+ listIter iter;
+ listNode *curr;
+ clientReplyBlock *o;
+ listRewind(c->reply, &iter);
+ while ((curr = listNext(&iter)) != NULL) {
+ size_t written;
+
+ /* Skip nodes we had already processed */
+ if (i < c->reqres.offset.last_node.index) {
+ i++;
+ continue;
+ }
+ o = listNodeValue(curr);
+ if (o->used == 0) {
+ i++;
+ continue;
+ }
+ if (i == c->reqres.offset.last_node.index) {
+ /* Write the potentially incomplete node, which had data from
+ * before the current command started */
+ written = reqresAppendBuffer(c,
+ o->buf + c->reqres.offset.last_node.used,
+ o->used - c->reqres.offset.last_node.used);
+ } else {
+ /* New node */
+ written = reqresAppendBuffer(c, o->buf, o->used);
+ }
+ ret += written;
+ i++;
+ }
+ }
+ serverAssert(ret);
+
+ /* Flush both request and response to file */
+ FILE *fp = fopen(server.req_res_logfile, "a");
+ serverAssert(fp);
+ fwrite(c->reqres.buf, c->reqres.used, 1, fp);
+ fclose(fp);
+
+ return ret;
+}
+
+#else /* #ifdef LOG_REQ_RES */
+
+/* Just mimic the API without doing anything */
+
+void reqresReset(client *c, int free_buf) {
+ UNUSED(c);
+ UNUSED(free_buf);
+}
+
+inline void reqresSaveClientReplyOffset(client *c) {
+ UNUSED(c);
+}
+
+inline size_t reqresAppendRequest(client *c) {
+ UNUSED(c);
+ return 0;
+}
+
+inline size_t reqresAppendResponse(client *c) {
+ UNUSED(c);
+ return 0;
+}
+
+#endif /* #ifdef LOG_REQ_RES */