Diffstat
-rw-r--r--   src/flow-manager.c | 1266
1 file changed, 1266 insertions, 0 deletions
diff --git a/src/flow-manager.c b/src/flow-manager.c new file mode 100644 index 0000000..e5e1aa2 --- /dev/null +++ b/src/flow-manager.c @@ -0,0 +1,1266 @@ +/* Copyright (C) 2007-2023 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Anoop Saldanha <anoopsaldanha@gmail.com> + * \author Victor Julien <victor@inliniac.net> + */ + +#include "suricata-common.h" +#include "conf.h" +#include "threadvars.h" +#include "tm-threads.h" +#include "runmodes.h" + +#include "util-random.h" +#include "util-time.h" + +#include "flow.h" +#include "flow-queue.h" +#include "flow-hash.h" +#include "flow-util.h" +#include "flow-private.h" +#include "flow-timeout.h" +#include "flow-manager.h" +#include "flow-storage.h" +#include "flow-spare-pool.h" + +#include "stream-tcp-reassemble.h" +#include "stream-tcp.h" + +#include "util-unittest.h" +#include "util-unittest-helper.h" +#include "util-device.h" + +#include "util-debug.h" + +#include "threads.h" +#include "detect.h" +#include "detect-engine-state.h" +#include "stream.h" + +#include "app-layer-parser.h" + +#include "host-timeout.h" +#include "defrag-timeout.h" +#include "ippair-timeout.h" +#include "app-layer-htp-range.h" + +#include "output-flow.h" + +#include "runmode-unix-socket.h" + +/* Run mode selected at suricata.c */ +extern int run_mode; + +/** queue to pass flows to cleanup/log thread(s) */ +FlowQueue flow_recycle_q; + +/* multi flow manager support */ +static uint32_t flowmgr_number = 1; +/* atomic counter for flow managers, to assign instance id */ +SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt); + +/* multi flow recycler support */ +static uint32_t flowrec_number = 1; +/* atomic counter for flow recyclers, to assign instance id */ +SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt); +SC_ATOMIC_DECLARE(uint32_t, flowrec_busy); +SC_ATOMIC_EXTERN(unsigned int, flow_flags); + +SCCtrlCondT flow_manager_ctrl_cond; +SCCtrlMutex flow_manager_ctrl_mutex; +SCCtrlCondT flow_recycler_ctrl_cond; +SCCtrlMutex flow_recycler_ctrl_mutex; + +void FlowTimeoutsInit(void) +{ + SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal); +} + +void FlowTimeoutsEmergency(void) +{ + SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg); +} + +/* 1 seconds */ +#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1 +#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0 +/* 0.3 seconds */ +#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0 +#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000 +#define NEW_FLOW_COUNT_COND 10 + +typedef struct FlowTimeoutCounters_ { + uint32_t rows_checked; + uint32_t rows_skipped; + uint32_t rows_empty; + uint32_t rows_maxlen; + + uint32_t flows_checked; + uint32_t flows_notimeout; + uint32_t flows_timeout; + uint32_t flows_removed; + uint32_t flows_aside; + uint32_t flows_aside_needs_work; + + uint32_t bypassed_count; + uint64_t bypassed_pkts; + uint64_t bypassed_bytes; +} FlowTimeoutCounters; + +/** + * \brief Used to disable 
flow manager thread(s). + * + * \todo Kinda hackish since it uses the tv name to identify flow manager + * thread. We need an all weather identification scheme. + */ +void FlowDisableFlowManagerThread(void) +{ + SCMutexLock(&tv_root_lock); + /* flow manager thread(s) is/are a part of mgmt threads */ + for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { + if (strncasecmp(tv->name, thread_name_flow_mgr, + strlen(thread_name_flow_mgr)) == 0) + { + TmThreadsSetFlag(tv, THV_KILL); + } + } + SCMutexUnlock(&tv_root_lock); + + struct timeval start_ts; + struct timeval cur_ts; + gettimeofday(&start_ts, NULL); + +again: + gettimeofday(&cur_ts, NULL); + if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) { + FatalError("unable to get all flow manager " + "threads to shutdown in time"); + } + + SCMutexLock(&tv_root_lock); + for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { + if (strncasecmp(tv->name, thread_name_flow_mgr, + strlen(thread_name_flow_mgr)) == 0) + { + if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + SCMutexUnlock(&tv_root_lock); + /* sleep outside lock */ + SleepMsec(1); + goto again; + } + } + } + SCMutexUnlock(&tv_root_lock); + + /* reset count, so we can kill and respawn (unix socket) */ + SC_ATOMIC_SET(flowmgr_cnt, 0); + return; +} + +/** \internal + * \brief check if a flow is timed out + * + * \param f flow + * \param ts timestamp + * + * \retval 0 not timed out + * \retval 1 timed out + */ +static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg) +{ + uint32_t flow_times_out_at = f->timeout_at; + if (emerg) { + extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; + flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); + } + if (*next_ts == 0 || flow_times_out_at < *next_ts) + *next_ts = flow_times_out_at; + + /* do the timeout check */ + if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) { + return 0; + } + + return 1; +} + +/** \internal + * \brief check timeout of captured bypassed flow by querying capture method + * + * \param f Flow + * \param ts timestamp + * \param counters Flow timeout counters + * + * \retval 0 not timeout + * \retval 1 timeout (or not capture bypassed) + */ +static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters) +{ +#ifdef CAPTURE_OFFLOAD + if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) { + return 1; + } + + FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID()); + if (fc && fc->BypassUpdate) { + /* flow will be possibly updated */ + uint64_t pkts_tosrc = fc->tosrcpktcnt; + uint64_t bytes_tosrc = fc->tosrcbytecnt; + uint64_t pkts_todst = fc->todstpktcnt; + uint64_t bytes_todst = fc->todstbytecnt; + bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts)); + if (update) { + SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f)); + pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc; + bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc; + pkts_todst = fc->todstpktcnt - pkts_todst; + bytes_todst = fc->todstbytecnt - bytes_todst; + if (f->livedev) { + SC_ATOMIC_ADD(f->livedev->bypassed, + pkts_tosrc + pkts_todst); + } + counters->bypassed_pkts += pkts_tosrc + pkts_todst; + counters->bypassed_bytes += bytes_tosrc + bytes_todst; + return 0; + } else { + SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f)); + if (f->livedev) { + if (FLOW_IS_IPV4(f)) { + LiveDevSubBypassStats(f->livedev, 1, AF_INET); + } else if (FLOW_IS_IPV6(f)) { + LiveDevSubBypassStats(f->livedev, 1, AF_INET6); + } + } + 
counters->bypassed_count++; + return 1; + } + } +#endif /* CAPTURE_OFFLOAD */ + return 1; +} + +typedef struct FlowManagerTimeoutThread { + /* used to temporarily store flows that have timed out and are + * removed from the hash */ + FlowQueuePrivate aside_queue; +} FlowManagerTimeoutThread; + +static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters) +{ + FlowQueuePrivate recycle = { NULL, NULL, 0 }; + counters->flows_aside += td->aside_queue.len; + + uint32_t cnt = 0; + Flow *f; + while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) { + /* flow is still locked */ + + if (f->proto == IPPROTO_TCP && + !(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) && + !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) { + /* Send the flow to its thread */ + FlowForceReassemblyForFlow(f); + FLOWLOCK_UNLOCK(f); + /* flow ownership is passed to the worker thread */ + + counters->flows_aside_needs_work++; + continue; + } + FLOWLOCK_UNLOCK(f); + + FlowQueuePrivateAppendFlow(&recycle, f); + if (recycle.len == 100) { + FlowQueueAppendPrivate(&flow_recycle_q, &recycle); + FlowWakeupFlowRecyclerThread(); + } + cnt++; + } + if (recycle.len) { + FlowQueueAppendPrivate(&flow_recycle_q, &recycle); + FlowWakeupFlowRecyclerThread(); + } + return cnt; +} + +/** + * \internal + * + * \brief check all flows in a hash row for timing out + * + * \param f last flow in the hash row + * \param ts timestamp + * \param emergency bool indicating emergency mode + * \param counters ptr to FlowTimeoutCounters structure + */ +static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, + int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts) +{ + uint32_t checked = 0; + Flow *prev_f = NULL; + + do { + checked++; + + /* check flow timeout based on lastts and state. Both can be + * accessed w/o Flow lock as we do have the hash row lock (so flow + * can't disappear) and flow_state is atomic. 
lastts can only + * be modified when we have both the flow and hash row lock */ + + /* timeout logic goes here */ + if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) { + + counters->flows_notimeout++; + + prev_f = f; + f = f->next; + continue; + } + + FLOWLOCK_WRLOCK(f); + + Flow *next_flow = f->next; + + /* never prune a flow that is used by a packet we + * are currently processing in one of the threads */ + if (!FlowBypassedTimeout(f, ts, counters)) { + FLOWLOCK_UNLOCK(f); + prev_f = f; + f = f->next; + continue; + } + + f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; + + counters->flows_timeout++; + + RemoveFromHash(f, prev_f); + + FlowQueuePrivateAppendFlow(&td->aside_queue, f); + /* flow is still locked in the queue */ + + f = next_flow; + } while (f != NULL); + + counters->flows_checked += checked; + if (checked > counters->rows_maxlen) + counters->rows_maxlen = checked; +} + +static void FlowManagerHashRowClearEvictedList( + FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, FlowTimeoutCounters *counters) +{ + do { + FLOWLOCK_WRLOCK(f); + Flow *next_flow = f->next; + f->next = NULL; + f->fb = NULL; + + FlowQueuePrivateAppendFlow(&td->aside_queue, f); + /* flow is still locked in the queue */ + + f = next_flow; + } while (f != NULL); +} + +/** + * \brief time out flows from the hash + * + * \param ts timestamp + * \param hash_min min hash index to consider + * \param hash_max max hash index to consider + * \param counters ptr to FlowTimeoutCounters structure + * + * \retval cnt number of timed out flow + */ +static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min, + const uint32_t hash_max, FlowTimeoutCounters *counters) +{ + uint32_t cnt = 0; + const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)); + const uint32_t rows_checked = hash_max - hash_min; + uint32_t rows_skipped = 0; + uint32_t rows_empty = 0; + +#if __WORDSIZE==64 +#define BITS 64 +#define TYPE uint64_t +#else +#define BITS 32 +#define TYPE uint32_t +#endif + + const uint32_t ts_secs = SCTIME_SECS(ts); + for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) { + TYPE check_bits = 0; + const uint32_t check = MIN(BITS, (hash_max - idx)); + for (uint32_t i = 0; i < check; i++) { + FlowBucket *fb = &flow_hash[idx+i]; + check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT( + fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs) + << (TYPE)i; + } + if (check_bits == 0) + continue; + + for (uint32_t i = 0; i < check; i++) { + FlowBucket *fb = &flow_hash[idx+i]; + if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) { + FBLOCK_LOCK(fb); + Flow *evicted = NULL; + if (fb->evicted != NULL || fb->head != NULL) { + if (fb->evicted != NULL) { + /* transfer out of bucket so we can do additional work outside + * of the bucket lock */ + evicted = fb->evicted; + fb->evicted = NULL; + } + if (fb->head != NULL) { + uint32_t next_ts = 0; + FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts); + + if (SC_ATOMIC_GET(fb->next_ts) != next_ts) + SC_ATOMIC_SET(fb->next_ts, next_ts); + } + if (fb->evicted == NULL && fb->head == NULL) { + SC_ATOMIC_SET(fb->next_ts, UINT_MAX); + } + } else { + SC_ATOMIC_SET(fb->next_ts, UINT_MAX); + rows_empty++; + } + FBLOCK_UNLOCK(fb); + /* processed evicted list */ + if (evicted) { + FlowManagerHashRowClearEvictedList(td, evicted, ts, counters); + } + } else { + rows_skipped++; + } + } + if (td->aside_queue.len) { + cnt += ProcessAsideQueue(td, counters); + } + } + + counters->rows_checked += 
rows_checked; + counters->rows_skipped += rows_skipped; + counters->rows_empty += rows_empty; + + if (td->aside_queue.len) { + cnt += ProcessAsideQueue(td, counters); + } + counters->flows_removed += cnt; + /* coverity[missing_unlock : FALSE] */ + return cnt; +} + +/** \internal + * \brief handle timeout for a slice of hash rows + * If we wrap around we call FlowTimeoutHash twice */ +static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts, + const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters, + const uint32_t rows, uint32_t *pos) +{ + uint32_t start = 0; + uint32_t end = 0; + uint32_t cnt = 0; + uint32_t rows_left = rows; + +again: + start = hash_min + (*pos); + if (start >= hash_max) { + start = hash_min; + } + end = start + rows_left; + if (end > hash_max) { + end = hash_max; + } + *pos = (end == hash_max) ? hash_min : end; + rows_left = rows_left - (end - start); + + cnt += FlowTimeoutHash(td, ts, start, end, counters); + if (rows_left) { + goto again; + } + return cnt; +} + +/** + * \internal + * + * \brief move all flows out of a hash row + * + * \param f last flow in the hash row + * + * \retval cnt removed out flows + */ +static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode) +{ + uint32_t cnt = 0; + + do { + FLOWLOCK_WRLOCK(f); + + Flow *next_flow = f->next; + + /* remove from the hash */ + if (mode == 0) { + RemoveFromHash(f, NULL); + } else { + FlowBucket *fb = f->fb; + fb->evicted = f->next; + f->next = NULL; + f->fb = NULL; + } + f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN; + + /* no one is referring to this flow, removed from hash + * so we can unlock it and move it to the recycle queue. */ + FLOWLOCK_UNLOCK(f); + FlowQueuePrivateAppendFlow(recycle_q, f); + + cnt++; + + f = next_flow; + } while (f != NULL); + + return cnt; +} + +/** + * \brief remove all flows from the hash + * + * \retval cnt number of removes out flows + */ +static uint32_t FlowCleanupHash(void) +{ + FlowQueuePrivate local_queue = { NULL, NULL, 0 }; + uint32_t cnt = 0; + + for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) { + FlowBucket *fb = &flow_hash[idx]; + + FBLOCK_LOCK(fb); + + if (fb->head != NULL) { + /* we have a flow, or more than one */ + cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0); + } + if (fb->evicted != NULL) { + /* we have a flow, or more than one */ + cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1); + } + + FBLOCK_UNLOCK(fb); + if (local_queue.len >= 25) { + FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); + FlowWakeupFlowRecyclerThread(); + } + } + FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); + FlowWakeupFlowRecyclerThread(); + + return cnt; +} + +typedef struct FlowQueueTimeoutCounters { + uint32_t flows_removed; + uint32_t flows_timeout; +} FlowQueueTimeoutCounters; + +typedef struct FlowCounters_ { + uint16_t flow_mgr_full_pass; + uint16_t flow_mgr_rows_sec; + + uint16_t flow_mgr_spare; + uint16_t flow_emerg_mode_enter; + uint16_t flow_emerg_mode_over; + + uint16_t flow_mgr_flows_checked; + uint16_t flow_mgr_flows_notimeout; + uint16_t flow_mgr_flows_timeout; + uint16_t flow_mgr_flows_aside; + uint16_t flow_mgr_flows_aside_needs_work; + + uint16_t flow_mgr_rows_maxlen; + + uint16_t flow_bypassed_cnt_clo; + uint16_t flow_bypassed_pkts; + uint16_t flow_bypassed_bytes; + + uint16_t memcap_pressure; + uint16_t memcap_pressure_max; +} FlowCounters; + +typedef struct FlowManagerThreadData_ { + uint32_t instance; + uint32_t min; + uint32_t max; 
+ + FlowCounters cnt; + + FlowManagerTimeoutThread timeout; +} FlowManagerThreadData; + +static void FlowCountersInit(ThreadVars *t, FlowCounters *fc) +{ + fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t); + fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t); + + fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t); + fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t); + fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t); + + fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t); + fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t); + fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t); + fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t); + fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t); + fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t); + + fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t); + fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t); + fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t); + + fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t); + fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t); +} + +static void FlowCountersUpdate( + ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters) +{ + StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked); + StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout); + + StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout); + StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside); + StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work, + (uint64_t)counters->flows_aside_needs_work); + + StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count); + StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts); + StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes); + + StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen); +} + +static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data) +{ + FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + + ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1); + SCLogDebug("flow manager instance %u", ftd->instance); + + /* set the min and max value used for hash row walking + * each thread has it's own section of the flow hash */ + uint32_t range = flow_config.hash_size / flowmgr_number; + + ftd->min = ftd->instance * range; + ftd->max = (ftd->instance + 1) * range; + + /* last flow-manager takes on hash_size % flowmgr_number extra rows */ + if ((ftd->instance + 1) == flowmgr_number) { + ftd->max = flow_config.hash_size; + } + BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size); + + SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max); + + /* pass thread data back to caller */ + *data = ftd; + + FlowCountersInit(t, &ftd->cnt); + + PacketPoolInit(); + return TM_ECODE_OK; +} + +static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data) +{ + StreamTcpThreadCacheCleanup(); + 
PacketPoolDestroy();
+    SCFree(data);
+    return TM_ECODE_OK;
+}
+
+/** \internal
+ * \brief calculate the number of rows to scan and how long to sleep
+ *        based on the busy score `mp` (0 idle, 100 max busy).
+ *
+ * We try to make sure we scan the hash once a second. The size of the
+ * hash slice scanned is determined by our busy score 'mp'. We sleep
+ * for the remainder of the second after processing the slice, or at
+ * least an approximation of it.
+ * A minimum busy score of 10 is assumed to avoid a full hash pass taking
+ * longer than 10 seconds. This is to avoid burstiness in scanning when
+ * there is a rapid increase of the busy score, which could lead to the
+ * flow manager suddenly scanning a much larger slice of the hash,
+ * leading to a burst in scan/eviction work.
+ */
+static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
+        uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
+{
+    if (emergency) {
+        *wu_rows = rows;
+        *wu_sleep = 250;
+        return;
+    }
+    /* minimum busy score is 10 */
+    const uint32_t emp = MAX(mp, 10);
+    const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
+    /* calc how much time we estimate the work will take, in ms. We assume
+     * each row takes an average of 1usec. Maxing out at 1sec. */
+    const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
+    /* calc how much time we need to sleep to get to the per second cadence,
+     * but sleep for at least 250ms. */
+    const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
+    SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
+            sleep_per_unit);
+
+    *wu_sleep = sleep_per_unit;
+    *wu_rows = rows_per_sec;
+    *rows_sec = rows_per_sec;
+}
+
+/** \brief Thread that manages the flow table and times out flows.
+ *
+ * \param th_v ThreadVars
+ * \param thread_data FlowManagerThreadData for this thread (passed as void ptr)
+ *
+ * Keeps an eye on the spare list, alloc flows if needed...
+ */ +static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) +{ + FlowManagerThreadData *ftd = thread_data; + const uint32_t rows = ftd->max - ftd->min; + const bool time_is_live = TimeModeIsLive(); + + uint32_t emerg_over_cnt = 0; + uint64_t next_run_ms = 0; + uint32_t pos = 0; + uint32_t rows_sec = 0; + uint32_t rows_per_wu = 0; + uint64_t sleep_per_wu = 0; + bool prev_emerg = false; + uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */ + SCTime_t ts; + + /* don't start our activities until time is setup */ + while (!TimeModeIsReady()) { + if (suricata_ctl_flags != 0) + return TM_ECODE_OK; + usleep(10); + } + + uint32_t mp = MemcapsGetPressure() * 100; + if (ftd->instance == 0) { + StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); + StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); + } + GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec); + StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); + + TmThreadsSetFlag(th_v, THV_RUNNING); + + while (1) + { + if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { + TmThreadsSetFlag(th_v, THV_PAUSED); + TmThreadTestThreadUnPaused(th_v); + TmThreadsUnsetFlag(th_v, THV_PAUSED); + } + + bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0); + + /* Get the time */ + ts = TimeGet(); + SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts)); + uint64_t ts_ms = SCTIME_MSECS(ts); + const bool emerge_p = (emerg && !prev_emerg); + if (emerge_p) { + next_run_ms = 0; + prev_emerg = true; + SCLogNotice("Flow emergency mode entered..."); + StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter); + } + if (ts_ms >= next_run_ms) { + if (ftd->instance == 0) { + const uint32_t sq_len = FlowSpareGetPoolSize(); + const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1); + /* see if we still have enough spare flows */ + if (spare_perc < 90 || spare_perc > 110) { + FlowSparePoolUpdate(sq_len); + } + } + + /* try to time out flows */ + // clang-format off + FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; + // clang-format on + + if (emerg) { + /* in emergency mode, do a full pass of the hash table */ + FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters); + StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); + } else { + SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos, + rows_per_wu); + + const uint32_t ppos = pos; + FlowTimeoutHashInChunks( + &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos); + if (ppos > pos) { + StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); + } + } + + const uint32_t spare_pool_len = FlowSpareGetPoolSize(); + StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len); + + FlowCountersUpdate(th_v, ftd, &counters); + + if (emerg == true) { + SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32 + "flow_spare_q status: %" PRIu32 "%% flows at the queue", + spare_pool_len, flow_config.prealloc, + spare_pool_len * 100 / MAX(flow_config.prealloc, 1)); + + /* only if we have pruned this "emergency_recovery" percentage + * of flows, we will unset the emergency bit */ + if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) > + flow_config.emergency_recovery) { + emerg_over_cnt++; + } else { + emerg_over_cnt = 0; + } + + if (emerg_over_cnt >= 30) { + SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY); + FlowTimeoutsReset(); + + emerg = false; + prev_emerg = false; + emerg_over_cnt = 0; + SCLogNotice("Flow emergency mode over, back to normal... 
unsetting" + " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", " + "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32 + "%% flows at the queue", + (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts), + spare_pool_len * 100 / MAX(flow_config.prealloc, 1)); + + StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over); + } + } + + /* update work units */ + const uint32_t pmp = mp; + mp = MemcapsGetPressure() * 100; + if (ftd->instance == 0) { + StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); + StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); + } + GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); + if (pmp != mp) { + StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); + } + + next_run_ms = ts_ms + sleep_per_wu; + } + if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) { + if (ftd->instance == 0) { + DefragTimeoutHash(ts); + HostTimeoutHash(ts); + IPPairTimeoutHash(ts); + HttpRangeContainersTimeoutHash(ts); + other_last_sec = (uint32_t)SCTIME_SECS(ts); + } + } + + if (TmThreadsCheckFlag(th_v, THV_KILL)) { + StatsSyncCounters(th_v); + break; + } + + if (emerg || !time_is_live) { + usleep(250); + } else { + struct timeval cond_tv; + gettimeofday(&cond_tv, NULL); + struct timeval add_tv; + add_tv.tv_sec = 0; + add_tv.tv_usec = (sleep_per_wu * 1000); + timeradd(&cond_tv, &add_tv, &cond_tv); + + struct timespec cond_time = FROM_TIMEVAL(cond_tv); + SCCtrlMutexLock(&flow_manager_ctrl_mutex); + while (1) { + int rc = SCCtrlCondTimedwait( + &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time); + if (rc == ETIMEDOUT || rc < 0) + break; + if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { + break; + } + } + SCCtrlMutexUnlock(&flow_manager_ctrl_mutex); + } + + SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? 
"emergency":""); + + StatsSyncCountersIfSignalled(th_v); + } + return TM_ECODE_OK; +} + +/** \brief spawn the flow manager thread */ +void FlowManagerThreadSpawn(void) +{ + intmax_t setting = 1; + (void)ConfGetInt("flow.managers", &setting); + + if (setting < 1 || setting > 1024) { + FatalError("invalid flow.managers setting %" PRIdMAX, setting); + } + flowmgr_number = (uint32_t)setting; + + SCCtrlCondInit(&flow_manager_ctrl_cond, NULL); + SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL); + + SCLogConfig("using %u flow manager threads", flowmgr_number); + StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse); + + for (uint32_t u = 0; u < flowmgr_number; u++) { + char name[TM_THREAD_NAME_MAX]; + snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1); + + ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name, + "FlowManager", 0); + BUG_ON(tv_flowmgr == NULL); + + if (tv_flowmgr == NULL) { + FatalError("flow manager thread creation failed"); + } + if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { + FatalError("flow manager thread spawn failed"); + } + } + return; +} + +typedef struct FlowRecyclerThreadData_ { + void *output_thread_data; + + uint16_t counter_flows; + uint16_t counter_queue_avg; + uint16_t counter_queue_max; + + uint16_t counter_flow_active; + uint16_t counter_tcp_active_sessions; + FlowEndCounters fec; +} FlowRecyclerThreadData; + +static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data) +{ + FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) { + SCLogError("initializing flow log API for thread failed"); + SCFree(ftd); + return TM_ECODE_FAILED; + } + SCLogDebug("output_thread_data %p", ftd->output_thread_data); + + ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t); + ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t); + ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t); + + ftd->counter_flow_active = StatsRegisterCounter("flow.active", t); + ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t); + + FlowEndCountersRegister(t, &ftd->fec); + + *data = ftd; + return TM_ECODE_OK; +} + +static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data) +{ + StreamTcpThreadCacheCleanup(); + + FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data; + if (ftd->output_thread_data != NULL) + OutputFlowLogThreadDeinit(t, ftd->output_thread_data); + + SCFree(data); + return TM_ECODE_OK; +} + +static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f) +{ + FLOWLOCK_WRLOCK(f); + + (void)OutputFlowLog(tv, ftd->output_thread_data, f); + + FlowEndCountersUpdate(tv, &ftd->fec, f); + if (f->proto == IPPROTO_TCP && f->protoctx != NULL) { + StatsDecr(tv, ftd->counter_tcp_active_sessions); + } + StatsDecr(tv, ftd->counter_flow_active); + + FlowClearMemory(f, f->protomap); + FLOWLOCK_UNLOCK(f); +} + +extern uint32_t flow_spare_pool_block_size; + +/** \brief Thread that manages timed out flows. 
+ * + * \param td ThreadVars cast to void ptr + */ +static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) +{ + FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data; + BUG_ON(ftd == NULL); + const bool time_is_live = TimeModeIsLive(); + uint64_t recycled_cnt = 0; + FlowQueuePrivate ret_queue = { NULL, NULL, 0 }; + + TmThreadsSetFlag(th_v, THV_RUNNING); + + while (1) + { + if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { + TmThreadsSetFlag(th_v, THV_PAUSED); + TmThreadTestThreadUnPaused(th_v); + TmThreadsUnsetFlag(th_v, THV_PAUSED); + } + SC_ATOMIC_ADD(flowrec_busy,1); + FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q); + + StatsAddUI64(th_v, ftd->counter_queue_avg, list.len); + StatsSetUI64(th_v, ftd->counter_queue_max, list.len); + + const int bail = (TmThreadsCheckFlag(th_v, THV_KILL)); + + /* Get the time */ + SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet())); + + uint64_t cnt = 0; + Flow *f; + while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) { + Recycler(th_v, ftd, f); + cnt++; + + /* for every full sized block, add it to the spare pool */ + FlowQueuePrivateAppendFlow(&ret_queue, f); + if (ret_queue.len == flow_spare_pool_block_size) { + FlowSparePoolReturnFlows(&ret_queue); + } + } + if (ret_queue.len > 0) { + FlowSparePoolReturnFlows(&ret_queue); + } + if (cnt > 0) { + recycled_cnt += cnt; + StatsAddUI64(th_v, ftd->counter_flows, cnt); + } + SC_ATOMIC_SUB(flowrec_busy,1); + + if (bail) { + break; + } + + const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY); + if (emerg || !time_is_live) { + usleep(250); + } else { + struct timeval cond_tv; + gettimeofday(&cond_tv, NULL); + cond_tv.tv_sec += 1; + struct timespec cond_time = FROM_TIMEVAL(cond_tv); + SCCtrlMutexLock(&flow_recycler_ctrl_mutex); + while (1) { + int rc = SCCtrlCondTimedwait( + &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time); + if (rc == ETIMEDOUT || rc < 0) { + break; + } + if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { + break; + } + if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) { + break; + } + } + SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex); + } + + SCLogDebug("woke up..."); + + StatsSyncCountersIfSignalled(th_v); + } + StatsSyncCounters(th_v); + SCLogPerf("%"PRIu64" flows processed", recycled_cnt); + return TM_ECODE_OK; +} + +static bool FlowRecyclerReadyToShutdown(void) +{ + if (SC_ATOMIC_GET(flowrec_busy) != 0) { + return false; + } + uint32_t len = 0; + FQLOCK_LOCK(&flow_recycle_q); + len = flow_recycle_q.qlen; + FQLOCK_UNLOCK(&flow_recycle_q); + + return ((len == 0)); +} + +/** \brief spawn the flow recycler thread */ +void FlowRecyclerThreadSpawn(void) +{ + intmax_t setting = 1; + (void)ConfGetInt("flow.recyclers", &setting); + + if (setting < 1 || setting > 1024) { + FatalError("invalid flow.recyclers setting %" PRIdMAX, setting); + } + flowrec_number = (uint32_t)setting; + + SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); + SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); + + SCLogConfig("using %u flow recycler threads", flowrec_number); + + for (uint32_t u = 0; u < flowrec_number; u++) { + char name[TM_THREAD_NAME_MAX]; + snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1); + + ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name, + "FlowRecycler", 0); + + if (tv_flowrec == NULL) { + FatalError("flow recycler thread creation failed"); + } + if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) { + FatalError("flow recycler thread spawn failed"); + } + } + return; +} + +/** + * \brief Used to disable 
flow recycler thread(s). + * + * \note this should only be called when the flow manager is already gone + * + * \todo Kinda hackish since it uses the tv name to identify flow recycler + * thread. We need an all weather identification scheme. + */ +void FlowDisableFlowRecyclerThread(void) +{ + /* move all flows still in the hash to the recycler queue */ +#ifndef DEBUG + (void)FlowCleanupHash(); +#else + uint32_t flows = FlowCleanupHash(); + SCLogDebug("flows to progress: %u", flows); +#endif + + /* make sure all flows are processed */ + do { + FlowWakeupFlowRecyclerThread(); + usleep(10); + } while (FlowRecyclerReadyToShutdown() == false); + + SCMutexLock(&tv_root_lock); + /* flow recycler thread(s) is/are a part of mgmt threads */ + for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { + if (strncasecmp(tv->name, thread_name_flow_rec, + strlen(thread_name_flow_rec)) == 0) + { + TmThreadsSetFlag(tv, THV_KILL); + } + } + SCMutexUnlock(&tv_root_lock); + + struct timeval start_ts; + struct timeval cur_ts; + gettimeofday(&start_ts, NULL); + +again: + gettimeofday(&cur_ts, NULL); + if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) { + FatalError("unable to get all flow recycler " + "threads to shutdown in time"); + } + + SCMutexLock(&tv_root_lock); + for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { + if (strncasecmp(tv->name, thread_name_flow_rec, + strlen(thread_name_flow_rec)) == 0) + { + if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + SCMutexUnlock(&tv_root_lock); + FlowWakeupFlowRecyclerThread(); + /* sleep outside lock */ + SleepMsec(1); + goto again; + } + } + } + SCMutexUnlock(&tv_root_lock); + + /* reset count, so we can kill and respawn (unix socket) */ + SC_ATOMIC_SET(flowrec_cnt, 0); + return; +} + +void TmModuleFlowManagerRegister (void) +{ + tmm_modules[TMM_FLOWMANAGER].name = "FlowManager"; + tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit; + tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit; + tmm_modules[TMM_FLOWMANAGER].Management = FlowManager; + tmm_modules[TMM_FLOWMANAGER].cap_flags = 0; + tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name); + + SC_ATOMIC_INIT(flowmgr_cnt); + SC_ATOMIC_INITPTR(flow_timeouts); +} + +void TmModuleFlowRecyclerRegister (void) +{ + tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler"; + tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit; + tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit; + tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler; + tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0; + tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name); + + SC_ATOMIC_INIT(flowrec_cnt); + SC_ATOMIC_INIT(flowrec_busy); +} |
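
The bucket scan in FlowTimeoutHash() above first builds a word-sized bitmap of hash rows whose next_ts has already passed, so whole groups of 64 (or 32) rows can be skipped without taking a single bucket lock. A minimal standalone sketch of that idea follows; the names, the plain array standing in for the FlowBucket next_ts atomics, and the example data are illustrative only:

#include <stdint.h>
#include <stdio.h>

#define NBUCKETS 256u

int main(void)
{
    /* per-bucket "earliest possible timeout", standing in for fb->next_ts;
     * seeded so that only a few buckets are due */
    uint32_t next_ts[NBUCKETS];
    for (uint32_t i = 0; i < NBUCKETS; i++)
        next_ts[i] = (i % 7 == 0) ? 100 : 1000;

    const uint32_t now = 200;   /* current time in seconds */
    uint32_t walked = 0;

    for (uint32_t idx = 0; idx < NBUCKETS; idx += 64) {
        uint64_t check_bits = 0;
        const uint32_t n = (NBUCKETS - idx) < 64 ? (NBUCKETS - idx) : 64;
        /* pass 1: cheap reads only, set one bit per bucket that may be due */
        for (uint32_t i = 0; i < n; i++)
            check_bits |= (uint64_t)(next_ts[idx + i] <= now) << i;
        if (check_bits == 0)
            continue;           /* nothing due in this group of 64 buckets */
        /* pass 2: only flagged buckets would be locked and walked */
        for (uint32_t i = 0; i < n; i++) {
            if (check_bits & ((uint64_t)1 << i))
                walked++;
        }
    }
    printf("buckets walked: %u of %u\n", walked, NBUCKETS);
    return 0;
}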
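
FlowManagerThreadInit() above gives each manager instance its own contiguous slice of the flow hash, with the last instance also taking the remainder rows. A small sketch of the same split, with hypothetical names and example sizes:

#include <stdint.h>
#include <stdio.h>

/* compute the [min, max) hash row range for one manager instance,
 * mirroring the calculation in FlowManagerThreadInit() */
static void hash_slice(uint32_t hash_size, uint32_t nmgrs, uint32_t instance,
        uint32_t *min, uint32_t *max)
{
    const uint32_t range = hash_size / nmgrs;
    *min = instance * range;
    *max = (instance + 1) * range;
    /* the last manager also takes the hash_size % nmgrs remainder rows */
    if (instance + 1 == nmgrs)
        *max = hash_size;
}

int main(void)
{
    const uint32_t hash_size = 65536, nmgrs = 3;   /* example values */
    for (uint32_t i = 0; i < nmgrs; i++) {
        uint32_t min, max;
        hash_slice(hash_size, nmgrs, i, &min, &max);
        printf("instance %u: rows [%u, %u)\n", i, min, max);
    }
    return 0;
}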
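
GetWorkUnitSizing() above paces the non-emergency scan from the memcap busy score: at least 10% of the rows per pass, an assumed cost of roughly 1 usec per row, and a sleep that tops the work up to a one-second cadence but never drops below 250 ms. A standalone sketch of that arithmetic, with local stand-ins for the MIN/MAX macros and an illustrative row count:

#include <stdint.h>
#include <stdio.h>

/* local stand-ins for the MIN/MAX macros used in the source */
#define LMIN(a, b) ((a) < (b) ? (a) : (b))
#define LMAX(a, b) ((a) > (b) ? (a) : (b))

int main(void)
{
    const uint32_t rows = 65536;   /* rows owned by this manager instance */
    for (uint32_t mp = 0; mp <= 100; mp += 25) {    /* busy score 0..100 */
        const uint32_t emp = LMAX(mp, 10);          /* floor of 10: full pass <= ~10s */
        const uint32_t rows_per_sec = (uint32_t)((float)rows * ((float)emp / 100.0f));
        /* ~1 usec per row -> estimated work in ms, capped at one second */
        const uint32_t work_ms = LMIN(rows_per_sec / 1000, 1000);
        /* sleep the rest of the second, but at least 250 ms */
        const uint32_t sleep_ms = LMAX(250, 1000 - work_ms);
        printf("mp=%3u -> rows/pass=%6u work=%4ums sleep=%4ums\n",
                mp, rows_per_sec, work_ms, sleep_ms);
    }
    return 0;
}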