Skip to content

Instantly share code, notes, and snippets.

Created April 19, 2021 11:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MachineLearningIsEasy/2686385cb577846a269580153f49d0cf to your computer and use it in GitHub Desktop.
Save MachineLearningIsEasy/2686385cb577846a269580153f49d0cf to your computer and use it in GitHub Desktop.
Airflow Dag
import datetime
import time
import requests
import pandas as pd
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
import os
# Default arguments handed to the DAG (passed as ``default_args``).
args = {
    'owner': 'dimon',
    'start_date': datetime.datetime(2018, 11, 1),
}

# API key for the weather service, stored in the Airflow Variable "KEY_API_WWO".
# NOTE(review): Variable.get() at module level runs on every parse of this
# file by the scheduler; consider reading it inside the task instead.
key_wwo = Variable.get("KEY_API_WWO")

# Not referenced by the code visible in this file.
# NOTE(review): presumably the forecast starts this many hours ahead - confirm.
start_hour = 1
# Length of the forecast window, in hours.
horizont_hours = 48
# Coordinates of the point the forecast is requested for.
lat = 47.939
lng = 46.681
# Offset of Moscow time from UTC, in hours.
moscow_timezone = 3
# Offset of the forecast point's local time from UTC, in hours.
# transform_data reads this name, but no definition was present.
# NOTE(review): +4 is assumed from the coordinates above (they appear to fall
# in the Astrakhan region) - confirm the correct offset.
local_timezone = 4
def extract_data(**kwargs):
    """Request the hourly weather forecast and publish the JSON reply to XCom.

    Args:
        **kwargs: Airflow task context; ``kwargs['ti']`` must be the running
            task instance.

    Raises:
        requests.HTTPError: if the service answers with a status other than
            200, so the failure surfaces in this task rather than as a
            missing XCom value in the next one.

    Side effects:
        Pushes the decoded JSON reply to XCom under the key
        ``'weather_wwo_json'``.
    """
    ti = kwargs['ti']

    # Whole calendar days needed to cover the forecast window: the window
    # starts part-way through today, so one extra day is requested.
    days_needed = -(-horizont_hours // 24) + 1

    # Request the forecast starting from the next hour.
    # NOTE(review): the endpoint and query parameters were cut off in the
    # original text; they are reconstructed here from the World Weather Online
    # "Local Weather" API (hourly steps via tp=1) - confirm before deploying.
    response = requests.get(
        'https://api.worldweatheronline.com/premium/v1/weather.ashx',
        params={
            'key': key_wwo,
            'q': '{},{}'.format(lat, lng),
            'format': 'json',
            'num_of_days': days_needed,
            'tp': 1,
        },
        headers={
            'Authorization': key_wwo
        },
        # Without a timeout a stalled connection would block the worker forever.
        timeout=30,
    )

    if response.status_code != 200:
        raise requests.HTTPError(
            'Weather API request failed with status {}'.format(response.status_code),
            response=response,
        )

    # Parse the server reply and hand it to the next task.
    json_data = response.json()
    ti.xcom_push(key='weather_wwo_json', value=json_data)
def transform_data(**kwargs):
    """Turn the raw forecast JSON into a table of hourly cloud-cover values.

    Pulls the JSON pushed under ``'weather_wwo_json'`` by the ``extract_data``
    task, flattens ``data -> weather -> hourly`` into one row per hour, keeps
    the rows whose local timestamp lies within ``horizont_hours`` of now, and
    converts the timestamps to Moscow time.

    Relies on the module-level names ``moscow_timezone``, ``local_timezone``
    and ``horizont_hours``.

    Args:
        **kwargs: Airflow task context; ``kwargs['ti']`` must be the running
            task instance.

    Raises:
        ValueError: if no forecast JSON is available in XCom.

    Side effects:
        Pushes a DataFrame with the columns ``cloud_cover``, ``date_to``,
        ``date_from`` and ``processing_date`` to XCom under the key
        ``'weather_wwo_df'``. The three date columns hold strings formatted
        as ``'%Y-%m-%d %H:%M:%S'``.
    """
    ti = kwargs['ti']
    # A single task id (not a list) makes xcom_pull return the value itself,
    # or None when nothing was pushed.
    json_data = ti.xcom_pull(key='weather_wwo_json', task_ids='extract_data')
    if json_data is None:
        raise ValueError("No 'weather_wwo_json' value found in XCom from task 'extract_data'")

    # Read the clock once so both start times describe the same instant.
    now_utc = datetime.datetime.utcnow()
    start_moscow = now_utc + datetime.timedelta(hours=moscow_timezone)
    start_station = now_utc + datetime.timedelta(hours=local_timezone)
    end_station = start_station + datetime.timedelta(hours=horizont_hours)

    date_list = []
    value_list = []
    for day in json_data['data']['weather']:
        for hourly in day['hourly']:
            # 'time' is an hhmm-style number (e.g. "300"); // 100 keeps the hour.
            date_list.append(
                '{} {:02d}:00:00'.format(day['date'], int(hourly['time']) // 100)
            )
            # NOTE(review): the line collecting the value was missing from the
            # original text; 'cloudcover' is assumed from the column name and
            # the weather API's hourly fields - confirm.
            value_list.append(hourly['cloudcover'])

    res_df = pd.DataFrame(value_list, columns=['cloud_cover'])
    # Forecast time (local to the forecast point).
    res_df["date_to"] = pd.to_datetime(date_list)

    # Keep only the forecast window (both ends included). Plain comparisons
    # are used because the accepted type of Series.between's `inclusive`
    # argument differs between pandas versions.
    in_window = (res_df['date_to'] >= start_station) & (res_df['date_to'] <= end_station)
    # .copy() so the column assignments below act on an independent frame.
    res_df = res_df[in_window].copy()

    # Forecast time (Moscow time).
    res_df["date_to"] = res_df["date_to"] + datetime.timedelta(hours=moscow_timezone - local_timezone)
    res_df["date_to"] = res_df["date_to"].dt.strftime('%Y-%m-%d %H:%M:%S')

    # Time the request was made (Moscow time).
    res_df["date_from"] = start_moscow.strftime('%Y-%m-%d %H:%M:%S')

    # NOTE(review): the original comment called this "time the reply was
    # received (UTC)", but the value is a copy of date_from, i.e. Moscow
    # time - confirm which one is intended.
    res_df["processing_date"] = res_df["date_from"]

    # NOTE(review): a DataFrame is not JSON-serialisable; this push presumably
    # relies on XCom pickling being enabled - confirm the Airflow config.
    ti.xcom_push(key='weather_wwo_df', value=res_df)
def load_data(**kwargs):
    """Receive the transformed forecast table from XCom.

    Args:
        **kwargs: Airflow task context; ``kwargs['ti']`` must be the running
            task instance.

    Raises:
        ValueError: if the ``transform_data`` task pushed no value under
            ``'weather_wwo_df'``.
    """
    ti = kwargs['ti']
    # A single task id (not a list) makes xcom_pull return the value itself,
    # or None when nothing was pushed.
    res_df = ti.xcom_pull(key='weather_wwo_df', task_ids='transform_data')
    if res_df is None:
        raise ValueError("No 'weather_wwo_df' value found in XCom from task 'transform_data'")
    # TODO(review): the step that stores res_df is not present in this file;
    # add the actual write (file, database, ...) here.
# DAG wiring: extract -> transform -> load.
# The schedule below fires every minute; the original trailing comment also
# listed '0 * * * *' (hourly) as an alternative.
# NOTE(review): on Airflow 1.10 the callables only receive `ti` in kwargs when
# the operators are created with provide_context=True - confirm the target
# Airflow version.
with DAG(
    'load_weather_wwo',
    description='load_weather_wwo',
    schedule_interval='*/1 * * * *',
    catchup=False,
    default_args=args,
) as dag:
    # Operators get their own names so they do not rebind (and hide) the
    # callables extract_data / transform_data / load_data.
    extract_task = PythonOperator(task_id='extract_data', python_callable=extract_data)
    transform_task = PythonOperator(task_id='transform_data', python_callable=transform_data)
    load_task = PythonOperator(task_id='load_data', python_callable=load_data)

    extract_task >> transform_task >> load_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment