Created
July 17, 2013 11:21
-
-
Save Halama/6019663 to your computer and use it in GitHub Desktop.
Keboola Storage API Python wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""This module is intended to help interactions | |
with Goodata Keboola storage. | |
It creates and performs requests and parse response back | |
to handler which called it. | |
""" | |
import json | |
import logging as log | |
import time | |
import requests | |
API_URL = "https://connection.keboola.com/v2/storage/" | |
# Maximum waiting time in seconds | |
MAX_WAIT_TIME = 60 | |
class Keboola:
    """Keboola Storage API helper.

    Composes HTTP requests to the Keboola Storage API and parses the
    JSON responses for the caller.
    """

    def __init__(self, access_token):
        # Storage API token, sent as a header with every request.
        self._token = access_token

    def _process_request(self, url, data, files=None):
        """POST ``data`` (and optional ``files``) to ``url``.

        Adds the Storage API access token header.

        NOTE(review): POST is used even for read-style endpoints
        (buckets / tables / jobs); kept as-is to preserve the original
        behavior — confirm against the Storage API before changing.
        """
        headers = {"X-StorageApi-Token": self._token}
        return requests.post(url, data=data, headers=headers, files=files)

    def get_buckets(self):
        """Get all buckets in the storage.

        Returns:
            dict with key 'items' (list of bucket dicts) on success,
            or key 'error' ("<status>: <reason>") on an HTTP error.
        """
        url = "{api}buckets".format(api=API_URL)
        response = {}
        try:
            resp = self._process_request(url, None)
            # Fix: the original caught a never-imported HTTPError with
            # Python 2 `except E, e` syntax, and `requests` only raises
            # HTTPError from raise_for_status() — so errors were never
            # handled at all.
            resp.raise_for_status()
            response['items'] = list(resp.json())
        except requests.exceptions.HTTPError as e:
            response['error'] = "{stat}: {msg}".format(
                stat=e.response.status_code, msg=e.response.reason)
        return response

    def get_bucket(self, bucketid):
        """Get the detail of a single bucket.

        Parameters:
            bucketid: str, Keboola storage bucket id
        Returns:
            dict with key 'items' (bucket detail) on success,
            or key 'error' ("<status>: <reason>") on an HTTP error.
        """
        # Fix: the original passed keyword `bkc=` for placeholder `{bck}`,
        # which raised KeyError on every call.
        url = "{api}buckets/{bck}".format(api=API_URL, bck=bucketid)
        response = {}
        try:
            resp = self._process_request(url, None)
            resp.raise_for_status()
            response['items'] = resp.json()
        except requests.exceptions.HTTPError as e:
            response['error'] = "{stat}: {msg}".format(
                stat=e.response.status_code, msg=e.response.reason)
        return response

    def get_tables(self, bucketid):
        """Get all tables for the given bucket id.

        Parameters:
            bucketid: str, Keboola storage bucket id
        Returns:
            dict with key 'items' (list of table dicts) on success,
            or key 'error' ("<status>: <reason>") on an HTTP error.
        """
        url = "{api}buckets/{bck}/tables".format(api=API_URL, bck=bucketid)
        response = {}
        try:
            resp = self._process_request(url, None)
            resp.raise_for_status()
            response['items'] = list(resp.json())
        except requests.exceptions.HTTPError as e:
            response['error'] = "{stat}: {msg}".format(
                stat=e.response.status_code, msg=e.response.reason)
        return response

    def get_job(self, jobid):
        """Get the status of an async job.

        Parameters:
            jobid: job id as returned by the import-async call
        Returns:
            dict parsed from the JSON response.
        """
        url = "{api}jobs/{jid}".format(api=API_URL, jid=jobid)
        resp = self._process_request(url, None)
        return resp.json()

    def upload_file(self, filename, csvdata):
        """Upload a file to Keboola storage (prepare + S3 upload).

        Parameters:
            filename: str, file name
            csvdata: file-like object to read() data from
        Returns:
            file id returned by Keboola.
        Raises:
            KeyError: when the prepare response lacks upload parameters.
        """
        url = "{api}files/prepare".format(api=API_URL)
        data = {
            'name': filename,
            'sizeBytes': len(csvdata.read()),
            'isPublic': False,
            'notify': False,
        }
        resp = self._process_request(url, data).json()
        fileid = resp['id']
        if 'uploadParams' not in resp:
            raise KeyError("Missing upload parameters in file upload")
        up = resp['uploadParams']
        # Second step: upload the payload straight to S3 using the
        # pre-signed form fields returned by the prepare call.
        data = {
            'key': up['key'],
            'acl': up['acl'],
            'signature': up['signature'],
            'policy': up['policy'],
            'AWSAccessKeyId': up['AWSAccessKeyId'],
        }
        # Rewind: the size computation above consumed the reader.
        csvdata.seek(0)
        files = {'file': (filename, csvdata)}
        self._process_request(up['url'], data, files)
        return fileid

    def import_async(self, tableid, fileid):
        """Incrementally import an already-uploaded file into a table.

        Starts the async import and polls (1 s interval, at most
        MAX_WAIT_TIME seconds) until the job finishes.

        Parameters:
            tableid: str, Keboola storage table id
            fileid: file id as returned from upload_file()
        Returns:
            True on success.
        Raises:
            requests.exceptions.HTTPError: when the API does not accept
                the import request with HTTP 202.
            Exception: when the job reports an error or polling times out.
        """
        url = "{api}tables/{t}/import-async".format(api=API_URL, t=tableid)
        data = {
            'dataFileId': fileid,
            'incremental': 1,
        }
        resp = self._process_request(url, data)
        if resp.status_code != 202:
            raise requests.exceptions.HTTPError(
                "Async import returning status {c1}, but expecting 202".format(
                    c1=resp.status_code))
        jobid = resp.json()['id']
        r = self.get_job(jobid)
        if 'error' in r:
            if r['error'] == 'resource not found':
                # Job completed so fast we couldn't catch it.
                return True
            raise Exception(r['error'])
        wait = 0
        # Fix: the original compared against the misspelled 'sucess', so a
        # finished job was never recognized and every import polled until
        # the timeout fired.
        while r['status'] not in ('success', 'error'):
            time.sleep(1)
            wait += 1
            if wait >= MAX_WAIT_TIME:
                raise Exception(
                    "Import timeouted for table: {t} and fileid: {fid}".format(
                        t=tableid, fid=fileid))
            r = self.get_job(jobid)
        # Fix: the original returned True even when the job ended in
        # 'error', contradicting its documented contract.
        if r['status'] == 'error':
            # NOTE(review): assumes failed jobs carry an 'error' field —
            # confirm against the Storage API job schema.
            raise Exception(r.get('error', 'import job failed'))
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment