Skip to content

Instantly share code, notes, and snippets.

@acdha
Last active May 28, 2023 07:12
Show Gist options
  • Save acdha/64151bd91d03b901b68aa9dc6cfaf380 to your computer and use it in GitHub Desktop.
Save acdha/64151bd91d03b901b68aa9dc6cfaf380 to your computer and use it in GitHub Desktop.
Ensure that AWS security groups have a list of the current Cloudflare CIDR ranges
#!/usr/bin/env python3
"""
Ensure that every security group whose 'AllowCloudflareIngress' tag (the
spelling 'AllowCloudFlareIngress' is also accepted) is set to 'true' has an
ingress rule allowing HTTPS in from every public Cloudflare edge IPv4 and
IPv6 CIDR block.
Note that HTTP is intentionally not enabled: use the always-HTTPS page rule for
that to avoid potential security problems.
"""
import sys
from urllib.request import Request, urlopen
import boto3
from botocore.exceptions import ClientError
# Shared EC2 API client, created once when the module is imported and reused
# by the functions in this module which call the EC2 API.
EC2_CLIENT = boto3.client("ec2")
def fetch_text(url, timeout=30):
    """
    Fetch a URL over HTTP(S) and return the response body decoded as UTF-8.

    Args:
        url: the http:// or https:// URL to retrieve.
        timeout: seconds to wait on blocking network operations, so that a
            stalled connection cannot hang the caller indefinitely.

    Returns:
        The response body as a str.

    Raises:
        ValueError: if the URL scheme is anything other than http or https.
        RuntimeError: if the server answers with a status other than 200.

    Errors raised by urlopen itself (connection failures, timeouts, HTTP
    error statuses) are not caught here and propagate to the caller.
    """
    # Avoid Cloudflare's “Browser Integrity Check” blocking the default
    # Python User-Agent:
    headers = {"User-Agent": "Cloudflare Ingress Manager"}
    req = Request(url, headers=headers)

    # urlopen supports other schemes such as file:// and ftp://, so reject
    # anything unexpected before opening the request:
    if req.type not in ("http", "https"):
        raise ValueError(f"Unexpected URL scheme: {req.type}")

    with urlopen(req, timeout=timeout) as response:  # nosec
        status = response.getcode()
        if status != 200:
            raise RuntimeError(f"Unexpected HTTP {status} from {url}")
        return response.read().decode("utf-8")
def fetch_lines(url):
    """
    Fetch a URL and return its non-blank lines, stripped of whitespace.

    Blank and whitespace-only lines are dropped so that callers never receive
    an empty string in place of a real entry.

    Args:
        url: the URL to retrieve; passed unchanged to fetch_text.

    Returns:
        A list of str, one per non-empty line of the response body.
    """
    stripped_lines = (line.strip() for line in fetch_text(url).splitlines())
    return [line for line in stripped_lines if line]
# Cloudflare's published edge address ranges (expected to be one CIDR block
# per line of the response). These are downloaded when the module is
# imported, so importing this file performs two HTTPS requests.
CLOUDFLARE_IPV4 = fetch_lines("https://www.cloudflare.com/ips-v4")
CLOUDFLARE_IPV6 = fetch_lines("https://www.cloudflare.com/ips-v6")
def add_inbound_rules_to_group(sg_id, existing_ip_permissions):
    """
    Ensure a security group allows HTTPS ingress from every Cloudflare range.

    The group's existing TCP/443 ingress rules are compared against
    CLOUDFLARE_IPV4 and CLOUDFLARE_IPV6, and only the CIDR blocks which are
    missing get authorized.

    Args:
        sg_id: ID of the security group to update.
        existing_ip_permissions: the group's current ingress permissions, as
            a list of dicts with "IpProtocol", "FromPort", "ToPort",
            "IpRanges" and "Ipv6Ranges" keys.

    Returns:
        A human-readable status message saying that no rules were needed,
        that rules were added, or that adding them failed.
    """
    rule = {"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443}

    # We'll collect the list of existing rules so we can reliably only add new
    # values, avoiding API errors:
    existing_ipv4 = set()
    existing_ipv6 = set()

    for existing in existing_ip_permissions:
        # Ignore any existing rules for other protocols & ports. .get() is
        # used so a permission lacking port keys is skipped, not a KeyError:
        if any(
            existing.get(k) != rule[k] for k in ("IpProtocol", "FromPort", "ToPort")
        ):
            continue

        existing_ipv4.update(i["CidrIp"] for i in existing.get("IpRanges", ()))
        existing_ipv6.update(i["CidrIpv6"] for i in existing.get("Ipv6Ranges", ()))

    # Now we'll construct the list of new ranges:
    new_ipv4_ranges = [
        {"CidrIp": cidr, "Description": "Cloudflare"}
        for cidr in CLOUDFLARE_IPV4
        if cidr not in existing_ipv4
    ]
    new_ipv6_ranges = [
        {"CidrIpv6": cidr, "Description": "Cloudflare"}
        for cidr in CLOUDFLARE_IPV6
        if cidr not in existing_ipv6
    ]

    if not new_ipv4_ranges and not new_ipv6_ranges:
        return f"{sg_id}: no new rules are necessary"

    if new_ipv4_ranges:
        print(f"{sg_id}: adding new IPv4 ranges: {new_ipv4_ranges}")
    if new_ipv6_ranges:
        print(f"{sg_id}: adding new IPv6 ranges: {new_ipv6_ranges}")

    rule["IpRanges"] = new_ipv4_ranges
    rule["Ipv6Ranges"] = new_ipv6_ranges

    try:
        EC2_CLIENT.authorize_security_group_ingress(GroupId=sg_id, IpPermissions=[rule])
    except ClientError as exc:
        print(f"Unable to add permssions for {sg_id}: {exc}", file=sys.stderr)
        # Report the failure to the caller instead of falling through to the
        # success message; the remaining groups can still be processed.
        return f"{sg_id}: unable to add new rules: {exc}"

    return (
        f"{sg_id} added new rules: "
        f"{len(new_ipv4_ranges)} IPv4, {len(new_ipv6_ranges)} IPv6"
    )
def get_applicable_security_groups():
    """
    Yields (group ID, IP permissions) pairs for security groups which have the
    AllowCloudflareIngress tag (or the AllowCloudFlareIngress spelling) set to
    "true".

    Each group is yielded at most once, even if it carries both tag spellings,
    so callers never process the same group twice in one run.
    """
    paginator = EC2_CLIENT.get_paginator("describe_security_groups")
    seen_group_ids = set()

    for name in ("AllowCloudFlareIngress", "AllowCloudflareIngress"):
        page_iterator = paginator.paginate(
            Filters=[{"Name": f"tag:{name}", "Values": ["true"]}]
        )

        for page in page_iterator:
            for sg in page["SecurityGroups"]:
                group_id = sg["GroupId"]
                # Skip groups already returned by the other tag spelling:
                if group_id in seen_group_ids:
                    continue
                seen_group_ids.add(group_id)
                yield group_id, sg["IpPermissions"]
def update_all_groups():
    """
    Lazily update every applicable security group, yielding the status
    message returned for each one as it is processed.
    """
    yield from (
        add_inbound_rules_to_group(group_id, permissions)
        for group_id, permissions in get_applicable_security_groups()
    )
def handler(event, context):
    """
    Update all applicable security groups and return the collected status
    messages under the "results" key.

    The event and context arguments are accepted but not used (presumably
    this is the AWS Lambda entry point; confirm against the deployment).
    """
    return {"results": [*update_all_groups()]}
if __name__ == "__main__":
    # Command-line use: print each group's status message as soon as that
    # group has been processed.
    for status_message in update_all_groups():
        print(status_message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment