@alfuken
Created March 24, 2023 12:44
Fetching and processing an unknown number of pages in a threaded manner
# using https://github.com/taganaka/easy_threadpool_rb
require "easy_threadpool"
Thread.abort_on_exception = true
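class ThreadsafeCounter
  def initialize
    @counter = 0
    @mutex = Mutex.new
  end

  def increment = @mutex.synchronize { @counter += 1 }
  # read under the same lock so the value is consistent on any Ruby implementation
  def value = @mutex.synchronize { @counter }
end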
$enqueued = ThreadsafeCounter.new
$processed = ThreadsafeCounter.new
@page_batch_size = 10
@pool = Thread.pool(@page_batch_size)
@data = []
def get_and_parse_page_number(x)
  # dummy code to simulate page retrieval. Returns `nil` on pages with no content
  sleep rand
  x <= 100 ? x.to_s : nil
end
def process_page(x)
  # count the page as enqueued *before* handing it to the pool, so the final
  # wait loop can never see the two counters match while work is still pending
  $enqueued.increment
  # `.process` adds the code block to the queue, which is then consumed by one of the threads
  # see https://github.com/taganaka/easy_threadpool_rb/blob/656bfabdc86a317b98bd5f28185eda98235abfcc/lib/easy_threadpool/pool.rb#L53
  @pool.process do
    puts("processing page ##{x}")
    retries = 0
    begin
      page_content = get_and_parse_page_number x
      if page_content
        @data << page_content
        process_page(x + @page_batch_size)
      end
      # as soon as we've finished processing the page, we increment the corresponding counter
      $processed.increment
    rescue StandardError # not `Exception`, which would also swallow interrupts and exit signals
      if retries < 10
        retries += 1
        sleep 10
        retry
      else # give up and move on
        process_page(x + @page_batch_size)
        # the abandoned page still counts as processed; otherwise the wait loop below never ends
        $processed.increment
      end
    end
  end
end
# enqueue the first N pages; each non-empty page then enqueues page + N
(1..@page_batch_size).each do |page_n|
  process_page(page_n)
end
# wait for all the enqueued pages to finish processing
sleep 0.1 until $processed.value == $enqueued.value
@pool.shutdown
puts "Verifying that the threaded run collected the expected data:"
puts((1..100).map(&:to_s).sort == @data.sort)
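
For comparison, the same "keep going until a page comes back empty" pattern can be sketched with only the Ruby standard library. This is a minimal illustration, not part of the original script: `fetch_page`, `crawl`, and `LAST_PAGE` are hypothetical names, and the page source is simulated. Instead of polling two counters, it keeps one count of outstanding pages and closes the queue when that count reaches zero, which lets every worker exit on its own.

```ruby
# Hypothetical stand-ins for illustration only.
LAST_PAGE = 25

def fetch_page(n)
  # Simulated retrieval; returns nil for pages past the last one.
  n <= LAST_PAGE ? n.to_s : nil
end

def crawl(workers: 5)
  queue   = Queue.new   # page numbers waiting to be fetched
  results = Queue.new   # Queue is thread-safe, so it is used to collect output
  lock    = Mutex.new
  pending = workers     # pages enqueued but not yet finished

  (1..workers).each { |n| queue << n }

  threads = Array.new(workers) do
    Thread.new do
      # pop returns nil once the queue is closed and empty, ending the loop
      while (n = queue.pop)
        content = fetch_page(n)
        if content
          results << content
          # register the follow-up page before finishing the current one
          lock.synchronize { pending += 1 }
          queue << n + workers
        end
        # the worker that finishes the last outstanding page closes the queue
        queue.close if lock.synchronize { pending -= 1 }.zero?
      end
    end
  end

  threads.each(&:join)
  Array.new(results.size) { results.pop }
end

pages = crawl
puts pages.map(&:to_i).sort == (1..LAST_PAGE).to_a
```

Registering the follow-up page before decrementing for the current one mirrors the ordering in the script above: the outstanding count cannot reach zero while any page is still able to schedule more work.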