@pboling
Last active July 10, 2018 14:47
Facebook API Rate Limit Throttler using Sidekiq, does not execute the job inside the lock, to maintain some semblance of performance, just marks it in a counter, which other jobs from the same queue and using the same token will also update, and which will be throttled. Jobs from other queues will not be able to bust the lock until the timer run…
# Mixin to (i.e. include in) any worker class that does FB API calls and should be throttled.
module FacebookThrottle
  def perform_throttled(*args, &block)
    # Extract from a copy so that args still carries the options hash when the job is requeued.
    # Sidekiq serializes arguments as JSON, so the keys may come back as strings.
    options = args.dup.extract_options!.with_indifferent_access
    user = User.find_by_fb_uid(options[:fb_uid])
    if user
      if !user.valid_facebook_token? # A bitwise flag managed by flag_shih_tzu gem
        puts "Skipping #{self.class} #{user.fb_uid}: Invalid Oauth Token for #{user}"
        return false
      end
      source = self.class.get_sidekiq_options['queue']
      if !user.get_oauth_lock!(source)
        puts "Requeue #{self.class} #{user.fb_uid}: Oauth Token Locked for #{user} by #{user.oauth_locked_by} until #{user.oauth_locked_until} with #{user.oauth_lock_counter}"
        self.requeue_myself(*args)
        return false
      end
      begin
        yield user
      rescue Koala::Facebook::ClientError => e
        # 'Koala::Facebook::ClientError: type: OAuthException, code: 4, message: (#4) Application request limit reached [HTTP 403]'
        user.oauth_lock_out!
        puts "Requeue #{self.class} #{user.fb_uid}: #{e.class} #{e.message} for #{user} by #{user.oauth_locked_by} until #{user.oauth_locked_until} with #{user.oauth_lock_counter}"
        self.requeue_myself(*args)
      end
    end
  end

  def requeue_myself(*args)
    self.class.perform_in(15.minutes, *args)
  end

  def queue_source
    self.class.get_sidekiq_options['queue']
  end
end
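A note on the requeue path: `requeue_myself(*args)` can only re-enqueue what is still in `args`, and ActiveSupport's `extract_options!` removes the trailing hash from the array it is called on. The sketch below shows the difference between extracting in place and extracting from a copy. It is plain Ruby; `pop_options!`, `split_in_place`, and `split_from_copy` are hypothetical stand-ins written for this illustration, not the ActiveSupport implementation.

```ruby
# Hypothetical stand-in for Array#extract_options!:
# removes and returns a trailing Hash, or returns {} if there is none.
def pop_options!(args)
  args.last.is_a?(Hash) ? args.pop : {}
end

# Extracts directly from args, so the hash is gone from args afterwards.
def split_in_place(args)
  options = pop_options!(args)
  [options, args]
end

# Extracts from a copy, so args stays complete for a later requeue.
def split_from_copy(args)
  options = pop_options!(args.dup)
  [options, args]
end
```

With `split_in_place`, a job enqueued with only an options hash would be requeued with no arguments at all; with `split_from_copy`, the requeued job sees the same arguments as the original.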
class AddOauthLockToUsers < ActiveRecord::Migration
  def change
    ## These do not need to be indexed
    add_column :users, :oauth_locked_at, :datetime
    add_column :users, :oauth_locked_by, :string
    add_column :users, :oauth_lock_counter, :integer, default: 0, null: false
  end
end
# Example of one such worker class which makes FB API calls and needs to be throttled.
# Throttling is per queue / block of time / token
# When the internal limit is reached *before facebook shuts you down* the jobs past the limit are requeued to
# the future.
class ImportFacebookProfilePhotos
  include Sidekiq::Worker
  sidekiq_options queue: :photo
  include FacebookThrottle

  def perform(*args)
    perform_throttled(*args) do |user|
      # The user whose token lock was acquired. The job body runs after the record lock is released.
      # Only executed if perform_throttled hits a yield, which will only happen if it can get a lock.
      # If it fails to get a lock, the job will be requeued for 15 minutes later.
      # Assumption:
      #   Only makes one API call!
      #   If it makes more, then insert an 'api_count' into the args and
      #   handle it in the throttle code in the User class.
      user.background_photo_import
    end
  end
end
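The `api_count` idea mentioned in the worker's comments is not implemented in this gist. One way it might look, as a minimal sketch: a counter that reserves several calls at once and refuses the whole reservation if it would exceed the limit. `CallBudget` and `reserve` are hypothetical names, the state lives in memory rather than on the `User` record, and the all-or-nothing rule is an assumption of this sketch rather than the gist's behavior.

```ruby
# Hypothetical in-memory counter; the gist keeps this state on the User record.
class CallBudget
  attr_reader :used

  def initialize(limit)
    @limit = limit
    @used = 0
  end

  # Reserve api_count calls if they all fit under the limit.
  # Returns true and records them, or returns false and records nothing.
  def reserve(api_count = 1)
    raise ArgumentError, "api_count must be positive" unless api_count.positive?
    return false if @used + api_count > @limit

    @used += api_count
    true
  end
end
```

Refusing the whole reservation keeps a multi-call job from starting work it cannot finish inside the current window; the job would be requeued instead, as single-call jobs already are.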
# The meat of the throttler.
class User
  # ... More class ... snip
  LOCK_FOR_AT_LEAST = 700.seconds
  INTERNAL_RATE_LIMIT = 250 # requests, Facebook will shut you down at 600 reqs / 600 secs / token
  LOCKOUT_FOR = 10.minutes

  def get_oauth_lock!(source)
    source = source.to_s
    self.with_lock do
      # This block is called within a transaction,
      # user record is now locked, preventing updates from other parallel processes.
      # 1. Has the token never been locked?
      #    Then we set the lock.
      if self.oauth_locked_at.nil?
        puts "#{self} Token Never Locked - Set Lock for #{source}!"
        self.oauth_set_lock!(source)
        return true
      end
      # 2. Is the token unlocked?
      #    Then we set the lock.
      if !self.oauth_lock_current?
        puts "#{self} Token last used by #{self.oauth_locked_by} is unlocked - Set Lock for #{source}!"
        self.oauth_set_lock!(source)
        return true
      else
        # 3. Is the token locked by the same source right now?
        if self.oauth_locked_by == source
          if self.oauth_internal_limit_reached?
            # 3a. If we have reached our internal limit, then we must stop.
            puts "#{self} Token Locked by #{source} - Internal Limit Reached..."
            return false
          else
            # 3b. If we have *NOT* reached our internal limit, then we can proceed.
            puts "#{self} Token Locked by #{source} - Update Counter!"
            self.oauth_update_lock!
            return true
          end
        else
          # 4. Is the token locked by another source right now?
          #    Then we can do nothing.
          puts "#{self} Token Locked by #{self.oauth_locked_by} prevents #{source} work."
          return false
        end
      end
    end
  end

  def oauth_lock_current?
    self.oauth_locked_until > Time.zone.now
  end

  def oauth_locked_until
    self.oauth_locked_at + LOCK_FOR_AT_LEAST
  end

  def oauth_set_lock!(source)
    self.oauth_locked_at = Time.zone.now
    self.oauth_locked_by = source
    self.oauth_lock_counter = 0
    self.save!
  end

  def oauth_internal_limit_reached?
    self.oauth_lock_counter >= INTERNAL_RATE_LIMIT
  end

  def oauth_update_lock!
    self.oauth_lock_counter += 1 # If a job makes more than one call - then this needs to be more complex.
    self.save!
  end

  def oauth_lock_out!
    self.oauth_lock_counter = INTERNAL_RATE_LIMIT + 1
    self.oauth_locked_at = Time.zone.now + LOCKOUT_FOR
    self.save!
  end
end
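To make the branches of `get_oauth_lock!` easier to follow, here is a minimal in-memory model of the same decisions, runnable without Rails or Sidekiq. `TokenLock`, `acquire`, and the injected `now:` argument are hypothetical names for this sketch; times are plain integers in seconds, and there is no row lock, so unlike the gist's `with_lock` version it is not safe across parallel processes.

```ruby
# In-memory model of the branches in get_oauth_lock!.
# Not thread-safe: the gist relies on with_lock (a row lock) for that.
class TokenLock
  attr_reader :locked_by, :counter

  def initialize(window: 700, limit: 250)
    @window = window # seconds, mirrors LOCK_FOR_AT_LEAST
    @limit = limit   # mirrors INTERNAL_RATE_LIMIT
    @locked_at = nil
    @locked_by = nil
    @counter = 0
  end

  # now is passed in (hypothetical parameter) so the example needs no real clock.
  def acquire(source, now:)
    source = source.to_s
    # Never locked, or the window has expired: take the lock and reset the counter.
    if @locked_at.nil? || @locked_at + @window <= now
      @locked_at = now
      @locked_by = source
      @counter = 0
      return true
    end
    # Locked by another source: refuse.
    return false unless @locked_by == source
    # Locked by this source: refuse at the limit, otherwise count the call.
    return false if @counter >= @limit

    @counter += 1
    true
  end
end
```

One detail the model makes visible: the acquisition that sets the lock starts the counter at 0 and is not itself counted, so a window admits `limit + 1` jobs from the owning queue. With the gist's constants that is 251 jobs per 700-second window, still well under the 600 requests per 600 seconds quoted for Facebook.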
@rajagopals

Thanks for sharing this. Very useful. I have a question, though. Why do you use oauth_locked_by to restrict jobs from other queues from hitting Facebook? If all the jobs made the same number of queries to Facebook, why would it matter which job executed the query? It seems like the rate limit is maintained by oauth_lock_counter, which keeps track of how many queries have been made...

@pboling

pboling commented Feb 9, 2014

@rajagopals, in my use case it made sense. I have my jobs divided up in a way that I want jobs of a specific type to finish first (ones that are high priority and fast) and other jobs which may monopolize the lock are less important and slower. I need to provide the results of these jobs to the front end user experience in as few seconds as possible, and had to cut up the work load, and keep the user's oauth token available to the highest priority jobs only, preventing the lower priority ones from killing my "time to usability" on the front end.

This may not apply to scenarios where a live user isn't waiting on the results.
