|
"""Searches for Telescope Report files hourly from Zendesk via the Search API |
|
![Telescope Processor](https://lucid.app/publicSegments/view/df1920bd-05e6-4b9b-b905-e1baa210423c/image.png) |
|
""" |
|
import itertools |
|
import json |
|
import logging |
|
import tempfile |
|
from dataclasses import dataclass |
|
from datetime import datetime, timedelta |
|
from pathlib import Path |
|
import gcsfs |
|
import requests |
|
from airflow.decorators import task |
|
from airflow.exceptions import AirflowSkipException |
|
from airflow.models.dag import DAG |
|
from airflow.operators.trigger_dagrun import TriggerDagRunOperator |
|
from airflow.providers.http.hooks.http import HttpHook |
|
from airflow.providers.slack.hooks.slack import SlackHook |
|
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook |
|
from airflow.sensors.base import PokeReturnValue |
|
from airflow.utils.trigger_rule import TriggerRule |
|
from requests import Response |
|
from dags.telescope.report_util import ( |
|
get_gcs_path, |
|
create_gcs_conn_and_token, |
|
fetch_salesforce_id, |
|
) |
|
|
|
# Airflow connection ids used by the tasks in this module.
SLACK_CONN_ID = "slack_default"  # handed to SlackHook for the "ticket found" notification
SNOWFLAKE_CONN_ID = "snowflake_default"  # handed to SnowflakeHook for the Salesforce lookup
ZENDESK_CONN_ID = "zendesk"  # handed to HttpHook for the Zendesk search / comments calls
GCP_CONN_ID = "google_cloud_telescope_sa"  # handed to create_gcs_conn_and_token for the GCS upload
|
|
|
|
|
@dataclass |
|
class ZendeskAttachment: |
|
"""Intermediate data structure, information is added over the DAG""" |
|
ticket_id: int |
|
organization_id: int |
|
ticket_url: str |
|
ticket_created_at: str |
|
ticket_updated_at: str |
|
comment_id: int |
|
comment_created_at: str |
|
attachment_id: int |
|
mapped_content_url: str |
|
author_id: int |
|
content_url: str |
|
attachment_file_name: str |
|
# These get added later |
|
salesforce_id: str | None = None |
|
salesforce_name: str | None = None |
|
gcs_path: str | None = None |
|
full_gcs_path: str | None = None |
|
report_date: str | None = None |
|
organization_name: str | None = None |
|
|
|
|
|
with DAG( |
|
dag_id="zendesk_import", |
|
schedule="@hourly", |
|
start_date=datetime(2023, 10, 20), |
|
catchup=True, |
|
doc_md=__doc__, |
|
) as dag: |
|
@task.sensor(
    mode="reschedule",
    poke_interval=timedelta(minutes=30),
    timeout=timedelta(hours=12),
    soft_fail=True,
    silent_fail=True,
)
def fetch_zendesk_telescope_tickets(
    zendesk_conn_id: str, data_interval_start: str, data_interval_end: str
) -> PokeReturnValue:
    """Poke the Zendesk search endpoint (``/api/v2/search.json``) for Telescope report tickets.

    The query asks for tickets tagged ``telescope_report`` with ``status<=hold``
    that were created at or after ``data_interval_start`` and before
    ``data_interval_end``.

    Args:
        zendesk_conn_id: Airflow connection id handed to ``HttpHook``.
        data_interval_start: Inclusive lower bound interpolated into the query.
            The call site in this file does not pass it — presumably Airflow
            injects it from the task context; TODO confirm.
        data_interval_end: Exclusive upper bound interpolated into the query
            (same note as ``data_interval_start``).

    Returns:
        ``PokeReturnValue(False)`` when the body is empty or reports zero
        tickets, otherwise ``PokeReturnValue(True)`` carrying the ``results``
        list as its XCom value. Only the ``results`` of this single response
        are returned; no further pages are requested here.

    Raises:
        requests.HTTPError: When the response status is not OK.
    """
    logging.info("Searching for zendesk tickets...")
    search_query = " ".join(
        [
            "type:ticket",
            "tags:telescope_report",
            f"created>={data_interval_start}",
            f"created<{data_interval_end}",
            "status<=hold",
        ]
    )
    # NOTE(review): HttpHook.run may already raise for error statuses depending
    # on its response-check option — confirm; the explicit check below is kept
    # so the failure is logged either way.
    response: Response = HttpHook(method="GET", http_conn_id=zendesk_conn_id).run(
        endpoint="/api/v2/search.json",
        data={"query": search_query},
    )
    logging.info(response.url)

    # Check the status BEFORE decoding the body: an error response is not
    # guaranteed to contain JSON, and decoding it first would hide the real
    # HTTP failure behind a JSON decode error.
    if not response.ok:
        logging.error(f"Error: {response.url=} {response.status_code=} {response.reason=}")
        response.raise_for_status()

    response_json = response.json()
    if response_json is None:
        logging.info("No response...")
        return PokeReturnValue(False)
    if response_json["count"] == 0:
        logging.info("No tickets found...")
        return PokeReturnValue(False)
    return PokeReturnValue(True, xcom_value=response_json["results"])


fetch_zendesk_telescope_tickets_task = fetch_zendesk_telescope_tickets(zendesk_conn_id=ZENDESK_CONN_ID)
|
|
|
@task
def enrich_zendesk_telescope_ticket(
    response_json: dict,
    zendesk_conn_id: str,
    snowflake_conn_id: str,
    slack_conn_id: str | None,
) -> list[dict]:
    """Turn one Zendesk ticket into a list of attachment records.

    Steps, in order:
      1. Optionally post the ticket URL to the ``#cse-feed`` Slack channel.
      2. Look up the Salesforce id/name for the ticket's organization via
         ``fetch_salesforce_id`` and the Snowflake hook.
      3. Fetch the ticket's comments from ``api/v2/tickets/{ticket_id}/comments``.
      4. Emit one ``ZendeskAttachment`` (as a plain dict) per attachment whose
         file name contains ``data.json`` and has a ``.json`` suffix.

    Args:
        response_json: A single ticket record; the keys ``id``, ``url``,
            ``organization_id``, ``created_at`` and ``updated_at`` are read.
        zendesk_conn_id: Airflow connection id handed to ``HttpHook``.
        snowflake_conn_id: Airflow connection id handed to ``SnowflakeHook``.
        slack_conn_id: Airflow connection id handed to ``SlackHook``; when
            falsy no Slack message is sent.

    Returns:
        The matching attachments as dicts (possibly an empty list).

    Raises:
        requests.HTTPError: When the comments response status is not OK.
        RuntimeError: When the comments response reports zero comments.
    """
    zendesk_hook = HttpHook(method="GET", http_conn_id=zendesk_conn_id)
    snowflake_hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)
    attachments = []
    logging.info(f"Found ticket at url {response_json['url']}...")
    if slack_conn_id:
        SlackHook(slack_conn_id=slack_conn_id).get_conn().chat_postMessage(
            channel="#cse-feed",
            text=f"Found Telescope Zendesk ticket at url {response_json['url']}",
        )

    org_id = response_json["organization_id"]
    sfdc_id, sfdc_name = fetch_salesforce_id(org_id, snowflake_hook)
    # Ticket-level fields shared by every attachment found on this ticket.
    datum = {
        "ticket_id": response_json["id"],
        "organization_id": org_id,
        "ticket_url": response_json["url"],
        "ticket_created_at": response_json["created_at"],
        "ticket_updated_at": response_json["updated_at"],
        "salesforce_id": sfdc_id,
        "salesforce_name": sfdc_name,
    }

    # Query and map the `api/v2/tickets/{ticket_id}/comments` records
    comments_response = zendesk_hook.run(endpoint=f"api/v2/tickets/{datum['ticket_id']}/comments")
    logging.info(comments_response.url)

    # Check the status BEFORE decoding the body: an error response is not
    # guaranteed to contain JSON, and decoding it first would hide the real
    # HTTP failure behind a JSON decode error.
    if not comments_response.ok:
        comments_response.raise_for_status()

    comments_response_json = comments_response.json()
    if comments_response_json["count"] == 0:
        logging.error("No comments found...")
        raise RuntimeError(f"No comments found for {comments_response.url}")

    for comment in comments_response_json["comments"]:
        attachments.extend(
            ZendeskAttachment(
                **datum,
                **{
                    "comment_id": comment["id"],
                    "comment_created_at": comment["created_at"],
                    "attachment_id": attachment["id"],
                    "mapped_content_url": attachment["mapped_content_url"],
                    "author_id": comment["author_id"],
                    "content_url": attachment["content_url"],
                    "attachment_file_name": attachment["file_name"],
                },
            ).__dict__
            for attachment in comment["attachments"]
            # Keep only files named like "*data.json*" that really end in .json.
            if "data.json" in attachment["file_name"]
            and Path(attachment["file_name"]).suffix == ".json"
        )
    return attachments


enrich_zendesk_telescope_ticket_task = enrich_zendesk_telescope_ticket.partial(
    zendesk_conn_id=ZENDESK_CONN_ID, snowflake_conn_id=SNOWFLAKE_CONN_ID, slack_conn_id=SLACK_CONN_ID
).expand(response_json=fetch_zendesk_telescope_tickets_task)
|
|
|
@task(trigger_rule=TriggerRule.ALL_DONE)
def reduce(args):
    """Flatten the per-ticket attachment lists into a single list.

    Args:
        args: An iterable of iterables (one inner iterable per mapped
            upstream task instance).

    Returns:
        A flat list holding every element of every inner iterable, in order.

    Raises:
        AirflowSkipException: When ``args`` is empty or otherwise falsy.
    """
    # Guard clause: nothing upstream means there is nothing to fan out over.
    if not args:
        raise AirflowSkipException("No input given, SKIPPING")
    return list(itertools.chain.from_iterable(args))


reduce_task = reduce(enrich_zendesk_telescope_ticket_task)
|
|
|
@task(multiple_outputs=False)
def download_zendesk_ticket_attachment_to_gcs(zendesk_attachment: dict, gcp_conn_id: str, bucket) -> dict:
    """Download one Zendesk attachment, normalize its organization name, and upload it to GCS.

    The attachment is streamed into a temporary file and parsed as JSON. If
    the attachment record carries a ``salesforce_name``, that name replaces
    the report's ``organization_name``. The (possibly modified) report is then
    written to the path returned by ``get_gcs_path``.

    Args:
        zendesk_attachment: Keyword arguments for ``ZendeskAttachment``.
        gcp_conn_id: Airflow connection id handed to ``create_gcs_conn_and_token``.
        bucket: Bucket value forwarded to ``get_gcs_path``.

    Returns:
        The attachment record as a dict, with ``report_date``,
        ``organization_name``, ``full_gcs_path`` and ``gcs_path`` filled in.

    Raises:
        requests.HTTPError: When the download response status is not OK.
        requests.Timeout: When connecting or reading exceeds the timeout.
        RuntimeError: When no organization name can be determined, or the
            report has no ``report_date``.
    """
    # (connect, read) timeouts in seconds. Without a timeout, requests waits
    # indefinitely on a stalled connection and the task would never finish.
    download_timeout = (30, 300)

    gcs_conn, token = create_gcs_conn_and_token(gcp_conn_id)
    zendesk_attachment = ZendeskAttachment(**zendesk_attachment)

    # Stream the attachment into a temporary file, then parse it as JSON.
    # NOTE(review): auth=("u", "p") looks like placeholder credentials — confirm
    # whether the content URL needs real auth or none at all. Left unchanged.
    with requests.get(
        zendesk_attachment.content_url,
        stream=True,
        auth=("u", "p"),
        timeout=download_timeout,
    ) as r:
        r.raise_for_status()
        with tempfile.TemporaryFile() as f_temp:
            for chunk in r.iter_content(chunk_size=None):
                f_temp.write(chunk)

            f_temp.flush()
            f_temp.seek(0)
            report = json.load(f_temp)

    # Gather metadata from file headers
    report_date = report.get("report_date")
    zendesk_attachment.report_date = report_date
    report_organization_name = report.get("organization_name")
    organization_name = report_organization_name

    # If we have a name via Zendesk, overwrite the `organization_name` in the report
    if zendesk_attachment.salesforce_name:
        organization_name = zendesk_attachment.salesforce_name
        if report_organization_name != organization_name:
            report["organization_name"] = organization_name
    zendesk_attachment.organization_name = organization_name
    if not zendesk_attachment.organization_name:
        raise RuntimeError("Cannot find organization_name in JSON File or from Zendesk/Snowflake")
    if not report_date:
        raise RuntimeError("Cannot find report_date in JSON File")

    zendesk_attachment.full_gcs_path = get_gcs_path(
        bucket=bucket,
        report_type="raw",
        organization_name=zendesk_attachment.organization_name,
        date=report_date,
        filename=zendesk_attachment.attachment_file_name,
    )
    # gcs_path is the full path with its first two path components dropped.
    zendesk_attachment.gcs_path = "/".join(Path(zendesk_attachment.full_gcs_path).parts[2:])

    with gcsfs.GCSFileSystem(token=token, project=gcs_conn.project).open(
        zendesk_attachment.full_gcs_path, "wb"
    ) as f:
        f.write(json.dumps(report).encode())

    return zendesk_attachment.__dict__


download_zendesk_ticket_attachment_to_gcs_task = download_zendesk_ticket_attachment_to_gcs.partial(
    gcp_conn_id=GCP_CONN_ID, bucket="{{ var.value.telescope_bucket }}"
).expand(zendesk_attachment=reduce_task)
|
|
|
def _processor_conf(attachment: dict) -> dict:
    """Build the ``conf`` payload for one triggered run from an uploaded-attachment record."""
    return {
        "report_date": attachment["report_date"],
        "gcs_input_file": attachment["gcs_path"],
        "organization_name": attachment["organization_name"],
        "acct_id": attachment["salesforce_id"],
    }


# One "telescope.processor" run is triggered per uploaded attachment.
# The execution date is the run's logical date plus `ti.map_index` seconds —
# presumably so every mapped trigger gets a distinct execution date; confirm.
trigger_dag_run_task = TriggerDagRunOperator.partial(
    task_id="trigger_processor",
    trigger_dag_id="telescope.processor",
    execution_date="{{ dag_run.logical_date + macros.timedelta(seconds=ti.map_index) }}",
    wait_for_completion=True,
    reset_dag_run=True,
    trigger_rule=TriggerRule.ALL_DONE,
).expand(conf=download_zendesk_ticket_attachment_to_gcs_task.map(_processor_conf))