Skip to content

Instantly share code, notes, and snippets.

@kounoike
Created January 11, 2020 07:38
Show Gist options
  • Save kounoike/31d8d23ff8e7dbe293ac8923ccfc849d to your computer and use it in GitHub Desktop.
Save kounoike/31d8d23ff8e7dbe293ac8923ccfc849d to your computer and use it in GitHub Desktop.
zmq_test
[RECV]make context
[RECV]make socket
[RECV]set_hwm
[RECV]setsockopt
[RECV]connect
[RECV]sleep
[SEND]make context
[SEND]make socket
[SEND]set_hwm
[SEND]set_sockopt
[SEND]bind
[SEND]loop send data1
[SEND] message 0
[SEND] message 1
[SEND] message 2
[SEND] message 3
[SEND] message 4
[SEND] message 5
[SEND] message 6
[SEND] message 7
[SEND] message 8
[SEND] message 9
[SEND] message 10
[SEND] message 11
[SEND] message 12
[SEND] message 13
[SEND] message 14
[SEND] message 15
[SEND] message 16
[SEND] message 17
[SEND] message 18
[SEND] message 19
[SEND] message 20
[RECV]loop recv data1
[RECV] message 0
[RECV] message 1
[RECV] message 2
[RECV] message 3
[RECV] message 4
[RECV] message 5
[RECV] message 6
[RECV] message 7
[RECV] message 8
[RECV] message 9
[RECV] message 10
[RECV] message 11
[RECV] message 12
[RECV] message 13
[SEND] message 21
[RECV] message 14
[SEND] message 22
[SEND] message 23
[SEND] message 24
[RECV] message 15
[SEND] message 25
[RECV] message 16
[RECV] message 17
[RECV] message 18
[SEND] message 26
[RECV] message 19
[SEND] message 27
[SEND] message 28
[SEND] message 29
[RECV] message 20
[SEND] message 30
[RECV] message 21
[RECV] message 22
[RECV] message 23
[SEND] message 31
[RECV] message 24
[SEND] message 32
[SEND] message 33
[SEND] message 34
[SEND] message 35
[RECV] message 25
[RECV] message 26
[RECV] message 27
[RECV] message 28
[SEND] message 36
[RECV] message 29
[SEND] message 37
[SEND] message 38
[SEND] message 39
[RECV] message 30
[SEND] message 40
[RECV] message 31
[RECV] message 32
[RECV] message 33
[SEND] message 41
[RECV] message 34
[SEND] message 42
[SEND] message 43
[SEND] message 44
[SEND] message 45
[RECV] message 35
[RECV] message 36
[RECV] message 37
[RECV] message 38
[RECV] message 39
[SEND] message 46
[SEND] message 47
[RECV] message 40
[SEND] message 48
[RECV] message 41
[SEND] message 49
[RECV] message 42
[SEND]done.
[RECV] message 43
[RECV] message 44
[RECV] message 45
[RECV] message 46
[RECV] message 47
[RECV] message 48
[RECV] message 49
[RECV]done.
Filename: test.py
Line # Mem usage Increment Line Contents
================================================
22 34.7 MiB 34.7 MiB @profile
23 def run_send():
24 34.7 MiB 0.0 MiB zeromq_ctx = zmq.Context()
25 34.7 MiB 0.0 MiB print("[SEND]make context", flush=True)
26 34.7 MiB 0.0 MiB print("[SEND]make socket", flush=True)
27 34.7 MiB 0.0 MiB sock = zeromq_ctx.socket(send_socktype)
28 34.7 MiB 0.0 MiB print("[SEND]set_hwm", flush=True)
29 35.4 MiB 0.7 MiB sock.set_hwm(send_hwm)
30 35.4 MiB 0.0 MiB print("[SEND]set_sockopt", flush=True)
31 35.4 MiB 0.0 MiB sock.setsockopt(zmq.SNDHWM, send_sndhwm)
32 35.4 MiB 0.0 MiB sock.setsockopt(zmq.RCVHWM, send_rcvhwm)
33 35.4 MiB 0.0 MiB print("[SEND]bind", flush=True)
34 35.4 MiB 0.0 MiB sock.bind(endpoint)
35
36 45.4 MiB 9.9 MiB data1 = np.full([10 * 1024 * 1024], 1, dtype=np.uint8)
37
38 45.4 MiB 0.0 MiB print("[SEND]loop send data1", flush=True)
39 165.8 MiB 0.0 MiB for i in range(loop_count):
40 165.8 MiB 0.0 MiB print(f"[SEND] message {i}", flush=True)
41 165.8 MiB 10.1 MiB sock.send_multipart([data1])
42 145.9 MiB 0.0 MiB print("[SEND]done.", flush=True)
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
black = "*"
[packages]
pyzmq = "*"
memory-profiler = "*"
numpy = "*"
[requires]
python_version = "3.6"
[pipenv]
allow_prereleases = true
import zmq
from memory_profiler import profile
import numpy as np
from multiprocessing import Process
import time
# Settings shared by run_send() (binds) and run_recv() (connects).

# ZeroMQ endpoint used by both sides.
# NOTE(review): the leading '@' presumably selects a Linux abstract-namespace
# ipc socket — confirm against the libzmq ipc transport documentation.
endpoint = "ipc://@/test"
# Seconds run_recv() sleeps after connect() before it starts receiving.
sleep_time = 3
# Socket types for the sending and the receiving side.
send_socktype = zmq.PUSH
recv_socktype = zmq.PULL
# Sender high-water marks: send_hwm is passed to sock.set_hwm() first, then
# send_sndhwm / send_rcvhwm are applied via setsockopt(SNDHWM / RCVHWM).
send_hwm = 1000
send_sndhwm = 10
send_rcvhwm = 1000
# Receiver high-water marks, applied in the same order as on the sender.
recv_hwm = 1000
recv_sndhwm = 1000
recv_rcvhwm = 10
# Number of messages sent by run_send() and received by run_recv().
loop_count = 50
@profile
def run_send():
    """Bind the sending socket and send ``loop_count`` copies of a 10 MiB buffer.

    Socket type, high-water marks, endpoint and message count are read from
    the module-level settings.  Every step is announced on stdout with a
    ``[SEND]`` prefix and flushed immediately.  The function is decorated
    with ``memory_profiler.profile`` so a per-line memory report is produced
    when it returns.

    The context and the socket are managed by ``with`` blocks so both are
    released deterministically (also when an exception is raised) instead of
    being left to garbage collection.
    """
    print("[SEND]make context", flush=True)
    with zmq.Context() as zeromq_ctx:
        print("[SEND]make socket", flush=True)
        with zeromq_ctx.socket(send_socktype) as sock:
            print("[SEND]set_hwm", flush=True)
            sock.set_hwm(send_hwm)
            # The per-direction options are applied after set_hwm(), so these
            # are the values in effect for the send loop.
            print("[SEND]set_sockopt", flush=True)
            sock.setsockopt(zmq.SNDHWM, send_sndhwm)
            sock.setsockopt(zmq.RCVHWM, send_rcvhwm)
            print("[SEND]bind", flush=True)
            sock.bind(endpoint)

            # 10 * 1024 * 1024 uint8 elements, all set to 1 (10 MiB payload).
            data1 = np.full([10 * 1024 * 1024], 1, dtype=np.uint8)

            print("[SEND]loop send data1", flush=True)
            for i in range(loop_count):
                print(f"[SEND] message {i}", flush=True)
                sock.send_multipart([data1])
            print("[SEND]done.", flush=True)
        # NOTE(review): leaving the outer ``with`` terminates the context;
        # with the default LINGER this is expected to wait until messages
        # still queued on the socket have been handed off — confirm.
def run_recv():
    """Connect the receiving socket and receive ``loop_count`` messages.

    Socket type, high-water marks, endpoint, start-up delay and message count
    are read from the module-level settings.  Every step is announced on
    stdout with a ``[RECV]`` prefix and flushed immediately.  The received
    payloads are discarded.

    The context and the socket are managed by ``with`` blocks so both are
    released deterministically (also when an exception is raised) instead of
    being left to garbage collection.
    """
    print("[RECV]make context", flush=True)
    with zmq.Context() as zeromq_ctx:
        print("[RECV]make socket", flush=True)
        with zeromq_ctx.socket(recv_socktype) as sock:
            print("[RECV]set_hwm", flush=True)
            sock.set_hwm(recv_hwm)
            # The per-direction options are applied after set_hwm(), so these
            # are the values in effect for the receive loop.
            print("[RECV]setsockopt", flush=True)
            sock.setsockopt(zmq.SNDHWM, recv_sndhwm)
            sock.setsockopt(zmq.RCVHWM, recv_rcvhwm)
            print("[RECV]connect", flush=True)
            sock.connect(endpoint)

            # Wait before receiving anything so the sender runs ahead first.
            print("[RECV]sleep", flush=True)
            time.sleep(sleep_time)

            print("[RECV]loop recv data1", flush=True)
            for i in range(loop_count):
                # The index is printed before the blocking receive, so the
                # line marks the start of the wait for message i.
                print(f"[RECV] message {i}", flush=True)
                sock.recv_multipart()
            print("[RECV]done.", flush=True)
if __name__ == "__main__":
    # The receiver runs in a child process; the sender runs in this process.
    receiver = Process(target=run_recv)
    receiver.start()
    run_send()
    # Wait for the receiver to finish before exiting.
    receiver.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment