Skip to content

Instantly share code, notes, and snippets.

@kylemcdonald
Created January 5, 2024 21:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kylemcdonald/a08166bac8c912522796e0b2d3834df8 to your computer and use it in GitHub Desktop.
Save kylemcdonald/a08166bac8c912522796e0b2d3834df8 to your computer and use it in GitHub Desktop.
import os
import socket
import time
import struct
from multiprocessing import Process
# Filesystem path of the AF_UNIX datagram socket: run_server() binds to it
# and run_client() sends its datagrams to it.
socket_path = "/tmp/ipc_benchmark_socket"
def run_server():
    """Receive ten timestamped datagrams and print the observed latency.

    Binds an ``AF_UNIX`` / ``SOCK_DGRAM`` socket to ``socket_path`` and reads
    ten datagrams.  The first 8 bytes of each datagram are interpreted as a
    ``struct`` ``"Q"`` value holding the sender's ``time.time_ns()`` reading;
    the latency is the difference to the local ``time.time_ns()`` taken right
    after the receive returns.  Prints the mean latency in milliseconds and
    half of the min-to-max spread.
    """
    # Remove a stale socket file left by a previous run.  EAFP avoids the
    # check-then-remove race of an os.path.exists() test.
    try:
        os.remove(socket_path)
    except FileNotFoundError:
        pass

    # The context manager closes the socket even if a receive or the
    # timestamp unpacking raises.
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as server:
        server.bind(socket_path)
        # Set the buffer size just large enough for our message
        server.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 128 * 1024)

        latencies = []
        for _ in range(10):
            # Buffer size for the payload and the timestamp
            data, _sender = server.recvfrom(128 * 1024)
            # Take the receive timestamp before doing any other work.
            received_time = time.time_ns()
            # The timestamp occupies the first 8 bytes; unpack_from reads it
            # in place without slicing a copy.
            sent_time = struct.unpack_from("Q", data)[0]
            latency_ns = received_time - sent_time
            # convert ns to ms
            latencies.append(latency_ns / 1000000)

    mean_latency = sum(latencies) / len(latencies)
    range_latency = max(latencies) - min(latencies)
    print(f"IPC Latency for under 128kB message: {mean_latency:.2f} ms +/- {range_latency/2:.2f}ms")
def run_client():
    """Send ten timestamped datagrams to the server at ``socket_path``.

    Each datagram is an 8-byte ``struct`` ``"Q"`` encoding of
    ``time.time_ns()`` followed by a constant filler payload, for a total of
    ``128 * 1024`` bytes.
    """
    # We allow some time for the server to start up and set its socket options
    time.sleep(0.1)

    # 128 KiB datagram in total: 8 bytes of timestamp plus the filler payload.
    payload_size = 128 * 1024 - 8
    payload = b'a' * payload_size

    # Creating the socket inside ``with`` also covers setsockopt(): the
    # descriptor is released even if configuring the socket fails.
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as client:
        # Match the server's set buffer size
        client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 128 * 1024)
        for _ in range(10):
            sent_time = time.time_ns()
            # Prefix with the timestamp
            message = struct.pack("Q", sent_time) + payload
            client.sendto(message, socket_path)
if __name__ == "__main__":
    # Create and start server process
    server_process = Process(target=run_server)
    server_process.start()
    try:
        # Run client in the main process
        run_client()
    except BaseException:
        # Without the client's datagrams the server would block in
        # recvfrom() forever and keep this process from exiting.
        server_process.terminate()
        raise
    finally:
        # Wait for the server process to finish
        server_process.join()
        # Clean up the socket file; it may already be gone.
        try:
            os.remove(socket_path)
        except FileNotFoundError:
            pass
import zmq
import time
import struct
from multiprocessing import Process
# ZeroMQ endpoint (ipc:// transport) that run_server() binds and
# run_client() connects to.
socket_path = "ipc:///tmp/zmq_benchmark_socket"
def run_server():
    """Receive ten timestamped ZeroMQ messages and print the observed latency.

    Binds a ``zmq.PAIR`` socket to ``socket_path`` and receives ten messages.
    The first 8 bytes of each message are interpreted as a ``struct`` ``"Q"``
    value holding the sender's ``time.time_ns()`` reading.  Prints the mean
    latency in milliseconds and half of the min-to-max spread.
    """
    # The context managers close the socket and then terminate the context,
    # in that order, even if recv() or the unpacking raises.
    with zmq.Context() as context, context.socket(zmq.PAIR) as server:
        server.bind(socket_path)

        latencies = []
        for _ in range(10):
            data = server.recv()  # No need to specify buffer size with ZeroMQ
            # Take the receive timestamp before doing any other work.
            received_time = time.time_ns()
            # The timestamp occupies the first 8 bytes of the message.
            sent_time = struct.unpack_from("Q", data)[0]
            latency_ns = received_time - sent_time
            latencies.append(latency_ns / 1e6)  # ns -> ms

    mean_latency = sum(latencies) / len(latencies)
    range_latency = max(latencies) - min(latencies)
    print(f"IPC Latency for under 128kB message: {mean_latency:.2f} ms +/- {range_latency/2:.2f} ms")
def run_client():
    """Send ten timestamped messages over a ZeroMQ PAIR socket.

    Connects to ``socket_path`` and sends ten messages, each an 8-byte
    ``struct`` ``"Q"`` encoding of ``time.time_ns()`` followed by a constant
    filler payload, for a total of ``128 * 1024`` bytes.
    """
    time.sleep(1)  # Give server some time to bind and start

    payload_size = 128 * 1024 - 8  # Message size less size of timestamp
    payload = b'a' * payload_size

    # The context managers close the socket and then terminate the context,
    # in that order, even if connect() or send() raises.
    with zmq.Context() as context, context.socket(zmq.PAIR) as client:
        client.connect(socket_path)
        for _ in range(10):
            sent_time = time.time_ns()
            # Combine timestamp and payload
            message = struct.pack("Q", sent_time) + payload
            client.send(message)
if __name__ == "__main__":
    # Server runs in a child process; the client runs in this one.
    server_process = Process(target=run_server)
    server_process.start()
    try:
        run_client()
    except BaseException:
        # Without the client's messages the server would block in recv()
        # forever and keep this process from exiting.
        server_process.terminate()
        raise
    finally:
        server_process.join()
import zmq
import time
import struct
from multiprocessing import Process
# ZeroMQ endpoint that run_server() binds and run_client() connects to.
# Loopback is used because the same string serves both calls: the 0.0.0.0
# wildcard is only meaningful for bind (connecting to it is not portable),
# and binding to it would expose the benchmark port on every interface.
socket_path = "tcp://127.0.0.1:5555"
def run_server():
    """Receive ten timestamped ZeroMQ messages and print the observed latency.

    Binds a ``zmq.PAIR`` socket to ``socket_path`` and receives ten messages.
    The first 8 bytes of each message are interpreted as a ``struct`` ``"Q"``
    value holding the sender's ``time.time_ns()`` reading.  Prints the mean
    latency in milliseconds and half of the min-to-max spread.
    """
    # The context managers close the socket and then terminate the context,
    # in that order, even if recv() or the unpacking raises.
    with zmq.Context() as context, context.socket(zmq.PAIR) as server:
        server.bind(socket_path)

        latencies = []
        for _ in range(10):
            data = server.recv()  # No need to specify buffer size with ZeroMQ
            # Take the receive timestamp before doing any other work.
            received_time = time.time_ns()
            # The timestamp occupies the first 8 bytes of the message.
            sent_time = struct.unpack_from("Q", data)[0]
            latency_ns = received_time - sent_time
            latencies.append(latency_ns / 1e6)  # ns -> ms

    mean_latency = sum(latencies) / len(latencies)
    range_latency = max(latencies) - min(latencies)
    print(f"TCP Latency for under 128kB message: {mean_latency:.2f} ms +/- {range_latency/2:.2f} ms")
def run_client():
    """Send ten timestamped messages over a ZeroMQ PAIR socket.

    Connects to ``socket_path`` and sends ten messages, each an 8-byte
    ``struct`` ``"Q"`` encoding of ``time.time_ns()`` followed by a constant
    filler payload, for a total of ``128 * 1024`` bytes.
    """
    time.sleep(1)  # Give server some time to bind and start

    payload_size = 128 * 1024 - 8  # Message size less size of timestamp
    payload = b'a' * payload_size

    # The context managers close the socket and then terminate the context,
    # in that order, even if connect() or send() raises.
    with zmq.Context() as context, context.socket(zmq.PAIR) as client:
        client.connect(socket_path)
        for _ in range(10):
            sent_time = time.time_ns()
            # Combine timestamp and payload
            message = struct.pack("Q", sent_time) + payload
            client.send(message)
if __name__ == "__main__":
    # Server runs in a child process; the client runs in this one.
    server_process = Process(target=run_server)
    server_process.start()
    try:
        run_client()
    except BaseException:
        # Without the client's messages the server would block in recv()
        # forever and keep this process from exiting.
        server_process.terminate()
        raise
    finally:
        server_process.join()
python benchmark-ipc-socket.py
python benchmark-ipc-zmq.py
python benchmark-tcp-zmq.py
IPC Latency for under 128kB message: 0.34 ms +/- 0.26ms
IPC Latency for under 128kB message: 1.18 ms +/- 0.15 ms
TCP Latency for under 128kB message: 1.42 ms +/- 0.37 ms
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment