Created
April 30, 2014 02:00
-
-
Save erning/7806ab8c7c52a4091d11 to your computer and use it in GitHub Desktop.
PHP APS Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/**
 * Builds a new APS client handle around a fresh ZeroMQ DEALER socket.
 *
 * The ZMQ context and the poller stored on APSContext are shared by all
 * clients and are created here the first time they are needed. The new socket
 * receives the linger / send-HWM / receive-HWM values configured on
 * APSContext and is registered with the shared poller for incoming data.
 *
 * @param $spID   string service provider ID stored on the handle (optional).
 * @param $spVer  string service provider version stored on the handle (optional).
 * @param $sender string sender name stored on the handle (optional).
 * @return stdClass handle exposing the fields socket, spID, spVer and sender.
 *         This function has no code path that returns false.
 */
function aps_new($spID=null, $spVer=null, $sender=null) {
    // Lazily create the process-wide ZMQ context and poller.
    if (APSContext::$context === null) {
        APSContext::$context = new ZMQContext(3, false);
    }
    if (APSContext::$poller === null) {
        APSContext::$poller = new ZMQPoll();
    }

    $dealer = new ZMQSocket(APSContext::$context, ZMQ::SOCKET_DEALER);

    // Apply the socket options in the same order as they are listed here.
    $options = [
        [ZMQ::SOCKOPT_LINGER, APSContext::$linger],
        [ZMQ::SOCKOPT_SNDHWM, APSContext::$sndhwm],
        [ZMQ::SOCKOPT_RCVHWM, APSContext::$rcvhwm],
    ];
    foreach ($options as $pair) {
        $dealer->setsockopt($pair[0], $pair[1]);
    }

    APSContext::$poller->add($dealer, ZMQ::POLL_IN);

    // The handle is a plain stdClass carrying the socket and identity fields.
    return (object) [
        'socket' => $dealer,
        'spID'   => $spID,
        'spVer'  => $spVer,
        'sender' => $sender,
    ];
}
/**
 * Destroy the APS client.
 *
 * Removes the client's socket from the shared poller, disconnects it from
 * every endpoint it is connected to, drops the socket from the handle and
 * finally sets the caller's variable to null.
 *
 * Calling this on a handle that is already null (or has no socket) is a no-op.
 *
 * @param $aps the aps client handler (passed by reference; null afterwards).
 */
function aps_destroy(&$aps) {
    if (!is_object($aps) || !isset($aps->socket)) {
        // Nothing left to tear down (e.g. the handle was destroyed before).
        $aps = null;
        return;
    }

    $socket = $aps->socket;

    if (APSContext::$poller !== null) {
        APSContext::$poller->remove($socket);
    }

    $endpoints = $socket->getEndpoints();
    if (isset($endpoints["connect"]) && is_array($endpoints["connect"])) {
        foreach ($endpoints["connect"] as $endpoint) {
            $socket->disconnect($endpoint);
        }
    }

    unset($aps->socket);

    // unset($aps) would only drop the local alias of the reference and leave
    // the caller's variable untouched; assigning through the reference
    // actually clears the caller's handle.
    $aps = null;
}
/**
 * Connects the client's socket to an endpoint.
 *
 * @param $aps      the aps client handler; its socket field is used.
 * @param $endpoint the endpoint to connect to, passed to the socket unchanged.
 */
function aps_connect($aps, $endpoint) {
    $socket = $aps->socket;
    $socket->connect($endpoint);
}
/**
 * Disconnects the client's socket from an endpoint.
 *
 * @param $aps      the aps client handler; its socket field is used.
 * @param $endpoint the endpoint to disconnect from, passed to the socket unchanged.
 */
function aps_disconnect($aps, $endpoint) {
    $socket = $aps->socket;
    $socket->disconnect($endpoint);
}
/**
 * Send an async request to the service provider and return the request
 * reference.
 *
 * The request itself is sent via aps_oneway_request(); on success the
 * reference is recorded in APSContext::$pending_requests so that
 * aps_wait_for_replies() can wait for it, and the callback (if any) is
 * recorded in APSContext::$pending_callbacks.
 *
 * @param $aps      the aps client handler.
 * @param $method   name of the method to invoke.
 * @param $params   parameters of the method.
 * @param $expiry   Expiration time of the request. If it is null, the request
 *                  will never expire. You can also use a Unix timestamp in
 *                  seconds or a number of seconds starting from current time,
 *                  but in the latter case the number of seconds may not exceed
 *                  2,592,000 (30 days). Use float type for milliseconds.
 * @param $callback The callback function or closure to be invoked when the
 *                  reply is received:
 *                      function($result, $status, $extras);
 *                  A falsy value means no callback is registered.
 * @param $extras   The extras to be sent to the service (array of values,
 *                  each sent as its own frame).
 * @return Returns the request reference on success or false on failure.
 */
function aps_start_request($aps, $method, $params=null, $expiry=null,
                           $callback=null, $extras=null) {
    // Forward $extras as well; previously they were accepted but dropped.
    $ref = aps_oneway_request($aps, $method, $params, $expiry, $extras);
    if ($ref === false) {
        return false;
    }

    APSContext::$pending_requests[$ref] = true;
    if ($callback) {
        APSContext::$pending_callbacks[$ref] = $callback;
    }
    return $ref;
}
/**
 * Send a oneway request to the service provider and ignore its reply.
 *
 * The message is sent as a multipart message made of, in order: an empty
 * delimiter frame, the protocol version, the msgpack-encoded
 * [sequence, timestamp, expiry] triple, the method name (prefixed with
 * ":<spID>:" when the handle has a spID), the msgpack-encoded params, and
 * then one msgpack-encoded frame per extra: ["Sender", ...] and
 * ["Version", ...] when set on the handle, followed by the caller's $extras.
 *
 * The send is non-blocking; the sequence counter is advanced even when the
 * send fails.
 *
 * @param $aps    the aps client handler.
 * @param $method name of the method to invoke.
 * @param $params parameters of the method.
 * @param $expiry expiration of the request, see aps_start_request().
 * @param $extras array of extra values to send; ignored unless it is an array.
 * @return Returns the request reference on success or false when the message
 *         could not be sent.
 * @see aps_start_request()
 */
function aps_oneway_request($aps, $method, $params=null, $expiry=null,
                            $extras=null) {
    $ref = ++APSContext::$sequence;

    $frames = [
        '',
        APSContext::VERSION,
        msgpack_pack([$ref, microtime(true), $expiry]),
        $aps->spID ? ":{$aps->spID}:{$method}" : $method,
        msgpack_pack($params),
    ];

    // This is a plain function, so the identity fields live on the handle
    // ($aps), not on $this.
    if ($aps->sender) {
        $frames[] = msgpack_pack(["Sender", $aps->sender]);
    }
    if ($aps->spVer) {
        $frames[] = msgpack_pack(["Version", $aps->spVer]);
    }

    // Caller-supplied extras must be appended before the message is sent.
    if (is_array($extras)) {
        foreach ($extras as $extra) {
            $frames[] = msgpack_pack($extra);
        }
    }

    if (!$aps->socket->sendmulti($frames, ZMQ::MODE_DONTWAIT)) {
        // TODO: HWM!
        return false;
    }
    return $ref;
}
/**
 * Wait for replies.
 *
 * Replies that were already received (and parked in
 * APSContext::$pending_replies) are returned immediately; for the remaining
 * references the shared poller is polled until every awaited reply arrived,
 * the poll reports no events, or the timeout elapsed. Replies that arrive for
 * references not being waited for are parked in APSContext::$pending_replies.
 * The callback registered for a reference (if any) is invoked when its reply
 * is returned.
 *
 * @param $refs    a request reference or list of references to be waited for,
 *                 or null to wait for all the pending requests (oneway
 *                 requests NOT included).
 * @param $timeout time out for poll in seconds. Use float type for milliseconds.
 *                 null - use APSContext::$default_timeout
 *                 -1   - wait for ever
 * @return array map of request reference => reply object; each reply has the
 *               fields version, sequence, timestamp, status, result and
 *               extras (array or null).
 */
function aps_wait_for_replies($refs=null, $timeout=null) {
    $replies = [];
    $waiting = [];

    if ($refs === null) {
        $refs = array_merge(
            array_keys(APSContext::$pending_requests),
            array_keys(APSContext::$pending_replies)
        );
    } elseif (!is_array($refs)) {
        $refs = [$refs];
    }

    // Split the references into "still outstanding" and "already received".
    foreach ($refs as $ref) {
        if (array_key_exists($ref, APSContext::$pending_requests)) {
            $waiting[$ref] = true;
        } elseif (array_key_exists($ref, APSContext::$pending_replies)) {
            $reply = APSContext::$pending_replies[$ref];
            $replies[$ref] = $reply;
            unset(APSContext::$pending_replies[$ref]);
            _aps_process_reply($reply);
        }
    }

    $readable = $writeable = [];
    if ($timeout === null) {
        $timeout = APSContext::$default_timeout;
    }

    // Decide once whether to wait forever so that the initial setup and the
    // per-iteration deadline check cannot disagree (e.g. for -1.0 or "-1").
    $forever = ($timeout == -1);
    if ($forever) {
        $timeout_millis = -1;
    } else {
        $bt = microtime(true);
        $timeout_millis = (int) round($timeout * 1000);
    }

    while (count($waiting) > 0) {
        $events = APSContext::$poller->poll($readable, $writeable,
                                            $timeout_millis);
        if ($events == 0) {
            break;
        }

        foreach ($readable as $socket) {
            while ($frames = $socket->recvmulti(ZMQ::MODE_DONTWAIT)) {
                // Locate the empty delimiter frame; strict search so that
                // only a genuinely empty string matches.
                $sep = array_search('', $frames, true);
                if ($sep === false || count($frames) < $sep + 4) {
                    // Malformed message: no delimiter or missing
                    // version/header/result frames. Skip it.
                    continue;
                }

                $reply = new stdClass();
                $reply->version = $frames[$sep + 1];
                list($reply->sequence, $reply->timestamp, $reply->status) =
                    msgpack_unpack($frames[$sep + 2]);
                $reply->result = msgpack_unpack($frames[$sep + 3]);

                if (count($frames) > $sep + 4) {
                    $extras = [];
                    foreach (array_slice($frames, $sep + 4) as $extra) {
                        $extras[] = msgpack_unpack($extra);
                    }
                    $reply->extras = $extras;
                } else {
                    $reply->extras = null;
                }

                $ref = $reply->sequence;
                unset(APSContext::$pending_requests[$ref]);
                if (array_key_exists($ref, $waiting)) {
                    $replies[$ref] = $reply;
                    unset($waiting[$ref]);
                    _aps_process_reply($reply);
                } else {
                    // Not awaited by this call: park it for a later wait.
                    APSContext::$pending_replies[$ref] = $reply;
                }
            }
        }

        if (!$forever) {
            // Shrink the poll timeout to whatever is left of the deadline.
            $timeout_millis = (int) round(($timeout - microtime(true) + $bt) * 1000);
            if ($timeout_millis <= 0) {
                break;
            }
        }
    }

    return $replies;
}
////////// | |
/**
 * Process-wide state shared by all APS clients created through aps_new().
 */
class APSContext
{
    /** Protocol version string sent as the second frame of every request. */
    const VERSION = 'APS1.2';

    /** Shared ZMQContext, created on the first aps_new() call. */
    public static $context = null;

    /** Shared ZMQPoll on which every client socket is registered. */
    public static $poller = null;

    /** Request reference => true for requests still awaiting a reply. */
    public static $pending_requests = [];

    /** Request reference => reply object for replies received but not yet collected. */
    public static $pending_replies = [];

    /** Request reference => callback to invoke when the reply is processed. */
    public static $pending_callbacks = [];

    /** Last used request sequence number; pre-incremented for each request. */
    public static $sequence = 1000;

    /** Value applied to ZMQ::SOCKOPT_LINGER on new sockets. */
    public static $linger = 100;

    /** Value applied to ZMQ::SOCKOPT_SNDHWM on new sockets. */
    public static $sndhwm = 1000;

    /** Value applied to ZMQ::SOCKOPT_RCVHWM on new sockets. */
    public static $rcvhwm = 1000;

    /** Poll timeout in seconds used when none is given; must not be null. */
    public static $default_timeout = 1;
}
/**
 * Invokes (once) the callback registered for a reply, if there is one.
 *
 * The callback is looked up in APSContext::$pending_callbacks by the reply's
 * sequence, removed from that table before being called, and then called with
 * the reply's result, status and extras.
 *
 * @param $reply reply object with the fields sequence, result, status, extras.
 */
function _aps_process_reply($reply)
{
    $ref = $reply->sequence;
    if (!array_key_exists($ref, APSContext::$pending_callbacks)) {
        return;
    }

    $handler = APSContext::$pending_callbacks[$ref];
    // Unregister first so the callback cannot fire twice for the same reply.
    unset(APSContext::$pending_callbacks[$ref]);

    call_user_func($handler, $reply->result, $reply->status, $reply->extras);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment