Skip to content

Instantly share code, notes, and snippets.

This file has been truncated, but you can view the full file.
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
{"int_mix_float": 123, "float_mix_str": 4.2}
[
{
"mode": "NULLABLE",
"type": "FLOAT",
"name": "int_mix_float"
},
{
"mode": "NULLABLE",
"type": "STRING",
"name": "float_mix_str"
import json
from bq_schema_generator import batch_to_bq_schema
# Load the sample data into Python dictionaries.
# The file is read as newline-delimited JSON: each non-blank line is parsed
# on its own with json.loads.
with open("tests/sample_data_1.json", "r", encoding="utf-8") as f:
    # Skip blank lines: a trailing newline at the end of the file would
    # otherwise produce an empty string, and json.loads("") raises
    # json.JSONDecodeError.
    json_objs = [line for line in f if line.strip()]
data = [json.loads(json_obj) for json_obj in json_objs]
# Run batch_to_bq_schema on the list of dictionaries
bq_schema = batch_to_bq_schema(data)
@zshaoz
zshaoz / dbt_k8spodoperator.py
Created June 30, 2022 09:22
DBT Task with KubernetesPodOperator
def create_dbt_task(node, command, kube_config):
dbt_task = KubernetesPodOperator(
namespace=kube_config['namespace'],
image=f"dbt-image-name:latest", # latest dbt image created through CI/CD process
resources={'request_memory': "128M", 'request_cpu': "100m",
'limit_memory': "256M", 'limit_cpu': "200m"},
name=node,
task_id=node,
get_logs=True,
cmds=['bash'],
@zshaoz
zshaoz / dbt_pythonoperator.py
Created June 30, 2022 09:19
DBT Task with PythonOperator
from jsonrpcclient.clients.http_client import HTTPClient
def make_rpc_call_to_dbt_server(dbt_server_endpoint: str, model: str, dbt_verb: str, env: str, query_id: str) -> None:
"""
Send a request to rpc server to run a specific model within a target environment
"""
conn = HTTPClient(dbt_server_endpoint)
params = {
"jsonrpc": "2.0",