Skip to content

Instantly share code, notes, and snippets.

@erning
Created April 30, 2014 02:00
Show Gist options
  • Save erning/7806ab8c7c52a4091d11 to your computer and use it in GitHub Desktop.
Save erning/7806ab8c7c52a4091d11 to your computer and use it in GitHub Desktop.
PHP APS Client
<?php
/**
 * Build a new APS client handle around a fresh ZMQ DEALER socket.
 *
 * The shared ZMQ context and poller stored on APSContext are created on first
 * use. The new socket gets the linger / high-water-mark values configured on
 * APSContext and is registered with the shared poller for incoming messages.
 *
 * @param $spID string   service provider ID kept on the handle
 * @param $spVer string  service provider version kept on the handle
 * @param $sender string sender name kept on the handle
 * @return object handle with the properties socket, spID, spVer and sender.
 */
function aps_new($spID=null, $spVer=null, $sender=null) {
    // Lazily create the process-wide context and poller.
    if (null === APSContext::$context) {
        APSContext::$context = new ZMQContext(3, false);
    }
    if (null === APSContext::$poller) {
        APSContext::$poller = new ZMQPoll();
    }

    $dealer = new ZMQSocket(APSContext::$context, ZMQ::SOCKET_DEALER);

    // Option/value pairs applied in the same order as before.
    $options = [
        [ZMQ::SOCKOPT_LINGER, APSContext::$linger],
        [ZMQ::SOCKOPT_SNDHWM, APSContext::$sndhwm],
        [ZMQ::SOCKOPT_RCVHWM, APSContext::$rcvhwm],
    ];
    foreach ($options as $option) {
        $dealer->setsockopt($option[0], $option[1]);
    }

    APSContext::$poller->add($dealer, ZMQ::POLL_IN);

    return (object) [
        'socket' => $dealer,
        'spID'   => $spID,
        'spVer'  => $spVer,
        'sender' => $sender,
    ];
}
/**
 * Destroy the APS client.
 *
 * Removes the client's socket from the shared poller, disconnects every
 * endpoint the socket is connected to, drops the socket from the handle and
 * finally sets the caller's variable to null.
 *
 * Calling it on a handle that is already null (or has no socket any more) is
 * a no-op apart from nulling the variable.
 *
 * @param $aps the aps client handler; passed by reference so that the
 *             caller's variable can be invalidated.
 */
function aps_destroy(&$aps) {
    // Nothing to tear down: already destroyed or never fully created.
    if (!is_object($aps) || !isset($aps->socket)) {
        $aps = null;
        return;
    }

    APSContext::$poller->remove($aps->socket);

    $endpoints = $aps->socket->getEndpoints();
    foreach ($endpoints["connect"] as $endpoint) {
        $aps->socket->disconnect($endpoint);
    }

    unset($aps->socket);

    // unset($aps) would only destroy the local reference and leave the
    // caller's variable untouched; assigning through the reference actually
    // invalidates the handle.
    $aps = null;
}
/**
 * Connect the client's socket to an endpoint.
 *
 * @param $aps the aps client handler (its socket property is used).
 * @param $endpoint the endpoint to connect to.
 */
function aps_connect($aps, $endpoint) {
    $socket = $aps->socket;
    $socket->connect($endpoint);
}
/**
 * Disconnect the client's socket from an endpoint.
 *
 * @param $aps the aps client handler (its socket property is used).
 * @param $endpoint the endpoint to disconnect from.
 */
function aps_disconnect($aps, $endpoint) {
    $socket = $aps->socket;
    $socket->disconnect($endpoint);
}
/**
 * Send an async request to the service provider and return the request
 * reference.
 *
 * The request is sent through aps_oneway_request(); on success the reference
 * is recorded as pending so that aps_wait_for_replies() can wait for it, and
 * the optional callback is stored to be invoked when the reply is processed.
 *
 * @param $aps the aps client handler.
 * @param $method
 * @param $params
 * @param $expiry Expiration time of the request. If it is null, the request
 *                will never expire. You can also use a Unix timestamp in
 *                seconds or a number of seconds starting from current time,
 *                but in the latter case the number of seconds may not exceed
 *                2,592,000 (30 days). Use float type for milliseconds.
 * @param $callback The callback function or closure to be invoked when the
 *                  reply is received.
 *                  function($result, $status, $extras);
 * @param $extras The extras to be sent to the service.
 * @return Returns the request reference on success or false on failure.
 */
function aps_start_request($aps, $method, $params=null, $expiry=null,
                           $callback=null, $extras=null) {
    // $extras must be forwarded, otherwise it is silently dropped.
    $ref = aps_oneway_request($aps, $method, $params, $expiry, $extras);
    if ($ref === false) {
        return false;
    }

    APSContext::$pending_requests[$ref] = true;
    if ($callback) {
        APSContext::$pending_callbacks[$ref] = $callback;
    }
    return $ref;
}
/**
 * Send a oneway request to the service provider and ignore its reply.
 *
 * Frames sent, in order: an empty delimiter, the protocol version, the packed
 * [reference, timestamp, expiry] header, the method name (prefixed with
 * ":<spID>:" when the handle has an spID), the packed params, and then any
 * extras: ["Sender", ...] and ["Version", ...] when set on the handle,
 * followed by each element of $extras.
 *
 * Note that the sequence number is consumed even when sending fails.
 *
 * @param $aps the aps client handler.
 * @param $method
 * @param $params
 * @param $expiry Expiration time of the request, passed through unchanged in
 *                the request header.
 * @param $extras Optional array of extras; each element is packed into its
 *                own frame. Ignored when not an array.
 * @return Returns the request reference on success or false when the message
 *         could not be sent without blocking.
 * @see aps_start_request()
 */
function aps_oneway_request($aps, $method, $params=null, $expiry=null,
                            $extras=null) {
    $ref = ++APSContext::$sequence;

    $frames = [
        '',
        APSContext::VERSION,
        msgpack_pack([$ref, microtime(true), $expiry]),
        $aps->spID ? ":{$aps->spID}:{$method}" : $method,
        msgpack_pack($params),
    ];

    // This is a plain function, so the handle's fields live on $aps
    // (there is no $this here).
    if ($aps->sender) {
        $frames[] = msgpack_pack(["Sender", $aps->sender]);
    }
    if ($aps->spVer) {
        $frames[] = msgpack_pack(["Version", $aps->spVer]);
    }

    // Extras have to be appended BEFORE sending to be part of the message.
    if (is_array($extras)) {
        foreach ($extras as $extra) {
            $frames[] = msgpack_pack($extra);
        }
    }

    if ($aps->socket->sendmulti($frames, ZMQ::MODE_DONTWAIT) === false) {
        // TODO: HWM!
        return false;
    }
    return $ref;
}
/**
 * Wait for replies.
 *
 * Replies that were already received earlier (while waiting for other
 * references) are returned immediately. For the remaining references the
 * shared poller is polled until all of them are answered, the timeout elapses
 * or the poller reports no events. Callbacks registered through
 * aps_start_request() are invoked for every reply returned here. Replies for
 * references that are not being waited on are kept for a later call.
 *
 * @param $refs a request reference or a list of references to wait for, or
 *              null to wait for all the requests (oneway requests are NOT
 *              included).
 * @param $timeout time out for poll in seconds. Use float type for
 *                 milliseconds.
 *                 null - use APSContext::$default_timeout
 *                 -1   - wait for ever
 * @return Returns the received replies as an array keyed by request reference.
 */
function aps_wait_for_replies($refs=null, $timeout=null) {
    $replies = [];
    $waiting = [];

    if ($refs === null) {
        $refs = array_merge(
            array_keys(APSContext::$pending_requests),
            array_keys(APSContext::$pending_replies)
        );
    } elseif (!is_array($refs)) {
        $refs = [$refs];
    }

    // Split the references into those still outstanding and those whose reply
    // has already arrived during a previous wait.
    foreach ($refs as $ref) {
        if (array_key_exists($ref, APSContext::$pending_requests)) {
            $waiting[$ref] = true;
        } elseif (array_key_exists($ref, APSContext::$pending_replies)) {
            $reply = APSContext::$pending_replies[$ref];
            $replies[$ref] = $reply;
            unset(APSContext::$pending_replies[$ref]);
            _aps_process_reply($reply);
        }
    }

    $readable = $writeable = [];
    if ($timeout === null) {
        $timeout = APSContext::$default_timeout;
    }

    // Decide once whether we wait for ever; the loose comparison makes -1,
    // -1.0 and "-1" all behave the same before and inside the loop.
    $forever = ($timeout == -1);
    if ($forever) {
        $deadline = null;
        $timeout_millis = -1;
    } else {
        $deadline = microtime(true) + $timeout;
        $timeout_millis = (int) round($timeout * 1000);
    }

    while (count($waiting) > 0) {
        $events = APSContext::$poller->poll($readable, $writeable,
                                            $timeout_millis);
        if ($events == 0) {
            break;
        }

        foreach ($readable as $socket) {
            // Drain everything that is currently readable on this socket.
            while ($frames = $socket->recvmulti(ZMQ::MODE_DONTWAIT)) {
                $reply = _aps_decode_reply($frames);
                if ($reply === null) {
                    continue; // malformed message, nothing to match it to
                }

                $ref = $reply->sequence;
                unset(APSContext::$pending_requests[$ref]);
                if (array_key_exists($ref, $waiting)) {
                    $replies[$ref] = $reply;
                    unset($waiting[$ref]);
                    _aps_process_reply($reply);
                } else {
                    // Not waited for right now; keep it for a later call.
                    APSContext::$pending_replies[$ref] = $reply;
                }
            }
        }

        if (!$forever) {
            // Poll only for what is left of the overall timeout.
            $timeout_millis = (int) round(($deadline - microtime(true)) * 1000);
            if ($timeout_millis <= 0) {
                break;
            }
        }
    }

    return $replies;
}

/**
 * Decode the frames of one received message into a reply object.
 *
 * Expected layout after the empty delimiter frame: version, packed
 * [sequence, timestamp, status] header, packed result, then zero or more
 * packed extras.
 *
 * @param $frames array of frames as returned by recvmulti().
 * @return object with version, sequence, timestamp, status, result and extras
 *         (null when there are none), or null when the message is malformed.
 */
function _aps_decode_reply($frames) {
    // Strict search: only a genuinely empty frame is the delimiter.
    $sep = array_search('', $frames, true);
    if ($sep === false || count($frames) < $sep + 4) {
        return null;
    }

    $header = msgpack_unpack($frames[$sep + 2]);
    if (!is_array($header)) {
        return null;
    }
    $header = array_pad($header, 3, null);
    if (!is_int($header[0]) && !is_string($header[0])) {
        return null; // sequence is used as an array key
    }

    $reply = new StdClass();
    $reply->version = $frames[$sep + 1];
    $reply->sequence = $header[0];
    $reply->timestamp = $header[1];
    $reply->status = $header[2];
    $reply->result = msgpack_unpack($frames[$sep + 3]);

    $reply->extras = null;
    if (count($frames) > $sep + 4) {
        $extras = [];
        foreach (array_slice($frames, $sep + 4) as $extra) {
            $extras[] = msgpack_unpack($extra);
        }
        $reply->extras = $extras;
    }

    return $reply;
}
//////////
/**
 * Process-wide state shared by all aps_* functions.
 */
class APSContext
{
    // Protocol version string sent as the second frame of every request.
    const VERSION = 'APS1.2';

    // Shared ZMQ context and poller; both are created lazily by aps_new().
    public static $context = null;
    public static $poller = null;

    // Request references still awaiting a reply (ref => true).
    public static $pending_requests = [];
    // Replies received while nobody was waiting for them (ref => reply).
    public static $pending_replies = [];
    // Callbacks to invoke once the reply for a reference is processed.
    public static $pending_callbacks = [];

    // Last issued request reference; incremented before each request.
    public static $sequence = 1000;

    // Socket option values applied to every socket created by aps_new().
    public static $linger = 100;
    public static $sndhwm = 1000;
    public static $rcvhwm = 1000;

    // Poll timeout in seconds used when the caller passes none; must not be null.
    public static $default_timeout = 1;
}
/**
 * Invoke and forget the callback registered for a reply, if there is one.
 *
 * The callback is removed from APSContext::$pending_callbacks before it is
 * called with the reply's result, status and extras.
 *
 * @param $reply reply object with sequence, result, status and extras.
 */
function _aps_process_reply($reply)
{
    $key = $reply->sequence;
    if (!array_key_exists($key, APSContext::$pending_callbacks)) {
        return;
    }

    $handler = APSContext::$pending_callbacks[$key];
    unset(APSContext::$pending_callbacks[$key]);

    $arguments = array($reply->result, $reply->status, $reply->extras);
    call_user_func_array($handler, $arguments);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment