.. include:: <isonum.txt>
*****************************
Imhiredis: Redis input plugin
*****************************
====================  =====================================
**Module Name:**      **imhiredis**
**Author:**           Jeremie Jourdin <jeremie.jourdin@advens.fr>
**Contributors:**     Theo Bertin <theo.bertin@advens.fr>
====================  =====================================
Purpose
=======
Imhiredis is an input module reading arbitrary entries from Redis.
It uses the `hiredis library <https://github.com/redis/hiredis.git>`_ to query Redis instances using 3 modes:
- **queues**, using `LIST <https://redis.io/commands#list>`_ commands
- **channels**, using `SUBSCRIBE <https://redis.io/commands#pubsub>`_ commands
- **streams**, using `XREAD/XREADGROUP <https://redis.io/commands/?group=stream>`_ commands
.. _imhiredis_queue_mode:
Queue mode
----------
The **queue mode** uses Redis LISTs to push/pop messages to/from lists. It allows simple and efficient use of Redis as a queueing system, with both LIFO and FIFO ordering available.
Prefer this mode when Redis is used as a caching system, with one or more Rsyslog instances POP'ing entries out.
.. Warning::
This mode was configured to provide optimal performance while not straining Redis, but because imhiredis has to poll the instance, some trade-offs had to be made:
- imhiredis POPs entries in batches of 10 to improve performance (the batch size is configurable via the batchsize parameter)
- when no entries are left in the list, the module sleeps for 1 second before checking the list again. Messages can therefore be delayed by up to 1 second between a push to the list and a pop by imhiredis (entries are still POP'ed out as fast as possible while the list is not empty)
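
As a minimal sketch of a queue-mode input, assuming a local Redis instance: the list name ``rsyslog:queue``, the ruleset name and the output file are hypothetical placeholders, not defaults of the module.

.. code-block:: none

   module(load="imhiredis")

   # Hypothetical ruleset that writes every dequeued entry to a file
   ruleset(name="redis_queue") {
       action(type="omfile" file="/var/log/redis_queue.log")
   }

   # Pop entries from the list in batches of 10 (the documented default)
   input(type="imhiredis"
         mode="queue"
         key="rsyslog:queue"
         server="127.0.0.1"
         port="6379"
         batchsize="10"
         ruleset="redis_queue")

A producer could feed this input with a command such as ``LPUSH rsyslog:queue "some message"``.
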
.. _imhiredis_channel_mode:
Channel mode
------------
The **subscribe** mode uses the Redis PUB/SUB system to listen to messages published to Redis channels. It allows performant use of Redis as a message broker.
Prefer this mode when Redis acts as a message broker, with zero, one or many subscribers listening for new messages.
.. Warning::
This mode shouldn't be used if messages must be processed reliably: any message published while no imhiredis instance is listening is lost.
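
A subscribe-mode input could look like the following sketch. The channel name ``rsyslog:channel``, the ruleset name and the output file are assumptions made for illustration.

.. code-block:: none

   module(load="imhiredis")

   # Hypothetical ruleset receiving every published message
   ruleset(name="redis_channel") {
       action(type="omfile" file="/var/log/redis_channel.log")
   }

   # Listen to messages published on one channel
   input(type="imhiredis"
         mode="subscribe"
         key="rsyslog:channel"
         server="127.0.0.1"
         port="6379"
         ruleset="redis_channel")

With this in place, ``PUBLISH rsyslog:channel "some message"`` should reach the ruleset only while rsyslog is running and subscribed.
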
.. _imhiredis_stream_mode:
Stream mode
------------
The **stream** mode uses the `Redis Streams system <https://redis.io/docs/data-types/streams/>`_ to read entries published to Redis streams. It is a good alternative when:
- sharing work is desired
- not losing any log (even in the case of a crash) is mandatory
This mode is especially useful to define pools of workers that do various processing along the way, while ensuring not a single log is lost during processing by a worker.
.. note::
As Redis streams do not insert simple values in keys, but rather field/value pairs, this mode can also be useful when handling structured data. This is better shown with the examples for the parameter :ref:`imhiredis_fields`.
This mode also attaches internal metadata to each message. The metadata is not included in the JSON data or regular fields; instead:
- **$.redis!stream** will be added to the message, with the value of the source stream
- **$.redis!index** will be added to the message, with the exact ID of the entry
- **$.redis!group** will be added to the message (if :ref:`imhiredis_stream_consumergroup` is set), with the value of the group used to read the entry
- **$.redis!consumer** will be added to the message (if :ref:`imhiredis_stream_consumername` is set), with the value of the consumer name used to read the entry
This is especially useful when used with the omhiredis module, to allow it to get the required information semi-automatically (custom templates will still be required in the user configuration).
.. Warning::
This mode is the most reliable way to handle entries stored in Redis, but it might also be the one with the most overhead. Although that overhead is still minimal, make sure to test the different options and determine whether this mode is right for you.
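
The metadata variables listed above can be referenced from a template. The sketch below assumes that ``key`` names the stream to read in this mode; the stream name ``rsyslog:stream``, the template name, the ruleset name and the output file are all hypothetical.

.. code-block:: none

   module(load="imhiredis")

   # Print the source stream, the entry ID and the whole $! object
   template(name="redis_stream_line" type="string"
            string="%$.redis!stream% %$.redis!index% %$!all-json%\n")

   ruleset(name="redis_stream") {
       action(type="omfile"
              file="/var/log/redis_stream.log"
              template="redis_stream_line")
   }

   # Plain XREAD usage: no consumer group or consumer name is set
   input(type="imhiredis"
         mode="stream"
         key="rsyslog:stream"
         server="127.0.0.1"
         port="6379"
         ruleset="redis_stream")
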
Master/Replica
--------------
This module is able to automatically connect to the master instance of a master/replica(s) cluster. Given any valid connection entry point (either the current master or a valid replica), Imhiredis redirects to the master node on startup and whenever states change between nodes.
Configuration Parameters
========================
.. note::
Parameter names are case-insensitive
Input Parameters
----------------
.. _imhiredis_mode:
mode
^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "subscribe", "yes", "none"
| Defines the mode to use for the module.
| Should be either "**subscribe**" (:ref:`imhiredis_channel_mode`), "**queue**" (:ref:`imhiredis_queue_mode`) or "**stream**" (:ref:`imhiredis_stream_mode`) (case-sensitive).
ruleset
^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "none", "no", "none"
Assign messages from this input to a specific Rsyslog ruleset.
batchsize
^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"number", "10", "yes", "none"
Defines the dequeue batch size for Redis pipelining.
imhiredis will read "**batchsize**" elements from Redis at a time.
When using the :ref:`imhiredis_queue_mode`, defines the size of the batch to use with LPOP / RPOP.
.. _imhiredis_key:
key
^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "none", "yes", "none"
Defines either the name of the list to use (for :ref:`imhiredis_queue_mode`) or the channel to listen to (for :ref:`imhiredis_channel_mode`).
.. _imhiredis_socketPath:
socketPath
^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "no", "if no :ref:`imhiredis_server` provided", "none"
Defines the socket to use when trying to connect to Redis. Will be ignored if both :ref:`imhiredis_server` and :ref:`imhiredis_socketPath` are given.
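
As an illustration, an input connecting through a Unix socket might be declared as follows. The socket path and list name are assumptions that depend on the local Redis setup, and ``server`` is deliberately omitted since the socket is ignored when both are given.

.. code-block:: none

   module(load="imhiredis")

   # Connect through a local Unix socket instead of TCP
   input(type="imhiredis"
         mode="queue"
         key="rsyslog:queue"
         socketPath="/var/run/redis/redis.sock")
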
.. _imhiredis_server:
server
^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"ip", "127.0.0.1", "if no :ref:`imhiredis_socketPath` provided", "none"
The Redis server's IP to connect to.
.. _imhiredis_port:
port
^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"number", "6379", "no", "none"
The Redis server's port to use when connecting via IP.
.. _imhiredis_password:
password
^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "none", "no", "none"
The password to use when connecting to a Redis node, if necessary.
.. _imhiredis_uselpop:
uselpop
^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"boolean", "no", "no", "none"
| When using the :ref:`imhiredis_queue_mode`, defines whether imhiredis should use an LPOP instruction instead of an RPOP (the default).
| Has no influence on the :ref:`imhiredis_channel_mode` and will be ignored if set with this mode.
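
The effect of this parameter depends on how producers push entries. As a sketch, assuming producers use ``LPUSH`` on a hypothetical list named ``rsyslog:queue``, the default RPOP returns the oldest entries first (FIFO), while the configuration below returns the newest entries first (LIFO):

.. code-block:: none

   module(load="imhiredis")

   # LPOP from a list filled with LPUSH: newest entries come out first
   input(type="imhiredis"
         mode="queue"
         key="rsyslog:queue"
         server="127.0.0.1"
         uselpop="on")

If producers use ``RPUSH`` instead, the ordering is reversed: ``uselpop="on"`` then gives FIFO behaviour.
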
.. _imhiredis_stream_consumergroup:
stream.consumerGroup
^^^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "", "no", "none"
| When using the :ref:`imhiredis_stream_mode`, defines a consumer group name to use (see `the XREADGROUP documentation <https://redis.io/commands/xreadgroup/>`_ for details). This parameter activates the use of **XREADGROUP** commands instead of plain XREAD commands.
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
If this parameter is set, :ref:`imhiredis_stream_consumername` should also be set
.. _imhiredis_stream_consumername:
stream.consumerName
^^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "", "no", "none"
| When using the :ref:`imhiredis_stream_mode`, defines a consumer name to use (see `the XREADGROUP documentation <https://redis.io/commands/xreadgroup/>`_ for details). This parameter activates the use of **XREADGROUP** commands instead of plain XREAD commands.
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
If this parameter is set, :ref:`imhiredis_stream_consumergroup` should also be set
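
Both parameters are meant to be set together. The sketch below assumes that ``key`` names the stream; the stream, group and consumer names are hypothetical.

.. code-block:: none

   module(load="imhiredis")

   # Read through XREADGROUP as consumer "worker1" of group "rsyslog_workers"
   input(type="imhiredis"
         mode="stream"
         key="rsyslog:stream"
         server="127.0.0.1"
         stream.consumerGroup="rsyslog_workers"
         stream.consumerName="worker1")

Running several rsyslog instances with the same group but different consumer names is one way to share the entries of a stream between workers.
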
.. _imhiredis_stream_readfrom:
stream.readFrom
^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"word", "$", "no", "none"
| When using the :ref:`imhiredis_stream_mode`, defines the `starting ID <https://redis.io/docs/data-types/streams-tutorial/#entry-ids>`_ for XREAD/XREADGROUP commands (can also use special IDs, see `documentation <https://redis.io/docs/data-types/streams-tutorial/#special-ids-in-the-streams-api>`_).
| Has no influence in the other modes (queue or channel) and will be ignored.
.. _imhiredis_stream_consumerack:
stream.consumerACK
^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"boolean", "on", "no", "none"
| When using :ref:`imhiredis_stream_mode` with :ref:`imhiredis_stream_consumergroup` and :ref:`imhiredis_stream_consumername`, determines if the module should directly acknowledge the ID once read from the Consumer Group.
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
When using Consumer Groups and imhiredis, omhiredis can also integrate with this workflow to acknowledge a processed message once put back in another stream (or somewhere else). In that case, set this parameter to **off** to let the omhiredis module acknowledge the input ID once the message is correctly sent.
.. _imhiredis_stream_autoclaimidletime:
stream.autoclaimIdleTime
^^^^^^^^^^^^^^^^^^^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"positive number", "0", "no", "none"
| When using :ref:`imhiredis_stream_mode` with :ref:`imhiredis_stream_consumergroup` and :ref:`imhiredis_stream_consumername`, determines whether the module should check for pending IDs that have been idle for longer than this time (**in milliseconds**), assume the original consumer failed to acknowledge them, and claim them for itself (see `the Redis documentation <https://redis.io/docs/data-types/streams-tutorial/#automatic-claiming>`_ on this subject for more details on how that works).
| Has no influence in the other modes (queue or channel) and will be ignored.
.. note::
If this parameter is set, the AUTOCLAIM operation will also take into account the specified :ref:`imhiredis_stream_readfrom` parameter. **If its value is '$' (default), the AUTOCLAIM commands will use '0-0' as the starting ID**.
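
A possible configuration that claims entries left pending for more than one minute is sketched below. The stream, group and consumer names are hypothetical, the 60000 ms threshold is an arbitrary choice for illustration, and ``key`` is assumed to name the stream.

.. code-block:: none

   module(load="imhiredis")

   # Claim entries that another consumer left unacknowledged for over 60 s
   input(type="imhiredis"
         mode="stream"
         key="rsyslog:stream"
         server="127.0.0.1"
         stream.consumerGroup="rsyslog_workers"
         stream.consumerName="worker2"
         stream.autoclaimIdleTime="60000")

Since ``stream.readFrom`` is left at its default of '$' here, the AUTOCLAIM commands start from '0-0', as described in the note above.
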
.. _imhiredis_fields:
fields
^^^^^^
.. csv-table::
:header: "type", "default", "mandatory", "|FmtObsoleteName| directive"
:widths: auto
:class: parameter-table
"array", "[]", "no", "none"
| When using :ref:`imhiredis_stream_mode`, the module won't get a simple entry but will instead get hashes, with field/value pairs.
| By default, the module inserts every value into its respective field in the **$!** object. This parameter changes that behaviour: each entry of the array is a string where:
- if the entry begins with a **!** or a **.**, it is taken as the key to look up in the original entry
- if the entry doesn't begin with a **!** or a **.**, the value is taken verbatim
- in addition, if the value is prefixed with a **:<key>:** pattern, the value (verbatim or taken from the entry) is inserted in this specific key (or subkey)
*Examples*:
.. csv-table::
:header: "configuration", "result"
:widths: auto
:class: parameter-table
``["static_value"]``, the value "static_value" will be inserted in $!static_value
``[":key:static_value"]``, the value "static_value" will be inserted in $!key
``["!field"]``, the value of the field "field" will be inserted in $!field
``[":key!subkey:!field"]``, the value of the field "field" will be inserted in $!key!subkey
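
The rules above can be combined in a single array. The sketch below assumes that ``key`` names the stream and that entries carry two hypothetical fields, ``hostname`` and ``message``; the stream name is also a placeholder.

.. code-block:: none

   module(load="imhiredis")

   # :host:!hostname -> value of field "hostname" goes to $!host
   # !message        -> value of field "message" goes to $!message
   # :source:redis   -> static value "redis" goes to $!source
   input(type="imhiredis"
         mode="stream"
         key="rsyslog:stream"
         server="127.0.0.1"
         fields=[":host:!hostname", "!message", ":source:redis"])

An entry matching this layout could be produced with ``XADD rsyslog:stream * hostname web01 message "hello"``.
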