Skip to content

Instantly share code, notes, and snippets.

@tomekit
Created March 6, 2019 21:39
Show Gist options
  • Save tomekit/676e93e67c0fe45d2d7f576bc0672542 to your computer and use it in GitHub Desktop.
Save tomekit/676e93e67c0fe45d2d7f576bc0672542 to your computer and use it in GitHub Desktop.
My way to implement SQS-like queue with delay / visibilityTimeout option and retry
<?php
class RedisQueueStorage
{
/**
* @var EasyRedis|Redis
*/
protected $redis;
/**
* @var string
*/
protected $queueName;
/**
* @var int (seconds)
*/
protected $visibilityTimeout;
/**
* separateBody set to true; Create hash from the message which is uniqueID; Store message content as a separate value where its hash is the key;
*
* @var boolean
*/
protected $separateBody;
public function __construct(EasyRedis $redis, $queueName, $separateBody = true) {
$this->redis = $redis;
$this->queueName = $queueName;
$this->separateBody = $separateBody;
}
/**
* @param int $visibilityTimeout
*/
public function setVisibilityTimeout(int $visibilityTimeout): void {
$this->visibilityTimeout = $visibilityTimeout;
}
/**
* @param mixed $item
*/
public function push($item) {
if ($this->separateBody) {
$hash = md5(json_encode($item));
$this->redis->setex($hash, 259200, $item); // Save object contents, where hash is the key. TTL: 259200s = 3*24*3600 = 3 days
$this->redis->zAdd($this->queueName, time(), $hash);
} else {
$this->redis->zAdd($this->queueName, time(), $item);
}
}
/**
* @param int $limit
* @return array
*/
public function pop(int $limit = 10) {
$items = $this->redis->zRangeByScore($this->queueName, 0, time(), ['limit' => [0, $limit]]); // Get all items until NOW. Don't get future items;
foreach($items as $i => $item) {
$score = time() + $this->visibilityTimeout; // This heavily relies on time being synchronized correctly between workers
$this->redis->zAdd($this->queueName.'-processing', $score, $item);
$result = $this->redis->zRem($this->queueName, $item);
if(!$result) { // Race condition
unset($items[$i]);
}
}
return $items;
}
public function requeueExpired($limit = 10) {
try {
$items = $this->redis->zRangeByScore($this->queueName.'-processing', 0, time(), ['limit' => [0, $limit]]); // Get expired hashes; This heavily relies on time being synchronized correctly between workers
foreach($items as $i => $item) {
$this->redis->zAdd($this->queueName, time()+1, $item); // Add back to main queue with 1s delay; // @TODO fix the race condition; Solution 1: use the <hash>-<retry-count> format as a key; Solution 2: Use ZADD with current timestamp + few seconds (for delayed processing).
$this->redis->zRem($this->queueName.'-processing', $item); // Remove from processing queue
}
return count($items) >= 1;
} catch(RedisException $e) {
LoggerProvider::getLogger()->error('Redis error in RedisQueueStorage: '.$e->getMessage());
}
}
/**
* @param $item
* @return mixed|null
*/
public function getMessageBodyByItem($item) {
if(!$this->separateBody) {
return $item;
}
$content = $this->redis->get($item);
if(!$content) {
LoggerProvider::getLogger()->warning("Content is empty", ['queueName' => $this->queueName, 'item' => $item, 'content' => $content]);
return null;
}
return $content;
}
public function remove($item) {
$this->redis->zRem($this->queueName.'-processing', $item); // Remove from processing queue
$this->redis->zRem($this->queueName, $item); // Remove from main queue
if($this->separateBody) {
$this->redis->del($item); // Remove body
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment