@Halama
Created July 17, 2013 11:21
Keboola Storage API Python wrapper
"""This module is intended to help interactions
with Goodata Keboola storage.
It creates and performs requests and parse response back
to handler which called it.
"""
import json
import logging as log
import time
import requests
API_URL = "https://connection.keboola.com/v2/storage/"
# Maximum waiting time in seconds
MAX_WAIT_TIME = 60

class Keboola:
    """Keboola API helper.

    Composes requests and parses responses from Keboola.
    """

    def __init__(self, access_token):
        self._token = access_token

    def _process_request(self, url, data, files=None):
        """Processes a request.

        Adds the access token to the headers.
        """
        headers = {"X-StorageApi-Token": self._token}
        r = requests.post(url, data=data, headers=headers, files=files)
        return r

    def get_buckets(self):
        """Gets all buckets in your storage.
        """
        url = "{api}buckets".format(api=API_URL)
        response = {}
        try:
            resp = self._process_request(url, None)
            resp.raise_for_status()
            response['items'] = list(resp.json())
        except requests.exceptions.HTTPError as e:
            response['error'] = "{stat}: {msg}".format(
                stat=e.response.status_code, msg=e.response.reason)
        return response

    def get_bucket(self, bucketid):
        """Gets the detail of one bucket.
        """
        url = "{api}buckets/{bck}".format(api=API_URL, bck=bucketid)
        response = {}
        try:
            resp = self._process_request(url, None)
            resp.raise_for_status()
            response['items'] = resp.json()
        except requests.exceptions.HTTPError as e:
            response['error'] = "{stat}: {msg}".format(
                stat=e.response.status_code, msg=e.response.reason)
        return response

    def get_tables(self, bucketid):
        """Gets all tables for the given bucket id.
        """
        url = "{api}buckets/{bck}/tables".format(api=API_URL, bck=bucketid)
        response = {}
        try:
            resp = self._process_request(url, None)
            resp.raise_for_status()
            response['items'] = list(resp.json())
        except requests.exceptions.HTTPError as e:
            response['error'] = "{stat}: {msg}".format(
                stat=e.response.status_code, msg=e.response.reason)
        return response

    def get_job(self, jobid):
        """Gets the status of a job.

        Returns:
            dictionary from the JSON response
        """
        url = "{api}jobs/{jid}".format(api=API_URL, jid=jobid)
        response = self._process_request(url, None)
        return response.json()

    def upload_file(self, filename, csvdata):
        """Uploads a file to Keboola Storage.

        Parameters:
            filename: string, file name
            csvdata: file-like object to read() the data from
        Returns:
            file id returned from Keboola
        """
        url = "{api}files/prepare".format(api=API_URL)
        data = {
            'name': filename,
            'sizeBytes': len(csvdata.read()),
            'isPublic': False,
            'notify': False
        }
        resp = self._process_request(url, data)
        resp = resp.json()
        fileid = resp['id']
        if 'uploadParams' not in resp:
            raise KeyError("Missing upload parameters in file upload")
        up = resp['uploadParams']
        # second step: upload the file to S3
        data = {
            'key': up['key'],
            'acl': up['acl'],
            'signature': up['signature'],
            'policy': up['policy'],
            'AWSAccessKeyId': up['AWSAccessKeyId']
        }
        # return the reader back to the beginning of the file
        csvdata.seek(0)
        files = {
            'file': (filename, csvdata)
        }
        resp = self._process_request(up['url'], data, files)
        return fileid

    def import_async(self, tableid, fileid):
        """Imports events into storage from a file already uploaded to Keboola.

        Polls for the async import job to finish and returns the status.
        Parameters:
            tableid: string, Keboola storage table id
            fileid: file id as returned from upload_file()
        Returns:
            True on success
        """
        # Once the file is uploaded we have to start the async import
        url = "{api}tables/{t}/import-async".format(api=API_URL, t=tableid)
        data = {
            'dataFileId': fileid,
            'incremental': 1
        }
        resp = self._process_request(url, data)
        if resp.status_code != 202:
            raise requests.exceptions.HTTPError(
                "Async import returned status {c1}, expected 202".format(c1=resp.status_code))
        resp = resp.json()
        jobid = resp['id']
        r = self.get_job(jobid)
        if 'error' in r:
            if r['error'] == 'resource not found':
                # Job completed so fast we couldn't catch it
                return True
            else:
                raise Exception(r['error'])
        wait = 0
        while r['status'] not in ('success', 'error'):
            time.sleep(1)
            wait += 1
            if wait >= MAX_WAIT_TIME:
                raise Exception("Import timed out for table: {t} and fileid: {fid}".format(
                    t=tableid, fid=fileid))
            r = self.get_job(jobid)
        if r['status'] == 'error':
            raise Exception("Import job {jid} for table {t} finished with an error".format(
                jid=jobid, t=tableid))
        return True
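

# A minimal usage sketch, not part of the original wrapper: the token, the
# bucket id "in.c-main", the table id "in.c-main.events" and the local file
# "events.csv" below are placeholders -- replace them with values from your
# own Keboola project.
if __name__ == "__main__":
    keboola = Keboola("your-storage-api-token")

    # List the available buckets and the tables of one of them.
    print(keboola.get_buckets())
    print(keboola.get_tables("in.c-main"))

    # Upload a local CSV file and import it asynchronously into a table.
    with open("events.csv", "rb") as csvdata:
        fileid = keboola.upload_file("events.csv", csvdata)
    keboola.import_async("in.c-main.events", fileid)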