summaryrefslogtreecommitdiffstats
path: root/src/stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.h')
-rw-r--r--src/stream.h147
1 files changed, 147 insertions, 0 deletions
diff --git a/src/stream.h b/src/stream.h
new file mode 100644
index 0000000..bfc1654
--- /dev/null
+++ b/src/stream.h
@@ -0,0 +1,147 @@
+#ifndef STREAM_H
+#define STREAM_H
+
+#include "rax.h"
+#include "listpack.h"
+
+/* Stream item ID: a 128 bit number composed of a milliseconds time and
+ * a sequence counter. IDs generated in the same millisecond (or in a past
+ * millisecond if the clock jumped backward) will use the millisecond time
+ * of the latest generated ID and an incremented sequence. */
+typedef struct streamID {
+ uint64_t ms; /* Unix time in milliseconds. */
+ uint64_t seq; /* Sequence number. */
+} streamID;
+
+typedef struct stream {
+ rax *rax; /* The radix tree holding the stream. */
+ uint64_t length; /* Current number of elements inside this stream. */
+ streamID last_id; /* Zero if there are yet no items. */
+ streamID first_id; /* The first non-tombstone entry, zero if empty. */
+ streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
+ uint64_t entries_added; /* All time count of elements added. */
+ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
+} stream;
+
+/* We define an iterator to iterate stream items in an abstract way, without
+ * caring about the radix tree + listpack representation. Technically speaking
+ * the iterator is only used inside streamReplyWithRange(), so could just
+ * be implemented inside the function, but practically there is the AOF
+ * rewriting code that also needs to iterate the stream to emit the XADD
+ * commands. */
+typedef struct streamIterator {
+ stream *stream; /* The stream we are iterating. */
+ streamID master_id; /* ID of the master entry at listpack head. */
+ uint64_t master_fields_count; /* Master entries # of fields. */
+ unsigned char *master_fields_start; /* Master entries start in listpack. */
+ unsigned char *master_fields_ptr; /* Master field to emit next. */
+ int entry_flags; /* Flags of entry we are emitting. */
+ int rev; /* True if iterating end to start (reverse). */
+ int skip_tombstones; /* True if not emitting tombstone entries. */
+ uint64_t start_key[2]; /* Start key as 128 bit big endian. */
+ uint64_t end_key[2]; /* End key as 128 bit big endian. */
+ raxIterator ri; /* Rax iterator. */
+ unsigned char *lp; /* Current listpack. */
+ unsigned char *lp_ele; /* Current listpack cursor. */
+ unsigned char *lp_flags; /* Current entry flags pointer. */
+ /* Buffers used to hold the string of lpGet() when the element is
+ * integer encoded, so that there is no string representation of the
+ * element inside the listpack itself. */
+ unsigned char field_buf[LP_INTBUF_SIZE];
+ unsigned char value_buf[LP_INTBUF_SIZE];
+} streamIterator;
+
+/* Consumer group. */
+typedef struct streamCG {
+ streamID last_id; /* Last delivered (not acknowledged) ID for this
+ group. Consumers that will just ask for more
+ messages will served with IDs > than this. */
+ long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
+ XGROUP SETID, ...), this is the total number of
+ group reads. In the real world, the reasoning behind
+ this value is detailed at the top comment of
+ streamEstimateDistanceFromFirstEverEntry(). */
+ rax *pel; /* Pending entries list. This is a radix tree that
+ has every message delivered to consumers (without
+ the NOACK option) that was yet not acknowledged
+ as processed. The key of the radix tree is the
+ ID as a 64 bit big endian number, while the
+ associated value is a streamNACK structure.*/
+ rax *consumers; /* A radix tree representing the consumers by name
+ and their associated representation in the form
+ of streamConsumer structures. */
+} streamCG;
+
+/* A specific consumer in a consumer group. */
+typedef struct streamConsumer {
+ mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */
+ mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */
+ sds name; /* Consumer name. This is how the consumer
+ will be identified in the consumer group
+ protocol. Case sensitive. */
+ rax *pel; /* Consumer specific pending entries list: all
+ the pending messages delivered to this
+ consumer not yet acknowledged. Keys are
+ big endian message IDs, while values are
+ the same streamNACK structure referenced
+ in the "pel" of the consumer group structure
+ itself, so the value is shared. */
+} streamConsumer;
+
+/* Pending (yet not acknowledged) message in a consumer group. */
+typedef struct streamNACK {
+ mstime_t delivery_time; /* Last time this message was delivered. */
+ uint64_t delivery_count; /* Number of times this message was delivered.*/
+ streamConsumer *consumer; /* The consumer this message was delivered to
+ in the last delivery. */
+} streamNACK;
+
+/* Stream propagation information, passed to functions in order to propagate
+ * XCLAIM commands to AOF and slaves. */
+typedef struct streamPropInfo {
+ robj *keyname;
+ robj *groupname;
+} streamPropInfo;
+
+/* Prototypes of exported APIs. */
+struct client;
+
+/* Flags for streamCreateConsumer */
+#define SCC_DEFAULT 0
+#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
+#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
+
+#define SCG_INVALID_ENTRIES_READ -1
+
+stream *streamNew(void);
+void freeStream(stream *s);
+unsigned long streamLength(const robj *subject);
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
+int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
+void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
+void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
+void streamIteratorStop(streamIterator *si);
+streamCG *streamLookupCG(stream *s, sds groupname);
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
+streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
+streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
+streamNACK *streamCreateNACK(streamConsumer *consumer);
+void streamDecodeID(void *buf, streamID *id);
+int streamCompareID(streamID *a, streamID *b);
+void streamFreeNACK(streamNACK *na);
+int streamIncrID(streamID *id);
+int streamDecrID(streamID *id);
+void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
+robj *streamDup(robj *o);
+int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
+int streamParseID(const robj *o, streamID *id);
+robj *createObjectFromStreamID(streamID *id);
+int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
+int streamDeleteItem(stream *s, streamID *id);
+void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
+long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
+int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
+int64_t streamTrimByID(stream *s, streamID minid, int approx);
+
+#endif