# Gather Awario mentions
#
# Airflow DAG published as GitHub gist rossturk/07eca253393fe79db4eb1f106eedb9a4
# by @rossturk, created December 16, 2022 22:31.
from datetime import datetime
import requests
from include.autopaginate_api_call import AutoPaginate
from astro import sql as aql
from astro.sql.table import Table, Metadata
from airflow.models import DAG, Variable
from pandas import DataFrame
from airflow.exceptions import AirflowSkipException
# Airflow connection id shared by every astro-sdk task defined in this file.
CONN_ID: str = "eco"
# Awario API access token, read from the Airflow Variable "AWARIO_ACCESS_TOKEN".
# NOTE(review): this lookup runs at module import time, i.e. every time the
# scheduler parses this DAG file, and queries the metadata DB each time.
# Airflow's best-practice guide recommends reading Variables inside tasks (or
# via templates) instead; moving it requires touching every user of TOKEN.
TOKEN: str = Variable.get("AWARIO_ACCESS_TOKEN")
@aql.run_raw_sql(conn_id=CONN_ID)
def truncate_table(table: Table):
    """Remove every row from ``table`` with a raw ``TRUNCATE`` statement.

    The function only builds SQL text; ``aql.run_raw_sql`` executes it on the
    ``CONN_ID`` connection. ``{{table}}`` is a template placeholder, presumably
    rendered by astro-sdk with the name of the ``table`` argument.
    """
    statement = """
truncate table {{table}}
"""
    return statement
@aql.dataframe(conn_id=CONN_ID)
def get_awario_alerts():
    """Fetch the Awario alerts visible to the configured access token.

    Calls the Awario ``alerts/list`` endpoint and returns the ``"alerts"``
    array of its JSON response as a DataFrame (one row per alert). The
    ``aql.dataframe`` decorator writes it to the caller's ``output_table``.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status. The task
            then fails (and is retried) with a meaningful error instead of a
            ``KeyError`` on the missing ``"alerts"`` key.
        requests.Timeout: if the API does not respond within the timeout.
    """
    response = requests.get(
        "https://api.awario.com/v1.0/alerts/list",
        # Let requests build and URL-encode the query string instead of
        # concatenating the token onto the URL by hand.
        params={"access_token": TOKEN},
        # Without a timeout a stalled connection would block the task forever.
        timeout=60,
    )
    response.raise_for_status()
    return DataFrame(response.json()["alerts"])
@aql.transform(conn_id=CONN_ID)
def get_max_ids(table: Table):
    """Select, for each ``alert_id`` in ``table``, the largest stored ``id``.

    The result has two columns, ``alert_id`` and ``max_id``. It is used to ask
    the Awario API only for mentions newer than those already loaded.
    ``{{table}}`` is a template placeholder, presumably rendered by astro-sdk
    with the name of the ``table`` argument.

    NOTE(review): if ``id`` is stored as text, ``max`` compares
    lexicographically; confirm the column type.
    """
    query = """
select alert_id, max(id) as max_id from {{table}} group by alert_id
"""
    return query
def _latest_ids_by_alert(max_ids: DataFrame) -> dict:
    """Map each ``alert_id`` in ``max_ids`` to its non-null ``max_id``."""
    # No rows at all (nothing stored yet) means every alert starts from scratch.
    if max_ids is None or max_ids.empty:
        return {}
    # A NULL max_id carries no position to resume from; treat it like a first
    # run instead of sending NaN to the API as since_id.
    known = max_ids.dropna(subset=["max_id"])
    return dict(zip(known["alert_id"].tolist(), known["max_id"].tolist()))


@aql.dataframe(conn_id=CONN_ID)
def get_awario_mentions(alerts: DataFrame, max_ids: DataFrame):
    """Fetch new Awario mentions for every alert.

    For each ``alert_id`` in ``alerts``, pages through the Awario
    ``alerts/{alert_id}/mentions`` endpoint using the project's
    ``AutoPaginate`` helper. The request sorts by ``id`` ascending with
    ``limit=200``. When ``max_ids`` holds a ``max_id`` for the alert, it is
    passed as ``since_id``, presumably so only newer mentions are returned.
    Otherwise the alert is fetched without ``since_id``. Each returned mention
    is tagged with its ``alert_id``.

    Args:
        alerts: One row per alert; only the ``alert_id`` column is read.
        max_ids: ``alert_id`` / ``max_id`` pairs giving the highest mention id
            already stored per alert.

    Returns:
        A DataFrame with one row per gathered mention.

    Raises:
        AirflowSkipException: If no alert produced any new mention.
    """
    alerts_mentions_url = "https://api.awario.com/v1.0/alerts/{}/mentions"
    base_params = {
        "access_token": TOKEN,
        "sort_by": "id",
        "order": "asc",
        "limit": "200",
    }
    # Build the lookup once instead of filtering the DataFrame for every alert.
    # A missing alert is a genuine "first run"; any other error (e.g. a missing
    # column) now surfaces instead of being hidden by a bare except.
    since_ids = _latest_ids_by_alert(max_ids)

    mentions = []
    for alert_id in alerts["alert_id"].values.tolist():
        extra_params = dict(base_params)
        since_id = since_ids.get(alert_id)
        if since_id is None:
            print("first run! starting new dataset for {}".format(alert_id))
        else:
            extra_params["since_id"] = since_id
            print("gathering alert {} since id: {}".format(alert_id, since_id))
        # A fresh session per alert, as before, but closed once the alert's
        # pages have been consumed; the original leaked one session per alert.
        with requests.Session() as session:
            results = AutoPaginate(
                session=session,
                url=alerts_mentions_url.format(alert_id),
                pagination_type="cursor",
                data_path=["alert_data", "mentions"],
                cursor_path=["alert_data", "next"],
                paging_param_name="next",
                extra_params=extra_params,
            )
            for item in results:
                item["alert_id"] = alert_id
                mentions.append(item)

    print("gathered {} new mentions".format(len(mentions)))
    if not mentions:
        raise AirflowSkipException
    return DataFrame(mentions)
# Hourly pipeline: reload the Awario alert list, then pull each alert's
# mentions newer than the ones already stored.
with DAG(
    dag_id="awario-mentions",
    schedule_interval="@hourly",
    start_date=datetime(2022, 10, 20),
    catchup=False,
    default_args={"retries": 2},
    tags=["awario", "airflow"],
) as dag:
    # The alerts table is emptied first, then refilled with the fresh list.
    # Separate Table instances are created for each task, as before.
    truncate = truncate_table(
        table=Table(name="AWARIO_ALERTS", metadata=Metadata(schema="IN")),
    )
    alerts = get_awario_alerts(
        output_table=Table(name="AWARIO_ALERTS", metadata=Metadata(schema="IN")),
    )
    truncate >> alerts

    # NOTE(review): AWARIO_MENTIONS is given no schema, unlike AWARIO_ALERTS
    # (schema "IN"); confirm the connection's default schema is intended.
    # NOTE(review): confirm whether writing output_table appends to or
    # replaces AWARIO_MENTIONS; the since_id logic assumes history is kept.
    max_ids = get_max_ids(table=Table(name="AWARIO_MENTIONS"))
    mentions = get_awario_mentions(
        alerts,
        max_ids,
        output_table=Table(name="AWARIO_MENTIONS"),
    )

    # Drop temporary tables astro-sdk created during this run.
    aql.cleanup()
# End of gist.