# dag_pagerduty.py (gist by @AustinBGibbons, created Dec 11, 2018)
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import absolute_import
from __future__ import division
import csv
import os
import sys
import tempfile
from datetime import datetime, timedelta
import boto3
import requests
from airflow.models import DAG
from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
POSTGRES_CONN_ID = '<your-redshift-db>'
S3_BUCKET = '<your-s3-bucket>'
# This file expects the PagerDuty API v2 JSON response format and flattens it
# to CSV for consumption in Redshift.
# This version was sanitized from a file with internal dependencies:
# please test before deploying!
# Don't change these fields without altering the redshift tables!
COMMON_SUB_FIELDS = [
    'id',
    'type',
    'summary',
    'self',
    'html_url',
]
# Don't change these fields without altering the redshift tables!
INCIDENT_FIELDS = [
    'incident_number',
    'created_at',
    'status',
    'title',
    'incident_key',
    'last_status_change_at',
    'urgency',
]
# Don't change these fields without altering the redshift tables!
INCIDENT_PARENT_FIELDS = [
    'service',
    'priority',
    'last_status_change_by',
    'first_trigger_log_entry',
    'escalation_policy',
    'teams',
]
# Don't change these fields without altering the redshift tables!
LOG_ENTRIES_FIELDS = [
    'created_at',
    'note',
]
# Don't change these fields without altering the redshift tables!
LOG_ENTRIES_PARENT_FIELDS = [
    'channel',
    'incident',
    'event_details',
]
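# NOTE: the Redshift tables themselves are not part of this sanitized file. The
# COPY and dedup steps below assume pagerduty.incidents and pagerduty.log_entries
# exist with columns in the exact order produced by header_row() further down:
# execution_date, then COMMON_SUB_FIELDS, then the endpoint-specific fields, then
# one COMMON_SUB_FIELDS block per parent field. A hypothetical sketch for
# incidents (column names/types are guesses, not from the original deployment;
# only the positional order matters for the CSV COPY):
#
#   create table pagerduty.incidents (
#       execution_date timestamp,
#       id varchar(256), type varchar(256), summary varchar(max),
#       self varchar(max), html_url varchar(max),
#       incident_number varchar(256), created_at varchar(256), status varchar(256),
#       title varchar(max), incident_key varchar(256),
#       last_status_change_at varchar(256), urgency varchar(256),
#       service_id varchar(256), service_type varchar(256),
#       service_summary varchar(max), service_self varchar(max),
#       service_html_url varchar(max)
#       -- ...the same five columns repeated for priority, last_status_change_by,
#       -- first_trigger_log_entry, escalation_policy, teams
#   );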
def incidents_deduping_query(temp_table_name):
    return """
    begin ;
    delete from pagerduty.incidents where id in (select id from pagerduty.{0}) ;
    insert into pagerduty.incidents select * from pagerduty.{0} ;
    end ;
    """.format(temp_table_name)
def pagerduty_copy_query(execution_date, table_name, filename):
    return '''
    copy pagerduty.{} from
    's3://{}/{}'
    iam_role '{{{{ REDSHIFT_ROLE }}}}'
    CSV IGNOREHEADER 1 EMPTYASNULL TRUNCATECOLUMNS FILLRECORD TIMEFORMAT 'auto';
    '''.format(table_name, S3_BUCKET, s3_path(execution_date, filename))
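# For illustration, with a hypothetical S3_BUCKET of 'my-bucket' the incidents
# copy renders roughly as:
#
#   copy pagerduty.incidents_20181211T000000 from
#   's3://my-bucket/pagerduty/2018/12/11/00/incidents.csv'
#   iam_role '{{ REDSHIFT_ROLE }}'
#   CSV IGNOREHEADER 1 EMPTYASNULL TRUNCATECOLUMNS FILLRECORD TIMEFORMAT 'auto';
#
# The '{{ REDSHIFT_ROLE }}' left behind by the double-escaped braces is a
# sanitized placeholder: replace it with your IAM role ARN (or a Jinja variable
# available in your Airflow environment) before deploying.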
def merge_two_dicts(x, y):
    z = x.copy()  # start with x's keys and values
    z.update(y)   # modifies z with y's keys and values & returns None
    return z
def build_dag(dag_name):
    default_args = {
        'owner': 'airflow',
        'execution_timeout': timedelta(hours=2),
        'email': 'airflow@example.com',
        'retries': 1,
        'retry_delay': timedelta(minutes=15),
    }
    return DAG(
        dag_id=dag_name,
        default_args=default_args,
        catchup=False,
        max_active_runs=1,
        # if you change schedule_interval you need to change
        # the since/until range in log_entries and the s3 bucketing.
        schedule_interval='@hourly',
    )
def append_fields(input_json, output_csv, fields):
    for field in fields:
        output_csv.append(str(input_json.get(field, '')))
# Expects PagerDuty 1-level nested JSON, where each level supports
# COMMON_SUB_FIELDS and additional specific fields are added at the top level.
def pagerduty_json_to_csv(
        execution_date,
        input_json,
        specific_fields,
        specific_parent_fields):
    output_csv = [execution_date.strftime('%Y-%m-%d %H:%M:%S')]
    append_fields(input_json, output_csv, COMMON_SUB_FIELDS)
    append_fields(input_json, output_csv, specific_fields)
    for parent_field in specific_parent_fields:
        parent_json = input_json.get(parent_field, None)
        if not parent_json or len(parent_json) == 0:
            parent_json = {}
        if isinstance(parent_json, list):
            # lazily extract the first entry
            parent_json = parent_json[0]
        append_fields(parent_json, output_csv, COMMON_SUB_FIELDS)
    return output_csv
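# A minimal sketch of the flattening (values are made up, not real PagerDuty data):
#
#   >>> sample = {
#   ...     'id': 'PABC123', 'type': 'incident', 'summary': 'Example', 'self': '',
#   ...     'html_url': '', 'incident_number': 42, 'status': 'resolved',
#   ...     'service': {'id': 'PSVC1', 'type': 'service_reference'},
#   ...     'teams': [{'id': 'PTEAM1', 'type': 'team_reference'}],
#   ... }
#   >>> pagerduty_json_to_csv(datetime(2018, 12, 11), sample,
#   ...                       INCIDENT_FIELDS, INCIDENT_PARENT_FIELDS)
#
# This yields one row: the execution date, the incident's COMMON_SUB_FIELDS, its
# INCIDENT_FIELDS (missing keys become ''), then COMMON_SUB_FIELDS for each
# INCIDENT_PARENT_FIELDS entry; only the first element of list-valued parents
# such as 'teams' is kept.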
def s3_path(time_folder, filename):
    return os.path.join(
        'pagerduty',
        time_folder,
        '{}.csv'.format(filename)
    )
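# For example, s3_path('2018/12/11/00', 'incidents') returns
# 'pagerduty/2018/12/11/00/incidents.csv', matching the hourly bucketing used
# by pagerduty_write_to_s3 and pagerduty_copy_query.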
def write_to_s3(csv_rows, s3_bucket, s3_path):
    s3 = boto3.resource('s3')
    with tempfile.NamedTemporaryFile() as tmp_file:
        csv_writer = csv.writer(tmp_file, quoting=csv.QUOTE_ALL)
        for row in csv_rows:
            csv_writer.writerow(row)
        # like it's 2007 Java
        tmp_file.flush()
        s3.Bucket(s3_bucket).upload_file(tmp_file.name, s3_path)
def pagerduty_write_to_s3(csv_rows, execution_date, filename):
    time_folder = execution_date.strftime('%Y/%m/%d/%H')
    path = s3_path(time_folder, filename)
    write_to_s3(csv_rows, S3_BUCKET, path)
# We manually list out the fields we're copying to ensure that the order
# is consistent
def header_row(specific_fields, specific_parent_fields):
    fields = ['execution_date'] + COMMON_SUB_FIELDS + specific_fields
    for parent_field in specific_parent_fields:
        for common_field in COMMON_SUB_FIELDS:
            fields.append('{}_{}'.format(parent_field, common_field))
    return fields
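# For the incidents endpoint this produces:
#   ['execution_date', 'id', 'type', 'summary', 'self', 'html_url',
#    'incident_number', 'created_at', 'status', 'title', 'incident_key',
#    'last_status_change_at', 'urgency',
#    'service_id', 'service_type', ..., 'teams_html_url']
# and this ordering must stay in sync with the Redshift column order (see the
# note above the field lists).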
"""
Downloads the pagerduty data through their apiv2:
https://v2.developer.pagerduty.com/v2/page/api-reference#!/Incidents/get_incidents
- incidents is not an append-only structure, so we download the last month of data,
then delete existing records with that id, and add the new ones
- log_entries is an append-only structure, so we only query in the (since, until) range and just append the data
"""
def download_pagerduty(
pagerduty_api_key,
endpoint,
execution_date,
specific_fields,
specific_parent_fields,
request_params):
# in order to correctly handle utf encoding, we need to set the system
# default encoding to utf-8, which requires us to reload the system.
reload(sys)
sys.setdefaultencoding('utf-8')
csv_rows = [header_row(specific_fields, specific_parent_fields)]
more = True
url = 'https://api.pagerduty.com/{}'.format(endpoint)
offset = 0
limit = 25
while more:
response = requests.get(
url,
headers={
'Authorization': 'Token token={}'.format(pagerduty_api_key),
'Accept': 'application/vnd.pagerduty+json;version=2',
},
params=merge_two_dicts({
'offset': offset,
'limit': limit,
}, request_params)
)
if not response.ok:
raise RuntimeError(
'Pagerduty request failed: status_code={}'.format(
response.status_code))
json = response.json()
more = json.get('more', False)
offset += limit
if offset % 200 == 0:
print('IngestingPagerduty offset={} created_at={}'.format(
str(offset), str(json[endpoint][0]['created_at'])))
array = json.get(endpoint, [])
for json in array:
csv_rows.append(
pagerduty_json_to_csv(
execution_date,
json,
specific_fields,
specific_parent_fields))
pagerduty_write_to_s3(csv_rows, execution_date, '{}'.format(endpoint))
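# The pagination above relies on the PagerDuty v2 list envelope, roughly shaped
# like this (values are illustrative, not real data):
#
#   {
#       "incidents": [{"id": "PABC123", "created_at": "2018-12-11T00:00:00Z", ...}],
#       "limit": 25,
#       "offset": 0,
#       "more": true
#   }
#
# The loop pages forward by `limit` until `more` is false, then writes all
# accumulated rows to S3 as one CSV per endpoint per execution hour.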
def download_pagerduty_incidents(pagerduty_api_key, *args, **kwargs):
    execution_date = kwargs['execution_date']
    download_pagerduty(
        pagerduty_api_key,
        'incidents',
        execution_date,
        INCIDENT_FIELDS,
        INCIDENT_PARENT_FIELDS,
        {'sort_by': 'created_at:DESC'},
    )
def download_pagerduty_log_entries(pagerduty_api_key, *args, **kwargs):
    execution_date = kwargs['execution_date']
    download_pagerduty(
        pagerduty_api_key,
        'log_entries',
        execution_date,
        LOG_ENTRIES_FIELDS,
        LOG_ENTRIES_PARENT_FIELDS,
        {
            'since': execution_date.isoformat(),
            'until': (execution_date + timedelta(hours=1)).isoformat(),
        },
    )
dag_name = 'pagerduty'
dag = build_dag(dag_name)
with dag:
    download_pagerduty_incidents_op = PythonOperator(
        task_id="download_incidents_to_s3",
        python_callable=download_pagerduty_incidents,
        op_kwargs={
            'pagerduty_api_key': Variable.get('pagerduty_api_key', None),
        },
        provide_context=True,
    )
    create_staging_incidents_table_op = PostgresOperator(
        task_id='create_staging_incidents_table',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql='create table pagerduty.incidents_{{ ts_nodash }} as select * from pagerduty.incidents limit 0;',
    )
    copy_incidents_temp_s3_to_redshift_op = PostgresOperator(
        task_id='copy_incidents_temp_s3_to_redshift',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql=pagerduty_copy_query(
            '{{ macros.ds_format(ts, "%Y-%m-%dT%H:%M:%S", "%Y/%m/%d/%H") }}',
            'incidents_{{ ts_nodash }}',
            'incidents',
        ),
    )
    update_incidents_table_op = PostgresOperator(
        task_id='update_incidents_table',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql=incidents_deduping_query('incidents_{{ ts_nodash }}'),
    )
    drop_incidents_temp_table_op = PostgresOperator(
        task_id='drop_incidents_temp_table',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql='drop table pagerduty.incidents_{{ ts_nodash }}',
    )
    incident_steps = [
        download_pagerduty_incidents_op,
        create_staging_incidents_table_op,
        copy_incidents_temp_s3_to_redshift_op,
        update_incidents_table_op,
        drop_incidents_temp_table_op,
    ]
    # chain each incident step after the previous one
    for index in range(len(incident_steps) - 1):
        incident_steps[index + 1].set_upstream(incident_steps[index])
    download_pagerduty_log_entries_op = PythonOperator(
        task_id="download_log_entries_to_s3",
        python_callable=download_pagerduty_log_entries,
        op_kwargs={
            'pagerduty_api_key': Variable.get('pagerduty_api_key', None),
        },
        provide_context=True,
    )
    copy_pagerduty_log_entries_op = PostgresOperator(
        task_id='copy_log_entries_s3_to_redshift',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql=pagerduty_copy_query(
            '{{ macros.ds_format(ts, "%Y-%m-%dT%H:%M:%S", "%Y/%m/%d/%H") }}',
            'log_entries',
            'log_entries',
        ),
    )
    # the copy must run after the download, not before it
    copy_pagerduty_log_entries_op.set_upstream(
        download_pagerduty_log_entries_op)
globals()[dag_name] = dag
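# A minimal local smoke test, per the "please test before deploying!" note above.
# This is a sketch only: it assumes a PAGERDUTY_API_KEY environment variable
# (hypothetical name) and valid AWS credentials, and it writes a real file to
# S3_BUCKET. The guard keeps it from running when Airflow imports this module.
if __name__ == '__main__':
    download_pagerduty_incidents(
        os.environ.get('PAGERDUTY_API_KEY'),
        execution_date=datetime(2018, 12, 11),
    )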