Created
September 28, 2018 14:01
-
-
Save jpcarey/454d5a17a9faa7f5ada78019ba2aa49e to your computer and use it in GitHub Desktop.
Generate SSH Config for connecting to Google Compute Instances
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
import dns.resolver | |
import ipaddress | |
import json | |
import shutil | |
import os | |
# AWSURL = 'https://ip-ranges.amazonaws.com/ip-ranges.json' | |
GCPROOTDNS = '_cloud-netblocks.googleusercontent.com' | |
def round_down(num, divisor): | |
return int(num) - (int(num)%divisor) | |
def gcp_range(ROOTDNS): | |
gcp_ranges = [] | |
maindns = dns.resolver.query(ROOTDNS, "TXT")[0] | |
includes = [x for x in maindns.to_text().split() | |
if x.startswith("include:")] | |
for toprecord in includes: | |
subrecords = dns.resolver.query(toprecord.split(":")[1], "TXT")[0] | |
subrecords = [x for x in subrecords.to_text().split() | |
if x.startswith("ip")] | |
for ipblocks in subrecords: | |
if ipblocks.startswith("ip4"): | |
gcp_ranges.append(ipblocks[4:]) | |
if ipblocks.startswith("ip6"): | |
gcp_ranges.append(ipblocks[4:]) | |
return gcp_ranges | |
def expand_subnets(list): | |
IPaddress = namedtuple('IPaddress', 'o1 o2 o3 o4') | |
output = dict() | |
error = [] | |
for it in list: | |
range = ipaddress.ip_network(it) | |
if isinstance(range, ipaddress.IPv4Network): | |
if range.prefixlen < 24: | |
subnets = range.subnets(new_prefix=24) | |
elif range.prefixlen == 24: | |
subnets = range.subnets() | |
else: | |
error.append(it) | |
elif isinstance(range, ipaddress.IPv6Network): | |
error.append(type(range)) | |
else: | |
pass | |
for item in subnets: | |
ip = IPaddress(*str(item[1]).split(".")) | |
group = round_down(ip.o3, 10) | |
if ip.o1 not in output: | |
output[ip.o1] = {ip.o2: {group: [ip.o3] }} | |
elif ip.o2 not in output[ip.o1]: | |
output[ip.o1][ip.o2]: {group: [ip.o3]} | |
else: | |
output[ip.o1][ip.o2].setdefault(group, []).append(ip.o3) | |
return output | |
def ssh_expand(ip_list): | |
ssh_list = [] | |
for k1,v1 in ip_list.items(): | |
for k2,v2 in v1.items(): | |
for k3,v3 in v2.items(): | |
if len(v3) == 10: | |
ssh_list.append('{0}.{1}.{2}.{3}'.format( | |
k1,k2,str(k3)[:-1]+'?','*')) | |
else: | |
for it in v3: | |
ssh_list.append('{0}.{1}.{2}.{3}'.format(k1,k2,it,'*')) | |
return ssh_list | |
def update_ssh_config(ssh_config, generated_update): | |
shutil.copyfile(src=ssh_config,dst=ssh_config+'.backup') | |
f = open(ssh_config, 'r+') | |
line = f.readline() | |
while line: | |
if line.startswith('########## START GENERATED CONTENT ##########'): | |
print(line) | |
print(f.tell()) | |
break | |
line = f.readline() | |
f.truncate(f.tell()) | |
print(f.tell()) | |
f.write(GCP_CONFIG) | |
f.close() | |
if __name__ == "__main__": | |
gcp_ranges = gcp_range(GCPROOTDNS) | |
final = ssh_expand(expand_subnets(gcp_ranges)) | |
GCP_CONFIG = """ | |
Host {0} | |
User jared.carey | |
StrictHostKeyChecking no | |
UserKnownHostsFile=/dev/null | |
""".format(' '.join(final)) | |
SSH_CONFIG = os.path.expanduser('~/.ssh/config') | |
update_ssh_config(SSH_CONFIG, GCP_CONFIG) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment