Skip to content

Instantly share code, notes, and snippets.

@flexiondotorg
Last active November 3, 2021 13:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flexiondotorg/19ffce89f55fa6893fbaebde4d1dadf0 to your computer and use it in GitHub Desktop.
Save flexiondotorg/19ffce89f55fa6893fbaebde4d1dadf0 to your computer and use it in GitHub Desktop.
Aitum's Butler PoC
#!/usr/bin/env python3
import argparse
import http.server
import signal
import socketserver
import subprocess
import sys
from urllib.parse import urlparse
def signal_handler(signal, frame):
    """
    Handle Ctrl+C: announce the shutdown, close the server, then exit.

    This has the standard signal-handler signature; neither argument is
    used. Note that the ``signal`` parameter shadows the ``signal`` module
    inside this function.

    Raises:
        SystemExit: always, with exit status 0.
    """
    print("Exiting (Ctrl+C pressed)")
    try:
        # ``butler`` is the module-level server object created by the
        # script's start-up code.
        server = butler
        if server:
            server.server_close()
    finally:
        # Exit with status 0 no matter what happened while closing.
        sys.exit(0)
def cmd_handler(cmd, http):
    """
    Run *cmd* and relay its standard output to the client and the console.

    Each line the command prints is written unchanged to ``http.wfile`` as
    soon as it is produced, and a stripped, decoded copy is echoed to
    stdout.

    Args:
        cmd: Command line as a single string. It is split on whitespace and
            executed directly (no shell), so quoting is not interpreted.
        http: Object exposing a writable binary ``wfile`` stream, such as
            the active request handler. The name shadows the ``http``
            module inside this function.

    Raises:
        OSError: if the command cannot be started.
    """
    # The context manager closes the pipe and reaps the child even when
    # writing to the client fails part-way (e.g. the client disconnected).
    with subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) as proc:
        for line in proc.stdout:
            http.wfile.write(line)
            # The console echo is informational only: never let undecodable
            # bytes abort the relay to the client.
            print(line.decode("utf-8", errors="replace").strip())
class request_handler(http.server.SimpleHTTPRequestHandler):
    """
    HTTP handler that maps a small set of GET paths to local commands.

    Every GET is answered with ``200`` and a ``text/plain`` body that starts
    with the requested path. If the path is a known route, the output of the
    associated command is appended to the body.
    """

    # Request path -> command line executed for that path.
    _ROUTES = {
        "/spotify/30": "/home/martin/Scripts/Streaming/spotify-volume.sh 30",
        "/spotify/50": "/home/martin/Scripts/Streaming/spotify-volume.sh 50",
    }

    def do_GET(self):
        """Echo the request path, then stream any mapped command's output."""
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(self.path.encode("utf8"))

        command = self._ROUTES.get(self.path)
        if command is not None:
            cmd_handler(command, self)
if __name__ == "__main__":
    # Script entry point: parse the listen address/port, start the HTTP
    # server, and serve requests until interrupted with Ctrl+C.
    example_usage = """example:
aitums-butler.py"""
    parser = argparse.ArgumentParser(
        description="Aitum's Butler",
        epilog=example_usage,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('--addr', '-a', action="store", default="localhost", help='ip address', type=str)
    parser.add_argument('--port', '-p', action="store", default=8008, help='port to listen on', type=int)
    args = parser.parse_args()

    # Create the server without binding yet: allow_reuse_address is read
    # while the socket is being bound, so it must be set *before* the bind.
    # (Setting it after a normal construction has no effect.)
    butler = socketserver.TCPServer(
        (args.addr, args.port), request_handler, bind_and_activate=False
    )
    # Quicker rebinding after a restart.
    butler.allow_reuse_address = True
    # Only consulted by threading servers; harmless on a plain TCPServer,
    # which handles each request in the serving thread.
    butler.daemon_threads = True
    try:
        butler.server_bind()
        butler.server_activate()
    except OSError:
        # Do not leak the socket if the address cannot be bound.
        butler.server_close()
        raise

    # Initialise the signal handler to catch Ctrl+C
    signal.signal(signal.SIGINT, signal_handler)

    # Flush so the banner shows up immediately even when stdout is piped.
    print(f"Listening on {args.addr}:{args.port}", flush=True)

    # serve_forever() only returns after shutdown(), so no outer loop is
    # needed; the finally clause guarantees the listening socket is closed.
    try:
        butler.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        butler.server_close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment