Skip to content

Instantly share code, notes, and snippets.

@Deconstrained
Last active January 20, 2022 00:20
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Deconstrained/f29fe709f8e4ff28715f7cf715e80f13 to your computer and use it in GitHub Desktop.
Save Deconstrained/f29fe709f8e4ff28715f7cf715e80f13 to your computer and use it in GitHub Desktop.
Updates an EC2 security group with rules to permit connections from PagerDuty IP addresses
#!/usr/bin/env python
import argparse
import logging
import os
import requests
import subprocess
import sys
from boto.ec2 import get_region
from boto.ec2.connection import EC2Connection
log = logging.getLogger('pdip')
def main():
ap = argparse.ArgumentParser(description="Clear and repopulate an AWS EC2 "
"security group's rules such that all TCP traffic from PagerDuty "
"outbound integration source IP addresses is permitted.")
ap.add_argument('-r', '--region', type=str, default='us-east-1',
help="AWS region in which to find the security group.")
ap.add_argument('-p', '--dst-ports', type=str, default='80,443',
help="Comma-delineated list of ports to which PagerDuty will be "
"granted access.")
ap.add_argument('-v', '--verbose', default=False, action='store_true',
help="Show verbose output")
ap.add_argument('security_group_id', type=str)
args = ap.parse_args()
ports = [int(p) for p in args.dst_ports.split(',') if p.isdigit()]
if not ports:
print("No valid destination ports specified.")
sys.exit(1)
if args.verbose:
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
conn = EC2Connection(region=get_region(args.region))
pagerduty_ips = set([])
for ip in requests.get('https://app.pagerduty.com/webhook_ips').json():
pagerduty_ips.add(ip+'/32')
empty = False
while not empty:
# boto does not return more than X rules at a time, so let's re-download
# the data, revoke rules and check for a clean slate until it's truly
# empty. Without this loop, AWS 400's result ("specified rule already
# exists")
groups = conn.get_all_security_groups(group_ids=[args.security_group_id])
if not groups:
print("No group found matching ID")
sys.exit(1)
group = groups[0]
# Clear out existing rules:
rules = list(group.rules)
empty = not len(rules)
for rule in rules:
for grant in rule.grants:
log.info("Revoking %s access to port %d-%d from %s",
rule.ip_protocol, int(rule.from_port), int(rule.to_port),
grant.cidr_ip)
group.revoke(ip_protocol=rule.ip_protocol,
from_port=rule.from_port, to_port=rule.to_port,
cidr_ip=grant.cidr_ip)
# Add rules to permit PagerDuty IPs for each port given
for port in ports:
log.info("Authorizing IPs access to port %d", port)
group.authorize(ip_protocol='tcp', from_port=port, to_port=port,
cidr_ip=list(pagerduty_ips))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment