summaryrefslogtreecommitdiffstats
path: root/tests/snmptrapreceiver.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 16:28:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 16:28:20 +0000
commitdcc721a95bef6f0d8e6d8775b8efe33e5aecd562 (patch)
tree66a2774cd0ee294d019efd71d2544c70f42b2842 /tests/snmptrapreceiver.py
parentInitial commit. (diff)
downloadrsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.tar.xz
rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.zip
Adding upstream version 8.2402.0.upstream/8.2402.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/snmptrapreceiver.py')
-rwxr-xr-xtests/snmptrapreceiver.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/tests/snmptrapreceiver.py b/tests/snmptrapreceiver.py
new file mode 100755
index 0000000..643a8c4
--- /dev/null
+++ b/tests/snmptrapreceiver.py
@@ -0,0 +1,104 @@
+# call this via "python[3] script name"
+import sys
+from pysnmp.entity import engine, config
+from pysnmp.carrier.asyncore.dgram import udp
+from pysnmp.entity.rfc3413 import ntfrcv
+from pysnmp.smi import builder, view, compiler, rfc1902
+from pyasn1.type.univ import OctetString
+
+# Global variables
+snmpport = 10162
+snmpip = "127.0.0.1"
+szOutputfile = "snmp.out"
+szSnmpLogfile = "snmp_server.log"
+
+# For vrebose output
+bDebug = False
+
+# Read command line params
+if len(sys.argv) > 1:
+ snmpport = int(sys.argv[1])
+if len(sys.argv) > 2:
+ snmpip = sys.argv[2]
+if len(sys.argv) > 3:
+ szOutputfile = sys.argv[3]
+if len(sys.argv) > 4:
+ szSnmpLogfile = sys.argv[4]
+
+# Create output files
+outputFile = open(szOutputfile,"w+")
+logFile = open(szSnmpLogfile,"a+")
+
+# Assemble MIB viewer
+mibBuilder = builder.MibBuilder()
+compiler.addMibCompiler(mibBuilder, sources=['file:///usr/share/snmp/mibs', 'file:///var/lib/snmp/mibs', '/usr/local/share/snmp/mibs/'])
+mibViewController = view.MibViewController(mibBuilder)
+# Pre-load MIB modules we expect to work with
+try:
+ mibBuilder.loadModules('SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'SYSLOG-MSG-MIB')
+except Exception:
+ print("Failed loading MIBs")
+
+# Create SNMP engine with autogenernated engineID and pre-bound to socket transport dispatcher
+snmpEngine = engine.SnmpEngine()
+
+# Transport setup
+# UDP over IPv4, add listening interface/port
+config.addTransport(
+ snmpEngine,
+ udp.domainName + (1,),
+ udp.UdpTransport().openServerMode((snmpip, snmpport))
+)
+
+# SNMPv1/2c setup
+# SecurityName <-> CommunityName mapping
+config.addV1System(snmpEngine, 'my-area', 'public')
+
+print("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile))
+logFile.write("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile))
+logFile.flush()
+
+# Callback function for receiving notifications
+# noinspection PyUnusedLocal,PyUnusedLocal,PyUnusedLocal
+def cbReceiverSnmp(snmpEngine, stateReference, contextEngineId, contextName, varBinds, cbCtx):
+ transportDomain, transportAddress = snmpEngine.msgAndPduDsp.getTransportInfo(stateReference)
+ if (bDebug):
+ szDebug = str("Notification From: %s, Domain: %s, SNMP Engine: %s, Context: %s" %
+ (transportAddress, transportDomain, contextEngineId.prettyPrint(), contextName.prettyPrint()))
+ print(szDebug)
+ logFile.write(szDebug)
+ logFile.flush()
+
+ # Create output String
+ szOut = "Trap Source{}, Trap OID {}".format(transportAddress, transportDomain)
+
+ varBinds = [rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds]
+
+ for name, val in varBinds:
+ # Append to output String
+ szOut = szOut + ", Oid: {}, Value: {}".format(name.prettyPrint(), val.prettyPrint())
+
+ if isinstance(val, OctetString):
+ if (name.prettyPrint() != "SNMP-COMMUNITY-MIB::snmpTrapAddress.0"):
+ szOctets = val.asOctets()#.rstrip('\r').rstrip('\n')
+ szOut = szOut + ", Octets: {}".format(szOctets)
+ if (bDebug):
+ print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
+ outputFile.write(szOut)
+ if "\n" not in szOut:
+ outputFile.write("\n")
+ outputFile.flush()
+
+
+# Register SNMP Application at the SNMP engine
+ntfrcv.NotificationReceiver(snmpEngine, cbReceiverSnmp)
+
+# this job would never finish
+snmpEngine.transportDispatcher.jobStarted(1)
+
+# Run I/O dispatcher which would receive queries and send confirmations
+try:
+ snmpEngine.transportDispatcher.runDispatcher()
+except:
+ snmpEngine.transportDispatcher.closeDispatcher()
+ raise