Skip to content

Instantly share code, notes, and snippets.

@mholubowski
Created July 23, 2014 21:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mholubowski/466d6aea4467cf68c3b5 to your computer and use it in GitHub Desktop.
KMR KeywordQueue
# Redis-backed FIFO queue of Keyword IDs awaiting scraping.
#
# Keywords are enqueued by ID onto a Redis list; batches are popped off and
# handed to KeywordQueueWorker together with a proxy IP reserved from
# ProxyPool. Keywords that keep raising exceptions are moved onto a
# dead-letter list for manual inspection.
module KeywordQueue
  extend self

  # Redis list holding queued Keyword IDs (front = next to scrape).
  QUEUE_KEY = 'keyword-queue'.freeze
  # Presence of this key pauses scheduling of further scrape batches.
  LOCK_KEY = 'keyword-queue:lock'.freeze
  DELAY_TIME = 1 # Time in seconds between scrape batches
  # Prefix for per-day Redis hashes of failure counts, keyed by keyword ID.
  KW_FAILURES_KEY = 'keyword_queue:keyword_failures'.freeze
  # Dead-letter list of keyword IDs that exceeded the failure threshold.
  KW_FAILED_QUEUE_KEY = 'keyword_queue:failed_keywords'.freeze
  KW_FAILURE_THRESHOLD = 5

  # Add a single Keyword to the queue for scraping
  #
  # keyword  - Keyword instance
  # priority - Symbol indicating scrape priority (:high or :low)
  #
  # Returns nothing
  def add(keyword, priority: :low)
    add_many([keyword], priority: priority)
  end

  # Queue jobs for keyword scraping
  #
  # keywords - Array[Keyword]
  # priority - Symbol indicating scrape priority (:high or :low)
  #
  # Returns nothing
  def add_many(keywords, priority: :low)
    case priority
    when :high
      # Iterate back-to-front so the first keyword given ends up at
      # the very front of the queue.
      keywords.reverse_each { |kw| add_keyword_to_front(kw) }
    when :low
      keywords.each { |kw| add_keyword_to_back(kw) }
    end
  end

  # All Keyword IDs currently queued for scraping
  #
  # Returns Array[Integer]
  def ids
    $redis.lrange(QUEUE_KEY, 0, -1).map(&:to_i)
  end

  # All Keywords currently queued for scraping
  #
  # Returns Array[Keyword]
  def keywords
    Keyword.find(ids)
  end

  # Number of Keyword IDs currently queued
  #
  # Returns Integer
  def queue_length
    $redis.llen(QUEUE_KEY)
  end

  # Call out to our proxy pool and scrape as many keywords
  # as available proxies. Failed scrapes are requeued by the RubyScraper
  #
  # max - Integer cap on proxies/keywords to process this batch
  #
  # Returns nothing
  def scrape_next_batch(max = 100)
    num_ips = [queue_length, max].min
    ips = ProxyPool.reserve(num_ips, false)[:ips]

    ips.each do |ip|
      # lpop returns nil if the queue drains mid-batch; skip those IPs.
      keyword_id = $redis.lpop(QUEUE_KEY)
      KeywordQueueWorker.perform_async(keyword_id, ip) if keyword_id
    end

    # The lock only stops the scheduling chain; a batch already in
    # flight still completes.
    schedule_next_batch unless locked?
  end

  # Schedule the next scrape batch DELAY_TIME seconds from now.
  # Relies on Sidekiq delayed extensions (delay_for); retry is disabled
  # so a failed scheduling job cannot spawn duplicate batch chains.
  #
  # Returns nothing
  def schedule_next_batch
    delay_for(DELAY_TIME.seconds, retry: false).scrape_next_batch
  end

  # Drop every queued keyword ID immediately
  #
  # Returns nothing
  def abort_all!
    $redis.del(QUEUE_KEY)
  end

  # Mark the KeywordQueue as locked and prevent scrapes from running
  #
  # Returns nothing
  def lock!
    $redis.set(LOCK_KEY, 1)
  end

  # Whether batch scheduling is currently paused
  #
  # Returns Boolean
  def locked?
    $redis.get(LOCK_KEY).present?
  end

  # Remove the lock and restart the batch-scheduling chain
  #
  # Returns nothing
  def unlock!
    $redis.del(LOCK_KEY)
    schedule_next_batch
  end

  # Retrieve a list of IDs of failed keywords
  #
  # Returns Array[Integer]
  def failed_keywords
    $redis.lrange(KW_FAILED_QUEUE_KEY, 0, -1).map(&:to_i)
  end

  # Exceptions with a keyword exceeded a threshold -
  # remove from scrape queue and move to dead queue for inspection
  #
  # keyword - Keyword instance
  #
  # Returns nothing
  def mark_keyword_failed!(keyword)
    # Issue both commands atomically. Use the yielded transaction object
    # rather than $redis itself: redis-rb has always yielded it, and newer
    # versions forbid calling the shared client inside a MULTI block.
    $redis.multi do |tx|
      tx.lpush(KW_FAILED_QUEUE_KEY, keyword.id)
      tx.lrem(QUEUE_KEY, 0, keyword.id)
    end
  end

  # Track a failure (exception) associated with a keyword and take action
  # as appropriate, either re-queueing it or moving it to a dead queue
  # for inspection if exceptions exceed a threshold
  #
  # keyword - Keyword instance
  #
  # Returns nothing
  def handle_keyword_failure(keyword)
    # Failure counts are bucketed per calendar day, so a keyword must
    # exceed the threshold within a single day to be dead-lettered.
    date = Time.now.strftime('%Y-%m-%d')
    failure_key = "#{KW_FAILURES_KEY}:#{date}"
    failure_count = $redis.hincrby(failure_key, keyword.id.to_s, 1)

    if failure_count > KW_FAILURE_THRESHOLD
      mark_keyword_failed!(keyword)
    else
      # Retry soon: failed keywords jump to the front of the queue.
      add(keyword, priority: :high)
    end
  end

  private

  # Push a keyword's ID onto the front of the queue (scraped next).
  def add_keyword_to_front(keyword)
    $redis.lpush(QUEUE_KEY, keyword.id)
  end

  # Push a keyword's ID onto the back of the queue.
  def add_keyword_to_back(keyword)
    $redis.rpush(QUEUE_KEY, keyword.id)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment