Skip to content

Instantly share code, notes, and snippets.

@piojanu
Last active January 2, 2024 15:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save piojanu/2ee6b91ba385dee17b85ffebf63ca583 to your computer and use it in GitHub Desktop.
Save piojanu/2ee6b91ba385dee17b85ffebf63ca583 to your computer and use it in GitHub Desktop.
A basic wrapper around the BigQuery Python client for performing group-by and aggregation operations similar to those in Pandas.
from google.cloud import bigquery
from pandas import DataFrame
class BigQueryDataFrame:
    """
    Wrapper around the BigQuery Python client for pandas-like group-by and aggregation.

    The object accumulates filter / group-by / aggregation settings, turns them
    into a SQL string on every `agg()` call and only talks to BigQuery when
    `to_dataframe()` is called.

    Basic usage:
    ```python
    bigquery_df = BigQueryDataFrame("your-project.your-dataset.your-table")
    bigquery_df = bigquery_df.filter("column5 = 'fashion'")
    bigquery_df = bigquery_df.groupby("column1", "column2")
    result_df = bigquery_df.agg(count_alias="count", column3="SUM", column4="AVG").to_dataframe()
    ```
    This will execute a query similar to:
    ```sql
    SELECT
        column1, column2,
        COUNT(*) as count, SUM(column3) as sum_column3, AVG(column4) as avg_column4
    FROM `your-project.your-dataset.your-table`
    WHERE column5 = 'fashion'
    GROUP BY column1, column2
    ```
    and return the results as a pandas DataFrame.

    The class also supports nesting of queries. For example:
    ```python
    bigquery_df = bigquery_df.groupby("item_id").agg(count_alias="item_freq")
    bigquery_df = bigquery_df.groupby("item_freq").agg(count_alias="item_count")
    result_df = bigquery_df.to_dataframe()
    ```
    In this example, the class builds nested queries to first count occurrences
    of each item, and then count the resulting counts.

    Notes:
        * All methods except `to_dataframe()` mutate the instance and return it,
          so calls can be chained.
        * Table IDs, column names, filter expressions and aggregation function
          names are interpolated into the SQL text verbatim; pass trusted
          values only.
    """

    def __init__(self, table_id: str):
        """
        Initializes the BigQuery client and sets the table ID.

        Args:
            table_id: Table to query; it is wrapped in backticks in the
                generated `FROM` clause.
        """
        self.client = bigquery.Client()
        self.table_id = table_id
        self.filters = None  # Pending WHERE expression, consumed by the next agg()
        self.groupby_columns = None  # Comma-separated GROUP BY column list
        self.agg_columns = None  # Comma-separated aggregation expressions
        self.query = None  # SQL built so far; reset by to_dataframe()

    def filter(self, *args: str) -> 'BigQueryDataFrame':
        """
        Sets the filters to apply. Does not execute the query.

        The conditions are combined with `AND` and replace any filter set by an
        earlier call that has not been consumed by `agg()` yet.

        Args:
            *args: SQL boolean expressions, e.g. `"column5 = 'fashion'"`.

        Returns:
            This instance, to allow chaining.
        """
        if len(args) > 1:
            # Parenthesise each condition so that an OR inside one of them
            # cannot bind across the AND that joins the conditions.
            self.filters = " AND ".join(f"({condition})" for condition in args)
        else:
            self.filters = " AND ".join(args)
        return self

    def groupby(self, *args: str) -> 'BigQueryDataFrame':
        """
        Sets the columns to group by. Does not execute the query.

        Args:
            *args: Column names (or SQL expressions) to group by. With no
                arguments the next `agg()` aggregates over all rows.

        Returns:
            This instance, to allow chaining.
        """
        self.groupby_columns = ", ".join(args)
        return self

    def agg(self, count_alias=None, **kwargs: str) -> 'BigQueryDataFrame':
        """
        Sets the aggregations to perform and builds the query. Does not execute it.

        Args:
            count_alias: If given, adds `COUNT(*) as <count_alias>`.
            **kwargs: `column="FUNC"` pairs; each adds
                `FUNC(column) as func_column`.

        Returns:
            This instance, to allow chaining.

        Raises:
            ValueError: If neither `count_alias` nor any aggregation is given.
        """
        # Always start from an empty list so aggregations from a previous
        # agg() call (e.g. the inner level of a nested query) never leak in.
        expressions = []
        if count_alias:
            expressions.append(f"COUNT(*) as {count_alias}")
        expressions.extend(
            f"{func}({column}) as {func.lower()}_{column}"
            for column, func in kwargs.items()
        )
        if not expressions:
            raise ValueError(
                "agg() requires count_alias and/or at least one column=FUNCTION aggregation."
            )
        self.agg_columns = ", ".join(expressions)
        self._build_query()
        return self

    def to_dataframe(self) -> DataFrame:
        """
        Executes the query, returns the result as a pandas DataFrame, and resets the query.

        Raises:
            ValueError: If no query has been built yet (`agg()` was not called
                since construction or since the last `to_dataframe()`).
        """
        if not self.query:
            raise ValueError("No query to execute: call agg() before to_dataframe().")
        df = self._execute_query()
        self.query = None  # Reset the query
        return df

    def _build_query(self) -> None:
        """
        Builds the SQL query from the current settings and stores it in `self.query`.

        If a query already exists it becomes the subquery of the new one.
        Pending filters are consumed (reset) once they have been applied.
        """
        # Step 1: "SELECT" the group-by columns (if any) followed by the aggregations
        select_items = [item for item in (self.groupby_columns, self.agg_columns) if item]
        query = "SELECT\n\t" + ",\n\t".join(select_items) + "\n"

        # Step 2: Add the "FROM" clause
        # If a query already exists, nest it as a subquery
        if self.query:
            indented_query = self.query.replace("\n", "\n\t")  # Indent each line of the existing query
            query += f"FROM (\n\t{indented_query}\n)"
        else:
            query += f"FROM `{self.table_id}`"

        # Step 3: Add the "WHERE" clause if filters have been set
        if self.filters:
            query += f"\nWHERE {self.filters}"
            self.filters = None  # Reset the filters

        # Step 4: Add the "GROUP BY" clause; omitted for a global aggregation
        if self.groupby_columns:
            query += f"\nGROUP BY {self.groupby_columns}"

        self.query = query

    def _execute_query(self) -> DataFrame:
        """
        Executes the query and returns the result as a pandas DataFrame.
        """
        return self.client.query(self.query).to_dataframe()
def plot_data(df, x_col, y_col, alpha=0.1, log_scale=True, log_log_scale=False):
    """
    Draw `y_col` against `x_col` as a line plot, optionally with a smoothed overlay.

    The function only issues `plt.*` drawing calls; it neither shows nor
    returns anything. NOTE(review): `plt` is not imported in the visible part
    of this file - presumably `matplotlib.pyplot`; confirm it is in scope.

    Args:
        df: DataFrame containing both columns. It is sorted into a new frame
            via `sort_values`; the smoothed column is added to that new frame.
        x_col: Name of the column used for the x-axis (and for sorting).
        y_col: Name of the column used for the y-axis.
        alpha: Smoothing factor for the exponential moving average
            (`ewm(alpha=alpha, adjust=True).mean()`); `None` disables the
            smoothed line.
        log_scale: If true, use a logarithmic y-axis.
        log_log_scale: If true, use logarithmic scales on both axes
            (regardless of `log_scale`).
    """
    # Order rows by x so that the line is drawn left to right
    ordered = df.sort_values(x_col)
    smoothing_enabled = alpha is not None

    # Exponential moving average of y, stored next to the raw values
    if smoothing_enabled:
        ordered[f'{y_col}_smooth'] = ordered[y_col].ewm(alpha=alpha, adjust=True).mean()

    plt.plot(ordered[x_col], ordered[y_col], label='Original')

    # The smoothed curve is drawn only when a smoothing factor was given
    if smoothing_enabled:
        plt.plot(ordered[x_col], ordered[f'{y_col}_smooth'], label='Smoothed', color='red')

    plt.xlabel(x_col)
    plt.ylabel(y_col)

    if log_log_scale:
        # Log-log: both axes logarithmic
        plt.yscale('log')
        plt.xscale('log')
    elif log_scale:
        # Semi-log: only the y-axis is logarithmic
        plt.yscale('log')

    plt.title(f'{y_col} versus {x_col}')
    plt.grid(True)  # Gridlines make values easier to read off
    plt.legend()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment