@jmelloy
Created July 14, 2017 01:48
import datetime
import logging
import struct
import sys
import os
sys.path.append("lib")
import httplib2
from googleapiclient import http
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.service_account import ServiceAccountCredentials
logger = logging.getLogger()
GOOGLEAPI_AUTH_URL = 'https://www.googleapis.com/auth/'
DEBUG = False
http_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "responses")
if DEBUG and not os.path.exists(http_path):
    os.makedirs(http_path)
def dparse(data, path):
    """Walk a nested dict along a slash-separated path; return None if nothing is found."""
    x = path.split("/")
    for k in x:
        if k:
            data = data.get(k, {})
    if data:
        return data
    return None
class HttpRecord(httplib2.Http):
    """httplib2.Http subclass that records every response body to disk (used when DEBUG is on)."""
    def request(self, uri, *args, **kwargs):
        resp, content = super().request(uri, *args, **kwargs)
        filename = os.path.join(
            http_path,
            f'{datetime.datetime.now()}-{uri.replace("/", "-")}-{kwargs.get("method")}')
        with open(filename, "wb") as f:
            f.write(content)
        return resp, content
def credentials(name, json="secret.json", scope=None):
if scope is None:
scope = GOOGLEAPI_AUTH_URL + name
elif '://' in scope:
assert GOOGLEAPI_AUTH_URL in scope, scope
else:
scope = GOOGLEAPI_AUTH_URL + scope
cred = ServiceAccountCredentials.from_json_keyfile_name(json, scope)
assert cred.invalid is False
return cred
def service(name, json="secret.json", scope=None, v=None):
v = v or ('v2' if name == 'drive' else 'v1')
cred = credentials(name, json, scope=scope)
http = httplib2.Http()
if DEBUG:
http = HttpRecord()
return build(name, v, http=cred.authorize(http))
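# Example (illustrative, not from the original gist): clients are built through
# service(); the scope shorthand is expanded against GOOGLEAPI_AUTH_URL, and
# BigQuery needs v="v2" because the default API version here is v1.
#
#   gs_storage = service("storage", scope="devstorage.read_write")
#   gs_bigquery = service("bigquery", v="v2")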
def cs_put_object(file, bucket, key, mimetype="application/octet-stream", check_size=None, gs=None):
    """Uploads a file-like object to Google Cloud Storage.

    If check_size is set to the expected object size in bytes, the upload is skipped
    when an object of that size already exists at the key. Retries once automatically
    on transient (5xx) errors."""
    gs = gs or service("storage", scope="devstorage.read_write")
    logger.info("[upload_cloud_store] Uploading %s to %s", key, bucket)
    if check_size:
        exists, data = cs_get_object_info(bucket, key, gs=gs)
        # md5Hash would be a better check, but it is a base64-encoded binary digest
        # and would need decoding before it could be compared.
        if exists and data["size"] == check_size:
            logger.info("[upload_cloud_store] Skipping upload for %s", key)
            return data
    upload = http.MediaIoBaseUpload(file, mimetype=mimetype, resumable=True)
    resp = gs.objects().insert(
        bucket=bucket,
        name=key,
        media_body=upload
    ).execute(num_retries=1)
    logger.debug(resp)
    return resp
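# Illustrative sketch (not part of the original gist; path, bucket, and key are
# placeholders): upload a local file, letting check_size skip the transfer when an
# object of the same size is already in place.
def _example_upload(path="export.csv", bucket="example-bucket", key="exports/export.csv"):
    with open(path, "rb") as fh:
        return cs_put_object(fh, bucket, key, mimetype="text/csv",
                             check_size=os.path.getsize(path))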
def cs_get_object(bucket, key, gs=None, tracer=logger.info, filename=None):
    import io
    gs = gs or service("storage", scope="devstorage.read_write")
    # https://cloud.google.com/storage/docs/json_api/v1/objects/get
    # Fetch the metadata first (raises if the object does not exist).
    req = gs.objects().get(
        bucket=bucket,
        object=key)
    resp = req.execute()
    # Fetch the payload data.
    req = gs.objects().get_media(
        bucket=bucket,
        object=key)
    # The BytesIO object may be replaced with any io.IOBase instance.
    if not filename:
        fh = io.BytesIO()
    else:
        fh = open(filename, "wb")
    downloader = http.MediaIoBaseDownload(fh, req, chunksize=1024 * 1024)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        if status:
            tracer('Download %d%%.' % int(status.progress() * 100))
    tracer('Download Complete!')
    if filename:
        fh.close()
        fh = open(filename, "rb")  # reopen in binary mode for reading
    else:
        fh.seek(0)
    return fh
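# Illustrative sketch (placeholder names): cs_get_object returns a readable handle,
# either an in-memory BytesIO or, when filename is given, a file reopened from disk.
def _example_download(bucket="example-bucket", key="exports/export.csv", filename=None):
    fh = cs_get_object(bucket, key, filename=filename)
    try:
        return fh.read()
    finally:
        fh.close()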
def cs_get_object_info(bucket, key, gs=None):
    """Returns (exists, metadata) for an object, or (False, {}) if it is not found."""
    gs = gs or service("storage", scope="devstorage.read_write")
    req = gs.objects().get(
        bucket=bucket,
        object=key)
    try:
        resp = req.execute()
        resp["size"] = int(resp["size"])
        return True, resp
    except HttpError as he:
        if he.resp.status == 404:
            return False, {}
        else:
            logger.error(he)
            raise
def cs_delete_object(bucket, key, gs=None):
    gs = gs or service("storage", scope="devstorage.read_write")
    logger.info("Deleting %s from %s", key, bucket)
    req = gs.objects().delete(
        bucket=bucket,
        object=key
    )
    resp = req.execute()
    # Success returns an empty string: ''
    return resp
def bq_job_status(jobId, projectId="whois-980", gs=None):
""" Checks the status of a bigquery job, such as returned by bq_export, load, or asynchronous query """
gs = gs or service("bigquery", v="v2")
resp = gs.jobs().get(projectId=projectId, jobId=jobId).execute()
return resp
def bq_load(gs_filename, destination, projectId="whois-980",
datasetId='whois2015', fields=[], gs=None, **kwargs):
gs = gs or service("bigquery", v="v2")
if type(gs_filename) != list:
gs_filename = [gs_filename]
if type(fields) != list:
raise ValueError("Fields is not a list!")
logger.info("[bq_load] Loading %s into %s %s" % (gs_filename, destination,
",".join(["%s:%s" % (k, v)
for k, v in kwargs.items()]) if kwargs else ""))
field_names = []
for f in fields:
r = f.split(":")
field_type = 'STRING'
if len(r) > 1:
field_type = r[1]
field_names.append({"name": r[0], "type": field_type, "mode": 'NULLABLE'})
# https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
body = {
"configuration": { # [Required] Describes the job configuration.
"load": { # [Pick one] Configures a load job.
"destinationTable": { # [Required] The destination table to load the data into.
"projectId": projectId, # [Required] The ID of the project containing this table.
"tableId": destination,
# [Required] The ID of the table. The ID must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_). The maximum length is 1,024 characters.
"datasetId": datasetId, # [Required] The ID of the dataset containing this table.
},
"sourceUris": # [Required] The list of fully-qualified URIs that point to your data in Google Cloud Storage. Each URI can contain one '*' wildcard character and it must come after the 'bucket' name.
gs_filename
,
"schema": {
# [Optional] The schema for the destination table. The schema can be omitted if the destination table already exists or if the schema can be inferred from the loaded data.
"fields": field_names,
},
},
},
}
body["configuration"]["load"].update(kwargs)
resp = gs.jobs().insert(
projectId=projectId,
body=body
).execute()
logger.debug(resp)
logger.info("[bq_load] Started job %s" % resp["jobReference"]["jobId"])
return resp["jobReference"]["jobId"]
def bq_tables(datasetId='whois2015', projectId="whois-980", tracer=logger.info, gs=None, maxResults=1000):
    """Yields every tableId in the dataset, following nextPageToken until exhausted."""
    gs = gs or service("bigquery", v="v2")
    tracer = tracer or logger.info
    token = ''
    count = 0
    while token is not None:
        rs = gs.tables().list(
            projectId=projectId,
            datasetId=datasetId,
            maxResults=maxResults,
            pageToken=token
        ).execute()
        for t in rs.get("tables", []):
            count += 1
            yield t["tableReference"]["tableId"]
        token = rs.get("nextPageToken")
    tracer("Got %d tables" % count)
class BigQueryException(Exception):
    pass
class BigQuery_Job(object):
    """Takes a BQ job id, calls the status API, and turns the result into an object.
    Ref: https://cloud.google.com/bigquery/docs/reference/v2/jobs#resource
    """
    def __init__(self, job_id, tags=None, projectId="whois-980", skip_check=False, gs=None):
        self.job_id = job_id
        self.tags = tags
        self.projectId = projectId
        self.complete = False
        self.gs = gs
        resp = None
        if not skip_check:
            resp = bq_job_status(job_id, projectId=self.projectId, gs=gs)
        self.raw_json = resp
        self.from_response(resp)

    def __repr__(self):
        return "<BQ Job %s: %s Started: %s Finished: %s%s>" % (
            self.job_id, self.state, self.startTime, self.endTime,
            ' (%s)' % self.tags if self.tags else '')
    def update(self):
        if not self.complete:
            logger.debug("Checking status for %s", self.job_id)
            resp = bq_job_status(self.job_id, projectId=self.projectId, gs=self.gs)
            self.from_response(resp)

    def wait(self, timeout=10, raise_exception=True):
        import time
        while not self.complete:
            self.update()
            logger.info("Status for %s (%s)", self.job_id, self.state)
            if self.complete:
                break
            time.sleep(timeout)
        if raise_exception and self.state == "ERROR":
            raise BigQueryException("BQ: %s" % self.errors)
    def from_response(self, resp):
        if resp is None:
            resp = {}
        self.raw_json = resp

        def p(path):
            return dparse(self.raw_json, path)

        self.kind = p("/kind")
        self.etag = p("/etag")
        self.projectId = p("/jobReference/projectId")
        self.jobId = p("/jobReference/jobId")
        self.selfLink = p("/selfLink")
        self.creationTime = _safe_datetime_from_ms(p("/statistics/creationTime"))
        self.startTime = _safe_datetime_from_ms(p("/statistics/startTime"))
        self.endTime = _safe_datetime_from_ms(p("/statistics/endTime"))
        self.id = p("/id")
        self.state = p("/status/state")
        if p("/status/errorResult"):
            self.state = 'ERROR'
            self.errors = ["%s (%s)" % (u['message'], u.get("location", ""))
                           for u in p("/status/errors") or []]
            self.errorResult = p("/status/errorResult/message")
        else:
            self.errors = None
            self.errorResult = None
        # The job type is whichever configuration section is present.
        config = p("/configuration") or {}
        self.job_type = next((k for k in config if k in ("load", "query", "extract", "copy")), None)
        # job_type == 'extract':
        self.destinationUri = p("/configuration/extract/destinationUri")
        # job_type == 'query':
        self.query = p("/configuration/query/query")
        self.destinationTableId = p("/configuration/query/destinationTable/tableId")
        self.destinationProjectId = p("/configuration/query/destinationTable/projectId")
        self.destinationDatasetId = p("/configuration/query/destinationTable/datasetId")
        self.inputFiles = _safe_int(p("/statistics/load/inputFiles"))
        self.inputBytes = _safe_int(p("/statistics/load/inputFileBytes"))
        self.outputRows = _safe_int(p("/statistics/load/outputRows"))
        self.user_email = p("/user_email")
        if self.state in ("DONE", "ERROR"):
            self.complete = True
def _safe_int(x):
    try:
        if x is not None:
            return int(x)
    except ValueError:
        logger.warning("Could not convert %s to int", x)
        return x
    return x
def _safe_datetime_from_ms(x):
    try:
        if x is not None:
            x = _safe_int(x)
            return datetime.datetime.fromtimestamp(x / 1000)
        return None
    except Exception as e:
        logger.warning("Could not convert %s to date (%s)", x, e)
        return x
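# Example (illustrative sketch, not part of the original gist; the bucket, table, and
# field names are placeholders): kick off a load job and poll it with
# BigQuery_Job.wait(), which raises BigQueryException if the job ends in ERROR.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    job_id = bq_load("gs://example-bucket/exports/export-*.csv",
                     "example_table",
                     fields=["domain", "created:TIMESTAMP"],
                     skipLeadingRows=1)
    job = BigQuery_Job(job_id)
    job.wait(timeout=15)
    print(job)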