Skip to content

Instantly share code, notes, and snippets.

@jdmaturen
Last active December 28, 2015 02:09
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdmaturen/7425963 to your computer and use it in GitHub Desktop.
Save jdmaturen/7425963 to your computer and use it in GitHub Desktop.
Vacuum up Crunchbase
"""
Get a bunch of Crunchbase data, but respect the API limits.
Author JD Maturen
Apache 2 License
"""
import logging
from random import random
import sys
from itertools import count
from Queue import Queue, Empty
from threading import RLock, Thread, Lock
from time import sleep
from time import time
import requests
from requests import Session
from requests.exceptions import RequestException
class Counter(object):
"""Thread safe math"""
def __init__(self, start=0):
self.current = start
self.rlock = RLock()
def increment(self, step=1):
with self.rlock:
self.current += step
return self.current
def get(self):
with self.rlock:
return self.current
def get_and_set(self, value=0):
with self.rlock:
prev = self.current
self.current = value
return prev
class RateLimiter(object):
""""""
def __init__(self, rate):
self.rate = rate
self.count = 0
self.last_time = time()
self.future_time = None
self.lock = Lock()
def limit(self):
"""Blocking limiter"""
with self.lock:
current = time()
delta = current - self.last_time
if delta > 1:
self.last_time = current
self.count = 1
return
self.count += 1
if self.count >= self.rate:
logging.debug("sleeping {0:.3f}s".format(1-delta))
sleep(1 - delta)
def file_exists(filename):
exists = False
try:
with open(filename):
exists = True
except IOError, e:
pass
return exists
def get_crunchbase_data(q, s, api_key, company, filename, attempts, error_counter):
url = "http://api.crunchbase.com/v/1/company/{0:s}.js?api_key={1:s}".format(company, api_key)
logging.debug("Getting {0:s}".format(company))
r = s.get(url)
if r.status_code != 200:
if 'x-mashery-error-code' in r.headers and r.headers['x-mashery-error-code'] == 'ERR_403_DEVELOPER_OVER_QPS':
retry_after = 3
if 'retry-after' in r.headers:
retry_after = int(r.headers['retry-after'])
logging.debug("Going too fast, sleeping")
sleep(retry_after)
else:
logging.warn("Error getting {0:s}: {1:s}".format(company, str(r.headers)))
sleep(random() * 3)
if 400 <= r.status_code < 500:
error_counter.increment()
q.put((company, filename, attempts-1))
return
with open(filename, 'w') as f:
f.write(r.text)
def process_queue(q, countdown, api_key, error_counter, rate_limiter):
s = Session()
while countdown.get() != 0:
try:
company, filename, attempts = q.get(True, 0.1)
if attempts > 0:
rate_limiter.limit()
get_crunchbase_data(q, s, api_key, company, filename, attempts, error_counter)
q.task_done()
except Empty:
pass
except RequestException, e:
logging.exception(e)
s = Session()
def main():
"""docstring for main"""
rate_limit = 8 # API calls per second
output_dir = "companies/"
api_key = sys.argv[1]
logging.basicConfig(level=logging.INFO)
error_counter = Counter()
countdown = Counter(1)
get_rate_limiter = RateLimiter(rate_limit)
q = Queue() # unbounded queue so we don't deadlock on putting retries back in
thread_count = int(rate_limit * 0.75)
threads = []
for _ in xrange(thread_count):
t = Thread(target=process_queue, args=(q, countdown, api_key, error_counter, get_rate_limiter))
t.daemon = True
t.start()
threads.append(t)
for company in sys.stdin:
company = company.strip()
if error_counter.get() > 12:
raise Exception("too many errors")
filename = "{0:s}/{1:s}.json".format(output_dir, company)
if not file_exists(filename):
logging.info("fetching {0:s}".format(company))
while q.qsize() > rate_limit / 2:
sleep(.1)
q.put((company, filename, 3))
else:
logging.debug("Skipping {0:s}".format(company))
q.join()
countdown.increment(-1)
for t in threads:
t.join()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment