Skip to content

Instantly share code, notes, and snippets.

@jippi
Created September 13, 2012 16:02
Show Gist options
  • Save jippi/3715342 to your computer and use it in GitHub Desktop.
Save jippi/3715342 to your computer and use it in GitHub Desktop.
GearmanD => StatsD class
$GearmanStatus = new GearmanStatus();
$GearmanStatus->setStatsdConfig(array('host' => '127.0.0.1', 'port' => 8125, 'prefix' => 'gearmand', 'rate' => 1));
$GearmanStatus->setGearmanConfig(array('host' => '127.0.0.1', 'port' => 4730));
$GearmanStatus->setDebug(true || false);
$GearmanStatus->reset();
$GearmanStatus->getCounters();
$GearmanStatus->getStatus();
$GearmanStatus->getWorkers();
$GearmanStatus->getRawStatus();
$GearmanStatus->transmitCountersToStatsD();
<?php
$lockFile = '/tmp/gearman_statsd_counters.lock';
$fp = fopen($lockFile, 'w+');
if (!flock($fp, LOCK_EX | LOCK_NB)) {
exit(-1);
}
require_once('gearman.class.php');
$GearmanStatus = new GearmanStatus();
$GearmanStatus->setDebug(false);
$GearmanStatus->setStatsdConfig(array('host' => '127.0.0.1', 'port' => 8125, 'rate' => 0.1));
$GearmanStatus->setGearmanConfig(array('host' => '127.0.0.1', 'port' => 4730));
while(true) {
$GearmanStatus->reset();
$GearmanStatus->transmitCountersToStatsD();
sleep(10);
}
<?php
/**
* Gearman Status class
*
* Used to extract data from Gearman in a StatsD format
*
* @author Christian Winther
* @copyright Nodes Agency, 2012
*/
class GearmanStatus {
/**
* The configuration for connecting to gearmand
*
* @var int
*/
protected $_gearmandConfig = array();
/**
* The configuration for transmitting counters to StatsD
*
* @var array
*/
protected $_statsdConfig = array();
/**
* Internal reference to the GearmanD socket handle
*
* @var resource
*/
protected $_gearmandHandle;
/**
* Internal reference to the StatsD socket handle
*
* @var resource
*/
protected $_statsdHandle;
/**
* Internal counters, for StatsD aggregations
*
* @var array
*/
protected $_counters = array();
/**
* Status array with the processed output from "status" and "workers"
*
* @var array
*/
protected $_status = array();
/**
* Should we debug whats going on inside the class?
*
* @var boolean
*/
protected $_debug = false;
/**
* Class constructor
*
* @param array $gearmandConfig
* @param array $statsdConfig
* @return void
*/
public function __construct($gearmandConfig = array(), $statsdConfig = array()) {
$this->setGearmanConfig($gearmandConfig);
$this->setStatsdConfig($statsdConfig);
}
/**
* Change the internal StatsD configuration
*
* Keys:
* - host: The hostname of the StatsD server is running on
* - port: The port the StatsD server is listening
* - prefix: Prefix all statsd keys with this value (don't include trailing dot)
* - rate: The sample rate for StatsD
*
* @param array $config
* @param boolean $reconnect Reconnect if we already got an active handle
* @return void
*/
public function setStatsdConfig($config, $reconnect = true) {
$this->_statsdConfig = $config + array('host' => '127.0.0.1', 'port' => 8125, 'prefix' => 'gearmand', 'rate' => 0.1);
if (!is_resource($this->_statsdHandle)) {
return;
}
$this->_disconnectStatsD();
$this->_connectToStatsD();
}
/**
* Change the internal GearmanD configuration
*
* Keys:
* - host: The hostname of the GearmanD server is running on
* - port: The port the GearmanD server is listening
*
* @param array $config
* @param boolean $reconnect Reconnect if we already got an active handle
* @return void
*/
public function setGearmanConfig($config, $reconnect = true) {
$this->_gearmandConfig = $config + array('host' => '127.0.0.1', 'port' => 4730);
if (!is_resource($this->_gearmandHandle)) {
return;
}
$this->_disconnectGearmanD();
$this->_connectTearmanD();
}
/**
* Enable or disable debugging
*
* @param boolean $enabled
* @return void
*/
public function setDebug($enabled = true) {
$this->_debug = (bool)$enabled;
}
/**
* Reset the internal state so we can re-process everything
*
* @return void
*/
public function reset() {
$this->_debug('Resetting status and counters');
$this->_status = array();
$this->_counters = array();
}
/**
* Getter for StatsD counters
*
* @return array
*/
public function getCounters() {
$this->_gearmandStatus();
return $this->_counters;
}
/**
* Get processed array with GearmanD status
*
* @return array
*/
public function getStatus() {
$this->_gearmandStatus();
return $this->_status['operations'];
}
/**
* Get processed array with GearmanD workers
*
* @return array
*/
public function getWorkers() {
$this->_gearmandWorkers();
return $this->_status['connections'];
}
/**
* Get the raw processed status parsed into PHP arrays
*
* @return array
*/
public function getRawStatus() {
$this->_gearmandStatus();
$this->_gearmandWorkers();
return $this->_status;
}
/**
* Transmit internal counters to StatsD
*
* @return void
*/
public function transmitCountersToStatsD() {
$this->_debug('Transmitting counters to StatsD');
$this->_gearmandStatus();
foreach ($this->_counters as $key => $value) {
$this->_writeStatsD(sprintf('%s:%s|c|@%.2F', $key, $value, $this->_statsdConfig['rate']));
}
}
/**
* Execute "status" command and increment counters
*
* @return void
*/
protected function _gearmandStatus() {
// Make sure we are connected
$this->_connectTearmanD();
// Don't re-count if we already got operations status
if (!empty($this->_status['operations'])) {
return;
}
// Get status from Gearmand and process
$this->_writeGearmanD('status');
while (!$this->_feofGearmanD()) {
$line = $this->_readGearmanD();
if ($line == ".\n") {
break;
}
if (preg_match("~^(.*)[ \t](\d+)[ \t](\d+)[ \t](\d+)~", $line, $matches)) {
$function = $matches[1];
$this->_incr(sprintf('functions.%s.count', $function));
$this->_incr(sprintf('functions.%s.queue', $function), $matches[2]);
$this->_incr(sprintf('functions.%s.running', $function), $matches[3]);
$this->_incr(sprintf('functions.%s.workers', $function), $matches[4]);
$this->_status['operations'][$function] = array(
'function' => $function,
'total' => $matches[2],
'running' => $matches[3],
'connectedWorkers' => $matches[4],
);
}
}
}
/**
* Execute "workers" and increment counters
*
* @return void
*/
protected function _gearmandWorkers() {
// Make sure we are connected
$this->_connectTearmanD();
// Don't re-count if we already got connection status
if (!empty($this->_status['connections'])) {
return;
}
$this->_writeGearmanD('workers');
while (!$this->_feofGearmanD()) {
$line = $this->_readGearmanD();
if ($line==".\n") {
break;
}
// FD IP-ADDRESS CLIENT-ID : FUNCTION
if (preg_match("~^(\d+)[ \t](.*?)[ \t](.*?) : ?(.*)~",$line,$matches)) {
$fd = $matches[1];
$this->_status['connections'][$fd] = array(
'fd' => $fd,
'ip' => $matches[2],
'id' => $matches[3],
'function' => explode(" ", $matches[4]),
);
}
}
}
/**
* Increment a specific field with a value of $value
*
* @param string $field The field name to increment in the counters table
* @param integer $value The value to increment the counter with
* @return void
*/
protected function _incr($field, $value = 1) {
$this->_debug(sprintf('Increment field %s by %d', $field, $value));
if (!array_key_exists($field, $this->_counters)) {
$this->_counters[$field] = $value;
return;
}
$this->_counters[$field] += $value;
}
/**
* Connect to GearmanD, if not already connected
*
* @return void
*/
protected function _connectTearmanD() {
if (is_resource($this->_gearmandHandle)) {
$this->_debug('Already connected to GearmanD');
return;
}
$this->_debug(sprintf('Connecting to GearmanD (%s:%d)', $this->_gearmandConfig['host'], $this->_gearmandConfig['port']));
$this->_gearmandHandle = fsockopen($this->_gearmandConfig['host'], $this->_gearmandConfig['port'], $errorNumber, $errorString, 30);
if (is_null($this->_gearmandHandle) || empty($this->_gearmandHandle)) {
throw new Exception(sprintf('Could not connect to GearmanD (%s [%s])', $errorString, $errorNumber));
}
}
/**
* Read from the GearmanD handle
*
* @param integer $size
* @return Returns a string of up to length - 1 bytes read from the file pointed to by handle. If there is no more data to read in the file pointer, then FALSE is returned.
*/
protected function _readGearmanD($size = 4096) {
$this->_debug(sprintf('Reading %d bytes from GearmanD', $size));
return fgets($this->_gearmandHandle, 4096);
}
/**
* Write to the GearmanD handle
*
* @param string $line
* @return The number of bytes written, or FALSE on error.
*/
protected function _writeGearmanD($line) {
$this->_debug(sprintf('Writing to Gearmand: %s', $line));
return fwrite($this->_gearmandHandle, $line . "\n");
}
/**
* Test for end-of-file on GearmanD resource
*
* @return Returns TRUE if the file pointer is at EOF or an error occurs (including socket timeout); otherwise returns FALSE.
*/
protected function _feofGearmanD() {
if (!is_resource($this->_gearmandHandle)) {
$this->_debug("Can't check feof for GearmanD, handle isn't a resource");
return false;
}
$this->_debug('Checking feof for GearmanD');
return feof($this->_gearmandHandle);
}
/**
* "Close" the GearmanD connection resource
*
* @return Returns TRUE on success or FALSE on failure.
*/
protected function _disconnectGearmanD() {
$this->_debug('Disconnecting from GearmanD');
if (!is_resource($this->_gearmandHandle)) {
$this->_debug("... GearmanD handle is not a resource, we probably don't have a connection");
return true;
}
return fclose($this->_gearmandHandle);
}
/**
* "Connect" to StatsD by UDP if we don't already have an open resource
*
* @return void
*/
protected function _connectToStatsD() {
if (is_resource($this->_statsdHandle)) {
$this->_debug('Already connected to StatsD');
return;
}
$this->_debug(sprintf('Connecting to StatsD (udp://%s:%d)', $this->_statsdConfig['host'], $this->_statsdConfig['port']));
$this->_statsdHandle = fsockopen("udp://" . $this->_statsdConfig['host'], $this->_statsdConfig['port'], $errno, $errstr);
if (is_null($this->_statsdHandle)) {
throw new Exception(sprintf('Could not connect to StatsD (%s [%s])', $errorString, $errorNumber));
}
}
/**
* Write to the StatsD handle
*
* @param string $line
* @return The number of bytes written, or FALSE on error.
*/
protected function _writeStatsD($line) {
if (!empty($this->_statsdConfig['prefix'])) {
$line = sprintf('%s.%s', $this->_statsdConfig['prefix'], $line);
}
$this->_debug(sprintf('Writing to StatsD: %s', $line));
$this->_connectToStatsD();
return fwrite($this->_statsdHandle, $line);
}
/**
* "Close" the StatsD connection resource
*
* @return Returns TRUE on success or FALSE on failure.
*/
protected function _disconnectStatsD() {
$this->_debug('Disconnecting from StatsD');
if (!is_resource($this->_statsdHandle)) {
$this->_debug("... StatsD handle is not a resource, we probably don't have a connection");
return true;
}
return fclose($this->_statsdHandle);
}
/**
* Output a debug message
*
* @param string $message
* @return void
*/
protected function _debug($message) {
if (!$this->_debug) {
return;
}
printf("[%s] - %s\n", date('r'), $message);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment