Last active
October 9, 2019 14:33
-
-
Save depfryer/834102e0d1ab8b30b28e73edd5741984 to your computer and use it in GitHub Desktop.
simple class addon for pysnmp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pysnmp.hlapi import * | |
# credentials is e.g. CommunityData('public') (for SNMP v2c)
# target is an IP address or hostname
# then simply do:
# s = Snmpy('demo.snmplabs.com', CommunityData('public'))
# o = s.get_Tree(['1.3.6.1.2.1.2.2.1.2'])
# v = s.get(['1.3.6.1.2.1.2.2.1.2.2'])
# print(v)
# for a in o:
#     print(a)
class Snmpy:
    """Small synchronous convenience wrapper around the pysnmp high-level API.

    An instance stores the target host, port, credentials, context and engine
    once, and exposes GET / GETBULK / walk / SET helpers that return plain
    Python values (see :meth:`cast`) instead of pysnmp objects.

    SNMP-level errors are reported with ``print`` and otherwise skipped; the
    helpers do not raise for them.
    """

    # Engine shared by every instance that is not handed an explicit one.
    # It is created lazily on first use rather than at class-definition time,
    # so importing this module has no side effects.
    _default_engine = None

    def __init__(self, target, credentials, port=161, engine=None, context=None):
        """Remember the connection parameters.

        Args:
            target: Host name or IP address of the SNMP agent.
            credentials: pysnmp authentication data, e.g.
                ``CommunityData('public')``.
            port: UDP port of the agent.
            engine: ``SnmpEngine`` to use; ``None`` selects a lazily created
                engine shared between instances.
            context: ``ContextData`` to use; ``None`` creates a default one.
        """
        self.target = target
        self.port = port
        self.credentials = credentials
        self.context = ContextData() if context is None else context
        self.engine = self._shared_engine() if engine is None else engine

    @classmethod
    def _shared_engine(cls):
        """Return the shared default engine, creating it on first use."""
        if Snmpy._default_engine is None:
            Snmpy._default_engine = SnmpEngine()
        return Snmpy._default_engine

    def _transport(self):
        """Build a UDP transport target for the current host and port."""
        return UdpTransportTarget((self.target, self.port))

    @staticmethod
    def _describe_error(error_indication, error_status, error_index, var_binds):
        """Format one failed response as a 'Got SNMP error: ...' message.

        ``error_indication`` is used when set; otherwise the message is built
        from ``error_status`` and the var-bind that ``error_index`` points at,
        so a status-only failure no longer prints "None".
        """
        if error_indication:
            return 'Got SNMP error: {0}'.format(error_indication)
        pretty = getattr(error_status, 'prettyPrint', None)
        status_text = pretty() if callable(pretty) else str(error_status)
        culprit = '?'
        try:
            # error_index is treated as 1-based; 0 means "no specific var-bind".
            if error_index:
                culprit = var_binds[int(error_index) - 1][0]
        except (IndexError, TypeError, ValueError):
            culprit = '?'
        return 'Got SNMP error: {0} at {1}'.format(status_text, culprit)

    @staticmethod
    def _relative_oid(name, bases):
        """Return ``name`` with the first matching base OID prefix removed.

        Only a leading prefix that ends on an OID component boundary is
        stripped. If no base matches, ``name`` is returned unchanged.
        """
        for base in bases:
            if name == base:
                return ''
            if name.startswith(base + '.'):
                return name[len(base):]
        return name

    def change_target(self, newTarget):
        """Point subsequent requests at a different host."""
        self.target = newTarget

    def construct_object_types(self, list_of_oids):
        """Wrap each OID in ``ObjectType(ObjectIdentity(oid))``."""
        return [ObjectType(ObjectIdentity(oid)) for oid in list_of_oids]

    def fetch(self, handler, count):
        """Read up to ``count`` responses from a pysnmp command iterator.

        Args:
            handler: Iterator yielding ``(error_indication, error_status,
                error_index, var_binds)`` tuples.
            count: Maximum number of responses to consume.

        Returns:
            A list with one ``{oid_string: cast_value}`` dict per successful
            response. Failed responses are printed and skipped.
        """
        result = []
        for _ in range(count):
            try:
                error_indication, error_status, error_index, var_binds = next(handler)
            except StopIteration:
                break
            if error_indication or error_status:
                print(self._describe_error(
                    error_indication, error_status, error_index, var_binds))
                continue
            result.append({
                str(var_bind[0]): self.cast(var_bind[1]) for var_bind in var_binds
            })
        return result

    def cast(self, value):
        """Convert a value to ``int``, else ``float``, else ``str``.

        The conversions are tried in that order; the original value is
        returned only if all three fail.
        """
        for convert in (int, float, str):
            try:
                return convert(value)
            except (ValueError, TypeError):
                continue
        return value

    def get(self, oids):
        """Issue a GET for ``oids`` and return the values as a list.

        Returns:
            The cast values in response order. If no successful response was
            received, one empty-string placeholder per requested OID, so the
            result can still be indexed or unpacked positionally.
        """
        handler = getCmd(self.engine, self.credentials, self._transport(),
                         self.context, *self.construct_object_types(oids))
        rows = self.fetch(handler, 1)
        if not rows:
            return ['' for _ in oids]
        return list(rows[0].values())

    def get_bulk(self, oids, count, start_from=0):
        """Issue a GETBULK for ``oids`` and return up to ``count`` rows.

        NOTE(review): ``start_from`` and ``count`` are passed positionally to
        ``bulkCmd`` in the slots pysnmp documents as ``nonRepeaters`` and
        ``maxRepetitions`` - confirm that ``start_from`` is meant that way.
        """
        handler = bulkCmd(self.engine, self.credentials, self._transport(),
                          self.context, start_from, count,
                          *self.construct_object_types(oids))
        return self.fetch(handler, count)

    def get_bulk_auto(self, oids, count_oid, start_from=0):
        """Run :meth:`get_bulk` with the row count read from ``count_oid``.

        Raises:
            ValueError: If ``count_oid`` could not be read as an integer.
        """
        values = self.get([count_oid])
        try:
            count = int(values[0])
        except (IndexError, TypeError, ValueError):
            raise ValueError(
                'Could not read an integer row count from {0}'.format(count_oid))
        return self.get_bulk(oids, count, start_from)

    def get_Tree(self, oids):
        """Walk the subtrees rooted at ``oids`` with GETNEXT requests.

        Returns:
            The result of :meth:`fetchTree` for the walk.
        """
        handler = nextCmd(self.engine,
                          self.credentials,
                          self._transport(),
                          self.context,
                          *self.construct_object_types(oids),
                          lookupMib=False,
                          lexicographicMode=False)
        return self.fetchTree(handler, oids)

    def fetchTree(self, handler, oids):
        """Drain a walk iterator into rows of ``(relative_oid, value)`` tuples.

        Args:
            handler: Iterator yielding ``(error_indication, error_status,
                error_index, var_binds)`` tuples.
            oids: Base OID strings; the first one that prefixes a returned
                OID is stripped from it.

        Returns:
            One list per successful response, each holding one
            ``(relative_oid, cast_value)`` tuple per var-bind. Failed
            responses are printed and skipped.
        """
        result = []
        while True:
            try:
                error_indication, error_status, error_index, var_binds = next(handler)
            except StopIteration:
                break
            if error_indication or error_status:
                print(self._describe_error(
                    error_indication, error_status, error_index, var_binds))
                continue
            result.append([
                (self._relative_oid(str(var_bind[0]), oids), self.cast(var_bind[1]))
                for var_bind in var_binds
            ])
        return result

    def set(self, oid, content, typeOfContent='str'):
        """Issue a SET of ``content`` on ``oid``.

        Args:
            oid: OID to write.
            content: Value to write; wrapped in ``OctetString``.
            typeOfContent: Only ``'str'`` is supported.

        Returns:
            ``True`` if the agent accepted the request, ``False`` if the
            request raised or the agent reported an error (both are printed).

        Raises:
            SyntaxError: If ``typeOfContent`` is not ``'str'`` (exception type
                kept for backward compatibility).
        """
        if typeOfContent != 'str':
            raise SyntaxError(
                'Unsupported typeOfContent: {0!r}'.format(typeOfContent))
        payload = OctetString(content)
        try:
            error_indication, error_status, error_index, var_binds = next(
                setCmd(self.engine, self.credentials, self._transport(),
                       self.context, ObjectType(ObjectIdentity(oid), payload)))
        except Exception as e:  # best-effort: report and carry on, as before
            print(e)
            return False
        if error_indication or error_status:
            print(self._describe_error(
                error_indication, error_status, error_index, var_binds))
            return False
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment