Skip to content

Instantly share code, notes, and snippets.

@hankehly
Last active October 29, 2022 07:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hankehly/aa158fe6bbe7afe63c1cecb2712b2cc0 to your computer and use it in GitHub Desktop.
Save hankehly/aa158fe6bbe7afe63c1cecb2712b2cc0 to your computer and use it in GitHub Desktop.
2022/10/29 Airflowの基礎を学ぶハンズオンワークショップ 課題 #3
import datetime
from airflow import DAG
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
# ID of the Airflow Connection used by both the operator and the hook below.
# The Connection record has to be created before this DAG is run.
SQLITE_CONN_ID = "sqlite_test_conn"

with DAG(
    dag_id="03_exercise",
    schedule=None,
    start_date=datetime.datetime(2021, 1, 1),
) as dag:
    # 1. Create the SQLite table (create the Connection record first).
    #    Provider package: apache-airflow-providers-sqlite
    #    https://airflow.apache.org/docs/apache-airflow-providers-sqlite/stable/operators.html
    #
    # `id` is declared INTEGER PRIMARY KEY (not INT PRIMARY KEY): in SQLite
    # only the exact type name INTEGER makes the column an alias for the
    # rowid, so it is filled in automatically for inserts that omit it. With
    # INT PRIMARY KEY the inserts below would store NULL ids.
    # NOTE: because of IF NOT EXISTS, a table already created with the old
    # schema is left untouched; drop it manually to pick up the new schema.
    create_db_table = SqliteOperator(
        task_id="create_table_sqlite",
        sqlite_conn_id=SQLITE_CONN_ID,
        sql="""
            CREATE TABLE IF NOT EXISTS Users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT,
                timestamp TEXT
            );
        """,
    )

    # 2. Fetch data from the Web API.
    @dag.task
    def fetch_data():
        """Download the user list from the JSONPlaceholder API.

        Returns:
            The decoded JSON body of the response. The downstream task
            expects a list of dicts carrying at least "name" and "email".

        Raises:
            requests.HTTPError: if the server answers with a 4xx/5xx status.
            requests.Timeout: if the server does not respond in time.
        """
        # Imported inside the task so the module is only loaded when the
        # task actually runs, not every time the DAG file is parsed.
        import requests

        # Without a timeout a stalled connection would block the task forever.
        res = requests.get("https://jsonplaceholder.typicode.com/users", timeout=30)
        # Fail the task on HTTP errors instead of passing an error payload on.
        res.raise_for_status()
        return res.json()

    # 3. Insert the data into the DB.
    @dag.task
    def insert_data(data, logical_date=None):
        """Insert one row per fetched user into the Users table.

        Args:
            data: Iterable of dicts with "name" and "email" keys, as returned
                by ``fetch_data``.
            logical_date: Run's logical date; expected to be supplied by
                Airflow from the task context (the ``None`` default only
                marks it as context-injected). Its ISO-8601 form is stored
                in the ``timestamp`` column of every inserted row.
        """
        timestamp = logical_date.isoformat()
        rows = [(row["name"], row["email"], timestamp) for row in data]
        sqlite_hook = SqliteHook(sqlite_conn_id=SQLITE_CONN_ID)
        sqlite_hook.insert_rows(
            table="Users",
            rows=rows,
            target_fields=["name", "email", "timestamp"],
        )

    # Wiring: create_table_sqlite -> fetch_data -> insert_data.
    # Passing `data` into insert_data() creates the fetch -> insert edge.
    data = fetch_data()
    create_db_table >> data
    insert_data(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment