Skip to content

Instantly share code, notes, and snippets.

@davepeck
Last active January 5, 2024 17:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davepeck/0538fd39fbe677e35635956a199002df to your computer and use it in GitHub Desktop.
Save davepeck/0538fd39fbe677e35635956a199002df to your computer and use it in GitHub Desktop.
Toying around with a very naive python BigQuery query builder + related utilities
"""BigQuery utilities."""
import datetime
import typing as t
from abc import ABC, abstractmethod
from google.cloud import bigquery
from google.cloud.bigquery import query
type QueryParamValue = str | int | float | datetime.date | datetime.datetime | t.Sequence[ # noqa: E501
str
]
type QueryParams = t.Mapping[str, QueryParamValue]
def default_table_alias(table_name: str, table_alias: str | None = None) -> str | None:
"""Return the default table alias if appropriate, and no explicit alias is given."""
if not table_alias and "." in table_name:
table_alias = table_name.split(".")[-1]
return table_alias
class Statement:
"""A BigQuery SQL statement builder."""
table_name: str
table_alias: str | None
select_columns: list[str]
filters: list[str]
joins: list[str]
params: dict[str, QueryParamValue]
def __init__(self, table_name: str, table_alias: str | None = None):
self.table_name = table_name
self.table_alias = default_table_alias(table_name, table_alias)
self.table_alias = table_alias
self.select_columns = []
self.filters = []
self.joins = []
self.params = {}
def select(
self,
*columns: str | tuple[str, str],
):
"""Add a column to the SELECT clause of the query."""
for column in columns:
if isinstance(column, tuple):
self.select_columns.append(f"{column[0]} AS {column[1]}")
else:
self.select_columns.append(column)
return self
def where(self, column, operator, value, param_name=None):
"""Add a filter to the WHERE clause of the query."""
if param_name is None:
param_name = f"param{len(self.params)}"
self.params[param_name] = value
if operator.lower() == "in":
condition = f"{column} IN UNNEST(@{param_name})"
else:
condition = f"{column} {operator} @{param_name}"
self.filters.append(condition.strip())
return self
def join(
self, table_name, on_clause, join_type="INNER", table_alias: str | None = None
):
"""Add a JOIN clause to the query."""
table_alias = default_table_alias(table_name, table_alias)
alias_clause = f" AS {table_alias}" if table_alias else ""
self.joins.append(
f"{join_type} JOIN `{table_name}`{alias_clause} ON {on_clause}"
)
return self
def build_query(self):
"""Build the query string."""
select_clause = ", ".join(self.select_columns) or "*"
join_clause = " ".join(self.joins).strip()
where_clause = " AND ".join(self.filters) if self.filters else ""
alias_clause = f" AS {self.table_alias}" if self.table_alias else ""
query_parts = [
f"SELECT {select_clause} FROM `{self.table_name}`{alias_clause}",
]
if join_clause:
query_parts.append(join_clause)
if where_clause:
query_parts.append(f"WHERE {where_clause}")
return " ".join(query_parts).strip()
def build_query_param(
self, name: str, value: QueryParamValue
) -> query._AbstractQueryParameter:
"""Build a query parameter."""
if isinstance(value, str):
return bigquery.ScalarQueryParameter(name, "STRING", value)
elif isinstance(value, int):
return bigquery.ScalarQueryParameter(name, "INT64", value)
elif isinstance(value, float):
return bigquery.ScalarQueryParameter(name, "FLOAT64", value)
elif isinstance(value, datetime.datetime):
return bigquery.ScalarQueryParameter(name, "DATETIME", value)
elif isinstance(value, datetime.date):
return bigquery.ScalarQueryParameter(name, "DATE", value)
elif isinstance(value, t.Sequence):
if all(isinstance(v, str) for v in value):
return bigquery.ArrayQueryParameter(name, "STRING", value)
raise ValueError(f"Unsupported parameter type: {type(value)}")
def build_query_params(self):
"""Build the query parameters."""
return [
self.build_query_param(name, value) for name, value in self.params.items()
]
def build_query_job_config(self):
"""Build the query job config."""
return bigquery.QueryJobConfig(query_parameters=self.build_query_params())
class BQClient(bigquery.Client):
"""Our BigQuery client, with some extra methods for convenience."""
def execute(self, statement: Statement):
"""Execute the query."""
query = statement.build_query()
job_config = statement.build_query_job_config()
job = self.query(query, job_config=job_config)
return job.result()
class Table[ModelT](ABC):
"""Base class for all 'tables' -- aka managers for getting data from BQ."""
client: BQClient
name: str
alias: str | None
def __init__(self, client: BQClient, name: str, alias: str | None = None):
self.client = client
self.name = name
self.alias = default_table_alias(name, alias)
@abstractmethod
def get_model_instance(self, bq_row: t.Any) -> ModelT:
"""Create an instance from a BigQuery row."""
...
def execute(self, statement: Statement) -> t.Iterable[ModelT]:
"""Execute a BigQuery statement."""
return (self.get_model_instance(row) for row in self.client.execute(statement))
def all_stmt(self) -> Statement:
"""Return the default statement."""
return Statement(self.name, self.alias)
def all(self) -> t.Iterable[ModelT]:
"""Return the default query."""
return self.execute(self.all_stmt())
def get_client(project_id: str = "five-minute-5") -> BQClient:
"""Get a BigQuery client for a specific GCP project ID."""
return BQClient(project_id)
import pydantic as p
from server.utils.bq import BQClient, Statement, Table
def get_yy(yy: str | datetime.date) -> str:
"""Return the two-digit year for the given date."""
return yy.strftime("%y") if isinstance(yy, datetime.date) else yy[-2:]
class Committee(p.BaseModel, frozen=True):
"""Represents an FEC committee."""
id: str
name: str | None
party: str | None
candidate_id: str | None
@property
def adjusted_party(self) -> str | None:
"""
Return the FEC reported party, except in a few key cases,
where we know better. (For instance, ActBlue has a "party"
of NULL in the FEC's 2020 dataset. But we know it's mostly
DEM.)
"""
if self.id in KNOWN_DEM_COMMITTEE_IDS:
return Party.DEMOCRAT
return self.party
class CommitteeTable(Table[Committee]):
"""Tools for querying the BigQuery committee master file."""
def __init__(self, client: BQClient, year: str | datetime.date):
super().__init__(client, f"bigquery-public-data.fec.cm{get_yy(year)}")
def get_model_instance(self, row: t.Any) -> Committee:
"""Create a committee from a row of the committee master file."""
return Committee(
id=row.cmte_id,
name=row.cmte_nm,
party=row.cmte_pty_affiliation,
candidate_id=row.cand_id,
)
def for_name_stmt(self, name: str) -> Statement:
"""Return a select statement for committees matching the given criteria."""
return self.all_stmt().where("cmte_nm", "LIKE", f"%{name.upper()}%")
def for_name(self, name: str) -> t.Iterable[Committee]:
"""Return a query for committees matching the given criteria."""
return self.execute(self.for_name_stmt(name))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment