Skip to content

Instantly share code, notes, and snippets.

@christophercrouzet
Created August 24, 2016 07:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save christophercrouzet/ce2054ab1f20602bc55702e36040c99c to your computer and use it in GitHub Desktop.
Save christophercrouzet/ce2054ab1f20602bc55702e36040c99c to your computer and use it in GitHub Desktop.
Python's asynchronous queues vs synchronous pipes.
#!/usr/bin/env python3
import logging
import multiprocessing
_LOGGER = multiprocessing.log_to_stderr()
_LOGGER.setLevel(logging.DEBUG)
_USE_QUEUE = True
_ITERATIONS_MAX = 100
class State(object):
def __init__(self):
self.number = 0
self.letter = ''
self.seq = []
def __str__(self):
return ("State(number=%d, letter=%s, seq=%s, "
% (self.number, self.letter, self.seq))
def main():
if _USE_QUEUE:
queue = multiprocessing.Queue()
args = (queue, queue)
else:
pipe = multiprocessing.Pipe(duplex=False)
args = (pipe[1], pipe[0])
processes = (
multiprocessing.Process(
target=_producer,
name='producer',
args=(args[0],)
),
multiprocessing.Process(
target=_consumer,
name='consumer',
args=(args[1],)
)
)
for process in processes:
process.start()
for process in processes:
process.join()
_LOGGER.debug("exiting main")
def _producer(conn):
try:
state = State()
iteration = 0
while iteration < _ITERATIONS_MAX:
_set_initial_state(state)
_send(conn, state)
# When using queues, this call might modify the state before it is
# actually sent to the consumer process. This is because data
# is pushed through queues asynchronously in a separate thread.
# This issue doesn't apply to pipes since the push operation is
# synchronous, meaning that the sending process is blocked until
# the data is fully sent.
# On the other hand, pipes could hang if too much data is pushed
# while no other process is consuming the data, requiring both
# processes to respectively send and receive the data at the same
# time.
_modify_state(state)
iteration += 1
finally:
_send(conn, 'END')
conn.close()
def _consumer(conn):
try:
data = None
while True:
state = _receive(conn)
if state == 'END':
break
# The 'initial' state is expected to be printed out.
_LOGGER.debug(state)
finally:
conn.close()
def _set_initial_state(state):
state.number = 1
state.letter = 'a'
state.seq = ['initial']
def _modify_state(state):
state.number = 9
state.letter = 'z'
state.seq = ['modified']
def _send(conn, data):
if _USE_QUEUE :
conn.put(data, block=True)
else:
conn.send(data)
def _receive(conn):
if _USE_QUEUE :
return conn.get()
else:
return conn.recv()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment