@wflanagan
Created September 28, 2011 13:12
EM-based crawler
require 'simple_worker'
require 'eventmachine'
require 'em-http-request'
require 'nokogiri'
require 'aws'
require 'redis'
# Stdlib used below (URI parsing, CGI.escape, Digest::MD5)
require 'uri'
require 'cgi'
require 'digest/md5'
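# RedEx: an EventMachine-based crawler that runs as a SimpleWorker job. It fetches
# pages concurrently with em-http-request, stores Marshal-dumped page hashes on S3
# via Happening, queues per-page processing jobs back into SimpleWorker, and keeps
# shared crawl state (todo/visited/queued/error URLs) in Redis so a crawl can be
# delegated to a follow-on job before the worker's time limit is hit.
#
# A minimal usage sketch (an assumption: attributes are set the usual SimpleWorker
# way before queueing; only queue(:recursive => true) appears verbatim in this file):
#
#   crawler = RedEx.new
#   crawler.domain_id   = 123          # hypothetical CloudDomain id
#   crawler.crawl_limit = 1000
#   crawler.queue(:recursive => true)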
class RedEx < SimpleWorker::Base
merge_gem 'em-redis'
merge_gem 'happening'
merge_gem 'utf8_utils'
merge_gem 'aws'
merge_gem 'crewait'
unmerge '../models/domain.rb'
unmerge '../models/project.rb'
unmerge '../models/company.rb'
unmerge '../models/plan.rb'
unmerge '../models/link.rb'
unmerge '../models/page.rb'
unmerge '../models/page_link.rb'
merge "../../lib/query_builder.rb"
merge "../../lib/google_base_em.rb"
merge "../../lib/google_em.rb"
merge "../../lib/indexation_stats_mixin.rb"
merge "../../lib/yahoo_stats_mixin.rb"
merge "../../lib/bing_stats_mixin.rb"
merge "../../lib/semrush_stats_mixin.rb"
merge "../../lib/social_stats_mixin.rb"
merge "../../lib/nokogiri_doc_parser_mixin.rb"
merge "../../lib/url_validator.rb"
merge "../../lib/url_cleaner.rb"
merge "../../lib/crawler_helper/db.rb"
merge "../../lib/crawler_helper/link.rb"
merge "../../lib/cloud_crawler_helper/page_processor.rb"
merge "../../lib/cloud_crawler_helper/found_link_adder.rb"
merge "../../lib/cloud_crawler_helper/page_stat2.rb"
merge "../../lib/cloud_crawler_helper/crawler_red_ex_storage_mixin.rb"
merge "../../lib/cloud_crawler_helper/red_work_queuer.rb"
merge "../../lib/market_fu/calculator.rb"
merge "../../lib/cloud_link_helper/redis_mixin.rb"
merge "../../lib/cloud_crawler_helper/red_ex_page.rb"
merge "../models/cloud_crawler_found_link.rb"
merge "../models/cloud_crawler.rb"
merge "../models/cloud_domain.rb"
merge "../models/cloud_crawler_url.rb"
merge "../models/cloud_page.rb"
merge "redex_page_processor.rb"
merge '../models/clean/domain.rb'
merge '../models/clean/project.rb'
merge '../models/clean/company.rb'
merge '../models/clean/plan.rb'
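# Runtime configuration: S3 credentials used for SimpleRecord and Happening page
# storage, file extensions that are skipped, caps on concurrent HTTP fetches and
# simultaneous S3/DB writes, and the Redis endpoint backing the shared crawl queues.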
S3_ACCESS_KEY = 'TUSA'
S3_SECRET_KEY = 'y/XhpORF1vqrxOecHj'
DO_NOT_CRAWL_TYPES = %w(.pdf .doc .xls .ppt .mp3 .mp4 .m4v .avi .mpg .rss .xml .json .txt .git .zip .md5 .asc .jpg .jpeg .gif .png)
CONCURRENT_CONNECTIONS = 50
SIMULTANEOUS_DB_CONNECTIONS = 20
REDIS_OPTIONS_HASH = {:host => "ikefish.redistogo.com", :port => 9065, :password => "360c4b698d", :thread_safe => true}
VERBOSE = true
attr_accessor :domain_id, :a, :r, :visit_key, :queued_key, :starting_url, :base_uri,
:retrieved, :error_urls, :link_queue, :db_push_queue, :s3_urls, :retries,
:page_jobs, :link_graph, :found_link_list, :outstanding_jobs, :completed_jobs, :link_storage,
:path_based_crawl, :crawl_limit, :is_delegated
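# Loads the CloudDomain, resets its crawl flags, parses the starting/base URLs, and
# initializes counters, retry/connection state, and the S3 bucket name. For a fresh
# (non-delegated) crawl it also flushes the Redis queues and stamps the start time.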
def setup_job
@job_starting_time = Time.now
SimpleRecord.establish_connection(S3_ACCESS_KEY, S3_SECRET_KEY, :s3_bucket=> :new)
raise "No domain_id supplied. Aborting!" if domain_id.blank?
@tmp_db_push_queue = []
@status_checking = false
@is_delegated = is_delegated || false
@crawl_limit = crawl_limit || 1000
@domain_id = domain_id
@domain = CloudDomain.find(@domain_id)
@domain.crawl_finished = 'false'
@domain.already_imported = 'false'
@domain.save
@crawler = @domain.crawler
@starting_uri = URI.parse(@domain.crawl_url)
@base_uri = URI.parse(clean_url(@starting_uri.to_s))
@starting_url = @base_uri.to_s
@retrieve_beat = Time.now
@@heartbeat = Time.now
@@crawled_page_count = 0
@@connections = 0
@@db_connections = 0
@checkstatus_connections = 0
@retries = 0
@s3_urls = []
@outstanding_jobs = []
@job_ids = []
@aggregate_job_processing_duration = 0
@baseurl_uri = URI.parse(@domain.crawl_url)
@transfer_status = false
@delegating_status = false
@outstanding_jobs_transfer = false
@bucket = 'domain_storage'
if @is_delegated.eql?(false)
log "Resetting queues..."
reset_redis_queues
@crawler.set_starting_time
@crawler.set_processing_status('Initializing')
end
true
end
def reset_redis_queues
@crawler.flush_all_redis_information
return true
end
def connecting_database
log "Connecting to Database" if VERBOSE
SimpleWorker.configure do |config|
config.access_key = '6d9aefcf04552c570b239857a56a8cc3'
config.secret_key = 'b87ef0d1d047fe457c2c6381fd1d174c'
username = "uf7wrd8yebt4sj"
password = "p61kv5wfk1trt0vd3w4qfper06"
host_name = "ec2-174-129-213-125.compute-1.amazonaws.com"
database = "dm3tjkjv0whfa7j"
config.database = {
:adapter => "postgresql",
:host => host_name, # Not localhost*
:database => database,
:username => username,
:password => password
}
end
end
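# SimpleWorker entry point: configures the worker and database, sets up the job,
# seeds the in-memory queues (plus any delegated state), runs the crawl loop, and
# finally marks the domain as finished if the crawl completed rather than delegated.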
def run
connecting_database
log "Setting up job..."
setup_job
if @starting_url.blank?
log "Need a starting URL to crawl."
return false
end
if setup_database_queues
log "Starting crawl for domain ID: #{@domain.domain_id}"
log "Starting at #{Time.now}"
setup_delegated_data
do_process
looking_up_transfer_status
log "Ending at #{Time.now}"
log "Ok."
else
log "Error setting up database queues. Starting URL bad?"
false
end
end
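# Wraps the main EM crawl loop, hands off to a follow-on job if delegation was
# triggered, and logs total time, pages/hour, and a rough SimpleWorker cost estimate.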
def do_process
@starting_time = Time.now
log "Starting Crawl at #{@starting_time}..."
do_the_loop
looking_up_for_delegating
@ending_time = Time.now
@total_seconds = @ending_time - @starting_time
@pph = ((@@crawled_page_count / @total_seconds) * 3600.0).to_i
log "Ending loop: Total time #{@total_seconds} seconds, total urls #{@@crawled_page_count} (#{@pph} pages/hr)"
cost_estimate = ((@aggregate_job_processing_duration / 1000.0) / 3600.0) * 0.05
log "Job Time: #{@aggregate_job_processing_duration}, estimated cost $#{cost_estimate} "
end
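# If the 45-minute delegation flag was set, persists the in-memory crawl state
# (todo/visited/skipped/queued/error URLs, counters, and the pending DB push queue
# via S3) back to Redis, then queues a new RedEx job with is_delegated = true so the
# crawl resumes where this one left off.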
def looking_up_for_delegating
log "looking_up_for_delegating"
if @delegating_status.eql?(true)
log "\n * Setup delegated data for next job.. "
@crawler.flush_visit_key
@crawler.flush_skipped_url
@crawler.flush_error_url
@crawler.flush_retries
@crawler.flush_queued_url
@crawler.flush_todo
@crawler.flush_write_out_key
@crawler.flush_db_push_queue_from_s3
@crawler.flush_db_push_queue_key_list
@crawler.set_crawled_count(@@crawled_page_count)
@todo.size.times { @crawler.add_todo(@todo.pop) }
@visit_key.size.times { @crawler.add_visit_key(@visit_key.pop) }
@skipped_urls.size.times { @crawler.add_skipped_url(@skipped_urls.pop) }
@queued_key.each {|url| @crawler.add_queued_url(url) }
@error_urls.size.times { @crawler.add_error_url(@error_urls.pop) }
EM.run {
@db_push_queue.size.times.each do
# @db_push_queue.pop {|x| @crawler.add_db_push_queue(x.to_json); }
@db_push_queue.pop {|x| @crawler.add_db_push_queue_to_s3(x); }
end
EM.stop
}
redex = @crawler.redex_crawler
redex.crawl_limit = @crawl_limit
redex.is_delegated = true
job_id = redex.queue(:recursive => true)
@crawler.red_ex_job_id = job_id["task_id"]
@crawler.save
log "\n * New Job : #{job_id['task_id']}"
log "\n * Delegating the process to Job ID : #{@crawler.red_ex_job_id}"
end
end
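# The inverse of looking_up_for_delegating: for a delegated job, reloads the crawl
# queues, counters, outstanding job IDs, and pending DB pushes from Redis/S3 into
# this worker's in-memory state.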
def setup_delegated_data
if @is_delegated.eql?(true)
log "setup_delegated_data"
log "\n\t * Get delegated data for last job..."
@crawler.get_visit_key.each {|url| @visit_key << url }
@crawler.get_skipped_urls.each {|url| @skipped_urls << url }
@crawler.get_queued_urls.each {|url| @queued_key << url }
@crawler.get_todo.each {|url| @todo << url }
@crawler.get_error_urls.each {|url| @error_urls << url }
@crawler.get_oustanding_jobs.each {|job_id| @outstanding_jobs << {:job_id => job_id, :redis_status => true} }
@@crawled_page_count = @crawler.get_crawled_count.to_i
@retries = @crawler.get_retries_count.to_i
@crawler.get_db_push_key_list.each do |key|
db_push_queue = @crawler.get_db_push_queue_from_s3(key)
if db_push_queue.is_a?(Hash)
@tmp_db_push_queue.push(db_push_queue)
puts "#{@tmp_db_push_queue.size}-#{db_push_queue[:url]}"
end
end
end
end
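# Pushes locally tracked page-processing job IDs (those not yet recorded in Redis)
# to the crawler's Redis sets, leaving any that time out flagged for a later pass,
# and refreshes the crawler's status, crawled count, and retry count.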
def update_outstanding_jobs
jobs = @outstanding_jobs.select {|job| job[:redis_status].eql?(false) }
jobs.each do |job|
# @redis.sadd(@crawler.outstanding_jobs_redis_key, job[:job_id])
begin
@crawler.add_oustanding_jobs(job[:job_id])
@crawler.increment_total_jobs
job[:redis_status] = true
rescue Timeout::Error => e
job[:redis_status] = false
next
end
end
@crawler.set_processing_status('Processing')
@crawler.set_crawled_count(@@crawled_page_count)
@crawler.set_retries_count(@retries)
end
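# When the crawl finished (rather than delegated), flags the CloudDomain as crawled,
# stamps last/next crawl dates on the real Clean::Domain record, and marks the
# crawler as Finished.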
def looking_up_transfer_status
if @transfer_status.eql?(true)
@domain.crawl_finished = "true"
@domain.save
@real_domain = Clean::Domain.find @domain.domain_id.to_i
@real_domain.last_crawl_date = Time.now
@real_domain.next_crawl_date = @real_domain.project.company.crawl_frequency_range
@real_domain.next_page_severity_update = Time.now + 1.day rescue nil
@real_domain.save(:validate => false)
@crawler.set_finished_time
@crawler.set_processing_status('Finished')
log "Domain ID :#{@real_domain.id}"
log "Last Crawl Date : " + @real_domain.last_crawl_date.to_s
log "Next Crawl Date : " + @real_domain.next_crawl_date.to_s
end
end
def pushing_db_push_queue_into_em
log "pushing_db_push_queue_into_em"
@tmp_db_push_queue.each do |queue|
@db_push_queue.push(queue)
end
end
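# The main EventMachine reactor. Periodic timers implement: a heartbeat watchdog and
# a 45-minute cutoff that triggers delegation (60s), flushing outstanding job IDs to
# Redis (60s), draining the DB push queue to S3 and checking for completion (5s),
# and topping up retrievals when the fetch pipeline goes quiet (1s).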
def do_the_loop
@crawler.set_processing_status('Processing')
EM.run do
@redis = EM::Protocols::Redis.connect REDIS_OPTIONS_HASH
@db_push_queue ||= EM::Queue.new
pushing_db_push_queue_into_em if @is_delegated.eql?(true)
@@heartbeat = Time.now
log "\nBeginning RedEx Crawler Processing Loop..."
EM.add_periodic_timer(60) {
if (Time.now - @@heartbeat) > 60
log "Exiting: Heartbeat Not Detected for more than 60 seconds."
update_outstanding_jobs
marking_crawler_as_done
EM.stop
end
if (Time.now - @job_starting_time) > 2700
log "\t Hit 45 minutes.. Delegating data.. "
@delegating_status = true
@@connections = 51
@@db_connections = 21
update_outstanding_jobs
EM.stop
end
}
EM.add_periodic_timer(60) {
if @outstanding_jobs_transfer.eql?(false)
@outstanding_jobs_transfer = true
EM.defer(proc {
update_outstanding_jobs
}, proc {
@outstanding_jobs_transfer = false
log "Outstanding jobs on Redis updated.."
})
end
}
EM.add_periodic_timer(5) {
update_logs_with_current_status
if @@db_connections.to_i < SIMULTANEOUS_DB_CONNECTIONS.to_i and @delegating_status.eql?(false)
@db_push_queue.pop {|x| write_to_db(x) rescue nil } unless @db_push_queue.blank?
else
log "\n\n\n** RedEx thinks there are either too many simultaneous DB connections, or delegation is in progress (delegating_status == true)"
log "DB Connections: #{@@db_connections}, Delegating Status: #{@delegating_status}\n\n\n"
end
if completed_retrieval?
# log "\n* Completed Retrieval and Page Processing.."
log "\n* Completed Retrieval.."
log "\n* Stopping EM.."
update_outstanding_jobs
marking_crawler_as_done
EM.stop
end
}
EM.add_periodic_timer(1) {
if (Time.now - @retrieve_beat) > 5
unless @todo.empty? or @@connections > CONCURRENT_CONNECTIONS or @@crawled_page_count > @crawl_limit or @delegating_status.eql?(true)
retrieve(@todo.pop) if @db_push_queue.size <= 500
end
end
}
retrieve(@todo.pop) unless @todo.blank?
end
end
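# Despite the name, this initializes the in-memory queues (todo, visited, skipped,
# error) and, for a fresh crawl, seeds them with the starting URL.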
def setup_database_queues
begin
@queued_key = []
@todo = Queue.new
@visit_key = Queue.new
@skipped_urls = Queue.new
@error_urls = Queue.new
if @is_delegated.eql?(false)
@todo << @starting_url
@queued_key << @starting_url
end
@retrieved = []
true
rescue
false
end
end
def clean_url(found_url)
begin
a = URI.parse(found_url)
a.fragment = nil
a.path = "/" if a.path.blank?
return a.to_s
rescue => e
log "Error with #{found_url} : #{e.inspect}"
return false
end
end
def valid_scheme?(uri)
["http", "https"].include?(uri.scheme)
end
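# Fetches a URL asynchronously with em-http-request. On success it parses the page
# via RedExPage, enqueues same-host, in-path links that aren't excluded file types,
# retries 503s, and pushes the resulting page hash onto the DB push queue; errors
# are recorded or retried depending on the response code.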
def retrieve(url)
begin
@@heartbeat = Time.now
req = EventMachine::HttpRequest.new(url).get :head => {"Accept" => "text/html", "Accept-Encoding" => "UTF-8"}
@visit_key << url
@@crawled_page_count += 1
@@connections += 1
req.callback do
@@connections -= 1
@@heartbeat = Time.now
page = RedExPage.new({:url => url, :base_uri => @base_uri, :headers => req.response_header, :code => req.response_header.status, :content => req.response})
page.callback do |page_hash|
if [200].include?(page_hash[:code])
page_hash[:links].each do |link|
@@heartbeat = Time.now
setup_new_retrieval
uri = strip_off_fragment(link) rescue next
next unless valid_scheme?(uri)
uri = to_absolute(uri)
if same_host?(uri) and in_path?(uri)
unless @queued_key.include?(uri.to_s)
link = UrlValidator.new(uri.to_s)
filetype = link.filetype.blank? ? '' : link.filetype.downcase
if DO_NOT_CRAWL_TYPES.include?(".#{filetype}")
@skipped_urls << uri.to_s
next
end
unless @queued_key.length > @crawl_limit
@todo.push(uri.to_s)
@queued_key << uri.to_s
end
end
end
end # page_hash_each
elsif [301,302,404].include?(page_hash[:code])
elsif [503].include?(page_hash[:code])
@retries += 1
@todo.push(url)
else
log "[RedEx] Code type #{page_hash[:code]} not supported."
end
if [200,301,302,404,500].include?(page_hash[:code])
@db_push_queue.push(page_hash)
end
end
end
req.errback do
@@heartbeat = Time.now
@@connections -= 1
setup_new_retrieval
if [301,302,404,500].include?(req.response_header.status)
page = RedExPage.new({:url => url, :base_uri => @base_uri, :headers => req.response_header, :code => req.response_header.status, :content => req.response})
page.callback do |page_hash|
@db_push_queue.push(page_hash)
end
elsif [503].include?(req.response_header.status)
@retries += 1
@todo.push(url)
else
@error_urls << url
end
end
rescue => e
if @@connections.eql?(0)
log "Parsing error, stopping. URL: #{url}"
EM.stop
else
log "[Error On Retrieve] => #{e.inspect}"
end
end
end
def check_done
if @todo.empty? and @@connections == 0
EM.stop
end
end
def to_absolute(uri)
uri.relative? ? @base_uri.merge(uri) : uri
end
def same_host?(uri)
@base_uri.host.eql?(uri.host)
end
def in_path?(uri)
uri.path.index(@base_uri.path).eql?(0)
end
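# A secondary EM loop that drains @s3_urls into SimpleWorker page-processor jobs
# while respecting the DB connection cap. It is not invoked from this class
# directly, so it presumably serves the merged helpers or a manual recovery path.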
def do_queuer_loop
log "do_queuer_loop"
log "\n\t * Starting SW queuer.."
@@db_connections = 0
EM.run do
EM.add_periodic_timer(60) do
if (Time.now - @job_starting_time) > 3000
log "\t Hit 50 minutes.. Delegating data.. "
@delegating_status = true
EM.stop
end
end
EM.add_periodic_timer(1) do
if !@s3_urls.empty?
available_db_connections = SIMULTANEOUS_DB_CONNECTIONS - @@db_connections
# Don't queue more jobs than there are available DB connection slots.
new_connections = [@s3_urls.size, available_db_connections].min
EM::Iterator.new(0..new_connections).each do |num, iter|
s3_url = @s3_urls.pop
queue_into_sw!(s3_url) unless s3_url.blank?
iter.next
end
else
EM.stop
end
end
EM.add_periodic_timer(15) do
log "S3 URLS : #{@s3_urls.size}, DB Connections : #{@@db_connections}"
if @s3_urls.empty? and @@db_connections.eql?(0)
log '* Completed SW queuer..'
EM.stop
end
end
end
end
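# Defers a RedWorkQueuer call that queues a page-processing job in SimpleWorker for
# a stored S3 page, recording the returned task_id as an outstanding job.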
def queue_into_sw!(s3_url)
@@heartbeat = Time.now
EM.defer(proc {
wq = RedWorkQueuer.new(@crawler.id, s3_url)
wq.callback do |obinfo|
@@heartbeat = Time.now
if obinfo["task_id"].blank?
log "Error::Queueing into SW::Task ID is blank: #{obinfo["task_id"]}"
else
@outstanding_jobs << {:job_id => obinfo["task_id"], :redis_status => false}
@job_ids << obinfo["task_id"]
end
ret = {:task_id => obinfo["task_id"], :s3_url => s3_url, :ob_info => obinfo}
ret
end
wq.errback do
log "Error::Queuing into SW failed S3URL: #{s3_url}"
end
}, proc { |hash_values|
# log "Queued: #{hash_values.inspect}"
})
end
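# Despite the name, this writes a Marshal-dumped page hash to S3 via Happening
# (keyed by URL plus an MD5 digest under base_key), queues a SimpleWorker job for
# the stored page, and then pops the next item off the DB push queue if capacity
# allows.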
def write_to_db(page_hash)
log "Write_To_DB starting" if VERBOSE
@@heartbeat = Time.now
@@db_connections += 1
begin
pagedigest = Digest::MD5.hexdigest(page_hash[:url])
url = page_hash[:url] + "_#{pagedigest}"
begin
marshal_dump = Marshal.dump(page_hash)
rescue => e
@@db_connections -= 1
log "Failed to Marshal.dump page hash for URL: #{page_hash[:url]}. Skipping."
return true
end
on_error = Proc.new do |http|
@@heartbeat = Time.now
log "WriteToDb::HappeningWrite::Error::#{http.response_header.status}"
@error_urls << page_hash[:url]
@@db_connections -= 1
end
s3_url = storage_url(url)
item = Happening::S3::Item.new(@bucket, s3_url, :aws_access_key_id => S3_ACCESS_KEY, :aws_secret_access_key => S3_SECRET_KEY)
item.put(marshal_dump, :on_error => on_error) do |resp|
log "Put #{s3_url} with Happening"
@@db_connections -= 1
queue_into_sw!(s3_url) unless s3_url.blank?
end
rescue => e
if e.inspect.include?('Happening::Error')
@@db_connections -= 1
log "Failed to store #{page_hash[:url]} on S3 via Happening. Skipping."
return true
else
puts e.inspect
puts e.backtrace.join("\n") if e.backtrace
end
end
if @@db_connections.to_i < SIMULTANEOUS_DB_CONNECTIONS.to_i and @delegating_status.eql?(false)
@db_push_queue.pop {|x| write_to_db(x) rescue nil }
end
end
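# The crawl is complete once either the crawl limit has been exceeded or the todo
# queue is empty, and there are no in-flight HTTP connections, queued DB pushes, or
# open DB/S3 writes.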
def completed_retrieval?
if (@@crawled_page_count > @crawl_limit) and @@connections.eql?(0) and @db_push_queue.size.eql?(0) and @@db_connections.eql?(0)
true
elsif @todo.empty? and @@connections.eql?(0) and @db_push_queue.size.eql?(0) and @@db_connections.eql?(0)
true
else
false
end
end
def completed_page_processing?
if @outstanding_jobs.size > 0
false
elsif @outstanding_jobs.size == 0 && @completed_jobs.size > 0
true
else
log "completed_page_processing? fell through its conditions on RedEx; check for errors."
false
end
end
def marking_crawler_as_done
# log "Setting Domain and initiating transfer of the new page data."
log "Setting Domain."
@transfer_status = true
end
def strip_off_fragment(url)
uri = URI.parse(url)
unless uri.fragment.blank?
non_fragment = uri.to_s.gsub("##{uri.fragment}", '')
uri = URI.parse(non_fragment)
end
uri.path = "/" if uri.path.blank?
return uri
end
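# Pulls the next URL off the todo queue, provided the connection cap, crawl limit,
# delegation flag, and DB push backlog (<= 500) all allow another fetch.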
def setup_new_retrieval
unless @todo.empty? or (@@connections > CONCURRENT_CONNECTIONS) or (@@crawled_page_count > @crawl_limit) or @delegating_status.eql?(true)
if @db_push_queue.size <= 500
@retrieve_beat = Time.now
retrieve(@todo.pop)
end
end
end
def base_key
current_time = @starting_time
day = current_time.day
month = current_time.month
year = current_time.year
base_url = @baseurl_uri.to_s.gsub("http://", "")
base_key = "#{base_url}@-@#{year}-#{month}-#{day}/"
return base_key
end
def storage_url(url)
"#{base_key}#{CGI.escape(url)}"
end
def update_logs_with_current_status
log "\n\n\n#{Time.now} - Running Time: #{Time.now - @starting_time} seconds\n"
log "-- # to write to DB: #{@db_push_queue.size}, DB Connections : #{@@db_connections}, Outstanding jobs: #{@outstanding_jobs.size}"
log "-- Crawled Count: #{@@crawled_page_count}, Visited: #{@visit_key.size}, Touched: #{@queued_key.length}, Todo: #{@todo.length}, Connections: #{@@connections}, Retries: #{@retries}, Error: #{@error_urls.size}\n\n\n"
end
end