Skip to content

Instantly share code, notes, and snippets.

@edgimar
Created November 24, 2020 01:57
Show Gist options
  • Save edgimar/a5b2b7efb7dcd176e77bfe5f8ac7f2c4 to your computer and use it in GitHub Desktop.
Save edgimar/a5b2b7efb7dcd176e77bfe5f8ac7f2c4 to your computer and use it in GitHub Desktop.
Sample python3 code for listening to SNMP trap events
#!/usr/bin/env python3
"""
Heavily borrowed from https://stackoverflow.com/q/33752602
dependencies:
sudo apt install python3-pysnmp4 python3-pyasn1
To run:
sudo ./snmptrap_listener.py
To test:
sudo snmptrap -v1 -c public 127.0.0.1 1.3.6.1.4.1.20408.4.1.1.2 127.0.0.1 1 1 123 1.3.6.1.2.1.1.1.0 s test
"""
from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
from pysnmp.carrier.asynsock.dgram import udp, udp6
from pyasn1.codec.ber import decoder
from pysnmp.proto import api
def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
while wholeMsg:
print('')
msgVer = int(api.decodeMessageVersion(wholeMsg))
if msgVer in api.protoModules:
pMod = api.protoModules[msgVer]
else:
print('Unsupported SNMP version %s' % msgVer)
return
reqMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
print('Notification message from {}:{}: '.format(transportDomain, transportAddress))
reqPDU = pMod.apiMessage.getPDU(reqMsg)
if reqPDU.isSameTypeWith(pMod.TrapPDU()):
if msgVer == api.protoVersion1:
pdu = pMod.apiTrapPDU
print('Enterprise: {}'.format(pdu.getEnterprise(reqPDU).prettyPrint()))
print('Agent Address: {}'.format(pdu.getAgentAddr(reqPDU).prettyPrint()))
print('Generic Trap: {}'.format(pdu.getGenericTrap(reqPDU).prettyPrint()))
print('Specific Trap: {}'.format(pdu.getSpecificTrap(reqPDU).prettyPrint()))
print('Uptime: {}'.format(pdu.getTimeStamp(reqPDU).prettyPrint()))
varBinds = pdu.getVarBindList(reqPDU)
else:
varBinds = pMod.apiPDU.getVarBindList(reqPDU)
print('Var-binds:')
for varBind in varBinds: # SNMP response contents
for i in range(len(varBind)):
print(varBind[i].prettyPrint())
print('')
return wholeMsg
if __name__ == "__main__":
transportDispatcher = AsynsockDispatcher()
transportDispatcher.registerRecvCbFun(cbFun)
# UDP/IPv4
transportDispatcher.registerTransport(
#udp.domainName, udp.UdpSocketTransport().openServerMode(('localhost', 162))
udp.domainName, udp.UdpSocketTransport().openServerMode(('', 162))
)
# UDP/IPv6
transportDispatcher.registerTransport(
udp6.domainName, udp6.Udp6SocketTransport().openServerMode(('::1', 162))
)
transportDispatcher.jobStarted(1)
try:
# Dispatcher will never finish as job#1 never reaches zero
print('Waiting for SNMP trap events...')
transportDispatcher.runDispatcher()
except KeyboardInterrupt:
transportDispatcher.closeDispatcher()
except:
transportDispatcher.closeDispatcher()
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment