Created
November 18, 2013 03:50
-
-
Save michelp/7522179 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import mmap | |
import tempfile | |
from ctypes import Structure, c_int, sizeof | |
from pyczmq import zmq, ffi, zctx, zsocket, zframe | |
import cmmap | |
class Point(Structure): | |
_fields_ = [ | |
('x', c_int), | |
('y', c_int), | |
] | |
ctx = zctx.new() | |
fname = tempfile.mktemp() | |
endpoint = 'inproc://foo' | |
psize = sizeof(Point) | |
def make_some_points(numpoints): | |
"""Create some structures backed by the mapped file, intialize | |
them. | |
""" | |
f = open(fname, 'wb') | |
f.truncate(psize*numpoints) | |
for i in range(numpoints): | |
p = Point() | |
p.x = i | |
p.y = i | |
f.write(bytearray(p)) | |
f.close() | |
def make_pub(): | |
sock = zsocket.new(ctx, zmq.PUB) | |
zsocket.bind(sock, endpoint) | |
return sock | |
def make_sub(): | |
sub = zsocket.new(ctx, zmq.SUB) | |
zsocket.set_subscribe(sub, '') | |
zsocket.connect(sub, endpoint) | |
return sub | |
def pub_points(sock, numpoints): | |
# we need a dummy dealocator | |
@ffi.callback('void(*)(void *, void *)') | |
def free(buf, hint): | |
return | |
# map the file | |
f = open(fname, 'r+b') | |
m = cmmap.mmap( | |
length=psize*numpoints, | |
prot=cmmap.PROT_READ, | |
fd=f.fileno(), | |
buffer=False) | |
# scan over the file, sending a message | |
# initialized from offsets into the map | |
for i in range(numpoints): | |
msg = ffi.new('zmq_msg_t*') | |
pos = psize*i | |
# this is a "one-less-copy" operation, the existing | |
# mapped data region is used by sendmsg as the source | |
zmq.msg_init_data( | |
msg, | |
cmmap.offset(m, pos), | |
psize, | |
free, | |
ffi.NULL) | |
zmq.sendmsg(sock, msg, 0) | |
def publish(numpoints): | |
make_some_points(numpoints) | |
pub = make_pub() | |
sub = make_sub() | |
pub_points(pub, numpoints) | |
return sub | |
def test(): | |
numpoints = 1024 | |
sub = publish(numpoints) | |
for i in range(numpoints): | |
b = zframe.recv(sub) | |
# also a "one-less-copy" operation, the | |
# data allocated for the frame is reused by | |
# the ctypes from_buffer constructor | |
p = Point.from_buffer(zframe.data(b)) | |
assert p.x == p.y == i | |
if __name__ == '__main__': | |
test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment