Skip to content

Instantly share code, notes, and snippets.

@michelp
Created November 18, 2013 03:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michelp/7522179 to your computer and use it in GitHub Desktop.
Save michelp/7522179 to your computer and use it in GitHub Desktop.
import sys
import mmap
import tempfile
from ctypes import Structure, c_int, sizeof
from pyczmq import zmq, ffi, zctx, zsocket, zframe
import cmmap
class Point(Structure):
_fields_ = [
('x', c_int),
('y', c_int),
]
ctx = zctx.new()
fname = tempfile.mktemp()
endpoint = 'inproc://foo'
psize = sizeof(Point)
def make_some_points(numpoints):
"""Create some structures backed by the mapped file, intialize
them.
"""
f = open(fname, 'wb')
f.truncate(psize*numpoints)
for i in range(numpoints):
p = Point()
p.x = i
p.y = i
f.write(bytearray(p))
f.close()
def make_pub():
sock = zsocket.new(ctx, zmq.PUB)
zsocket.bind(sock, endpoint)
return sock
def make_sub():
sub = zsocket.new(ctx, zmq.SUB)
zsocket.set_subscribe(sub, '')
zsocket.connect(sub, endpoint)
return sub
def pub_points(sock, numpoints):
# we need a dummy dealocator
@ffi.callback('void(*)(void *, void *)')
def free(buf, hint):
return
# map the file
f = open(fname, 'r+b')
m = cmmap.mmap(
length=psize*numpoints,
prot=cmmap.PROT_READ,
fd=f.fileno(),
buffer=False)
# scan over the file, sending a message
# initialized from offsets into the map
for i in range(numpoints):
msg = ffi.new('zmq_msg_t*')
pos = psize*i
# this is a "one-less-copy" operation, the existing
# mapped data region is used by sendmsg as the source
zmq.msg_init_data(
msg,
cmmap.offset(m, pos),
psize,
free,
ffi.NULL)
zmq.sendmsg(sock, msg, 0)
def publish(numpoints):
make_some_points(numpoints)
pub = make_pub()
sub = make_sub()
pub_points(pub, numpoints)
return sub
def test():
numpoints = 1024
sub = publish(numpoints)
for i in range(numpoints):
b = zframe.recv(sub)
# also a "one-less-copy" operation, the
# data allocated for the frame is reused by
# the ctypes from_buffer constructor
p = Point.from_buffer(zframe.data(b))
assert p.x == p.y == i
if __name__ == '__main__':
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment