diff options
Diffstat (limited to 'src/stream.h')
-rw-r--r-- | src/stream.h | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/src/stream.h b/src/stream.h new file mode 100644 index 0000000..bfc1654 --- /dev/null +++ b/src/stream.h @@ -0,0 +1,147 @@ +#ifndef STREAM_H +#define STREAM_H + +#include "rax.h" +#include "listpack.h" + +/* Stream item ID: a 128 bit number composed of a milliseconds time and + * a sequence counter. IDs generated in the same millisecond (or in a past + * millisecond if the clock jumped backward) will use the millisecond time + * of the latest generated ID and an incremented sequence. */ +typedef struct streamID { + uint64_t ms; /* Unix time in milliseconds. */ + uint64_t seq; /* Sequence number. */ +} streamID; + +typedef struct stream { + rax *rax; /* The radix tree holding the stream. */ + uint64_t length; /* Current number of elements inside this stream. */ + streamID last_id; /* Zero if there are yet no items. */ + streamID first_id; /* The first non-tombstone entry, zero if empty. */ + streamID max_deleted_entry_id; /* The maximal ID that was deleted. */ + uint64_t entries_added; /* All time count of elements added. */ + rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ +} stream; + +/* We define an iterator to iterate stream items in an abstract way, without + * caring about the radix tree + listpack representation. Technically speaking + * the iterator is only used inside streamReplyWithRange(), so could just + * be implemented inside the function, but practically there is the AOF + * rewriting code that also needs to iterate the stream to emit the XADD + * commands. */ +typedef struct streamIterator { + stream *stream; /* The stream we are iterating. */ + streamID master_id; /* ID of the master entry at listpack head. */ + uint64_t master_fields_count; /* Master entries # of fields. */ + unsigned char *master_fields_start; /* Master entries start in listpack. */ + unsigned char *master_fields_ptr; /* Master field to emit next. */ + int entry_flags; /* Flags of entry we are emitting. */ + int rev; /* True if iterating end to start (reverse). */ + int skip_tombstones; /* True if not emitting tombstone entries. */ + uint64_t start_key[2]; /* Start key as 128 bit big endian. */ + uint64_t end_key[2]; /* End key as 128 bit big endian. */ + raxIterator ri; /* Rax iterator. */ + unsigned char *lp; /* Current listpack. */ + unsigned char *lp_ele; /* Current listpack cursor. */ + unsigned char *lp_flags; /* Current entry flags pointer. */ + /* Buffers used to hold the string of lpGet() when the element is + * integer encoded, so that there is no string representation of the + * element inside the listpack itself. */ + unsigned char field_buf[LP_INTBUF_SIZE]; + unsigned char value_buf[LP_INTBUF_SIZE]; +} streamIterator; + +/* Consumer group. */ +typedef struct streamCG { + streamID last_id; /* Last delivered (not acknowledged) ID for this + group. Consumers that will just ask for more + messages will served with IDs > than this. */ + long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no + XGROUP SETID, ...), this is the total number of + group reads. In the real world, the reasoning behind + this value is detailed at the top comment of + streamEstimateDistanceFromFirstEverEntry(). */ + rax *pel; /* Pending entries list. This is a radix tree that + has every message delivered to consumers (without + the NOACK option) that was yet not acknowledged + as processed. The key of the radix tree is the + ID as a 64 bit big endian number, while the + associated value is a streamNACK structure.*/ + rax *consumers; /* A radix tree representing the consumers by name + and their associated representation in the form + of streamConsumer structures. */ +} streamCG; + +/* A specific consumer in a consumer group. */ +typedef struct streamConsumer { + mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */ + mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */ + sds name; /* Consumer name. This is how the consumer + will be identified in the consumer group + protocol. Case sensitive. */ + rax *pel; /* Consumer specific pending entries list: all + the pending messages delivered to this + consumer not yet acknowledged. Keys are + big endian message IDs, while values are + the same streamNACK structure referenced + in the "pel" of the consumer group structure + itself, so the value is shared. */ +} streamConsumer; + +/* Pending (yet not acknowledged) message in a consumer group. */ +typedef struct streamNACK { + mstime_t delivery_time; /* Last time this message was delivered. */ + uint64_t delivery_count; /* Number of times this message was delivered.*/ + streamConsumer *consumer; /* The consumer this message was delivered to + in the last delivery. */ +} streamNACK; + +/* Stream propagation information, passed to functions in order to propagate + * XCLAIM commands to AOF and slaves. */ +typedef struct streamPropInfo { + robj *keyname; + robj *groupname; +} streamPropInfo; + +/* Prototypes of exported APIs. */ +struct client; + +/* Flags for streamCreateConsumer */ +#define SCC_DEFAULT 0 +#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ +#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */ + +#define SCG_INVALID_ENTRIES_READ -1 + +stream *streamNew(void); +void freeStream(stream *s); +unsigned long streamLength(const robj *subject); +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); +void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); +int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); +void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); +void streamIteratorRemoveEntry(streamIterator *si, streamID *current); +void streamIteratorStop(streamIterator *si); +streamCG *streamLookupCG(stream *s, sds groupname); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name); +streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags); +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); +streamNACK *streamCreateNACK(streamConsumer *consumer); +void streamDecodeID(void *buf, streamID *id); +int streamCompareID(streamID *a, streamID *b); +void streamFreeNACK(streamNACK *na); +int streamIncrID(streamID *id); +int streamDecrID(streamID *id); +void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); +robj *streamDup(robj *o); +int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep); +int streamParseID(const robj *o, streamID *id); +robj *createObjectFromStreamID(streamID *id); +int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given); +int streamDeleteItem(stream *s, streamID *id); +void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); +long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); +int64_t streamTrimByLength(stream *s, long long maxlen, int approx); +int64_t streamTrimByID(stream *s, streamID minid, int approx); + +#endif |