Skip to content

Instantly share code, notes, and snippets.

@kanghyojun
Last active September 26, 2022 04:58
Show Gist options
  • Save kanghyojun/c691aa71d7b0e2b854311b61e22dfa75 to your computer and use it in GitHub Desktop.
Save kanghyojun/c691aa71d7b0e2b854311b61e22dfa75 to your computer and use it in GitHub Desktop.
쿼리 딜리버리 SSH 터널링 진단 도구
"""쿼리 딜리버리 접속용 SSH 터널링 진단 도구
- AWS API 키가 필요합니다.
- Python3.6 이상의 버전에서 실행하는 것을 추천드립니다.
- boto3가 필요합니다. [virtualenv](https://docs.python.org/ko/3/library/venv.html) 환경에서 작업하시는 것을 추천합니다.
```
$ python -m venv .venv
$ source .venv/bin/activate
$ pip install boto3
$ curl -o ssh_tunneling_test.py https://gist.githubusercontent.com/kanghyojun/c691aa71d7b0e2b854311b61e22dfa75/raw/d46888a956010e86b05a13a9befa945f0923cc90/ssh_tunneling_test.py
$ python ssh_tunneling_test.py
```
"""
import grp
import ipaddress
import os.path
import pathlib
import sys
from boto3 import client as boto3_client
from boto3 import resource as boto3_resource
# Address that the security-group and network-ACL checks below look for
# inside the configured CIDR ranges.
# NOTE(review): presumably the NAT egress IP of the Query Delivery service --
# confirm with the service owner before changing it.
QD_NAT_IP = ipaddress.ip_address("3.34.56.103")
# SSH public key that check_authorized_keys() expects to find in the
# querydelivery user's authorized_keys file. The literal is split across
# adjacent strings only to keep line length manageable.
pubkey = (
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC9JXkvuPc3ZCWIROpM3ph1R+E5O5ppkewlaoO1rXdSvUjo/V7jP1JxPFobMnYMPXH4pTLLjv1eC"
"XJBQMOzPv1e2gJcrDdHd9AJ8BgdaJICHzr9NSVcIGxe57qYXSp+rmIGNRDa73ftkvrPmf8ogylLldELVoLgF/bfihBKq42X6UgIs2mA6TJnWMRv6Yq"
"2ng3Umt105DNntROTS/BmJm5LLfOV1r5k99spKr0OrJl+hwVfQb9kbxcwVOAoF7HcGGOr/Utp8H2IGofCKDrqVCCOF7HOITw9rG03VQnX0rgOJNLZg"
"OShmSjCokp89P77oaEudqBBZiHtK3WC1irFtWpK8MBTk2XNe6tVP0KC62SA4WX/Ta52u6yKdHd5LH81wDxWWSOBmrHJA7PoKHTVlkwHvRK3MplfL3T"
"iASYgLzhQPFfizS9TXshTzK1zBlpWCoOJ1zjzX+knyU24l7a3f2jASb8n9LyW3fd7RYhoyj7fay9ZCddTRO3VTW9v9rMlvU0="
)
# boto3 EC2 handles shared by every check in this module. They are created
# at import time, so importing the module already requires a usable boto3
# configuration (the module docstring notes that AWS API keys are needed).
ec2 = boto3_client("ec2")
ec2_resource = boto3_resource("ec2")
def check_authorized_keys():
    """Verify that the expected public key is installed for ``querydelivery``.

    Reads ``~querydelivery/.ssh/authorized_keys`` and checks that the
    module-level ``pubkey`` string occurs somewhere in its contents.

    Raises:
        AssertionError: if the key is not present in the file.
        OSError: propagated unchanged when the file cannot be opened
            (for example when it does not exist).
    """
    authorized_keys = (
        pathlib.Path(os.path.expanduser("~querydelivery")) / ".ssh" / "authorized_keys"
    )
    # Raise explicitly rather than via an ``assert`` statement so the check
    # is not stripped under ``python -O``. AssertionError is kept so that
    # existing callers catching it continue to work.
    if pubkey not in authorized_keys.read_text():
        raise AssertionError("authorized_keys error")
def check_user():
    """Ensure that ``get_users()`` reports the ``querydelivery`` user.

    Raises:
        AssertionError: if ``get_users()`` returns a falsy value.
    """
    # Explicit raise instead of ``assert`` so the check survives
    # ``python -O``; the exception type is unchanged for callers.
    if not get_users():
        raise AssertionError("querydelivery 유저가 생성되지 않았습니다. ")
def get_users():
    """Report whether ``querydelivery`` is listed as a member of any group.

    Collects the member names of every entry returned by ``grp.getgrall()``
    and tests for ``"querydelivery"`` among them.

    Returns:
        bool: True if at least one group lists ``querydelivery`` as a
        member, False otherwise (including when no group has any members).
    """
    # The membership test must run against the collected set of names.
    # Testing the last loop variable instead would be a substring match on
    # one arbitrary user name and would fail when no group has members.
    members = {member for group in grp.getgrall() for member in group.gr_mem}
    # NOTE(review): the group database may not list a user under the group
    # that is only their primary group; if that matters here, the password
    # database should be consulted as well -- confirm the intended check.
    return "querydelivery" in members
def get_instance_name(inst):
    """Return the value of the instance's ``Name`` tag.

    Args:
        inst: Mapping describing an instance. Its optional ``"Tags"`` entry
            is a list of mappings with ``"Key"`` and ``"Value"`` items.

    Returns:
        The value of the first tag whose key is ``"Name"``. When there is
        no such tag (or its value is None) the placeholder ``"이름 없음"``
        is returned instead.
    """
    # A description without a "Tags" key is treated as having no tags
    # instead of raising KeyError.
    # NOTE(review): presumably untagged instances omit the key entirely in
    # the describe_instances response -- confirm against the API reference.
    tags = inst.get("Tags") or []
    name = next((tag["Value"] for tag in tags if tag["Key"] == "Name"), None)
    return name if name is not None else "이름 없음"
def get_instances():
    """Yield every instance description returned by ``describe_instances``.

    Uses the client's paginator so that instances beyond the first response
    page are included as well.

    Yields:
        dict: one entry of ``Reservations[*].Instances`` per instance.
    """
    paginator = ec2.get_paginator("describe_instances")
    for page in paginator.paginate():
        for reservation in page["Reservations"]:
            yield from reservation["Instances"]
def instance_to_string(instance):
    """Format an instance as ``"<InstanceId> (<name>)"`` for display.

    Args:
        instance: Mapping with an ``"InstanceId"`` item; it is also passed
            to ``get_instance_name()`` to obtain the display name.

    Returns:
        str: the instance id followed by the name in parentheses.
    """
    instance_id = instance["InstanceId"]
    display_name = get_instance_name(instance)
    return f"{instance_id} ({display_name})"
def _permission_allows_ssh(ip_permission):
    """Return True if a security-group rule covers TCP port 22."""
    protocol = ip_permission.get("IpProtocol")
    if protocol == "-1":
        # "-1" selects all protocols; such rules carry no port range.
        return True
    if protocol not in (None, "tcp", "6"):
        # SSH runs over TCP, so e.g. a UDP rule on port 22 does not help.
        return False
    from_port = ip_permission.get("FromPort")
    if from_port is None:
        return False
    to_port = ip_permission.get("ToPort", from_port)
    # A range such as 0-65535 permits SSH just as well as an exact 22-22.
    return from_port <= 22 <= to_port


def get_security_groups_ssh_cidr_ip(instance):
    """Yield the IPv4 CIDR ranges from which SSH ingress is permitted.

    Looks up every security group attached to ``instance`` and inspects the
    ``IpPermissions`` rules that cover TCP port 22.

    Args:
        instance: Mapping with a ``"SecurityGroups"`` list whose items each
            carry a ``"GroupId"``.

    Yields:
        tuple: ``(group, network)`` where ``group`` is the security-group
        description and ``network`` is the parsed ``CidrIp`` of one range.
        Ranges whose ``CidrIp`` is missing or unparsable are skipped.
    """
    group_ids = [sg["GroupId"] for sg in instance["SecurityGroups"]]
    if not group_ids:
        # NOTE(review): an empty GroupIds list presumably does not restrict
        # the query at all; skip the call rather than inspect unrelated
        # groups -- confirm against the API reference.
        return
    sgs = ec2.describe_security_groups(GroupIds=group_ids)
    for group in sgs["SecurityGroups"]:
        for ip_permission in group["IpPermissions"]:
            if not _permission_allows_ssh(ip_permission):
                continue
            for ip_range in ip_permission.get("IpRanges", []):
                try:
                    # strict=False: a CIDR with host bits set is still a
                    # usable range for the containment test done by callers.
                    network = ipaddress.ip_network(ip_range["CidrIp"], strict=False)
                except (KeyError, ValueError):
                    # Best effort: ignore entries that cannot be parsed.
                    continue
                yield group, network
def check_ssh_security_group(instance):
    """Yield the SSH ingress ranges of ``instance`` that contain ``QD_NAT_IP``.

    Args:
        instance: Instance description forwarded unchanged to
            ``get_security_groups_ssh_cidr_ip()``.

    Yields:
        tuple: ``(group, network)`` pairs, in the order produced by
        ``get_security_groups_ssh_cidr_ip()``, restricted to those whose
        network contains ``QD_NAT_IP``.
    """
    yield from (
        (security_group, network)
        for security_group, network in get_security_groups_ssh_cidr_ip(instance)
        if QD_NAT_IP in network
    )
def check_vpc_network_acl(vpc_id):
    """Yield the ingress network-ACL entries that admit ``QD_NAT_IP``.

    Args:
        vpc_id: Identifier of the VPC whose network ACLs are inspected.

    Yields:
        tuple: ``(acl_id, entry)`` for every non-egress, non-deny entry
        whose IPv4 ``CidrBlock`` contains ``QD_NAT_IP``.
    """
    vpc = ec2_resource.Vpc(vpc_id)
    for acl in vpc.network_acls.all():
        for entry in acl.entries:
            if entry["Egress"]:
                continue
            cidr_block = entry.get("CidrBlock")
            if cidr_block is None:
                # Entries without an IPv4 block (e.g. IPv6-only rules)
                # cannot match the IPv4 address; skip instead of KeyError.
                continue
            if entry.get("RuleAction") == "deny":
                # A deny rule covering the address is not evidence that the
                # ACL lets the traffic through.
                # NOTE(review): rule-number precedence between allow and
                # deny entries is not evaluated here.
                continue
            if QD_NAT_IP in ipaddress.ip_network(cidr_block, strict=False):
                yield acl.id, entry
def check_vpc_igw(vpc_id):
    """Return the ``internet_gateways`` attribute of the given VPC resource.

    Args:
        vpc_id: Identifier of the VPC to look up.

    Returns:
        Whatever ``ec2_resource.Vpc(vpc_id).internet_gateways`` evaluates to.
        NOTE(review): presumably a boto3 collection object rather than a
        list, so its truthiness says nothing about whether any gateway
        exists -- iterate it to find out.
    """
    return ec2_resource.Vpc(vpc_id).internet_gateways
def main():
    """Interactively pick an EC2 instance and check its network settings.

    Lists the instances returned by ``get_instances()``, reads an index from
    standard input and then checks, in order:

    1. that a security group permits SSH from ``QD_NAT_IP``,
    2. that the instance belongs to a VPC whose network ACL admits
       ``QD_NAT_IP`` on ingress,
    3. that the VPC has an internet gateway attached.

    Exits with status 1 on invalid input or when check 1 or 2 fails. A
    missing internet gateway is only reported.
    """
    print("\n\n# 검사하고 싶은 Instance의 번호를 입력해주세요.\n")
    instances = list(get_instances())
    for i, instance in enumerate(instances):
        print("{}. {}".format(i, instance_to_string(instance)))
    select_instance_index = input()
    try:
        index = int(select_instance_index)
    except ValueError:
        print("숫자를 입력해주세요.")
        sys.exit(1)
    # Reject negative numbers explicitly: Python would otherwise accept them
    # and silently pick an instance counted from the end of the list.
    if not 0 <= index < len(instances):
        print("범위 내의 번호를 선택해주세요")
        sys.exit(1)
    selected_instance = instances[index]
    print("\n{}의 설정 검사를 시작합니다.\n".format(instance_to_string(selected_instance)))

    print("# EC2 Instance의 Security Group 설정을 검사합니다.\n")
    # The check functions are generators, and a generator object is always
    # truthy; materialise the results so that "nothing found" is detectable.
    ssh_security_groups = list(check_ssh_security_group(selected_instance))
    if not ssh_security_groups:
        print("EC2 Instance의 Security Group 설정이 수정되어야합니다.")
        print("{}로 설정된 Security Group을 찾을 수 없습니다.".format(QD_NAT_IP))
        sys.exit(1)
    print("EC2 Instance의 Security Group 설정이 정상적으로 되었습니다.")
    for group, ip_network in ssh_security_groups:
        print(
            "{} ({})에서 {} 네트워크 대역 설정을 발견".format(
                group["GroupId"], group["GroupName"], ip_network
            )
        )

    print("\n\n# EC2 Instance의 VPC 설정을 검사합니다.\n")
    vpc_id = selected_instance.get("VpcId")
    if not vpc_id:
        print("VPC 설정되어 있지 않음")
        sys.exit(1)
    vpc_acls = list(check_vpc_network_acl(vpc_id))
    if not vpc_acls:
        print("VPC Network ACL 설정이 수정되어야합니다.")
        sys.exit(1)
    print("VPC Network ACL 설정이 정상적으로 되었습니다.")
    for acl_id, entry in vpc_acls:
        print(
            "{}(룰번호: {}) 에서 CIDR 블락 `{}`에 포함됨이 확인되었습니다.".format(
                acl_id, entry["RuleNumber"], entry["CidrBlock"]
            )
        )

    print("\n\n# EC2 Instance의 VPC internet gateway 설정을 검사합니다.\n")
    # check_vpc_igw() hands back the VPC's gateway collection; enumerate it,
    # because the collection object itself is truthy even when it is empty.
    vpc_igws = list(check_vpc_igw(vpc_id).all())
    if not vpc_igws:
        print("IGW 설정이 없습니다.")
    else:
        print("IGW 설정이 정상적으로 되어있습니다.")
# Run the interactive diagnostic only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment