diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
commit | dcc721a95bef6f0d8e6d8775b8efe33e5aecd562 (patch) | |
tree | 66a2774cd0ee294d019efd71d2544c70f42b2842 /tests/snmptrapreceiver.py | |
parent | Initial commit. (diff) | |
download | rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.tar.xz rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.zip |
Adding upstream version 8.2402.0.upstream/8.2402.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/snmptrapreceiver.py')
-rwxr-xr-x | tests/snmptrapreceiver.py | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/tests/snmptrapreceiver.py b/tests/snmptrapreceiver.py new file mode 100755 index 0000000..643a8c4 --- /dev/null +++ b/tests/snmptrapreceiver.py @@ -0,0 +1,104 @@ +# call this via "python[3] script name" +import sys +from pysnmp.entity import engine, config +from pysnmp.carrier.asyncore.dgram import udp +from pysnmp.entity.rfc3413 import ntfrcv +from pysnmp.smi import builder, view, compiler, rfc1902 +from pyasn1.type.univ import OctetString + +# Global variables +snmpport = 10162 +snmpip = "127.0.0.1" +szOutputfile = "snmp.out" +szSnmpLogfile = "snmp_server.log" + +# For vrebose output +bDebug = False + +# Read command line params +if len(sys.argv) > 1: + snmpport = int(sys.argv[1]) +if len(sys.argv) > 2: + snmpip = sys.argv[2] +if len(sys.argv) > 3: + szOutputfile = sys.argv[3] +if len(sys.argv) > 4: + szSnmpLogfile = sys.argv[4] + +# Create output files +outputFile = open(szOutputfile,"w+") +logFile = open(szSnmpLogfile,"a+") + +# Assemble MIB viewer +mibBuilder = builder.MibBuilder() +compiler.addMibCompiler(mibBuilder, sources=['file:///usr/share/snmp/mibs', 'file:///var/lib/snmp/mibs', '/usr/local/share/snmp/mibs/']) +mibViewController = view.MibViewController(mibBuilder) +# Pre-load MIB modules we expect to work with +try: + mibBuilder.loadModules('SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'SYSLOG-MSG-MIB') +except Exception: + print("Failed loading MIBs") + +# Create SNMP engine with autogenernated engineID and pre-bound to socket transport dispatcher +snmpEngine = engine.SnmpEngine() + +# Transport setup +# UDP over IPv4, add listening interface/port +config.addTransport( + snmpEngine, + udp.domainName + (1,), + udp.UdpTransport().openServerMode((snmpip, snmpport)) +) + +# SNMPv1/2c setup +# SecurityName <-> CommunityName mapping +config.addV1System(snmpEngine, 'my-area', 'public') + +print("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile)) +logFile.write("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile)) +logFile.flush() + +# Callback function for receiving notifications +# noinspection PyUnusedLocal,PyUnusedLocal,PyUnusedLocal +def cbReceiverSnmp(snmpEngine, stateReference, contextEngineId, contextName, varBinds, cbCtx): + transportDomain, transportAddress = snmpEngine.msgAndPduDsp.getTransportInfo(stateReference) + if (bDebug): + szDebug = str("Notification From: %s, Domain: %s, SNMP Engine: %s, Context: %s" % + (transportAddress, transportDomain, contextEngineId.prettyPrint(), contextName.prettyPrint())) + print(szDebug) + logFile.write(szDebug) + logFile.flush() + + # Create output String + szOut = "Trap Source{}, Trap OID {}".format(transportAddress, transportDomain) + + varBinds = [rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds] + + for name, val in varBinds: + # Append to output String + szOut = szOut + ", Oid: {}, Value: {}".format(name.prettyPrint(), val.prettyPrint()) + + if isinstance(val, OctetString): + if (name.prettyPrint() != "SNMP-COMMUNITY-MIB::snmpTrapAddress.0"): + szOctets = val.asOctets()#.rstrip('\r').rstrip('\n') + szOut = szOut + ", Octets: {}".format(szOctets) + if (bDebug): + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) + outputFile.write(szOut) + if "\n" not in szOut: + outputFile.write("\n") + outputFile.flush() + + +# Register SNMP Application at the SNMP engine +ntfrcv.NotificationReceiver(snmpEngine, cbReceiverSnmp) + +# this job would never finish +snmpEngine.transportDispatcher.jobStarted(1) + +# Run I/O dispatcher which would receive queries and send confirmations +try: + snmpEngine.transportDispatcher.runDispatcher() +except: + snmpEngine.transportDispatcher.closeDispatcher() + raise |