@alrocar · Created April 18, 2018
A sample snippet that runs a query against BigQuery and imports the result into CARTO, using the Python client libraries of both services:
```
#!/usr/bin/env python
import argparse
import time
import uuid
from tempfile import NamedTemporaryFile

from carto.auth import APIKeyAuthClient
from carto.exceptions import CartoException
from carto.datasets import DatasetManager
from google.cloud import bigquery
from google_auth_oauthlib import flow

USERNAME = "aromeu"
ORGANIZATION = "team"
API_KEY = "WRITE_HERE_YOUR_API_KEY"

USR_BASE_URL = "https://{organization}.carto.com/user/{user}".format(
    organization=ORGANIZATION, user=USERNAME)
auth_client = APIKeyAuthClient(
    api_key=API_KEY, base_url=USR_BASE_URL,
    organization=ORGANIZATION)


def wait_for_job(job):
    # Poll the job until it finishes, failing loudly on errors.
    while True:
        job.reload()  # Refreshes the state via a GET request.
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)


def run_query(credentials, project, query):
    client = bigquery.Client(project=project, credentials=credentials)
    query_job = client.run_async_query(str(uuid.uuid4()), query)
    query_job.use_legacy_sql = False
    query_job.begin()

    wait_for_job(query_job)

    # Drain the query results by requesting a page at a time
    # and write them to a temporary CSV file.
    query_results = query_job.results()
    page_token = None
    write_header = True

    with NamedTemporaryFile(mode="w", suffix=".csv") as data:
        while True:
            rows, total_rows, page_token = query_results.fetch_data(
                max_results=1000,
                page_token=page_token)
            if write_header:
                # Use the result schema as the CSV header (first page only).
                field_names = [field.name for field in query_results.schema]
                data.write(",".join(field_names))
                data.write("\n")
                write_header = False
            for row in rows:
                row = list(map(str, row))
                data.write(",".join(row) + "\n")
            if not page_token:
                break

        data.flush()

        # Import the temporary CSV file into CARTO as a new dataset.
        dataset_manager = DatasetManager(auth_client)
        try:
            with open(data.name, "rb") as archive:
                dataset_manager.create(archive)
        except CartoException as e:
            print("some error occurred", e)


def authenticate_and_query(project, query, launch_browser=True):
    # Run the OAuth installed-app flow to obtain BigQuery credentials.
    appflow = flow.InstalledAppFlow.from_client_secrets_file(
        'client_secrets.json',
        scopes=['https://www.googleapis.com/auth/bigquery'])
    if launch_browser:
        appflow.run_local_server()
    else:
        appflow.run_console()
    run_query(appflow.credentials, project, query)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        '--launch-browser',
        help='Use a local server flow to authenticate.',
        action='store_true')
    parser.add_argument('project', help='Project to use for BigQuery billing.')
    parser.add_argument('query', help='BigQuery SQL query.')
    args = parser.parse_args()

    authenticate_and_query(
        args.project, args.query, launch_browser=args.launch_browser)
```
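One caveat in the snippet above: building CSV lines with `",".join(...)` produces broken output whenever a value contains a comma, quote, or newline. A minimal sketch of a safer writer using the standard `csv` module (the `write_rows_csv` helper and its arguments are illustrative, not part of the original script):
```
import csv
from tempfile import NamedTemporaryFile


def write_rows_csv(schema, pages):
    """Write a header plus row pages to a temp CSV file and return its path.

    `schema` is a sequence of BigQuery schema fields and `pages` an iterable
    of row lists as returned by fetch_data(). Illustrative helper only.
    """
    with NamedTemporaryFile(mode="w", suffix=".csv",
                            newline="", delete=False) as data:
        writer = csv.writer(data)  # handles quoting and escaping
        writer.writerow([field.name for field in schema])
        for rows in pages:
            writer.writerows(rows)
        return data.name
```
`csv.writer` quotes any field that contains the delimiter, so the uploaded file stays parseable regardless of what the query returns.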
The `requirements.txt` file:
```
carto==1.0.1
google-cloud-bigquery==0.24.0
google-auth-oauthlib==0.1.0
pytz==2017.2
```
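The dependencies can be installed with pip (inside a virtualenv if you prefer):
```
pip install -r requirements.txt
```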
To run the `sample.py` script, execute:
```
python sample.py --launch-browser "eternal-ship-170218" "select * from test.test"
```
Where:
- `eternal-ship-170218` is the name of the BigQuery project
- `select * from test.test` is the query to run
- `--launch-browser` opens a browser window for the OAuth workflow
To enable authentication, place a `client_secrets.json` file containing the `client_id` and `client_secret` in the same directory (it can be downloaded from the BigQuery Web UI).
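For reference, an installed-app client secrets file typically looks like this (all values are placeholders; the exact URIs may differ depending on when the credentials were created):
```
{
  "installed": {
    "client_id": "YOUR_CLIENT_ID.apps.googleusercontent.com",
    "client_secret": "YOUR_CLIENT_SECRET",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://accounts.google.com/o/oauth2/token",
    "redirect_uris": ["urn:ietf:wg:oauth:2.0:oob", "http://localhost"]
  }
}
```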