Skip to content

Instantly share code, notes, and snippets.

@bacher09
Last active April 7, 2020 17:25
Show Gist options
  • Save bacher09/824e52dfe2d3f16adc1393e890a6866e to your computer and use it in GitHub Desktop.
Save bacher09/824e52dfe2d3f16adc1393e890a6866e to your computer and use it in GitHub Desktop.
Python live update
%% Minimal TCP echo server that prefixes every received chunk with a running
%% counter. loop/2 is exported and always re-entered through a fully
%% qualified call (echo_server:loop/2) so that each connection process picks
%% up a newly loaded version of this module on its next iteration.
-module(echo_server).
-export([start/0, loop/2]).

-define(LISTEN_PORT, 1234).

%% Open the listening socket and accept connections forever.
start() ->
    listen().

%% Listen on ?LISTEN_PORT in passive binary mode, then enter the accept loop.
listen() ->
    {ok, LSock} = gen_tcp:listen(?LISTEN_PORT, [binary, {active, false},
                                                {reuseaddr, true}]),
    accept(LSock).

%% Accept one connection, hand it to a fresh process that starts counting
%% at 1, and go back to accepting.
accept(LSocket) ->
    {ok, Socket} = gen_tcp:accept(LSocket),
    spawn(fun() -> echo_server:loop(Socket, 1) end),
    accept(LSocket).

%% Echo loop for a single connection. State is the number of the next line.
%% Returns ok once the connection is finished.
loop(Socket, State) ->
    case gen_tcp:recv(Socket, 0) of
        {ok, Data} ->
            Sym = list_to_binary(io_lib:format("~p", [State])),
            Line = [<<"New Line ">>, Sym, <<": ">>, Data],
            case gen_tcp:send(Socket, Line) of
                ok ->
                    NewState = State + 1,
                    %% Fully qualified call: switches to the latest loaded
                    %% version of the module.
                    echo_server:loop(Socket, NewState);
                {error, _SendReason} ->
                    gen_tcp:close(Socket)
            end;
        {error, _RecvReason} ->
            %% Covers closed as well as errors such as econnreset, which
            %% previously crashed the process with a case_clause error.
            gen_tcp:close(Socket)
    end.
#!/usr/bin/env python3
import socket
import select
from collections import deque
class ClientState:
    """Per-connection echo state: a line counter plus a FIFO of pending output.

    ``line`` is the number of chunks received so far; ``queue`` holds the
    already formatted byte strings that still have to be sent, oldest first.
    """

    __slots__ = ('line', 'queue')

    def __init__(self):
        self.line = 0
        self.queue = deque()

    def add_line(self, data):
        """Number the received chunk *data* (bytes) and queue it for sending."""
        self.line += 1
        # some old versions of python can't format bytes
        out = b"".join([b"Line ", str(self.line).encode("ascii"), b": ", data])
        self.queue.append(out)

    def send_nonblock(self, sock):
        """Send at most one queued chunk through *sock*.

        *sock* only needs a ``send(bytes) -> int`` method. Returns ``True``
        when nothing is left to send afterwards, ``False`` otherwise.
        Exceptions raised by ``sock.send`` propagate to the caller; the
        chunk stays queued in that case.
        """
        if not self.queue:
            return True
        # Peek instead of pop: the oldest chunk goes out first (FIFO), and
        # nothing is lost if send() raises.
        data = self.queue[0]
        n = sock.send(data)
        if n < len(data):
            # Only part of the chunk was delivered; keep the remainder at
            # the head of the queue so it is sent next, before newer chunks.
            self.queue[0] = data[n:]
        else:
            self.queue.popleft()
        return not self.queue
class SocketServer:
    """Single-threaded TCP echo server driven by ``select.select``.

    A client socket sits in ``rsocks`` while the server waits for input from
    it and in ``wsocks`` while it has queued output. ``state`` maps a
    socket's file descriptor to its ``ClientState``.
    """

    LISTEN_PORT = 1234
    BUFSIZE = 1500

    def __init__(self, port=None):
        """Create a server for *port* (defaults to ``LISTEN_PORT``)."""
        # An explicitly passed port used to be dropped, leaving self.port
        # unset and breaking listen_addr.
        self.port = self.LISTEN_PORT if port is None else port
        self.listen_sock = None
        self.rsocks = set()
        self.wsocks = set()
        self.xsocks = set()
        self.state = {}

    def remove_sock(self, sock):
        """Stop polling *sock* in all three select sets."""
        self.rsocks.discard(sock)
        self.wsocks.discard(sock)
        self.xsocks.discard(sock)

    def run(self):
        """Bind and listen, then run the reactor until Ctrl-C."""
        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        lsock.setblocking(False)
        lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        lsock.bind(self.listen_addr)
        lsock.listen()
        self.listen_sock = lsock
        # The listening socket is only polled for new connections and
        # errors; it has nothing to write.
        self.rsocks.add(lsock)
        self.xsocks.add(lsock)
        try:
            self.reactor()
        except KeyboardInterrupt:
            self.close()

    def reactor(self):
        """Event loop: accept clients, read their input, echo it back."""
        while True:
            rready, wready, xready = select.select(self.rsocks, self.wsocks,
                                                   self.xsocks)
            for sock in rready:
                if sock is self.listen_sock:
                    self._accept_client()
                else:
                    self._read_client(sock)
            for sock in wready:
                self._write_client(sock)
            for sock in xready:
                print("xready")
                self._drop_client(sock)

    def _accept_client(self):
        """Accept one pending connection and start polling it for input."""
        try:
            client_sock, _addr = self.listen_sock.accept()
        except (BlockingIOError, ConnectionAbortedError):
            # The pending connection went away between select() and accept().
            return
        client_sock.setblocking(False)
        self.rsocks.add(client_sock)
        self.state[client_sock.fileno()] = ClientState()

    def _read_client(self, sock):
        """Read one chunk from *sock* and queue the numbered echo for it."""
        try:
            data = sock.recv(self.BUFSIZE)
        except BlockingIOError:
            # Nothing to read after all; wait for the next select() round.
            return
        except OSError:
            # e.g. connection reset: handle it like a disconnect instead of
            # letting one client take the whole server down.
            data = b""
        if not data:
            # socket is readable and returns no data this means it's closed
            self._drop_client(sock)
            return
        self.state[sock.fileno()].add_line(data)
        # Stop reading from this client until its echo has been delivered.
        self.rsocks.discard(sock)
        self.wsocks.add(sock)

    def _write_client(self, sock):
        """Send queued output to *sock*; resume reading once it is drained."""
        client = self.state.get(sock.fileno())
        if client is None:
            # No state for this socket (for example it was closed earlier
            # in this iteration, so fileno() is -1): just stop polling it.
            self.wsocks.discard(sock)
            return
        try:
            drained = client.send_nonblock(sock)
        except BlockingIOError:
            return
        except OSError:
            self._drop_client(sock)
            return
        if drained:
            # all data were sent
            self.wsocks.discard(sock)
            self.rsocks.add(sock)

    def _drop_client(self, sock):
        """Discard the state of *sock*, close it and stop polling it."""
        # State is keyed by fileno(), which is -1 after close(), so the
        # state has to be removed before the socket is closed.
        self.remove_state(sock)
        sock.close()
        self.remove_sock(sock)

    def remove_state(self, sock):
        """Forget the ``ClientState`` of *sock*, if there is one."""
        self.state.pop(sock.fileno(), None)

    def close(self):
        """Close every known socket, the listening socket last."""
        # close all sockets without flushing states
        for sock in self.rsocks | self.wsocks | self.xsocks:
            if sock is not self.listen_sock:
                sock.close()
        if self.listen_sock is not None:
            self.listen_sock.close()

    @property
    def listen_addr(self):
        """``(host, port)`` tuple the server binds to (all IPv4 interfaces)."""
        return ('0.0.0.0', self.port)

    @classmethod
    def main(cls):
        """Create a server with default settings and run it."""
        return cls().run()
# Script entry point: start the echo server only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    SocketServer.main()
#!/usr/bin/env python3
import socket
import select
import signal
import sys
import os
import json
from collections import deque
class ClientState:
    """Per-connection echo state: a line counter plus a FIFO of pending output.

    ``line`` is the number of chunks received so far; ``queue`` holds the
    already formatted byte strings that still have to be sent, oldest first.
    ``to_json``/``from_json`` convert the state to and from plain data.
    """

    __slots__ = ('line', 'queue')

    def __init__(self):
        self.line = 0
        self.queue = deque()

    def add_line(self, data):
        """Number the received chunk *data* (bytes) and queue it for sending."""
        self.line += 1
        # some old versions of python can't format bytes
        out = b"".join([b"Line ", str(self.line).encode("ascii"), b": ", data])
        self.queue.append(out)

    def send_nonblock(self, sock):
        """Send at most one queued chunk through *sock*.

        *sock* only needs a ``send(bytes) -> int`` method. Returns ``True``
        when nothing is left to send afterwards, ``False`` otherwise.
        Exceptions raised by ``sock.send`` propagate to the caller; the
        chunk stays queued in that case.
        """
        if not self.queue:
            return True
        # Peek instead of pop: the oldest chunk goes out first (FIFO), and
        # nothing is lost if send() raises.
        data = self.queue[0]
        n = sock.send(data)
        if n < len(data):
            # Only part of the chunk was delivered; keep the remainder at
            # the head of the queue so it is sent next, before newer chunks.
            self.queue[0] = data[n:]
        else:
            self.queue.popleft()
        return not self.queue

    def to_json(self):
        """Return ``{'line': int, 'data': bytes}`` with all pending output joined."""
        data = b"".join(self.queue)
        return {'line': self.line, 'data': data}

    @classmethod
    def from_json(cls, data):
        """Rebuild a state from a mapping shaped like the result of ``to_json``.

        ``data['data']`` may be ``bytes`` or an iterable of byte values (the
        form a list takes after a JSON round trip).
        """
        obj = cls()
        obj.line = data['line']
        pending = bytes(data['data'])
        if pending:
            # Do not queue an empty chunk for clients with nothing pending.
            obj.queue.append(pending)
        return obj
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that also understands ``bytes`` and ``to_json`` objects."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        ``bytes`` become a list of integer byte values; objects exposing a
        ``to_json`` method are replaced by its result. Anything else is
        delegated to the base class, which raises ``TypeError``.
        """
        if isinstance(obj, bytes):
            return list(obj)
        if hasattr(obj, 'to_json'):
            return obj.to_json()
        return super().default(obj)
class SocketServer:
    """select()-based TCP echo server that can replace itself on SIGUSR2.

    A client socket sits in ``rsocks`` while the server waits for input from
    it and in ``wsocks`` while it has queued output. ``state`` maps a
    socket's file descriptor to its ``ClientState``. On SIGUSR2 the process
    forks, the child re-executes this script and receives the open sockets
    (by inheritance) and the serialized state (through a pipe whose read
    end is named by the ``HOT_SWAP`` environment variable).
    """

    LISTEN_PORT = 1234
    BUFSIZE = 1500

    def __init__(self, port=None):
        """Create a server for *port* (defaults to ``LISTEN_PORT``)."""
        # An explicitly passed port used to be dropped, leaving self.port
        # unset and breaking listen_addr.
        self.port = self.LISTEN_PORT if port is None else port
        self.listen_sock = None
        self.rsocks = set()
        self.wsocks = set()
        self.xsocks = set()
        self.state = {}

    def init_state(self):
        """Cold start: create, bind and register the listening socket."""
        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        lsock.setblocking(False)
        lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        lsock.bind(self.listen_addr)
        lsock.listen()
        self.listen_sock = lsock
        # The listening socket is only polled for new connections and
        # errors; it has nothing to write.
        self.rsocks.add(lsock)
        self.xsocks.add(lsock)

    def read_state(self, fd):
        """Hot start: load the JSON state written by ``json_state`` from *fd*.

        *fd* is read until EOF and closed. The file descriptors named in the
        state must already be open in this process.
        """
        with os.fdopen(fd) as fstate:
            state = json.loads(fstate.read())

        adopted = {}

        def adopt(fileno):
            # One socket object per descriptor: two objects owning the same
            # fd would close it twice.
            if fileno not in adopted:
                sock = socket.socket(fileno=fileno)
                # Keep the Python-level timeout in line with the
                # non-blocking mode the sockets were created with.
                sock.setblocking(False)
                adopted[fileno] = sock
            return adopted[fileno]

        self.listen_sock = adopt(state['listen_sock'])
        self.rsocks = {adopt(n) for n in state['rsocks']}
        self.wsocks = {adopt(n) for n in state['wsocks']}
        self.xsocks = {adopt(n) for n in state['xsocks']}
        for k, v in state['state'].items():
            # JSON object keys are strings; state is keyed by integer fd.
            self.state[int(k)] = ClientState.from_json(v)
        self.rsocks.add(self.listen_sock)
        self.xsocks.add(self.listen_sock)

    def remove_sock(self, sock):
        """Stop polling *sock* in all three select sets."""
        self.rsocks.discard(sock)
        self.wsocks.discard(sock)
        self.xsocks.discard(sock)

    def run(self):
        """Start cold or from inherited state, then serve until Ctrl-C.

        If the ``HOT_SWAP`` environment variable holds an integer it is used
        as the descriptor to read the previous process's state from;
        otherwise a fresh listening socket is created.
        """
        state_fd = os.getenv('HOT_SWAP', None)
        if state_fd is not None:
            try:
                state_fd = int(state_fd)
            except ValueError:
                state_fd = None
        if state_fd is not None:
            self.read_state(state_fd)
        else:
            self.init_state()
        signal.signal(signal.SIGUSR2, self.update)
        try:
            self.reactor()
        except KeyboardInterrupt:
            self.close()

    def reactor(self):
        """Event loop: accept clients, read their input, echo it back."""
        while True:
            rready, wready, xready = select.select(self.rsocks, self.wsocks,
                                                   self.xsocks)
            for sock in rready:
                if sock is self.listen_sock:
                    self._accept_client()
                else:
                    self._read_client(sock)
            for sock in wready:
                self._write_client(sock)
            for sock in xready:
                self._drop_client(sock)

    def _accept_client(self):
        """Accept one pending connection and start polling it for input."""
        try:
            client_sock, _addr = self.listen_sock.accept()
        except (BlockingIOError, ConnectionAbortedError):
            # The pending connection went away between select() and accept().
            return
        client_sock.setblocking(False)
        self.rsocks.add(client_sock)
        self.state[client_sock.fileno()] = ClientState()

    def _read_client(self, sock):
        """Read one chunk from *sock* and queue the numbered echo for it."""
        try:
            data = sock.recv(self.BUFSIZE)
        except BlockingIOError:
            # Nothing to read after all; wait for the next select() round.
            return
        except OSError:
            # e.g. connection reset: handle it like a disconnect instead of
            # letting one client take the whole server down.
            data = b""
        if not data:
            # socket is readable and returns no data this means it's closed
            self._drop_client(sock)
            return
        self.state[sock.fileno()].add_line(data)
        # Stop reading from this client until its echo has been delivered.
        self.rsocks.discard(sock)
        self.wsocks.add(sock)

    def _write_client(self, sock):
        """Send queued output to *sock*; resume reading once it is drained."""
        client = self.state.get(sock.fileno())
        if client is None:
            # No state for this socket (for example it was closed earlier
            # in this iteration, so fileno() is -1): just stop polling it.
            self.wsocks.discard(sock)
            return
        try:
            drained = client.send_nonblock(sock)
        except BlockingIOError:
            return
        except OSError:
            self._drop_client(sock)
            return
        if drained:
            # all data were sent
            self.wsocks.discard(sock)
            self.rsocks.add(sock)

    def _drop_client(self, sock):
        """Discard the state of *sock*, close it and stop polling it."""
        # State is keyed by fileno(), which is -1 after close(), so the
        # state has to be removed before the socket is closed.
        self.remove_state(sock)
        sock.close()
        self.remove_sock(sock)

    def remove_state(self, sock):
        """Forget the ``ClientState`` of *sock*, if there is one."""
        self.state.pop(sock.fileno(), None)

    def close(self):
        """Close every known socket, the listening socket last."""
        # close all sockets without flushing states
        for sock in self.rsocks | self.wsocks | self.xsocks:
            if sock is not self.listen_sock:
                sock.close()
        if self.listen_sock is not None:
            self.listen_sock.close()

    def update(self, sig, frame):
        """SIGUSR2 handler: hand all sockets and state to a fresh process.

        Forks; the child re-executes this script (``__file__``) with
        ``HOT_SWAP`` set to the read end of a pipe, and the parent writes
        the JSON state into that pipe and exits with status 0. If the fork
        fails, or the child goes away before reading the state, the handler
        returns and the current process keeps serving.
        """
        # Serialize first so that a failure here happens before anything
        # has been forked.
        payload = self.json_state().encode("utf8")
        rfd, wfd = os.pipe()
        os.set_inheritable(rfd, True)
        for std_fd in (0, 1, 2):
            os.set_inheritable(std_fd, True)
        for sock in (self.rsocks | self.wsocks | self.xsocks):
            sock.set_inheritable(True)
        try:
            pid = os.fork()
        except OSError:
            # error, fork isn't working (os.fork raises, it never returns a
            # negative pid)
            os.close(rfd)
            os.close(wfd)
            return
        if pid == 0:
            env = os.environ
            env['HOT_SWAP'] = str(rfd)
            try:
                os.execve(__file__, sys.argv, env)
            finally:
                # Only reached when exec failed: never let the child carry
                # on as a second copy of the running server.
                os._exit(1)
        # Parent. Drop our copy of the read end so that writing fails with
        # an error instead of blocking forever if the child has died.
        os.close(rfd)
        try:
            sent = 0
            while sent < len(payload):
                # os.write may write only part of the buffer.
                sent += os.write(wfd, payload[sent:])
        except OSError:
            # The replacement never read the state; keep serving.
            return
        finally:
            os.close(wfd)
        sys.exit(0)

    def json_state(self):
        """Serialize socket descriptors and client states to a JSON string.

        The listening socket is reported separately under ``listen_sock``
        and left out of the three descriptor lists.
        """
        rsocks = [s.fileno() for s in self.rsocks if s != self.listen_sock]
        wsocks = [s.fileno() for s in self.wsocks if s != self.listen_sock]
        xsocks = [s.fileno() for s in self.xsocks if s != self.listen_sock]
        state = {
            'listen_sock': self.listen_sock.fileno(),
            'rsocks': rsocks,
            'wsocks': wsocks,
            'xsocks': xsocks,
            'state': self.state
        }
        return json.dumps(state, cls=JsonEncoder)

    @property
    def listen_addr(self):
        """``(host, port)`` tuple the server binds to (all IPv4 interfaces)."""
        return ('0.0.0.0', self.port)

    @classmethod
    def main(cls):
        """Create a server with default settings and run it."""
        return cls().run()
# Script entry point: start the hot-swappable echo server only when this file
# is executed directly, not when it is imported.
if __name__ == '__main__':
    SocketServer.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment