Last active
June 28, 2023 16:50
-
-
Save fonylew/21afd9d282cb0c82bce3d0f94928364c to your computer and use it in GitHub Desktop.
Simple code to ingest data from REST API (Bitcoin price from Coindesk) to BigQuery in Airflow.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
from airflow.models import DAG | |
from airflow.operators.bash_operator import BashOperator | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.utils.dates import days_ago | |
# Coindesk current-price endpoint, quoted in SGD (USD is always included).
URL = "https://api.coindesk.com/v1/bpi/currentprice/SGD.json"
# Target BigQuery dataset and table (must exist before the DAG runs).
DATASET_ID = "demo"
TABLE_NAME = "coin_price"
# Cloud Composer mounts the environment's GCS bucket at /home/airflow/gcs;
# files written under data/ are synced to gs://<bucket>/data/.
INTERMEDIATE_FOLDER = "/home/airflow/gcs/data/"
FILE_NAME = "coin_price.csv"
# Placeholder: replace with the actual Cloud Composer bucket name.
GCS_BUCKET_NAME = "CLOUD_COMPOSER_BUCKET"
def make_request(url: str) -> list:
    """Fetch the current Bitcoin price and stage it as a CSV file.

    Calls the Coindesk REST endpoint, extracts the update timestamp and
    the SGD/USD float rates, and writes them as a single CSV row into the
    Cloud Composer data folder (synced to the environment's GCS bucket),
    where the downstream ``bq load`` task picks it up.

    Args:
        url (str): The URL of the target endpoint.

    Returns:
        List containing one (timestamp, sgd_rate, usd_rate) tuple.

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status.
        requests.Timeout: If the endpoint does not answer within 30s.
    """
    # Bound the call so a hung endpoint cannot stall the task forever.
    response = requests.get(url, timeout=30)
    # Fail loudly on HTTP errors instead of parsing an error body as JSON
    # and dying later with a confusing KeyError.
    response.raise_for_status()
    raw = response.json()
    record = [(
        raw["time"]["updatedISO"],
        raw["bpi"]["SGD"]["rate_float"],
        raw["bpi"]["USD"]["rate_float"],
    )]
    # Stage the row where the "load-to-bigquery" task expects to find it.
    with open(INTERMEDIATE_FOLDER + FILE_NAME, "w") as f:
        f.write("\n".join(",".join(str(field) for field in row) for row in record))
    return record
# DAG: fetch the latest Bitcoin price every minute and load it into BigQuery.
with DAG("insert_data_workflow",
         # Per-minute schedule; catchup=False prevents Airflow from
         # back-filling ~1440 runs for the day covered by start_date
         # the first time the DAG is deployed.
         schedule_interval="* * * * *",
         start_date=days_ago(1),
         catchup=False) as dag:

    # Task 1: pull the current price and stage it as a CSV in the bucket.
    get_data = PythonOperator(
        task_id="get-data",
        python_callable=make_request,
        op_args=[URL]
    )

    # Task 2: load the staged CSV from the Composer bucket into BigQuery.
    # Adjacent string literals keep the command free of the whitespace runs
    # that backslash continuations inside an f-string would embed.
    load_to_bigquery = BashOperator(
        task_id="load-to-bigquery",
        bash_command=(
            "bq load --source_format=CSV "
            f"{DATASET_ID}.{TABLE_NAME} "
            f"gs://{GCS_BUCKET_NAME}/data/{FILE_NAME}"
        ),
    )

    get_data >> load_to_bigquery
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Please note that some of the variables need to be configured before using:
CLOUD_COMPOSER_BUCKET, DATASET_ID, and TABLE_NAME are environment-specific values that need to be set. The BigQuery dataset and table must be created before running this code. The columns are:
timestamp (TIMESTAMP), price_SGD (FLOAT), price_USD(FLOAT)