Created
September 25, 2025 21:51
-
-
Save prestonprice57/9a3bf349259fa827f92e994724cc638a to your computer and use it in GitHub Desktop.
AI Generated Checkov Rule
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ckv_admin_ports_public.py | |
| from typing import List, Dict, Any, Optional | |
| from checkov.common.models.enums import CheckCategories, CheckResult | |
| from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck | |
| from checkov.terraform.checks.utils import resource_attribute_has_value | |
| from checkov.common.util.type_forcers import force_int | |
| PUBLIC_V4 = "0.0.0.0/0" | |
| PUBLIC_V6 = "::/0" | |
| # A conservative, extensible set of sensitive/admin ports. | |
| # (Covers remote admin, DBs, common admin consoles, k8s, docker, etc.) | |
| SENSITIVE_PORTS: List[int] = sorted(set([ | |
| 22, # SSH | |
| 3389, # RDP | |
| 5985, 5986, # WinRM | |
| 2375, 2376, # Docker daemon API | |
| 10250, 10255, # Kubelet | |
| 6443, # Kubernetes API server | |
| 2379, 2380, # etcd | |
| 5432, # PostgreSQL | |
| 1433, # MS SQL Server | |
| 1521, # Oracle | |
| 3306, # MySQL | |
| 27017, # MongoDB | |
| 6379, # Redis | |
| 11211, # Memcached | |
| 9200, 9201, # Elasticsearch | |
| 5601, # Kibana | |
| 5900, 5901, # VNC | |
| 8080, 8081, # Common admin UIs / proxies | |
| 8443, # TLS admin UI | |
| 8888, # Jupyter / dev consoles | |
| 9000, # SonarQube et al. | |
| 9090, # Prometheus | |
| 4505, 4506, # SaltStack | |
| ])) | |
| # Extra sensitive ranges to consider "admin-like" | |
| SENSITIVE_RANGES: List[range] = [ | |
| range(2379, 2381), # etcd (2379-2380 inclusive) | |
| range(5900, 5902), # VNC (5900-5901) | |
| range(9200, 9202), # Elasticsearch (9200-9201) | |
| ] | |
| def _is_public_cidr(conf: Dict[str, Any]) -> bool: | |
| """Return True if any ingress source is the public internet.""" | |
| # For inline ingress on aws_security_group | |
| cidr_blocks = conf.get("cidr_blocks") or conf.get("ingress", []) | |
| ipv6_cidr_blocks = conf.get("ipv6_cidr_blocks") or [] | |
| prefix_list_ids = conf.get("prefix_list_ids") or [] | |
| # Inline ingress case: conf may be the ingress block dict itself | |
| # Normalize possible shapes (list or scalar) | |
| def _listify(v): | |
| if v is None: | |
| return [] | |
| if isinstance(v, list): | |
| return v | |
| return [v] | |
| cidrs_v4 = _listify(conf.get("cidr_blocks")) + _listify(conf.get("cidr_block")) | |
| cidrs_v6 = _listify(conf.get("ipv6_cidr_blocks")) + _listify(conf.get("ipv6_cidr_block")) | |
| # If it's the SG resource root, these keys won't be present here; handled by caller | |
| for c in cidrs_v4: | |
| if isinstance(c, str) and c.strip() == PUBLIC_V4: | |
| return True | |
| for c in cidrs_v6: | |
| if isinstance(c, str) and c.strip() == PUBLIC_V6: | |
| return True | |
| # prefix_list_ids generally represent internal/private traffic; we don't treat them as public | |
| return False | |
| def _is_public_in_sg_rule(conf: Dict[str, Any]) -> bool: | |
| """For aws_security_group_rule (standalone), detect public internet sources.""" | |
| def _listify(v): | |
| if v is None: | |
| return [] | |
| if isinstance(v, list): | |
| return v | |
| return [v] | |
| for c in _listify(conf.get("cidr_blocks")): | |
| if isinstance(c, str) and c.strip() == PUBLIC_V4: | |
| return True | |
| for c in _listify(conf.get("ipv6_cidr_blocks")): | |
| if isinstance(c, str) and c.strip() == PUBLIC_V6: | |
| return True | |
| return False | |
| def _port_hits_sensitive(from_port: Optional[Any], to_port: Optional[Any], protocol: Optional[Any]) -> bool: | |
| """ | |
| Return True if the rule’s port or range hits any sensitive/admin port(s). | |
| Protocol of "-1" or "all" means all ports -> always sensitive if public. | |
| """ | |
| proto = str(protocol[0] if isinstance(protocol, list) and protocol else protocol or "").lower() | |
| if proto in {"-1", "all"}: | |
| return True | |
| # Some providers render from_port/to_port as lists | |
| f = force_int(from_port[0] if isinstance(from_port, list) and from_port else from_port) | |
| t = force_int(to_port[0] if isinstance(to_port, list) and to_port else to_port) | |
| if f is None or t is None: | |
| # If missing, be conservative only if protocol is tcp/udp and one of them is present | |
| return False | |
| # Normalize ordering | |
| if f > t: | |
| f, t = t, f | |
| # Single-port and ranges: check membership | |
| port_set = set(SENSITIVE_PORTS) | |
| # Quick path: if either endpoint is sensitive or range overlaps any sensitive port/range | |
| if any(p >= f and p <= t for p in port_set): | |
| return True | |
| # Check overlaps with known sensitive ranges | |
| for r in SENSITIVE_RANGES: | |
| if f <= (r.stop - 1) and t >= r.start: | |
| return True | |
| return False | |
| class AdminPortsNotPublic(BaseResourceCheck): | |
| def __init__(self) -> None: | |
| name = "Do not expose admin/sensitive ports to the public internet" | |
| id = "CKV_AWS_CUSTOM_ADMIN_PORTS_PUBLIC" | |
| supported_resources = ["aws_security_group", "aws_security_group_rule"] | |
| categories = [CheckCategories.NETWORKING] | |
| super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) | |
| def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: | |
| resource_type = self.entity_type | |
| if resource_type == "aws_security_group": | |
| # Inline ingress blocks (can be list of maps) | |
| ingress_blocks = conf.get("ingress") | |
| if not ingress_blocks: | |
| return CheckResult.PASSED | |
| for ingress in ingress_blocks: | |
| if not isinstance(ingress, dict): | |
| continue | |
| # Skip non-public sources early | |
| if not _is_public_cidr(ingress): | |
| continue | |
| protocol = ingress.get("protocol") | |
| from_port = ingress.get("from_port") | |
| to_port = ingress.get("to_port") | |
| # If protocol is "-1" or "all" and public -> fail | |
| if _port_hits_sensitive(from_port, to_port, protocol): | |
| return CheckResult.FAILED | |
| return CheckResult.PASSED | |
| if resource_type == "aws_security_group_rule": | |
| # Only ingress rules are relevant here | |
| rule_type = conf.get("type") | |
| if isinstance(rule_type, list): | |
| rule_type = rule_type[0] if rule_type else None | |
| if (rule_type or "").lower() != "ingress": | |
| return CheckResult.PASSED | |
| if not _is_public_in_sg_rule(conf): | |
| return CheckResult.PASSED | |
| protocol = conf.get("protocol") | |
| from_port = conf.get("from_port") | |
| to_port = conf.get("to_port") | |
| if _port_hits_sensitive(from_port, to_port, protocol): | |
| return CheckResult.FAILED | |
| return CheckResult.PASSED | |
| return CheckResult.PASSED | |
| check = AdminPortsNotPublic() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment