Created
October 22, 2013 08:46
-
-
Save murayama/7097269 to your computer and use it in GitHub Desktop.
rubyのsidekiqが処理するqueueをpushするPHPのクラス
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace SidekiqPush; | |
/** | |
* rubyのsidekiqのphp版クライアント | |
*/ | |
class Client { | |
private $redis; | |
private $namespace; | |
private $default_item = array( | |
'retry' => false, | |
'queue' => 'default', | |
'class' => '', | |
'args' => array() | |
); | |
public function __construct($redis, $namespace = null) { | |
if (is_array($redis)) { | |
$this->redis = new \Redis(); | |
$this->redis->connect($redis['host'], $redis['port']); | |
$this->redis->select($redis['db']); | |
} elseif ($redis instanceof \Redis) { | |
$this->redis = $redis; | |
} else { | |
throw new \Exception("Redis is undefined"); | |
} | |
$this->namespace = $namespace; | |
} | |
public function perform_async($args = array()) { | |
$normed = $this->normalize_item($args); | |
return $this->push($normed); | |
} | |
public function perform_in($interval, $args = array()) { | |
$int = floatval($interval); | |
$now = microtime(true); | |
$ts = ($int < 1000000000) ? $now + $int : $int; | |
if ($ts > $now) { | |
$args['at'] = $ts; | |
} | |
$normed = $this->normalize_item($args); | |
return $this->push($normed); | |
} | |
private function push($item) { | |
$pushed = false; | |
if (array_key_exists('at', $item)) { | |
$at_key = $this->key_generate(array('schedule')); | |
$at = $item['at']; | |
unset($item['at']); | |
$pushed = $this->redis->zAdd($at_key, $at, json_encode($item)); | |
} else { | |
$set_key = $this->key_generate(array('queues')); | |
$list_key = $this->key_generate(array('queue', $item['queue'])); | |
$this->redis->sAdd($set_key, $item['queue']); | |
$pushed = $this->redis->lPush($list_key, json_encode($item)); | |
} | |
return $pushed; | |
} | |
private function compact($array) { | |
return array_filter($array, 'strlen'); | |
} | |
private function key_generate($_parts) { | |
array_unshift($_parts, $this->namespace); | |
$parts = $this->compact($_parts); | |
return implode(':', $parts); | |
} | |
private function normalize_item($item) { | |
if (!$item['class'] || !$item['args']) { | |
throw new \Exception("Message must include a class and set of arguments: " . json_encode($item)); | |
} | |
$item = array_merge($this->default_item, $item); | |
$item['jid'] = substr(sha1(mt_rand()), 0, 24); | |
$item['enqueued_at'] = microtime(true); | |
return $item; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment