Skip to content

Instantly share code, notes, and snippets.

@mochipon
Last active December 13, 2019 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mochipon/0fa4719f25ba5710bf6300cd72b57aff to your computer and use it in GitHub Desktop.
Save mochipon/0fa4719f25ba5710bf6300cd72b57aff to your computer and use it in GitHub Desktop.
Winter illumination meets NETCONF/YANG

How to use

$ git clone https://gist.github.com/0fa4719f25ba5710bf6300cd72b57aff.git illumination
$ cd illumination
$ virtualenv venv
$ . venv/bin/activate
$ pip install -r requirements.txt
$ python main.py -u username hostname.example.com GigabitEthernet1
#!/usr/bin/env python
from base64 import b64decode
import datetime
import getpass
import signal
import socket
import sys
import time
import argparse
import broadlink
from lxml import etree
from ncclient import manager
from ncclient.transport.session import SessionListener
import xmltodict
laststats = None
rmmini = None
IRDATA = {
"slow": b"JgBIAAABLZUVERQTExMTEhMTExMTERQTEzcTNxQ2FDcUNxM3FDcTNxQ2FBMTExM2FBMTExMRFBMTEhQ2FDcUEhM3FDYUNxQ2FAANBQ==",
"normal": b"JgBIAAABLZUUERURFBIUERQSFBIUERQSFDYUNxQ2FDcUNhQ3FDYUNhU2FDYVERQSFBEUEhQ2FREUEhQRFTYUNhU2FDYUEhQ2FQANBQ==",
"fast": b"JgBIAAABLpUUEhQSFBEUEhQSFBEVERQSFDYVNhQ2FTYUNhU2FDcUNhQSFBEVNhQ3FBEUEhQSFBEUNxQ2FREUEhQ2FDcUNhU2FAANBQ==",
}
def pacemaker(mbps):
if mbps > 100:
return "fast"
elif mbps > 10:
return "normal"
else:
return "slow"
def callback(notif):
global laststats, rmmini
response = xmltodict.parse(etree.tostring(notif.datastore_ele))[
"datastore-contents-xml"
]
stats = response["interfaces"]["interface"]["statistics"]
stats["event_time"] = notif.event_time
stats["pace"] = None
if laststats is not None:
delta_in_octets = int(stats["in-octets"]) - int(laststats["in-octets"])
delta_out_octets = int(stats["out-octets"]) - int(laststats["out-octets"])
delta_time = notif.event_time - laststats["event_time"]
in_mbps = delta_in_octets * 8 / 1000000 / delta_time.total_seconds()
out_mbps = delta_out_octets * 8 / 1000000 / delta_time.total_seconds()
print("in mbps: {}".format(in_mbps))
print("out mbps: {}".format(out_mbps))
stats["pace"] = pacemaker(max(in_mbps, out_mbps))
if stats["pace"] != laststats["pace"]:
rmmini.send_data(b64decode(IRDATA[stats["pace"]]))
laststats = dict(stats)
print("--")
def errback(notif):
pass
def sigint_handler(signal, frame):
global m
m.close_session()
sys.exit(0)
signal.signal(signal.SIGINT, sigint_handler)
def establish_subscription(host, username, password, port, xpath, period):
global m
m = manager.connect(
host=host,
port=port,
username=username,
password=password,
allow_agent=False,
look_for_keys=False,
hostkey_verify=False,
unknown_host_cb=lambda host, fingeprint: True,
)
s = m.establish_subscription(callback, errback, xpath=xpath, period=period)
print("Subscription Result : %s" % s.subscription_result)
if s.subscription_result.endswith("ok"):
print("Subscription Id : %d" % s.subscription_id)
def main(args):
global rmmini
local_ip_address = [
(s.connect(("1.1.1.1", 80)), s.getsockname()[0], s.close())
for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]
][0][1]
discovered = broadlink.discover(timeout=5, local_ip_address=local_ip_address)
if len(discovered) == 0:
print("Error receiving discovery responses from RM mini3")
exit(1)
rmmini = discovered[0]
if not rmmini.auth():
print("Error authenticating with RM mini3: {}".format(rmmini.host))
exit(1)
establish_subscription(
host=args.destination,
username=args.username,
password=str(args.password),
port=args.port,
xpath='/interfaces/interface[name="{}"]/statistics'.format(args.ifname),
period=args.period,
)
while True:
time.sleep(5)
# https://stackoverflow.com/a/44416389
class Password:
DEFAULT = "Prompt if not specified"
def __init__(self, value):
if value == self.DEFAULT:
value = getpass.getpass("Password: ")
self.value = value
def __str__(self):
return self.value
def __len__(self):
return len(self.value)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="パラメータを入力してください",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("-u", "--username", help="Username", type=str, required=True)
parser.add_argument(
"-p", "--password", help="Password", default=Password.DEFAULT, type=Password
)
parser.add_argument("--port", help="Port", default=830, type=int)
parser.add_argument(
"--period",
help="Period in centiseconds for periodic subscription",
default=500,
type=int,
)
parser.add_argument(
"destination", help="Connects and logs into the specified destination"
)
parser.add_argument(
"ifname", help="Interface name to check its bandwidth (i.e., GigabitEthernet1)"
)
args = parser.parse_args()
main(args)
bcrypt==3.1.7
broadlink==0.12.0
cffi==1.13.2
cryptography==2.8
lxml==4.4.2
-e git+git://github.com/CiscoDevNet/ncclient.git@7c0ab11b50463c3d4ae731221e6ba07dbadd715e#egg=ncclient
paramiko==2.4.1
pyasn1==0.4.8
pycparser==2.19
PyCRC==1.21
PyNaCl==1.3.0
python-dateutil==2.8.1
six==1.13.0
xmltodict==0.12.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment