Created
October 31, 2017 21:16
-
-
Save WillianFuks/de3b11293133bdb15c4f230fe07af0da to your computer and use it in GitHub Desktop.
Connector used to interact with GCP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from google.auth import app_engine | |
import googleapiclient.discovery as disco | |
from google.oauth2 import service_account | |
class GCPService(object):
    """Connector used to interact with Google Cloud Platform services.

    Wraps the discovery resource built by ``googleapiclient.discovery.build``
    and exposes helpers to submit a job and wait for its completion.
    """

    # Supported service names mapped to the API version requested from the
    # discovery builder.
    _SERVICE_VERSIONS = {'bigquery': 'v2',
                         'storage': 'v1'}

    def __init__(self, name, credentials=None):
        """Builds a connector to interact with Google Cloud tools.

        :type name: str
        :param name: name of which service to build, such as 'bigquery'
                     or 'storage'. ``name`` must be available in
                     ``self.available_services``.

        :type credentials: `google.auth.credentials.Credentials`
        :param credentials: credentials used to connect to GCP. When omitted
                            (or falsy), App Engine credentials are used.

        :raises ValueError: if ``name`` is not one of the available services.
        """
        # Evaluate the property once; it is used for validation, for the
        # error message and for the version lookup.
        services = self.available_services
        if name not in services:
            raise ValueError(("'{name}' is not a valid service. "
                              "Available services are: {services}").format(
                                  name=name,
                                  services=",".join(services)))

        # If no ``credentials`` is sent then assume we are running this code
        # in the AppEngine environment. Outside of it, callers can pass
        # credentials explicitly, e.g. ones created with
        # ``service_account.Credentials.from_service_account_file``.
        if not credentials:
            credentials = app_engine.Credentials()

        self.con = disco.build(name,
                               services[name],
                               credentials=credentials)

    @property
    def available_services(self):
        """Available services that can be used in this project.

        :rtype: dict
        :returns: mapping of service name to API version. A fresh copy is
                  returned so callers cannot alter the class-level table.
        """
        return dict(self._SERVICE_VERSIONS)

    def execute_job(self, project_id, body):
        """Executes a job to run in GCP.

        NOTE(review): this relies on the underlying resource exposing
        ``jobs()``; presumably only the 'bigquery' service does -- confirm.

        :type project_id: str
        :param project_id: name of project Id to run the job.

        :type body: dict
        :param body: dict that specifies the job configuration

        :returns: whatever the ``jobs().insert(...)`` request returns when
                  executed (with up to 3 retries).
        """
        return self.con.jobs().insert(projectId=project_id,
                                      body=body).execute(num_retries=3)

    def poll_job(self, job, poll_interval=1, timeout=None):
        """Waits for a job to complete.

        :type job: dict
        :param job: any job that has been initiated by the connector. It must
                    contain ``job['jobReference']['projectId']`` and
                    ``job['jobReference']['jobId']``.

        :type poll_interval: int or float
        :param poll_interval: seconds to sleep between two status requests.
                              Defaults to 1, the previously hard-coded value.

        :type timeout: int, float or None
        :param timeout: maximum number of seconds to wait for the job to
                        reach the 'DONE' state. ``None`` (default) waits
                        indefinitely, which is the original behavior.

        :rtype: dict
        :returns: the last job status response, i.e. the one whose state is
                  'DONE'.

        :raises RuntimeError: if the finished job reports an ``errorResult``
                              or if ``timeout`` elapses before completion.
        """
        request = self.con.jobs().get(
            projectId=job['jobReference']['projectId'],
            jobId=job['jobReference']['jobId'])

        deadline = None if timeout is None else time.time() + timeout

        while True:
            result = request.execute(num_retries=3)
            status = result['status']

            if status['state'] == 'DONE':
                if 'errorResult' in status:
                    raise RuntimeError(status['errorResult'])
                return result

            # Only give up after a status check, so a job that finishes
            # right at the deadline is still reported as done.
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError(
                    "Job '{job_id}' did not complete within {timeout} "
                    "seconds".format(job_id=job['jobReference']['jobId'],
                                     timeout=timeout))

            time.sleep(poll_interval)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment