Lightweight parallel web graph crawler
#!/usr/bin/env ruby
#
# crawler.rb --- Lightweight parallel web graph crawler
#
# Usage:
# ./crawler.rb START_URL TARGET_REGEXP
#
# Output:
# stdout --- edge list (tab separated URLs)
# stderr --- log
#
# Example:
# ./crawler.rb http://is.s.u-tokyo.ac.jp is.s.u-tokyo.ac.jp >graph.tsv 2>log.txt
#
# Author:
# Takuya Akiba (@iwiwi)
# http://www-imai.is.s.u-tokyo.ac.jp/~takiba/index_e.html
#
require 'rubygems'
require 'mechanize'
require 'uri'
require 'thread'
require 'timeout'
require 'set'
require 'logger'
require 'digest/md5'
require 'json'
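# Tuning parameters: worker thread count, politeness delay between fetches,
# sleep interval when no host is available, statistics reporting interval,
# per-fetch timeout, and limits that keep crawler traps (endless or overly
# long URLs) from exhausting memory.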
NUM_THREADS = 50
WAIT_FETCH = 1
WAIT_THREADS = NUM_THREADS / 10.0
WAIT_STAT = 10
TIMEOUT_FETCH = 5
MAX_URI_LENGTH = 256
MAX_URI_PER_HOST = 30000
@log = Logger.new(STDERR)
# @log.level = Logger::WARN
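# Per-host crawl state: work queue, status (:working while a thread is
# crawling it, :halted otherwise), plus sets of visited URLs and page-body
# MD5 digests used for duplicate detection.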
$host = Struct.new("Host", :addr, :mutex, :queue, :status, :visited_urls, :visited_md5s)
$hosts = Hash.new { |h, k| h[k] = $host.new(k, Mutex.new, [], :halted, Set.new, Set.new) }
$hosts_mutex = Mutex.new
$stat_num_pages = $stat_num_links = 0
$stat_mutex = Mutex.new
# The order of mutexes to hold:
# $hosts_mutex -> $hosts[_].mutex -> $stat_mutex
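# Print one edge of the web graph to stdout as a tab-separated pair of URLs
# (tabs inside URLs are replaced with spaces to keep the TSV well-formed).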
def output_edge(u1, u2)
  u1 = u1.to_s.gsub("\t", " ")
  u2 = u2.to_s.gsub("\t", " ")
  print(u1 + "\t" + u2 + "\n")
  $stat_mutex.synchronize do
    $stat_num_links += 1
  end
end
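# Add a URL to its host's queue unless it has already been seen or the host
# has reached MAX_URI_PER_HOST.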
def enque(uri)
  h = nil
  $hosts_mutex.synchronize do
    h = $hosts[URI.parse(uri.to_s).host]
  end
  h.mutex.synchronize do
    return if h.visited_urls.include?(uri) || h.visited_urls.size >= MAX_URI_PER_HOST
    h.queue.push(uri)
    h.visited_urls.add(uri)
  end
end
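# Pick a host that has queued URLs and no thread crawling it, mark it
# :working, and return its address; return :wait while other hosts are still
# being crawled, or :done when all work is finished.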
def find_waiting_host
  exist_working = false
  $hosts_mutex.synchronize do
    $hosts.each do |a, h|
      h.mutex.synchronize do
        exist_working |= (h.status == :working)
        if !h.queue.empty? && h.status == :halted
          h.status = :working
          return h.addr
        end
      end
    end
  end
  return exist_working ? :wait : :done
end
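# Crawl one host: pop URLs from its queue, fetch them with Mechanize
# (honoring robots.txt), skip near-duplicate pages via the MD5 set, and
# enqueue / output every acceptable out-link. Returns when the queue drains.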
def crawl(host_addr, thread_id = 0)
  @log.info("[#{thread_id}] BEGIN #{host_addr}")
  h = $hosts[host_addr]
  Mechanize.start do |agent|
    agent.max_history = 1
    agent.robots = true
    loop do
      uri_from = nil
      h.mutex.synchronize do
        if h.queue.empty?
          agent.shutdown
          h.status = :halted
          @log.info("[#{thread_id}] FINISH #{host_addr}")
          return
        else
          uri_from = h.queue.shift
        end
      end
      begin
        sleep(WAIT_FETCH)
        Timeout.timeout(TIMEOUT_FETCH) do
          @log.debug("[#{thread_id}] ACCESS #{uri_from} (QUEUE: #{h.queue.size})")
          agent.get(uri_from)
          $stat_mutex.synchronize do
            $stat_num_pages += 1
          end
          if uri_from != agent.page.uri.to_s
            @log.debug("[#{thread_id}] JUMP #{uri_from} -> #{agent.page.uri.to_s}")
          end
          next if !agent.page.kind_of?(Mechanize::Page)
          begin
            # Duplicate detection: hash the body after stripping digits,
            # whitespace, tags and the page's own URL components, so that
            # trivially different copies of the same page are skipped.
            body = agent.page.body.dup
            body.gsub!(/[0-9\s\/]+/, '')
            body.gsub!(/<[^>]*>/, '')
            [uri_from, agent.page.uri.to_s].uniq.each do |u|
              u.split('/').sort.uniq.each do |w|
                body.gsub!(w, '')
              end
            end
            md5 = Digest::MD5.digest(body)
            if h.visited_md5s.include?(md5)
              @log.info("[#{thread_id}] DUPLICATE #{uri_from}")
              next
            end
            h.visited_md5s.add(md5)
          end
          # Extract out-links, keep only http(s) URLs matching TARGET_REGEXP,
          # and drop query strings, fragments and overly long URLs.
          agent.page.search('a').each do |a|
            next if !a['href']
            uri_to = URI.parse(agent.page.uri.to_s).merge(a['href'])
            next if uri_to.scheme != "http" && uri_to.scheme != "https"
            uri_to = uri_to.to_s
            next if !(@regexp =~ uri_to) || uri_to.include?('?') || uri_to.include?('#')
            next if uri_to.include?('http://web.archive.org/')
            if uri_to.length >= MAX_URI_LENGTH
              @log.info("[#{thread_id}] TOO LONG #{uri_to}")
              next
            end
            enque(uri_to)
            output_edge(uri_from, uri_to)
          end
        end
      rescue Timeout::Error, StandardError, NoMemoryError => e
        @log.info("[#{thread_id}] ERROR #{e.to_s} (#{host_addr} #{uri_from})")
      end
    end
  end
end
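# Main driver: seed the queue with START_URL, start NUM_THREADS workers that
# repeatedly claim an idle host via find_waiting_host, and log throughput
# every WAIT_STAT seconds until all workers have finished.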
if __FILE__ == $0
  if ARGV.length != 2
    $stderr.puts("usage: crawler START_URL TARGET_REGEXP")
    abort
  end
  enque(ARGV[0])
  @regexp = Regexp.new(ARGV[1])
  ts = (1 .. NUM_THREADS).map do |thread_id|
    Thread.new do
      loop do
        r = find_waiting_host
        break if r == :done
        if r == :wait
          sleep(WAIT_THREADS)
        else
          crawl(r, sprintf("%3d", thread_id))
        end
      end
    end
  end
  begin
    prv_ps = prv_ls = 0
    while !ts.empty?
      sleep WAIT_STAT
      p = ($stat_num_pages - prv_ps) / WAIT_STAT.to_f
      l = ($stat_num_links - prv_ls) / WAIT_STAT.to_f
      t = 0
      $hosts_mutex.synchronize do
        t = $hosts.inject(0) do |s, h|
          s + (h[1].status == :working ? 1 : 0)
        end
      end
      @log.info("[---] STAT TOTAL #{$stat_num_pages} pages , #{$stat_num_links} links")
      @log.info("[---] STAT CURRENT #{p} pages/s, #{l} links/s, #{t} threads working")
      ts.delete_if do |th|
        !th.alive? && th.join
      end
      prv_ps = $stat_num_pages
      prv_ls = $stat_num_links
    end
  end
  $hosts.map{ |h, d| [h, d.visited_urls.size] }.sort_by{ |h, d| d }.each do |h, d|
    @log.info("[---] DONE #{d} #{h}")
  end
end
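The crawler writes the graph to stdout as one tab-separated pair of URLs per line, as described in the header comment. A minimal sketch of consuming that output in Ruby, assuming it was redirected to graph.tsv as in the usage example:

#!/usr/bin/env ruby
# Build an in-memory adjacency list from the crawler's edge list.
adj = Hash.new { |h, k| h[k] = [] }
File.foreach('graph.tsv') do |line|
  from, to = line.chomp.split("\t", 2)
  adj[from] << to if from && to
end
adj.each { |u, vs| puts "#{u}\t#{vs.size} out-links" }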
iwiwi commented Jun 12, 2013

Funny infinite URLs
