Created by @rfmcnally, May 16, 2018
Sample Python classes for Import.io Store Objects
""" Classes for interacting with Import.io Object Store. Requires IMPORT_IO_API_KEY in env. """
import os
import math
import time
import json
from datetime import datetime, timedelta
from collections import OrderedDict
import requests
import pandas as pd
# Functions for searching Import.io store. Can be moved to separate module.
def all_extractors(page, apikey):
    """ Function for querying extractor search based on user id and page """
    url = "https://store.import.io/extractor/_search"
    query = {"q": "(_missing_:archived OR archived:false)", "_page": page,
             "_perpage": "1000", "_sort": "_meta.creationTimestamp",
             "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=query)
        if response.status_code != 200:
            if retry < 3:
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data


def all_crawlruns(page, apikey):
    """ Function for querying crawlrun search based on page """
    url = "https://store.import.io/crawlrun/_search"
    querystring = {"_page": page, "_perpage": "1000", "_sort": "_meta.creationTimestamp",
                   "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=querystring)
        if response.status_code != 200:
            if retry < 3:
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data


def extractor_crawlruns(page, guid, apikey):
    """ Function for querying crawlrun search based on extractor id and page """
    url = "https://store.import.io/crawlrun/_search"
    querystring = {"extractorId": guid, "_page": page, "_perpage": "1000",
                   "_sort": "_meta.creationTimestamp", "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=querystring)
        if response.status_code != 200:
            if retry < 3:
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data


def running_crawlruns(page, apikey):
    """ Function for querying crawlrun search for running crawl runs based on page """
    url = "https://store.import.io/crawlrun/_search"
    querystring = {"q": "(state:STARTED OR state:PENDING)", "_page": page,
                   "_perpage": "1000", "_sort": "_meta.creationTimestamp",
                   "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=querystring)
        if response.status_code != 200:
            if retry < 3:
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data


# Import.io Objects
class Crawlrun(object):
    """ An Import.io crawl run """
    def __init__(self, guid, extractor_id=None, state=None, started=None, stopped=None,
                 total_count=None, success_count=None, failed_count=None, in_progress=None,
                 requeued=None, row_count=None, robots_denied_count=None, no_data_count=None):
        self.guid = guid
        self.extractor_id = extractor_id
        self.state = state
        self.started = started
        self.stopped = stopped
        self.total_count = total_count
        self.success_count = success_count
        self.failed_count = failed_count
        self.in_progress = in_progress
        self.requeued = requeued
        self.row_count = row_count
        self.robots_denied_count = robots_denied_count
        self.no_data_count = no_data_count

    def add_fields(self, fields):
        """ Appends fields to existing crawl run """
        if 'state' in fields:
            self.state = fields['state']
        if '_meta' in fields:
            self.started = datetime.utcfromtimestamp(int(fields['_meta']['creationTimestamp']/1000))
            self.stopped = datetime.utcfromtimestamp(int(fields['_meta']['timestamp']/1000))
        if 'rowCount' in fields:
            self.row_count = fields['rowCount']
        if 'totalUrlCount' in fields:
            self.total_count = fields['totalUrlCount']
        if 'successUrlCount' in fields:
            self.success_count = fields['successUrlCount']
        if 'failedUrlCount' in fields:
            self.failed_count = fields['failedUrlCount']
        if 'inProgress' in fields:
            self.in_progress = fields['inProgress']
        if 'requeued' in fields:
            self.requeued = fields['requeued']
        if 'deniedByRobotsCount' in fields:
            self.robots_denied_count = fields['deniedByRobotsCount']
        return self.state

    def get_json(self):
        """ Returns JSON output from the crawl run """
        url = "https://store.import.io/crawlrun/{0}/_attachment/json".format(self.guid)
        querystring = {'_apikey': os.environ['IMPORT_IO_API_KEY']}
        headers = {'Accept': "application/x-ldjson"}
        response = requests.get(url, params=querystring, headers=headers, stream=True)
        list_resp = response.text.splitlines()
        json_resp = list(map(lambda x: json.loads(x, object_pairs_hook=OrderedDict), list_resp))
        return json_resp

    def get_csv(self):
        """ Returns CSV output from the crawl run """
        url = "https://store.import.io/crawlrun/{0}/_attachment/csv".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {"Accept": "text/csv"}
        resp = requests.get(url, params=querystring, headers=headers, stream=True)
        csv_resp = resp.content.decode('utf-8-sig')
        return csv_resp

    def get_log(self):
        """ Returns log output from the crawl run """
        url = "https://store.import.io/crawlrun/{0}/_attachment/log".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {"Accept": "text/csv"}
        resp = requests.get(url, params=querystring, headers=headers, stream=True)
        log_resp = resp.content.decode('utf-8-sig')
        return log_resp

    def get_results_df(self):
        """ Returns a crawl run CSV as a DataFrame """
        url = "https://store.import.io/crawlrun/{0}/_attachment/csv?_apikey={1}".format(
            self.guid, os.environ['IMPORT_IO_API_KEY'])
        results = pd.read_csv(url, keep_default_na=False, index_col=False)
        return results

    def get_log_df(self):
        """ Returns a crawl run log as a DataFrame """
        url = "https://store.import.io/crawlrun/{0}/_attachment/log?_apikey={1}".format(
            self.guid, os.environ['IMPORT_IO_API_KEY'])
        log = pd.read_csv(url, keep_default_na=False, index_col=False)
        return log


class Extractor(object):
    """ An Import.io extractor """
    def __init__(self, guid, name=None, created=None, updated=None, interactive=None,
                 authenticated=None, fields=None, urls=None, crawlruns=None):
        self.guid = guid
        self.name = name
        self.created = created
        self.updated = updated
        self.interactive = interactive
        self.authenticated = authenticated
        self.fields = fields
        self.urls = urls
        self.crawlruns = crawlruns

    def get_info(self):
        """ Returns extractor metadata """
        url = "https://store.import.io/extractor/{0}".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        response = requests.get(url, params=querystring)
        data = response.json()
        self.name = data['name']
        self.created = data['_meta']['creationTimestamp']
        self.updated = data['_meta']['timestamp']
        self.interactive = False
        self.authenticated = False
        self.fields = data['fields']
        if 'inputs' in data:
            self.interactive = True
            self.urls = data['inputs']
        else:
            self.urls = data['urlList']
        if 'authUrl' in data:
            self.authenticated = True
        return self.name

    def start(self):
        """ Starts a crawl run on an extractor """
        url = "https://run.import.io/{0}/start".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'cache-control': "no-cache"}
        response = requests.post(url, headers=headers, params=querystring)
        if response.status_code == 200:
            result = response.json()['crawlRunId']
        else:
            result = response.json()
        return result

    def run(self):
        """ Starts a crawl run and polls the extractor until completion """
        run_id = self.start()
        result = False
        while True:
            state = self.status(run_id)
            if state == 'FINISHED':
                result = True
                break
            if state == 'CANCELLED':
                result = True
                break
            elif state == 'FAILED':
                break
            time.sleep(30)
        return result, run_id

    @staticmethod
    def status(run_id):
        """ Returns the status of a crawl run """
        url = "https://store.import.io/crawlrun/{0}".format(run_id)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'Accept': "application/json"}
        response = requests.get(url, headers=headers, params=querystring)
        state = None
        try:
            results = response.json()
            try:
                state = results['state']
            except KeyError:
                pass
        except ValueError:
            pass
        return state

    def get_url_body(self):
        """ Returns current URLs string body from an extractor """
        url = "https://store.import.io/extractor/{0}/_attachment/urlList".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        response = requests.get(url, params=querystring)
        return response.text

    def put_url_body(self, urls):
        """ Uploads a string body of URLs to an extractor """
        url = "https://store.import.io/extractor/{0}/_attachment/urlList".format(
            self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'content-type': "text/plain"}
        response = requests.put(url, data=urls, headers=headers, params=querystring)
        return response

    def put_url_list(self, urls):
        """ Uploads a list of URLs to an extractor """
        url = "https://store.import.io/extractor/{0}/_attachment/urlList".format(
            self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'content-type': "text/plain"}
        data = "\n".join(urls)
        body = data.encode(encoding='utf-8')
        response = requests.put(url, data=body, headers=headers, params=querystring)
        return response

    def get_crawlruns(self):
        """ Returns all crawlruns for the extractor """
        crawlruns = []
        page = 1
        data = extractor_crawlruns(page, self.guid, os.environ['IMPORT_IO_API_KEY'])
        runs_data = data['hits']['hits']
        count = data['hits']['total']
        pages = math.ceil(count / 1000)
        page += 1
        while page <= pages:
            data = extractor_crawlruns(page, self.guid, os.environ['IMPORT_IO_API_KEY'])
            runs_data.extend(data['hits']['hits'])
            page += 1
        for run_data in runs_data:
            crawlrun = Crawlrun(run_data['_id'], extractor_id=run_data['fields']['extractorId'])
            crawlrun.add_fields(run_data['fields'])
            crawlruns.append(crawlrun)
        self.crawlruns = crawlruns
        return self.crawlruns

    def latest_json(self):
        """ Returns the JSON from the latest run """
        url = "https://data.import.io/extractor/{0}/json/latest".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {"Accept": "application/x-ldjson"}
        response = requests.get(url, params=querystring, headers=headers, stream=True)
        list_resp = response.text.splitlines()
        json_resp = list(map(lambda x: json.loads(x), list_resp))
        return json_resp


class User(object):
    """ An Import.io User """
    def __init__(self, extractors=None):
        self.extractors = extractors

    @staticmethod
    def recent_crawlruns():
        """ Returns crawlruns in past 24 hours for the user """
        crawlruns = []
        page = 1
        end_pages = False
        day_ago = datetime.today() - timedelta(days=1)
        data = all_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
        runs_data = data['hits']['hits']
        count = data['hits']['total']
        pages = math.ceil(count / 1000)
        while page <= pages:
            for run_data in runs_data:
                crawlrun = Crawlrun(run_data['_id'], extractor_id=run_data['fields']['extractorId'])
                crawlrun.add_fields(run_data['fields'])
                if crawlrun.started:
                    if crawlrun.started < day_ago:
                        end_pages = True
                        break
                crawlruns.append(crawlrun)
            if end_pages:
                break
            page += 1
            data = all_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
            runs_data = data['hits']['hits']
        return crawlruns

    @staticmethod
    def running_crawlruns():
        """ Returns running crawlruns for the user """
        # Calls the module-level running_crawlruns helper; the name resolves to
        # the global function, not this static method.
        crawlruns = []
        page = 1
        data = running_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
        runs_data = data['hits']['hits']
        count = data['hits']['total']
        pages = math.ceil(count / 1000)
        page += 1
        while page <= pages:
            data = running_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
            runs_data.extend(data['hits']['hits'])
            page += 1
        for run_data in runs_data:
            crawlrun = Crawlrun(run_data['_id'], extractor_id=run_data['fields']['extractorId'])
            crawlrun.add_fields(run_data['fields'])
            crawlruns.append(crawlrun)
        return crawlruns

    def get_extractors(self):
        """ Returns all extractors for the user and attaches them to the User object """
        extractors = []
        page = 1
        data = all_extractors(page, os.environ['IMPORT_IO_API_KEY'])
        extractors_data = data['hits']['hits']
        count = data['hits']['total']
        pages = math.ceil(count / 1000)
        page += 1
        while page <= pages:
            data = all_extractors(page, os.environ['IMPORT_IO_API_KEY'])
            extractors_data.extend(data['hits']['hits'])
            page += 1
        for extractor_data in extractors_data:
            extractor = Extractor(extractor_data['_id'], name=extractor_data['fields']['name'])
            extractors.append(extractor)
        self.extractors = extractors
        return self.extractors
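

# Example usage (a minimal sketch, not part of the original gist): assumes
# IMPORT_IO_API_KEY is set in the environment and the account has at least
# one extractor with a completed run configuration.
if __name__ == '__main__':
    user = User()
    extractors = user.get_extractors()           # list of Extractor objects
    print("Found {0} extractors".format(len(extractors)))

    if extractors:
        extractor = extractors[0]
        extractor.get_info()                     # populate name, fields, urls
        finished, run_id = extractor.run()       # start a crawl and poll until done
        if finished:
            crawlrun = Crawlrun(run_id, extractor_id=extractor.guid)
            results = crawlrun.get_results_df()  # results as a pandas DataFrame
            print(results.head())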