Skip to content

Instantly share code, notes, and snippets.

@smklein
Last active May 25, 2021 20:19
Show Gist options
  • Save smklein/86508a6f40e919b449868ece5c5122b9 to your computer and use it in GitHub Desktop.
Save smklein/86508a6f40e919b449868ece5c5122b9 to your computer and use it in GitHub Desktop.
# Usage: PORT=12400 INSTANCE=$(./omicron/tools/oxapi_demo project_list_instances myproject | jq -r '.items[].id') python instance_serial.py
import json
import fcntl
import os
import requests
import select
import sys,tty,termios
import time
# Connection settings are read from the environment: PORT is the port on
# 127.0.0.1 that requests are sent to, and INSTANCE is the instance id that
# is placed in the URL path.
port = os.environ.get("PORT")
instance = os.environ.get("INSTANCE")
if not port or not instance:
    # Fail early with a readable message. Without this check a missing
    # INSTANCE surfaces as a TypeError from string concatenation, and a
    # missing PORT silently produces a URL containing "None".
    raise SystemExit("PORT and INSTANCE environment variables must both be set")
url = f'http://127.0.0.1:{port}/instances/{instance}/serial'
class raw(object):
    """Context manager that puts a terminal stream into raw mode.

    On entry the current terminal attributes of the stream's file
    descriptor are saved and the descriptor is switched to raw mode with
    ``tty.setraw``. On exit the saved attributes are restored. Exceptions
    raised inside the ``with`` body are not suppressed.

    Args:
        stream: Any object with a ``fileno()`` method that refers to a
            terminal (for example ``sys.stdin``).
    """

    def __init__(self, stream):
        # Only the descriptor is kept; the stream object is not retained.
        self.fd = stream.fileno()

    def __enter__(self):
        # Snapshot the attributes before changing them so that __exit__ can
        # put the terminal back exactly as it was found.
        self.original_stty = termios.tcgetattr(self.fd)
        tty.setraw(self.fd)
        # Return the manager so ``with raw(s) as r`` binds something useful.
        return self

    def __exit__(self, type, value, traceback):
        # TCSADRAIN applies the restore after queued output has been sent.
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.original_stty)
class nonblocking(object):
    """Context manager that sets O_NONBLOCK on a stream's file descriptor.

    On entry the descriptor's current file status flags are saved and
    ``os.O_NONBLOCK`` is OR-ed into them. On exit the saved flags are
    written back. Exceptions raised inside the ``with`` body are not
    suppressed.

    Args:
        stream: Any object with a ``fileno()`` method (for example
            ``sys.stdin``).
    """

    def __init__(self, stream):
        # Only the descriptor is kept; the stream object is not retained.
        self.fd = stream.fileno()

    def __enter__(self):
        # Remember the original flags so that __exit__ restores them exactly,
        # whether or not O_NONBLOCK was already set.
        self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK)
        # Return the manager so ``with nonblocking(s) as n`` binds something useful.
        return self

    def __exit__(self, *args):
        fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl)
# Interactive serial-console loop.
#
# Keystrokes read from stdin are collected in `buf` and PUT to `url` as
# {"bytes": [...]}; the "bytes" list in the JSON response is written to
# stdout. Ctrl-C (byte 3) ends the program.
old_time = time.time_ns()
buf = []
headers = {'content-type': 'application/json'}  # same for every request
# raw() delivers keystrokes one at a time (Ctrl-C arrives as byte 3 rather
# than as a signal); nonblocking() makes read() return immediately when no
# key is pending. Both are undone when the block exits, including via
# sys.exit() or an exception. The Session is reused for every request
# instead of setting up a new one per poll.
with raw(sys.stdin), nonblocking(sys.stdin), requests.Session() as session:
    while True:
        c = sys.stdin.read(1)
        # Truthiness treats both '' and None as "nothing was typed".
        if c:
            if ord(c) == 3:
                # Ctrl-C quits locally; it is not forwarded to the instance.
                sys.exit(0)
            # Forward the character as its UTF-8 bytes. ord() would yield a
            # code point, which is not a byte value for non-ASCII input.
            buf.extend(c.encode('utf-8'))
        time.sleep(.01)
        new_time = time.time_ns()
        # Threshold is 1 ms in nanoseconds. After the 10 ms sleep above it is
        # normally exceeded, so a request is sent on each pass even when
        # `buf` is empty; that is what fetches pending console output.
        if new_time - old_time > 1000000:
            payload = json.dumps({"bytes": buf})
            response = session.put(url, data=payload, headers=headers)
            # Surface HTTP errors directly instead of failing later on a
            # missing "bytes" key or an undecodable body.
            response.raise_for_status()
            out = response.json()["bytes"]
            if out:
                # Write the bytes unmodified so multi-byte sequences reach
                # the terminal intact; flush because there may be no newline.
                sys.stdout.buffer.write(bytes(out))
                sys.stdout.buffer.flush()
            buf = []
            old_time = new_time
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment