Skip to content

Instantly share code, notes, and snippets.

@WillianFuks
Created October 31, 2017 21:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save WillianFuks/de3b11293133bdb15c4f230fe07af0da to your computer and use it in GitHub Desktop.
Save WillianFuks/de3b11293133bdb15c4f230fe07af0da to your computer and use it in GitHub Desktop.
Connector used to interact with GCP
import time
from google.auth import app_engine
import googleapiclient.discovery as disco
from google.oauth2 import service_account
class GCPService(object):
def __init__(self, name, credentials=None):
"""Builds a connector to interact with Google Cloud tools.
:type name: str
:param name: name of which service to build, such as 'bigquery'
or 'storage'. ``name`` must be available in
`self.available_services()`
:type credentials: `google.auth.credentials.Credentials`
:param credentials: certificates to connect to GCP.
:returns: Resource object to interact with GCP
"""
if name not in self.available_services:
raise ValueError(("'{name}' is not a valid service."
"Available services are: {services}").format(
name=name,
services=",".join(list(
self.available_services))))
# if no ``credentials`` is sent then assume we are running this code
# in AppEngine environment
if not credentials:
credentials = app_engine.Credentials()
# credentials = service_account.Credentials.\
# from_service_account_file('key.json')
self.con = disco.build(name,
self.available_services[name],
credentials=credentials)
@property
def available_services(self):
"""Available services that can be used in this project"""
return {'bigquery': 'v2',
'storage': 'v1'}
def execute_job(self, project_id, body):
"""Executes a job to run in GCP.
:type project_id: str
:param projectId: name of project Id to run the job.
:type body: dict
:param body: dict that specifies the job configuration
"""
return self.con.jobs().insert(projectId=project_id,
body=body).execute(num_retries=3)
def poll_job(self, job):
"""Waits for a job to complete.
:type job: `googleapi.discovery.Resource`
:param job: any job that has been initiated by the connector.
"""
request = self.con.jobs().get(
projectId=job['jobReference']['projectId'],
jobId=job['jobReference']['jobId'])
while True:
result = request.execute(num_retries=3)
if result['status']['state'] == 'DONE':
if 'errorResult' in result['status']:
raise RuntimeError(result['status']['errorResult'])
return
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment