summaryrefslogtreecommitdiffstats
path: root/tests/snmptrapreceiver.py
blob: 643a8c43bf58f88935a4dc357c12ca3e94a2291d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# call this via "python[3] script name"
import sys
from pysnmp.entity import engine, config
from pysnmp.carrier.asyncore.dgram import udp
from pysnmp.entity.rfc3413 import ntfrcv
from pysnmp.smi import builder, view, compiler, rfc1902
from pyasn1.type.univ import OctetString

# Global variables
snmpport = 10162
snmpip = "127.0.0.1"
szOutputfile = "snmp.out"
szSnmpLogfile = "snmp_server.log"

# For vrebose output
bDebug = False

# Read command line params
if len(sys.argv) > 1:
	snmpport = int(sys.argv[1])
if len(sys.argv) > 2:
	snmpip = sys.argv[2]
if len(sys.argv) > 3:
	szOutputfile = sys.argv[3]
if len(sys.argv) > 4:
	szSnmpLogfile = sys.argv[4]

# Create output files
outputFile = open(szOutputfile,"w+")
logFile = open(szSnmpLogfile,"a+")

# Assemble MIB viewer
mibBuilder = builder.MibBuilder()
compiler.addMibCompiler(mibBuilder, sources=['file:///usr/share/snmp/mibs', 'file:///var/lib/snmp/mibs', '/usr/local/share/snmp/mibs/'])
mibViewController = view.MibViewController(mibBuilder)
# Pre-load MIB modules we expect to work with
try:
	mibBuilder.loadModules('SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'SYSLOG-MSG-MIB')
except Exception:
	print("Failed loading MIBs")

# Create SNMP engine with autogenernated engineID and pre-bound to socket transport dispatcher
snmpEngine = engine.SnmpEngine()

# Transport setup
# UDP over IPv4, add listening interface/port
config.addTransport(
	snmpEngine,
	udp.domainName + (1,),
	udp.UdpTransport().openServerMode((snmpip, snmpport))
)

# SNMPv1/2c setup
# SecurityName <-> CommunityName mapping
config.addV1System(snmpEngine, 'my-area', 'public')

print("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile))
logFile.write("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile))
logFile.flush()

# Callback function for receiving notifications
# noinspection PyUnusedLocal,PyUnusedLocal,PyUnusedLocal
def cbReceiverSnmp(snmpEngine, stateReference, contextEngineId, contextName, varBinds, cbCtx):
	transportDomain, transportAddress = snmpEngine.msgAndPduDsp.getTransportInfo(stateReference)
	if (bDebug):
		szDebug = str("Notification From: %s, Domain: %s, SNMP Engine: %s, Context: %s" %
			(transportAddress, transportDomain, contextEngineId.prettyPrint(), contextName.prettyPrint()))
		print(szDebug)
		logFile.write(szDebug)
		logFile.flush()

	# Create output String
	szOut = "Trap Source{}, Trap OID {}".format(transportAddress, transportDomain)

	varBinds = [rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds]

	for name, val in varBinds:
		# Append to output String
		szOut = szOut + ", Oid: {}, Value: {}".format(name.prettyPrint(), val.prettyPrint())

		if isinstance(val, OctetString):
			if (name.prettyPrint() != "SNMP-COMMUNITY-MIB::snmpTrapAddress.0"):
				szOctets = val.asOctets()#.rstrip('\r').rstrip('\n')
				szOut = szOut + ", Octets: {}".format(szOctets)
		if (bDebug):
			print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
	outputFile.write(szOut)
	if "\n" not in szOut:
		outputFile.write("\n")
	outputFile.flush()


# Register SNMP Application at the SNMP engine
ntfrcv.NotificationReceiver(snmpEngine, cbReceiverSnmp)

# this job would never finish
snmpEngine.transportDispatcher.jobStarted(1)

# Run I/O dispatcher which would receive queries and send confirmations
try:
	snmpEngine.transportDispatcher.runDispatcher()
except:
	snmpEngine.transportDispatcher.closeDispatcher()
	raise