Created
June 16, 2019 02:20
-
-
Save sooop/7b2808fb63c7843f314e5fb8b73ba266 to your computer and use it in GitHub Desktop.
비동기 PUSH-PULL 분산처리 구조 - onefile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import random
import sys
from multiprocessing import Process

import zmq
import zmq.asyncio
# Module-level asyncio-aware ZeroMQ context; every socket in this file is
# created from it.
ctx = zmq.asyncio.Context()
async def run_worker(protA=5556, portB=5557):
    """Pull tasks, simulate work on each one, and push a result marker.

    Connects a PULL socket to ``tcp://localhost:{protA}`` and a PUSH socket
    to ``tcp://localhost:{portB}``, then loops forever. For every numeric
    task it sleeps ``task * 0.01`` seconds and sends ``'+'`` when
    ``int(task)`` is even, ``'-'`` otherwise. Non-numeric messages are
    skipped without producing a result.

    Args:
        protA: TCP port to pull tasks from. The misspelled name is kept so
            existing keyword callers keep working.
        portB: TCP port to push result markers to.
    """
    sock_pull = ctx.socket(zmq.PULL)
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_pull.connect(f'tcp://localhost:{protA}')
        sock_push.connect(f'tcp://localhost:{portB}')

        while True:
            # NOTE(review): recv_pyobj() unpickles whatever arrives; only
            # connect this worker to a trusted task source.
            data = await sock_pull.recv_pyobj()
            if not isinstance(data, (int, float)):
                continue
            await asyncio.sleep(data * 0.01)
            marker = '+' if int(data) % 2 == 0 else '-'
            await sock_push.send_string(marker)
    finally:
        # Release both sockets if the loop is cancelled or raises.
        sock_pull.close()
        sock_push.close()
def proc_worker(portA=5556, portB=5557):
    """Run ``run_worker`` to completion in a fresh asyncio event loop.

    Synchronous entry point suitable as a ``multiprocessing.Process`` target.

    Args:
        portA: TCP port the worker pulls tasks from.
        portB: TCP port the worker pushes results to.
    """
    asyncio.run(run_worker(portA, portB))
async def run_sink(port=5557):
    """Collect result markers and echo them to stdout as they arrive.

    Binds a PULL socket on ``tcp://*:{port}``. The first frame received is
    decoded as a big-endian integer giving the number of results to expect;
    that many string frames are then read and written to stdout, flushing
    after each one so progress is visible immediately.

    Args:
        port: TCP port to bind the PULL socket on.
    """
    sock = ctx.socket(zmq.PULL)
    try:
        sock.bind(f'tcp://*:{port}')

        # NOTE(review): this assumes the count frame arrives before any
        # worker result; nothing here enforces that ordering - confirm.
        header = await sock.recv()
        amount = int.from_bytes(header, 'big')
        print(f"GOT AMOUNT: {amount}")

        for _ in range(amount):
            result = await sock.recv_string()
            sys.stdout.write(result)
            sys.stdout.flush()
    finally:
        # Release the socket on normal completion as well as on error.
        sock.close()
def proc_sink(port=5557):
    """Run ``run_sink`` to completion in a fresh asyncio event loop.

    Synchronous entry point suitable as a ``multiprocessing.Process`` target.

    Args:
        port: TCP port the sink binds its PULL socket on.
    """
    asyncio.run(run_sink(port))
async def run_vent(portA=5556, portB=5557, amount=1_000):
    """Announce the task count to the sink, then distribute random tasks.

    Binds a PUSH socket on ``tcp://*:{portA}`` for the workers and connects
    a second PUSH socket to ``tcp://localhost:{portB}`` to tell the sink how
    many results to expect. The count is sent as a 4-byte big-endian integer,
    followed by ``amount`` tasks, each a random integer in ``[10, 200)``.

    Args:
        portA: TCP port workers pull tasks from.
        portB: TCP port the sink listens on.
        amount: Number of tasks to send; must fit in 4 unsigned bytes.

    Raises:
        OverflowError: If ``amount`` is negative or does not fit in 4 bytes.
    """
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_push.bind(f'tcp://*:{portA}')

        sock_cmd = ctx.socket(zmq.PUSH)
        try:
            sock_cmd.connect(f'tcp://localhost:{portB}')
            await sock_cmd.send(amount.to_bytes(4, 'big'))
        finally:
            sock_cmd.close()

        # NOTE(review): the original had an optional "press enter to start"
        # pause before distribution, presumably to let all workers connect
        # first - confirm whether a start barrier is wanted.
        for _ in range(amount):
            task = random.randrange(10, 200)
            await sock_push.send_pyobj(task)
    finally:
        sock_push.close()
def proc_vent():
    """Run ``run_vent`` to completion in a fresh asyncio event loop."""
    asyncio.run(run_vent())
if __name__ == '__main__':
    # Start one sink and ten workers as child processes, run the ventilator
    # in this process, then wait for the sink to receive every result.
    workers = [Process(target=proc_worker) for _ in range(10)]
    sink = Process(target=proc_sink)

    sink.start()
    for w in workers:
        w.start()

    try:
        proc_vent()
        sink.join()
    finally:
        # Workers loop forever, so they must be stopped explicitly; do it
        # even when the ventilator fails so no child is left running.
        for w in workers:
            w.terminate()
        for w in workers:
            w.join()
        # Only reached with a live sink when an error interrupted the run.
        if sink.is_alive():
            sink.terminate()
            sink.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment