Skip to content

Instantly share code, notes, and snippets.

@diogommartins
Created March 7, 2023 04:14
Show Gist options
  • Save diogommartins/c295b03700a3a336af1ef08e147d7a13 to your computer and use it in GitHub Desktop.
Save diogommartins/c295b03700a3a336af1ef08e147d7a13 to your computer and use it in GitHub Desktop.
amora models import
Index: amora/providers/bigquery.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/amora/providers/bigquery.py b/amora/providers/bigquery.py
--- a/amora/providers/bigquery.py (revision 9423ae400a81b138623bddacffe2096834d066bc)
+++ b/amora/providers/bigquery.py (revision 40cd4646db1f3b816fcb7af81761007cd9e768d5)
@@ -6,6 +6,21 @@
import pandas as pd
import sqlalchemy
+from amora.compilation import compile_statement
+from amora.config import settings
+from amora.contracts import BaseResult
+from amora.logger import log_execution, logger
+from amora.models import (
+ SQLALCHEMY_METADATA_KEY,
+ AmoraModel,
+ Field,
+ MaterializationTypes,
+ Model,
+ amora_model_for_path,
+)
+from amora.protocols import Compilable
+from amora.storage import cache
+from amora.version import VERSION
from google.api_core.client_info import ClientInfo
from google.api_core.exceptions import NotFound
from google.cloud.bigquery import (
@@ -16,6 +31,8 @@
TableReference,
)
from google.cloud.bigquery.table import RowIterator, _EmptyRowIterator
+from jinja2 import Environment, PackageLoader, select_autoescape
+from shed import shed
from sqlalchemy import (
Column,
String,
@@ -39,21 +56,6 @@
from sqlalchemy_bigquery import STRUCT
from sqlalchemy_bigquery.base import BQArray, BQBinary, unnest
-from amora.compilation import compile_statement
-from amora.config import settings
-from amora.contracts import BaseResult
-from amora.logger import log_execution, logger
-from amora.models import (
- SQLALCHEMY_METADATA_KEY,
- AmoraModel,
- Field,
- MaterializationTypes,
- Model,
-)
-from amora.protocols import Compilable
-from amora.storage import cache
-from amora.version import VERSION
-
Schema = List[SchemaField]
BQTable = Union[Table, TableReference, str]
@@ -117,6 +119,13 @@
sqltypes.Time: "TIME",
}
+JINJA2_NEW_MODEL_TEMPLATE = Environment(  # built once, when this module is imported
+ loader=PackageLoader("amora"),  # the template is looked up inside the `amora` package
+ autoescape=select_autoescape(),  # NOTE(review): presumably leaves escaping off for a `.py.jinja2` name - confirm
+ trim_blocks=True,
+ lstrip_blocks=True,
+).get_template("new-model.py.jinja2")  # rendered by `import_table` into model source code
+
class TimePart(Enum):
"""
@@ -174,6 +183,77 @@
return f"{model.__table__.metadata.schema}.{model.__tablename__}"
+def list_tables(dataset_reference: str) -> List[str]:
+    """
+    Return the reference of every table listed for the given dataset.
+
+    Each item returned by the BigQuery client listing is converted to a
+    string through its `reference` attribute.
+    Read more: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
+
+    >>> amora.providers.bigquery.list_tables("amora-data-build-tool.amora")
+    [
+        'amora-data-build-tool.amora.array_repeated_fields',
+        'amora-data-build-tool.amora.health',
+        'amora-data-build-tool.amora.heart_rate',
+        'amora-data-build-tool.amora.heart_rate_agg',
+        'amora-data-build-tool.amora.heart_rate_over_100',
+        'amora-data-build-tool.amora.step_count_by_source',
+        'amora-data-build-tool.amora.steps',
+        'amora-data-build-tool.amora.steps_agg'
+    ]
+    """
+    client = get_client()
+    table_references = []
+    for listed_table in client.list_tables(dataset_reference):
+        table_references.append(str(listed_table.reference))
+    return table_references
+
+
+def import_table(table_reference: str, overwrite: bool = False) -> Model:
+    """
+    Creates an `AmoraModel` file from a table reference and returns the model reference.
+    E.g.:
+
+    >>> amora.providers.bigquery.import_table("amora-data-build-tool.amora.health")
+
+    The file is created under `settings.models_path`. Its relative path is the
+    table reference with every `-` replaced by `_` and every `.` replaced by `/`,
+    plus the `.py` suffix. The model class name is the table name with each
+    `_`-separated part title-cased and joined, and the schema passed to the
+    template is sorted by field name.
+
+    Args:
+        table_reference: Table identifier in the format
+            `project-id.dataset-id.table-id`.
+        overwrite: When truthy, an already existing destination file is replaced.
+
+    Returns:
+        The value of `amora_model_for_path` for the generated file.
+
+    Raises:
+        ValueError: If `table_reference` doesn't have exactly three non-empty,
+            dot-separated parts, or if the destination file already exists
+            and `overwrite` is falsy.
+    """
+    # Validate the reference before touching the filesystem or BigQuery, so a
+    # malformed value fails with an actionable message instead of a tuple
+    # unpacking error raised halfway through the import.
+    reference_parts = table_reference.split(".")
+    if len(reference_parts) != 3 or not all(reference_parts):
+        raise ValueError(
+            f"`{table_reference}` is not a valid table reference. "
+            "Expected the format `project-id.dataset-id.table-id`.",
+        )
+    project, dataset, table = reference_parts
+
+    destination_file_path = settings.models_path.joinpath(
+        table_reference.replace("-", "_").replace(".", "/") + ".py"
+    )
+
+    if destination_file_path.exists() and not overwrite:
+        raise ValueError(
+            f"`{destination_file_path}` already exists. "
+            f"Pass `--overwrite` to overwrite file.",
+        )
+
+    model_name = "".join(part.title() for part in table.split("_"))
+    sorted_schema = sorted(get_schema(table_reference), key=lambda field: field.name)
+
+    model_source_code = JINJA2_NEW_MODEL_TEMPLATE.render(
+        BIGQUERY_TYPES_TO_PYTHON_TYPES=BIGQUERY_TYPES_TO_PYTHON_TYPES,
+        BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES=BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES,
+        dataset=dataset,
+        dataset_id=f"{project}.{dataset}",
+        model_name=model_name,
+        project=project,
+        schema=sorted_schema,
+        table=table,
+    )
+
+    formatted_source_code = shed(model_source_code)
+
+    destination_file_path.parent.mkdir(parents=True, exist_ok=True)
+    # Python reads source files as UTF-8 by default, so the generated module
+    # must not depend on the locale's preferred encoding.
+    destination_file_path.write_text(data=formatted_source_code, encoding="utf-8")
+
+    logger.info(
+        "🎉 Amora Model imported",
+        extra=dict(
+            destination_file_path=destination_file_path,
+            model_name=model_name,
+            table_reference=table_reference,
+        ),
+    )
+    return amora_model_for_path(destination_file_path)
+
+
def get_schema(table_id: str) -> Schema:
"""
Given a `table_id`, returns the `Schema` of the table by querying BigQueries API
@@ -604,10 +684,9 @@
This is used to produce `ARRAY` literals in SQL expressions, e.g.:
```python
- from sqlalchemy import select
-
from amora.compilation import compile_statement
from amora.providers.bigquery import array
+ from sqlalchemy import select
stmt = select([array([1, 2]).label("a"), array([3, 4, 5]).label("b")])
Index: amora/cli/models.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/amora/cli/models.py b/amora/cli/models.py
--- a/amora/cli/models.py (revision 23a6da8ada76dbdad4077087fc125922146f81c3)
+++ b/amora/cli/models.py (revision 40cd4646db1f3b816fcb7af81761007cd9e768d5)
@@ -1,26 +1,20 @@
import json
from dataclasses import dataclass
-from pathlib import Path
from typing import List, Optional
import typer
-from jinja2 import Environment, PackageLoader, select_autoescape
-from rich.console import Console
-from rich.table import Table
-from rich.text import Text
-from shed import shed
-
from amora.config import settings
from amora.models import Model, list_models
+from amora.providers import bigquery
from amora.providers.bigquery import (
- BIGQUERY_TYPES_TO_PYTHON_TYPES,
- BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES,
DryRunResult,
dry_run,
estimated_query_cost_in_usd,
estimated_storage_cost_in_usd,
- get_schema,
)
+from rich.console import Console
+from rich.table import Table
+from rich.text import Text
app = typer.Typer(help="List or import Amora Models")
@@ -167,86 +161,36 @@
typer.echo(json.dumps(output))
-@app.command(name="import")
-def models_import(
- table_reference: str = typer.Option(
- ...,
- "--table-reference",
+models_import = typer.Typer(help="Import models")  # sub-application holding the `table` and `dataset` commands
+app.add_typer(models_import, name="import")  # mounted on `app` under the name `import`
+
+
+@models_import.command("table", help="Generate an AmoraModel file from a table")
+def models_import_table(
+    table_reference: str = typer.Argument(
+        ...,
help="BigQuery unique table identifier. "
- "E.g.: project-id.dataset-id.table-id",
+        "E.g.: `amora-data-build-tool.amora.health`",
),
- model_file_path: str = typer.Argument(
+    overwrite: bool = typer.Option(
+        False, help="Overwrite the output file if one already exists"
+    ),
+):
+    """
+    Generate an AmoraModel file from a BigQuery table.
+
+    Delegates the work to `bigquery.import_table`. The table reference is a
+    required argument: with a `None` default, omitting it would pass `None`
+    down to `import_table`, which calls string methods on it.
+
+    A `ValueError` raised by `import_table` (e.g. the destination file already
+    exists and `--overwrite` wasn't given) is printed to stderr and the command
+    exits with status code 1 instead of showing a traceback.
+    """
+    try:
+        bigquery.import_table(table_reference, overwrite)
+    except ValueError as error:
+        typer.echo(str(error), err=True)
+        raise typer.Exit(1) from error
+
+
+@models_import.command(
+    "dataset", help="Generate AmoraModel files for dataset contents."
+)
+def models_import_dataset(
+    dataset_reference: str = typer.Argument(
- None,
+        ...,
- help="Canonical name of python module for the generated AmoraModel. "
- "A good pattern would be to use an unique "
- "and deterministic identifier, like: `project_id.dataset_id.table_id`",
+        help="BigQuery unique dataset identifier. "
+        "E.g.: `amora-data-build-tool.amora`",
),
overwrite: bool = typer.Option(
False, help="Overwrite the output file if one already exists"
),
):
- """
- Generates a new amora model file from an existing table/view
-
- ```shell
- amora models import --table-reference my_gcp_project.my_dataset.my_table my_gcp_project/my_dataset/my_table
- ```
- """
-
- env = Environment(
- loader=PackageLoader("amora"),
- autoescape=select_autoescape(),
- trim_blocks=True,
- lstrip_blocks=True,
- )
- template = env.get_template("new-model.py.jinja2")
-
- project, dataset, table = table_reference.split(".")
- model_name = "".join(part.title() for part in table.split("_"))
-
- if model_file_path:
- destination_file_path = Path(model_file_path)
- if (
- destination_file_path.is_absolute()
- and settings.models_path not in destination_file_path.parents
- ):
- typer.echo(
- "Destination path must be relative to the configured models path",
- err=True,
- )
- raise typer.Exit(1)
- else:
- destination_file_path = settings.models_path.joinpath(
- model_name.replace(".", "/") + ".py"
- )
-
- if destination_file_path.exists() and not overwrite:
- typer.echo(
- f"`{destination_file_path}` already exists. "
- f"Pass `--overwrite` to overwrite file.",
- err=True,
- )
- raise typer.Exit(1)
-
- sorted_schema = sorted(get_schema(table_reference), key=lambda field: field.name)
- model_source_code = template.render(
- BIGQUERY_TYPES_TO_PYTHON_TYPES=BIGQUERY_TYPES_TO_PYTHON_TYPES,
- BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES=BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES,
- dataset=dataset,
- dataset_id=f"{project}.{dataset}",
- model_name=model_name,
- project=project,
- schema=sorted_schema,
- table=table,
- )
- formatted_source_code = shed(model_source_code)
-
- destination_file_path.parent.mkdir(parents=True, exist_ok=True)
- destination_file_path.write_text(data=formatted_source_code)
-
- typer.secho(
- f"🎉 Amora Model `{model_name}` (`{table_reference}`) imported!",
- fg=typer.colors.GREEN,
- bold=True,
- )
- typer.secho(f"Current File Path: `{destination_file_path.as_posix()}`")
+    """
+    Generate one AmoraModel file for each table listed in a BigQuery dataset.
+
+    The dataset reference is a required argument: with a `None` default,
+    omitting it would pass `None` down to `bigquery.list_tables`.
+
+    Every table returned by `bigquery.list_tables` is handed to
+    `bigquery.import_table`. A `ValueError` raised for one table (e.g. its
+    destination file already exists and `--overwrite` wasn't given) is printed
+    to stderr and the remaining tables are still imported. The command exits
+    with status code 1 when at least one table failed.
+    """
+    failed_imports = 0
+    for table in bigquery.list_tables(dataset_reference):
+        try:
+            bigquery.import_table(table, overwrite)
+        except ValueError as error:
+            # One failing table must not abort the import of the others.
+            typer.echo(str(error), err=True)
+            failed_imports += 1
+
+    if failed_imports:
+        raise typer.Exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment