Skip to content

Instantly share code, notes, and snippets.

@0x61726b
Created August 17, 2018 19:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 0x61726b/b74de278cc805a29eea55871e3cfef6c to your computer and use it in GitHub Desktop.
Save 0x61726b/b74de278cc805a29eea55871e3cfef6c to your computer and use it in GitHub Desktop.
pi
import logging
import os
import datetime
import click
import requests
from flask import Flask, request, send_from_directory, abort, jsonify
from functools import wraps
import config
import socket
# --- Application and logging setup -------------------------------------------
app = Flask(__name__)
# Let routes match with or without a trailing slash.
app.url_map.strict_slashes = False

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(current_dir, "logs")
# FileHandler opens the log file immediately and does not create missing
# parent directories, so make sure the directory exists first.
os.makedirs(logs_dir, exist_ok=True)

# One log file per start date, appended to across restarts on the same day.
handler = logging.FileHandler(
    filename='{}/{}.log'.format(logs_dir, datetime.datetime.now().strftime('%Y-%m-%d')),
    encoding='utf-8',
    mode='a',
)
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s'))

# The handler is attached to the root logger, so records from every module
# end up in the file as well.
logger = logging.getLogger()
logger.addHandler(handler)

# Project-local imports are done after logging is configured.
from models import *
init_db()
from wol import wake_on_lan
@app.before_request
def clear_trailing():
    """Redirect any path ending in ``/`` (except the root) to the slash-less path.

    Returns:
        A redirect response when the request path has a trailing slash,
        otherwise ``None`` so that request handling continues normally.
    """
    from flask import redirect, request

    path = request.path
    if path == '/' or not path.endswith('/'):
        return None
    # 307 (instead of the default 302) tells the client to repeat the request
    # with the same method and body; with a 302 most clients re-issue a POST
    # as a GET, which the POST-only routes of this app would reject.
    return redirect(path[:-1], code=307)
def fingerprint_required(f):
    """Decorator that rejects requests lacking ``device_id`` or ``api_key``.

    Both values are read from the submitted form data. When either one is
    missing or empty the wrapped view is not called and a JSON
    ``{"ok": false, "reason": "unauthorized"}`` response with status 401 is
    returned instead.

    NOTE(review): only the presence of ``api_key`` is checked here; its value
    is not compared against anything in this decorator.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Indexing ``request.form`` raises on a missing key and never yields
        # None, so ``.get`` is required for this guard to be reachable.
        if not request.form.get('device_id') or not request.form.get('api_key'):
            return jsonify(ok=False, reason="unauthorized"), 401
        return f(*args, **kwargs)
    return decorated_function
def target_host_check(f):
    """Decorator that verifies the caller owns the requested target host.

    Reads ``device_id`` and ``target_host`` from the form data, looks up the
    ``User`` with that device id and then a ``NetworkDevice`` belonging to
    that user whose hostname equals ``target_host``. The wrapped view is only
    called when both lookups succeed; otherwise a JSON error with status 400
    is returned.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        device_id = request.form.get('device_id')
        target_host = request.form.get('target_host')
        # Answer with the same JSON error shape as the other failures instead
        # of letting a missing key surface as a framework-generated 400 page.
        if not device_id or not target_host:
            return jsonify(ok=False, reason="Missing device_id or target_host."), 400

        try:
            user = User.select().where(User.device_id == device_id).get()
        except DoesNotExist:
            return jsonify(ok=False, reason="Device key did not match any users"), 400

        # NOTE(review): the query above already filters on device_id; this
        # re-check presumably guards against a looser database comparison
        # (e.g. case-insensitive collation) -- confirm before removing.
        if user.device_id != device_id:
            return jsonify(ok=False, reason="Device IDs are different."), 400

        try:
            # Only existence matters here; the view re-queries the device.
            NetworkDevice.select().where(
                NetworkDevice.user == user,
                NetworkDevice.hostname == target_host,
            ).get()
        except DoesNotExist:
            return jsonify(ok=False, reason="Could not find any device belonging to this user."), 400

        return f(*args, **kwargs)
    return decorated_function
def get_ip_by_hostname(hostname):
    """Resolve *hostname* with ``socket.gethostbyname`` and return the result."""
    resolved_ip = socket.gethostbyname(hostname)
    return resolved_ip
@app.route('/ping/', methods=["POST"])
@target_host_check
def do_ping():
    """Report whether the requested network device answers its ``/ping`` endpoint.

    Form fields:
        device_id: identifies the calling user.
        target_host: hostname of one of that user's network devices.

    Returns:
        JSON ``{"ok": true}`` when the device replies with HTTP 200, otherwise
        JSON ``{"ok": false, "reason": "Target host is offline."}``. Both
        cases use HTTP status 200.
    """
    device_id = request.form['device_id']
    target_host = request.form['target_host']

    # Get user
    user = User.select().where(User.device_id == device_id).get()
    # Get network device
    network_device = NetworkDevice.select().where(
        NetworkDevice.user == user,
        NetworkDevice.hostname == target_host,
    ).get()
    host_ip = network_device.ip_address

    # NOTE: refreshing the stored IP from a hostname lookup
    # (get_ip_by_hostname) is intentionally not done here; the stored
    # address is used as-is.

    # Ping the target host. A timeout keeps an unreachable host from blocking
    # this request indefinitely.
    try:
        r = requests.get(f'http://{host_ip}:{config.CLIENT_PORT}/ping', timeout=5)
    except requests.RequestException:
        return jsonify(ok=False, reason="Target host is offline."), 200

    if r.status_code == 200:
        return jsonify(ok=True)
    # A non-200 reply must still produce a response; returning nothing from a
    # view makes Flask fail the request with a server error.
    return jsonify(ok=False, reason="Target host is offline."), 200
@app.route('/do/', methods=['POST'])
@target_host_check
def do_action():
    """Forward an action to one of the caller's network devices.

    Form fields:
        action: one of ``turn_on``, ``turn_off``, ``screenshot``,
            ``audio_level``.
        target_host: hostname of the device to act on.
        device_id: identifies the calling user.
        audio_level: required when ``action`` is ``audio_level``.

    Returns:
        JSON ``{"ok": true}`` on success. On failure a JSON body with
        ``ok=false`` and: 400 for an unknown action or missing
        ``audio_level``, 404 when the host does not answer the ping, 503 when
        the action request fails or is rejected by the host. A failed
        ``turn_on`` returns ``ok=false`` with status 200.
    """
    action = request.form.get('action')
    target_host = request.form['target_host']
    device_id = request.form['device_id']

    # Reject unsupported actions up front instead of letting them fall
    # through and surface as a generic "request failed" error.
    if action not in ('turn_on', 'turn_off', 'screenshot', 'audio_level'):
        return jsonify(ok=False, reason="Unknown action"), 400

    # Get user
    user = User.select().where(User.device_id == device_id).get()
    # Get network device
    network_device = NetworkDevice.select().where(
        NetworkDevice.user == user,
        NetworkDevice.hostname == target_host,
    ).get()
    host_ip = network_device.ip_address

    # Waking a machine cannot be preceded by a ping: the host is expected to
    # be off.
    if action == 'turn_on':
        try:
            # NOTE(review): wake-on-LAN normally targets a MAC address;
            # confirm that wake_on_lan accepts the stored IP address.
            wake_on_lan(host_ip)
        except Exception:
            logger.exception("Error waking on LAN.")
            return jsonify(ok=False)
        return jsonify(ok=True)

    payload = {'action': action}
    if action == 'audio_level':
        audio_level = request.form.get('audio_level')
        if audio_level is None:
            return jsonify(ok=False, reason="Missing audio_level"), 400
        payload['audio_level'] = audio_level

    base_url = f'http://{host_ip}:{config.CLIENT_PORT}'

    # Ping the target host; timeouts keep an unreachable host from blocking
    # this request indefinitely.
    try:
        ping = requests.get(f'{base_url}/ping', timeout=5)
    except requests.RequestException:
        return jsonify(ok=False, reason="Target host is offline."), 404
    if ping.status_code != 200:
        return jsonify(ok=False, reason="Target host is offline."), 404

    # Pass the action
    try:
        r = requests.post(f'{base_url}/do', data=payload, timeout=30)
    except requests.RequestException as ex:
        logger.error(ex)
        return jsonify(ok=False, reason="Request to host failed"), 503

    if r.status_code == 200:
        return jsonify(ok=True)
    return jsonify(ok=False, reason="Could not perform action"), 503
@app.route('/login/', methods=['POST'])
@fingerprint_required
def login_user():
    """Log a device in, creating its ``User`` row on first contact.

    Form fields:
        device_id: identifies the calling device/user.

    Returns:
        JSON ``{"ok": true, "devices": [...], "user_name": ..., "device_id": ...}``
        when the user has at least one network device; each entry of
        ``devices`` carries ``hostname`` and ``ip``. JSON ``{"ok": false}``
        when the user has no devices, or when creating the user or listing
        the devices fails.
    """
    device_id = request.form['device_id']
    logger.info("Login Request: %s", device_id)

    try:
        user = User.select().where(User.device_id == device_id).get()
    except DoesNotExist:
        logger.info("This device key does not exist. Creating...")
        try:
            user = User.create(device_id=device_id)
        except Exception:
            # Keep the best-effort response, but record why creation failed.
            logger.exception("Could not create user for device %s", device_id)
            return jsonify(ok=False)

    try:
        devices = [
            dict(hostname=d.hostname, ip=d.ip_address)
            for d in NetworkDevice.select().where(NetworkDevice.user == user)
        ]
        if not devices:
            return jsonify(ok=False)
        return jsonify(ok=True, devices=devices, user_name=user.user_name,
                       device_id=user.device_id)
    except Exception:
        logger.exception("Could not list devices for device %s", device_id)
    return jsonify(ok=False)
@click.group(invoke_without_command=True, options_metavar='[options]')
@click.pass_context
def main(ctx):
    """Command-line entry point: serve the Flask app when no subcommand is given.

    The server is started again every time ``app.run`` returns, printing
    "Restarting..." between runs.
    """
    # A subcommand was requested; there is nothing to do at group level.
    if ctx.invoked_subcommand is not None:
        return

    serving = True
    while serving:
        app.run(host='0.0.0.0', port=config.PI_PORT)
        print("Restarting...")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment