Last active
December 5, 2023 22:20
-
-
Save candlerb/30ba057b9d91a738a893dafc0c282723 to your computer and use it in GitHub Desktop.
CGI script to allow manipulation of IOSv/IOSvL2 configurations in GNS3 (reset, export/edit/import, restore from snapshot)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
CGI tool to manage configurations of IOSv and CSR1000v devices in GNS3 | |
* Restore configurations from snapshots | |
* Read and edit the config in NVRAM | |
""" | |
# Base URL of the GNS3 server HTTP API; every API path is appended to it.
GNS3_URL = "http://127.0.0.1:3080"
# Names of projects that are listed but disabled in the project selector.
EXCLUDE_PROJECTS = {
    "NOC",
}
# When true, cgitb is enabled so uncaught exceptions are reported by cgitb.
DEBUG = True
import base64
import cgi
import cgitb
import contextlib
import datetime
import glob
import html
import http.cookies
import json
import os
import subprocess
import sys
import urllib.parse
import urllib.request as request
from urllib.error import HTTPError
# Cookies received from the browser, and the jar of cookies to send back.
CI = http.cookies.SimpleCookie(os.environ.get("HTTP_COOKIE", ""))
CO = http.cookies.SimpleCookie()  # Cookies to output

# Parsed request fields (query string and/or POST body).
form = cgi.FieldStorage()

# One-shot "flash" message: pick up whatever an earlier response stored in
# the cookie and expire that cookie so the message is shown only once.
_incoming_flash = CI.get("flash")
flash_message = None if _incoming_flash is None else _incoming_flash.value
if _incoming_flash is not None:
    CO["flash"] = ""
    CO["flash"]["expires"] = "Thu, 01 Jan 1970 00:00:00 GMT"
def flash(msg):
    """Store *msg* in the outgoing "flash" cookie; falsy messages are ignored."""
    if not msg:
        return
    CO["flash"] = msg
# Set once the response headers have been written, so they go out only once.
SENT_PREAMBLE = False


def preamble(content_type="text/html"):
    """Write the CGI response headers, at most once per request.

    Emits the Content-Type header, any cookies queued in CO, and the blank
    line that ends the header block.  Subsequent calls are no-ops.
    """
    global SENT_PREAMBLE
    if SENT_PREAMBLE:
        return
    print("Content-Type: " + content_type)
    # An empty cookie jar renders as "", and printing that would emit a blank
    # line that terminates the headers early; only print real cookie headers.
    cookie_headers = CO.output()
    if cookie_headers:
        print(cookie_headers)
    print()
    SENT_PREAMBLE = True
# In debug mode, let cgitb report uncaught exceptions.
if DEBUG:
    cgitb.enable()
def redirect(url, status="302 Found"):
    """Send an HTTP redirect to *url* and end the request.

    Writes the Status/Location headers, any cookies queued in CO, and a small
    HTML fallback link, then calls sys.exit(0); it never returns.
    """
    # The URL may contain request-supplied data: keep it on a single header
    # line, and escape it before embedding it in the HTML fallback link.
    header_url = url.replace("\r", "").replace("\n", "")
    print("Status: %s" % status)
    print("Location: %s" % header_url)
    print("Content-Type: text/html")
    # An empty cookie jar renders as "", which would end the headers early.
    cookie_headers = CO.output()
    if cookie_headers:
        print(cookie_headers)
    print("")
    print('<a href="%s">Click to continue</a>' % html.escape(header_url, quote=True))
    sys.exit(0)
@contextlib.contextmanager
def layout(title="GNS3 IOSv Configuration"):
    """Context manager that wraps page content in the common HTML skeleton.

    On entry it sends the response headers (via preamble()), the document
    head with the page styles, and the pending flash message if there is one.
    On exit it closes the document.  The closing tags are written from a
    ``finally`` clause so the page is still terminated when the body leaves
    early, e.g. through sys.exit().
    """
    preamble()
    print("""
<html>
<head>
<title>%s</title>
<style>
body { font-family: sans-serif; }
form.login { display: table; }
form.login p {display: table-row; }
form.login label { display: table-cell; padding: 0.3em 1em 0.3em 0em; }
form.login input { display: table-cell; }
table.nodes th { background-color: #ff8080; }
table.nodes td { padding: 0em 0.5em 0em 0.5em; }
textarea.config { white-space: pre; width: 100%%; height: 80%%; font-family: monospace; font-size: 110%%; }
.flash { background-color: #ffaaaa; border: 2px solid red; padding: 10px; margin-bottom: 10px; }
</style>
</head>
<body>
""" % html.escape(title))
    if flash_message:
        print("<div class='flash'>%s</div>" % html.escape(flash_message))
    try:
        yield
    finally:
        print("""
</body>
</html>
""")
def login(username="", *a, **k):
    """Render the GNS3 login form and end the request.

    *username* pre-fills the username field; any further arguments are passed
    straight through to layout().  Never returns: calls sys.exit(0).
    """
    # The inputs carry ids matching the labels' "for" attributes, so that
    # clicking a label focuses its field.
    with layout(*a, **k):
        print("""
<form class="login" method="post">
<fieldset>
<legend>GNS3 login required</legend>
<p>
<label for="username">Username</label>
<input type="text" id="username" name="username" value="%s" autofocus>
</p>
<p>
<label for="password">Password</label>
<input type="password" id="password" name="password">
</p>
<p>
<input type="submit">
</p>
</fieldset>
</form>
""" % html.escape(username))
    sys.exit(0)
def logout():
    """Expire the auth cookie and redirect back to the script's own URL."""
    CO["gns3_auth"] = ""
    auth_cookie = CO["gns3_auth"]
    auth_cookie["expires"] = "Thu, 01 Jan 1970 00:00:00 GMT"
    script_url = os.environ["SCRIPT_NAME"]
    redirect(script_url)
def tag(name, value, **attrs):
    """Print ``<name attr="...">value</name>`` with *value* HTML-escaped.

    Keyword arguments become attributes of the opening tag, with their values
    escaped.  A trailing underscore is stripped from attribute names so that
    Python keywords can be used, e.g. ``class_="x"`` renders ``class="x"``.
    With no keyword arguments the output is just ``<name>value</name>``.
    """
    rendered_attrs = "".join(
        ' %s="%s"' % (key.rstrip("_"), html.escape(str(val), quote=True))
        for key, val in attrs.items()
    )
    print("<%s%s>%s</%s>" % (name, rendered_attrs, html.escape(value), name))
# Value of the Authorization header sent to GNS3; None/"" means send none.
auth = None


def api_get(path):
    """GET *path* from the GNS3 API and return the decoded JSON reply.

    On HTTP 401/403 the user is logged out (which redirects and exits); on
    any other HTTP error an error line is printed and the script exits.
    """
    extra_headers = {"Authorization": auth} if auth else {}
    req = request.Request(GNS3_URL + path, headers=extra_headers)
    try:
        with request.urlopen(req) as resp:
            return json.load(resp)
    except HTTPError as err:
        if err.code in (401, 403):
            logout()
        else:
            preamble()
            print("Internal error: %s (%d)" % (html.escape(err.reason), err.code))
            sys.exit(0)
def api_post(path, data=None):
    """POST *data* as JSON to *path* on the GNS3 API.

    *data* defaults to an empty JSON object.  Returns the decoded JSON reply,
    or None when the server answers with an empty body.

    On HTTP 401/403 the user is logged out (which redirects and exits); on
    any other HTTP error an error line is printed and the script exits.
    """
    # Avoid a shared mutable default argument.
    payload = {} if data is None else data
    req = request.Request(GNS3_URL + path, method="POST",
                          data=json.dumps(payload).encode("UTF-8"),
                          headers={"Content-Type": "application/json"})
    if auth:
        req.add_header("Authorization", auth)
    try:
        with request.urlopen(req) as resp:
            body = resp.read()
    except HTTPError as err:
        if err.code in (401, 403):
            logout()
        else:
            preamble()
            print("Internal error: %s (%d)" % (html.escape(err.reason), err.code))
            sys.exit(0)
    else:
        # NOTE(review): some GNS3 versions appear to answer start/stop with
        # "204 No Content"; an empty body must not be fed to the JSON parser.
        return json.loads(body) if body.strip() else None
def start_stop(project_id, node_id, action):
    """POST *action* (e.g. "start" or "stop") for one node; return the API reply."""
    endpoint = "/v2/projects/%s/nodes/%s/%s" % (project_id, node_id, action)
    return api_post(endpoint)
def restore(project_id, node_ids, snapshot_id):
    """Reset the selected nodes' disks from a project snapshot.

    project_id  -- id of the GNS3 project
    node_ids    -- collection of node ids to restore
    snapshot_id -- id of the snapshot to restore from, or the string "None"
                   to wipe the nodes instead (their *.qcow2 files are deleted)

    Nodes that are not stopped are stopped first and started again afterwards,
    including when the restore aborts part-way through.  Nodes that do not
    appear (by name) in the snapshot are wiped as well.

    Returns a human-readable status or error message.
    """
    if not project_id:
        return "No project selected"
    if not node_ids:
        return "No nodes selected"
    if not snapshot_id:
        return "No snapshot selected"

    # Get the project path. Also checks authentication
    project = api_get("/v2/projects/%s" % project_id)

    ss_filename = None
    snapshot_ids = {}
    if snapshot_id != "None":
        snapshots = api_get("/v2/projects/%s/snapshots" % project_id)
        snapshot = next((s for s in snapshots if s["snapshot_id"] == snapshot_id), None)
        if snapshot is None:
            # e.g. the snapshot was deleted after the page was rendered
            return "Unable to find snapshot %s" % snapshot_id
        # The file name carries the creation time formatted in UTC.
        created = datetime.datetime.fromtimestamp(snapshot["created_at"], datetime.timezone.utc)
        ss_filename = "%s/snapshots/%s_%s.gns3project" % (
            project["path"], snapshot["name"], created.strftime("%d%m%y_%H%M%S"))
        if not os.path.exists(ss_filename):
            return "Unable to find %s" % ss_filename
        # Annoyingly, the node_ids stored in the snapshot are different from the running
        # node_ids, so we have to unpack the project.gns3 file and map by node name.
        res = subprocess.run(["unzip", "-p", ss_filename, "project.gns3"],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if res.returncode != 0:
            return "unzip project.gns3 failed (%d): %s" % (
                res.returncode, res.stderr.decode("UTF-8", errors="replace"))
        data = json.loads(res.stdout)
        snapshot_ids = {n["name"]: n["node_id"] for n in data["topology"]["nodes"]}

    # Iterate over all the nodes, restore them (stop if required)
    nodes = api_get("/v2/projects/%s/nodes" % project_id)
    restart = []
    error = None
    for node in nodes:
        if node["node_id"] not in node_ids:
            continue
        if node["status"] != "stopped":
            start_stop(project_id, node["node_id"], "stop")
            restart.append(node)
        if node["name"] in snapshot_ids:
            cmd = ["unzip", "-jo", ss_filename,
                   "-d", node["node_directory"],
                   "project-files/*/%s/*" % snapshot_ids[node["name"]],
                   ]
            res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            if res.returncode != 0:
                error = "unzip failed (%d): %s" % (
                    res.returncode, res.stdout.decode("UTF-8", errors="replace"))
                # Stop processing, but still restart what we stopped below.
                break
        else:
            for file in glob.glob(node["node_directory"] + "/*.qcow2"):
                os.unlink(file)

    # Restart if required
    for node in restart:
        start_stop(project_id, node["node_id"], "start")
    return error or "Restore complete"
def export_import_config(project_id, node_id, format, config=None):
    """Read or replace the startup configuration stored on a node's disk.

    If config is None: reads the config
    if config is bytes: replaces the config

    format selects where the configuration lives on the first partition of
    hda_disk.qcow2: "nvram" (the /nvram file) or "config" (/config.txt).
    A node whose status is "started" is stopped first; restarting it is left
    to the caller.

    Returns: error, name, current_or_previous_config
    (error is None on success; name is the node name, or None when the
    arguments were rejected before the node was looked up).
    """
    # Validate before the expensive imports and API calls below.
    if not project_id:
        return ("No project selected", None, None)
    if not node_id:
        return ("No node selected", None, None)

    sys.path.extend(glob.glob("/usr/share/gns3/gns3-server/lib/*/site-packages"))
    from gns3server.compute.iou.utils.iou_export import nvram_export
    from gns3server.compute.iou.utils.iou_import import nvram_import
    import guestfs

    node = api_get("/v2/projects/%s/nodes/%s" % (project_id, node_id))
    name = node["name"]
    if node["status"] == "started":
        start_stop(project_id, node_id, "stop")

    # virt-cat -a hda_disk.qcow2 -m /dev/sda1:/ /nvram >/tmp/nvram
    #os.environ["LIBGUESTFS_DEBUG"]="1"
    #os.environ["LIBGUESTFS_TRACE"]="1"
    g = guestfs.GuestFS(python_return_dict=True)
    # Everything after handle creation runs inside the try, so the handle is
    # released even when attaching, launching or mounting fails.
    try:
        # Open the disk read-only unless a new config is being written.
        g.add_drive_opts(node["node_directory"] + "/hda_disk.qcow2",
                         format="qcow2", readonly=int(not bool(config)))
        g.launch()
        g.mount("/dev/sda1", "/")
        if format == "nvram":
            nvram = g.read_file("/nvram")
            try:
                startup, private = nvram_export(nvram)
            except ValueError as e:
                return ("nvram_export: %s" % e, name, None)
            if config is None:
                return (None, name, startup)
            try:
                # NOTE(review): 512 is presumably the NVRAM size - confirm
                # against nvram_import's signature.
                nvram = nvram_import(nvram, config, private, 512)
            except ValueError as e:
                return ("nvram_import: %s" % e, name, config)
            try:
                g.write("/nvram", bytes(nvram))
            except RuntimeError as e:
                return ("guestfs_write: %s" % e, name, config)
            return (None, name, config)
        elif format == "config":
            if config is None:
                startup = g.read_file("/config.txt")
                # The fallback is bytes, like the file content, because the
                # caller decodes whatever is returned here.
                return (None, name, startup or b"boot config bootflash:config.txt nvbypass\n")
            try:
                g.write("/config.txt", bytes(config))
            except RuntimeError as e:
                return ("guestfs_write: %s" % e, name, config)
            return (None, name, config)
        else:
            return ("unknown format: %s" % format, name, None)
    finally:
        g.shutdown()
        g.close()
###### START HERE ######

# Check for logout
action = form.getfirst("action")
if action == "logout":
    logout()

# Check we have an auth cookie; if not then login
if "gns3_auth" in CI:
    auth = CI["gns3_auth"].value
else:
    # Test if authorization is required: an unauthenticated request that is
    # answered with 401 means the server wants credentials.
    req = request.Request(GNS3_URL)
    try:
        with request.urlopen(req) as r:
            need_auth = False
    except HTTPError as e:
        if e.code != 401:
            raise
        need_auth = True

    if need_auth:
        username = form.getfirst("username")
        password = form.getfirst("password")
        if not username or not password:
            login()  # renders the form and exits
        auth = "Basic " + base64.b64encode((username + ":" + password).encode("UTF-8")).decode("UTF-8")
        # Test it, before setting the cookie
        req = request.Request(GNS3_URL)
        req.add_header("Authorization", auth)
        try:
            with request.urlopen(req) as r:
                pass
        except HTTPError as e:
            # Not escaped here: layout() escapes flash_message when it renders
            # it, so escaping twice would show literal "&amp;" entities.
            flash_message = "Login failed: %s (%d)" % (e.reason, e.code)
            login(username=username)  # renders the form and exits
    else:
        # An empty value records that the server needs no authentication.
        auth = ""
    CO["gns3_auth"] = auth
    # The cookie carries the Basic credentials; keep it away from page scripts.
    CO["gns3_auth"]["httponly"] = True
project_id = form.getfirst("project_id") | |
# Actions which generate a redirect | |
if action in ["start", "stop"]: | |
start_stop(project_id, form.getfirst("node_id"), action) | |
redirect("?project_id=%s" % project_id) | |
if action == "restore" and os.environ["REQUEST_METHOD"] == "POST": | |
flash(restore(project_id, form.getlist("node_id"), form.getfirst("snapshot_id"))) | |
redirect("?project_id=%s" % project_id) | |
if action == "edit": | |
config = (form.getfirst("config") or "").encode("UTF-8", errors="replace") | |
if os.environ["REQUEST_METHOD"] == "POST" and config: | |
err, name, prev_config = export_import_config(form.getfirst("project_id"), form.getfirst("node_id"), form.getfirst("format"), config) | |
if not err: | |
msg = "Imported to %s" % name | |
if form.getfirst("status") == "started": | |
start_stop(form.getfirst("project_id"), form.getfirst("node_id"), "start") | |
msg += " and restarted" | |
flash(msg) | |
redirect("?project_id=%s" % project_id) | |
flash_message = err | |
# but fall through to redisplay same page | |
else: | |
err, name, config = export_import_config(form.getfirst("project_id"), form.getfirst("node_id"), form.getfirst("format")) | |
if err: | |
flash(err) | |
redirect("?project_id=%s" % project_id) | |
with layout(): | |
print(""" | |
<h1>Config for %s</h1> | |
<form method="post"> | |
<textarea class="config" name="config">%s</textarea> | |
<input type="submit" value="Import"> | |
</form> | |
<a href="?project_id=%s">Cancel</a> | |
""" % ( | |
html.escape(name), | |
html.escape(config.decode("UTF-8", errors="replace")), | |
form.getfirst("project_id"), | |
)) | |
sys.exit(0) | |
# All cookies set, we can now generate the HTTP response

def _config_format(hda_image):
    """Map a node's hda disk image name to its config storage format, or None."""
    if hda_image.startswith("vios"):
        return "nvram"
    if hda_image.startswith("csr1000v"):
        return "config"
    return None


def _print_project_selector(projects, selected_id):
    """Print the project drop-down; unopened and excluded projects are disabled."""
    print("""
<form method="get">
<select name="project_id">
""")
    for project in projects:
        attr = ""
        if project["status"] != "opened" or project["name"] in EXCLUDE_PROJECTS:
            attr += " disabled"
        elif project["project_id"] == selected_id:
            attr += " selected"
        print('<option value="%s"%s>%s</option>' % (
            html.escape(project["project_id"], quote=True), attr, html.escape(project["name"])))
    print("""
</select>
<input type="submit" value="Select project">
</form>""")


def _print_project(project):
    """Print the snapshot-restore controls and the node table for *project*."""
    pid = project["project_id"]
    print('<form method="post" onsubmit="return confirm(\'Are you sure? This will wipe the config of the selected devices!\');">')

    # Snapshot actions
    snapshots = api_get("/v2/projects/%s/snapshots" % pid)
    snapshots.sort(key=lambda s: s["name"])
    print("<hr>")
    print('<p>Reset selected devices from snapshot')
    print('<select name="snapshot_id">')
    print('<option></option>')
    # The label must be entity-escaped, otherwise it is parsed as a tag.
    print('<option value="None">&lt;Empty&gt;</option>')
    for s in snapshots:
        print('<option value="%s">%s</option>' % (
            html.escape(s["snapshot_id"], quote=True), html.escape(s["name"])))
    print('</select>')
    print('<button type="submit" name="action" value="restore">Restore</button>')
    print('</p>')

    # List nodes
    print("<hr>")
    print('<table class="nodes">')
    print("""
<thead>
<tr>
<th colspan="3">%s</th>
</tr>
</thead>
""" % html.escape(project["name"]))
    nodes = api_get("/v2/projects/%s/nodes" % pid)
    nodes.sort(key=lambda n: n["name"])
    for node in nodes:
        props = node.get("properties", {})
        hda = props.get("hda_disk_image")
        if not hda:
            continue  # filters out NAT nodes etc
        config_format = _config_format(hda)
        nid = node["node_id"]
        status = html.escape(node["status"], quote=True)
        toggle = "start" if node["status"] == "stopped" else "stop"
        toggle_url = "?project_id=%s&node_id=%s&action=%s" % (pid, nid, toggle)
        actions = [
            status,
            '<a href="%s">%s</a>' % (html.escape(toggle_url, quote=True), toggle.capitalize()),
        ]
        if config_format:
            # The current status is passed along so the edit action knows
            # whether to restart the node after importing.
            edit_url = "?project_id=%s&node_id=%s&action=edit&status=%s&format=%s" % (
                pid, nid, node["status"], config_format)
            actions.append('<a href="%s">Edit</a>' % html.escape(edit_url, quote=True))
        nid_attr = html.escape(nid, quote=True)
        print("""
<tr>
<td><input type="checkbox" name="node_id" id="%s" value="%s"></td>
<td><label for="%s">%s</label></td>
<td class="%s">%s</td>
</tr>
""" % (nid_attr, nid_attr, nid_attr, html.escape(node["name"]), status, " | ".join(actions)))
    print("</table>")
    print("</form>")


with layout():
    # Project selector
    projects = api_get("/v2/projects")
    projects.sort(key=lambda p: p["name"])
    _print_project_selector(projects, project_id)

    # Find selected project; with none selected only the selector is shown
    sel = [p for p in projects if p["project_id"] == project_id]
    if sel:
        _print_project(sel[0])

    # Logout link (shown whether or not a project is selected)
    print("<hr>")
    print('<a href="?action=logout">Logout</a>')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment