Skip to content

Instantly share code, notes, and snippets.

@depfryer
Last active October 9, 2019 14:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save depfryer/834102e0d1ab8b30b28e73edd5741984 to your computer and use it in GitHub Desktop.
Save depfryer/834102e0d1ab8b30b28e73edd5741984 to your computer and use it in GitHub Desktop.
simple class addon for pysnmp
from pysnmp.hlapi import *
# credential is like CommunityData('public') (for v2)
# target is a IP
# then simply do :
# s = snmpy('demo.snmplabs.com', CommunityData('public'))
# o = s.get_Tree(['1.3.6.1.2.1.2.2.1.2'])
# v = s.get(['1.3.6.1.2.1.2.2.1.2.2'])
# print(v)
# for a in o:
# print(a)
class Snmpy:
def __init__(self, target, credentials, port=161, engine=SnmpEngine(), context=ContextData()):
self.target = target
self.port = port
self.credentials = credentials
self.context = context
self.engine = engine
def change_target(self, newTarget):
self.target = newTarget
def construct_object_types(self, list_of_oids):
object_types = []
for oid in list_of_oids:
object_types.append(ObjectType(ObjectIdentity(oid)))
return object_types
def fetch(self, handler, count):
result = []
for i in range(count):
try:
error_indication, error_status, error_index, var_binds = next(handler)
if not error_indication and not error_status:
items = {}
for var_bind in var_binds:
items[str(var_bind[0])] = self.cast(var_bind[1])
result.append(items)
else:
raise RuntimeError('Got SNMP error: {0}'.format(error_indication))
except StopIteration:
break
except RuntimeError as e:
print(e)
return result
def cast(self, value):
try:
return int(value)
except (ValueError, TypeError):
try:
return float(value)
except (ValueError, TypeError):
try:
return str(value)
except (ValueError, TypeError):
pass
return value
def get(self, oids):
handler = getCmd(self.engine, self.credentials, UdpTransportTarget(
(self.target, self.port)), self.context, *self.construct_object_types(oids))
try:
result = self.fetch(handler, 1)[0]
except IndexError as e:
print(e)
result = {oids[0]: ""}
resu = []
for r in result:
resu.append(result[r])
return resu
def get_bulk(self, oids, count, start_from=0):
handler = bulkCmd(self.engine, self.credentials,
UdpTransportTarget((self.target, self.port)), self.context, start_from,
count, *self.construct_object_types(oids))
result = self.fetch(handler, count)
return result
def get_bulk_auto(self, oids, count_oid, start_from=0):
count = self.get([count_oid], self.engine, context)[count_oid]
return self.get_bulk(oids, count, start_from)
def get_Tree(self, oids):
handler = nextCmd(self.engine,
self.credentials,
UdpTransportTarget((self.target, self.port)),
self.context,
*self.construct_object_types(oids),
lookupMib=False,
lexicographicMode=False)
result = self.fetchTree(handler, oids)
return result
def fetchTree(self, handler, oids):
result = []
while True:
try:
error_indication, error_status, error_index, var_binds = next(handler)
if not error_indication and not error_status:
items = []
for var_bind in var_binds:
t = ''
for oid in oids:
t = str(var_bind[0]).replace(oid, '')
if t != str(var_bind[0]) :
break
items.append((t,self.cast(var_bind[1])))
result.append(items)
else:
raise RuntimeError('Got SNMP error: {0}'.format(error_indication))
except StopIteration:
break
except RuntimeError as e:
print(e)
return result
def set(self, oid, content, typeOfContent = 'str'):
if typeOfContent == 'str':
answer = OctetString(content)
else:
raise SyntaxError
try:
errorIndication, errorStatus, errorIndex, varBinds = next(
setCmd(self.engine, self.credentials, UdpTransportTarget(
(self.target, self.port)),self.context,
ObjectType(ObjectIdentity(oid),answer))
)
except Exception as e:
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment