Skip to content

Instantly share code, notes, and snippets.

@mdcollins05
Last active August 29, 2015 14:16
Show Gist options
  • Save mdcollins05/15211d8b32666b121eff to your computer and use it in GitHub Desktop.
Save mdcollins05/15211d8b32666b121eff to your computer and use it in GitHub Desktop.
#!/usr/bin/python
import requests
import sys
import json
from datetime import date, timedelta
import pprint
# ---- User configuration: edit the values below before running ----
#Your PagerDuty API key. A read-only key will work for this.
auth_token = 'API_KEY_HERE'
#The PagerDuty subdomain (the part before ".pagerduty.com" in the API URLs built below)
pd_subdomain = 'YOUR_PD_SUBDOMAIN'
#The PagerDuty team id (leave blank for all teams)
pd_team = ''
## NOTE: Dates should be in ISO 8601 format: http://en.wikipedia.org/wiki/ISO_8601
## They are sent as the 'since'/'until' bounds when looking up who is on a schedule.
#Start date
#Default value is: start_date = date.today().isoformat()
start_date = date.today().isoformat()
#End date
#Default value is: end_date = (date.today() + timedelta(180)).isoformat()
end_date = (date.today() + timedelta(180)).isoformat()
#Include all users (True) or just the ones attached to an escalation policy/schedule (False).
#When True, users without any escalation policy entry are written with empty policy columns.
inclusive = False
#Include the header in the CSV output
header = True
##Output columns written to file 'on_call_users.csv':
##user name, user email, team name, escalation policy name, escalation policy level, schedule name
#HTTP headers sent with every API request made by this script.
HEADERS = {
'Authorization': 'Token token={0}'.format(auth_token),
'Content-type': 'application/json',
}
def _csv_row(fields):
    """Return one CSV line: each field double-quoted, comma-joined, newline-terminated.

    Embedded double quotes are replaced by two single quotes so that a stray
    quote in a name can never terminate the field early.
    """
    return ','.join('"' + field.replace('"', "''") + '"' for field in fields) + "\n"


def get_users_and_escalation_policies():
    """Collect every user's escalation-policy assignments and append them as CSV.

    Reads the module-level settings ``pd_team``, ``header`` and ``inclusive``.
    One row is produced per (user, team, escalation policy, level, schedule)
    combination, sorted case-insensitively by user name, team name and schedule
    name.  When ``inclusive`` is true, users without any assignment get a row
    with empty policy columns.

    The rows are appended to 'on_call_users.csv'.  Because the file is opened
    in append mode, running the script again adds to an existing file (and
    repeats the header line when ``header`` is true).
    """
    user_data = convert_to_ascii(get_all_eps(pd_team))
    print("Processing...")

    # Collect lines in a list and join once instead of growing a string.
    lines = []
    if header:
        lines.append('"Person\'s name", "Person\'s email", "Team name", "Escalation Policy name", "Escalation Policy level", "Schedule name (if applicable)"' + "\n")

    # Sort everything regardless of case.
    for user in sorted(user_data.values(), key=lambda e: e['name'].lower()):
        assignments = user.get('data', {})
        has_entries = False
        for team in sorted(assignments, key=lambda e: e.lower()):
            for ep_name, ep_value in sorted(assignments[team].items()):
                has_entries = True
                for ep_level, sched_names in sorted(ep_value.items()):
                    for sched_name in sorted(sched_names, key=lambda e: e.lower()):
                        lines.append(_csv_row([
                            user['name'],
                            user['email'],
                            team,
                            ep_name,
                            str(ep_level),
                            sched_name,
                        ]))
        if inclusive and not has_entries:
            lines.append(_csv_row([user['name'], user['email'], '', '', '', '']))

    print("Writing results to file: on_call_users.csv")
    # The context manager guarantees the handle is flushed and closed.
    with open('on_call_users.csv', 'a') as csv_file:
        csv_file.write(''.join(lines))
def convert_to_ascii(input):
    """Recursively strip non-ASCII characters from all text in a JSON-like value.

    Dicts (keys and values) and lists are walked recursively; text strings
    have every non-ASCII character dropped and are returned as the
    interpreter's native ``str``.  Any other value (numbers, None, tuples,
    byte strings, ...) is returned unchanged.

    Works on both Python 2 and Python 3: the text type is taken from a
    ``u''`` literal rather than the Python-2-only ``unicode`` builtin.
    """
    text_type = type(u'')  # ``unicode`` on Python 2, ``str`` on Python 3
    if isinstance(input, dict):
        return {convert_to_ascii(key): convert_to_ascii(value) for key, value in input.items()}
    if isinstance(input, list):
        return [convert_to_ascii(element) for element in input]
    if isinstance(input, text_type):
        encoded = input.encode('ascii', 'ignore')
        # Python 2: ``encoded`` is already a native str.  Python 3: it is
        # bytes and must be decoded back so callers can concatenate it.
        return encoded if isinstance(encoded, str) else encoded.decode('ascii')
    return input
## User stuff ##
def get_user_count():
    """Query the users endpoint for its paging metadata.

    Returns a two-item list: the 'total' and 'limit' values reported in the
    JSON response, in that order.
    """
    url = 'https://{0}.pagerduty.com/api/v1/users'.format(pd_subdomain)
    payload = requests.get(url, headers=HEADERS).json()
    return [payload['total'], payload['limit']]
def get_all_users():
    """Fetch every user, page by page, and index them by email address.

    Returns a dict mapping each user's email to ``{'name': ..., 'email': ...}``.
    """
    total, limit = get_user_count()
    print("Total number of users: {0}".format(total))
    users = {}
    # Step straight from page start to page start instead of visiting every
    # offset and filtering with a modulo test.
    for offset in range(0, total, limit):
        for user in get_users(offset, limit):
            users[user['email']] = {
                'name': user['name'],
                'email': user['email'],
            }
    return users
def get_users(offset, limit):
    """Request one page of users and return the 'users' list from the response.

    offset -- index of the first user to return
    limit  -- maximum number of users in the page
    """
    # NOTE(review): the paging arguments travel as a JSON body on a GET
    # request -- confirm the API honours them in that form.
    body = json.dumps({
        'offset': offset,
        'limit': limit
    })
    url = 'https://{0}.pagerduty.com/api/v1/users'.format(pd_subdomain)
    response = requests.get(url, headers=HEADERS, data=body)
    return response.json()['users']
## Team stuff ##
def get_team_count():
    """Query the teams endpoint for its paging metadata.

    Returns a two-item list: the 'total' and 'limit' values reported in the
    JSON response, in that order.
    """
    url = 'https://{0}.pagerduty.com/api/v1/teams'.format(pd_subdomain)
    payload = requests.get(url, headers=HEADERS).json()
    return [payload['total'], payload['limit']]
def get_all_teams():
    """Fetch every team, page by page.

    Returns a dict mapping team id to team name.  If an id is seen more than
    once, the first name encountered is kept.
    """
    total, limit = get_team_count()
    print("Number of teams: {0}".format(total))
    teams = {}
    # Step straight from page start to page start instead of visiting every
    # offset and filtering with a modulo test.
    for offset in range(0, total, limit):
        for team in get_teams(offset, limit):
            teams.setdefault(team['id'], team['name'])
    return teams
def get_teams(offset, limit):
    """Request one page of teams and return the 'teams' list from the response.

    offset -- index of the first team to return
    limit  -- maximum number of teams in the page
    """
    body = json.dumps({
        'offset': offset,
        'limit': limit
    })
    url = 'https://{0}.pagerduty.com/api/v1/teams'.format(pd_subdomain)
    response = requests.get(url, headers=HEADERS, data=body)
    return response.json()['teams']
## Escalation Policy stuff ##
def get_ep_count(team):
    """Query the escalation-policy endpoint for paging metadata, filtered by team.

    team -- team id sent as the 'teams' filter

    Returns a two-item list: the 'total' and 'limit' values reported in the
    JSON response, in that order.
    """
    body = json.dumps({
        'teams': team
    })
    url = 'https://{0}.pagerduty.com/api/v1/escalation_policies'.format(pd_subdomain)
    payload = requests.get(url, headers=HEADERS, data=body).json()
    return [payload['total'], payload['limit']]
def _record_assignment(users, email, team_name, ep_name, level, schedule_name):
    """Add one escalation-policy assignment to the user keyed by *email*.

    The assignment is stored as
    ``users[email]['data'][team_name][ep_name][level] -> [schedule_name, ...]``.
    Returns False (recording nothing) when *email* is not a key of *users*.
    """
    user = users.get(email)
    if user is None:
        return False
    levels = user.setdefault('data', {}).setdefault(team_name, {}).setdefault(ep_name, {})
    levels.setdefault(level, []).append(schedule_name)
    return True


def get_all_eps(pd_team):
    """Attach escalation-policy assignments to every user and return the users.

    pd_team -- a team id to restrict the lookup to, or a false value for all teams

    Returns the dict produced by ``get_all_users`` (keyed by email).  Each
    user found on an escalation policy gains a 'data' entry of the shape
    ``{team name: {policy name: {level: [schedule name, ...]}}}``, where the
    level is the 1-based position of the rule within the policy and the
    schedule name is '' when the user is targeted directly.

    Users that appear on a policy or schedule but are missing from the user
    list are skipped individually (with a message) instead of aborting the
    run or discarding the rest of the schedule.

    Raises KeyError if *pd_team* is given but is not a known team id.
    """
    users = get_all_users()
    all_teams = get_all_teams()
    if pd_team:
        if pd_team not in all_teams:
            raise KeyError("Unknown PagerDuty team id: {0}".format(pd_team))
        teams = {pd_team: all_teams[pd_team]}
    else:
        teams = all_teams

    for team_id, team_name in teams.items():
        total, limit = get_ep_count(team_id)
        print("Number of escalation policies for team {0}: {1}".format(team_name, total))
        # One request per page of escalation policies.
        for offset in range(0, total, limit):
            for ep in get_eps(team_id, offset, limit):
                # Levels are numbered from 1 in rule order.
                for level, rule in enumerate(ep['escalation_rules'], 1):
                    for ep_target in rule['targets']:
                        if ep_target['type'] == "schedule":
                            # We need to fetch the users on the schedule.
                            on_schedule = get_schedule_users(ep_target['id'])
                            if on_schedule is None:
                                # The lookup failed and was already reported.
                                continue
                            emails = [member.get('email') for member in on_schedule]
                            schedule_name = ep_target.get('name', '')
                        elif ep_target['type'] == "user":
                            # A user is targeted directly: no schedule name.
                            emails = [ep_target.get('email')]
                            schedule_name = ''
                        else:
                            continue
                        for email in emails:
                            if not _record_assignment(users, email, team_name, ep['name'], level, schedule_name):
                                print("Skipping user {0!r}: not present in the user list".format(email))
    return users
def get_eps(team, offset, limit):
    """Request one page of a team's escalation policies.

    team   -- team id sent as the 'teams' filter
    offset -- index of the first escalation policy to return
    limit  -- maximum number of escalation policies in the page

    Prints the offset being fetched and returns the 'escalation_policies'
    list from the JSON response.
    """
    body = json.dumps({
        'offset': offset,
        'limit': limit,
        'teams': team
    })
    print("Offset: {0}".format(offset))
    url = 'https://{0}.pagerduty.com/api/v1/escalation_policies'.format(pd_subdomain)
    response = requests.get(url, headers=HEADERS, data=body)
    return response.json()['escalation_policies']
def get_schedule_users(scheduleID):
    """Return the users on a schedule between ``start_date`` and ``end_date``.

    scheduleID -- id of the schedule to look up

    If either module-level date is empty it is filled in first (today, and
    today + 180 days respectively) and stays set for later calls.

    Returns the 'users' list from the JSON response, or None (after printing
    a message) when the response cannot be parsed or has no 'users' entry.
    """
    global start_date, end_date
    if not start_date:
        start_date = date.today().isoformat()
    if not end_date:
        end_date = (date.today() + timedelta(180)).isoformat()  ## API has a max of 180 day span in one call
    params = {
        'since': start_date,
        'until': end_date
    }
    schedule = requests.get(
        'https://{0}.pagerduty.com/api/v1/schedules/{1}/users'.format(pd_subdomain, scheduleID),
        headers=HEADERS,
        data=json.dumps(params),
    )
    try:
        return schedule.json()['users']
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not JSON; KeyError/TypeError: JSON without a
        # 'users' entry.  Anything else (e.g. Ctrl-C) is left to propagate.
        print("Schedule ID {0} returned an error".format(scheduleID))
        return None
def main(argv=None):
    """Script entry point: run the export.

    argv -- argument list; defaults to ``sys.argv``.  It is accepted but not
            otherwise used.

    Returns None, so the process exits with status 0 on success.
    """
    argv = sys.argv if argv is None else argv
    get_users_and_escalation_policies()


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment