@phillbaker
Created January 23, 2011 19:04
Basic abstracted structure of this scraper.
require 'rubygems'
require 'sqlite3'
require 'thread'
require 'digest/md5'
require 'lib/ThreadPool.rb'

# main thread that hits a sqlite db
@db = SQLite3::Database.new('test_thread.sqlite') # SQLite3::Database.new(":memory:")
# create a simple table in the db
@db.execute_batch('create table `table` (id integer primary key autoincrement, value text)')
@threads = ThreadPooling::ThreadPool.new(10)
#@threads.debug = true
# throw everything that needs to be written onto this thread-safe queue
@work_queue = Queue.new
# store our results here
@results = {} # see http://stackoverflow.com/questions/1080993/pure-ruby-concurrent-hash/1081604#1081604 at the bottom; this might not need to be a concurrent hash
working = true

# centralize our reads/writes to the db
Thread.new do #@threads.dispatch do
  db = SQLite3::Database.open('test_thread.sqlite')
  while working do # keep this thread running until we're done; in the real implementation, this would be forever...
    # this works because even if we turn off `working`, we'll still have work
    # queued, and the inner loop runs until the queue is cleared
    until @work_queue.empty? do
      val, key = @work_queue.pop
      # bind the value rather than interpolating it into the SQL string,
      # so quotes in the scraped data can't break (or inject into) the statement
      db.execute('insert into `table` (value) values (?)', val)
      # the key under which to publish the written row's id
      if key
        id = db.get_first_value('SELECT last_insert_rowid()').to_s
        @results[key] = id # only do this if we need to return it
      end
    end
  end
  db.close unless db.closed?
  #puts 'end'
end
start = Time.now.to_i
times = []
repeats = 1001
repeats.times do |i|
  beg = Time.now.to_i
  key = Digest::MD5.hexdigest(Time.now.to_i.to_s + i.to_s)
  @work_queue << [i.to_s, key] # the kick-starting value
  # we need the id that we inserted this at; maybe put it on the queue with a
  # unique id (hash of unix time? + value to be written) and then wait until
  # that id gets passed back?
  puts 'pushed work: ' + i.to_s if i % 100 == 0
  3.times do |j|
    @threads.dispatch do
      k = j
      begin
        #my_key = key.dup # keep a local, per-thread copy of key
        # wait here for our id to come back
        # could do another queue, with the db worker pushing the ids on, and
        # then just popping/pushing until we find the one we're looking for -
        # performance would suck! we need something non-linear, like a hash
        loop do
          break if @results[key]
        end
        # simulate work for some time less than a second
        sleep(rand(3)) # simulate a long-running task
        raise Exception.new("in worker #{i} - #{j}") if rand(10) == 1
        @work_queue << ["#{i} - #{j}"]
        #puts 'finish worker'
      rescue Exception => e
        #puts e
        print '.'
      ensure
        print "\n" if j == 1000
      end
    end
  end
  # we can only do the below if all threads dependent on that id are done!
  #TODO @results[my_key] = nil # keep the hash from growing infinitely; put this in and we finish queuing much quicker, but never finish...
  times << Time.now.to_i - beg
end

puts 'queuing done in ' + (Time.now.to_i - start).to_s + 's'
loop do
  break if @work_queue.empty? #@threads.queue.empty?
end
puts 'writing done in ' + (Time.now.to_i - start).to_s + 's'
puts 'size: ' + @threads.queue.size.to_s
working = false
@threads.join
puts 'working done in ' + (Time.now.to_i - start).to_s + 's'
sleep 10 #TODO we're losing some values; it may not matter in the broader scheme of things since we're daemonizing...
puts 'count: ' + @db.get_first_value('SELECT count(*) from `table`').to_s + ' avg: ' + (times.inject(0){|s,o| s += o }.to_f/repeats).to_s
@db.close unless @db.closed?
#
# This file is part of ThreadPool, a jruby or ruby-based
# thread pool manager.
# Copyright (C) 2009,2010 Daniel Bush
# This program is distributed under the terms of the MIT
# license.
# A copy of the license is
# enclosed with this project in the file LICENSE.
#
#
module ThreadPooling

  # A class containing an internal Queue and pool of threads.
  #
  # ThreadPool uses a 'dispatch' method with a block for putting jobs on
  # the queue to be processed asynchronously:
  #
  #   tp = ThreadPool.new(5) # Create 5 threads
  #   tp.dispatch do
  #     ... your task ...
  #   end
  #
  # Or lambdas:
  #
  #   func = lambda { ... your task ... }
  #   tp.dispatch func
  #
  # In fact, any object that responds to 'call' should be ok.
  class ThreadPool
    require 'thread'

    attr_reader :threads, :thread_count, :queue
    attr_writer :debug

    # Initialize a ThreadPool instance with 'num' number of threads.
    def initialize num=1
      @thread_count = 0
      @threads = []
      # Other option is to use ThreadGroup.
      @queue = Queue.new
      @mutex = Mutex.new # Private mutex.
      self.increment(num)
      require 'logger'
      @logger = Logger.new('log/pool.log')
    end

    def debug msg
      @mutex.synchronize do
        puts msg
      end
    end

    # Add threads to the pool
    def increment num=1
      num.times do
        @mutex.synchronize do
          @threads.push(
            Thread.new do
              loop do
                item = @queue.pop
                # print a status mask of every thread in the pool; possible
                # statuses: sleep, run, aborting, false (terminated normally),
                # nil (terminated with exception) - see Thread#status/#inspect
                puts @queue.size.to_s + ' ' + @threads.collect{|o| o.status.to_s[0..0] }.to_s
                #@logger.info 'calling - queue size: ' + @queue.size.to_s
                #begin
                case item
                when Array
                  # item[0] should be a lambda; item[1] should be its args.
                  item[0].call(*item[1])
                else
                  item.call
                end
                #rescue Exception => e
                #  puts 'Exception in caller: ' + e.to_s
                #  puts e.backtrace
                #end
              end
            end
          )
        end
      end
      @thread_count += num
    end

    # Remove threads from the pool
    def decrement num=1
      num = @thread_count if num > @thread_count
      num.times do
        debug "Dispatching termination command" if @debug
        self.dispatch do
          @mutex.synchronize do
            @threads.delete(Thread.current)
          end
          debug "Deleting thread #{Thread.current}" if @debug
          Thread.current.exit
        end
      end
      @thread_count -= num
    end

    # The thread that calls this will block until the threads in @threads
    # have finished. These threads will be terminated and the thread pool
    # emptied.
    def join
      threads = @threads.dup
      # Taking a copy here is really important!
      self.decrement @thread_count
      # Stop the threads or else suffer a deadlock.
      threads.each do |t|
        debug "joining thread #{t}" if @debug
        t.join
      end
    end

    # Dispatch jobs asynchronously.
    def dispatch func=nil, args=nil, &block
      if func.nil?
        raise "Must be called with a block or lambda." unless block_given?
      else
        if args.nil?
          @queue << func
        else
          @queue << [func, args]
        end
      end
      if block_given?
        @queue << block
        #puts 'thread queue: ' + @queue.size.to_s
      end
    end
  end
  # A Queue that contains its own thread and which
  # dispatches jobs synchronously.
  #
  # Use it like:
  #
  #   sq = SyncQueue.new
  #   sq.dispatch do
  #     ... your task ...
  #   end
  #
  # Or
  #
  #   sq.dispatch lambda { ... your task ... }
  #
  # Or
  #
  #   sq.push lambda { ... your task ... }
  class SyncQueue < Queue
    def initialize
      @processing = false
      @stopping = false
      @running = false
      super
      start
    end

    # True if 'stop' has been called but we haven't terminated yet.
    def stopping?
      @stopping
    end

    # True if the SyncQueue is no longer running. The thread for this
    # queue is not in the middle of processing anything, and the queue
    # should be empty. See #terminate.
    def stopped?
      !@running && !@stopping && !@processing
    end

    # Don't process any more jobs but the current one; then stop the
    # thread. Remaining jobs are removed from the queue and returned.
    def terminate
      @running = false
      @stopping = false
      @left = []
      while self.size > 0
        @left.push self.pop
      end
      # Pass a blank function to unblock the thread so it can die.
      self << lambda{}
      @left
    end

    # Stop the thread, but allow it to finish processing the queue.
    # The queue goes into a special state where it will throw an error
    # if you try to add to it. The last job will terminate, allowing
    # the queue to be added to at a later time.
    # SyncQueue#stop is used by SyncQueue#join.
    def stop
      @stopping = true
      # Pass a terminate function as the final function on the queue.
      # Will unblock the thread if it isn't doing anything.
      self << lambda{ self.terminate }
    end

    # True if the SyncQueue instance is not terminated or in a
    # stopping state.
    def running?
      @running && !@stopping
    end

    # Fires up a new thread to process the queue.
    #
    # This method is automatically called when you instantiate.
    #
    # Using it to restart an existing SyncQueue instance has not been
    # fully tested yet. Currently, it will call SyncQueue#join and go
    # into a stopping state before starting up a new thread.
    def start
      self.join if @running
      @running = true
      @thread = Thread.new do
        while @running
          block = self.pop
          @processing = true
          block.call
          @processing = false
        end
      end
    end

    # Dispatch jobs synchronously.
    def dispatch func=nil, &block
      if block_given?
        self << func unless func.nil?
        self << block
      else
        raise "Must be called with a block." if func.nil?
        self << func
      end
    end

    # The thread calling this will wait for @thread to finish all
    # queued jobs, then terminate @thread.
    def join
      self.stop
      # Stop the thread or else suffer a deadlock.
      @thread.join
    end

    # Push blocks onto the queue.
    #
    # Raises an error if this queue is in a stopping state caused by
    # calling SyncQueue#stop.
    # Note that enq and << are aliases for 'push'.
    def push block
      if @stopping
        raise "This SyncQueue has been put into a stopping state using ThreadPool::SyncQueue#stop."
      end
      super
    end
  end
end
@phillbaker (Author):

Tried calling close() on the handles passed to the workers, but that doesn't work either.

  • need multiple threads to handle long-running work processes
  • need a per-thread handle to avoid the multi-threading issues of a single shared handle
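The per-thread-handle bullet can be sketched with thread-local storage: each worker lazily opens its own connection and caches it on `Thread.current`, so no handle ever crosses threads. A minimal sketch; `FakeConn` is a hypothetical stand-in for the real `SQLite3::Database.new(path)` call:

```ruby
# Stand-in for SQLite3::Database; in the scraper you'd open the file here.
class FakeConn; end

# Lazily open one handle per thread, cached in thread-local storage,
# so no connection is ever shared between threads.
def handle
  Thread.current[:conn] ||= FakeConn.new
end

a = handle
b = handle                          # same thread -> same cached handle
other = Thread.new { handle }.value # new thread -> fresh handle
```

Every call from the same thread returns the same cached object, while each new thread gets its own, which is the property the single shared handle was missing.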

@phillbaker (Author):


Okay, the above might work. To get the id that the initial value was inserted at, maybe put it on the queue with a unique key (hash of unix time?) and then wait until that id gets passed back?
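The pass-the-id-back idea can be sketched without polling at all: push a per-request reply `Queue` along with the value, and have the writer push the new row id onto it. `Queue#pop` blocks, so the worker simply waits. A minimal sketch, with a counter standing in for `last_insert_rowid()`:

```ruby
require 'thread'

work_queue = Queue.new

# Writer thread: pops (value, reply_queue) pairs, "inserts" the value,
# and sends the generated id back on the per-request reply queue.
writer = Thread.new do
  next_id = 0
  loop do
    val, reply = work_queue.pop
    break if val == :stop
    next_id += 1                 # stands in for last_insert_rowid()
    reply << next_id if reply
  end
end

reply = Queue.new
work_queue << ['some value', reply]
id = reply.pop                   # blocks until the writer answers
work_queue << [:stop, nil]
writer.join
```

Because each request carries its own queue, no linear scan and no shared results hash are needed: the id goes straight back to the worker that asked for it.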

@phillbaker (Author):


Okay... well, now it's hanging. Run it and check the print statements; the `loop do` busy-wait kills it...
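A tight `loop do break if @results[key] end` spins without yielding, which can starve the other threads. One alternative, sketched here under illustrative names (`:key`, `42`, the `0.1`s sleep), is a `Mutex` plus `ConditionVariable`, so waiters sleep until the writer publishes:

```ruby
require 'thread'

results = {}
mutex = Mutex.new
cond = ConditionVariable.new

# Writer publishes a result and wakes any waiters, instead of letting
# them spin on the hash.
writer = Thread.new do
  sleep 0.1                      # simulate the insert taking a moment
  mutex.synchronize do
    results[:key] = 42
    cond.broadcast
  end
end

value = mutex.synchronize do
  cond.wait(mutex) until results[:key]   # sleeps; no busy loop
  results[:key]
end
writer.join
```

`cond.wait(mutex)` releases the lock while sleeping and reacquires it on wake-up, so the check-and-read of `results[:key]` stays race-free.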

@phillbaker (Author):


Two new problems:

  1. It's slow
  2. We're not getting everything written to the database
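One common fix for both problems is batching: drain everything currently queued in one pass and, in the real script, wrap the batch in a single transaction (the sqlite3 gem's `db.transaction { ... }`) so each insert doesn't pay for its own disk sync; fully draining the queue before closing the database also addresses the missing rows. A minimal queue-draining sketch, with a `written` array standing in for the database:

```ruby
require 'thread'

queue = Queue.new
5.times { |i| queue << i }

written = []
# Drain everything still queued into one batch before shutdown; in the
# real script this batch would be inserted inside one transaction, e.g.
#   db.transaction { batch.each { |v| db.execute('insert ...', v) } }
batch = []
batch << queue.pop until queue.empty?
written.concat(batch)
```

Grouping inserts this way turns N commits into one, which is usually the difference between seconds and minutes for SQLite bulk writes.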

@phillbaker (Author):


without dispatch()

$ rm test_thread.sqlite && ruby test_structure.rb
pushed work: 0
queuing done in 33s
working done in 33s
count: 400 avg: 0.33

with dispatch

$ rm test_thread.sqlite && ruby test_structure.rb
pushed work: 0
pushed work: 100
queuing done in 9s
working done in 9s
count: 400 avg: 0.0891089108910891

@phillbaker (Author):


So...working better, I think. But now it won't complete...
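The non-completion may be tied to the commented-out `@results[my_key] = nil` cleanup: deleting a key while workers still depend on it leaves their wait loop spinning forever. One way out is to reference-count the readers and delete the entry only when the last dependent worker is done. A sketch under illustrative values (key `'abc'`, id `'42'`, three dependents):

```ruby
require 'thread'

results = { 'abc' => '42' }
pending = { 'abc' => 3 }          # three workers depend on key 'abc'
mutex = Mutex.new

# Each worker reads the id, then decrements the count; the last one out
# deletes the entry so the hash can't grow without bound.
threads = 3.times.map do
  Thread.new do
    id = results['abc']
    mutex.synchronize do
      pending['abc'] -= 1
      results.delete('abc') if pending['abc'].zero?
    end
    id
  end
end
ids = threads.map(&:value)
```

Because the delete can only run after all three decrements, and each decrement happens after that worker's read, no worker can observe a missing key.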
