Skip to content

Instantly share code, notes, and snippets.

@lfepp
Created May 25, 2016 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lfepp/89c960ca0f3dc1ab8e5569de9882fa90 to your computer and use it in GitHub Desktop.
Sample script to output all PagerDuty incidents for a given time period to a CSV file (defaults to previous 24 hours)
#!/usr/bin/env python
#
# Copyright (c) 2016, PagerDuty, Inc. <info@pagerduty.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of PagerDuty Inc nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL PAGERDUTY INC BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Sample script to output all incidents for a given time period to a CSV file
# Currently the time period defaults to one day
# CLI Usage: ./get_incidents_csv api_key [since] [until]
# api_key: PagerDuty API access token
# since: Start date of incidents you want to pull in YYYY-MM-DD format
# until: End date of incidents you want to pull in YYYY-MM-DD format
import requests
import sys
import json
import csv
import datetime
from datetime import date
from datetime import timedelta
# Base endpoint of the PagerDuty v2 REST API for listing incidents.
url = 'https://api.pagerduty.com/incidents'
# NOTE(review): evaluated once at import time; in a long-running process
# this "yesterday" goes stale. Used as the default window start below.
yesterday = date.today() - timedelta(days=1)
def get_incidents(api_key, since=None, until=None):
    """Write all PagerDuty incidents in [since, until] to test.csv.

    api_key -- PagerDuty REST API access token.
    since   -- window start, a datetime.date or 'YYYY-MM-DD' string;
               defaults to yesterday, computed at call time.
    until   -- window end, same accepted forms; defaults to today.

    Follows the API's limit/offset pagination so that more than one
    page of incidents is captured (a single request returns at most
    one page; the response's 'more' flag signals further pages).
    """
    # Compute the date defaults at call time: module-level defaults are
    # frozen when the module is imported and go stale afterwards.
    if since is None:
        since = date.today() - timedelta(days=1)
    if until is None:
        until = date.today()
    if isinstance(since, datetime.date):
        since = since.isoformat()
    if isinstance(until, datetime.date):
        until = until.isoformat()
    headers = {
        'Accept': 'application/vnd.pagerduty+json;version=2',
        'Authorization': 'Token token=' + api_key
    }
    fieldnames = ['incident_id','incident_summary','incident_urgency','incident_status','created_at','incident_type','service_id','service_summary','escalation_policy_id','escalation_policy_summary']
    page_size = 100  # maximum page size accepted by the API
    # 'with' guarantees the CSV file is closed even if a request raises.
    with open('test.csv', 'w') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        offset = 0
        more = True
        while more:
            payload = {
                'since': since,
                'until': until,
                'limit': page_size,
                'offset': offset
            }
            r = requests.get(url, headers=headers, params=payload)
            body = r.json()
            for i in body['incidents']:
                writer.writerow({
                    'incident_id': i['id'],
                    'incident_summary': i['summary'],
                    'incident_urgency': i['urgency'],
                    'incident_status': i['status'],
                    'created_at': i['created_at'],
                    'incident_type': i['type'],
                    'service_id': i['service']['id'],
                    'service_summary': i['service']['summary'],
                    'escalation_policy_id': i['escalation_policy']['id'],
                    'escalation_policy_summary': i['escalation_policy']['summary']
                })
            # 'more' is True when another page of results exists.
            more = body.get('more', False)
            offset += page_size
if __name__ == '__main__':
    if len(sys.argv) < 2:
        # Usage errors belong on stderr with a non-zero exit status so
        # shell scripts and cron jobs can detect the failure.
        sys.stderr.write(
            "Error: You did not enter any parameters.\n"
            "Usage: ./get_incidents_csv api_key [since] [until]\n"
            "\tapi_key: PagerDuty API access token\n"
            "\tsince: Start date of incidents you want to pull in YYYY-MM-DD format\n"
            "\tuntil: End date of incidents you want to pull in YYYY-MM-DD format\n")
        sys.exit(1)
    # Forward up to three positional arguments (api_key, since, until);
    # extras beyond [until] are ignored, matching the original dispatch.
    get_incidents(*sys.argv[1:4])
@LeeXGreen
Copy link

LeeXGreen commented Dec 4, 2020

It's worth noting that the PagerDuty API is paginated now, so limit and offset parameters are appropriate:

def get_incidents(api_key, since=yesterday, until=date.today()):
    """Pull every page of incidents in the window and dump them to CSV."""
    page_size = 100
    offset = 0
    collected = []
    more_pages = True
    # Keep requesting successive pages until the API reports no more.
    while more_pages:
        more_pages, page = make_one_request(api_key, since, until, page_size, offset)
        offset += page_size
        collected.extend(page)

    write_csv(collected)

def write_csv(rows):
    """Write *rows* (dicts keyed by the column names below) to test.csv."""
    columns = ['incident_id','incident_summary','incident_urgency','incident_status','created_at','incident_type','service_id','service_summary','escalation_policy_id','escalation_policy_summary']
    # The context manager closes the file even if a write fails.
    with open('test.csv', 'w') as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)

def make_one_request(api_key, since, until, limit, offset):
    """Fetch one page of incidents and return (has_more, rows).

    api_key -- PagerDuty REST API access token.
    since/until -- window bounds, datetime.date or 'YYYY-MM-DD' strings.
    limit/offset -- pagination parameters for the API.

    Returns a tuple: (True if more pages remain, list of CSV row dicts).
    """
    headers = {
        'Accept': 'application/vnd.pagerduty+json;version=2',
        'Authorization': 'Token token=' + api_key
    }
    if isinstance(since, datetime.date):
        since = since.isoformat()
    if isinstance(until, datetime.date):
        until = until.isoformat()
    payload = {
        'since': since,
        'until': until,
        'limit': limit,
        'offset': offset,
    }
    r = requests.get(url, headers=headers, params=payload)
    # Renamed from 'json' -- the original shadowed the imported json module.
    body = r.json()
    incidents = body['incidents']
    has_more = body['more']
    rows = [ {
            'incident_id': i['id'],
            # NOTE(review): encode() works around Python 2 unicode issues;
            # on Python 3 it yields bytes in the CSV cell -- confirm runtime.
            'incident_summary': i['summary'].encode('ascii', 'ignore'),
            'incident_urgency': i['urgency'],
            'incident_status': i['status'],
            'created_at': i['created_at'],
            'incident_type': i['type'],
            'service_id': i['service']['id'],
            'service_summary': i['service']['summary'],
            'escalation_policy_id': i['escalation_policy']['id'],
            'escalation_policy_summary': i['escalation_policy']['summary']
        } for i in incidents]

    return (has_more, rows)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment