@mikemccabe
Created September 25, 2012 22:28
Parallel archive.org metadata fetching using Python and gevent
# This demonstrates doing multiple metadata fetches in parallel.
# It seems to be fast enough that the json decoding cost becomes
# a significant proportion of the execution time.
# It requires gevent; see http://www.gevent.org/intro.html#installation
# This is callable from the command line; call with --help for a summary
# (there is a short usage sketch at the end of this file).
# If you use it as a library, the main entry point is
# metadata_record_iterator(); see main() for a full example, or the
# sketch just below.
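#
# A minimal sketch of library use (the module name and the ids below are
# hypothetical; any iterable of archive.org identifiers will do):
#
#   import json
#   from md_fetch import metadata_record_iterator
#
#   ids = ["item-one", "item-two", "item-three"]
#   for i, id, md_json in metadata_record_iterator(ids, workers=5, sorted=True):
#       md = json.loads(md_json)
#       print i, id, md.get('server', '')
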
import gevent
import urllib
import json
import sys
import random
import os
from itertools import islice
from gevent import queue as g_queue
from gevent import monkey
monkey.patch_all()
# Globals!
done_queueing_input = False
queued_count = 0
hosts = ("www16", "www17", "www18", "www19",
         "www20", "www21", "www22", "www23",
         "www24", "www25", "www26", "www27",
         "www28", "www29", "www30", "www31")
skips = []

def md_getter(input_queue, json_queue, recache):
    # Worker greenlet: pull (index, id) pairs off input_queue, fetch the
    # item's metadata JSON from a randomly chosen host, and push
    # (index, id, json) onto json_queue.  A host that raises IOError is
    # added to skips and avoided thereafter, and the failed id is requeued.
    while True:
        i, id = input_queue.get()
        host = hosts[random.randrange(len(hosts))]
        while host in skips:
            host = hosts[random.randrange(len(hosts))]
        if recache:
            recache_str = '?reCache=1'
        else:
            recache_str = ''
        try:
            j = get_url("http://%s.us.archive.org/metadata/%s%s"
                        % (host, id, recache_str))
            if len(j) < 100:
                print >> sys.stderr, "got short string " + str(j) + " for " + id + " - error?"
            json_queue.put((i, id, j))
        except IOError:
            print >> sys.stderr, host + " failed"
            skips.append(host)
            input_queue.put((i, id))
        finally:
            input_queue.task_done()

def queue_input(ids, input_queue):
    global queued_count
    global done_queueing_input
    for i, id in enumerate(ids):
        id = id.strip()
        input_queue.put((i, id))
        queued_count += 1
    done_queueing_input = True

def get_url(url):
    f = urllib.urlopen(url)
    if f.getcode() != 200:
        print >> sys.stderr, "get failed for " + url
        c = '{}'
    else:
        c = f.read()
    f.close()
    return c

def metadata_record_iterator(ids, workers=20, sorted=False, recache=False):
    # Main entry point: spawn one greenlet to feed ids into input_queue
    # plus `workers` fetcher greenlets, then return an iterator of
    # (index, id, json) tuples.
    input_queue = g_queue.JoinableQueue(1000)
    json_queue = g_queue.Queue(1000)

    gevent.spawn(queue_input, ids, input_queue)
    for i in range(workers):
        gevent.spawn(md_getter, input_queue, json_queue, recache)

    def metadata_iterator_helper():
        # Yield results in whatever order the workers finish.
        got_count = 0
        while True:
            if done_queueing_input and got_count == queued_count:
                break
            yield json_queue.get()
            got_count += 1

    def sorted_iterator(results):
        # Restore input order by buffering out-of-order results in a
        # priority queue until the next expected index arrives.
        current_i = 0
        pq = g_queue.PriorityQueue()
        results_remain = True
        while True:
            if done_queueing_input and current_i == queued_count:
                break
            while True:
                if results_remain:
                    try:
                        tup = results.next()
                        pq.put(tup)
                    except StopIteration:
                        results_remain = False
                tup = pq.get()
                i, _, _ = tup
                if i == current_i:
                    yield tup
                    current_i += 1
                    break
                else:
                    pq.put(tup)

    if sorted:
        return sorted_iterator(metadata_iterator_helper())
    else:
        return metadata_iterator_helper()

def info_callback(i, id, md_json):
    o = json.loads(md_json)
    print "%s %s %s %s %s" % (i, id, o.get('server', ''),
                              o.get('dir', ''), o.get('item_size', ''))

def printing_callback(i, id, md_json):
    print md_json

def idonly_callback(i, id, md_json):
    print "%s %s %s" % (i, id, len(md_json))

def printing_with_ids_callback(i, id, md_json):
    print str(i), id, md_json

def main(argv):
    import optparse
    parser = optparse.OptionParser(usage='usage: %prog [options] file_or_id_or_-_for_stdin',
                                   version='%prog 0.1',
                                   description='Get archive metadata for archive ids. Prints JSON to stdout by default.')
    parser.add_option('--start',
                      action='store',
                      type='int',
                      default=0,
                      help='Index of first id to fetch')
    parser.add_option('--count',
                      action='store',
                      type='int',
                      default=0,
                      help='Count of ids to fetch')
    parser.add_option('--workers',
                      action='store',
                      type='int',
                      default=20,
                      help='How many metadata fetch workers to spawn')
    parser.add_option('--recache',
                      action='store_true',
                      default=False,
                      help='Recache when fetching')
    parser.add_option('--idonly',
                      action='store_true',
                      default=False,
                      help='Print "index id len(json)" - for testing')
    parser.add_option('--withids',
                      action='store_true',
                      default=False,
                      help='Print "index id json"')
    parser.add_option('--altformat',
                      action='store_true',
                      default=False,
                      help='Print "index id server dir item_size"; parses json')
    parser.add_option('--ujson',
                      action='store_true',
                      default=False,
                      help='Use ujson instead of json')
    parser.add_option('--sorted',
                      action='store_true',
                      default=False,
                      help='Produce results in sorted order')
    opts, args = parser.parse_args(argv)
    if len(args) != 1:
        parser.print_usage()
        sys.exit(1)

    if os.path.exists(args[0]):
        ids = open(args[0])
    else:
        if args[0] == '-':
            ids = sys.stdin
        else:
            ids = [args[0]]

    parser.destroy()

    if opts.ujson:
        import ujson
        global json
        json = ujson

    callback = printing_callback
    if opts.altformat:
        callback = info_callback
    if opts.idonly:
        callback = idonly_callback
    if opts.withids:
        callback = printing_with_ids_callback

    stop = opts.start + opts.count if opts.count != 0 else None
    ids = islice(ids, opts.start, stop)

    results = metadata_record_iterator(ids, opts.workers,
                                       opts.sorted, opts.recache)
    for i, id, md_json in results:
        callback(i, id, md_json)


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
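
# Command-line usage sketch (the script and input file names here are
# hypothetical; run with --help for the full option summary):
#
#   python md_fetch.py some_item_id
#   python md_fetch.py ids.txt --workers 50 --sorted
#   cat ids.txt | python md_fetch.py - --idonly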