Created
September 13, 2012 16:02
-
-
Save jippi/3715342 to your computer and use it in GitHub Desktop.
GearmanD => StatsD class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$GearmanStatus = new GearmanStatus(); | |
$GearmanStatus->setStatsdConfig(array('host' => '127.0.0.1', 'port' => 8125, 'prefix' => 'gearmand', 'rate' => 1)); | |
$GearmanStatus->setGearmanConfig(array('host' => '127.0.0.1', 'port' => 4730)); | |
$GearmanStatus->setDebug(true || false); | |
$GearmanStatus->reset(); | |
$GearmanStatus->getCounters(); | |
$GearmanStatus->getStatus(); | |
$GearmanStatus->getWorkers(); | |
$GearmanStatus->getRawStatus(); | |
$GearmanStatus->transmitCountersToStatsD(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
$lockFile = '/tmp/gearman_statsd_counters.lock'; | |
$fp = fopen($lockFile, 'w+'); | |
if (!flock($fp, LOCK_EX | LOCK_NB)) { | |
exit(-1); | |
} | |
require_once('gearman.class.php'); | |
$GearmanStatus = new GearmanStatus(); | |
$GearmanStatus->setDebug(false); | |
$GearmanStatus->setStatsdConfig(array('host' => '127.0.0.1', 'port' => 8125, 'rate' => 0.1)); | |
$GearmanStatus->setGearmanConfig(array('host' => '127.0.0.1', 'port' => 4730)); | |
while(true) { | |
$GearmanStatus->reset(); | |
$GearmanStatus->transmitCountersToStatsD(); | |
sleep(10); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* Gearman Status class | |
* | |
* Used to extract data from Gearman in a StatsD format | |
* | |
* @author Christian Winther | |
* @copyright Nodes Agency, 2012 | |
*/ | |
class GearmanStatus { | |
/** | |
* The configuration for connecting to gearmand | |
* | |
* @var int | |
*/ | |
protected $_gearmandConfig = array(); | |
/** | |
* The configuration for transmitting counters to StatsD | |
* | |
* @var array | |
*/ | |
protected $_statsdConfig = array(); | |
/** | |
* Internal reference to the GearmanD socket handle | |
* | |
* @var resource | |
*/ | |
protected $_gearmandHandle; | |
/** | |
* Internal reference to the StatsD socket handle | |
* | |
* @var resource | |
*/ | |
protected $_statsdHandle; | |
/** | |
* Internal counters, for StatsD aggregations | |
* | |
* @var array | |
*/ | |
protected $_counters = array(); | |
/** | |
* Status array with the processed output from "status" and "workers" | |
* | |
* @var array | |
*/ | |
protected $_status = array(); | |
/** | |
* Should we debug whats going on inside the class? | |
* | |
* @var boolean | |
*/ | |
protected $_debug = false; | |
/** | |
* Class constructor | |
* | |
* @param array $gearmandConfig | |
* @param array $statsdConfig | |
* @return void | |
*/ | |
public function __construct($gearmandConfig = array(), $statsdConfig = array()) { | |
$this->setGearmanConfig($gearmandConfig); | |
$this->setStatsdConfig($statsdConfig); | |
} | |
/** | |
* Change the internal StatsD configuration | |
* | |
* Keys: | |
* - host: The hostname of the StatsD server is running on | |
* - port: The port the StatsD server is listening | |
* - prefix: Prefix all statsd keys with this value (don't include trailing dot) | |
* - rate: The sample rate for StatsD | |
* | |
* @param array $config | |
* @param boolean $reconnect Reconnect if we already got an active handle | |
* @return void | |
*/ | |
public function setStatsdConfig($config, $reconnect = true) { | |
$this->_statsdConfig = $config + array('host' => '127.0.0.1', 'port' => 8125, 'prefix' => 'gearmand', 'rate' => 0.1); | |
if (!is_resource($this->_statsdHandle)) { | |
return; | |
} | |
$this->_disconnectStatsD(); | |
$this->_connectToStatsD(); | |
} | |
/** | |
* Change the internal GearmanD configuration | |
* | |
* Keys: | |
* - host: The hostname of the GearmanD server is running on | |
* - port: The port the GearmanD server is listening | |
* | |
* @param array $config | |
* @param boolean $reconnect Reconnect if we already got an active handle | |
* @return void | |
*/ | |
public function setGearmanConfig($config, $reconnect = true) { | |
$this->_gearmandConfig = $config + array('host' => '127.0.0.1', 'port' => 4730); | |
if (!is_resource($this->_gearmandHandle)) { | |
return; | |
} | |
$this->_disconnectGearmanD(); | |
$this->_connectTearmanD(); | |
} | |
/** | |
* Enable or disable debugging | |
* | |
* @param boolean $enabled | |
* @return void | |
*/ | |
public function setDebug($enabled = true) { | |
$this->_debug = (bool)$enabled; | |
} | |
/** | |
* Reset the internal state so we can re-process everything | |
* | |
* @return void | |
*/ | |
public function reset() { | |
$this->_debug('Resetting status and counters'); | |
$this->_status = array(); | |
$this->_counters = array(); | |
} | |
/** | |
* Getter for StatsD counters | |
* | |
* @return array | |
*/ | |
public function getCounters() { | |
$this->_gearmandStatus(); | |
return $this->_counters; | |
} | |
/** | |
* Get processed array with GearmanD status | |
* | |
* @return array | |
*/ | |
public function getStatus() { | |
$this->_gearmandStatus(); | |
return $this->_status['operations']; | |
} | |
/** | |
* Get processed array with GearmanD workers | |
* | |
* @return array | |
*/ | |
public function getWorkers() { | |
$this->_gearmandWorkers(); | |
return $this->_status['connections']; | |
} | |
/** | |
* Get the raw processed status parsed into PHP arrays | |
* | |
* @return array | |
*/ | |
public function getRawStatus() { | |
$this->_gearmandStatus(); | |
$this->_gearmandWorkers(); | |
return $this->_status; | |
} | |
/** | |
* Transmit internal counters to StatsD | |
* | |
* @return void | |
*/ | |
public function transmitCountersToStatsD() { | |
$this->_debug('Transmitting counters to StatsD'); | |
$this->_gearmandStatus(); | |
foreach ($this->_counters as $key => $value) { | |
$this->_writeStatsD(sprintf('%s:%s|c|@%.2F', $key, $value, $this->_statsdConfig['rate'])); | |
} | |
} | |
/** | |
* Execute "status" command and increment counters | |
* | |
* @return void | |
*/ | |
protected function _gearmandStatus() { | |
// Make sure we are connected | |
$this->_connectTearmanD(); | |
// Don't re-count if we already got operations status | |
if (!empty($this->_status['operations'])) { | |
return; | |
} | |
// Get status from Gearmand and process | |
$this->_writeGearmanD('status'); | |
while (!$this->_feofGearmanD()) { | |
$line = $this->_readGearmanD(); | |
if ($line == ".\n") { | |
break; | |
} | |
if (preg_match("~^(.*)[ \t](\d+)[ \t](\d+)[ \t](\d+)~", $line, $matches)) { | |
$function = $matches[1]; | |
$this->_incr(sprintf('functions.%s.count', $function)); | |
$this->_incr(sprintf('functions.%s.queue', $function), $matches[2]); | |
$this->_incr(sprintf('functions.%s.running', $function), $matches[3]); | |
$this->_incr(sprintf('functions.%s.workers', $function), $matches[4]); | |
$this->_status['operations'][$function] = array( | |
'function' => $function, | |
'total' => $matches[2], | |
'running' => $matches[3], | |
'connectedWorkers' => $matches[4], | |
); | |
} | |
} | |
} | |
/** | |
* Execute "workers" and increment counters | |
* | |
* @return void | |
*/ | |
protected function _gearmandWorkers() { | |
// Make sure we are connected | |
$this->_connectTearmanD(); | |
// Don't re-count if we already got connection status | |
if (!empty($this->_status['connections'])) { | |
return; | |
} | |
$this->_writeGearmanD('workers'); | |
while (!$this->_feofGearmanD()) { | |
$line = $this->_readGearmanD(); | |
if ($line==".\n") { | |
break; | |
} | |
// FD IP-ADDRESS CLIENT-ID : FUNCTION | |
if (preg_match("~^(\d+)[ \t](.*?)[ \t](.*?) : ?(.*)~",$line,$matches)) { | |
$fd = $matches[1]; | |
$this->_status['connections'][$fd] = array( | |
'fd' => $fd, | |
'ip' => $matches[2], | |
'id' => $matches[3], | |
'function' => explode(" ", $matches[4]), | |
); | |
} | |
} | |
} | |
/** | |
* Increment a specific field with a value of $value | |
* | |
* @param string $field The field name to increment in the counters table | |
* @param integer $value The value to increment the counter with | |
* @return void | |
*/ | |
protected function _incr($field, $value = 1) { | |
$this->_debug(sprintf('Increment field %s by %d', $field, $value)); | |
if (!array_key_exists($field, $this->_counters)) { | |
$this->_counters[$field] = $value; | |
return; | |
} | |
$this->_counters[$field] += $value; | |
} | |
/** | |
* Connect to GearmanD, if not already connected | |
* | |
* @return void | |
*/ | |
protected function _connectTearmanD() { | |
if (is_resource($this->_gearmandHandle)) { | |
$this->_debug('Already connected to GearmanD'); | |
return; | |
} | |
$this->_debug(sprintf('Connecting to GearmanD (%s:%d)', $this->_gearmandConfig['host'], $this->_gearmandConfig['port'])); | |
$this->_gearmandHandle = fsockopen($this->_gearmandConfig['host'], $this->_gearmandConfig['port'], $errorNumber, $errorString, 30); | |
if (is_null($this->_gearmandHandle) || empty($this->_gearmandHandle)) { | |
throw new Exception(sprintf('Could not connect to GearmanD (%s [%s])', $errorString, $errorNumber)); | |
} | |
} | |
/** | |
* Read from the GearmanD handle | |
* | |
* @param integer $size | |
* @return Returns a string of up to length - 1 bytes read from the file pointed to by handle. If there is no more data to read in the file pointer, then FALSE is returned. | |
*/ | |
protected function _readGearmanD($size = 4096) { | |
$this->_debug(sprintf('Reading %d bytes from GearmanD', $size)); | |
return fgets($this->_gearmandHandle, 4096); | |
} | |
/** | |
* Write to the GearmanD handle | |
* | |
* @param string $line | |
* @return The number of bytes written, or FALSE on error. | |
*/ | |
protected function _writeGearmanD($line) { | |
$this->_debug(sprintf('Writing to Gearmand: %s', $line)); | |
return fwrite($this->_gearmandHandle, $line . "\n"); | |
} | |
/** | |
* Test for end-of-file on GearmanD resource | |
* | |
* @return Returns TRUE if the file pointer is at EOF or an error occurs (including socket timeout); otherwise returns FALSE. | |
*/ | |
protected function _feofGearmanD() { | |
if (!is_resource($this->_gearmandHandle)) { | |
$this->_debug("Can't check feof for GearmanD, handle isn't a resource"); | |
return false; | |
} | |
$this->_debug('Checking feof for GearmanD'); | |
return feof($this->_gearmandHandle); | |
} | |
/** | |
* "Close" the GearmanD connection resource | |
* | |
* @return Returns TRUE on success or FALSE on failure. | |
*/ | |
protected function _disconnectGearmanD() { | |
$this->_debug('Disconnecting from GearmanD'); | |
if (!is_resource($this->_gearmandHandle)) { | |
$this->_debug("... GearmanD handle is not a resource, we probably don't have a connection"); | |
return true; | |
} | |
return fclose($this->_gearmandHandle); | |
} | |
/** | |
* "Connect" to StatsD by UDP if we don't already have an open resource | |
* | |
* @return void | |
*/ | |
protected function _connectToStatsD() { | |
if (is_resource($this->_statsdHandle)) { | |
$this->_debug('Already connected to StatsD'); | |
return; | |
} | |
$this->_debug(sprintf('Connecting to StatsD (udp://%s:%d)', $this->_statsdConfig['host'], $this->_statsdConfig['port'])); | |
$this->_statsdHandle = fsockopen("udp://" . $this->_statsdConfig['host'], $this->_statsdConfig['port'], $errno, $errstr); | |
if (is_null($this->_statsdHandle)) { | |
throw new Exception(sprintf('Could not connect to StatsD (%s [%s])', $errorString, $errorNumber)); | |
} | |
} | |
/** | |
* Write to the StatsD handle | |
* | |
* @param string $line | |
* @return The number of bytes written, or FALSE on error. | |
*/ | |
protected function _writeStatsD($line) { | |
if (!empty($this->_statsdConfig['prefix'])) { | |
$line = sprintf('%s.%s', $this->_statsdConfig['prefix'], $line); | |
} | |
$this->_debug(sprintf('Writing to StatsD: %s', $line)); | |
$this->_connectToStatsD(); | |
return fwrite($this->_statsdHandle, $line); | |
} | |
/** | |
* "Close" the StatsD connection resource | |
* | |
* @return Returns TRUE on success or FALSE on failure. | |
*/ | |
protected function _disconnectStatsD() { | |
$this->_debug('Disconnecting from StatsD'); | |
if (!is_resource($this->_statsdHandle)) { | |
$this->_debug("... StatsD handle is not a resource, we probably don't have a connection"); | |
return true; | |
} | |
return fclose($this->_statsdHandle); | |
} | |
/** | |
* Output a debug message | |
* | |
* @param string $message | |
* @return void | |
*/ | |
protected function _debug($message) { | |
if (!$this->_debug) { | |
return; | |
} | |
printf("[%s] - %s\n", date('r'), $message); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment