Skip to content

Instantly share code, notes, and snippets.

@shupp
Created May 7, 2011 23:38
Show Gist options
  • Save shupp/960968 to your computer and use it in GitHub Desktop.
Save shupp/960968 to your computer and use it in GitHub Desktop.
Kestrel Client Decorator
<?php
/**
 * A thin kestrel client that wraps Memcached (libmemcached extension)
 *
 * Jobs are written with Memcached::set() and read with Memcached::get();
 * reliable reads are driven by suffixes appended to the queue name
 * ("/close/open/t=N", "/close" and "/abort"). Job payloads are JSON encoded
 * and decoded by this class rather than by the Memcached extension.
 *
 * @author Bill Shupp <hostmaster@shupp.org>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_KestrelClient
{
    /**
     * The Memcached instance, created lazily by getMemcached()
     *
     * @var Memcached|null
     */
    protected $_memcached = null;

    /**
     * The Kestrel server IP
     *
     * @var string
     */
    protected $_host = '127.0.0.1';

    /**
     * The Kestrel server port
     *
     * @var int
     */
    protected $_port = 22133;

    /**
     * Memcached options, applied to the client when it is first created
     *
     * @var array
     */
    protected $_options = array();

    /**
     * Sets the host, port, and options to be used. No connection is set up
     * here; the Memcached instance is only built on the first call to
     * getMemcached().
     *
     * @param string $host    The host to use, defaults to 127.0.0.1
     * @param int    $port    The port to use, defaults to 22133
     * @param array  $options Memcached options, keyed by option name
     *
     * @return void
     */
    public function __construct(
        $host = '127.0.0.1', $port = 22133, array $options = array()
    ) {
        $this->_host = $host;
        $this->_port = $port;
        $this->setOptions($options);
    }

    /**
     * Sets job data on the queue, json_encoding the value to avoid problematic
     * serialization.
     *
     * If the data cannot be JSON encoded (json_encode() returns false, e.g. for
     * strings that are not valid UTF-8), nothing is written to the queue and
     * false is returned.
     *
     * @param string $queue The queue name
     * @param mixed  $data  The data to store
     *
     * @return bool False when encoding fails, otherwise the result of
     *              Memcached::set()
     */
    public function set($queue, $data)
    {
        // Local json serialization, as kestrel doesn't send serialization flags
        $payload = json_encode($data);
        if ($payload === false) {
            // Do not enqueue a bogus job when the payload could not be encoded.
            return false;
        }

        return $this->getMemcached()->set($queue, $payload);
    }

    /**
     * Reliably read an item off of the queue. Meant to be run in a loop, and
     * call closeReliableRead() when done to make sure the final job is not left
     * on the queue.
     *
     * The request key is "<queue>/close/open/t=<timeout>", so each call also
     * carries a "/close" for the previous read before opening the next one.
     *
     * @param mixed $queue   The queue name to read from
     * @param int   $timeout The timeout to wait for a job to appear
     *
     * @return mixed|false The JSON-decoded job (objects become associative
     *                     arrays), or false when Memcached::get() returns false
     * @see closeReliableRead()
     */
    public function reliableRead($queue, $timeout = 1000)
    {
        $key    = $queue . '/close/open/t=' . $timeout;
        $result = $this->getMemcached()->get($key);
        if ($result === false) {
            return $result;
        }

        // Local json serialization, as kestrel doesn't send serialization flags
        return json_decode($result, true);
    }

    /**
     * Closes any existing open read by issuing a get on "<queue>/close"
     *
     * @param string $queue The queue name
     *
     * @return mixed The raw result of Memcached::get() for the close key
     */
    public function closeReliableRead($queue)
    {
        return $this->getMemcached()->get($queue . '/close');
    }

    /**
     * Aborts an existing reliable read by issuing a get on "<queue>/abort"
     *
     * @param string $queue The queue name
     *
     * @return mixed The raw result of Memcached::get() for the abort key
     */
    public function abortReliableRead($queue)
    {
        return $this->getMemcached()->get($queue . '/abort');
    }

    /**
     * Stores an option to be used with the Memcached client.
     *
     * Stored options are handed to Memcached::setOption() when the client
     * instance is first created; an option stored after that point is recorded
     * here but is not pushed to the already-created instance.
     *
     * @param string $name  The option name
     * @param mixed  $value The option value
     *
     * @return void
     */
    public function setOption($name, $value)
    {
        $this->_options[$name] = $value;
    }

    /**
     * Sets multiple options
     *
     * @param array $options Array of key/values to set
     *
     * @return void
     * @see setOption()
     */
    public function setOptions(array $options)
    {
        foreach ($options as $name => $value) {
            $this->setOption($name, $value);
        }
    }

    /**
     * Gets a current option's value
     *
     * @param string $name The option name
     *
     * @return mixed The stored value, or null when the option is not set
     */
    public function getOption($name)
    {
        if (isset($this->_options[$name])) {
            return $this->_options[$name];
        }

        return null;
    }

    /**
     * Gets all current options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->_options;
    }

    /**
     * Gets this client's Memcached instance, creating and configuring it on
     * first use. Later calls return the same instance.
     *
     * @return Memcached
     */
    public function getMemcached()
    {
        if ($this->_memcached === null) {
            $this->_initMemcached();
        }

        return $this->_memcached;
    }

    /**
     * Initializes the Memcached client instance: applies the stored options,
     * registers the configured server, then disables compression.
     *
     * @return void
     */
    protected function _initMemcached()
    {
        $this->_memcached = $this->_getMemcachedInstance();
        foreach ($this->_options as $option => $value) {
            $this->_memcached->setOption($option, $value);
        }
        $this->_memcached->addServer($this->_host, $this->_port);
        // Set after the stored options, so a stored OPT_COMPRESSION value is
        // overridden. Presumably needed because kestrel does not hand back the
        // flags required to decompress a value -- TODO confirm.
        $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false);
    }

    // @codeCoverageIgnoreStart
    /**
     * Returns a new instance of Memcached. Abstracted for testing.
     *
     * @return Memcached
     */
    protected function _getMemcachedInstance()
    {
        return new Memcached();
    }
    // @codeCoverageIgnoreEnd
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment