Skip to content

Instantly share code, notes, and snippets.

@SirEdvin
Created August 3, 2018 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SirEdvin/29f1b1ef2750eea5d8e6e63daed45cc4 to your computer and use it in GitHub Desktop.
Save SirEdvin/29f1b1ef2750eea5d8e6e63daed45cc4 to your computer and use it in GitHub Desktop.
import os
import asyncio
import concurrent.futures
import random
import string
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context()
current_folder = os.path.dirname(__file__)
async def message_publisher():
s = ctx.socket(zmq.PUSH)
s.bind('ipc:///tmp/test.pip')
while True:
await asyncio.wait(
[s.send_string(
''.join(random.choices(string.ascii_uppercase + string.digits, k=40))
) for _ in range(0, 5)]
)
await asyncio.sleep(1)
s.close()
def main_message_processer(i):
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.wait(
[message_processer(i, x) for x in range(0, 3)]
)
)
async def message_processer(process_index, coro_index):
s = ctx.socket(zmq.PULL)
s.connect('ipc:///tmp/test.pip')
while True:
msg = await s.recv_string()
print(f'Process {process_index}, coro {coro_index} get message: {msg}')
s.close()
def main_function(executor):
futures = [
asyncio.wrap_future(executor.submit(main_message_processer, i))
for i in range(0, 5)
]
print('start wait')
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.gather(
message_publisher(),
*futures
)
)
if __name__ == '__main__':
executor = concurrent.futures.ProcessPoolExecutor()
main_function(executor)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment