Skip to content

Instantly share code, notes, and snippets.

@ianbarber
Created June 21, 2013 15:17
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ianbarber/5831903 to your computer and use it in GitHub Desktop.
Save ianbarber/5831903 to your computer and use it in GitHub Desktop.
A noddy implementation of ZMTP 3.0 in PHP http://rfc.zeromq.org/spec:23
<?php
include_once "zmtp.php";
// Demo publisher: connects to a libzmq (master) based subscriber, performs
// the ZMTP greeting, announces itself as XPUB and then publishes
// "Hello World" once per second until the peer goes away.
$host = "127.0.0.1";
$port = 4422;

// Connect and send null mechanism.
// Both socket calls return false on failure; bail out early instead of
// running the handshake against a socket that was never connected.
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === false) {
    fwrite(STDERR, "socket_create failed: " . socket_strerror(socket_last_error()) . "\n");
    exit(1);
}
$rc = socket_connect($socket, $host, $port);
if ($rc === false) {
    fwrite(STDERR, "socket_connect failed: " . socket_strerror(socket_last_error($socket)) . "\n");
    socket_close($socket);
    exit(1);
}

$zmtp = new ZMTP($socket);
$zmtp->handshake();
sleep(1);
$zmtp->writeFrame("READY \x0bSocket-Type\x00\x00\x00\x04XPUB");

// Read the READY.
var_dump(bin2hex($zmtp->recvFrame()));

echo "Waiting for sub...\n";
// The received subscription frame is not inspected; reading it only blocks
// until the subscriber has sent something.
$sub = $zmtp->recvFrame();
sleep(1);

while (true) {
    $rc = $zmtp->writeFrame("Hello World");
    var_dump($rc);
    if ($rc === false) {
        // The send failed (e.g. the peer disconnected): stop instead of
        // spinning on a dead socket forever.
        break;
    }
    sleep(1);
}

socket_close($socket);
<?php
include_once "zmtp.php";
// Demo subscriber: connects to a libzmq (master) based publisher, performs
// the ZMTP greeting, announces itself as XSUB, subscribes and dumps every
// frame it receives until the peer goes away.
$host = "127.0.0.1";
$port = 4422;

// Connect and send null mechanism.
// Both socket calls return false on failure; bail out early instead of
// running the handshake against a socket that was never connected.
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === false) {
    fwrite(STDERR, "socket_create failed: " . socket_strerror(socket_last_error()) . "\n");
    exit(1);
}
$rc = socket_connect($socket, $host, $port);
if ($rc === false) {
    fwrite(STDERR, "socket_connect failed: " . socket_strerror(socket_last_error($socket)) . "\n");
    socket_close($socket);
    exit(1);
}

$zmtp = new ZMTP($socket);
sleep(1);

// Send greeting
$zmtp->handshake();
$zmtp->writeFrame("READY \x0bSocket-Type\x00\x00\x00\x04XSUB");

// Read the READY.
var_dump(bin2hex($zmtp->recvFrame()));

// A single 0x01 byte is sent as the subscription message
// (presumably "subscribe to everything" - confirm against the ZMTP spec).
$zmtp->writeFrame("\x01");

while (true) {
    $data = $zmtp->recvFrame();
    var_dump($data);
    if ($data === false) {
        // recvFrame() returns false once nothing more can be read; stop
        // instead of dumping false in a tight loop forever.
        break;
    }
}

socket_close($socket);
<?php
/**
 * Minimal ZMTP 3.0 framing helper on top of an already-connected socket
 * (ext-sockets).
 *
 * It sends and receives the 64-byte greeting and reads/writes individual
 * frames. Failures are reported by returning false, never by throwing.
 */
class ZMTP
{
    // Single-byte flag values for the first byte of a frame. Only LARGEFRAME
    // is used inside this class; COMMAND and MORE are presumably meant to be
    // passed by callers as the $flags argument of writeFrame().
    const COMMAND = "\x04";
    const MORE = "\x01";
    const LARGEFRAME = "\x02";

    /** The connected socket handed to the constructor. */
    private $socket;

    /**
     * @param resource|\Socket $socket Connected socket as returned by socket_create().
     */
    public function __construct($socket)
    {
        $this->socket = $socket;
    }

    /**
     * Read one frame and return its payload.
     *
     * Handles both short frames (1-byte length) and long frames (LARGEFRAME
     * flag set, 8-byte big-endian length). The flags byte itself is not
     * returned to the caller.
     *
     * @return string|false Payload ("" for an empty frame), or false if the
     *                      connection closed/failed before a whole frame
     *                      arrived or the announced size is unsupported.
     */
    public function recvFrame()
    {
        // Flags byte plus the first (or only) length byte.
        $header = $this->readExactly(2);
        if ($header === false) {
            return false;
        }

        if (ord($header[0]) & ord(self::LARGEFRAME)) {
            // Long frame: the length is 8 bytes, one of which we already hold.
            $rest = $this->readExactly(7);
            if ($rest === false) {
                return false;
            }
            $parts = unpack("Nhigh/Nlow", $header[1] . $rest);
            if ($parts['high'] !== 0) {
                // Sizes beyond 32 bits are not supported (writeFrame() never
                // produces them either).
                return false;
            }
            $size = $parts['low'];
        } else {
            $size = ord($header[1]);
        }

        // socket_read() must not be asked for zero bytes.
        if ($size === 0) {
            return "";
        }

        return $this->readExactly($size);
    }

    /**
     * Write one frame: flags byte, length, then the payload.
     *
     * Payloads of up to 255 bytes use a 1-byte length. Larger payloads set
     * the LARGEFRAME flag and use an 8-byte big-endian length.
     *
     * @param string $data  Frame payload.
     * @param string $flags Single-byte flag string; LARGEFRAME is OR-ed in
     *                      automatically when needed.
     *
     * @return int|false Total number of bytes written (header + payload), or
     *                   false on failure.
     */
    public function writeFrame($data, $flags = "\x00")
    {
        $size = strlen($data);

        if ($size <= 255) {
            // A single length byte can only express 0..255; chr(256) would
            // wrap around to 0 and corrupt the stream.
            $header = $flags . chr($size);
        } else {
            // Only the low 32 bits of the 8-byte length are filled in, which
            // limits payloads to 4 GiB.
            $header = ($flags | self::LARGEFRAME) . pack("xxxxN", $size);
        }

        return $this->sendAll($header . $data);
    }

    /**
     * Exchange greetings with the peer: send ours, then read theirs.
     *
     * @return bool True when both directions succeeded, false otherwise.
     */
    public function handshake()
    {
        if ($this->sendGreeting() === false) {
            return false;
        }

        return $this->recvGreeting();
    }

    /**
     * Send the 64-byte greeting in two writes: 12 bytes of signature and
     * version, then 52 bytes naming the NULL mechanism padded with zeros.
     *
     * @return int|false Bytes written by the second write (52), or false if
     *                   either write failed.
     */
    public function sendGreeting()
    {
        // 0xFF, eight zero bytes, 0x7F, then 0x03 0x00 (presumably the
        // signature followed by version 3.0 - see the ZMTP spec).
        $greeting = "\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x7f\x03\x00";
        if (!$this->sendAll($greeting)) {
            return false;
        }

        // "NULL" plus 48 zero bytes brings the greeting to 64 bytes in total.
        $mechanism = "NULL" . str_repeat("\x00", 48);

        return $this->sendAll($mechanism);
    }

    /**
     * Read and discard the peer's 64-byte greeting.
     *
     * Exactly 64 bytes are consumed, so a frame that follows the greeting
     * stays in the socket buffer. The content is not validated.
     *
     * @return bool True once 64 bytes were read, false if the connection
     *              closed or failed first.
     */
    public function recvGreeting()
    {
        return $this->readExactly(64) !== false;
    }

    /**
     * Read exactly $length bytes, looping over short reads.
     *
     * @param int $length Number of bytes wanted (must be > 0).
     *
     * @return string|false The bytes, or false on error / end of stream.
     */
    private function readExactly($length)
    {
        $buffer = "";
        while (strlen($buffer) < $length) {
            $chunk = socket_read($this->socket, $length - strlen($buffer));
            // false means error, "" means the peer closed the connection;
            // either way no more data will arrive.
            if ($chunk === false || $chunk === "") {
                return false;
            }
            $buffer .= $chunk;
        }

        return $buffer;
    }

    /**
     * Send the whole buffer, looping over partial writes.
     *
     * @param string $bytes Raw bytes to send.
     *
     * @return int|false Number of bytes sent (strlen($bytes)), or false on failure.
     */
    private function sendAll($bytes)
    {
        $total = strlen($bytes);
        $sent = 0;
        while ($sent < $total) {
            $chunk = substr($bytes, $sent);
            $rc = socket_send($this->socket, $chunk, strlen($chunk), 0);
            // Treat a zero-byte send as a failure so we cannot spin forever.
            if ($rc === false || $rc === 0) {
                return false;
            }
            $sent += $rc;
        }

        return $sent;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment