Created
November 24, 2020 01:57
-
-
Save edgimar/a5b2b7efb7dcd176e77bfe5f8ac7f2c4 to your computer and use it in GitHub Desktop.
Sample python3 code for listening to SNMP trap events
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Heavily borrowed from https://stackoverflow.com/q/33752602 | |
dependencies: | |
sudo apt install python3-pysnmp4 python3-pyasn1 | |
To run: | |
sudo ./snmptrap_listener.py | |
To test: | |
sudo snmptrap -v1 -c public 127.0.0.1 1.3.6.1.4.1.20408.4.1.1.2 127.0.0.1 1 1 123 1.3.6.1.2.1.1.1.0 s test | |
""" | |
from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher | |
from pysnmp.carrier.asynsock.dgram import udp, udp6 | |
from pyasn1.codec.ber import decoder | |
from pysnmp.proto import api | |
def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg): | |
while wholeMsg: | |
print('') | |
msgVer = int(api.decodeMessageVersion(wholeMsg)) | |
if msgVer in api.protoModules: | |
pMod = api.protoModules[msgVer] | |
else: | |
print('Unsupported SNMP version %s' % msgVer) | |
return | |
reqMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message()) | |
print('Notification message from {}:{}: '.format(transportDomain, transportAddress)) | |
reqPDU = pMod.apiMessage.getPDU(reqMsg) | |
if reqPDU.isSameTypeWith(pMod.TrapPDU()): | |
if msgVer == api.protoVersion1: | |
pdu = pMod.apiTrapPDU | |
print('Enterprise: {}'.format(pdu.getEnterprise(reqPDU).prettyPrint())) | |
print('Agent Address: {}'.format(pdu.getAgentAddr(reqPDU).prettyPrint())) | |
print('Generic Trap: {}'.format(pdu.getGenericTrap(reqPDU).prettyPrint())) | |
print('Specific Trap: {}'.format(pdu.getSpecificTrap(reqPDU).prettyPrint())) | |
print('Uptime: {}'.format(pdu.getTimeStamp(reqPDU).prettyPrint())) | |
varBinds = pdu.getVarBindList(reqPDU) | |
else: | |
varBinds = pMod.apiPDU.getVarBindList(reqPDU) | |
print('Var-binds:') | |
for varBind in varBinds: # SNMP response contents | |
for i in range(len(varBind)): | |
print(varBind[i].prettyPrint()) | |
print('') | |
return wholeMsg | |
if __name__ == "__main__": | |
transportDispatcher = AsynsockDispatcher() | |
transportDispatcher.registerRecvCbFun(cbFun) | |
# UDP/IPv4 | |
transportDispatcher.registerTransport( | |
#udp.domainName, udp.UdpSocketTransport().openServerMode(('localhost', 162)) | |
udp.domainName, udp.UdpSocketTransport().openServerMode(('', 162)) | |
) | |
# UDP/IPv6 | |
transportDispatcher.registerTransport( | |
udp6.domainName, udp6.Udp6SocketTransport().openServerMode(('::1', 162)) | |
) | |
transportDispatcher.jobStarted(1) | |
try: | |
# Dispatcher will never finish as job#1 never reaches zero | |
print('Waiting for SNMP trap events...') | |
transportDispatcher.runDispatcher() | |
except KeyboardInterrupt: | |
transportDispatcher.closeDispatcher() | |
except: | |
transportDispatcher.closeDispatcher() | |
raise |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment