Skip to content

Instantly share code, notes, and snippets.

@noteflakes
Last active May 11, 2021 07:13
Show Gist options
  • Save noteflakes/2beecc22eb34d2be6163567428e4aac8 to your computer and use it in GitHub Desktop.
Save noteflakes/2beecc22eb34d2be6163567428e4aac8 to your computer and use it in GitHub Desktop.
A script showing how to use Polyphony to perform HTTP requests concurrently from a queue of jobs
# "I'm losing my mind on concurrency"
# https://www.reddit.com/r/ruby/comments/lmg8xo/im_losing_my_mind_on_concurrency/
require 'polyphony'
require 'httparty'
require 'json'
CHANNELS = ['list', 'of', 'channels']
JOB_QUEUE = Queue.new
TIMEOUT = 60 # 60 second timeout
CONCURRENCY = 5
def handle_channel_response(response)
JSON.parse(response.body)['videos'].each do |v|
JOB_QUEUE << { kind: :video, url: v[:url] }
end
end
def handle_video_response(response)
# TODO: implement
end
def job_url(job)
case job[:kind]
when :channel
"https://mytube.com/channel/#{job[:name]}"
when :video
job[:url]
end
end
def perform(job)
# A timeout will raise a Polyphony::Cancel error
url = job_url(job)
response = cancel_after(TIMEOUT) { HTTParty.get(url) }
case job[:kind]
when :channel
handle_channel_response(response)
when :video
handle_video_response(response)
end
rescue Polyphony::Cancel
puts "Timeout while handling #{job.inspect}, requeuing..."
JOB_QUEUE << job
rescue => e
# decide what to do with error
if id_like_to_retry
JOB_QUEUE << job
else
puts "Error while handling #{job.inspect}: #{e.inspect}"
end
end
# Prepopulate job queue
CHANNELS.each do |c|
JOB_QUEUE << { kind: :channel, name: c }
end
# Setup workers
CONCURRENCY.times do
spin_loop do
break if job_queue.empty?
job = job_queue.shift
perform(job)
end
end
# wait for all workers to be done
Fiber.current.wait_for_all_children
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment