Skip to content

Instantly share code, notes, and snippets.

@rossturk
Created December 16, 2022 22:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rossturk/f9a4171f9d4a650f34c606e56901b4d7 to your computer and use it in GitHub Desktop.
Save rossturk/f9a4171f9d4a650f34c606e56901b4d7 to your computer and use it in GitHub Desktop.
Get Orbit activities
from datetime import datetime
import requests
from include.autopaginate_api_call import AutoPaginate
from airflow.operators.python import get_current_context
from astro import sql as aql
from astro.sql.table import Table, Metadata
from airflow.models import DAG, Variable
import pandas as pd
from airflow.exceptions import AirflowSkipException
# Connection id handed to every astro ``Table`` in this module.
CONN_ID = "eco"

# Orbit API token, sent as the bearer credential on each request.
# NOTE(review): this lookup runs at import time, i.e. whenever the module is
# parsed, not only when a task executes.
TOKEN = Variable.get("ORBIT_ACCESS_TOKEN")

# Column name -> SQL type for the activities table. Every column is stored as
# "text". The names are the "_"-joined paths of the flattened activity
# records, and their order here is the column order used downstream.
schema = dict.fromkeys(
    (
        "id",
        "type",
        "attributes_key",
        "attributes_orbit_url",
        "attributes_action",
        "attributes_created_at",
        "attributes_occurred_at",
        "attributes_updated_at",
        "attributes_properties_github_repository",
        "attributes_properties_twitter_account",
        "attributes_properties_slack_channel",
        "attributes_g_title",
        "attributes_t_tweet_text",
        "relationships_user_data_id",
        "relationships_member_data_id",
    ),
    "text",
)
@aql.run_raw_sql
def create_table(table: Table) -> str:
    """Build the ``create table if not exists`` statement for ``schema``.

    Args:
        table: Target table. It is not read in the body; it is the value the
            ``{{table}}`` placeholder in the returned SQL refers to.

    Returns:
        The SQL text, with one ``<column> <type>`` definition per entry of
        the module-level ``schema`` mapping, in mapping order.
    """
    column_defs = ", ".join(
        f"{column} {sql_type}" for column, sql_type in schema.items()
    )
    # Keep this a plain (non-f, non-.format) string: the doubled braces must
    # be returned verbatim. NOTE(review): the Astro SDK is expected to
    # resolve ``{{table}}`` to the ``table`` argument when running the SQL.
    return "create table if not exists {{table}} ( " + column_defs + ")"
@aql.dataframe()
def get_orbit_activities():
    """Fetch one day of Orbit activities and return them as a DataFrame.

    The ``ds`` value of the current Airflow context is sent as both
    ``start_date`` and ``end_date`` of the request, and the result pages are
    consumed through ``AutoPaginate``.

    Returns:
        pandas.DataFrame: one row per activity, with nested fields flattened
        using ``_`` as separator and the columns reindexed to exactly the
        keys of ``schema`` (in that order).

    Raises:
        AirflowSkipException: if the API returned no activities.
    """
    ds = get_current_context()["ds"]
    url = (
        "https://app.orbit.love/api/v1/olmqz/activities"
        f"?start_date={ds}&end_date={ds}"
    )

    # Use the session as a context manager so its pooled connections are
    # released when paging is finished instead of being leaked.
    with requests.Session() as session:
        results = AutoPaginate(
            session=session,
            url=url,
            pagination_type="page_number",
            data_path="data",
            paging_param_name="page",
            extra_headers={
                "accept": "application/json",
                "authorization": "Bearer " + TOKEN,
            },
        )
        # Drain the paginator while the session is still open, in case the
        # pages are only requested during iteration.
        activities = list(results)

    if not activities:
        raise AirflowSkipException(f"No Orbit activities returned for {ds}")

    return pd.json_normalize(activities, sep="_").reindex(columns=list(schema))
# Daily DAG: make sure the destination table exists, fetch the day's Orbit
# activities into a DataFrame-backed table, append them to the destination.
with DAG(
    "orbit-activities",
    schedule_interval="@daily",
    start_date=datetime(2020, 10, 20),
    catchup=True,
    default_args={
        "retries": 2,
        "depends_on_past": True,
    },
    tags=["orbit", "openlineage", "marquez"],
) as dag:
    # Single definition of the destination table, shared by the task that
    # creates it and the task that appends to it, so the two cannot drift.
    orbit_activities_table = Table(
        name="ORBIT_ACTIVITIES",
        metadata=Metadata(
            schema="IN",
            database="ECO",
        ),
        conn_id=CONN_ID,
    )

    table_created = create_table(table=orbit_activities_table)

    # The output table is given no name, only a connection.
    # NOTE(review): the Astro SDK is expected to treat such a table as
    # temporary, which is what aql.cleanup() below is there to remove.
    activities = get_orbit_activities(
        output_table=Table(
            conn_id=CONN_ID,
        ),
    )

    appended = aql.append(
        source_table=activities,
        target_table=orbit_activities_table,
    )

    # create_table shares no data with the append task, so no ordering is
    # inferred between them; state it explicitly so the append can never be
    # scheduled before the destination table exists.
    table_created >> appended

    aql.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment