Skip to content

Instantly share code, notes, and snippets.

@sooop
Created June 16, 2019 02:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sooop/7b2808fb63c7843f314e5fb8b73ba266 to your computer and use it in GitHub Desktop.
Save sooop/7b2808fb63c7843f314e5fb8b73ba266 to your computer and use it in GitHub Desktop.
비동기 PUSH-PULL 분산처리 구조 - onefile
from multiprocessing import Process
import sys
import random
import zmq
import zmq.asyncio
import asyncio
# Module-level asyncio-aware ZeroMQ context; the worker, sink and ventilator
# coroutines in this file all create their sockets from it.
ctx = zmq.asyncio.Context()
async def run_worker(protA=5556, portB=5557):
    """Pull numeric tasks and push a '+' or '-' result for each one.

    For every numeric payload received, the worker sleeps for
    ``data * 0.01`` seconds and then sends ``'+'`` when ``int(data)`` is
    even, ``'-'`` otherwise.

    Args:
        protA: TCP port on localhost that tasks are pulled from. The
            misspelled name is kept so keyword callers keep working.
        portB: TCP port on localhost that results are pushed to.

    The loop never ends on its own; the coroutine only stops when it is
    cancelled or its process is stopped from outside.
    """
    sock_pull = ctx.socket(zmq.PULL)
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_pull.connect(f'tcp://localhost:{protA}')
        sock_push.connect(f'tcp://localhost:{portB}')
        while True:
            # SECURITY NOTE: recv_pyobj() unpickles whatever arrives on the
            # socket. Only use this on a trusted network/peer.
            data = await sock_pull.recv_pyobj()
            # NOTE(review): the original indentation was lost; this assumes
            # the sleep, the parity check and the send were all guarded by
            # the numeric check, i.e. non-numeric payloads are skipped.
            # Confirm against the author's intent.
            if not isinstance(data, (int, float)):
                continue
            await asyncio.sleep(data * 0.01)
            x = '+' if int(data) % 2 == 0 else '-'
            await sock_push.send_string(x)
    finally:
        # Release both sockets if the loop is cancelled or an error escapes.
        sock_pull.close()
        sock_push.close()
def proc_worker(portA=5556, portB=5557):
    """Synchronous entry point that drives ``run_worker`` via ``asyncio.run``.

    Args:
        portA: Port forwarded as the first argument of ``run_worker``.
        portB: Port forwarded as the second argument of ``run_worker``.
    """
    worker_coro = run_worker(portA, portB)
    asyncio.run(worker_coro)
async def run_sink(port=5557):
    """Receive a result count, then echo that many result strings to stdout.

    The first message received is decoded as a big-endian integer giving
    the number of results to expect. Each following string message is
    written to stdout and flushed immediately so progress is visible.

    Args:
        port: TCP port the PULL socket binds to on all interfaces.
    """
    sock = ctx.socket(zmq.PULL)
    try:
        sock.bind(f'tcp://*:{port}')
        # NOTE(review): this treats whichever message arrives first as the
        # count frame; nothing here enforces that it precedes the results.
        amount = int.from_bytes(await sock.recv(), 'big')
        print(f"GOT AMOUNT: {amount}")
        for _ in range(amount):
            result = await sock.recv_string()
            sys.stdout.write(result)
            sys.stdout.flush()
    finally:
        # Always release the bound port, including on error or cancellation.
        sock.close()
def proc_sink(port=5557):
    """Synchronous entry point that drives ``run_sink`` via ``asyncio.run``.

    Args:
        port: Port forwarded to ``run_sink``.
    """
    sink_coro = run_sink(port)
    asyncio.run(sink_coro)
async def run_vent(port=5556, sink_port=5557, amount=1_000):
    """Announce the task count to the sink, then push that many random tasks.

    Args:
        port: TCP port the task PUSH socket binds to on all interfaces.
        sink_port: TCP port on localhost where the sink listens; the task
            count is sent there as a 4-byte big-endian integer.
        amount: Number of tasks to send. Each task is a random integer
            from ``random.randrange(10, 200)``.

    Raises:
        OverflowError: If ``amount`` does not fit in 4 unsigned bytes.
    """
    sock_push = ctx.socket(zmq.PUSH)
    sock_cmd = ctx.socket(zmq.PUSH)
    try:
        sock_push.bind(f'tcp://*:{port}')
        sock_cmd.connect(f'tcp://localhost:{sink_port}')
        # Sending starts immediately; there is no pause or prompt to wait
        # for workers to connect before the first task goes out.
        await sock_cmd.send(amount.to_bytes(4, 'big'))
        for _ in range(amount):
            v = random.randrange(10, 200)
            await sock_push.send_pyobj(v)
    finally:
        # Close both sockets even if a send fails part-way through.
        sock_cmd.close()
        sock_push.close()
def proc_vent():
    """Synchronous entry point that drives ``run_vent`` via ``asyncio.run``."""
    vent_coro = run_vent()
    asyncio.run(vent_coro)
if __name__ == '__main__':
    # Launch one sink process and ten worker processes, run the ventilator
    # in this process, then wait for the sink to finish.
    workers = [Process(target=proc_worker) for _ in range(10)]
    sink = Process(target=proc_sink)
    sink.start()
    for w in workers:
        w.start()
    try:
        proc_vent()
        sink.join()
    finally:
        # Workers loop forever, so they must be stopped explicitly. Doing
        # it here also covers a failed ventilator or a Ctrl-C.
        for w in workers:
            w.terminate()
        # Reached with the sink still alive only when the try body failed.
        if sink.is_alive():
            sink.terminate()
        # Join after terminate so the child processes are reaped.
        for w in workers:
            w.join()
        sink.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment