Skip to content

Instantly share code, notes, and snippets.

@avli
Last active April 12, 2022 12:57
Show Gist options
  • Save avli/45f1bc4d771ad10a372fb3a0376372dc to your computer and use it in GitHub Desktop.
Save avli/45f1bc4d771ad10a372fb3a0376372dc to your computer and use it in GitHub Desktop.
import time
import threading
def set_timeout(delay, callback):
def target():
time.sleep(delay)
callback()
threading.Thread(target=target).start()
if __name__ == '__main__':
set_timeout(1, lambda: print(1))
set_timeout(10, lambda: print(10))
set_timeout(5, lambda: print(5))
set_timeout(3, lambda: print(3))
set_timeout(1, lambda: print(1))
set_timeout(2, lambda: print(2))
set_timeout(7, lambda: print(7))
set_timeout(4, lambda: print(4))
import time
from queue import Queue
def _make_infinity_queue():
return Queue(maxsize=-1)
_queue = _make_infinity_queue()
def set_timeout(delay, callback):
start_time = time.time()
def wrapper():
if time.time() >= start_time + delay:
_queue.put(callback)
else:
_queue.put(wrapper)
_queue.put(wrapper)
class EventLoop:
def __init__(self, initial_target):
_queue.put(initial_target)
# noinspection PyMethodMayBeStatic
def run(self):
while not _queue.empty():
func = _queue.get()
func()
if __name__ == '__main__':
def async_fun():
set_timeout(1, lambda: print(1))
set_timeout(10, lambda: print(10))
set_timeout(5, lambda: print(5))
set_timeout(3, lambda: print(3))
set_timeout(1, lambda: print(1))
set_timeout(2, lambda: print(2))
set_timeout(7, lambda: print(7))
set_timeout(4, lambda: print(4))
print("exiting from `async_fun`")
EventLoop(async_fun).run()
import time
from multiprocessing.pool import ThreadPool
from queue import Queue
def _make_infinity_queue():
return Queue(maxsize=-1)
_thread_pool = ThreadPool(processes=1)
_queue = _make_infinity_queue()
def set_timeout(delay, callback):
start_time = time.time()
def wrapper():
if time.time() >= start_time + delay:
_queue.put(callback)
else:
_queue.put(wrapper)
_queue.put(wrapper)
class EventLoop:
def __init__(self, initial_target):
_queue.put(initial_target)
# noinspection PyMethodMayBeStatic
def run(self):
while True:
target = _queue.get()
_thread_pool.apply(target)
if __name__ == '__main__':
def async_fun():
set_timeout(1, lambda: print(1))
set_timeout(10, lambda: print(10))
set_timeout(5, lambda: print(5))
set_timeout(3, lambda: print(3))
set_timeout(1, lambda: print(1))
set_timeout(2, lambda: print(2))
set_timeout(7, lambda: print(7))
set_timeout(4, lambda: print(4))
print("exiting from `async_fun`")
EventLoop(async_fun).run()
import time
def set_timeout(delay, callback):
time.sleep(delay)
callback()
if __name__ == '__main__':
set_timeout(1, lambda: print(1))
set_timeout(10, lambda: print(10))
set_timeout(5, lambda: print(5))
set_timeout(3, lambda: print(3))
set_timeout(1, lambda: print(1))
set_timeout(2, lambda: print(2))
set_timeout(7, lambda: print(7))
set_timeout(4, lambda: print(4))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment