Using BigQuery easily in Google Colab. Focus on a single table.

import re
import pandas as pd
from google.cloud import bigquery
from google.colab import auth

auth.authenticate_user()  # grant this Colab runtime access to BigQuery

PROJECT_ID = 'sql-hunt'  # change to your own project
class BqTable:
    def __init__(self, table, dataset='samples', active_project='bigquery-public-data', client=None):
        if client:  # allow reusing a client from another table
            c = client
        else:
            c = bigquery.Client(project=PROJECT_ID)  # for the connection only
        self.client = c
        self.project = active_project
        self.dataset = dataset
        self.table = table
        self.full_name = f"`{active_project}.{dataset}.{table}`"
        # get the real table object (DatasetReference replaces the
        # Client.dataset() helper, which newer library versions removed)
        self.d = c.get_dataset(bigquery.DatasetReference(active_project, dataset))
        self.t = c.get_table(self.d.table(table))
        self.schema = self.t.schema
        self.max_gb = 1.  # change it to set the scan-size limit

    def query(self, query):
        '''Run a query and return a DataFrame, unless it would scan more than max_gb.'''
        query_size = self.query_size(query)
        if query_size <= self.max_gb:
            job = self.client.query(query)
            rows = list(job.result(timeout=30))
            if len(rows) == 0:
                print("Query returned no rows.")
                return None
            columns = list(rows[0].keys())  # Row.keys() is the public field-name API
            return pd.DataFrame(
                data=[list(x.values()) for x in rows], columns=columns)
        print(f"Query cancelled; estimated size of {query_size:.2f} GB exceeds limit of {self.max_gb} GB")

    def select(self, columns, where=None, group_by=None, order_by=None, limit=None):
        query = f"SELECT {columns} FROM {self.full_name}"
        if where:
            if not re.match('WHERE|GROUP|ORDER', where, re.I):
                where = 'WHERE ' + where
            query += " " + where
        if group_by:
            query += " GROUP BY " + group_by
        if order_by:
            query += " ORDER BY " + order_by
        if limit:
            query += " LIMIT " + str(limit)
        return self.query(query)
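
    # Sketch of select(); the filter values below are illustrative:
    #   tbl.select('word, word_count', where="corpus='hamlet'",
    #              order_by='word_count DESC', limit=10)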

    def query_size(self, query):
        '''Estimate how many GB a query would scan, via a dry run (no cost).'''
        config = bigquery.QueryJobConfig()
        config.dry_run = True
        job = self.client.query(query, job_config=config)
        return job.total_bytes_processed / 2**30  # GB
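
    # Sketch: check the scan estimate yourself before running a big query.
    #   if tbl.query_size(sql) < 5:  # the 5 GB threshold is illustrative
    #       df = tbl.query(sql)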

    def head(self, num_rows=5, start_index=None, columns=None):
        '''Preview rows by reading the table directly; no query is billed.'''
        if type(columns) == str:
            columns = re.split(', *', columns)
        schema = None
        if columns:
            schema = [col for col in self.t.schema if col.name in columns]
        results = self.client.list_rows(self.t, selected_fields=schema,
                                        max_results=num_rows, start_index=start_index)
        rows = list(results)
        columns = list(rows[0].keys())  # Row.keys() is the public field-name API
        return pd.DataFrame(
            data=[list(x.values()) for x in rows], columns=columns)
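
    # Sketch of head(); the column names come from samples.shakespeare:
    #   tbl.head(10, columns='word, word_count')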

    @property
    def size(self):
        '''Table size in GB, read from the dataset's __TABLES__ metadata.'''
        query = f"""
            SELECT SUM(size_bytes) / POW(2, 30)
            FROM `{self.project}.{self.dataset}.__TABLES__`
            WHERE table_id = '{self.table}'
            """
        job = self.client.query(query)
        gb = list(job.result())[0][0]
        return gb

    def __str__(self):
        '''Use in a query, e.g. SELECT f FROM {tbl}'''
        return self.full_name

    def __repr__(self):
        params = f'"{self.table}", "{self.dataset}"'
        if self.project != 'bigquery-public-data':
            params += f', "{self.project}"'
        return f'BqTable({params})'

    def view(self, columns, where=None, group_by=None, order_by=None, limit=None):
        '''Same as select(), but returns parenthesized SQL for use in a WITH ... AS clause.'''
        query = f"SELECT {columns} FROM {self.full_name}"
        if where:
            if not re.match('WHERE|GROUP|ORDER', where, re.I):
                where = 'WHERE ' + where
            query += " " + where
        if group_by:
            query += " GROUP BY " + group_by
        if order_by:
            query += " ORDER BY " + order_by
        if limit:
            query += " LIMIT " + str(limit)
        return '(' + query + ')'
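
# End-to-end usage sketch, assuming the public samples.shakespeare table;
# the subquery and aggregation below are illustrative.
#   tbl = BqTable('shakespeare')
#   tbl.size                                   # table size in GB
#   sub = tbl.view('corpus, word_count', where='word_count > 100')
#   df = tbl.query(f"WITH t AS {sub} SELECT corpus, COUNT(*) AS n FROM t GROUP BY corpus")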