Skip to content

Instantly share code, notes, and snippets.

@candlerb
Last active December 5, 2023 22:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save candlerb/30ba057b9d91a738a893dafc0c282723 to your computer and use it in GitHub Desktop.
Save candlerb/30ba057b9d91a738a893dafc0c282723 to your computer and use it in GitHub Desktop.
CGI script to allow manipulation of IOSv/IOSvL2 configurations in GNS3 (reset, export/edit/import, restore from snapshot)
#!/usr/bin/python3
"""
CGI tool to manage configurations of IOSv and CSR1000v devices in GNS3
* Restore configurations from snapshots
* Read and edit the config in NVRAM
"""
GNS3_URL = "http://127.0.0.1:3080"
EXCLUDE_PROJECTS = {
"NOC",
}
DEBUG = True
import base64
import cgi
import cgitb
import contextlib
import datetime
import glob
import html
import http.cookies
import json
import os
import subprocess
import sys
import urllib.request as request
from urllib.error import HTTPError
CI = http.cookies.SimpleCookie(os.environ.get("HTTP_COOKIE", ""))
CO = http.cookies.SimpleCookie() # Cookies to output
form = cgi.FieldStorage()
flash_message = None
if "flash" in CI:
flash_message = CI["flash"].value
CO["flash"] = ""
CO["flash"]["expires"] = "Thu, 01 Jan 1970 00:00:00 GMT"
def flash(msg):
if msg:
CO["flash"] = msg
SENT_PREAMBLE = False
def preamble(content_type="text/html"):
global SENT_PREAMBLE
if not SENT_PREAMBLE:
print("Content-Type: " + content_type)
print(CO)
print()
SENT_PREAMBLE = True
if DEBUG:
cgitb.enable()
def redirect(url, status="302 Found"):
print("Status: %s" % status)
print("Location: %s" % url)
print("Content-Type: text/html")
print(CO)
print("")
print('<a href="%s">Click to continue</a>' % url)
sys.exit(0)
@contextlib.contextmanager
def layout(title="GNS3 IOSv Configuration"):
preamble()
print("""
<html>
<head>
<title>%s</title>
<style>
body { font-family: sans-serif; }
form.login { display: table; }
form.login p {display: table-row; }
form.login label { display: table-cell; padding: 0.3em 1em 0.3em 0em; }
form.login input { display: table-cell; }
table.nodes th { background-color: #ff8080; }
table.nodes td { padding: 0em 0.5em 0em 0.5em; }
textarea.config { white-space: pre; width: 100%%; height: 80%%; font-family: monospace; font-size: 110%%; }
.flash { background-color: #ffaaaa; border: 2px solid red; padding: 10px; margin-bottom: 10px; }
</style>
</head>
<body>
""" % html.escape(title))
if flash_message:
print("<div class='flash'>%s</div>" % html.escape(flash_message))
yield
print("""
</body>
</html>
""")
def login(username="",*a,**k):
with layout(*a,**k):
print("""
<form class="login" method="post">
<fieldset>
<legend>GNS3 login required</legend>
<p>
<label for="username">Username</label>
<input type="text" name="username" value="%s" autofocus>
</p>
<p>
<label for="password">Password</label>
<input type="password" name="password">
</p>
<p>
<input type="submit">
</p>
</fieldset>
</form>
""" % html.escape(username))
sys.exit(0)
def logout():
CO["gns3_auth"] = ""
CO["gns3_auth"]["expires"] = "Thu, 01 Jan 1970 00:00:00 GMT"
redirect(os.environ["SCRIPT_NAME"])
def tag(name, value, **attrs):
print("<%s>%s</%s>" % (name, html.escape(value), name))
auth = None
def api_get(path):
global auth
req = request.Request(GNS3_URL+path)
if auth:
req.add_header("Authorization", auth)
try:
with request.urlopen(req) as r:
data = json.load(r)
return data
except HTTPError as e:
if e.code in [401, 403]:
logout()
else:
preamble()
print("Internal error: %s (%d)" % (html.escape(e.reason), e.code))
sys.exit(0)
def api_post(path, data={}):
global auth
req = request.Request(GNS3_URL+path, method="POST",
data=json.dumps(data).encode("UTF-8"),
headers={"Content-Type":"application/json"})
if auth:
req.add_header("Authorization", auth)
try:
with request.urlopen(req) as r:
data = json.load(r)
return data
except HTTPError as e:
if e.code in [401, 403]:
logout()
else:
preamble()
print("Internal error: %s (%d)" % (html.escape(e.reason), e.code))
sys.exit(0)
def start_stop(project_id, node_id, action):
return api_post("/v2/projects/%s/nodes/%s/%s" % (project_id, node_id, action))
def restore(project_id, node_ids, snapshot_id):
if not project_id:
return "No project selected"
if not node_ids:
return "No nodes selected"
if not snapshot_id:
return "No snapshot selected"
# Get the project path. Also checks authentication
project = api_get("/v2/projects/%s" % project_id)
ss_filename = None
snapshot_ids = {}
if snapshot_id != "None":
snapshots = api_get("/v2/projects/%s/snapshots" % project_id)
snapshots = [s for s in snapshots if s["snapshot_id"] == snapshot_id]
snapshot = snapshots[0]
ss_filename = "%s/snapshots/%s_%s.gns3project" % (project["path"], snapshot["name"],
datetime.datetime.utcfromtimestamp(snapshot["created_at"]).strftime("%d%m%y_%H%M%S"))
if not os.path.exists(ss_filename):
return "Unable to find %s" % ss_filename
# Annoyingly, the node_ids stored in the snapshot are different from the running
# node_ids, so we have to unpack the project.gns3 file and map by node name.
res = subprocess.run(["unzip", "-p", ss_filename, "project.gns3"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if res.returncode != 0:
return "unzip project.gns3 failed (%d): %s" % (res.returncode, res.stderr.decode("UTF-8", errors="replace"))
data = json.loads(res.stdout)
snapshot_ids = {n["name"]:n["node_id"] for n in data["topology"]["nodes"]}
# Iterate over all the nodes, restore them (stop if required)
nodes = api_get("/v2/projects/%s/nodes" % project_id)
restart = []
for node in nodes:
if node["node_id"] not in node_ids:
continue
if node["status"] != "stopped":
start_stop(project_id, node["node_id"], "stop")
restart.append(node)
if node["name"] in snapshot_ids:
cmd = ["unzip", "-jo", ss_filename,
"-d", node["node_directory"],
"project-files/*/%s/*" % snapshot_ids[node["name"]],
]
res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if res.returncode != 0:
return "unzip failed (%d): %s" % (res.returncode, res.stdout.decode("UTF-8", errors="replace"))
else:
for file in glob.glob(node["node_directory"]+"/*.qcow2"):
os.unlink(file)
# Restart if required
for node in restart:
start_stop(project_id, node["node_id"], "start")
return "Restore complete"
def export_import_config(project_id, node_id, format, config=None):
"""
If config is None: reads the config
if config is bytes: replaces the config
Returns: error, name, current_or_previous_config
"""
sys.path.extend(glob.glob("/usr/share/gns3/gns3-server/lib/*/site-packages"))
from gns3server.compute.iou.utils.iou_export import nvram_export
from gns3server.compute.iou.utils.iou_import import nvram_import
import guestfs
if not project_id:
return ("No project selected", None, None)
if not node_id:
return ("No node selected", None, None)
node = api_get("/v2/projects/%s/nodes/%s" % (project_id, node_id))
name = node["name"]
if node["status"] == "started":
start_stop(project_id, node_id, "stop")
# virt-cat -a hda_disk.qcow2 -m /dev/sda1:/ /nvram >/tmp/nvram
#os.environ["LIBGUESTFS_DEBUG"]="1"
#os.environ["LIBGUESTFS_TRACE"]="1"
g = guestfs.GuestFS(python_return_dict=True)
g.add_drive_opts(node["node_directory"]+"/hda_disk.qcow2", format="qcow2", readonly=int(not bool(config)))
g.launch()
g.mount("/dev/sda1", "/")
try:
if format == "nvram":
nvram = g.read_file("/nvram")
try:
startup, private = nvram_export(nvram)
except ValueError as e:
return ("nvram_export: %s" % e, name, None)
if config is None:
return (None, name, startup)
try:
nvram = nvram_import(nvram, config, private, 512)
except ValueError as e:
return ("nvram_import: %s" % e, name, config)
try:
g.write("/nvram", bytes(nvram))
except RuntimeError as e:
return ("guestfs_write: %s" % e, name, config)
return (None, name, config)
elif format == "config":
if config is None:
startup = g.read_file("/config.txt")
return (None, name, startup or "boot config bootflash:config.txt nvbypass\n")
try:
g.write("/config.txt", bytes(config))
except RuntimeError as e:
return ("guestfs_write: %s" % e, name, config)
return (None, name, config)
else:
return ("unknown format: %s" % format, name, None)
finally:
g.shutdown()
g.close()
###### START HERE ######
# Check for logout
action = form.getfirst("action")
if action == "logout":
logout()
# Check we have an auth cookie; if not then login
if "gns3_auth" in CI:
auth = CI["gns3_auth"].value
else:
# Test if authorization is required
req = request.Request(GNS3_URL)
try:
with request.urlopen(req) as r:
need_auth = False
except HTTPError as e:
if e.code == 401:
need_auth = True
else:
raise
if need_auth:
username = form.getfirst("username")
password = form.getfirst("password")
if not username or not password:
login()
auth = "Basic " + base64.b64encode((username+":"+password).encode("UTF-8")).decode("UTF-8")
# Test it, before setting the cookie
req = request.Request(GNS3_URL)
req.add_header("Authorization", auth)
try:
with request.urlopen(req) as r:
pass
except HTTPError as e:
flash_message="Login failed: %s (%d)" % (html.escape(e.reason), e.code)
login(username=username)
else:
auth = ""
CO["gns3_auth"] = auth
# Everything works at the project level
project_id = form.getfirst("project_id")
# Actions which generate a redirect
if action in ["start", "stop"]:
start_stop(project_id, form.getfirst("node_id"), action)
redirect("?project_id=%s" % project_id)
if action == "restore" and os.environ["REQUEST_METHOD"] == "POST":
flash(restore(project_id, form.getlist("node_id"), form.getfirst("snapshot_id")))
redirect("?project_id=%s" % project_id)
if action == "edit":
config = (form.getfirst("config") or "").encode("UTF-8", errors="replace")
if os.environ["REQUEST_METHOD"] == "POST" and config:
err, name, prev_config = export_import_config(form.getfirst("project_id"), form.getfirst("node_id"), form.getfirst("format"), config)
if not err:
msg = "Imported to %s" % name
if form.getfirst("status") == "started":
start_stop(form.getfirst("project_id"), form.getfirst("node_id"), "start")
msg += " and restarted"
flash(msg)
redirect("?project_id=%s" % project_id)
flash_message = err
# but fall through to redisplay same page
else:
err, name, config = export_import_config(form.getfirst("project_id"), form.getfirst("node_id"), form.getfirst("format"))
if err:
flash(err)
redirect("?project_id=%s" % project_id)
with layout():
print("""
<h1>Config for %s</h1>
<form method="post">
<textarea class="config" name="config">%s</textarea>
<input type="submit" value="Import">
</form>
<a href="?project_id=%s">Cancel</a>
""" % (
html.escape(name),
html.escape(config.decode("UTF-8", errors="replace")),
form.getfirst("project_id"),
))
sys.exit(0)
# All cookies set, we can now generate the HTTP response
with layout():
#print(html.escape(repr(form)))
# Project selector
projects = api_get("/v2/projects")
projects.sort(key=lambda p: p["name"])
print("""
<form method="get">
<select name="project_id">
""")
for project in projects:
attr = ""
if project["status"] != "opened" or project["name"] in EXCLUDE_PROJECTS:
attr += " disabled"
elif project["project_id"] == project_id:
attr += " selected"
print('<option value="%s"%s>%s</option>' % (project["project_id"], attr, html.escape(project["name"])))
print("""
<input type="submit" value="Select project">
</form>""")
# Find selected project
sel = [p for p in projects if p["project_id"] == project_id]
if not sel:
sys.exit(0)
project = sel[0]
pid = project["project_id"]
print('<form method="post" onsubmit="return confirm(\'Are you sure? This will wipe the config of the selected devices!\');">')
# Snapshot actions
snapshots = api_get("/v2/projects/%s/snapshots" % pid)
snapshots.sort(key=lambda s: s["name"])
print("<hr>")
print('<p>Reset selected devices from snapshot')
print('<select name="snapshot_id">')
print('<option></option>')
print('<option value="None">&lt;Empty&gt;</option>')
for s in snapshots:
print('<option value="%s">%s</option>' % (s["snapshot_id"], html.escape(s["name"])))
print('</select>')
print('<button type="submit" name="action" value="restore">Restore</button>')
print('</p>')
# List nodes
print("<hr>")
print('<table class="nodes">')
print("""
<thead>
<tr>
<th colspan="3">%s</th>
</tr>
</thead>
""" % html.escape(project["name"]))
nodes = api_get("/v2/projects/%s/nodes" % project_id)
nodes.sort(key=lambda n: n["name"])
for node in nodes:
props = node.get("properties", {})
hda = props.get("hda_disk_image")
if not hda:
continue # filters out NAT nodes etc
if hda[0:4] == "vios":
format = "nvram"
elif hda[0:8] == "csr1000v":
format = "config"
else:
format = None
nid = node["node_id"]
attr = ""
actions = [node["status"]]
if node["status"] == "stopped":
actions.append('<a href="?project_id=%s&node_id=%s&action=start">Start</a>' % (pid, nid))
else:
actions.append('<a href="?project_id=%s&node_id=%s&action=stop">Stop</a>' % (pid, nid))
if format:
actions.append('<a href="?project_id=%s&node_id=%s&action=edit&status=%s&format=%s">Edit</a>' % (pid, nid, node["status"], format))
print("""
<tr>
<td><input type="checkbox" name="node_id" id="%s" value="%s"%s></td>
<td><label for="%s">%s</label></td>
<td class="%s">%s</td>
</tr>
""" % (nid, nid, attr, nid, html.escape(node["name"]), node["status"], " | ".join(actions)))
print("</table>")
print("</form>")
# Logout link
print("<hr>")
print('<a href="?action=logout">Logout</a>')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment