Skip to content

Instantly share code, notes, and snippets.

@lezwon
Created May 28, 2021 12:51
Show Gist options
  • Save lezwon/cdd486bdbdf00b40c2444a6e3ee21951 to your computer and use it in GitHub Desktop.
Save lezwon/cdd486bdbdf00b40c2444a6e3ee21951 to your computer and use it in GitHub Desktop.
import socket
from select import select
from collections import deque
import time
import heapq
class Scheduler:
    '''
    Minimal single-threaded, callback-based event loop.

    Callbacks are plain zero-argument callables. They can be queued to run
    immediately (call_soon), after a delay (call_later), or once a socket
    becomes readable/writable (read_wait / write_wait). run() drives
    everything until no work is left.
    '''

    def __init__(self):
        self.ready = deque()      # Callbacks that can run right now (FIFO)
        self.sleeping = []        # Min-heap of (deadline, sequence, func)
        self.read_waiting = {}    # Waitable object -> callback to run when readable
        self.write_waiting = {}   # Waitable object -> callback to run when writable
        self._sequence = 0        # Tie-breaker for timers sharing a deadline

    def call_soon(self, func):
        ''' Adds the func to the ready queue immediately '''
        self.ready.append(func)

    def call_later(self, sleep, func):
        '''
        Adds the func to the sleeping queue after calculating its deadline.

        sleep is the delay in seconds. The deadline is taken from the
        monotonic clock so that wall-clock adjustments cannot make a timer
        fire early or late.
        '''
        deadline = time.monotonic() + sleep
        # The sequence number keeps heap entries totally ordered. Without it,
        # two timers with the same deadline would make heapq compare the
        # callbacks themselves, which raises TypeError for functions.
        self._sequence += 1
        heapq.heappush(self.sleeping, (deadline, self._sequence, func))

    def read_wait(self, fileno, func):
        '''
        Adds callback for reading a socket.

        fileno is anything select() accepts (a socket or a file descriptor).
        Registering the same object again replaces the earlier callback.
        '''
        self.read_waiting[fileno] = func

    def write_wait(self, fileno, func):
        '''
        Adds callback for writing to a socket.

        fileno is anything select() accepts (a socket or a file descriptor).
        Registering the same object again replaces the earlier callback.
        '''
        self.write_waiting[fileno] = func

    def run(self):
        '''
        Run the event loop.

        Returns once there are no ready callbacks, no pending timers and no
        registered I/O waiters left.
        '''
        while (self.ready or self.sleeping
               or self.read_waiting or self.write_waiting):
            if not self.ready:
                # Nothing runnable: block until a socket or a timer is due.
                self._wait_for_events()
            # Execute the ready tasks
            while self.ready:
                func = self.ready.popleft()
                func()

    def _wait_for_events(self):
        ''' Block for I/O or the nearest timer, then queue what became due. '''
        if self.sleeping:
            # Wait no longer than the nearest deadline; never negative.
            timeout = max(0, self.sleeping[0][0] - time.monotonic())
        else:
            timeout = None  # No timers: wait for I/O indefinitely

        if self.read_waiting or self.write_waiting:
            can_read, can_write, _ = select(
                self.read_waiting, self.write_waiting, [], timeout)
            # Waiters are one-shot: a callback must re-register itself if it
            # wants to be notified again.
            for fd in can_read:
                self.ready.append(self.read_waiting.pop(fd))
            for fd in can_write:
                self.ready.append(self.write_waiting.pop(fd))
        elif timeout is not None:
            # select() with three empty sets is platform-dependent (it fails
            # on Windows), so plain sleeping is used when only timers remain.
            time.sleep(timeout)

        # Move every expired timer to the ready queue, earliest first.
        now = time.monotonic()
        while self.sleeping and self.sleeping[0][0] <= now:
            self.ready.append(heapq.heappop(self.sleeping)[2])
# Largest number of bytes read from a client in a single recv() call.
maxbytes = 10000

# Loopback address and port the echo server listens on.
addr = ('127.0.0.1', 3000)

# Non-blocking TCP listening socket; the scheduler uses select() to learn
# when a connection is ready to be accepted.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
sock.setblocking(False)
sock.bind(addr)
sock.listen(1)
def accept(sock):
    '''
    Accepts one pending connection on the listening socket and asks the
    scheduler to call recv() once the new client has data to read.
    '''
    conn, peer = sock.accept()
    print("Connected to ", peer)

    def on_readable():
        recv(conn)

    sched.read_wait(conn, on_readable)
def recv(client):
    '''
    Receives a message from the client and schedules the echo reply.

    An empty read or a connection error means the peer is gone: the client
    socket is closed and the listening socket is re-armed so the next
    connection can be accepted. Any other socket error is printed and then
    handled the same way, since no data is available to echo.
    '''
    try:
        data = client.recv(maxbytes)
    except ConnectionError:
        data = b''
    except OSError as exc:
        print(exc)
        data = b''

    if not data:
        client.close()
        print("Connection Closed")
        sched.read_wait(sock, lambda: accept(sock))
        return

    # The payload is arbitrary bytes; replace undecodable sequences rather
    # than letting a UnicodeDecodeError take down the whole event loop.
    print("Received message: ", data.decode(errors='replace'))
    sched.write_wait(client, lambda: send(client, data))
def send(sock, data):
    '''
    Sends the echo reply (data prefixed with b'Got: ') to the client socket
    and re-registers the socket for reading the next message.

    Note: the sock parameter is the client connection, not the module-level
    listening socket of the same name.
    '''
    # send() may transmit only part of the buffer and reports how much it
    # wrote; sendall() keeps going until the whole reply has been handed over.
    sock.sendall(b'Got: ' + data)
    sched.read_wait(sock, lambda: recv(sock))
def countdown(n):
    '''
    Prints 'Down n' and reschedules itself with n - 1 four seconds later.
    Does nothing once n is no longer positive.
    '''
    if not n > 0:
        return

    print('Down', n)

    def next_tick():
        countdown(n - 1)

    sched.call_later(4, next_tick)
def countup(n, i=0):
    '''
    Prints 'Up i' once per second for i = 0, 1, ..., n - 1.

    n is the exclusive upper bound; i is the current value and is normally
    left at its default by the initial caller.
    '''
    # Stop at n; without this check the counter reschedules itself forever
    # and the n argument has no effect.
    if i < n:
        print('Up', i)
        sched.call_later(1, lambda: countup(n, i + 1))
# Global scheduler shared by every callback defined in this file.
sched = Scheduler()

# Queue the two demo counters; they start on the first pass of the loop.
for _kickoff in (lambda: countdown(6), lambda: countup(6)):
    sched.call_soon(_kickoff)

# Accept a connection as soon as the listening socket becomes readable.
sched.read_wait(sock, lambda: accept(sock))

# Run the loop until no callbacks, timers or I/O waiters remain.
sched.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment