Skip to content

Instantly share code, notes, and snippets.

@stypr
Last active August 3, 2022 17:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stypr/ba8931f69a3d82458444f284977ab773 to your computer and use it in GitHub Desktop.
Save stypr/ba8931f69a3d82458444f284977ab773 to your computer and use it in GitHub Desktop.
Bird2 connectivity checker (upstream, downstream, exchanges)
#!/usr/bin/python3 -u
#-*- coding: utf-8 -*-
"""
connectivity_check.py
Simple tool for checking Bird2 connectivity (for personal use)
Developed by AS400671 (https://network.stypr.com/)
You can use description to set speed, countries and names of providers.
```
protocol bgp constant6
{
description "The Constant Company LLC | 1G | US";
local as my_asn;
source address my_ipv6;
...
}
```
"""
import os
import re
import json
import socket
from ipaddress import ip_address, IPv4Address
# Check your bird configurations and set accordingly.
UPSTREAM_PROTOCOLS = [
'constant4',
'constant6',
'virtua6',
'virtua4',
'frantech6',
'frantech4',
]
def parse_protocol_output(output):
"""
Parses the result of birdc show proto all {protocol}
"""
result = {}
ret_address_version = ""
ret_asn_number = ""
ret_provider = ""
ret_ip_addr = ""
for ret_line in output.split("\n"):
if not ret_address_version:
ret_address = re.findall(r"Neighbor address:[\ ]+([0-9a-f\.\:]+)", ret_line.strip())
if ret_address:
ret_address_version = validate_ipaddress(ret_address[0])
if not ret_asn_number:
ret_asn_number = re.findall(r"Neighbor AS:[\ ]+([0-9]+)", ret_line.strip())
if ret_asn_number:
ret_asn_number = ret_asn_number[0]
if not ret_provider:
ret_provider = re.findall(r"Description:[\ ]+([\w\ \(\)\.\|\-\_]+)", ret_line.strip())
if ret_provider:
ret_provider = ret_provider[0].strip()
if not ret_ip_addr:
ret_ip_addr = re.findall(r"Source address:[\ ]+([0-9a-f\.\:]+)", ret_line.strip())
if ret_ip_addr:
ret_ip_addr = ret_ip_addr[0].strip()
if ret_asn_number and ret_address_version and ret_provider and ret_ip_addr:
break
result = {
'asn': ret_asn_number,
'provider': ret_provider,
'version': ret_address_version,
'ip': ret_ip_addr,
}
return result
def validate_ipaddress(target):
"""
Check if the IP address is valid
Return version of IP address if valid
"""
try:
return "v4" if isinstance(ip_address(target), IPv4Address) else "v6"
except ValueError:
return ""
def list_exchanges():
"""
List available protocols from bird
Returns dict of protocols and its descriptions
"""
global UPSTREAM_PROTOCOLS
result = {}
exchanges = []
# fetch exchanges from show proto
ret = os.popen("birdc -r 'show proto' | tail -n +4").read()
ret_parsed = [[j for j in i.split(" ") if j] for i in ret.split("\n") if i]
for protocol in ret_parsed:
if protocol[1] != "BGP":
continue
if protocol[0] in UPSTREAM_PROTOCOLS:
continue
exchanges.append(protocol[0])
# parse all info from each exchange
for exchange in exchanges:
ret = os.popen(f"birdc -r 'show proto all {exchange}' 2>&1").read()
proto_output = parse_protocol_output(ret)
if proto_output:
ret_address_version = proto_output['version']
ret_asn = proto_output['asn']
ret_provider = proto_output['provider'].split("|")
ret_ip_addr = proto_output['ip']
if result.get(exchange):
continue
result[exchange] = {
'asn': ret_asn,
'ip': ret_ip_addr,
'version': ret_address_version,
'provider': ret_provider[0].strip(),
'speed': ret_provider[1].strip() if len(ret_provider) > 1 else "1G",
'country': ret_provider[2].strip() if len(ret_provider) > 1 else "us",
}
return result
def list_peers(protocol):
""" (list) -> list
List available protocols from bird
Returns list of protocols
"""
result = {'v4': {}, 'v6': {}}
for blocked_char in ["'", "\"", "$", "`", "|", "\\", ";", ">", "<", "{", "}"]:
if blocked_char in protocol:
return result
ret = os.popen(f"birdc -r 'show route primary protocol {protocol}'").read().split("\n")
for i in range(3, len(ret), 2):
row = [q for q in ret[i].split(" ") if q]
if not row:
continue
row_prefix = row[0]
row_status = row[1]
row_asn = row[-1]
# Check if the imported network is reachable
if row_status == "unreachable":
continue
if row_status != "unicast":
# print(f"[*] Debug plz: {row}")
continue
# Check if the ASN is valid
try:
row_address_version = validate_ipaddress(row_prefix.split("/")[0])
except IndexError:
continue
row_asn_number = re.findall(r"\[AS([0-9]+)[i\*\?]\]", row_asn)
if not row_asn_number:
# print(f"[*] Debug plz(2): {row}")
continue
row_asn_number = row_asn_number[0]
if result[row_address_version].get(row_asn_number):
continue
result[row_address_version][row_asn_number] = {
'type': 'downstream',
'name': '',
'country': '',
}
return result
def list_upstream():
"""
List available protocols from bird, parse neighbor NS from upstream protocol info
Returns list of ASNs
"""
global UPSTREAM_PROTOCOLS
result = {'v4': {}, 'v6': {}}
# get available upstreams
available_upstreams = []
ret = os.popen("birdc -r 'show proto' | tail -n +4").read()
ret_parsed = [[j for j in i.split(" ") if j] for i in ret.split("\n") if i]
for protocol in ret_parsed:
if protocol[1] != "BGP":
continue
if protocol[0] not in UPSTREAM_PROTOCOLS:
continue
available_upstreams.append(protocol[0])
# parse neighbor ns
for upstream in available_upstreams:
ret = os.popen(f"birdc -r 'show proto all {upstream}'").read()
proto_output = parse_protocol_output(ret)
if proto_output:
ret_address_version = proto_output['version']
ret_asn = proto_output['asn']
ret_provider = proto_output['provider'].split("|")
if result[ret_address_version].get(ret_asn):
continue
result[ret_address_version][ret_asn] = {
'type': 'upstream',
'name': '',
'provider': ret_provider[0].strip(),
'country': ret_provider[2].strip() if len(ret_provider) > 1 else "us",
'speed': ret_provider[1].strip() if len(ret_provider) > 1 else "1G",
}
continue
return result
def list_downstream():
"""
List available exchanges from bird, get list of exported info
Returns list of ASNs
"""
server_list = list_exchanges()
server_peers = {'v4': {}, 'v6': {}}
for server in server_list:
peers = list_peers(server)
for version in peers.keys():
for asn, asn_info in peers[version].items():
server_peers[version][asn] = asn_info
server_peers[version][asn]['exchange'] = server
return server_peers
def fetch_asn_info(asn_list):
""" (list) -> dict
Interact with BGP.tools to retrieve ASN Information
Returns of ASN
"""
whois_server = ("bgp.tools", 43)
payload = ("begin\nverbose\nas" + ("\nas".join(asn_list)) + "\nend\n").encode()
result = {}
# print(payload)
# connect to server and fetch ASN information
recv_data = b""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(whois_server)
sock.sendall(payload)
while True:
data = sock.recv(1024)
if not data:
break
recv_data += data
# parse data based on info from bgp.tools
for asn_info in recv_data.split(b"\n"):
if not asn_info:
continue
asn_info_parsed = [i.strip() for i in asn_info.split(b"|")]
if asn_info_parsed:
result[asn_info_parsed[0].decode()] = {
"country": asn_info_parsed[3].decode().lower(),
"name": asn_info_parsed[-1].decode()
}
return result
def main():
"""
main function to render json output of the connectivity
"""
result = {
'upstreams': list_upstream(),
'downstreams': list_downstream(),
}
# fetch ASN information
asn_list = []
for version in ['v4', 'v6']:
for asn_type in result:
asn_list.extend(list(result[asn_type][version].keys()))
asn_list.extend(list(result[asn_type][version].keys()))
asn_list_info = fetch_asn_info(asn_list)
# add country and name based on fetched asn info
for asn in asn_list_info:
for version in ['v4', 'v6']:
for asn_type in result:
if result[asn_type][version].get(asn):
if not result[asn_type][version][asn]['country']:
result[asn_type][version][asn]['country'] = asn_list_info[asn]['country']
if not result[asn_type][version][asn]['name']:
result[asn_type][version][asn]['name'] = asn_list_info[asn]['name']
# add exchange at last
exchanges = list_exchanges()
result['exchanges'] = exchanges
return json.dumps(result, indent=4)
if __name__ == "__main__":
result = main()
fp_connect = open("/var/www/connectivity.json", "w")
fp_connect.write(result)
fp_connect.close()
print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment