EM-based crawler

require 'simple_worker'
require 'eventmachine'
require 'em-http-request'
require 'nokogiri'
require 'aws'
require 'redis'
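
# RedEx is a SimpleWorker job that crawls a single domain with EventMachine:
# pages are fetched concurrently via em-http-request, parsed into page hashes,
# marshalled to S3 with Happening, and handed off to follow-up SimpleWorker
# page-processing jobs. Crawl state (todo, visited, queued, errors) is mirrored
# through the crawler's Redis-backed helpers so a run can delegate itself to a
# fresh job before the worker's time limit is reached.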
class RedEx < SimpleWorker::Base
  merge_gem 'em-redis'
  merge_gem 'happening'
  merge_gem 'utf8_utils'
  merge_gem 'aws'
  merge_gem 'crewait'

  unmerge '../models/domain.rb'
  unmerge '../models/project.rb'
  unmerge '../models/company.rb'
  unmerge '../models/plan.rb'
  unmerge '../models/link.rb'
  unmerge '../models/page.rb'
  unmerge '../models/page_link.rb'

  merge "../../lib/query_builder.rb"
  merge "../../lib/google_base_em.rb"
  merge "../../lib/google_em.rb"
  merge "../../lib/indexation_stats_mixin.rb"
  merge "../../lib/yahoo_stats_mixin.rb"
  merge "../../lib/bing_stats_mixin.rb"
  merge "../../lib/semrush_stats_mixin.rb"
  merge "../../lib/social_stats_mixin.rb"
  merge "../../lib/nokogiri_doc_parser_mixin.rb"
  merge "../../lib/url_validator.rb"
  merge "../../lib/url_cleaner.rb"
  merge "../../lib/crawler_helper/db.rb"
  merge "../../lib/crawler_helper/link.rb"
  merge "../../lib/cloud_crawler_helper/page_processor.rb"
  merge "../../lib/cloud_crawler_helper/found_link_adder.rb"
  merge "../../lib/cloud_crawler_helper/page_stat2.rb"
  merge "../../lib/cloud_crawler_helper/crawler_red_ex_storage_mixin.rb"
  merge "../../lib/cloud_crawler_helper/red_work_queuer.rb"
  merge "../../lib/market_fu/calculator.rb"
  merge "../../lib/cloud_link_helper/redis_mixin.rb"
  merge "../../lib/cloud_crawler_helper/red_ex_page.rb"
  merge "../models/cloud_crawler_found_link.rb"
  merge "../models/cloud_crawler.rb"
  merge "../models/cloud_domain.rb"
  merge "../models/cloud_crawler_url.rb"
  merge "../models/cloud_page.rb"
  merge "redex_page_processor.rb"
  merge '../models/clean/domain.rb'
  merge '../models/clean/project.rb'
  merge '../models/clean/company.rb'
  merge '../models/clean/plan.rb'
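
  # S3 credentials and crawl tuning: file extensions to skip, caps on concurrent
  # HTTP and DB work, and the Redis endpoint that holds shared crawl state.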
  S3_ACCESS_KEY = 'TUSA'
  S3_SECRET_KEY = 'y/XhpORF1vqrxOecHj'
  DO_NOT_CRAWL_TYPES = %w(.pdf .doc .xls .ppt .mp3 .mp4 .m4v .avi .mpg .rss .xml .json .txt .git .zip .md5 .asc .jpg .jpeg .gif .png)
  CONCURRENT_CONNECTIONS = 50
  SIMULTANEOUS_DB_CONNECTIONS = 20
  REDIS_OPTIONS_HASH = {:host => "ikefish.redistogo.com", :port => 9065, :password => "360c4b698d", :thread_safe => true}
  VERBOSE = true

  attr_accessor :domain_id, :a, :r, :visit_key, :queued_key, :starting_url, :base_uri,
    :retrieved, :error_urls, :link_queue, :db_push_queue, :s3_urls, :retries,
    :page_jobs, :link_graph, :found_link_list, :outstanding_jobs, :completed_jobs, :link_storage,
    :path_based_crawl, :crawl_limit, :is_delegated
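
  # Loads the CloudDomain, resolves the starting URL, and resets counters and
  # Redis queues (unless this run was delegated from a previous job).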
  def setup_job
    @job_starting_time = Time.now
    SimpleRecord.establish_connection(S3_ACCESS_KEY, S3_SECRET_KEY, :s3_bucket => :new)
    raise "No domain_id supplied; aborting." if domain_id.blank?
    @tmp_db_push_queue = []
    @status_checking = false
    @is_delegated = is_delegated || false
    @crawl_limit = crawl_limit || 1000
    @domain_id = domain_id
    @domain = CloudDomain.find(@domain_id)
    @domain.crawl_finished = 'false'
    @domain.already_imported = 'false'
    @domain.save
    @crawler = @domain.crawler
    @starting_uri = URI.parse(@domain.crawl_url)
    @base_uri = URI.parse(clean_url(@starting_uri.to_s))
    @starting_url = @base_uri.to_s
    @retrieve_beat = Time.now
    @@heartbeat = Time.now
    @@crawled_page_count = 0
    @@connections = 0
    @@db_connections = 0
    @checkstatus_connections = 0
    @retries = 0
    @s3_urls = []
    @outstanding_jobs = []
    @job_ids = []
    @aggregate_job_processing_duration = 0
    @baseurl_uri = URI.parse(@domain.crawl_url)
    @transfer_status = false
    @delegating_status = false
    @outstanding_jobs_transfer = false
    @bucket = 'domain_storage'
    if @is_delegated.eql?(false)
      log "Resetting queues..."
      reset_redis_queues
      @crawler.set_starting_time
      @crawler.set_processing_status('Initializing')
    end
    true
  end

  def reset_redis_queues
    @crawler.flush_all_redis_information
    return true
  end

  def connecting_database
    log "Connecting to Database" if VERBOSE
    SimpleWorker.configure do |config|
      config.access_key = '6d9aefcf04552c570b239857a56a8cc3'
      config.secret_key = 'b87ef0d1d047fe457c2c6381fd1d174c'
      username  = "uf7wrd8yebt4sj"
      password  = "p61kv5wfk1trt0vd3w4qfper06"
      host_name = "ec2-174-129-213-125.compute-1.amazonaws.com"
      database  = "dm3tjkjv0whfa7j"
      config.database = {
        :adapter  => "postgresql",
        :host     => host_name, # not localhost
        :database => database,
        :username => username,
        :password => password
      }
    end
  end
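
  # Entry point invoked by SimpleWorker: connect to the app database, set up
  # the job, crawl, and, when the crawl completes, mark the domain as finished.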
  def run
    connecting_database
    log "Setting up job..."
    setup_job
    if @starting_url.blank?
      log "Need a starting URL to crawl."
      return false
    end
    if setup_database_queues
      log "Start crawling for real domain ID: #{@domain.domain_id}"
      log "Starting at #{Time.now}"
      setup_delegated_data
      do_process
      looking_up_transfer_status
      log "Ending at #{Time.now}"
      log "Ok."
    else
      log "Error setting up database queues. Starting URL bad?"
      false
    end
  end
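
  # Runs the crawl loop, then reports elapsed time, pages/hour, and a rough
  # cost estimate derived from accumulated job-processing milliseconds.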
  def do_process
    @starting_time = Time.now
    log "Starting Crawl at #{@starting_time}..."
    do_the_loop
    looking_up_for_delegating
    @ending_time = Time.now
    @total_seconds = @ending_time - @starting_time
    @pph = ((@@crawled_page_count / @total_seconds) * 3600.0).to_i
    log "Ending loop: Total time #{@total_seconds} seconds, total urls #{@@crawled_page_count} (#{@pph} pages/hr)"
    cost_estimate = ((@aggregate_job_processing_duration / 1000.0) / 3600.0) * 0.05
    log "Job Time: #{@aggregate_job_processing_duration}, estimated cost $#{cost_estimate}"
  end
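
  # When the run hit its time limit, flush in-memory state back to Redis/S3 and
  # queue a fresh copy of this worker (with is_delegated = true) to continue.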
  def looking_up_for_delegating
    log "looking_up_for_delegating"
    if @delegating_status.eql?(true)
      log "\n * Setup delegated data for next job.."
      @crawler.flush_visit_key
      @crawler.flush_skipped_url
      @crawler.flush_error_url
      @crawler.flush_retries
      @crawler.flush_queued_url
      @crawler.flush_todo
      @crawler.flush_write_out_key
      @crawler.flush_db_push_queue_from_s3
      @crawler.flush_db_push_queue_key_list
      @crawler.set_crawled_count(@@crawled_page_count)
      @todo.size.times.each {|f| @crawler.add_todo(@todo.pop) }
      @visit_key.size.times.each {|f| @crawler.add_visit_key(@visit_key.pop) }
      @skipped_urls.size.times.each {|f| @crawler.add_skipped_url(@skipped_urls.pop) }
      @queued_key.each {|url| @crawler.add_queued_url(url) }
      @error_urls.size.times.each {|f| @crawler.add_error_url(@error_urls.pop) }
      EM.run {
        @db_push_queue.size.times.each do
          # @db_push_queue.pop {|x| @crawler.add_db_push_queue(x.to_json) }
          @db_push_queue.pop {|x| @crawler.add_db_push_queue_to_s3(x) }
        end
        EM.stop
      }
      redex = @crawler.redex_crawler
      redex.crawl_limit = @crawl_limit
      redex.is_delegated = true
      job_id = redex.queue(:recursive => true)
      @crawler.red_ex_job_id = job_id["task_id"]
      @crawler.save
      log "\n * New Job : #{job_id['task_id']}"
      log "\n * Delegating the process to Job ID : #{@crawler.red_ex_job_id}"
    end
  end
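
  # Inverse of the handoff above: when this run is the delegated continuation,
  # reload the todo/visited/queued/error sets and pending page hashes from
  # Redis and S3 before the loop starts.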
  def setup_delegated_data
    if @is_delegated.eql?(true)
      log "setup_delegated_data"
      log "\n\t * Get delegated data for last job..."
      @crawler.get_visit_key.each {|url| @visit_key << url }
      @crawler.get_skipped_urls.each {|url| @skipped_urls << url }
      @crawler.get_queued_urls.each {|url| @queued_key << url }
      @crawler.get_todo.each {|url| @todo << url }
      @crawler.get_error_urls.each {|url| @error_urls << url }
      @crawler.get_oustanding_jobs.each {|job_id| @outstanding_jobs << {:job_id => job_id, :redis_status => true} }
      @@crawled_page_count = @crawler.get_crawled_count.to_i
      @retries = @crawler.get_retries_count.to_i
      @crawler.get_db_push_key_list.each do |key|
        db_push_queue = @crawler.get_db_push_queue_from_s3(key)
        if db_push_queue.is_a?(Hash)
          @tmp_db_push_queue.push(db_push_queue)
          puts @tmp_db_push_queue.size.to_s + "-" + db_push_queue[:url]
        end
      end
    end
  end
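
  # Pushes any newly queued SimpleWorker job ids to Redis and refreshes the
  # crawler's progress counters; jobs that fail on a Redis timeout are retried
  # on the next pass.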
  def update_outstanding_jobs
    jobs = @outstanding_jobs.select {|job| job[:redis_status].eql?(false) }
    jobs.each do |job|
      # @redis.sadd(@crawler.outstanding_jobs_redis_key, job[:job_id])
      begin
        @crawler.add_oustanding_jobs(job[:job_id])
        @crawler.increment_total_jobs
        job[:redis_status] = true
      rescue Timeout::Error => e
        job[:redis_status] = false
        next
      end
    end
    @crawler.set_processing_status('Processing')
    @crawler.set_crawled_count(@@crawled_page_count)
    @crawler.set_retries_count(@retries)
  end
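
  # When the crawl finished cleanly, marks the cloud domain as crawled and
  # updates the real domain's crawl dates and the crawler's processing status.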
  def looking_up_transfer_status
    if @transfer_status.eql?(true)
      @domain.crawl_finished = "true"
      @domain.save
      @real_domain = Clean::Domain.find @domain.domain_id.to_i
      @real_domain.last_crawl_date = Time.now
      @real_domain.next_crawl_date = @real_domain.project.company.crawl_frequency_range
      @real_domain.next_page_severity_update = Time.now + 1.day rescue nil
      @real_domain.save(:validate => false)
      @crawler.set_finished_time
      @crawler.set_processing_status('Finished')
      log "Domain ID: #{@real_domain.id}"
      log "Last Crawl Date : " + @real_domain.last_crawl_date.to_s
      log "Next Crawl Date : " + @real_domain.next_crawl_date.to_s
    end
  end

  def pushing_db_push_queue_into_em
    log "pushing_db_push_queue_into_em"
    @tmp_db_push_queue.each do |queue|
      @db_push_queue.push(queue)
    end
  end
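
  # The main EventMachine reactor loop. Periodic timers watch the heartbeat,
  # delegate the job near the 45-minute mark, sync outstanding jobs to Redis,
  # drain the db_push_queue into S3, and keep the fetch pipeline fed.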
  def do_the_loop
    @crawler.set_processing_status('Processing')
    EM.run do
      @redis = EM::Protocols::Redis.connect REDIS_OPTIONS_HASH
      @db_push_queue ||= EM::Queue.new
      pushing_db_push_queue_into_em if @is_delegated.eql?(true)
      @@heartbeat = Time.now
      log "\nBeginning RedEx Crawler Processing Loop..."

      EM.add_periodic_timer(60) {
        if (Time.now - @@heartbeat) > 60
          log "Exiting: Heartbeat Not Detected for more than 60 seconds."
          update_outstanding_jobs
          marking_crawler_as_done
          EM.stop
        end
        if (Time.now - @job_starting_time) > 2700
          log "\t Hit 45 minutes.. Delegating data.. "
          @delegating_status = true
          @@connections = 51
          @@db_connections = 21
          update_outstanding_jobs
          EM.stop
        end
      }

      EM.add_periodic_timer(60) {
        if @outstanding_jobs_transfer.eql?(false)
          @outstanding_jobs_transfer = true
          EM.defer(proc {
            update_outstanding_jobs
          }, proc {
            @outstanding_jobs_transfer = false
            log "Outstanding jobs on Redis updated.."
          })
        end
      }

      EM.add_periodic_timer(5) {
        update_logs_with_current_status
        if @@db_connections.to_i < SIMULTANEOUS_DB_CONNECTIONS.to_i and @delegating_status.eql?(false)
          @db_push_queue.pop {|x| write_to_db(x) rescue nil } unless @db_push_queue.blank?
        else
          log "\n\n\n** RedEx thinks there are either too many simultaneous DB connections or the crawl is delegating"
          log "DB Connections: #{@@db_connections}, Delegating Status: #{@delegating_status}\n\n\n"
        end
        if completed_retrieval?
          # log "\n* Completed Retrieval and Page Processing.."
          log "\n* Completed Retrieval.."
          log "\n* Stopping EM.."
          update_outstanding_jobs
          marking_crawler_as_done
          EM.stop
        end
      }

      EM.add_periodic_timer(1) {
        if (Time.now - @retrieve_beat) > 5
          unless @todo.empty? or @@connections > CONCURRENT_CONNECTIONS or @@crawled_page_count > @crawl_limit or @delegating_status.eql?(true)
            retrieve(@todo.pop) if @db_push_queue.size <= 500
          end
        end
      }

      retrieve(@todo.pop) unless @todo.blank?
    end
  end
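
  # Initializes the in-memory work queues; on a fresh (non-delegated) run the
  # starting URL seeds both the todo queue and the queued-key list.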
  def setup_database_queues
    begin
      @queued_key = []
      @todo = Queue.new
      @visit_key = Queue.new
      @skipped_urls = Queue.new
      @error_urls = Queue.new
      if @is_delegated.eql?(false)
        @todo << @starting_url
        @queued_key << @starting_url
      end
      @retrieved = []
      true
    rescue
      false
    end
  end

  def clean_url(found_url)
    begin
      a = URI.parse(found_url)
      a.fragment = nil
      a.path = "/" if a.path.blank?
      return a.to_s
    rescue => e
      log "Error with #{found_url} : #{e.inspect}"
      return false
    end
  end

  def valid_scheme?(uri)
    ["http", "https"].include?(uri.scheme)
  end
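
  # Fetches one URL asynchronously. On success the response is wrapped in a
  # RedExPage; extracted links that share the host and path, are not already
  # queued, and are not an excluded file type are pushed onto the todo queue,
  # and the page hash itself is queued for S3 storage.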
  def retrieve(url)
    begin
      @@heartbeat = Time.now
      req = EventMachine::HttpRequest.new(url).get :head => {"Accept" => "text/html", "Accept-Encoding" => "UTF-8"}
      @visit_key << url
      @@crawled_page_count += 1
      @@connections += 1
      req.callback do
        @@connections -= 1
        @@heartbeat = Time.now
        page = RedExPage.new({:url => url, :base_uri => @base_uri, :headers => req.response_header, :code => req.response_header.status, :content => req.response})
        page.callback do |page_hash|
          if [200].include?(page_hash[:code])
            page_hash[:links].each do |link|
              @@heartbeat = Time.now
              setup_new_retrieval
              uri = strip_off_fragment(link) rescue next
              next unless valid_scheme?(uri)
              uri = to_absolute(uri)
              if same_host?(uri) and in_path?(uri)
                unless @queued_key.include?(uri.to_s)
                  link = UrlValidator.new(uri.to_s)
                  filetype = link.filetype.blank? ? '' : link.filetype.downcase
                  if DO_NOT_CRAWL_TYPES.include?(".#{filetype}")
                    @skipped_urls << uri.to_s
                    next
                  end
                  unless @queued_key.length > @crawl_limit
                    @todo.push(uri.to_s)
                    @queued_key << uri.to_s
                  end
                end
              end
            end # page_hash links each
          elsif [301,302,404].include?(page_hash[:code])
          elsif [503].include?(page_hash[:code])
            @retries += 1
            @todo.push(url)
          else
            log "[RedEx] Code type #{page_hash[:code]} not supported."
          end
          if [200,301,302,404,500].include?(page_hash[:code])
            @db_push_queue.push(page_hash)
          end
        end
      end
      req.errback do
        @@heartbeat = Time.now
        @@connections -= 1
        setup_new_retrieval
        if [301,302,404,500].include?(req.response_header.status)
          page = RedExPage.new({:url => url, :base_uri => @base_uri, :headers => req.response_header, :code => req.response_header.status, :content => req.response})
          page.callback do |page_hash|
            @db_push_queue.push(page_hash)
          end
        elsif [503].include?(req.response_header.status)
          @retries += 1
          @todo.push(url)
        else
          @error_urls << url
        end
      end
    rescue => e
      if @@connections.eql?(0)
        log "Parsing error, stopping. URL: #{url}"
        EM.stop
      else
        log "[Error On Retrieve] => #{e.inspect}"
      end
    end
  end

  def check_done
    if @todo.empty? and @@connections == 0
      EM.stop
    end
  end

  def to_absolute(uri)
    uri.relative? ? @base_uri.merge(uri) : uri
  end

  def same_host?(uri)
    @base_uri.host.eql?(uri.host)
  end

  def in_path?(uri)
    uri.path.index(@base_uri.path).eql?(0)
  end
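
  # Drains the @s3_urls backlog by queueing SimpleWorker page-processing jobs
  # a batch at a time, until the backlog and DB connections are both empty or
  # the 50-minute delegation cutoff is reached.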
  def do_queuer_loop
    log "do_queuer_loop"
    log "\n\t * Starting SW queuer.."
    @@db_connections = 0
    EM.run do
      EM.add_periodic_timer(60) do
        if (Time.now - @job_starting_time) > 3000
          log "\t Hit 50 minutes.. Delegating data.. "
          @delegating_status = true
          EM.stop
        end
      end
      EM.add_periodic_timer(1) do
        if !@s3_urls.empty?
          available_db_connections = SIMULTANEOUS_DB_CONNECTIONS - @@db_connections
          # queue at most as many jobs as there are URLs, capped by the free connection budget
          new_connections = [@s3_urls.size, available_db_connections].min
          EM::Iterator.new(0..new_connections).each do |num, iter|
            s3_url = @s3_urls.pop
            queue_into_sw!(s3_url) unless s3_url.blank?
            iter.next
          end
        else
          EM.stop
        end
      end
      EM.add_periodic_timer(15) do
        log "S3 URLS : #{@s3_urls.size}, DB Connections : #{@@db_connections}"
        if @s3_urls.empty? and @@db_connections.eql?(0)
          log '* Completed SW queuer..'
          EM.stop
        end
      end
    end
  end
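
  # Queues a single page-processing job via RedWorkQueuer on a background
  # thread and records the returned task_id as an outstanding job.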
  def queue_into_sw!(s3_url)
    @@heartbeat = Time.now
    EM.defer(proc {
      wq = RedWorkQueuer.new(@crawler.id, s3_url)
      wq.callback do |obinfo|
        @@heartbeat = Time.now
        if obinfo["task_id"].blank?
          log "Error::Queueing into SW::Task ID is blank: #{obinfo["task_id"]}"
        else
          @outstanding_jobs << {:job_id => obinfo["task_id"], :redis_status => false}
          @job_ids << obinfo["task_id"]
        end
        ret = {:task_id => obinfo["task_id"], :s3_url => s3_url, :ob_info => obinfo}
        ret
      end
      wq.errback do
        log "Error::Queuing into SW failed S3URL: #{s3_url}"
      end
    }, proc { |hash_values|
      # log "Queued: #{hash_values.inspect}"
    })
  end
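
  # Marshals a page hash and writes it to S3 with Happening under a per-crawl
  # key, then queues the stored object for page processing; it also pops
  # another pending page hash while DB connections allow.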
  def write_to_db(page_hash)
    log "Write_To_DB starting" if VERBOSE
    @@heartbeat = Time.now
    @@db_connections += 1
    begin
      pagedigest = Digest::MD5.hexdigest(page_hash[:url])
      url = page_hash[:url] + "_#{pagedigest}"
      begin
        marshal_dump = Marshal.dump(page_hash)
      rescue => e
        @@db_connections -= 1
        log "Error dumping object for URL: #{page_hash[:url]}.. Skipping.."
        return true
      end
      on_error = Proc.new do |http|
        @@heartbeat = Time.now
        log "WriteToDb::HappeningWrite::Error::#{http.response_header.status}"
        @error_urls << page_hash[:url]
        @@db_connections -= 1
      end
      s3_url = storage_url(url)
      item = Happening::S3::Item.new(@bucket, s3_url, :aws_access_key_id => S3_ACCESS_KEY, :aws_secret_access_key => S3_SECRET_KEY)
      item.put(marshal_dump, :on_error => on_error) do |resp|
        log "Put #{s3_url} with Happening"
        @@db_connections -= 1
        queue_into_sw!(s3_url) unless s3_url.blank?
      end
    rescue => e
      if e.inspect.include?('Happening::Error')
        @@db_connections -= 1
        log "Error storing with Happening S3 for URL: #{page_hash[:url]}.. Skipping.."
        return true
      else
        puts e.inspect
        puts e.backtrace.join("\n") if e.backtrace
      end
    end
    if @@db_connections.to_i < SIMULTANEOUS_DB_CONNECTIONS.to_i and @delegating_status.eql?(false)
      @db_push_queue.pop {|x| write_to_db(x) rescue nil }
    end
  end
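
  # The crawl is done retrieving when the crawl limit is hit or the todo queue
  # is empty, and no HTTP requests, pending page hashes, or S3 writes remain.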
  def completed_retrieval?
    if (@@crawled_page_count > @crawl_limit) and @@connections.eql?(0) and @db_push_queue.size.eql?(0) and @@db_connections.eql?(0)
      true
    elsif @todo.empty? and @@connections.eql?(0) and @db_push_queue.size.eql?(0) and @@db_connections.eql?(0)
      true
    else
      false
    end
  end

  def completed_page_processing?
    if @outstanding_jobs.size > 0
      false
    elsif @outstanding_jobs.size == 0 && @completed_jobs.size > 0
      true
    else
      log "Fell through completed_page_processing? conditions on RedEx; check for errors."
      false
    end
  end

  def marking_crawler_as_done
    # log "Setting Domain and initiating transfer of the new page data."
    log "Setting Domain."
    @transfer_status = true
  end

  def strip_off_fragment(url)
    uri = URI.parse(url)
    unless uri.fragment.blank?
      non_fragment = uri.to_s.gsub("##{uri.fragment}", '')
      uri = URI.parse(non_fragment)
    end
    uri.path = "/" if uri.path.blank?
    return uri
  end

  def setup_new_retrieval
    unless @todo.empty? or (@@connections > CONCURRENT_CONNECTIONS) or (@@crawled_page_count > @crawl_limit) or @delegating_status.eql?(true)
      if @db_push_queue.size <= 500
        @retrieve_beat = Time.now
        retrieve(@todo.pop)
      end
    end
  end

  def base_key
    current_time = @starting_time
    day = current_time.day
    month = current_time.month
    year = current_time.year
    base_url = @baseurl_uri.to_s.gsub("http://", "")
    base_key = "#{base_url}@-@#{year}-#{month}-#{day}/"
    return base_key
  end

  def storage_url(url)
    "#{base_key}#{CGI.escape(url)}"
  end

  def update_logs_with_current_status
    log "\n\n\n#{Time.now} - Running Time: #{Time.now - @starting_time} seconds\n"
    log "-- # to write to DB: #{@db_push_queue.size}, DB Connections : #{@@db_connections}, Outstanding jobs: #{@outstanding_jobs.size}"
    log "-- Crawled Count: #{@@crawled_page_count}, Visited: #{@visit_key.size}, Touched: #{@queued_key.length}, Todo: #{@todo.length}, Connections: #{@@connections}, Retries: #{@retries}, Error: #{@error_urls.size}\n\n\n"
  end
end
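
# A minimal usage sketch, assuming the standard SimpleWorker queueing API and
# that the attr_accessors (domain_id, crawl_limit) are the only required
# inputs; the domain id shown is hypothetical. In the app itself, delegated
# continuations are re-queued via @crawler.redex_crawler as seen above.
#
#   crawler = RedEx.new
#   crawler.domain_id = 42        # hypothetical CloudDomain id
#   crawler.crawl_limit = 500
#   job = crawler.queue           # enqueue on SimpleWorker; returns a hash with "task_id"
#   puts job["task_id"]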