summaryrefslogtreecommitdiffstats
path: root/src/tm-threads.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/tm-threads.h')
-rw-r--r--src/tm-threads.h291
1 files changed, 291 insertions, 0 deletions
diff --git a/src/tm-threads.h b/src/tm-threads.h
new file mode 100644
index 0000000..4ca55f9
--- /dev/null
+++ b/src/tm-threads.h
@@ -0,0 +1,291 @@
+/* Copyright (C) 2007-2011 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ * \author Anoop Saldanha <anoopsaldanha@gmail.com>
+ */
+
+#ifndef __TM_THREADS_H__
+#define __TM_THREADS_H__
+
+#include "tmqh-packetpool.h"
+#include "tm-threads-common.h"
+#include "tm-modules.h"
+#include "flow.h" // for the FlowQueue
+
+#ifdef OS_WIN32
+static inline void SleepUsec(uint64_t usec)
+{
+ uint64_t msec = 1;
+ if (usec > 1000) {
+ msec = usec / 1000;
+ }
+ Sleep(msec);
+}
+#define SleepMsec(msec) Sleep((msec))
+#else
+#define SleepUsec(usec) usleep((usec))
+#define SleepMsec(msec) usleep((msec) * 1000)
+#endif
+
+#define TM_QUEUE_NAME_MAX 16
+#define TM_THREAD_NAME_MAX 16
+
+typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *);
+
+typedef struct TmSlot_ {
+ /* function pointers */
+ union {
+ TmSlotFunc SlotFunc;
+ TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
+ TmEcode (*Management)(ThreadVars *, void *);
+ };
+ /** linked list of slots, used when a pipeline has multiple slots
+ * in a single thread. */
+ struct TmSlot_ *slot_next;
+
+ SC_ATOMIC_DECLARE(void *, slot_data);
+
+ /** copy of the TmModule::flags */
+ uint8_t tm_flags;
+
+ /* store the thread module id */
+ int tm_id;
+
+ TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
+ void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
+ TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);
+
+ /* data storage */
+ const void *slot_initdata;
+
+} TmSlot;
+
+extern ThreadVars *tv_root[TVT_MAX];
+
+extern SCMutex tv_root_lock;
+
+void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *);
+
+ThreadVars *TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *,
+ void *(fn_p)(void *), int);
+ThreadVars *TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *,
+ const char *);
+ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int);
+ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
+ int mucond);
+ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
+ int mucond);
+TmEcode TmThreadSpawn(ThreadVars *);
+void TmThreadKillThreadsFamily(int family);
+void TmThreadKillThreads(void);
+void TmThreadClearThreadsFamily(int family);
+void TmThreadAppend(ThreadVars *, int);
+void TmThreadSetGroupName(ThreadVars *tv, const char *name);
+
+TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t);
+TmEcode TmThreadSetThreadPriority(ThreadVars *, int);
+TmEcode TmThreadSetCPU(ThreadVars *, uint8_t);
+TmEcode TmThreadSetupOptions(ThreadVars *);
+void TmThreadSetPrio(ThreadVars *);
+int TmThreadGetNbThreads(uint8_t type);
+
+void TmThreadInitMC(ThreadVars *);
+void TmThreadTestThreadUnPaused(ThreadVars *);
+void TmThreadContinue(ThreadVars *);
+void TmThreadContinueThreads(void);
+void TmThreadCheckThreadState(void);
+TmEcode TmThreadWaitOnThreadInit(void);
+
+int TmThreadsCheckFlag(ThreadVars *, uint32_t);
+void TmThreadsSetFlag(ThreadVars *, uint32_t);
+void TmThreadsUnsetFlag(ThreadVars *, uint32_t);
+void TmThreadWaitForFlag(ThreadVars *, uint32_t);
+
+TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
+
+void TmThreadDisablePacketThreads(void);
+void TmThreadDisableReceiveThreads(void);
+
+uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
+
+TmEcode TmThreadWaitOnThreadRunning(void);
+
+TmEcode TmThreadsProcessDecodePseudoPackets(
+ ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot);
+
+static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
+{
+ while (1) {
+ Packet *p = PacketDequeueNoLock(pq);
+ if (unlikely(p == NULL))
+ break;
+ TmqhOutputPacketpool(NULL, p);
+ }
+}
+
+static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p)
+{
+ if (p != NULL) {
+ TmqhOutputPacketpool(tv, p);
+ }
+ TmThreadsCleanDecodePQ(&tv->decode_pq);
+ if (tv->stream_pq_local) {
+ SCMutexLock(&tv->stream_pq_local->mutex_q);
+ TmqhReleasePacketsToPacketPool(tv->stream_pq_local);
+ SCMutexUnlock(&tv->stream_pq_local->mutex_q);
+ }
+ TmThreadsSetFlag(tv, THV_FAILED);
+}
+
+/**
+ * \brief Handle timeout from the capture layer. Checks
+ * stream_pq which may have been filled by the flow
+ * manager.
+ * \param s pipeline to run on these packets.
+ */
+static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
+{
+ PacketQueue *pq = tv->stream_pq_local;
+ if (pq && pq->len > 0) {
+ while (1) {
+ SCMutexLock(&pq->mutex_q);
+ Packet *extra_p = PacketDequeue(pq);
+ SCMutexUnlock(&pq->mutex_q);
+ if (extra_p == NULL)
+ break;
+#ifdef DEBUG_VALIDATION
+ BUG_ON(extra_p->flow != NULL);
+#endif
+ TmEcode r = TmThreadsSlotVarRun(tv, extra_p, tv->tm_flowworker);
+ if (r == TM_ECODE_FAILED) {
+ TmThreadsSlotProcessPktFail(tv, tv->tm_flowworker, extra_p);
+ break;
+ }
+ tv->tmqh_out(tv, extra_p);
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+/**
+ * \brief Process the rest of the functions (if any) and queue.
+ */
+static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
+{
+ if (s == NULL) {
+ tv->tmqh_out(tv, p);
+ return TM_ECODE_OK;
+ }
+
+ TmEcode r = TmThreadsSlotVarRun(tv, p, s);
+ if (unlikely(r == TM_ECODE_FAILED)) {
+ TmThreadsSlotProcessPktFail(tv, s, p);
+ return TM_ECODE_FAILED;
+ }
+
+ tv->tmqh_out(tv, p);
+
+ TmThreadsHandleInjectedPackets(tv);
+
+ return TM_ECODE_OK;
+}
+
+/** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
+ * Allow caller to supply their own packet
+ *
+ * Meant for detect reload process that interrupts an sleeping capture thread
+ * to force a packet through the engine to complete a reload */
+static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
+{
+ TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
+ if (p == NULL)
+ p = PacketGetFromQueueOrAlloc();
+ if (p != NULL) {
+ p->flags |= PKT_PSEUDO_STREAM_END;
+ PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
+ if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) {
+ TmqhOutputPacketpool(tv, p);
+ }
+ }
+}
+
+/** \brief handle capture timeout
+ * When a capture method times out we check for house keeping
+ * tasks in the capture thread.
+ *
+ * \param p packet. Capture method may have taken a packet from
+ * the pool prior to the timing out call. We will then
+ * use that packet. Otherwise we can get our own.
+ */
+static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
+{
+ if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
+ TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+ return;
+
+ } else {
+ if (TmThreadsHandleInjectedPackets(tv) == false) {
+ /* see if we have to do some house keeping */
+ if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
+ TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+ return;
+ }
+ }
+ }
+
+ /* packet could have been passed to us that we won't use
+ * return it to the pool. */
+ if (p != NULL)
+ tv->tmqh_out(tv, p);
+}
+
+static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
+{
+ if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
+ return;
+ }
+ /* find the correct slot */
+ TmSlot *s = tv->tm_slots;
+ TmModule *tm = TmModuleGetById(s->tm_id);
+ if (tm->flags & TM_FLAG_RECEIVE_TM) {
+ /* if the method supports it, BreakLoop. Otherwise we rely on
+ * the capture method's recv timeout */
+ if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
+ tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
+ }
+ TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
+ }
+}
+
+void TmThreadsListThreads(void);
+int TmThreadsRegisterThread(ThreadVars *tv, const int type);
+void TmThreadsUnregisterThread(const int id);
+void TmThreadsInjectFlowById(Flow *f, const int id);
+
+void TmThreadsInitThreadsTimestamp(const SCTime_t ts);
+void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts);
+void TmThreadsGetMinimalTimestamp(struct timeval *ts);
+uint16_t TmThreadsGetWorkerThreadMax(void);
+bool TmThreadsTimeSubsysIsReady(void);
+
+#endif /* __TM_THREADS_H__ */