Skip to content

Instantly share code, notes, and snippets.

@rossturk
Last active December 16, 2022 21:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rossturk/7c8f63071f00a65e3d01c74a7c471663 to your computer and use it in GitHub Desktop.
Save rossturk/7c8f63071f00a65e3d01c74a7c471663 to your computer and use it in GitHub Desktop.
Import Orbit members and orgs
from datetime import datetime
import requests
from include.autopaginate_api_call import AutoPaginate
from astro import sql as aql
from astro.sql.table import Table
from airflow.models import DAG, Variable
import pandas as pd
# Airflow connection ID handed to every astro `Table` declared in this file.
CONN_ID = "dwh"
# Bearer token for the Orbit API, read from the Airflow Variable
# "ORBIT_ACCESS_TOKEN".
# NOTE(review): this lookup executes at module import time, i.e. each time the
# DAG file is parsed, not only when a task runs — consider resolving it inside
# the tasks instead; confirm against Airflow's top-level-code guidance.
TOKEN = Variable.get("ORBIT_ACCESS_TOKEN")
@aql.dataframe
def get_orbit_members():
    """Download all records from the Orbit ``olmqz`` members endpoint.

    The endpoint is walked with ``AutoPaginate`` (configured for
    ``page_number`` pagination via the ``page`` parameter, reading items from
    the ``data`` path) and authenticated with the module-level ``TOKEN``.

    Returns:
        pandas.DataFrame: one row per item yielded by the paginator; an empty
        DataFrame when nothing is yielded.
    """
    # Use the session as a context manager so its connection pool is released
    # even if a request fails part-way through pagination.
    with requests.session() as session:
        results = AutoPaginate(
            session=session,
            url="https://app.orbit.love/api/v1/olmqz/members",
            pagination_type="page_number",
            data_path="data",
            paging_param_name="page",
            extra_headers={
                "accept": "application/json",
                "authorization": "Bearer " + TOKEN,
            },
        )
        # Drain the paginator while the session is still open; it may fetch
        # pages lazily during iteration.
        members = list(results)
    return pd.DataFrame(members)
@aql.dataframe
def get_orbit_orgs():
    """Download all records from the Orbit ``olmqz`` organizations endpoint.

    The endpoint is walked with ``AutoPaginate`` (configured for
    ``page_number`` pagination via the ``page`` parameter, reading items from
    the ``data`` path) and authenticated with the module-level ``TOKEN``.

    Returns:
        pandas.DataFrame: one row per item yielded by the paginator; an empty
        DataFrame when nothing is yielded.
    """
    # Use the session as a context manager so its connection pool is released
    # even if a request fails part-way through pagination.
    with requests.session() as session:
        results = AutoPaginate(
            session=session,
            url="https://app.orbit.love/api/v1/olmqz/organizations",
            pagination_type="page_number",
            data_path="data",
            paging_param_name="page",
            extra_headers={
                "accept": "application/json",
                "authorization": "Bearer " + TOKEN,
            },
        )
        # Drain the paginator while the session is still open; it may fetch
        # pages lazily during iteration.
        orgs = list(results)
    return pd.DataFrame(orgs)
# Daily DAG that rebuilds the ORBIT_MEMBERS and ORBIT_ORGS tables on the
# `CONN_ID` connection from the Orbit API: each table is dropped and then
# reloaded from a freshly downloaded DataFrame.
with DAG(
    "orbit-members-orgs",
    schedule_interval="@daily",
    start_date=datetime(2022, 10, 20),
    catchup=False,
    default_args={
        "retries": 2,
    },
    tags=["orbit", "openlineage", "marquez"],
) as dag:
    drop_members = aql.drop_table(
        table=Table(
            name="ORBIT_MEMBERS",
            conn_id=CONN_ID,
        )
    )
    get_members = get_orbit_members(
        output_table=Table(
            name="ORBIT_MEMBERS",
            conn_id=CONN_ID,
        )
    )
    # The drop and the load share no data, so no dependency is inferred
    # between them. Without this explicit edge the scheduler may run the drop
    # after (or concurrently with) the load and leave the table missing.
    drop_members >> get_members

    drop_orgs = aql.drop_table(
        table=Table(
            name="ORBIT_ORGS",
            conn_id=CONN_ID,
        )
    )
    get_orgs = get_orbit_orgs(
        output_table=Table(
            name="ORBIT_ORGS",
            conn_id=CONN_ID,
        )
    )
    # Same ordering guarantee for the organizations table.
    drop_orgs >> get_orgs

    # Astro SDK cleanup task; presumably removes temporary tables created
    # during the run — confirm against the Astro SDK docs.
    aql.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment