Last active
May 28, 2023 07:12
-
-
Save acdha/64151bd91d03b901b68aa9dc6cfaf380 to your computer and use it in GitHub Desktop.
Ensure that AWS security groups have a list of the current Cloudflare CIDR ranges
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Ensure that every security group tagged with 'AllowCloudflareIngress' has | |
an ingress rule allowing HTTPS in from every public Cloudflare edge IPv4 and | |
IPv6 CIDR block. | |
Note that HTTP is intentionally not enabled: use the always-HTTPS page rule for | |
that to avoid potential security problems. | |
""" | |
import sys | |
from urllib.request import Request, urlopen | |
import boto3 | |
from botocore.exceptions import ClientError | |
EC2_CLIENT = boto3.client("ec2") | |
def fetch_text(url): | |
# Avoid Cloudflare's “Browser Integrity Check” blocking the default | |
# Python User-Agent: | |
headers = {"User-Agent": "Cloudflare Ingress Manager"} | |
req = Request(url, headers=headers) | |
if req.type not in ("http", "https"): | |
raise ValueError(f"Unexpected URL scheme: {req.type}") | |
with urlopen(req) as response: # nosec | |
if response.getcode() != 200: | |
raise RuntimeError(f"Unexpected HTTP {response.getcode()} from {url}") | |
else: | |
return response.read().decode("utf-8") | |
def fetch_lines(url): | |
return [i.strip() for i in fetch_text(url).splitlines()] | |
CLOUDFLARE_IPV4 = fetch_lines("https://www.cloudflare.com/ips-v4") | |
CLOUDFLARE_IPV6 = fetch_lines("https://www.cloudflare.com/ips-v6") | |
def add_inbound_rules_to_group(sg_id, existing_ip_permissions): | |
rule = {"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443} | |
# We'll collect the list of existing rules so we can reliably only add new | |
# values, avoiding API errors: | |
existing_ipv4 = set() | |
existing_ipv6 = set() | |
for existing in existing_ip_permissions: | |
# Ignore any existing rules for other protocols & ports | |
if any(rule[k] != existing[k] for k in ("IpProtocol", "FromPort", "ToPort")): | |
continue | |
existing_ipv4.update(i["CidrIp"] for i in existing["IpRanges"]) | |
existing_ipv6.update(i["CidrIpv6"] for i in existing["Ipv6Ranges"]) | |
# Now we'll construct the list of new ranges: | |
new_ipv4_ranges = [ | |
{"CidrIp": cidr, "Description": "Cloudflare"} | |
for cidr in CLOUDFLARE_IPV4 | |
if cidr not in existing_ipv4 | |
] | |
new_ipv6_ranges = [ | |
{"CidrIpv6": cidr, "Description": "Cloudflare"} | |
for cidr in CLOUDFLARE_IPV6 | |
if cidr not in existing_ipv6 | |
] | |
if not new_ipv4_ranges and not new_ipv6_ranges: | |
return f"{sg_id}: no new rules are necessary" | |
if new_ipv4_ranges: | |
print(f"{sg_id}: adding new IPv4 ranges: {new_ipv4_ranges}") | |
if new_ipv6_ranges: | |
print(f"{sg_id}: adding new IPv6 ranges: {new_ipv6_ranges}") | |
rule["IpRanges"] = new_ipv4_ranges | |
rule["Ipv6Ranges"] = new_ipv6_ranges | |
try: | |
EC2_CLIENT.authorize_security_group_ingress(GroupId=sg_id, IpPermissions=[rule]) | |
except ClientError as exc: | |
print(f"Unable to add permssions for {sg_id}: {exc}", file=sys.stderr) | |
return "%s added new rules: %d IPv4, %d IPv6" % ( | |
sg_id, | |
len(new_ipv4_ranges), | |
len(new_ipv6_ranges), | |
) | |
def get_applicable_security_groups(): | |
""" | |
Yields (group ID, IP permissions) pairs for security groups which have the | |
AllowCloudflareIngress tag | |
""" | |
paginator = EC2_CLIENT.get_paginator("describe_security_groups") | |
for name in ("AllowCloudFlareIngress", "AllowCloudflareIngress"): | |
page_iterator = paginator.paginate( | |
Filters=[{"Name": f"tag:{name}", "Values": ["true"]}] | |
) | |
for page in page_iterator: | |
for sg in page["SecurityGroups"]: | |
yield sg["GroupId"], sg["IpPermissions"] | |
def update_all_groups(): | |
for security_group_id, ip_permissions in get_applicable_security_groups(): | |
yield add_inbound_rules_to_group(security_group_id, ip_permissions) | |
def handler(event, context): | |
results = list(update_all_groups()) | |
return {"results": results} | |
if __name__ == "__main__": | |
for message in update_all_groups(): | |
print(message) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment