Skip to content

Instantly share code, notes, and snippets.

@ltrager
Created January 22, 2021 00:55
Show Gist options
  • Save ltrager/8f5b0018914b2368920b8847f332b9f0 to your computer and use it in GitHub Desktop.
Save ltrager/8f5b0018914b2368920b8847f332b9f0 to your computer and use it in GitHub Desktop.
maas-wol service to use with the MAAS webhook power driver
#!/usr/bin/env python3
#
# maas-wol - Web server to accept WoL requests
#
# Author: Lee Trager <lee.trager@canonical.com>
#
# Copyright (C) 2021 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import base64
import hmac
import http.client
import http.server
import json
import logging
import re
import select
import signal
import socket
import socketserver
import sys
# Status query path: "/<MAC>" with an optional trailing slash. The MAC is six
# hex octets, each pair joined by ":" or "-", captured as the "MAC" group.
GET_REGEX = re.compile(
    r"^/(?P<MAC>([\da-f]{2}[:-]){5}[\da-f]{2})[/]?$",
    flags=re.IGNORECASE,
)
# Power action path: "/<MAC>/?op=start" or "/<MAC>/?op=stop". The requested
# operation is captured as the "OP" group; matching ignores case.
POST_REGEX = re.compile(
    r"^/(?P<MAC>([\da-f]{2}[:-]){5}[\da-f]{2})/\?op=(?P<OP>(start|stop))$",
    flags=re.IGNORECASE,
)
# Last state recorded for each machine by earlier requests, keyed by the MAC
# address taken from the request path ("running" after a start request,
# "stopped" after a confirmed stop request).
machine_status = dict()
# Runtime settings. Each one stays None until main() fills it in from the
# command-line arguments.
broadcast_ip = broadcast_port = None
username = password = token = None
class HTTPWoL(http.server.SimpleHTTPRequestHandler):
    """HTTP request handler that issues Wake-on-LAN requests.

    Routes (``<MAC>`` is six hex octets separated by ``:`` or ``-``):

    * ``GET /<MAC>`` -- report the last recorded status as JSON.
    * ``POST /<MAC>/?op=start`` -- broadcast a WoL magic packet.
    * ``POST /<MAC>/?op=stop`` -- ask the operator on the console to shut
      the machine down and wait for confirmation.

    Credentials, the broadcast target and the status table are read from the
    module-level settings (``username``, ``password``, ``token``,
    ``broadcast_ip``, ``broadcast_port``, ``machine_status``).
    """

    @staticmethod
    def _status_key(mac_address):
        """Return the canonical ``machine_status`` key for *mac_address*.

        The URL patterns accept either letter case and either separator, so
        the same machine may be addressed as ``AA-BB-...`` in one request
        and ``aa:bb:...`` in the next; both must map to the same entry.
        """
        return mac_address.lower().replace("-", ":")

    @staticmethod
    def _magic_packet(mac_address):
        """Build the WoL magic packet for *mac_address*.

        The packet is six 0xFF bytes followed by the MAC repeated 16 times.
        Both separators are stripped because the URL pattern allows them to
        be mixed within one address.
        """
        hex_mac = mac_address.replace(":", "").replace("-", "")
        return bytes.fromhex("F" * 12 + hex_mac * 16)

    @staticmethod
    def _credential_ok(cred):
        """Return True when *cred* matches the configured credentials.

        *cred* is the second whitespace-separated field of the
        ``Authorization`` header. A configured token takes precedence over
        username/password. Comparisons are constant-time so the response
        time does not leak how much of the secret matched.
        """
        if token:
            # RFC 6750
            return hmac.compare_digest(cred.encode(), token.encode())
        # RFC 7617
        try:
            supplied = base64.b64decode(cred)
        except ValueError:
            # Malformed Base64 (binascii.Error is a ValueError subclass).
            return False
        # NOTE: when only one of username/password is configured the missing
        # half is formatted as the literal text "None"; kept as-is so
        # existing clients continue to authenticate.
        expected = f"{username}:{password}".encode()
        return hmac.compare_digest(supplied, expected)

    def _reply(self, status, body, headers=()):
        """Send a complete response: status line, *headers*, then *body*."""
        self.send_response(status)
        for name, value in headers:
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(body)

    def _authenticate(self):
        """Check the request's ``Authorization`` header.

        Returns True when no credentials are configured or the supplied
        credential matches. Otherwise a 401 response is sent and False is
        returned, so callers only need to stop processing. The scheme word
        of the header is not inspected.
        """
        if not username and not password and not token:
            return True
        try:
            cred = self.headers.get("Authorization").split()[1]
        except (IndexError, AttributeError):
            # Header missing, or present without a credential part.
            cred = None
        if cred is not None and self._credential_ok(cred):
            return True
        challenge = "Bearer" if token else 'Basic realm="maas-wol"'
        self._reply(
            http.client.UNAUTHORIZED,
            b"Unauthorized!\n",
            [("WWW-Authenticate", challenge)],
        )
        return False

    def _bad_path(self):
        """Reject a request whose path matches no known route."""
        self._reply(http.client.BAD_REQUEST, b"Unknown path!\n")

    def do_GET(self):
        """Report the last recorded status of the machine named in the path."""
        if not self._authenticate():
            return
        # MAAS will also send the system_id in the "System_id" header; it is
        # not needed to answer the query.
        m = GET_REGEX.search(self.path)
        if m is None:
            self._bad_path()
            return
        status = machine_status.get(self._status_key(m.group("MAC")), "unknown")
        self._reply(
            http.client.OK, json.dumps({"status": status}).encode() + b"\n"
        )

    def _start(self, mac_address):
        """Broadcast a WoL packet for *mac_address* and record it as running.

        A socket failure is reported to the client as a 500 response instead
        of aborting the request without any reply.
        """
        packet = self._magic_packet(mac_address)
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.connect((broadcast_ip, broadcast_port))
                sock.send(packet)
        except OSError as err:
            self.log_error(
                "Unable to send WoL packet for %s: %s", mac_address, err
            )
            self._reply(
                http.client.INTERNAL_SERVER_ERROR,
                b"Unable to send WoL packet!\n",
            )
            return
        machine_status[self._status_key(mac_address)] = "running"
        self._reply(http.client.OK, b"WoL packet sent!\n")

    def _stop(self, mac_address):
        """Ask the console operator to shut *mac_address* down.

        Waits up to 60 seconds for a line on stdin. The machine is recorded
        as stopped only when a line is actually read; a timeout or
        end-of-file on stdin yields a 500 response.
        """
        print(
            f"Press enter once {mac_address} has been shutdown, "
            "waiting 60 seconds...",
            flush=True,
        )
        ready, _, _ = select.select([sys.stdin], [], [], 60)
        # Consume the confirmation line; otherwise stdin stays readable and
        # every later stop request would be confirmed instantly. An empty
        # read means EOF (no operator attached), not a confirmation.
        confirmed = bool(ready) and bool(sys.stdin.readline())
        if not confirmed:
            self._reply(
                http.client.INTERNAL_SERVER_ERROR,
                b"No one available to shutdown the system!\n",
            )
            return
        machine_status[self._status_key(mac_address)] = "stopped"
        self._reply(http.client.OK, b"User has shutdown the system!\n")

    def do_POST(self):
        """Dispatch a ``start`` or ``stop`` operation for the MAC in the path."""
        if not self._authenticate():
            return
        # MAAS will also send the system_id in the "System_id" header; it is
        # not needed to perform the operation.
        m = POST_REGEX.search(self.path)
        if m is None:
            self._bad_path()
            return
        # The pattern is case-insensitive, so normalise the operation before
        # dispatching; "?op=START" must not fall through without a response.
        handlers = {"start": self._start, "stop": self._stop}
        handlers[m.group("OP").lower()](m.group("MAC"))
def main():
    """Parse the command line and serve WoL requests until interrupted.

    The parsed options are stored in the module-level settings
    (``broadcast_ip``, ``broadcast_port``, ``username``, ``password``,
    ``token``) that the request handler reads. The server listens on all
    interfaces on ``--port`` and exits with status 0 on SIGINT.
    """
    parser = argparse.ArgumentParser(description="Web server to issue WoL commands")
    parser.add_argument(
        "--broadcast",
        "-b",
        default="255.255.255.255",
        type=str,
        help="The broadcast address to use for the wake on LAN command.",
    )
    parser.add_argument(
        "--broadcast-port",
        "-B",
        default=9,
        type=int,
        help="The broadcast port to use for the wake on LAN command.",
    )
    parser.add_argument(
        "--port",
        "-p",
        default=8080,
        type=int,
        help="The port to listen for requests on.",
    )
    parser.add_argument(
        "--username", "-u", type=str, help="The username required for remote use."
    )
    parser.add_argument(
        "--password", "-P", type=str, help="The password required for remote use."
    )
    parser.add_argument("--token", "-t", type=str, help="An authentication token.")
    args = parser.parse_args()

    global broadcast_ip, broadcast_port, username, password, token
    broadcast_ip = args.broadcast
    broadcast_port = args.broadcast_port
    username = args.username
    password = args.password
    token = args.token

    class _ReusableTCPServer(socketserver.TCPServer):
        # Allow rebinding the port right after a restart instead of failing
        # with "Address already in use" while old connections linger.
        allow_reuse_address = True

    with _ReusableTCPServer(("", args.port), HTTPWoL) as httpd:

        def shutdown(signum, frame):
            """SIGINT handler: close the listening socket and exit cleanly."""
            print("Shutting down...")
            httpd.server_close()
            sys.exit(0)

        print("Listening for requests...")
        signal.signal(signal.SIGINT, shutdown)
        httpd.serve_forever()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment