Skip to content

Instantly share code, notes, and snippets.

@prozacchiwawa
Last active June 5, 2022 08:55
Show Gist options
  • Save prozacchiwawa/7a640d810fc197bfba905e4993e68e50 to your computer and use it in GitHub Desktop.
Save prozacchiwawa/7a640d810fc197bfba905e4993e68e50 to your computer and use it in GitHub Desktop.
xtwrap - wrap cool-retro-term to suit gxemul's way of calling xterm
#!/usr/bin/env python3
import os
import sys
import tty
import time
import uuid
import signal
import socket
import argparse
import threading
import traceback
import subprocess
def copy_fd_to_stream(fdin, stream):
    """Pump bytes from a raw file descriptor into a socket until EOF.

    Reads up to 128 bytes at a time from ``fdin`` with ``os.read`` and
    forwards every chunk in full to ``stream``.

    Args:
        fdin: Integer file descriptor to read from.
        stream: Connected socket object; must provide ``sendall``.

    Returns:
        None. Returns when ``fdin`` reports end-of-file (an empty read), or
        after printing a traceback if reading or sending fails with an
        ``OSError``.
    """
    while True:
        try:
            buf = os.read(fdin, 128)
        except OSError:
            # A failing read (e.g. the descriptor was closed underneath us)
            # ends the pump the same way a failing send does, so the caller's
            # cleanup code still runs.
            traceback.print_exc()
            return
        if not buf:
            return
        try:
            # sendall() retries partial sends internally.
            stream.sendall(buf)
        except OSError:
            traceback.print_exc()
            return
def copy_stream_to_fd(stream, fdout):
    """Pump bytes from a socket into a raw file descriptor until EOF.

    Receives up to 128 bytes at a time from ``stream`` and writes each chunk
    completely to ``fdout`` with ``os.write``, retrying on short writes.

    Args:
        stream: Connected socket object to receive from.
        fdout: Integer file descriptor to write to.

    Returns:
        None. Returns when ``stream`` reports end-of-file (an empty recv), or
        after printing a traceback if receiving or writing fails with an
        ``OSError``.
    """
    while True:
        try:
            buf = stream.recv(128)
        except OSError:
            traceback.print_exc()
            return
        if not buf:
            return
        try:
            # os.write may accept only part of the buffer; keep writing the
            # remainder until the whole chunk is out.
            while buf:
                written = os.write(fdout, buf)
                buf = buf[written:]
        except OSError:
            # A failing write ends the pump instead of raising, so the
            # caller's cleanup code still runs.
            traceback.print_exc()
            return
def handle_io_in_terminal(args):
    """Relay terminal I/O over the unix socket named by ``args.inner``.

    Connects to the unix-domain socket at ``args.inner``, tries to put stdin
    into raw mode, installs a SIGINT handler that forwards a Ctrl-C byte, and
    then runs two threads: one copying the socket to stdout and one copying
    stdin to the socket. Returns once the socket-to-stdout thread finishes.

    Args:
        args: Parsed argument namespace; only ``args.inner`` (the socket
            path) is read.

    Returns:
        None.
    """
    sockname = args.inner
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect(sockname)

    try:
        tty.setraw(sys.stdin.fileno())
    except Exception:
        print('failed to set tty opts')
        traceback.print_exc()
        # Presumably a pause so the message can be read before the terminal
        # window goes away.
        time.sleep(10)

    def send_ctrl_c(signum, frame):
        # Forward the interrupt as byte 0x03 (Ctrl-C) instead of dying here.
        s.send(bytes([3]))

    try:
        signal.signal(signal.SIGINT, send_ctrl_c)
    except Exception:
        print('failed to do signal handling')
        traceback.print_exc()
        time.sleep(10)

    def write_output_from_socket():
        try:
            copy_stream_to_fd(s, sys.stdout.fileno())
        finally:
            # Always tear down, even if the copy loop raised, so the socket
            # file is not left behind.
            try:
                s.shutdown(socket.SHUT_RDWR)
            except OSError:
                # shutdown may fail if the peer has already disconnected;
                # the remaining cleanup must still run.
                pass
            s.close()
            os.close(sys.stdin.fileno())
            os.unlink(sockname)

    to = threading.Thread(target=write_output_from_socket)
    to.start()

    def read_input_to_socket():
        copy_fd_to_stream(sys.stdin.fileno(), s)

    # Daemon thread: it may still be blocked reading stdin when the output
    # side finishes, and must not keep the process alive.
    ti = threading.Thread(target=read_input_to_socket, daemon=True)
    ti.start()

    to.join()
def run_terminal_process(args, rest):
    """Launch cool-retro-term and bridge it to the descriptors passed in ``rest``.

    Parses two file-descriptor numbers out of ``rest[0]``, creates a
    unix-domain socket under ``/tmp``, starts cool-retro-term running this
    script again with ``-inner <socket path>``, waits for that inner process
    to connect, and then copies ``fdin`` to the connection and the connection
    to ``fdout`` until the terminal process exits and the input side ends.

    Args:
        args: Parsed argument namespace; ``args.title`` and ``args.exec`` are
            read.
        rest: Unrecognised command-line arguments. ``rest[0]`` must contain
            an ``S`` followed by ``<fdin>,<fdout>`` as decimal integers.

    Returns:
        None.

    Raises:
        IndexError, ValueError: if ``rest[0]`` is missing or malformed.
    """
    import shlex  # local import: only needed to build the shell command

    # Get passed-in info intended for child process from gxemul
    ws_info = rest[0].split('S')[1].split(',')
    fdin = int(ws_info[0])
    fdout = int(ws_info[1])

    # Make unix domain socket for communication with child process.
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sockname = '/tmp/gxemul-%s' % str(uuid.uuid4())
    s.bind(sockname)
    s.listen(5)

    # Quote every interpolated piece: the string is handed to /bin/sh -c, so
    # spaces or shell metacharacters in the cwd, script path or exec arguments
    # would otherwise be split or interpreted by the shell.
    inner_cmd = 'cd %s ; %s -inner %s %s' % (
        shlex.quote(os.getcwd()),
        shlex.quote(sys.argv[0]),
        shlex.quote(sockname),
        ' '.join(shlex.quote(part) for part in args.exec),
    )

    def run_term():
        run_cmd = ['cool-retro-term', '-T', args.title, '-e', '/bin/sh', '-c', inner_cmd]
        subprocess.check_call(run_cmd)

    t = threading.Thread(target=run_term)
    t.start()

    # NOTE(review): if the terminal never starts or never connects, this
    # accept() blocks indefinitely.
    conn, addr = s.accept()
    # Only one connection is expected; release the listening socket now.
    s.close()

    def copy_in_fd():
        try:
            copy_fd_to_stream(fdin, conn)
        finally:
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except OSError:
                # shutdown may fail if the peer has already disconnected;
                # still close what we own.
                pass
            conn.close()
            os.close(fdin)

    def copy_out_fd():
        copy_stream_to_fd(conn, fdout)
        os.close(fdout)

    tin = threading.Thread(target=copy_in_fd)
    tin.start()
    # Daemon thread: must not keep the process alive after the other threads
    # have finished.
    tout = threading.Thread(target=copy_out_fd, daemon=True)
    tout.start()

    t.join()
    tin.join()
def main():
    """Parse the xterm-style command line and dispatch to the matching role.

    With ``-inner <socket path>`` the process acts as the relay running
    inside the terminal; otherwise it launches the terminal and bridges the
    descriptors described by the unrecognised arguments.
    """
    parser = argparse.ArgumentParser(
        description='run cool-retro-term in a way compatible with xterm for gxemul',
    )
    # (flags, keyword options) in registration order.
    option_table = (
        (('-title',), {'type': str}),
        (('-geometry',), {'type': str}),
        (('-e',), {'action': 'store_true'}),
        (('-inner',), {'type': str, 'default': False}),
        (('exec',), {'nargs': '+'}),
    )
    for flags, options in option_table:
        parser.add_argument(*flags, **options)

    parsed, leftover = parser.parse_known_args()

    # ``inner`` stays at its False default unless -inner was given.
    if parsed.inner is False:
        run_terminal_process(parsed, leftover)
        return
    handle_io_in_terminal(parsed)


# Script entry point: run only when executed directly, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment