Skip to content

Instantly share code, notes, and snippets.

@totegamma
Last active October 17, 2022 01:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save totegamma/3b7b0e534789ddd673200af2792490e4 to your computer and use it in GitHub Desktop.
Save totegamma/3b7b0e534789ddd673200af2792490e4 to your computer and use it in GitHub Desktop.
import os
import textwrap
import threading
from flask import Flask, request
from neosvr_headless_api import LocalHeadlessClient
# Module-level state shared by the Flask views and the health-check timer.
# hc_lock is held by run_command, export_metrics and healthcheck while they
# use or replace the global headless client `hc`.
hc_lock = threading.Lock()
# The client is created with the directory that contains this script.
hc = LocalHeadlessClient(os.path.dirname(os.path.realpath(__file__)))
print("booting...")
# NOTE(review): presumably blocks until the headless client has finished
# starting up -- confirm against neosvr_headless_api.
hc.wait_for_ready()
print("done!")
# The Flask application object; the route decorators below register on it.
app = Flask(__name__)
@app.route("/")
def hello():
    """Answer requests to the root URL with a fixed greeting.

    Prints ``hello!`` to stdout on every hit and returns a small dict as the
    response payload.
    """
    greeting = {"message": "hello"}
    print("hello!")
    return greeting
@app.route("/api/<method>")
def run_command(method):
    """Invoke a public method of the headless client by name.

    The method name comes from the URL path. Positional arguments are taken
    from the ``arg`` query parameter as a comma-separated list; empty
    segments are dropped, and every argument is passed through as a string.

    Returns ``{"status": "OK", "result": <return value>}`` on success, or
    ``{"status": "NG"}`` when the name is private, does not exist on the
    client, is not callable, or the call raises ``AttributeError``.
    """
    # The name is untrusted input: never expose private or dunder attributes
    # of the client object over HTTP.
    if method.startswith("_"):
        return {"status": "NG"}

    raw_args = request.args.get('arg') or ""
    args = [part for part in raw_args.split(',') if part != '']

    # Hold the lock for the lookup *and* the call so healthcheck() cannot
    # swap the global client out from under us mid-request.
    with hc_lock:
        try:
            func = getattr(hc, method)
            if not callable(func):
                # Plain data attributes would otherwise fail with TypeError
                # (an HTTP 500) instead of the documented NG reply.
                return {"status": "NG"}
            response = func(*args)
        except AttributeError:
            return {"status": "NG"}
    return {"status": "OK", "result": response}
def _escape_label_value(value):
    """Escape a value for use inside a double-quoted Prometheus label."""
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )


def _render_world_metrics(worlddata):
    """Render per-world gauges in the Prometheus text exposition format.

    Each metric family is written once (HELP, TYPE, then one sample per
    world) because the format forbids repeated HELP/TYPE lines and requires
    all samples of a metric to be grouped together.
    """
    families = (
        ("neos_session_max_users",
         "The maximum number of users, allowed to join this session.",
         "max_users"),
        ("neos_session_present",
         "The number of present users in this session.",
         "present"),
        ("neos_session_users",
         "The number of users in this session.",
         "users"),
    )
    worlds = list(worlddata)  # iterated once per family below
    lines = []
    for metric, help_text, key in families:
        lines.append("# HELP {} {}".format(metric, help_text))
        lines.append("# TYPE {} gauge".format(metric))
        for world in worlds:
            lines.append('{metric}{{name="{name}"}} {value}'.format(
                metric=metric,
                name=_escape_label_value(world['name']),
                value=world[key],
            ))
    return "\n".join(lines) + "\n"


@app.route("/metrics")
def export_metrics():
    """Expose session statistics of every world for Prometheus scraping.

    Reads ``hc.worlds()`` while holding ``hc_lock`` and renders the
    ``name``, ``max_users``, ``present`` and ``users`` fields of each entry
    as gauges. Returns the plain-text exposition body, or
    ``{"status": "NG"}`` if querying the client raises ``AttributeError``.
    """
    with hc_lock:
        try:
            worlddata = hc.worlds()
        except AttributeError:
            return {"status": "NG"}

    # Rendering needs no client access, so it happens outside the lock.
    body = _render_world_metrics(worlddata)
    # Return a concrete string with an explicit text/plain content type
    # instead of a lazy map object served with the default HTML type.
    return body, 200, {"Content-Type": "text/plain; version=0.0.4; charset=utf-8"}
def healthcheck():
    """Restart the headless client if its process has exited, then re-arm.

    Runs every 5 seconds by scheduling itself on a ``threading.Timer``.
    When the client process is found dead, a new ``LocalHeadlessClient`` is
    created and stored in the global ``hc`` while ``hc_lock`` is held, so
    the request handlers never see a half-started client.
    """
    global hc
    try:
        # NOTE(review): assumes hc.process.process follows the
        # subprocess.Popen convention where poll() returns None while the
        # process is still running -- confirm against neosvr_headless_api.
        if hc.process.process.poll() is not None:
            print("Session is down! rebooting...")
            # `with` guarantees the lock is released even if the restart
            # raises; otherwise every endpoint would block forever.
            with hc_lock:
                hc = LocalHeadlessClient(os.path.dirname(os.path.realpath(__file__)))
                hc.wait_for_ready()
            print("reboot done!")
    finally:
        # Always schedule the next check, so one failed reboot attempt does
        # not silently end monitoring; the next tick retries.
        threading.Timer(5, healthcheck).start()
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Kick off the self-rescheduling health check before serving requests.
    healthcheck()
    # NOTE(review): host='0.0.0.0' listens on all network interfaces, and the
    # /api/<method> route has no authentication in this file -- confirm the
    # service is only reachable from trusted hosts.
    app.run(host='0.0.0.0')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment