Skip to content

Instantly share code, notes, and snippets.

@kfei
Created October 13, 2014 02:35
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kfei/5c9222dad94c3e588fe7 to your computer and use it in GitHub Desktop.
Save kfei/5c9222dad94c3e588fe7 to your computer and use it in GitHub Desktop.
A Python3 ZMQ file transfer example
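# File Transfer model #3
#
# In which the client requests each chunk individually. The model is
# meant to use command pipelining for credit-based flow control, but
# this implementation keeps a single request in flight: it sends one
# "fetch" and waits for the reply before asking for the next chunk.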
import os
import sys
from threading import Thread
import zmq
def zpipe(ctx):
    """Build an inproc pipe for talking to threads.

    Mimics the pipe used in czmq zthread_fork.

    Args:
        ctx: The zmq context both sockets are created from.

    Returns:
        A tuple ``(a, b)`` of PAIR sockets connected via inproc; ``a`` is
        bound to a randomly named endpoint and ``b`` is connected to it.
    """
    import binascii

    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    a.hwm = b.hwm = 1
    # hexlify() returns bytes on Python 3; decode it so the endpoint reads
    # "inproc://<hex>" instead of "inproc://b'<hex>'".
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode("ascii")
    a.bind(iface)
    b.connect(iface)
    return a, b
# Number of bytes the client asks for in each "fetch" request; a reply
# shorter than this is treated as the last chunk of the file.
CHUNK_SIZE = 250000
def client_thread(ctx, pipe):
    """Fetch the file from the server chunk by chunk, then signal completion.

    Connects a DEALER socket to tcp://127.0.0.1:6000 and repeatedly sends a
    three-frame request ``[b"fetch", offset, CHUNK_SIZE]``, reading one
    chunk back per request. A reply shorter than CHUNK_SIZE marks the end
    of the file.

    Args:
        ctx: The zmq context used to create the DEALER socket.
        pipe: Socket on which ``b"OK"`` is sent once the transfer is done.

    Returns:
        None. Returns early, without sending ``b"OK"``, if the context is
        terminated while waiting for a chunk.
    """
    dealer = ctx.socket(zmq.DEALER)
    try:
        # libzmq 2/3 compatible sethwm
        try:
            dealer.sndhwm = dealer.rcvhwm = 1
        except AttributeError:
            dealer.hwm = 1
        dealer.connect("tcp://127.0.0.1:6000")

        total = 0   # Total bytes received
        chunks = 0  # Total chunks received

        while True:
            # Ask for next chunk; the current byte count is the offset.
            if sys.version_info >= (3, 0):
                # Python 3: integers travel as little-endian byte strings.
                request = [
                    b"fetch",
                    total.to_bytes((total.bit_length() // 8) + 1,
                                   byteorder='little'),
                    CHUNK_SIZE.to_bytes((CHUNK_SIZE.bit_length() // 8) + 1,
                                        byteorder='little'),
                ]
            else:
                # Python 2: integers travel as decimal text.
                request = [b"fetch", b"%i" % total, b"%i" % CHUNK_SIZE]
            dealer.send_multipart(request)

            try:
                chunk = dealer.recv()
            except zmq.ZMQError as e:
                if e.errno == zmq.ETERM:
                    return  # Shutting down, quit
                raise

            chunks += 1
            size = len(chunk)
            total += size
            if size < CHUNK_SIZE:
                break  # Last chunk received; exit
    finally:
        # Close explicitly rather than relying on garbage collection, so
        # the socket is released on every exit path.
        dealer.close()

    print("%i chunks received, %i bytes" % (chunks, total))
    pipe.send(b"OK")
# .split File server thread
# The server thread waits for a chunk request from a client,
# reads that chunk and sends it back to the client:
def server_thread(ctx):
    """Serve chunks of the local file "testdata" to requesting clients.

    Binds a ROUTER socket to tcp://*:6000 and answers each four-frame
    request ``[identity, b"fetch", offset, chunk_size]`` with
    ``[identity, data]``, where ``data`` is read from the file at the
    requested offset.

    Args:
        ctx: The zmq context used to create the ROUTER socket.

    Returns:
        None. Returns when the context is terminated while waiting for a
        request; the file and the socket are closed on every exit path.
    """
    with open("testdata", "rb") as source:
        router = ctx.socket(zmq.ROUTER)
        try:
            router.bind("tcp://*:6000")

            while True:
                # First frame in each message is the sender identity
                # Second frame is "fetch" command
                try:
                    msg = router.recv_multipart()
                except zmq.ZMQError as e:
                    if e.errno == zmq.ETERM:
                        return  # Shutting down, quit
                    raise

                identity, command, offset_str, chunksz_str = msg
                assert command == b"fetch"

                if sys.version_info >= (3, 0):
                    # Python 3 clients send little-endian byte strings.
                    offset = int.from_bytes(offset_str, 'little')
                    chunksz = int.from_bytes(chunksz_str, 'little')
                else:
                    # Python 2 clients send decimal text.
                    offset = int(offset_str)
                    chunksz = int(chunksz_str)

                # Read chunk of data from file
                source.seek(offset, os.SEEK_SET)
                data = source.read(chunksz)

                # Send resulting chunk to client
                router.send_multipart([identity, data])
        finally:
            # Close explicitly rather than relying on garbage collection.
            router.close()
# The main task is just the same as in the first model.
# .skip
def main():
    """Run the client and server threads and wait for the transfer to end.

    Creates one zmq context, an inproc pipe, and one thread each for
    client_thread and server_thread. Blocks until the client reports on the
    pipe (or the user interrupts), prints what was received, then drops the
    pipe sockets and terminates the context.
    """
    context = zmq.Context()
    parent_pipe, child_pipe = zpipe(context)

    # Build both threads first, then start the client before the server.
    workers = (
        Thread(target=client_thread, args=(context, child_pipe)),
        Thread(target=server_thread, args=(context,)),
    )
    for worker in workers:
        worker.start()

    # Block until the client tells us it's done.
    try:
        reply = parent_pipe.recv()
        print(reply)
    except KeyboardInterrupt:
        pass

    # Drop our references to the pipe sockets before terminating.
    del parent_pipe, child_pipe
    context.term()
# Run the demo only when executed as a script, not when imported.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment