Telescope DAGs

Telescope

Telescope is a tool for getting a bird's-eye view of one or many Airflow deployments.

![Telescope Processor](https://lucid.app/publicSegments/view/df1920bd-05e6-4b9b-b905-e1baa210423c/image.png)

Output Preview

Telescope reports are processed and visualized in a BI tool called Sigma.

Telescope DAGs

Zendesk Import

We utilize Zendesk as a help desk, so we allow customers to submit Telescope reports through our Support Portal.

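Under the hood this is just the Zendesk Search API. The sketch below reduces the hourly search that the zendesk_import DAG (full source further down) performs to a bare `requests` call; the subdomain, email, and token are placeholders, and the hard-coded `created` window stands in for the DAG's data interval.

```python
# Minimal standalone sketch of the hourly Zendesk search the zendesk_import DAG runs.
# ZENDESK_SUBDOMAIN, ZENDESK_EMAIL, and ZENDESK_TOKEN are placeholder credentials.
import requests

ZENDESK_SUBDOMAIN = "example"          # hypothetical subdomain
ZENDESK_EMAIL = "agent@example.com"    # hypothetical agent email
ZENDESK_TOKEN = "..."                  # hypothetical API token

query = " ".join(
    [
        "type:ticket",
        "tags:telescope_report",
        "created>=2023-10-20T00:00:00Z",  # data_interval_start in the DAG
        "created<2023-10-20T01:00:00Z",   # data_interval_end in the DAG
        "status<=hold",
    ]
)
response = requests.get(
    f"https://{ZENDESK_SUBDOMAIN}.zendesk.com/api/v2/search.json",
    params={"query": query},
    auth=(f"{ZENDESK_EMAIL}/token", ZENDESK_TOKEN),
)
response.raise_for_status()
print(response.json()["count"], "matching ticket(s)")
```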

Intake Import

We additionally allow a direct upload to our storage bucket via a signed URL, provided upon request.

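The signed-URL flow itself is not part of the DAGs below; a rough sketch of how such an upload URL could be minted with the google-cloud-storage client looks like this, where the bucket name, expiry, and content type are assumptions rather than our exact values.

```python
# Hedged sketch: mint a V4 signed URL that lets a customer PUT a Telescope report
# directly into the intake/ prefix. Bucket, expiry, and content type are assumptions.
from datetime import timedelta

from google.cloud import storage


def make_intake_upload_url(acct_id: str, report_date: str, filename: str) -> str:
    client = storage.Client()  # uses Application Default Credentials
    bucket = client.bucket("astronomer-telescope")  # assumed; see var.value.telescope_bucket
    blob = bucket.blob(f"intake/{acct_id}/{report_date}/{filename}")
    return blob.generate_signed_url(
        version="v4",
        expiration=timedelta(hours=24),
        method="PUT",
        content_type="application/json",
    )
```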

Processor

Both the intake_import and zendesk_import DAGs use a TriggerDagRunOperator to begin processing a Telescope file. The processed output is then uploaded to a data store that our visualization software is connected to.

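Concretely, each triggered processor run receives a conf payload shaped like the following (the values are illustrative, matching the Param examples in the processor DAG below):

```python
# Illustrative conf passed by the import DAGs to the processor DAG,
# one TriggerDagRunOperator run per report file. Values are examples only.
conf = {
    "report_date": "2022-01-01",
    "gcs_input_file": "raw/Astronomer/2022-01-01/2022-01-01.Astronomer.data.json",
    "organization_name": "Astronomer, Inc.",
    "acct_id": "0011234567890ABC",  # hypothetical Salesforce account id
}
```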

Reporting

The processor emits a Dataset, which triggers a Cosmos DAG that performs small additional transformations and data quality checks. This makes the data available in our BI tool, where we view the reports and export them to send back to the customer.
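The reporting DAG itself is not included in this gist. As a rough illustration only, a Dataset-triggered Cosmos DAG can be sketched as follows; the dag_id, dbt project path, and profile details are hypothetical, while the Dataset and Snowflake connection names come from the processor DAG below.

```python
# Hedged sketch only: the actual reporting DAG is not in this gist.
# Assumes a dbt project at /usr/local/airflow/dbt/telescope and the profile shown below.
from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

from dags import datasets  # the same Dataset definitions the processor emits as outlets

reporting_dag = DbtDag(
    dag_id="telescope_reporting",                     # hypothetical dag_id
    schedule=[datasets.telescope_processor_finish],   # run whenever the processor finishes
    start_date=datetime(2023, 10, 1),
    project_config=ProjectConfig("/usr/local/airflow/dbt/telescope"),  # assumed path
    profile_config=ProfileConfig(
        profile_name="telescope",                     # assumed profile
        target_name="prod",
        profile_mapping=SnowflakeUserPasswordProfileMapping(
            conn_id="snowflake_telescope",
            profile_args={"database": "CS", "schema": "TELESCOPE"},
        ),
    ),
)
```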

"""move from /intake to /raw
![Telescope Processor](https://lucid.app/publicSegments/view/df1920bd-05e6-4b9b-b905-e1baa210423c/image.png)
"""
from datetime import datetime, timedelta
from urllib.parse import quote
from airflow import DAG
from airflow.decorators import task
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.trigger_rule import TriggerRule
from dags.telescope.report_util import fetch_salesforce_name
GCP_CONN_ID = "google_cloud_telescope_sa"
SNOWFLAKE_CONN_ID = "snowflake_default"
with DAG(
dag_id="intake_import",
schedule="@hourly",
start_date=datetime(2023, 10, 1),
max_active_runs=1,
doc_md=__doc__
) as dag:
list_objects_task = GCSObjectsWithPrefixExistenceSensor(
task_id="sense_objects",
google_cloud_conn_id=GCP_CONN_ID,
bucket="{{ var.value.telescope_bucket }}",
prefix="intake",
soft_fail=True,
poke_interval=timedelta(minutes=5).total_seconds(),
timeout=timedelta(minutes=10).total_seconds(),
doc_md="are there files in the bucket?"
)
LatestOnlyOperator(task_id="latest_only") >> list_objects_task
@task
def enrich_objects(
objects: list[str],
snowflake_conn_id: str,
):
"""e.g. gs://astronomer-telescope/intake/<id>/2022-11-08/2022-11-08.<id>.data.json"""
snowflake_hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)
enriched_objects = []
for _object in objects:
[_, sfdc_id, date, file] = _object.split("/")
(sfdc_name,) = fetch_salesforce_name(sfdc_id, snowflake_hook)
enriched_objects.append(
{
"acct_id": sfdc_id,
"organization_name": sfdc_name,
"report_date": date,
"gcs_input_file": _object,
"new_gcs_input_file": f"raw/{quote(sfdc_name.lower())}/{date}/{file}",
}
)
return enriched_objects
enriched_objects_task = enrich_objects(list_objects_task.output, snowflake_conn_id=SNOWFLAKE_CONN_ID)
moved_task = GCSToGCSOperator.partial(
task_id="move_from_intake_to_raw",
source_bucket="{{ var.value.telescope_bucket }}",
move_object=True,
doc_md="""move them to raw""",
gcp_conn_id=GCP_CONN_ID,
).expand_kwargs(
enriched_objects_task.map(
lambda x: {"source_object": x["gcs_input_file"], "destination_object": x["new_gcs_input_file"]}
),
)
moved_task >> TriggerDagRunOperator.partial(
task_id="trigger_processor",
trigger_dag_id="telescope.processor",
execution_date="{{ dag_run.logical_date + macros.timedelta(seconds=ti.map_index) }}",
wait_for_completion=True,
reset_dag_run=True,
trigger_rule=TriggerRule.ALL_DONE,
).expand(
conf=enriched_objects_task.map(
lambda d: {
"report_date": d["report_date"],
"gcs_input_file": d["new_gcs_input_file"],
"organization_name": d["organization_name"],
"acct_id": d["acct_id"],
}
)
)
"""Creates .parquet files from raw telescope report, load to snowflake
![Telescope Processor](https://lucid.app/publicSegments/view/df1920bd-05e6-4b9b-b905-e1baa210423c/image.png)
"""
import hashlib
import logging
from datetime import datetime
from airflow.models import Param
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
from astro.constants import FileType
from astro.files import File
from astro.sql import LoadFileOperator, MergeOperator, Table, cleanup
from dags import datasets
from dags.telescope.report_util import (
create_gcs_conn_and_token,
open_gcs_file_in_memory,
generate_output_reports,
convert_output_reports_to_df,
fix_output_reports_df,
get_gcs_path,
)
GCP_CONN_ID = "google_cloud_telescope_sa"
SNOWFLAKE_CONN_ID = "snowflake_telescope"
TELESCOPE_METADATA = {"schema": "TELESCOPE", "database": "CS"}
SLACK_CONN_ID = "slack_default"
with DAG(
dag_id="processor",
schedule=None,
start_date=datetime(2023, 10, 1),
doc_md=__doc__,
template_searchpath="/usr/local/airflow/dags/telescope/SQL",
params={
"gcs_input_file": Param(
"NOTSET",
type="string",
description="GCS Input Path - e.g. raw/Astronomer/2022-01-01/2022-01-01.Astronomer.data.json",
),
"report_date": Param("NOTSET", type="string", description="Report Date - e.g. 2022-01-01"),
"organization_name": Param(
"NOTSET", type="string", description="Organization Name - e.g. 'Astronomer, Inc.'"
),
"acct_id": Param("NOTSET", type="string", description="Account ID, if it exists"),
},
on_success_callback=[
send_slack_notification(
channel="#cse-feed",
slack_conn_id=SLACK_CONN_ID,
text=":white_check_mark: {{ params.organization_name }} data "
"for {{ params.report_date }} processed and loaded "
"from {{ params.gcs_input_file }}",
)
],
):
@task
def process_org_files_and_upload(
gcs_input_file: str, organization_name: str, date: str, gcp_conn_id: str, bucket: str, acct_id: str | None
) -> list[dict]:
"""Used as an Airflow Task - creates .parquet files to upload to snowflake"""
if "NOTSET" in [gcs_input_file, organization_name, date]:
raise RuntimeError("All Arguments Must be Set")
file_hash = hashlib.md5(gcs_input_file.encode()).hexdigest() # nosec
gcs_conn, gcs_token = create_gcs_conn_and_token(gcp_conn_id)
data = open_gcs_file_in_memory(gcs_input_file, gcs_conn, bucket)
logging.info(f"Generating reports from {gcs_input_file}...")
initial_output_reports = generate_output_reports(data, organization_name, date, file_hash, acct_id)
output_reports = convert_output_reports_to_df(initial_output_reports)
output_reports = fix_output_reports_df(output_reports)
logging.info(f"Uploading {len(output_reports)} reports as parquet from {gcs_input_file}...")
output = []
for report_type, df in output_reports.items():
if not df.empty:
gcs_output_path = get_gcs_path(bucket, report_type, organization_name, date, file_hash)
logging.info(f"Saving {report_type} as parquet to path {gcs_output_path}")
df.to_parquet(
gcs_output_path,
compression="snappy",
storage_options={"token": gcs_token},
)
report_object = initial_output_reports[report_type][0]
output.append(
{
"filepath": gcs_output_path,
"table_name": report_object.__tablename__,
"columns": report_object.get_fields(),
}
)
return output
process_org_files_and_upload_task = process_org_files_and_upload(
gcp_conn_id=GCP_CONN_ID,
bucket="{{ var.value.telescope_bucket }}",
date="{{ params.report_date }}",
organization_name="{{ params.organization_name }}",
gcs_input_file="{{ params.gcs_input_file }}",
acct_id="{{ params.acct_id }}",
)
load_as_temp_table_task = LoadFileOperator.partial(
task_id="load_as_temp_table",
use_native_support=False,
native_support_kwargs={"storage_integration": "GCS_INT"},
).expand_kwargs(
process_org_files_and_upload_task.map(
lambda d: {
"input_file": File(path=d["filepath"], conn_id=GCP_CONN_ID, filetype=FileType.PARQUET),
"output_table": Table(
conn_id=SNOWFLAKE_CONN_ID, metadata=TELESCOPE_METADATA, columns=d["columns"], temp=True
),
}
)
)
(
SQLExecuteQueryOperator(
task_id="create_tables_if_not_exists",
sql=['USE SCHEMA "TELESCOPE"', "DAG_REPORT.sql", "DEPLOYMENT_REPORT.sql", "INFRASTRUCTURE_REPORT.sql"],
conn_id=SNOWFLAKE_CONN_ID,
show_return_value_in_logs=True,
split_statements=True,
autocommit=True,
)
>> MergeOperator.partial(
task_id="merge_append_table",
if_conflicts="update",
outlets=[
datasets.telescope_dag_report,
datasets.telescope_deployment_report,
datasets.telescope_infra_report,
],
).expand_kwargs(
process_org_files_and_upload_task.zip(load_as_temp_table_task.output).map(
lambda _in: {
"source_table": _in[1],
"target_table": Table(
conn_id=SNOWFLAKE_CONN_ID,
metadata=TELESCOPE_METADATA,
columns=_in[0]["columns"],
name=_in[0]["table_name"],
),
"columns": [f.name for f in _in[0]["columns"]],
"target_conflict_columns": [f.name for f in _in[0]["columns"] if f.primary_key],
}
)
)
>> EmptyOperator(task_id="finish", outlets=[datasets.telescope_processor_finish])
)
cleanup()
"""Searches for Telescope Report files hourly from Zendesk via the Search API
![Telescope Processor](https://lucid.app/publicSegments/view/df1920bd-05e6-4b9b-b905-e1baa210423c/image.png)
"""
import itertools
import json
import logging
import tempfile
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
import gcsfs
import requests
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.sensors.base import PokeReturnValue
from airflow.utils.trigger_rule import TriggerRule
from requests import Response
from dags.telescope.report_util import (
get_gcs_path,
create_gcs_conn_and_token,
fetch_salesforce_id,
)
GCP_CONN_ID = "google_cloud_telescope_sa"
ZENDESK_CONN_ID = "zendesk"
SNOWFLAKE_CONN_ID = "snowflake_default"
SLACK_CONN_ID = "slack_default"
@dataclass
class ZendeskAttachment:
"""Intermediate data structure, information is added over the DAG"""
ticket_id: int
organization_id: int
ticket_url: str
ticket_created_at: str
ticket_updated_at: str
comment_id: int
comment_created_at: str
attachment_id: int
mapped_content_url: str
author_id: int
content_url: str
attachment_file_name: str
# These get added later
salesforce_id: str | None = None
salesforce_name: str | None = None
gcs_path: str | None = None
full_gcs_path: str | None = None
report_date: str | None = None
organization_name: str | None = None
with DAG(
dag_id="zendesk_import",
schedule="@hourly",
start_date=datetime(2023, 10, 20),
catchup=True,
doc_md=__doc__,
) as dag:
@task.sensor(
mode="reschedule",
poke_interval=timedelta(minutes=30),
timeout=timedelta(hours=12),
soft_fail=True,
silent_fail=True,
)
def fetch_zendesk_telescope_tickets(
zendesk_conn_id: str, data_interval_start: str, data_interval_end: str
) -> PokeReturnValue:
"""
Query zendesk search endpoint - /api/v2/search.json
We skip if we don't find anything.
Optionally returns a list of tickets
"""
logging.info("Searching for zendesk tickets...")
response: Response = HttpHook(method="GET", http_conn_id=zendesk_conn_id).run(
endpoint="/api/v2/search.json",
data={
"query": " ".join(
[
"type:ticket",
"tags:telescope_report",
f"created>={data_interval_start}",
f"created<{data_interval_end}",
"status<=hold",
]
)
},
)
logging.info(response.url)
response_json = response.json()
if not response.ok:
logging.error(f"Error: {response.url=} {response.status_code=} {response.reason=}")
response.raise_for_status()
elif response_json is None:
logging.info("No response...")
return PokeReturnValue(False)
elif response_json["count"] == 0:
logging.info("No tickets found...")
return PokeReturnValue(False)
else:
return PokeReturnValue(True, xcom_value=response_json["results"])
fetch_zendesk_telescope_tickets_task = fetch_zendesk_telescope_tickets(zendesk_conn_id=ZENDESK_CONN_ID)
@task
def enrich_zendesk_telescope_ticket(
response_json: dict,
zendesk_conn_id: str,
snowflake_conn_id: str,
slack_conn_id: str | None,
) -> list[dict]:
"""
Parse the search results from zendesk,
Get the comments from Zendesk,
and get metadata from Snowflake
"""
zendesk_hook = HttpHook(method="GET", http_conn_id=zendesk_conn_id)
snowflake_hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)
attachments = []
logging.info(f"Found ticket at url {response_json['url']}...")
if slack_conn_id:
SlackHook(slack_conn_id=slack_conn_id).get_conn().chat_postMessage(
channel="#cse-feed",
text=f"Found Telescope Zendesk ticket at url {response_json['url']}",
)
org_id = response_json["organization_id"]
sfdc_id, sfdc_name = fetch_salesforce_id(org_id, snowflake_hook)
datum = {
"ticket_id": response_json["id"],
"organization_id": org_id,
"ticket_url": response_json["url"],
"ticket_created_at": response_json["created_at"],
"ticket_updated_at": response_json["updated_at"],
"salesforce_id": sfdc_id,
"salesforce_name": sfdc_name,
}
# Query and map the `api/v2/tickets/{ticket_id}/comments` records
comments_response = zendesk_hook.run(endpoint=f"api/v2/tickets/{datum['ticket_id']}/comments")
logging.info(comments_response.url)
comments_response_json = comments_response.json()
if not comments_response.ok:
comments_response.raise_for_status()
elif comments_response_json["count"] == 0:
logging.error("No comments found...")
raise RuntimeError(f"No comments found for {comments_response.url}")
for comment in comments_response_json["comments"]:
attachments.extend(
ZendeskAttachment(
**datum,
**{
"comment_id": comment["id"],
"comment_created_at": comment["created_at"],
"attachment_id": attachment["id"],
"mapped_content_url": attachment["mapped_content_url"],
"author_id": comment["author_id"],
"content_url": attachment["content_url"],
"attachment_file_name": attachment["file_name"],
},
).__dict__
for attachment in comment["attachments"]
if "data.json" in attachment["file_name"]
and Path(attachment["file_name"]).suffix == ".json"
)
return attachments
enrich_zendesk_telescope_ticket_task = enrich_zendesk_telescope_ticket.partial(
zendesk_conn_id=ZENDESK_CONN_ID, snowflake_conn_id=SNOWFLAKE_CONN_ID, slack_conn_id=SLACK_CONN_ID
).expand(response_json=fetch_zendesk_telescope_tickets_task)
@task(trigger_rule=TriggerRule.ALL_DONE)
def reduce(args):
if args and len(args):
return list(itertools.chain(*args))
else:
raise AirflowSkipException("No input given, SKIPPING")
reduce_task = reduce(enrich_zendesk_telescope_ticket_task)
@task(multiple_outputs=False)
def download_zendesk_ticket_attachment_to_gcs(zendesk_attachment: dict, gcp_conn_id: str, bucket) -> dict:
"""Used as an Airflow Task to download the zendesk attachment to memory, and upload to GCS"""
gcs_conn, token = create_gcs_conn_and_token(gcp_conn_id)
zendesk_attachment = ZendeskAttachment(**zendesk_attachment)
# Download the file to memory
with requests.get(zendesk_attachment.content_url, stream=True, auth=("u", "p")) as r:
r.raise_for_status()
with tempfile.TemporaryFile() as f_temp:
for chunk in r.iter_content(chunk_size=None):
f_temp.write(chunk)
f_temp.flush()
f_temp.seek(0)
report = json.load(f_temp)
# Gather metadata from file headers
report_date = report.get("report_date")
zendesk_attachment.report_date = report_date
report_organization_name = report.get("organization_name")
organization_name = report_organization_name
# If we have a name via Zendesk, overwrite the `organization_name` in the report
if zendesk_attachment.salesforce_name:
organization_name = zendesk_attachment.salesforce_name
if report_organization_name != organization_name:
report["organization_name"] = organization_name
zendesk_attachment.organization_name = organization_name
if not zendesk_attachment.organization_name:
raise RuntimeError("Cannot find organization_name in JSON File or from Zendesk/Snowflake")
if not report_date:
raise RuntimeError("Cannot find report_date in JSON File")
zendesk_attachment.full_gcs_path = get_gcs_path(
bucket=bucket,
report_type="raw",
organization_name=zendesk_attachment.organization_name,
date=report_date,
filename=zendesk_attachment.attachment_file_name,
)
zendesk_attachment.gcs_path = "/".join(Path(zendesk_attachment.full_gcs_path).parts[2:])
with gcsfs.GCSFileSystem(token=token, project=gcs_conn.project).open(
zendesk_attachment.full_gcs_path, "wb"
) as f:
f.write(json.dumps(report).encode())
return zendesk_attachment.__dict__
download_zendesk_ticket_attachment_to_gcs_task = download_zendesk_ticket_attachment_to_gcs.partial(
gcp_conn_id=GCP_CONN_ID, bucket="{{ var.value.telescope_bucket }}"
).expand(zendesk_attachment=reduce_task)
trigger_dag_run_task = TriggerDagRunOperator.partial(
task_id="trigger_processor",
trigger_dag_id="telescope.processor",
execution_date="{{ dag_run.logical_date + macros.timedelta(seconds=ti.map_index) }}",
wait_for_completion=True,
reset_dag_run=True,
trigger_rule=TriggerRule.ALL_DONE,
).expand(
conf=download_zendesk_ticket_attachment_to_gcs_task.map(
lambda d: {
"report_date": d["report_date"],
"gcs_input_file": d["gcs_path"],
"organization_name": d["organization_name"],
"acct_id": d["salesforce_id"],
}
)
)