Skip to content

Instantly share code, notes, and snippets.

@KostyaEsmukov
Last active May 30, 2018 11:18
Show Gist options
  • Save KostyaEsmukov/55f802a638bbe64c0838f43f7c8c02c4 to your computer and use it in GitHub Desktop.
Save KostyaEsmukov/55f802a638bbe64c0838f43f7c8c02c4 to your computer and use it in GitHub Desktop.
Safe Eventlet queue across multiple threads
"""
First attempt: doesn't work.
Output:
,,,,,,,,,,,,,,,,,,,,,,,,,,,<snip>
"""
import sys
from threading import Thread
from time import sleep
import eventlet
from eventlet.queue import LightQueue as Queue
def dot(d):
sys.stdout.write(d)
sys.stdout.flush()
def inf_generator():
i = 1
while True:
yield i
i += 1
def thread_run(events):
sleep(1)
for i in inf_generator():
events.put(i)
dot(",")
def eventlet_loop(events):
prev = 0
while True:
i = events.get()
if i != prev + 1:
print(f"Skip! {prev} {i}")
prev = i
dot(".")
events = Queue()
Thread(target=thread_run, args=(events,)).start()
eventlet.spawn(eventlet_loop, events).wait()
"""
Second attempt: spawning a green thread inside the other thread. Doesn't work either.
Output:
,,,,,,,,,,,,,,,,,,,,,,,,,,,<snip>
"""
import sys
from threading import Thread
from time import sleep
import eventlet
from eventlet.queue import LightQueue as Queue
def dot(d):
sys.stdout.write(d)
sys.stdout.flush()
def inf_generator():
i = 1
while True:
yield i
i += 1
def thread_run(events):
sleep(1)
for i in inf_generator():
eventlet.spawn(events.put, i)
dot(",")
def eventlet_loop(events):
prev = 0
while True:
i = events.get()
if i != prev + 1:
print(f"Skip! {prev} {i}")
prev = i
dot(".")
events = Queue()
Thread(target=thread_run, args=(events,)).start()
eventlet.spawn(eventlet_loop, events).wait()
"""
Third attempt (finally a working one): using socket pipe for communication between the threads.
Output:
,:,:,,:,:,:,:,,,,::,,::,:,,:::::,,:,,,,,:,,:,,,::,:,,,,:,,,:::,::::,:,:::,,,:,:,:,:,:,:,:,,,,,,,,::,:,:,,,,::::::,::,:,::::::::,::,:,::::...,.......,....,...,...,,.,,,,..,,..........,...,.,................................:,:,:,::,::,:::::,:,,,,,,,,,,:,,,,,,:,,::,,::,:,,,,,::::,:::,:,:,:,:::::::,:,::,::,::,:::::,,::::,:,,:,:::,,:,:,:,:,:,,,:,,,:,:,,::,:::,:::,:,,,,,,,,,,,:::::,:,,,:,:,:,:,::,:,:,:,:,:,:,::::::::,,,:,:,::,::,,,:,:,:,::::::,:,:,:::,,:,,,:,:::,:,,,:,::,,,,:,:,:,,:,,,,::::::,::,:,,,:,:,,:,:,,,::,:::::::::.....,.,.,.,.,...,.................,............,,..............................,...,..,,..,..............,.....,..,,..,,,...,.,,........,..........,.,.,......,...,..............,,..........,::,,,:,:,:::::,,,::::,,:,:,:::::::,:,,::,::,:,,:,,,:,:,:,,:,,:,::,::,,:,,:::,,:,:::::,::::,,,,,,:,:::::::,,:,,,::,:::,,:::,:::::::::::,:,,::::::,:,::::..................................................,...,,.......................,.,..,.,............,:,:,:,,,,:,,:,:,,:,,:::::,:,,,,,,,,,,,,,,,,,,,:,:,:::::,:,,::,,::,:,,,::,::::,,:,,,:,::,,:::,,:,:::,::::,:::,:::::,:::::,,:,::::::::::,..,..............................................,,,............,.......,....,:,:::,:::,,,:,,,:,::::::,:::.....,....,,........,.::::.,...,::..,,,,,,,,,,,:,,:,,::,:,:,:,::,:,::::::,:,,:,:::,,:,,,,,::,:::,:,,::,:::::,:::,,.....................................,<snip>
"""
import socket
import struct
import sys
from threading import Thread
from time import sleep
import eventlet
from eventlet.greenio import GreenPipe, GreenSocket
from eventlet.queue import LightQueue as Queue
def dot(d):
sys.stdout.write(d)
sys.stdout.flush()
def inf_generator():
i = 1
while True:
yield i
i += 1
def thread_run(p):
sleep(1)
for i in inf_generator():
p.sendall(struct.pack("!i", i))
dot(",")
def green_pipe():
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
s2 = GreenSocket(s2)
return s1, s2
def eventlet_pipe_processing(events, p):
while True:
i, = struct.unpack("!i", p.recv(4, socket.MSG_WAITALL))
events.put(i)
dot(":")
def eventlet_loop(events):
prev = 0
while True:
i = events.get()
if i != prev + 1:
print(f"Skip! {prev} {i}")
prev = i
dot(".")
events = Queue()
a, b = green_pipe()
Thread(target=thread_run, args=(a,)).start()
gt = [
eventlet.spawn(eventlet_loop, events),
eventlet.spawn(eventlet_pipe_processing, events, b),
]
for t in gt:
t.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment