Sample Python classes for Import.io Store Objects
""" Classes for interacting with Import.io Object Store. Requires IMPORT_IO_API_KEY in env. """ | |
import os | |
import math | |
import time | |
import json | |
from datetime import datetime, timedelta | |
from collections import OrderedDict | |
import requests | |
import pandas as pd | |

# Functions for searching the Import.io store. Can be moved to a separate module.
def all_extractors(page, apikey):
    """ Function for querying extractor search for non-archived extractors by page """
    url = "https://store.import.io/extractor/_search"
    query = {"q": "(_missing_:archived OR archived:false)", "_page": page,
             "_perpage": "1000", "_sort": "_meta.creationTimestamp",
             "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=query)
        if response.status_code != 200:
            if retry < 3:
                # Back off and retry up to three times on a bad response
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data

def all_crawlruns(page, apikey):
    """ Function for querying crawlrun search based on page """
    url = "https://store.import.io/crawlrun/_search"
    querystring = {"_page": page, "_perpage": "1000", "_sort": "_meta.creationTimestamp",
                   "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=querystring)
        if response.status_code != 200:
            if retry < 3:
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data

def extractor_crawlruns(page, guid, apikey):
    """ Function for querying crawlrun search based on extractor id and page """
    url = "https://store.import.io/crawlrun/_search"
    querystring = {"extractorId": guid, "_page": page, "_perpage": "1000",
                   "_sort": "_meta.creationTimestamp", "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=querystring)
        if response.status_code != 200:
            if retry < 3:
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data

def running_crawlruns(page, apikey):
    """ Function for querying crawlrun search for runs in STARTED or PENDING state """
    url = "https://store.import.io/crawlrun/_search"
    querystring = {"q": "(state:STARTED OR state:PENDING)", "_page": page,
                   "_perpage": "1000", "_sort": "_meta.creationTimestamp",
                   "_apikey": apikey}
    retry = 0
    while True:
        response = requests.get(url, params=querystring)
        if response.status_code != 200:
            if retry < 3:
                retry += 1
                time.sleep(2 * retry)
                continue
            break
        break
    data = response.json()
    return data
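
# Hypothetical usage sketch for the search helpers above; the response shape
# shown here is exactly what the classes below consume:
#
#   data = all_extractors(1, os.environ['IMPORT_IO_API_KEY'])
#   total = data['hits']['total']        # total matching extractors
#   hits = data['hits']['hits']          # up to 1000 hits per page
#   names = [hit['fields']['name'] for hit in hits]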

# Import.io Objects
class Crawlrun(object):
    """ An Import.io crawl run """
    def __init__(self, guid, extractor_id=None, state=None, started=None, stopped=None,
                 total_count=None, success_count=None, failed_count=None, in_progress=None,
                 requeued=None, row_count=None, robots_denied_count=None, no_data_count=None):
        self.guid = guid
        self.extractor_id = extractor_id
        self.state = state
        self.started = started
        self.stopped = stopped
        self.total_count = total_count
        self.success_count = success_count
        self.failed_count = failed_count
        self.in_progress = in_progress
        self.requeued = requeued
        self.row_count = row_count
        self.robots_denied_count = robots_denied_count
        self.no_data_count = no_data_count

    def add_fields(self, fields):
        """ Appends fields to an existing crawl run """
        if 'state' in fields:
            self.state = fields['state']
        if '_meta' in fields:
            # Store timestamps are in milliseconds since the epoch
            self.started = datetime.utcfromtimestamp(int(fields['_meta']['creationTimestamp'] / 1000))
            self.stopped = datetime.utcfromtimestamp(int(fields['_meta']['timestamp'] / 1000))
        if 'rowCount' in fields:
            self.row_count = fields['rowCount']
        if 'totalUrlCount' in fields:
            self.total_count = fields['totalUrlCount']
        if 'successUrlCount' in fields:
            self.success_count = fields['successUrlCount']
        if 'failedUrlCount' in fields:
            self.failed_count = fields['failedUrlCount']
        if 'inProgress' in fields:
            self.in_progress = fields['inProgress']
        if 'requeued' in fields:
            self.requeued = fields['requeued']
        if 'deniedByRobotsCount' in fields:
            self.robots_denied_count = fields['deniedByRobotsCount']
        return self.state

    def get_json(self):
        """ Returns JSON output from the crawl run """
        url = "https://store.import.io/crawlrun/{0}/_attachment/json".format(self.guid)
        querystring = {'_apikey': os.environ['IMPORT_IO_API_KEY']}
        headers = {'Accept': "application/x-ldjson"}
        response = requests.get(url, params=querystring, headers=headers, stream=True)
        # The attachment is newline-delimited JSON; parse one object per line
        list_resp = response.text.splitlines()
        json_resp = list(map(lambda x: json.loads(x, object_pairs_hook=OrderedDict), list_resp))
        return json_resp

    def get_csv(self):
        """ Returns CSV output from the crawl run """
        url = "https://store.import.io/crawlrun/{0}/_attachment/csv".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {"Accept": "text/csv"}
        resp = requests.get(url, params=querystring, headers=headers, stream=True)
        csv_resp = resp.content.decode('utf-8-sig')
        return csv_resp

    def get_log(self):
        """ Returns log output from the crawl run """
        url = "https://store.import.io/crawlrun/{0}/_attachment/log".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {"Accept": "text/csv"}
        resp = requests.get(url, params=querystring, headers=headers, stream=True)
        log_resp = resp.content.decode('utf-8-sig')
        return log_resp

    def get_results_df(self):
        """ Returns a crawl run CSV as a DataFrame """
        url = "https://store.import.io/crawlrun/{0}/_attachment/csv?_apikey={1}".format(
            self.guid, os.environ['IMPORT_IO_API_KEY'])
        results = pd.read_csv(url, keep_default_na=False, index_col=False)
        return results

    def get_log_df(self):
        """ Returns a crawl run log as a DataFrame """
        url = "https://store.import.io/crawlrun/{0}/_attachment/log?_apikey={1}".format(
            self.guid, os.environ['IMPORT_IO_API_KEY'])
        log = pd.read_csv(url, keep_default_na=False, index_col=False)
        return log
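
# Sketch of pulling results for a single crawl run; the guid below is a
# placeholder, in practice it comes from a crawlrun search or Extractor.start():
#
#   run = Crawlrun('example-crawlrun-guid')
#   rows = run.get_json()           # list of OrderedDicts, one per row
#   frame = run.get_results_df()    # same rows as a pandas DataFrame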

class Extractor(object):
    """ An Import.io extractor """
    def __init__(self, guid, name=None, created=None, updated=None, interactive=None,
                 authenticated=None, fields=None, urls=None, crawlruns=None):
        self.guid = guid
        self.name = name
        self.created = created
        self.updated = updated
        self.interactive = interactive
        self.authenticated = authenticated
        self.fields = fields
        self.urls = urls
        self.crawlruns = crawlruns

    def get_info(self):
        """ Returns extractor metadata """
        url = "https://store.import.io/extractor/{0}".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        response = requests.get(url, params=querystring)
        data = response.json()
        self.name = data['name']
        self.created = data['_meta']['creationTimestamp']
        self.updated = data['_meta']['timestamp']
        self.interactive = False
        self.authenticated = False
        self.fields = data['fields']
        if 'inputs' in data:
            # Interactive extractors take inputs rather than a plain URL list
            self.interactive = True
            self.urls = data['inputs']
        else:
            self.urls = data['urlList']
        if 'authUrl' in data:
            self.authenticated = True
        return self.name

    def start(self):
        """ Starts a crawl run on an extractor """
        url = "https://run.import.io/{0}/start".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'cache-control': "no-cache"}
        response = requests.post(url, headers=headers, params=querystring)
        if response.status_code == 200:
            result = response.json()['crawlRunId']
        else:
            result = response.json()
        return result

    def run(self):
        """ Starts a crawl run and polls the extractor until completion """
        run_id = self.start()
        result = False
        while True:
            state = self.status(run_id)
            # A finished or cancelled run still yields data; a failed run does not
            if state in ('FINISHED', 'CANCELLED'):
                result = True
                break
            elif state == 'FAILED':
                break
            time.sleep(30)
        return result, run_id

    @staticmethod
    def status(run_id):
        """ Returns the status of a crawl run """
        url = "https://store.import.io/crawlrun/{0}".format(run_id)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'Accept': "application/json"}
        response = requests.get(url, headers=headers, params=querystring)
        state = None
        try:
            results = response.json()
            state = results['state']
        except (ValueError, KeyError):
            # Unparseable body or missing state: report no state rather than raise
            pass
        return state

    def get_url_body(self):
        """ Returns the current URL list from an extractor as a string body """
        url = "https://store.import.io/extractor/{0}/_attachment/urlList".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        response = requests.get(url, params=querystring)
        return response.text

    def put_url_body(self, urls):
        """ Uploads a string body of URLs to an extractor """
        url = "https://store.import.io/extractor/{0}/_attachment/urlList".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'content-type': "text/plain"}
        response = requests.put(url, data=urls, headers=headers, params=querystring)
        return response

    def put_url_list(self, urls):
        """ Uploads a list of URLs to an extractor """
        url = "https://store.import.io/extractor/{0}/_attachment/urlList".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {'content-type': "text/plain"}
        data = "\n".join(urls)
        body = data.encode(encoding='utf-8')
        response = requests.put(url, data=body, headers=headers, params=querystring)
        return response

    def get_crawlruns(self):
        """ Returns all crawl runs for the extractor """
        crawlruns = []
        page = 1
        data = extractor_crawlruns(page, self.guid, os.environ['IMPORT_IO_API_KEY'])
        runs_data = data['hits']['hits']
        count = data['hits']['total']
        # Search pages are capped at 1000 hits, so page through the remainder
        pages = math.ceil(count / 1000)
        page += 1
        while page <= pages:
            data = extractor_crawlruns(page, self.guid, os.environ['IMPORT_IO_API_KEY'])
            runs_data.extend(data['hits']['hits'])
            page += 1
        for run_data in runs_data:
            crawlrun = Crawlrun(run_data['_id'], extractor_id=run_data['fields']['extractorId'])
            crawlrun.add_fields(run_data['fields'])
            crawlruns.append(crawlrun)
        self.crawlruns = crawlruns
        return self.crawlruns

    def latest_json(self):
        """ Returns the JSON from the latest run """
        url = "https://data.import.io/extractor/{0}/json/latest".format(self.guid)
        querystring = {"_apikey": os.environ['IMPORT_IO_API_KEY']}
        headers = {"Accept": "application/x-ldjson"}
        response = requests.get(url, params=querystring, headers=headers, stream=True)
        list_resp = response.text.splitlines()
        json_resp = list(map(lambda x: json.loads(x), list_resp))
        return json_resp
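
# Sketch of a full extraction cycle with the Extractor class; the guid and
# URLs are placeholders:
#
#   extractor = Extractor('example-extractor-guid')
#   extractor.get_info()                  # populate name, fields, urls
#   extractor.put_url_list(['https://example.com/a', 'https://example.com/b'])
#   finished, run_id = extractor.run()    # blocks, polling every 30 seconds
#   if finished:
#       data = extractor.latest_json()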

class User(object):
    """ An Import.io User """
    def __init__(self, extractors=None):
        self.extractors = extractors

    @staticmethod
    def recent_crawlruns():
        """ Returns crawl runs from the past 24 hours for the user """
        crawlruns = []
        page = 1
        end_pages = False
        day_ago = datetime.today() - timedelta(days=1)
        data = all_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
        runs_data = data['hits']['hits']
        count = data['hits']['total']
        pages = math.ceil(count / 1000)
        while page <= pages:
            for run_data in runs_data:
                crawlrun = Crawlrun(run_data['_id'], extractor_id=run_data['fields']['extractorId'])
                crawlrun.add_fields(run_data['fields'])
                if crawlrun.started and crawlrun.started < day_ago:
                    # Results are sorted by creation time, so stop at the first old run
                    end_pages = True
                    break
                crawlruns.append(crawlrun)
            page += 1
            if end_pages or page > pages:
                break
            data = all_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
            runs_data = data['hits']['hits']
        return crawlruns

    @staticmethod
    def running_crawlruns():
        """ Returns running crawl runs for the user """
        crawlruns = []
        page = 1
        # Resolves to the module-level running_crawlruns() helper, not this method
        data = running_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
        runs_data = data['hits']['hits']
        count = data['hits']['total']
        pages = math.ceil(count / 1000)
        page += 1
        while page <= pages:
            data = running_crawlruns(page, os.environ['IMPORT_IO_API_KEY'])
            runs_data.extend(data['hits']['hits'])
            page += 1
        for run_data in runs_data:
            crawlrun = Crawlrun(run_data['_id'], extractor_id=run_data['fields']['extractorId'])
            crawlrun.add_fields(run_data['fields'])
            crawlruns.append(crawlrun)
        return crawlruns

    def get_extractors(self):
        """ Returns all extractors for the user and attaches them to the User object """
        extractors = []
        page = 1
        data = all_extractors(page, os.environ['IMPORT_IO_API_KEY'])
        extractors_data = data['hits']['hits']
        count = data['hits']['total']
        pages = math.ceil(count / 1000)
        page += 1
        while page <= pages:
            data = all_extractors(page, os.environ['IMPORT_IO_API_KEY'])
            extractors_data.extend(data['hits']['hits'])
            page += 1
        for extractor_data in extractors_data:
            extractor = Extractor(extractor_data['_id'], name=extractor_data['fields']['name'])
            extractors.append(extractor)
        self.extractors = extractors
        return self.extractors
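

if __name__ == '__main__':
    # Minimal smoke test, assuming IMPORT_IO_API_KEY is set in the environment:
    # list the account's extractors, then report how many runs are active.
    user = User()
    for found in user.get_extractors():
        print(found.guid, found.name)
    print('{0} crawl runs currently running'.format(len(User.running_crawlruns())))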