Skip to content

Instantly share code, notes, and snippets.

@j0057
Last active July 6, 2020 13:28
Show Gist options
  • Save j0057/ffccdbc808262509b743a3575d798efa to your computer and use it in GitHub Desktop.
Save j0057/ffccdbc808262509b743a3575d798efa to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os
import socket
import threading
import time
STATUS = None
def prompt_for_status():
global STATUS
while True:
STATUS = input(f"Current status is {STATUS!r}, enter new status: ")
def listen_health():
addr = '/tmp/funwithunixsockets'
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.bind(addr)
sock.listen()
while True:
(conn, client_addr) = sock.accept()
with conn:
conn.sendall((STATUS or 'NO STATUS YET').encode('ascii'))
prompt_thread = threading.Thread(target=prompt_for_status, daemon=True)
prompt_thread.start()
health_thread = threading.Thread(target=listen_health, daemon=True)
health_thread.start()
while True:
time.sleep(0.5)
#!/usr/bin/env python3
import socket
import sys
addr = '/tmp/funwithunixsockets'
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(addr)
data = sock.recv(128).decode('ascii')
print(f">>> {data!r}")
if data == 'UNHEALTHY':
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment