Created
September 30, 2016 03:15
-
-
Save xuanyu-h/60ae15fbdb78dfe0b5cfe1dcb5910302 to your computer and use it in GitHub Desktop.
redis queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
require 'redis' | |
require 'redis/connection/hiredis' | |
class RedisQueue | |
attr_reader :redis, :queue, :buffer, :timeout | |
attr_accessor :buffer_max | |
def initialize(queue_name, options = {}) | |
if !queue_name.is_a?(String) || queue_name.empty? | |
raise ArgumentError, "Queue name must be a non empty string" | |
end | |
@queue = "#{queue_name}:source" | |
@buffer = "#{queue_name}:buffer" | |
@redis = options.fetch(:redis, Redis.current) | |
@buffer_max = options.fetch(:conc_num, 8) | |
end | |
# Return the buffer max num | |
# | |
# @return [int] | |
alias :conc_num :buffer_max | |
# Reset the buffer max num | |
# | |
# @return [int] | |
alias :conc_num= :buffer_max= | |
# Returns the number of queue elements | |
# | |
# @return [int] | |
def length | |
redis.llen(queue) | |
end | |
alias :size :length | |
# Checks whether the queue is empty | |
# | |
# @return [true, false] | |
def empty? | |
!(length > 0) | |
end | |
alias :empty :empty? | |
# Access the first element of queue | |
# | |
# @return [Object] | |
def front | |
redis.lindex(queue, 0) | |
end | |
# Access the last element of queue | |
# | |
# @return [Object] | |
def back | |
redis.lindex(queue, -1) | |
end | |
# Access all elements of queue | |
# | |
# @return [List of Object] | |
def list | |
redis.lrange(queue, 0, -1) | |
end | |
# Access all elements of buffer | |
# | |
# @return [List of Object] | |
def runners | |
redis.lrange(buffer, 0, -1) | |
end | |
# Removes the first element of queue and push it to buffer | |
# if the buffer is fill, return nil | |
# | |
# @return [Object] | |
def pop | |
redis.rpoplpush(queue, buffer) unless buffer_is_fill? | |
end | |
# Inserts element at the end of queue and return the size of queue | |
# | |
# @return [int] | |
def push(obj) | |
redis.lpush(queue, obj) | |
end | |
alias :<< :push | |
# Remove one element of queue | |
# return false if the element not exist | |
# | |
# @return [true, false] | |
def remove(obj) | |
(redis.lrem(queue, 1, obj) != 0) ? true : false | |
end | |
alias :delete :remove | |
# Remove one element of buffer | |
# return false if the element not exist | |
# | |
# @return [true, false] | |
def commit(obj) | |
(redis.lrem(buffer, 1, obj) != 0) ? true : false | |
end | |
# Clear the queue and buffer | |
def clear | |
redis.del(queue) | |
redis.del(buffer) | |
end | |
private | |
def buffer_is_fill? | |
redis.llen(buffer) >= buffer_max | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment