Skip to content

Instantly share code, notes, and snippets.

@alsma
Last active June 7, 2016 15:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alsma/a59f90d9b46345b2c02f to your computer and use it in GitHub Desktop.
Save alsma/a59f90d9b46345b2c02f to your computer and use it in GitHub Desktop.
queue
/* global require, module */
var _ = require('lodash')
, Q = require('q')
, uuid = require('node-uuid')
, GET_TIMEOUT = 1e3
;
/**
* @param {RedisClient} redis
* @param {Logger} logger
* @param {object} config
*/
exports.create = function (redis, logger, config) {
return new QueueManager(redis, logger, config)
};
/**
 * Keeps the Redis client, the logger and the resolved key names on the
 * instance, then starts a repeating timer that runs `_requeueStuckTasks`
 * every `requeueInterval` milliseconds.
 *
 * @param {RedisClient} redis
 * @param {Logger} logger
 * @param {object} config
 * @param {string} config.name List name where tasks are stored
 * @param {string?} config.data Hash name where tasks' data are stored (default: "<name>:data")
 * @param {string?} config.inProgress Hash name where tasks in progress are stored (default: "<name>:in_progress")
 * @param {string?} config.retry Hash name where number of retries per task stored (default: "<name>:retry")
 * @param {number?} config.requeueInterval Timeout to mark task as stuck and requeue it (default: 30000)
 * @param {number?} config.maxRetries Max tasks retries num (default: 3)
 *
 * @constructor
 */
function QueueManager(redis, logger, config) {
    var DEFAULT_REQUEUE_INTERVAL = 30000
        , DEFAULT_MAX_RETRIES = 3
        , stuckTasksSweep
        ;

    this.redis = redis;
    this.logger = logger;

    this._queueName = config.name;

    // Any falsy override falls back to a name derived from the queue name.
    this._tasksDataHashName = config.data || (this._queueName + ':data');
    this._tasksInProgressHashName = config.inProgress || (this._queueName + ':in_progress');
    this._tasksRetryHashName = config.retry || (this._queueName + ':retry');

    this._requeueInterval = config.requeueInterval || DEFAULT_REQUEUE_INTERVAL;
    this._maxRetries = config.maxRetries || DEFAULT_MAX_RETRIES;

    stuckTasksSweep = _.bind(this._requeueStuckTasks, this);
    setInterval(stuckTasksSweep, this._requeueInterval);
}
// Start from an empty prototype object; all methods are attached below.
QueueManager.prototype = {};

/**
 * Adds a new task: generates a v4 UUID as the task id, stores the
 * JSON-serialised payload in the data hash under that id and appends the id
 * to the tail of the queue list.
 *
 * @param {*} data Task payload; any falsy value is replaced by an empty object
 */
QueueManager.prototype.enqueue = function (data) {
    var taskId = uuid.v4()
        , payload = data || {}
        ;

    this.redis.hset(this._tasksDataHashName, taskId, JSON.stringify(payload));
    this.redis.rpush(this._queueName, taskId);
};
/**
 * Puts a task back on the queue: logs a warning, removes the task from the
 * in-progress hash, appends its id to the tail of the queue list and
 * increments its counter in the retry hash by one.
 *
 * @param {string} taskId
 */
QueueManager.prototype.requeue = function (taskId) {
    var client;

    this.logger.warn('Task was requeued. UUID: ', taskId);

    client = this.redis;
    client.hdel(this._tasksInProgressHashName, taskId);
    client.rpush(this._queueName, taskId);
    client.hincrby(this._tasksRetryHashName, taskId, 1);
};
/**
 * Drops a task entirely: logs a warning, deletes the task id from the
 * in-progress, retry and data hashes (in that order) and removes every
 * occurrence of the id from the queue list.
 *
 * @param {string} taskId
 */
QueueManager.prototype.unqueue = function (taskId) {
    var client
        , hashNames
        ;

    this.logger.warn('Task was unqueued. UUID: ', taskId);

    client = this.redis;
    hashNames = [
        this._tasksInProgressHashName,
        this._tasksRetryHashName,
        this._tasksDataHashName
    ];

    hashNames.forEach(function (hashName) {
        client.hdel(hashName, taskId);
    });

    // A count of 0 asks LREM to remove all matching entries.
    client.lrem(this._queueName, 0, taskId);
};
//////////////////////////
// private members
/**
 * Periodic sweep (scheduled by the constructor) that finds tasks which have
 * been in progress for at least `_requeueInterval` milliseconds and either
 * requeues them or, once they have used up `_maxRetries`, removes them.
 *
 * Both hashes are read in parallel; the whole read is abandoned after
 * GET_TIMEOUT milliseconds. Any failure (Redis error or timeout) is logged
 * as a warning instead of being dropped silently.
 *
 * @private
 */
QueueManager.prototype._requeueStuckTasks = function () {
    var _self = this
        , now = _.now()
        ;

    // Reads a whole hash and exposes the result as a promise. Each call owns
    // its deferred, so an error always rejects the promise it belongs to.
    function fetchHash(hashName) {
        var deferred = Q.defer();

        _self.redis.hgetall(hashName, function (err, data) {
            if (err) return deferred.reject(err);

            // A missing hash yields no data; treat it as empty.
            deferred.resolve(data || {});
        });

        return deferred.promise;
    }

    Q.all([
            fetchHash(this._tasksInProgressHashName),
            fetchHash(this._tasksRetryHashName)
        ])
        .timeout(GET_TIMEOUT)
        .spread(function (inProgress, retriesNum) {
            var stuck = _.chain(inProgress)
                // Start timestamps are multiplied by 1e3 before being compared
                // with the millisecond clock, i.e. they are treated as seconds.
                .pick(function (startTimestamp) { return (now - startTimestamp * 1e3) >= _self._requeueInterval; })
                .keys()
                // [0] -> still has retries left, [1] -> retries exhausted.
                .partition(function (taskId) { return (retriesNum[taskId] || 0) < _self._maxRetries; })
                .value();

            _.each(stuck[0], _self.requeue, _self);
            _.each(stuck[1], _self.unqueue, _self);
        })
        .catch(function (err) {
            _self.logger.warn('Failed to requeue stuck tasks: ', err);
        });
};
<?php
use Ramsey\Uuid\Uuid;
/**
 * Redis-backed task queue.
 *
 * Task ids live in a Redis list; payloads, in-progress start times and retry
 * counters live in three hashes keyed by task id.
 */
class Manager
{
    /** Default upper bound, in seconds, for a single consume() run. */
    const DEFAULT_CONSUME_TIMEOUT = 12 * 60 * 60;

    /** Timeout, in seconds, passed to each blocking pop inside consume(). */
    const DEFAULT_BLPOP_INTERVAL = 30;

    /** @var \Redis */
    protected $redis;

    /** @var string */
    protected $queueName;

    /** @var string */
    protected $tasksDataHashName;

    /** @var string */
    protected $tasksInProgressHashName;

    /** @var string */
    protected $tasksRetryHashName;

    /**
     * @param \Redis $redis
     * @param array  $config Config reference: [
     *     'name'        => 'tasks'              // list name where tasks are stored
     *     'data'        => 'tasks:data'         // hash name where tasks' data are stored,
     *     'in_progress' => 'tasks:in_progress'  // hash name where tasks in progress are stored
     *     'retry'       => 'tasks:retry'        // hash name where number of retries per task stored
     * ]
     * Only 'name' is required; the others default to "<name>:<key>".
     */
    public function __construct(\Redis $redis, array $config)
    {
        $this->redis = $redis;
        $this->queueName = $config['name'];

        $this->tasksDataHashName = $this->getOr($config, 'data', $this->queueName);
        $this->tasksInProgressHashName = $this->getOr($config, 'in_progress', $this->queueName);
        $this->tasksRetryHashName = $this->getOr($config, 'retry', $this->queueName);
    }

    /**
     * Stores the JSON-encoded payload under a fresh UUID v4 and appends that
     * id to the tail of the queue list.
     *
     * @param array $data
     */
    public function enqueue(array $data)
    {
        $uuid4 = Uuid::uuid4();
        $taskId = $uuid4->toString();

        $this->redis->hSet($this->tasksDataHashName, $taskId, json_encode($data));
        $this->redis->rPush($this->queueName, $taskId);
    }

    /**
     * Pops one task id from the head of the queue list without blocking.
     *
     * @return Task|null Null when the pop returns false
     */
    public function dequeue()
    {
        $taskId = $this->redis->lPop($this->queueName);
        if (false === $taskId) {
            return null;
        }

        return $this->createTask($taskId);
    }

    /**
     * Repeatedly performs a blocking pop and passes each task to $consumer.
     *
     * The loop ends when the consumer returns exactly false, or when
     * $timeout seconds have elapsed since the call started (checked after
     * every pop attempt).
     *
     * @param callable $consumer Receives a Task; return false to stop consuming
     * @param int      $timeout  Maximum run time in seconds
     */
    public function consume(callable $consumer, $timeout = self::DEFAULT_CONSUME_TIMEOUT)
    {
        $start = time();
        $continue = true;

        do {
            $listItem = $this->redis->blPop($this->queueName, self::DEFAULT_BLPOP_INTERVAL);
            if (!empty($listItem)) {
                // The popped value is the last element of the returned array.
                $res = $consumer($this->createTask(end($listItem)));
                $continue = $res !== false;
            }

            // Logical AND keeps $continue a boolean.
            $continue = $continue && ((time() - $start) < $timeout);
        } while ($continue);
    }

    /**
     * Marks a task as finished by deleting its id from the in-progress,
     * retry and data hashes.
     *
     * @param Task $task
     */
    public function complete(Task $task)
    {
        $taskId = $task->getId();

        $this->redis->hDel($this->tasksInProgressHashName, $taskId);
        $this->redis->hDel($this->tasksRetryHashName, $taskId);
        $this->redis->hDel($this->tasksDataHashName, $taskId);
    }

    /**
     * Misspelled original name of complete(); kept so existing callers keep working.
     *
     * @deprecated Use complete() instead.
     *
     * @param Task $task
     */
    public function compete(Task $task)
    {
        $this->complete($task);
    }

    /**
     * Loads the task payload from the data hash (empty array when the id has
     * no entry), records the current Unix time under the task id in the
     * in-progress hash and wraps both id and payload into a Task.
     *
     * @param string $taskId UUID
     *
     * @return Task
     */
    protected function createTask($taskId)
    {
        $data = $this->redis->hGet($this->tasksDataHashName, $taskId);
        $data = false === $data ? [] : json_decode($data, true);

        $this->redis->hSet($this->tasksInProgressHashName, $taskId, time());

        return new Task($taskId, $data);
    }

    /**
     * Returns $array[$key] when it is set, otherwise the generated default
     * "<valueToGenerateDefault>:<key>".
     *
     * @param array  $array
     * @param string $key
     * @param string $valueToGenerateDefault
     *
     * @return string
     */
    protected function getOr(array $array, $key, $valueToGenerateDefault)
    {
        return isset($array[$key]) ? $array[$key] : sprintf('%s:%s', $valueToGenerateDefault, $key);
    }
}
package alsma.aggregator.queue
import java.util.UUID
import java.io.StringWriter
import com.twitter.util.Future
import com.twitter.finagle.redis.Client
import org.jboss.netty.buffer.ChannelBuffer
import com.fasterxml.jackson.databind.ObjectMapper
import com.twitter.finagle.redis.util.{CBToString, StringToChannelBuffer}
/**
 * Redis-backed task queue.
 *
 * Task ids are kept in the list `queueName`; payloads, in-progress start
 * times and retry counters are kept in the hashes `queueName:data`,
 * `queueName:in_progress` and `queueName:retry`. Constructing an instance
 * registers it with the companion object's periodic requeue task.
 *
 * @param mapper    Jackson mapper used to serialise task payloads
 * @param client    Finagle Redis client
 * @param queueName name of the Redis list holding queued task ids
 */
class Queue(private val mapper: ObjectMapper, private val client: Client, private val queueName: String) {
  import alsma.aggregator.queue.Queue._

  private[queue] val QueueName = queueName
  private[queue] val DataHashName = s"$queueName:data"
  private[queue] val InProgressHashName = s"$queueName:in_progress"
  private[queue] val RetryHashName = s"$queueName:retry"

  setupRequeue(this)

  /**
   * Serialises `msg` with the mapper, stores it in the data hash under a new
   * id and then appends the id to the tail of the queue list.
   *
   * The full random UUID is used as the id; a truncated prefix would make
   * collisions, and therefore overwritten payloads, far more likely.
   * The id is pushed only after the payload write has completed, so the id
   * never becomes visible on the list before its payload is stored.
   *
   * @return the id of the enqueued task once both writes have completed
   */
  def enqueue(msg: AnyRef): Future[String] = {
    val uid = UUID.randomUUID.toString
    val writer = new StringWriter
    mapper.writeValue(writer, msg)

    client.hSet(DataHashName, uid, writer.toString) flatMap { _ =>
      client.rPush(QueueName, uid :: Nil)
    } map { _ => uid }
  }

  /**
   * Deletes the task id from the in-progress, retry and data hashes.
   * The three commands are issued without waiting for one another; the
   * returned value is the result of the last one.
   */
  def unqueue(uid: String) = {
    client.hDel(InProgressHashName, Seq(uid))
    client.hDel(RetryHashName, Seq(uid))
    client.hDel(DataHashName, Seq(uid))
  }

  /**
   * Removes the task id from the in-progress hash, appends it to the tail of
   * the queue list and increments its retry counter by one.
   * The three commands are issued without waiting for one another; the
   * returned value is the result of the last one.
   */
  def requeue(uid: String) = {
    client.hDel(InProgressHashName, Seq(uid))
    client.rPush(QueueName, uid :: Nil)
    client.hIncrBy(RetryHashName, uid, 1L)
  }
}
/**
 * Companion of [[Queue]]: shared settings, the String -> ChannelBuffer
 * conversions used when talking to Redis, and the scheduler that
 * periodically requeues or drops stuck tasks.
 */
object Queue {
  import java.util.concurrent._
  import scala.language.implicitConversions

  /** Compared against the age, in seconds, of an in-progress entry to decide whether it is stuck. */
  val RetryInterval = 3

  /** A stuck task whose retry counter has reached this value is dropped instead of requeued. */
  val MaxRetries = 5

  /** Single-threaded daemon scheduler shared by all queues; created on first use. */
  lazy val executor = createExecutor()

  implicit def string2ChannelBuffer(value: String): ChannelBuffer = StringToChannelBuffer(value)

  implicit def stringSeq2SeqChannelBuffer(value: Seq[String]): Seq[ChannelBuffer] = value.map {StringToChannelBuffer(_)}

  /** Schedules the stuck-task sweep for `queue`: first run after 1 second, then every second. */
  def setupRequeue(queue: Queue): Unit = {
    executor.scheduleAtFixedRate(createRequeueTask(queue), 1, 1, TimeUnit.SECONDS)
  }

  /**
   * Builds the sweep: reads the in-progress and retry hashes, selects the
   * tasks whose recorded start time is more than `RetryInterval` seconds
   * old, and requeues those with fewer than `MaxRetries` retries while
   * removing the rest.
   */
  private[this] def createRequeueTask(queue: Queue) = {
    new Runnable {
      def run(): Unit = {
        val now: Long = System.currentTimeMillis / 1000

        // Explicit import so that `Future` means the Twitter one here; the
        // wildcard java.util.concurrent._ import above also provides a Future.
        import com.twitter.util.Future

        Future.join(queue.client.hGetAll(queue.InProgressHashName), queue.client.hGetAll(queue.RetryHashName)).
          map({ case (inProgress, retries) => (CBToString.fromTuples(inProgress), CBToString.fromTuples(retries))}).
          map({ case (inProgress, retries) =>
            val retryMap = retries.map({ case (uid, count) => uid -> count.toInt }).toMap

            val (toRequeue, toUnqueue) = inProgress.
              // Start times are epoch seconds; parse as Long so values beyond
              // Int.MaxValue do not throw.
              map(element => (element._1, element._2.toLong)).
              filter(now - _._2 > RetryInterval).
              map(_._1).
              partition(uid => retryMap.getOrElse(uid, 0) < MaxRetries)

            toRequeue.foreach(queue.requeue)
            toUnqueue.foreach(queue.unqueue)
          })
      }
    }
  }

  /** Creates a one-thread scheduled executor whose worker is a daemon thread. */
  private[this] def createExecutor(): ScheduledThreadPoolExecutor = {
    new ScheduledThreadPoolExecutor(1, new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val thread = new Thread(r)
        thread.setDaemon(true)
        thread
      }
    })
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment