@fonylew
Last active June 28, 2023 16:50
Simple code to ingest data from a REST API (Bitcoin price from Coindesk) into BigQuery using Airflow.
import requests

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

URL = "https://api.coindesk.com/v1/bpi/currentprice/SGD.json"
DATASET_ID = "demo"
TABLE_NAME = "coin_price"
# Cloud Composer mounts the environment's GCS bucket at /home/airflow/gcs,
# so a file written to /home/airflow/gcs/data/ appears under gs://<bucket>/data/.
INTERMEDIATE_FOLDER = "/home/airflow/gcs/data/"
FILE_NAME = "coin_price.csv"
GCS_BUCKET_NAME = "CLOUD_COMPOSER_BUCKET"


def make_request(url: str) -> list:
    """Get data via a REST API call using the requests package.

    Args:
        url (str): The URL of the target endpoint.

    Returns:
        List containing one (timestamp, SGD rate, USD rate) tuple.
    """
    response = requests.get(url)
    raw = response.json()
    record = [(
        raw["time"]["updatedISO"],
        raw["bpi"]["SGD"]["rate_float"],
        raw["bpi"]["USD"]["rate_float"]
    )]
    # Write the record as a one-line CSV into the GCS-mounted data folder.
    with open(INTERMEDIATE_FOLDER + FILE_NAME, "w") as f:
        f.write("\n".join(",".join(str(data) for data in row) for row in record))
    return record


with DAG("insert_data_workflow",
         schedule_interval="* * * * *",  # run every minute
         start_date=days_ago(1)) as dag:

    get_data = PythonOperator(
        task_id="get-data",
        python_callable=make_request,
        op_args=[URL]
    )

    load_to_bigquery = BashOperator(
        task_id="load-to-bigquery",
        bash_command=f"bq load --source_format=CSV \
            {DATASET_ID}.{TABLE_NAME} \
            gs://{GCS_BUCKET_NAME}/data/{FILE_NAME}",
    )

    get_data >> load_to_bigquery
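
For context, make_request reads only three fields from the Coindesk payload. A trimmed response looks roughly like this (values illustrative, most fields omitted):

{
  "time": {"updatedISO": "2021-07-03T09:00:00+00:00"},
  "bpi": {
    "USD": {"rate_float": 34567.89},
    "SGD": {"rate_float": 46543.21}
  }
}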
fonylew commented Jul 3, 2021

Please note that some of the variables need to be configured before use:

  • CLOUD_COMPOSER_BUCKET, DATASET_ID, and TABLE_NAME are environment-specific values that need to be set.
  • The BigQuery dataset and table must be created before running this code (see the setup sketch below). The columns are timestamp (TIMESTAMP), price_SGD (FLOAT), price_USD (FLOAT).
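
A minimal sketch of that one-time setup with the bq CLI (the same tool the DAG's load step uses); the dataset and table names match the DAG's constants, and the schema follows the columns listed above:

# One-time setup: create the dataset, then the table with the expected schema
bq mk --dataset demo
bq mk --table demo.coin_price timestamp:TIMESTAMP,price_SGD:FLOAT,price_USD:FLOAT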

fonylew commented Jul 22, 2021

Sample execution run on Airflow: [screenshot of the DAG run in the Airflow UI]

fonylew commented Jul 22, 2021

The result in BigQuery: [screenshot of the loaded rows]
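
To spot-check the loaded rows yourself, a query along these lines should work (names match the constants in the DAG):

bq query --use_legacy_sql=false \
  'SELECT timestamp, price_SGD, price_USD FROM demo.coin_price ORDER BY timestamp DESC LIMIT 5'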
