Skip to content

Instantly share code, notes, and snippets.

@sphaero
Created May 9, 2019 15:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sphaero/f5b2c89a52a1cd3c55bccd61b0ae7ff8 to your computer and use it in GitHub Desktop.
Save sphaero/f5b2c89a52a1cd3c55bccd61b0ae7ff8 to your computer and use it in GitHub Desktop.
czmq python cffi test
from czmq_cffi import *
import pdb
# Exercise the czmq_cffi binding: send one single-frame message and two
# ten-frame messages over a pair of inproc sockets, checking return codes.
# NOTE(review): this appears to be a work-in-progress port of the CZMQ zmsg
# selftest; several calls near the end are flagged below for confirmation.

# Presumably runs the binding's Zactor selftest in verbose mode - confirm.
Zactor.test(True)

# Sending side of the socket pair, bound to an inproc endpoint.
output = Zsock(0) # 0 == PAIR
output.bind(b"inproc://zmsg.test")
assert output

# Receiving side, connected to the same endpoint.
# NOTE: the name `input` shadows Python's builtin input() for the rest of
# this script; it is kept so the script's names stay unchanged.
input = Zsock(0) # 0 == PAIR
input.connect(b"inproc://zmsg.test")
assert input

# Test send and receive of a single-frame message.
msg = Zmsg()
assert msg
frame = Zframe(b"Hello", 5)  # 5 == len(b"Hello")
assert frame
msg.prepend(frame)
assert msg.size() == 1
assert msg.content_size() == 5
rc = msg.send(output)
assert rc == 0

print("recv")
msg2 = Zstr.recv(input)
#msg = input.recv("m");
# NOTE(review): `native` is not defined in this script, while `utils.ffi` is
# used further down; presumably both come from the star import - confirm.
s = native.ffi.string(msg2)
assert s == b"Hello"

# Test send and receive of multi-frame message
msg3 = Zmsg()
# Fixed: the original asserted on `msg` (the first message, already sent
# above), which left the freshly created `msg3` unchecked.
assert msg3
# NOTE(review): this still prints the pointer of the FIRST message (`msg`),
# not `msg3`; it looks like debug output - confirm which one was intended.
print(msg._p)
# NOTE(review): `time` is not used anywhere in the visible script.
import time
#pdb.set_trace()
# Append ten 6-byte frames, b"Frame0" .. b"Frame9", checking each return code.
rc = msg3.addmem(b"Frame0", 6)
print("hmm")
assert rc == 0
rc = msg3.addmem(b"Frame1", 6)
assert rc == 0
rc = msg3.addmem(b"Frame2", 6)
assert rc == 0
rc = msg3.addmem(b"Frame3", 6)
assert rc == 0
rc = msg3.addmem(b"Frame4", 6)
assert rc == 0
rc = msg3.addmem(b"Frame5", 6)
assert rc == 0
rc = msg3.addmem(b"Frame6", 6)
assert rc == 0
rc = msg3.addmem(b"Frame7", 6)
assert rc == 0
rc = msg3.addmem(b"Frame8", 6)
assert rc == 0
rc = msg3.addmem(b"Frame9", 6)
assert rc == 0

# Duplicate the message and wrap the value returned by dup() in a Zmsg
# object so that it can be sent through the wrapper API.
copy = msg3.dup()
pycopy = Zmsg()
# very hackish
# The Zmsg created just above only serves as a shell: its _p is replaced by
# the duplicate, registered with the zmsg destructor through ffi.gc.
pycopy._p = utils.ffi.gc(copy, libczmq_destructors.zmsg_destroy_py)
assert pycopy
rc = pycopy.send(output)
assert rc == 0
rc = msg3.send(output)
assert rc == 0

# NOTE(review): this call looks wrong and is left untouched because the
# binding's receive API is not visible here: the socket is passed to its own
# recv() as the first argument, and `utils.copy._p` does not refer to the
# local `copy`. Presumably the intent was to receive the ten-frame message
# into a new object - confirm against the binding before fixing.
input.recv(input, b"m", utils.copy._p)
# NOTE(review): `copy` is still the value returned by msg3.dup() above (it is
# never rebound), and that value had to be wrapped in a Zmsg before it could
# be sent - so calling .size() on it directly presumably fails; confirm.
assert copy
assert copy.size() == 10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment