Skip to content

Instantly share code, notes, and snippets.

@jsenecal
Created April 24, 2023 17:16
Show Gist options
  • Save jsenecal/236dcabaa18a7c576ac54425b58d43e2 to your computer and use it in GitHub Desktop.
Save jsenecal/236dcabaa18a7c576ac54425b58d43e2 to your computer and use it in GitHub Desktop.
NETBOX REGRWS
# system
import os
from typing import Literal
from unidecode import unidecode
import googlemaps
# netbox
from dcim.models.sites import Site
# django
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.forms import PasswordInput
from extras.choices import CustomFieldTypeChoices, CustomFieldVisibilityChoices
from extras.models import CustomField
from extras.scripts import BooleanVar, MultiObjectVar, ObjectVar, Script, StringVar
from ipam.choices import PrefixStatusChoices
from ipam.models.ip import Aggregate, Prefix
# regrws
from regrws.api import Api
from regrws.api import constants as regrws_constants
from regrws.models import Customer, Error, Net
from regrws.models.nested import IPVersionEnum, Iso31661, MultiLineElement
from regrws.models.net import NetBlock
from regrws.models.tickets import TicketRequest
from tenancy.models.tenants import Tenant
DEFAULT_TENANT_PK = 53
class ArinScript:
api_key = StringVar(widget=PasswordInput)
if settings.DEBUG:
api_url = StringVar(
default=regrws_constants.BASE_URL_DEFAULT,
)
debug = BooleanVar(description="If checked, the script will wait for a debugger to be attached to the worker")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._regrws = None
self._countries = None
self._gmaps = None
self._commit = False
def get_markdown_for_obj(self, obj):
return f"[`{obj}`]({obj.get_absolute_url()})"
def find_net_from_prefix_or_aggregate(self, prefix_or_aggregate: Aggregate | Prefix):
prefix_or_aggregate_markdown = self.get_markdown_for_obj(prefix_or_aggregate)
if prefix_or_aggregate.prefix.prefixlen == 31:
arin_net_or_err: Net | Error = self._regrws.net.find_net( # type: ignore
prefix_or_aggregate.prefix.network, prefix_or_aggregate.prefix.network + 1
)
else:
arin_net_or_err: Net | Error = self._regrws.net.find_net( # type: ignore
prefix_or_aggregate.prefix.network, prefix_or_aggregate.prefix.broadcast
)
found = False
if isinstance(arin_net_or_err, Error):
if arin_net_or_err.code != "E_OBJECT_NOT_FOUND":
self.log_failure(f"Error finding net for {prefix_or_aggregate_markdown}: {arin_net_or_err}")
elif arin_net_or_err is None:
self.log_warning(f"Unable to process ARIN's response for {prefix_or_aggregate_markdown}")
else:
found = True
return arin_net_or_err, found
class ReassignSimple(ArinScript, Script):
class Meta: # pylint: disable=too-few-public-methods
"""Meta class for setting Script Attributes"""
name = "Reassign Simple"
description = "Reassign prefixes assigned to tenants using the Reassign-Simple Method"
commit_default = False
tenant = ObjectVar(model=Tenant, required=True, default=DEFAULT_TENANT_PK)
aggregates = MultiObjectVar(
description="Run on prefixes within these aggregates",
model=Aggregate,
required=True,
query_params={"rir": "arin", "tenant_id": "$tenant"},
default=Aggregate.objects.filter(rir__slug="arin", tenant_id=DEFAULT_TENANT_PK),
)
def process_prefix(self, prefix: Prefix, aggregate_net: Net):
prefix_markdown = self.get_markdown_for_obj(prefix)
self.log_info(f"Processing prefix {prefix_markdown}")
if prefix.tenant is None:
self.log_warning(f"Prefix {prefix_markdown} has no Tenant, skipping")
return
if prefix.site is None:
self.log_warning(f"Prefix {prefix_markdown} has no Site, skipping")
return
elif prefix.site.cf.get("gmaps_placeid") is None:
self.log_warning(f"Site {self.get_markdown_for_obj(prefix.site)} has no Google Maps PlaceID, skipping")
return
if prefix.prefix.version == 6 and prefix.prefix.prefixlen > 64:
self.log_warning(
f"[{prefix_markdown}] Network reassignments and reallocations in v6 must be /64 or greater, skipping"
)
return
arin_net, found = self.find_net_from_prefix_or_aggregate(prefix)
if found:
self.log_info("Found Net for : " + prefix_markdown)
assert isinstance(arin_net, Net)
arin_ticketrequest_net_or_err = self.process_net(arin_net, aggregate_net, prefix)
if isinstance(arin_ticketrequest_net_or_err, Net):
arin_net = arin_ticketrequest_net_or_err
elif isinstance(arin_ticketrequest_net_or_err, TicketRequest):
arin_net = arin_ticketrequest_net_or_err.net
else:
return
else:
self.log_info("No Net found for: " + prefix_markdown)
arin_ticketrequest_or_err = self.create_objects_from_prefix(prefix, aggregate_net)
if not isinstance(arin_ticketrequest_or_err, TicketRequest):
return
arin_net = arin_ticketrequest_or_err.net
assert isinstance(arin_net, Net)
prefix.custom_field_data["arin_net_handle"] = arin_net.handle
prefix.custom_field_data["arin_net_name"] = arin_net.net_name
prefix.custom_field_data["arin_customer_handle"] = arin_net.customer_handle
prefix.full_clean()
prefix.save()
def process_aggregate(self, aggregate):
self.log_info(f"Processing aggregate {self.get_markdown_for_obj(aggregate)}")
arin_net_or_err, found = self.find_net_from_prefix_or_aggregate(aggregate)
if not found:
self.log_failure(f"Unable to find net for {self.get_markdown_for_obj(aggregate)}")
return
prefixes_qs = aggregate.get_child_prefixes() | Prefix.objects.filter(prefix=aggregate.prefix)
prefixes_qs = prefixes_qs.filter(status=PrefixStatusChoices.STATUS_ACTIVE)
if arin_net_or_err is not None:
for prefix in prefixes_qs:
self.process_prefix(prefix, arin_net_or_err) # type: ignore
@staticmethod
def parse_gmaps_address_components(address_components) -> dict[str, str]:
components_dict = {
"street_number": ["street_number"],
"postal_code": ["postal_code"],
"street": ["street_address", "route"],
"region": [
"administrative_area_level_1",
"administrative_area_level_2",
"administrative_area_level_3",
"administrative_area_level_4",
"administrative_area_level_5",
],
"city": [
"locality",
"sublocality",
"sublocality_level_1",
"sublocality_level_2",
"sublocality_level_3",
"sublocality_level_4",
],
"country": ["country"],
}
shorts = ("region",)
parsed = {}
for component in address_components:
for key, values in components_dict.items():
if any([value in component["types"] for value in values]):
if key in shorts:
parsed[key] = unidecode(component["short_name"])
else:
parsed[key] = unidecode(component["long_name"])
return parsed
def generate_customer(self, tenant: Tenant, site: Site):
site_placeid = site.cf.get("gmaps_placeid")
query = self._gmaps.place(site_placeid, language="en-US") # type: ignore
if query["status"] != "OK":
self.log_failure(
f"Unable to find Google Maps PlaceID {site_placeid} for {self.get_markdown_for_obj(site)}"
)
return None
# Parse the address components from Google Maps results
gmaps_address_dict = self.parse_gmaps_address_components(query["result"]["address_components"])
# Check if we have a known Iso31661 for this site country
if not any([country in gmaps_address_dict["country"] for country in self.countries.keys()]):
self.log_failure(
f"Unable to find Iso31661 information for {self.get_markdown_for_obj(site)} ({site.physical_address})"
)
return None
else:
iso3166_1 = None
for country in self.countries.keys():
if country in gmaps_address_dict["country"]:
iso3166_1 = self.countries[country]
break
if "street" not in gmaps_address_dict.keys():
self.log_failure(
f"Unable to find street address for {self.get_markdown_for_obj(site)} ({site.physical_address})"
)
return None
lines = [
f"{gmaps_address_dict['street_number']} {gmaps_address_dict['street']}"
if gmaps_address_dict.get("street_number")
else gmaps_address_dict["street"]
]
street_address = [MultiLineElement(number=idx + 1, line=lines[idx]) for idx in range(len(lines))]
comments = [MultiLineElement(number=1, line="MetroOptic Customer")]
return Customer(
customer_name=unidecode(tenant.name),
iso3166_1=iso3166_1,
street_address=street_address,
city=gmaps_address_dict["city"],
iso3166_2=gmaps_address_dict["region"],
postal_code=gmaps_address_dict["postal_code"],
private_customer=False,
comments=comments,
)
@staticmethod
def get_netblock_from_prefix(
prefix: Prefix,
type: Literal[
"A",
"AF",
"AP",
"AR",
"AV",
"DA",
"FX",
"IR",
"IU",
"LN",
"LX",
"PV",
"PX",
"RD",
"RN",
"RV",
"RX",
"S",
] = "A",
) -> NetBlock:
return NetBlock(
start_address=prefix.prefix.network, end_address=None, cidr_length=prefix.prefix.prefixlen, type=type
)
@staticmethod
def get_net_name_from_prefix(prefix: Prefix):
prefix_str = str(prefix.prefix).replace(".", "-").replace("/", "-")
assert prefix.tenant is not None
tenant_str = prefix.tenant.slug.upper()
net_name = f"{tenant_str}-{prefix_str}"
return net_name
def generate_net(
self, prefix: Prefix, aggregate_net: Net, customer_handle: str | None = None, org_handle: str | None = None
):
version = IPVersionEnum(prefix.family)
net_name = self.get_net_name_from_prefix(prefix)
parent_net_handle = aggregate_net.handle
net_block = self.get_netblock_from_prefix(prefix, type="S")
return Net(
version=version,
net_name=net_name,
net_blocks=[net_block],
parent_net_handle=parent_net_handle,
customer_handle=customer_handle,
org_handle=org_handle,
poc_links=[],
)
def create_objects_from_prefix(self, prefix: Prefix, aggregate_net: Net):
prefix_markdown = self.get_markdown_for_obj(prefix)
# related objects
prefix_tenant: Tenant | None = prefix.tenant
prefix_site: Site | None = prefix.site
assert prefix_tenant is not None
assert prefix_site is not None
# customfields
org_handle = prefix_tenant.cf.get("arin_org_handle")
customer_handle = prefix.cf.get("arin_customer_handle")
# Create Customer if necessary
if org_handle is None:
if customer_handle is None:
self.log_info(
f"Tenant {self.get_markdown_for_obj(prefix_tenant)} has no Org Handle; Creating Customer for {prefix_markdown}"
)
customer_or_none = self.generate_customer(prefix_tenant, prefix_site)
if customer_or_none is None:
self.log_failure(f"Unable to generate customer for {prefix_markdown}")
return None
customer_dict = customer_or_none.dict()
if self._commit:
customer_or_err: Customer | Error | None = self._regrws.customer.create_for_net( # type: ignore
aggregate_net, **customer_dict
)
if isinstance(customer_or_err, Error):
self.log_failure(f"Error creating customer for {prefix_markdown}: {customer_or_err}")
return None
elif customer_or_err is None:
self.log_warning(f"Unable to process ARIN's response for {prefix_markdown}")
return None
self.log_success(f"Created Customer `{customer_or_err.handle}` for {prefix_markdown}")
customer_handle = customer_or_err.handle
prefix.custom_field_data["arin_customer_handle"] = customer_handle
else:
restart = False
customer_or_err = self._regrws.customer.from_handle(customer_handle) # type: ignore
if isinstance(customer_or_err, Error):
self.log_failure(f"Error getting customer for {prefix_markdown}: {customer_or_err}")
restart = True
elif customer_or_err is None:
self.log_warning(f"Unable to process ARIN's response for {prefix_markdown} ")
restart = True
if isinstance(customer_or_err, Customer):
self.log_success(f"Found Customer `{customer_or_err.handle}` for {prefix_markdown}")
if customer_or_err.customer_name != unidecode(prefix_tenant.name):
self.log_failure(
f"Customer `{customer_or_err.handle}` is not associated with {prefix_tenant.name}"
)
restart = True
if restart:
self.log_info(f"Customer handle is invalid for {prefix_markdown}")
prefix.custom_field_data["arin_customer_handle"] = ""
prefix.full_clean()
del prefix.cf
prefix.save()
return self.create_objects_from_prefix(prefix, aggregate_net)
# Create Net
self.log_info(f"Creating net for {prefix_markdown}")
net = self.generate_net(prefix, aggregate_net, customer_handle, org_handle)
if self._commit:
arin_ticketrequest_or_err = aggregate_net.reassign(net) # type: ignore
if isinstance(arin_ticketrequest_or_err, Error):
self.log_failure(f"Error creating net for {prefix_markdown}: {arin_ticketrequest_or_err}")
elif arin_ticketrequest_or_err is None:
self.log_warning(f"Unable to process ARIN's response for {prefix_markdown}")
else:
assert arin_ticketrequest_or_err.net is not None
self.log_success(f"Created net `{arin_ticketrequest_or_err.net.handle}` for {prefix_markdown}")
return arin_ticketrequest_or_err
def process_net(self, net: Net, aggregate_net: Net, prefix: Prefix):
self.log_info(f"Processing existing net `{net.handle} ({net.net_name})`")
arin_customer_handle = prefix.cf.get("arin_customer_handle")
prefix_tenant: Tenant | None = prefix.tenant
assert prefix_tenant is not None
arin_org_handle = prefix_tenant.cf.get("arin_org_handle")
assert net.net_blocks is not None
if (net.customer_handle is not None and net.customer_handle != arin_customer_handle) or (
net.org_handle is not None and net.org_handle != arin_org_handle
):
if all([block.type == "A" for block in net.net_blocks]):
return self.create_objects_from_prefix(prefix, net)
if all([block.type == "S" for block in net.net_blocks]):
self.log_warning(
f"Net `{net.handle}` was reassigned to a different object, trying to remove arin reassignment"
)
net.remove()
return self.create_objects_from_prefix(prefix, aggregate_net)
self.log_failure(f"Net `{net.handle}` has not been reassigned, cannot remove safely. Skipping...")
return
self.log_success(f"Net `{net.handle}` is up to date")
return net
def create_arin_customfields(self):
prefix_ct = ContentType.objects.get_for_model(Prefix)
tenant_ct = ContentType.objects.get_for_model(Tenant)
arin_org_handle_cf, created = CustomField.objects.get_or_create(
name="arin_org_handle",
defaults=dict(
type=CustomFieldTypeChoices.TYPE_TEXT,
description="Represents a business, nonprofit corporation, or government entity in the ARIN database.",
label="Org Handle",
validation_regex="^[A-Z\\d-]+$",
required=False,
ui_visibility=CustomFieldVisibilityChoices.VISIBILITY_READ_WRITE,
),
)
arin_org_handle_cf.content_types.add(tenant_ct)
arin_org_handle_cf.save()
if created:
self.log_success(f"Created custom field {arin_org_handle_cf.name}")
arin_customer_handle_cf, created = CustomField.objects.get_or_create(
name="arin_customer_handle",
defaults=dict(
type=CustomFieldTypeChoices.TYPE_TEXT,
description="Customer handle for this Prefix/Tenant combination",
label="Customer Handle",
required=False,
ui_visibility=CustomFieldVisibilityChoices.VISIBILITY_HIDDEN,
validation_regex="^C[\\d]+$",
),
)
arin_customer_handle_cf.content_types.add(prefix_ct)
arin_customer_handle_cf.save()
if created:
self.log_success(f"Created custom field {arin_customer_handle_cf.name}")
arin_net_handle_cf, created = CustomField.objects.get_or_create(
name="arin_net_handle",
defaults=dict(
type=CustomFieldTypeChoices.TYPE_TEXT,
description="Handles for IPv4 networks start with NET-, and handles for IPv6 networks start with NET6-",
label="Net Handle",
required=False,
validation_regex="^NET6?-[\\dA-Z\\-]+$",
),
)
arin_net_handle_cf.content_types.add(prefix_ct)
arin_net_handle_cf.save()
if created:
self.log_success(f"Created custom field {arin_net_handle_cf.name}")
arin_net_name_cf, _ = CustomField.objects.get_or_create(
name="arin_net_name",
defaults=dict(
type=CustomFieldTypeChoices.TYPE_TEXT,
description="The name of the network",
label="Net Name",
required=False,
),
)
arin_net_name_cf.content_types.add(prefix_ct)
arin_net_name_cf.save()
if created:
self.log_success(f"Created custom field {arin_net_name_cf.name}")
@property
def countries(self) -> dict[str, Iso31661]:
if self._countries is None:
self._countries = {
"Canada": Iso31661(name="Canada", code2="CA", code3="CAN", e164=1),
"United States": Iso31661(name="United States of America", code2="US", code3="USA", e164=1),
}
return self._countries
def run(self, data, commit):
api_key = data["api_key"]
api_url = data.get("api_url", "https://reg.arin.net/")
aggregates = data.get("aggregates", Aggregate.objects.none())
debug = data.get("debug")
self._gmaps = googlemaps.Client(key=os.environ.get("GMAPS_APIKEY"))
if debug:
import debugpy # pylint: disable=import-outside-toplevel
debugpy.listen(("0.0.0.0", 5678))
debugpy.wait_for_client() # blocks execution until client is attached
self.create_arin_customfields()
self._regrws = Api(base_url=api_url, api_key=api_key)
self._commit = commit
for aggregate in aggregates:
self.process_aggregate(aggregate)
return f"Done, processed {aggregates.count()} aggregates"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment