Skip to content

Instantly share code, notes, and snippets.

@PttCodingMan
Created December 31, 2023 03:00
Show Gist options
  • Save PttCodingMan/610ff787833ea79c8b416b4134c81bb5 to your computer and use it in GitHub Desktop.
Save PttCodingMan/610ff787833ea79c8b416b4134c81bb5 to your computer and use it in GitHub Desktop.
import multiprocessing
import threading
import time
class ServerConsole:
    """Console that prints queued log messages while prompting for commands.

    Messages handed to :meth:`print` are placed on ``msg_queue`` and written
    to stdout by a background thread as ``server log: <msg>``.  A second
    (daemon) thread prompts the user with ``>> `` and forwards each typed
    command to the same queue, so commands are echoed like any other message.

    Threads cannot be killed from the outside, so a pending prompt is never
    "terminated": when a log line overwrites the prompt, the prompt is simply
    drawn again while the same reader keeps waiting for input.
    """

    # Queue item that tells console() to stop.  It is compared by equality
    # because multiprocessing.Queue pickles its items, so object identity does
    # not survive the round trip.  Ordinary messages are strings and can never
    # equal this tuple.
    _STOP = ('__server_console_stop__',)
    _PROMPT = '>> '

    def __init__(self) -> None:
        """Create the message queue and start the console thread."""
        self.msg_queue = multiprocessing.Queue()
        # Set while a reader thread is (about to be) blocked inside input().
        self._prompt_active = threading.Event()
        # Becomes True once stdin reports EOF or cannot be read at all.
        self._stdin_closed = False
        self.console_process = threading.Thread(
            target=self.console, args=(self.msg_queue,)
        )
        self.console_process.start()

    def print(self, msg: str) -> None:
        """Queue *msg* to be written to the console as a log line."""
        self.msg_queue.put(msg)

    def close(self, timeout=None) -> None:
        """Stop the console thread after it has printed everything queued so far.

        Args:
            timeout: Maximum number of seconds to wait for the console thread
                to finish; ``None`` waits without a limit.
        """
        self.msg_queue.put(self._STOP)
        self.console_process.join(timeout)

    def console(self, queue) -> None:
        """Print every item arriving on *queue* until a stop request is seen.

        Blocks in ``queue.get()`` between messages instead of polling
        ``queue.empty()`` in a tight loop.

        Args:
            queue: Queue to read log messages and typed commands from.
        """
        reader = self._spawn_input(queue)
        while True:
            item = queue.get()
            if isinstance(item, tuple) and item == self._STOP:
                break

            # Return the cursor to column 0 so the log line overwrites the
            # prompt text instead of being appended after it.
            print('\r', end='')
            print('server log:', item)

            if self._prompt_active.is_set():
                # The reader is still waiting for input; only the prompt text
                # was overwritten, so draw it again.
                print(self._PROMPT, end='', flush=True)
            elif not self._stdin_closed:
                # The previous reader has delivered its command and is
                # finishing; wait for it, then prompt for the next command.
                reader.join()
                reader = self._spawn_input(queue)

    def _spawn_input(self, queue):
        """Start a daemon thread that reads one command via wait_input()."""
        # Mark the prompt as active *before* the thread runs so console()
        # never mistakes a reader that has not started yet for a finished one.
        self._prompt_active.set()
        reader = threading.Thread(
            target=self.wait_input, args=(queue,), daemon=True
        )
        reader.start()
        return reader

    def wait_input(self, queue) -> None:
        """Prompt for one command and forward it to *queue*.

        The command is also put on ``self.msg_queue`` when that is a different
        queue object; when both are the same queue it is queued only once so
        it is not echoed twice.  If stdin is at EOF or cannot be read, nothing
        is queued and no further prompts are started by :meth:`console`.

        Args:
            queue: Queue that receives the typed command.
        """
        self._prompt_active.set()
        try:
            cmd = input(self._PROMPT)
        except (EOFError, OSError):
            self._stdin_closed = True
            return
        finally:
            self._prompt_active.clear()

        if queue is not self.msg_queue:
            self.msg_queue.put(cmd)
        queue.put(cmd)
if __name__ == '__main__':
    # Demo: start the console, then emit a 'test' log line every five
    # seconds, ten times in total.
    console = ServerConsole()
    remaining = 10
    while remaining:
        time.sleep(5)
        console.print('test')
        remaining -= 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment