@victorouse
Created September 26, 2023 02:39
Example Airflow DAG that fetches data from an API and stores it in Cloud Storage and BigQuery
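
The DAG below calls the API with Basic Auth credentials stored in Airflow Variables, writes the JSON response to a local CSV file, uploads that file to a Cloud Storage bucket, and appends its rows to a BigQuery table.
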
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth
import pandas as pd

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.variable import Variable
from airflow.providers.google.cloud.transfers.local_to_gcs import (
    LocalFilesystemToGCSOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
API_URL = "api.domain.com/example"


def save_data_from_api(**context):
    response = requests.get(
        f"{API_URL}/v1/path/to/resource",
        auth=HTTPBasicAuth(
            Variable.get("api_key"), Variable.get("api_secret")
        ),
    )

    if response.status_code != 200:
        raise Exception(
            f"Request failed with status code {response.status_code}: {response.text}"
        )

    jit_limits = response.json()

    # Transpose to turn dictionary into list, i.e.
    # { foo: 1, bar: 2, baz: 3 }
    # => [ { foo: 1, bar: 2, baz: 3 } ]
    df = pd.DataFrame.from_dict(jit_limits, orient="index").transpose()

    # Then add an insertion timestamp to all rows
    df["timestamp"] = datetime.now()

    filename = f"data_{context['run_id']}.csv"
    df.to_csv(filename, header=True, index=False)

    # Return filename for upload
    return filename
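

# Note: the PythonOperator pushes this function's return value (the CSV filename)
# to XCom; the upload_to_gcs task below pulls it back out via a Jinja template.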

default_params = {
    "bucket": "my_bucket",
    "bucket_folder": "my_folder",
    "location": "australia-southeast1",
    "project": "my_gcp_project_id",
    "dataset": "my_dataset",
    "table": "my_table",
}

default_args = {
    "depends_on_past": False,
    "start_date": datetime(2023, 1, 1),
    "email": ["email@domain.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "timezone": "Australia/Sydney",
}
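
# `default_args` are applied to every task in the DAG, while values in `params`
# are available to Jinja templates as `params.*`. Note that "timezone" is not a
# standard operator argument (Airflow only applies default_args keys that match
# an operator's parameters), so it has no effect here; to schedule in Sydney time
# you would normally give `start_date` a timezone-aware pendulum datetime instead.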

with DAG(
    "alpaca_jit_daily_limits",
    default_args=default_args,
    params=default_params,
    description="ETL for Alpaca JIT Daily Limits API",
    schedule_interval="0 22 * * *",  # 22:00 UTC = 9:00 AM Sydney during AEDT (8:00 AM during AEST)
    catchup=False,
    render_template_as_native_obj=True,
) as dag:
    filename = "{{ ti.xcom_pull(task_ids='save_jit_daily_limits') }}"
    bucket = "{{ params.bucket }}"
    bucket_folder = "{{ params.bucket_folder }}"
    bucket_filepath = f"{bucket_folder}/{filename}"
    destination_project_dataset_table = (
        "{{ params.project }}.{{ params.dataset }}.{{ params.table }}"
    )
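
    # These values are Jinja templates: they are rendered when each task instance
    # runs, so `filename` resolves to whatever save_jit_daily_limits returned for
    # that run, and the `params.*` values come from `default_params` above.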

    save_data_from_api_task = PythonOperator(
        task_id="save_jit_daily_limits",
        python_callable=save_data_from_api,
        # provide_context is not needed in Airflow 2: the task context is passed
        # to the callable automatically (received here via **context)
        dag=dag,
    )

    upload_to_gcs_task = LocalFilesystemToGCSOperator(
        task_id="upload_to_gcs",
        bucket=bucket,
        src=filename,
        dst=bucket_filepath,
        dag=dag,
    )

    upload_to_bigquery_task = GCSToBigQueryOperator(
        task_id="upload_to_bigquery",
        bucket=bucket,
        source_objects=[bucket_filepath],
        destination_project_dataset_table=destination_project_dataset_table,
        schema_fields=[
            {"name": "the_first_column", "type": "STRING", "mode": "NULLABLE"},
            {"name": "the_second_column", "type": "FLOAT", "mode": "NULLABLE"},
        ],
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="WRITE_APPEND",
        skip_leading_rows=1,
        dag=dag,
    )
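
    # The schema_fields above are placeholders; for the load to succeed they
    # should describe every column in the CSV produced by save_data_from_api,
    # including the appended "timestamp" column (e.g. as a TIMESTAMP field).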

    (
        save_data_from_api_task
        >> upload_to_gcs_task
        >> upload_to_bigquery_task
    )
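
# A minimal sketch of how to try this locally, assuming an Airflow 2.x environment
# with apache-airflow-providers-google installed, GCP credentials configured, and
# this file placed in the DAGs folder:
#
#   airflow variables set api_key YOUR_KEY
#   airflow variables set api_secret YOUR_SECRET
#   airflow dags test alpaca_jit_daily_limits 2023-01-01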