-rw-r--r--   src/flow-manager.c   1266
1 file changed, 1266 insertions, 0 deletions
diff --git a/src/flow-manager.c b/src/flow-manager.c
new file mode 100644
index 0000000..e5e1aa2
--- /dev/null
+++ b/src/flow-manager.c
@@ -0,0 +1,1266 @@
+/* Copyright (C) 2007-2023 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Anoop Saldanha <anoopsaldanha@gmail.com>
+ * \author Victor Julien <victor@inliniac.net>
+ */
+
+#include "suricata-common.h"
+#include "conf.h"
+#include "threadvars.h"
+#include "tm-threads.h"
+#include "runmodes.h"
+
+#include "util-random.h"
+#include "util-time.h"
+
+#include "flow.h"
+#include "flow-queue.h"
+#include "flow-hash.h"
+#include "flow-util.h"
+#include "flow-private.h"
+#include "flow-timeout.h"
+#include "flow-manager.h"
+#include "flow-storage.h"
+#include "flow-spare-pool.h"
+
+#include "stream-tcp-reassemble.h"
+#include "stream-tcp.h"
+
+#include "util-unittest.h"
+#include "util-unittest-helper.h"
+#include "util-device.h"
+
+#include "util-debug.h"
+
+#include "threads.h"
+#include "detect.h"
+#include "detect-engine-state.h"
+#include "stream.h"
+
+#include "app-layer-parser.h"
+
+#include "host-timeout.h"
+#include "defrag-timeout.h"
+#include "ippair-timeout.h"
+#include "app-layer-htp-range.h"
+
+#include "output-flow.h"
+
+#include "runmode-unix-socket.h"
+
+/* Run mode selected in suricata.c */
+extern int run_mode;
+
+/** queue to pass flows to cleanup/log thread(s) */
+FlowQueue flow_recycle_q;
+
+/* multi flow manager support */
+static uint32_t flowmgr_number = 1;
+/* atomic counter for flow managers, to assign instance id */
+SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
+
+/* multi flow recycler support */
+static uint32_t flowrec_number = 1;
+/* atomic counter for flow recyclers, to assign instance id */
+SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
+SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
+SC_ATOMIC_EXTERN(unsigned int, flow_flags);
+
+SCCtrlCondT flow_manager_ctrl_cond;
+SCCtrlMutex flow_manager_ctrl_mutex;
+SCCtrlCondT flow_recycler_ctrl_cond;
+SCCtrlMutex flow_recycler_ctrl_mutex;
+
+void FlowTimeoutsInit(void)
+{
+ SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
+}
+
+void FlowTimeoutsEmergency(void)
+{
+ SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
+}
+
+/* 1 second */
+#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
+#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
+/* 0.3 seconds */
+#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
+#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
+#define NEW_FLOW_COUNT_COND 10
+
+typedef struct FlowTimeoutCounters_ {
+ uint32_t rows_checked;
+ uint32_t rows_skipped;
+ uint32_t rows_empty;
+ uint32_t rows_maxlen;
+
+ uint32_t flows_checked;
+ uint32_t flows_notimeout;
+ uint32_t flows_timeout;
+ uint32_t flows_removed;
+ uint32_t flows_aside;
+ uint32_t flows_aside_needs_work;
+
+ uint32_t bypassed_count;
+ uint64_t bypassed_pkts;
+ uint64_t bypassed_bytes;
+} FlowTimeoutCounters;
+
+/**
+ * \brief Used to disable flow manager thread(s).
+ *
+ * \todo Somewhat hackish since it uses the tv name to identify flow manager
+ * threads. We need an all-weather identification scheme.
+ */
+void FlowDisableFlowManagerThread(void)
+{
+ SCMutexLock(&tv_root_lock);
+    /* flow manager thread(s) are part of the mgmt threads */
+ for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
+ if (strncasecmp(tv->name, thread_name_flow_mgr,
+ strlen(thread_name_flow_mgr)) == 0)
+ {
+ TmThreadsSetFlag(tv, THV_KILL);
+ }
+ }
+ SCMutexUnlock(&tv_root_lock);
+
+ struct timeval start_ts;
+ struct timeval cur_ts;
+ gettimeofday(&start_ts, NULL);
+
+again:
+ gettimeofday(&cur_ts, NULL);
+ if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
+ FatalError("unable to get all flow manager "
+ "threads to shutdown in time");
+ }
+
+ SCMutexLock(&tv_root_lock);
+ for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
+ if (strncasecmp(tv->name, thread_name_flow_mgr,
+ strlen(thread_name_flow_mgr)) == 0)
+ {
+ if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+ SCMutexUnlock(&tv_root_lock);
+ /* sleep outside lock */
+ SleepMsec(1);
+ goto again;
+ }
+ }
+ }
+ SCMutexUnlock(&tv_root_lock);
+
+ /* reset count, so we can kill and respawn (unix socket) */
+ SC_ATOMIC_SET(flowmgr_cnt, 0);
+ return;
+}
+
+/** \internal
+ * \brief check if a flow is timed out
+ *
+ * \param f flow
+ * \param ts timestamp
+ * \param next_ts [out] earliest timeout seen so far, used to update the bucket
+ * \param emerg true when emergency mode is active
+ *
+ * \retval 0 not timed out
+ * \retval 1 timed out
+ */
+static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
+{
+ uint32_t flow_times_out_at = f->timeout_at;
+ if (emerg) {
+ extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
+ flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
+ }
+ if (*next_ts == 0 || flow_times_out_at < *next_ts)
+ *next_ts = flow_times_out_at;
+
+ /* do the timeout check */
+ if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
+ return 0;
+ }
+
+ return 1;
+}
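+
+/* Note on the emergency branch above: flow_timeouts_delta is assumed to hold,
+ * per protocol and flow state, the difference between the normal and the
+ * emergency timeout values. Subtracting it from f->timeout_at pulls the expiry
+ * forward while in emergency mode, without having to rewrite timeout_at on
+ * every stored flow when the mode flips. */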
+
+/** \internal
+ * \brief check timeout of captured bypassed flow by querying capture method
+ *
+ * \param f Flow
+ * \param ts timestamp
+ * \param counters Flow timeout counters
+ *
+ * \retval 0 not timed out
+ * \retval 1 timed out (or flow is not capture bypassed)
+ */
+static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
+{
+#ifdef CAPTURE_OFFLOAD
+ if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
+ return 1;
+ }
+
+ FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
+ if (fc && fc->BypassUpdate) {
+        /* the capture method's bypass callback may update the flow */
+ uint64_t pkts_tosrc = fc->tosrcpktcnt;
+ uint64_t bytes_tosrc = fc->tosrcbytecnt;
+ uint64_t pkts_todst = fc->todstpktcnt;
+ uint64_t bytes_todst = fc->todstbytecnt;
+ bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
+ if (update) {
+ SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
+ pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
+ bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
+ pkts_todst = fc->todstpktcnt - pkts_todst;
+ bytes_todst = fc->todstbytecnt - bytes_todst;
+ if (f->livedev) {
+ SC_ATOMIC_ADD(f->livedev->bypassed,
+ pkts_tosrc + pkts_todst);
+ }
+ counters->bypassed_pkts += pkts_tosrc + pkts_todst;
+ counters->bypassed_bytes += bytes_tosrc + bytes_todst;
+ return 0;
+ } else {
+ SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
+ if (f->livedev) {
+ if (FLOW_IS_IPV4(f)) {
+ LiveDevSubBypassStats(f->livedev, 1, AF_INET);
+ } else if (FLOW_IS_IPV6(f)) {
+ LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
+ }
+ }
+ counters->bypassed_count++;
+ return 1;
+ }
+ }
+#endif /* CAPTURE_OFFLOAD */
+ return 1;
+}
+
+typedef struct FlowManagerTimeoutThread {
+ /* used to temporarily store flows that have timed out and are
+ * removed from the hash */
+ FlowQueuePrivate aside_queue;
+} FlowManagerTimeoutThread;
+
+static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
+{
+ FlowQueuePrivate recycle = { NULL, NULL, 0 };
+ counters->flows_aside += td->aside_queue.len;
+
+ uint32_t cnt = 0;
+ Flow *f;
+ while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
+ /* flow is still locked */
+
+ if (f->proto == IPPROTO_TCP &&
+ !(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
+ !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
+ /* Send the flow to its thread */
+ FlowForceReassemblyForFlow(f);
+ FLOWLOCK_UNLOCK(f);
+ /* flow ownership is passed to the worker thread */
+
+ counters->flows_aside_needs_work++;
+ continue;
+ }
+ FLOWLOCK_UNLOCK(f);
+
+ FlowQueuePrivateAppendFlow(&recycle, f);
+ if (recycle.len == 100) {
+ FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
+ FlowWakeupFlowRecyclerThread();
+ }
+ cnt++;
+ }
+ if (recycle.len) {
+ FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
+ FlowWakeupFlowRecyclerThread();
+ }
+ return cnt;
+}
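+
+/* Summary of the aside queue handling above: flows arrive here still locked
+ * after being unhooked from the hash. TCP flows that still need reassembly,
+ * app-layer or logging work are handed back to their worker thread (flow
+ * ownership transfers with them). Everything else is batched, in groups of up
+ * to 100, onto flow_recycle_q and the recycler thread(s) are woken up. */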
+
+/**
+ * \internal
+ *
+ * \brief check all flows in a hash row for timing out
+ *
+ * \param td flow manager timeout thread data (holds the aside queue)
+ * \param f first flow in the hash row
+ * \param ts timestamp
+ * \param emergency bool indicating emergency mode
+ * \param counters ptr to FlowTimeoutCounters structure
+ * \param next_ts [out] earliest timeout seen in the row
+ */
+static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
+ int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
+{
+ uint32_t checked = 0;
+ Flow *prev_f = NULL;
+
+ do {
+ checked++;
+
+ /* check flow timeout based on lastts and state. Both can be
+ * accessed w/o Flow lock as we do have the hash row lock (so flow
+ * can't disappear) and flow_state is atomic. lastts can only
+ * be modified when we have both the flow and hash row lock */
+
+        /* not timed out: keep the flow and move on to the next one */
+        if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
+
+ counters->flows_notimeout++;
+
+ prev_f = f;
+ f = f->next;
+ continue;
+ }
+
+ FLOWLOCK_WRLOCK(f);
+
+ Flow *next_flow = f->next;
+
+        /* don't prune a capture bypassed flow that the capture method
+         * reports as still seeing traffic */
+ if (!FlowBypassedTimeout(f, ts, counters)) {
+ FLOWLOCK_UNLOCK(f);
+ prev_f = f;
+ f = f->next;
+ continue;
+ }
+
+ f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;
+
+ counters->flows_timeout++;
+
+ RemoveFromHash(f, prev_f);
+
+ FlowQueuePrivateAppendFlow(&td->aside_queue, f);
+ /* flow is still locked in the queue */
+
+ f = next_flow;
+ } while (f != NULL);
+
+ counters->flows_checked += checked;
+ if (checked > counters->rows_maxlen)
+ counters->rows_maxlen = checked;
+}
+
+static void FlowManagerHashRowClearEvictedList(
+ FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
+{
+ do {
+ FLOWLOCK_WRLOCK(f);
+ Flow *next_flow = f->next;
+ f->next = NULL;
+ f->fb = NULL;
+
+ FlowQueuePrivateAppendFlow(&td->aside_queue, f);
+ /* flow is still locked in the queue */
+
+ f = next_flow;
+ } while (f != NULL);
+}
+
+/**
+ * \brief time out flows from the hash
+ *
+ * \param ts timestamp
+ * \param hash_min min hash index to consider
+ * \param hash_max max hash index to consider
+ * \param counters ptr to FlowTimeoutCounters structure
+ *
+ * \retval cnt number of timed out flows
+ */
+static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
+ const uint32_t hash_max, FlowTimeoutCounters *counters)
+{
+ uint32_t cnt = 0;
+ const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
+ const uint32_t rows_checked = hash_max - hash_min;
+ uint32_t rows_skipped = 0;
+ uint32_t rows_empty = 0;
+
+#if __WORDSIZE==64
+#define BITS 64
+#define TYPE uint64_t
+#else
+#define BITS 32
+#define TYPE uint32_t
+#endif
+
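+    /* The scan below works in chunks of BITS buckets: first a bitmap is built
+     * of buckets whose cached next_ts has expired, using only relaxed atomic
+     * loads and no locks; then only the buckets with their bit set are locked
+     * and walked. Rows whose next_ts lies in the future are skipped entirely,
+     * which keeps a full hash pass cheap when few flows are due to expire. */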
+ const uint32_t ts_secs = SCTIME_SECS(ts);
+ for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
+ TYPE check_bits = 0;
+ const uint32_t check = MIN(BITS, (hash_max - idx));
+ for (uint32_t i = 0; i < check; i++) {
+ FlowBucket *fb = &flow_hash[idx+i];
+ check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
+ fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
+ << (TYPE)i;
+ }
+ if (check_bits == 0)
+ continue;
+
+ for (uint32_t i = 0; i < check; i++) {
+ FlowBucket *fb = &flow_hash[idx+i];
+ if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
+ FBLOCK_LOCK(fb);
+ Flow *evicted = NULL;
+ if (fb->evicted != NULL || fb->head != NULL) {
+ if (fb->evicted != NULL) {
+ /* transfer out of bucket so we can do additional work outside
+ * of the bucket lock */
+ evicted = fb->evicted;
+ fb->evicted = NULL;
+ }
+ if (fb->head != NULL) {
+ uint32_t next_ts = 0;
+ FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
+
+ if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
+ SC_ATOMIC_SET(fb->next_ts, next_ts);
+ }
+ if (fb->evicted == NULL && fb->head == NULL) {
+ SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
+ }
+ } else {
+ SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
+ rows_empty++;
+ }
+ FBLOCK_UNLOCK(fb);
+                /* process the evicted list outside of the bucket lock */
+ if (evicted) {
+ FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
+ }
+ } else {
+ rows_skipped++;
+ }
+ }
+ if (td->aside_queue.len) {
+ cnt += ProcessAsideQueue(td, counters);
+ }
+ }
+
+ counters->rows_checked += rows_checked;
+ counters->rows_skipped += rows_skipped;
+ counters->rows_empty += rows_empty;
+
+ if (td->aside_queue.len) {
+ cnt += ProcessAsideQueue(td, counters);
+ }
+ counters->flows_removed += cnt;
+ /* coverity[missing_unlock : FALSE] */
+ return cnt;
+}
+
+/** \internal
+ * \brief handle timeouts for a slice of hash rows.
+ * If the slice wraps around the end of the range, FlowTimeoutHash is called twice */
+static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
+ const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
+ const uint32_t rows, uint32_t *pos)
+{
+ uint32_t start = 0;
+ uint32_t end = 0;
+ uint32_t cnt = 0;
+ uint32_t rows_left = rows;
+
+again:
+ start = hash_min + (*pos);
+ if (start >= hash_max) {
+ start = hash_min;
+ }
+ end = start + rows_left;
+ if (end > hash_max) {
+ end = hash_max;
+ }
+ *pos = (end == hash_max) ? hash_min : end;
+ rows_left = rows_left - (end - start);
+
+ cnt += FlowTimeoutHash(td, ts, start, end, counters);
+ if (rows_left) {
+ goto again;
+ }
+ return cnt;
+}
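+
+/* Illustration of the chunked walk above, with hypothetical numbers: for
+ * hash_min=0, hash_max=1000, rows=200 and *pos=900, the first pass covers
+ * rows 900..1000 and wraps *pos back to hash_min; the remaining 100 rows are
+ * then handled by a second FlowTimeoutHash() call over rows 0..100, leaving
+ * *pos at 100 for the next work unit. */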
+
+/**
+ * \internal
+ *
+ * \brief move all flows out of a hash row
+ *
+ * \param f first flow in the hash row
+ * \param recycle_q local queue the removed flows are appended to
+ * \param mode 0 for the bucket's flow list, 1 for its evicted list
+ *
+ * \retval cnt number of flows removed
+ */
+static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
+{
+ uint32_t cnt = 0;
+
+ do {
+ FLOWLOCK_WRLOCK(f);
+
+ Flow *next_flow = f->next;
+
+ /* remove from the hash */
+ if (mode == 0) {
+ RemoveFromHash(f, NULL);
+ } else {
+ FlowBucket *fb = f->fb;
+ fb->evicted = f->next;
+ f->next = NULL;
+ f->fb = NULL;
+ }
+ f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;
+
+ /* no one is referring to this flow, removed from hash
+ * so we can unlock it and move it to the recycle queue. */
+ FLOWLOCK_UNLOCK(f);
+ FlowQueuePrivateAppendFlow(recycle_q, f);
+
+ cnt++;
+
+ f = next_flow;
+ } while (f != NULL);
+
+ return cnt;
+}
+
+/**
+ * \brief remove all flows from the hash
+ *
+ * \retval cnt number of flows removed
+ */
+static uint32_t FlowCleanupHash(void)
+{
+ FlowQueuePrivate local_queue = { NULL, NULL, 0 };
+ uint32_t cnt = 0;
+
+ for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
+ FlowBucket *fb = &flow_hash[idx];
+
+ FBLOCK_LOCK(fb);
+
+ if (fb->head != NULL) {
+ /* we have a flow, or more than one */
+ cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
+ }
+ if (fb->evicted != NULL) {
+ /* we have a flow, or more than one */
+ cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
+ }
+
+ FBLOCK_UNLOCK(fb);
+ if (local_queue.len >= 25) {
+ FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
+ FlowWakeupFlowRecyclerThread();
+ }
+ }
+ FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
+ FlowWakeupFlowRecyclerThread();
+
+ return cnt;
+}
+
+typedef struct FlowQueueTimeoutCounters {
+ uint32_t flows_removed;
+ uint32_t flows_timeout;
+} FlowQueueTimeoutCounters;
+
+typedef struct FlowCounters_ {
+ uint16_t flow_mgr_full_pass;
+ uint16_t flow_mgr_rows_sec;
+
+ uint16_t flow_mgr_spare;
+ uint16_t flow_emerg_mode_enter;
+ uint16_t flow_emerg_mode_over;
+
+ uint16_t flow_mgr_flows_checked;
+ uint16_t flow_mgr_flows_notimeout;
+ uint16_t flow_mgr_flows_timeout;
+ uint16_t flow_mgr_flows_aside;
+ uint16_t flow_mgr_flows_aside_needs_work;
+
+ uint16_t flow_mgr_rows_maxlen;
+
+ uint16_t flow_bypassed_cnt_clo;
+ uint16_t flow_bypassed_pkts;
+ uint16_t flow_bypassed_bytes;
+
+ uint16_t memcap_pressure;
+ uint16_t memcap_pressure_max;
+} FlowCounters;
+
+typedef struct FlowManagerThreadData_ {
+ uint32_t instance;
+ uint32_t min;
+ uint32_t max;
+
+ FlowCounters cnt;
+
+ FlowManagerTimeoutThread timeout;
+} FlowManagerThreadData;
+
+static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
+{
+ fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
+ fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
+
+ fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
+ fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
+ fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
+
+ fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
+ fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
+ fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
+ fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
+ fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
+ fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
+
+ fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
+ fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
+ fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
+
+ fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
+ fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
+}
+
+static void FlowCountersUpdate(
+ ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
+{
+ StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
+ StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);
+
+ StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
+ StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
+ StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work,
+ (uint64_t)counters->flows_aside_needs_work);
+
+ StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
+ StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
+ StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);
+
+ StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
+}
+
+static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
+{
+ FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
+ if (ftd == NULL)
+ return TM_ECODE_FAILED;
+
+ ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
+ SCLogDebug("flow manager instance %u", ftd->instance);
+
+    /* set the min and max values used for hash row walking;
+     * each thread has its own section of the flow hash */
+ uint32_t range = flow_config.hash_size / flowmgr_number;
+
+ ftd->min = ftd->instance * range;
+ ftd->max = (ftd->instance + 1) * range;
+
+ /* last flow-manager takes on hash_size % flowmgr_number extra rows */
+ if ((ftd->instance + 1) == flowmgr_number) {
+ ftd->max = flow_config.hash_size;
+ }
+ BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);
+
+ SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
+
+ /* pass thread data back to caller */
+ *data = ftd;
+
+ FlowCountersInit(t, &ftd->cnt);
+
+ PacketPoolInit();
+ return TM_ECODE_OK;
+}
+
+static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
+{
+ StreamTcpThreadCacheCleanup();
+ PacketPoolDestroy();
+ SCFree(data);
+ return TM_ECODE_OK;
+}
+
+/** \internal
+ * \brief calculate number of rows to scan and how much time to sleep
+ * based on the busy score `mp` (0 idle, 100 max busy).
+ *
+ * We try to make sure the hash is scanned about once per second. The size
+ * of the hash slice handled per work unit is determined by our busy score
+ * 'mp'. We sleep for the remainder of the second after processing the
+ * slice, or at least an approximation of it.
+ * A minimum busy score of 10 is assumed, which caps a full hash pass at
+ * roughly 10 seconds. This also avoids burstiness in scanning: without the
+ * minimum, a rapid increase of the busy score could make the flow manager
+ * suddenly scan a much larger slice of the hash, leading to a burst of
+ * scan/eviction work.
+ */
+static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
+ uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
+{
+ if (emergency) {
+ *wu_rows = rows;
+ *wu_sleep = 250;
+ return;
+ }
+ /* minimum busy score is 10 */
+ const uint32_t emp = MAX(mp, 10);
+ const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
+ /* calc how much time we estimate the work will take, in ms. We assume
+ * each row takes an average of 1usec. Maxing out at 1sec. */
+ const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
+ /* calc how much time we need to sleep to get to the per second cadence
+ * but sleeping for at least 250ms. */
+ const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
+ SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
+ sleep_per_unit);
+
+ *wu_sleep = sleep_per_unit;
+ *wu_rows = rows_per_sec;
+ *rows_sec = rows_per_sec;
+}
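+
+/* Worked example for GetWorkUnitSizing(), purely illustrative: with
+ * rows=65536 and a busy score mp=25, rows_per_sec becomes 16384. At the
+ * assumed 1 usec per row that is ~16 ms of work, so the sleep becomes
+ * MAX(250, 1000 - 16) = 984 ms, i.e. roughly one work unit per second.
+ * With mp=0 the minimum score of 10 still schedules about a tenth of the
+ * rows per unit, so a full pass takes at most about 10 seconds. */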
+
+/** \brief Thread that manages the flow table and times out flows.
+ *
+ * \param th_v ThreadVars
+ * \param thread_data FlowManagerThreadData for this thread
+ *
+ * Also keeps an eye on the spare flow pool and triggers allocation when needed.
+ */
+static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
+{
+ FlowManagerThreadData *ftd = thread_data;
+ const uint32_t rows = ftd->max - ftd->min;
+ const bool time_is_live = TimeModeIsLive();
+
+ uint32_t emerg_over_cnt = 0;
+ uint64_t next_run_ms = 0;
+ uint32_t pos = 0;
+ uint32_t rows_sec = 0;
+ uint32_t rows_per_wu = 0;
+ uint64_t sleep_per_wu = 0;
+ bool prev_emerg = false;
+ uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
+ SCTime_t ts;
+
+    /* don't start our activities until time is set up */
+ while (!TimeModeIsReady()) {
+ if (suricata_ctl_flags != 0)
+ return TM_ECODE_OK;
+ usleep(10);
+ }
+
+ uint32_t mp = MemcapsGetPressure() * 100;
+ if (ftd->instance == 0) {
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
+ }
+ GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
+ StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
+
+ TmThreadsSetFlag(th_v, THV_RUNNING);
+
+ while (1)
+ {
+ if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
+ TmThreadsSetFlag(th_v, THV_PAUSED);
+ TmThreadTestThreadUnPaused(th_v);
+ TmThreadsUnsetFlag(th_v, THV_PAUSED);
+ }
+
+ bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
+
+ /* Get the time */
+ ts = TimeGet();
+ SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
+ uint64_t ts_ms = SCTIME_MSECS(ts);
+ const bool emerge_p = (emerg && !prev_emerg);
+ if (emerge_p) {
+ next_run_ms = 0;
+ prev_emerg = true;
+ SCLogNotice("Flow emergency mode entered...");
+ StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
+ }
+ if (ts_ms >= next_run_ms) {
+ if (ftd->instance == 0) {
+ const uint32_t sq_len = FlowSpareGetPoolSize();
+ const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
+ /* see if we still have enough spare flows */
+ if (spare_perc < 90 || spare_perc > 110) {
+ FlowSparePoolUpdate(sq_len);
+ }
+ }
+
+ /* try to time out flows */
+ // clang-format off
+ FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
+ // clang-format on
+
+ if (emerg) {
+ /* in emergency mode, do a full pass of the hash table */
+ FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
+ StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
+ } else {
+ SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
+ rows_per_wu);
+
+ const uint32_t ppos = pos;
+ FlowTimeoutHashInChunks(
+ &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
+ if (ppos > pos) {
+ StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
+ }
+ }
+
+ const uint32_t spare_pool_len = FlowSpareGetPoolSize();
+ StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);
+
+ FlowCountersUpdate(th_v, ftd, &counters);
+
+ if (emerg == true) {
+ SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
+ "flow_spare_q status: %" PRIu32 "%% flows at the queue",
+ spare_pool_len, flow_config.prealloc,
+ spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
+
+ /* only if we have pruned this "emergency_recovery" percentage
+ * of flows, we will unset the emergency bit */
+ if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
+ flow_config.emergency_recovery) {
+ emerg_over_cnt++;
+ } else {
+ emerg_over_cnt = 0;
+ }
+
+ if (emerg_over_cnt >= 30) {
+ SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
+ FlowTimeoutsReset();
+
+ emerg = false;
+ prev_emerg = false;
+ emerg_over_cnt = 0;
+ SCLogNotice("Flow emergency mode over, back to normal... unsetting"
+ " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
+ "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
+ "%% flows at the queue",
+ (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
+ spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
+
+ StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
+ }
+ }
+
+ /* update work units */
+ const uint32_t pmp = mp;
+ mp = MemcapsGetPressure() * 100;
+ if (ftd->instance == 0) {
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
+ }
+ GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
+ if (pmp != mp) {
+ StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
+ }
+
+ next_run_ms = ts_ms + sleep_per_wu;
+ }
+ if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
+ if (ftd->instance == 0) {
+ DefragTimeoutHash(ts);
+ HostTimeoutHash(ts);
+ IPPairTimeoutHash(ts);
+ HttpRangeContainersTimeoutHash(ts);
+ other_last_sec = (uint32_t)SCTIME_SECS(ts);
+ }
+ }
+
+ if (TmThreadsCheckFlag(th_v, THV_KILL)) {
+ StatsSyncCounters(th_v);
+ break;
+ }
+
+ if (emerg || !time_is_live) {
+ usleep(250);
+ } else {
+ struct timeval cond_tv;
+ gettimeofday(&cond_tv, NULL);
+ struct timeval add_tv;
+ add_tv.tv_sec = 0;
+ add_tv.tv_usec = (sleep_per_wu * 1000);
+ timeradd(&cond_tv, &add_tv, &cond_tv);
+
+ struct timespec cond_time = FROM_TIMEVAL(cond_tv);
+ SCCtrlMutexLock(&flow_manager_ctrl_mutex);
+ while (1) {
+ int rc = SCCtrlCondTimedwait(
+ &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
+ if (rc == ETIMEDOUT || rc < 0)
+ break;
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
+ break;
+ }
+ }
+ SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
+ }
+
+ SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
+
+ StatsSyncCountersIfSignalled(th_v);
+ }
+ return TM_ECODE_OK;
+}
+
+/** \brief spawn the flow manager thread(s) */
+void FlowManagerThreadSpawn(void)
+{
+ intmax_t setting = 1;
+ (void)ConfGetInt("flow.managers", &setting);
+
+ if (setting < 1 || setting > 1024) {
+ FatalError("invalid flow.managers setting %" PRIdMAX, setting);
+ }
+ flowmgr_number = (uint32_t)setting;
+
+ SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
+ SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);
+
+ SCLogConfig("using %u flow manager threads", flowmgr_number);
+ StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);
+
+ for (uint32_t u = 0; u < flowmgr_number; u++) {
+ char name[TM_THREAD_NAME_MAX];
+ snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
+
+ ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
+ "FlowManager", 0);
+
+ if (tv_flowmgr == NULL) {
+ FatalError("flow manager thread creation failed");
+ }
+ if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
+ FatalError("flow manager thread spawn failed");
+ }
+ }
+ return;
+}
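+
+/* The thread counts come from suricata.yaml. A minimal sketch of the relevant
+ * settings, with illustrative values (both default to 1 when left unset):
+ *
+ *   flow:
+ *     managers: 2
+ *     recyclers: 2
+ */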
+
+typedef struct FlowRecyclerThreadData_ {
+ void *output_thread_data;
+
+ uint16_t counter_flows;
+ uint16_t counter_queue_avg;
+ uint16_t counter_queue_max;
+
+ uint16_t counter_flow_active;
+ uint16_t counter_tcp_active_sessions;
+ FlowEndCounters fec;
+} FlowRecyclerThreadData;
+
+static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
+{
+ FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
+ if (ftd == NULL)
+ return TM_ECODE_FAILED;
+ if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
+ SCLogError("initializing flow log API for thread failed");
+ SCFree(ftd);
+ return TM_ECODE_FAILED;
+ }
+ SCLogDebug("output_thread_data %p", ftd->output_thread_data);
+
+ ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
+ ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
+ ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
+
+ ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
+ ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
+
+ FlowEndCountersRegister(t, &ftd->fec);
+
+ *data = ftd;
+ return TM_ECODE_OK;
+}
+
+static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
+{
+ StreamTcpThreadCacheCleanup();
+
+ FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
+ if (ftd->output_thread_data != NULL)
+ OutputFlowLogThreadDeinit(t, ftd->output_thread_data);
+
+ SCFree(data);
+ return TM_ECODE_OK;
+}
+
+static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
+{
+ FLOWLOCK_WRLOCK(f);
+
+ (void)OutputFlowLog(tv, ftd->output_thread_data, f);
+
+ FlowEndCountersUpdate(tv, &ftd->fec, f);
+ if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
+ StatsDecr(tv, ftd->counter_tcp_active_sessions);
+ }
+ StatsDecr(tv, ftd->counter_flow_active);
+
+ FlowClearMemory(f, f->protomap);
+ FLOWLOCK_UNLOCK(f);
+}
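+
+/* Recycler() is called for each flow pulled off the recycle queue by the
+ * recycler loop below: the flow is logged through the flow output API,
+ * end-of-flow counters are updated, its memory is cleared and the caller
+ * returns it to the spare pool in blocks of flow_spare_pool_block_size. */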
+
+extern uint32_t flow_spare_pool_block_size;
+
+/** \brief Thread that logs and recycles timed out flows.
+ *
+ * \param th_v ThreadVars
+ * \param thread_data FlowRecyclerThreadData for this thread
+ */
+static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
+{
+ FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
+ BUG_ON(ftd == NULL);
+ const bool time_is_live = TimeModeIsLive();
+ uint64_t recycled_cnt = 0;
+ FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
+
+ TmThreadsSetFlag(th_v, THV_RUNNING);
+
+ while (1)
+ {
+ if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
+ TmThreadsSetFlag(th_v, THV_PAUSED);
+ TmThreadTestThreadUnPaused(th_v);
+ TmThreadsUnsetFlag(th_v, THV_PAUSED);
+ }
+ SC_ATOMIC_ADD(flowrec_busy,1);
+ FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);
+
+ StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
+ StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
+
+ const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
+
+ /* Get the time */
+ SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));
+
+ uint64_t cnt = 0;
+ Flow *f;
+ while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
+ Recycler(th_v, ftd, f);
+ cnt++;
+
+ /* for every full sized block, add it to the spare pool */
+ FlowQueuePrivateAppendFlow(&ret_queue, f);
+ if (ret_queue.len == flow_spare_pool_block_size) {
+ FlowSparePoolReturnFlows(&ret_queue);
+ }
+ }
+ if (ret_queue.len > 0) {
+ FlowSparePoolReturnFlows(&ret_queue);
+ }
+ if (cnt > 0) {
+ recycled_cnt += cnt;
+ StatsAddUI64(th_v, ftd->counter_flows, cnt);
+ }
+ SC_ATOMIC_SUB(flowrec_busy,1);
+
+ if (bail) {
+ break;
+ }
+
+ const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
+ if (emerg || !time_is_live) {
+ usleep(250);
+ } else {
+ struct timeval cond_tv;
+ gettimeofday(&cond_tv, NULL);
+ cond_tv.tv_sec += 1;
+ struct timespec cond_time = FROM_TIMEVAL(cond_tv);
+ SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
+ while (1) {
+ int rc = SCCtrlCondTimedwait(
+ &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
+ if (rc == ETIMEDOUT || rc < 0) {
+ break;
+ }
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
+ break;
+ }
+ if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
+ break;
+ }
+ }
+ SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
+ }
+
+ SCLogDebug("woke up...");
+
+ StatsSyncCountersIfSignalled(th_v);
+ }
+ StatsSyncCounters(th_v);
+ SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
+ return TM_ECODE_OK;
+}
+
+static bool FlowRecyclerReadyToShutdown(void)
+{
+ if (SC_ATOMIC_GET(flowrec_busy) != 0) {
+ return false;
+ }
+ uint32_t len = 0;
+ FQLOCK_LOCK(&flow_recycle_q);
+ len = flow_recycle_q.qlen;
+ FQLOCK_UNLOCK(&flow_recycle_q);
+
+ return ((len == 0));
+}
+
+/** \brief spawn the flow recycler thread(s) */
+void FlowRecyclerThreadSpawn(void)
+{
+ intmax_t setting = 1;
+ (void)ConfGetInt("flow.recyclers", &setting);
+
+ if (setting < 1 || setting > 1024) {
+ FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
+ }
+ flowrec_number = (uint32_t)setting;
+
+ SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
+ SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
+
+ SCLogConfig("using %u flow recycler threads", flowrec_number);
+
+ for (uint32_t u = 0; u < flowrec_number; u++) {
+ char name[TM_THREAD_NAME_MAX];
+ snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
+
+ ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
+ "FlowRecycler", 0);
+
+ if (tv_flowrec == NULL) {
+ FatalError("flow recycler thread creation failed");
+ }
+ if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
+ FatalError("flow recycler thread spawn failed");
+ }
+ }
+ return;
+}
+
+/**
+ * \brief Used to disable flow recycler thread(s).
+ *
+ * \note This should only be called after the flow manager thread(s) are gone.
+ *
+ * \todo Somewhat hackish since it uses the tv name to identify flow recycler
+ * threads. We need an all-weather identification scheme.
+ */
+void FlowDisableFlowRecyclerThread(void)
+{
+ /* move all flows still in the hash to the recycler queue */
+#ifndef DEBUG
+ (void)FlowCleanupHash();
+#else
+ uint32_t flows = FlowCleanupHash();
+ SCLogDebug("flows to progress: %u", flows);
+#endif
+
+ /* make sure all flows are processed */
+ do {
+ FlowWakeupFlowRecyclerThread();
+ usleep(10);
+ } while (FlowRecyclerReadyToShutdown() == false);
+
+ SCMutexLock(&tv_root_lock);
+    /* flow recycler thread(s) are part of the mgmt threads */
+ for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
+ if (strncasecmp(tv->name, thread_name_flow_rec,
+ strlen(thread_name_flow_rec)) == 0)
+ {
+ TmThreadsSetFlag(tv, THV_KILL);
+ }
+ }
+ SCMutexUnlock(&tv_root_lock);
+
+ struct timeval start_ts;
+ struct timeval cur_ts;
+ gettimeofday(&start_ts, NULL);
+
+again:
+ gettimeofday(&cur_ts, NULL);
+ if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
+ FatalError("unable to get all flow recycler "
+ "threads to shutdown in time");
+ }
+
+ SCMutexLock(&tv_root_lock);
+ for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
+ if (strncasecmp(tv->name, thread_name_flow_rec,
+ strlen(thread_name_flow_rec)) == 0)
+ {
+ if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+ SCMutexUnlock(&tv_root_lock);
+ FlowWakeupFlowRecyclerThread();
+ /* sleep outside lock */
+ SleepMsec(1);
+ goto again;
+ }
+ }
+ }
+ SCMutexUnlock(&tv_root_lock);
+
+ /* reset count, so we can kill and respawn (unix socket) */
+ SC_ATOMIC_SET(flowrec_cnt, 0);
+ return;
+}
+
+void TmModuleFlowManagerRegister (void)
+{
+ tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
+ tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
+ tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
+ tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
+ tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
+ tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
+ SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
+
+ SC_ATOMIC_INIT(flowmgr_cnt);
+ SC_ATOMIC_INITPTR(flow_timeouts);
+}
+
+void TmModuleFlowRecyclerRegister (void)
+{
+ tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
+ tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
+ tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
+ tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
+ tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
+ tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
+ SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
+
+ SC_ATOMIC_INIT(flowrec_cnt);
+ SC_ATOMIC_INIT(flowrec_busy);
+}