Skip to content

Instantly share code, notes, and snippets.

@JonTheNiceGuy
Last active December 20, 2024 19:47
Show Gist options
  • Save JonTheNiceGuy/289a8a2e0233e730f0fbc8f958ec4bc6 to your computer and use it in GitHub Desktop.
Save JonTheNiceGuy/289a8a2e0233e730f0fbc8f958ec4bc6 to your computer and use it in GitHub Desktop.
Ansible lookup in PhpIpam

PHPIPAM Lookups In Ansible

Hi! I wrote these a while ago, when I was looking into how to write a lookup plugin for Ansible so that I could query the PHPIPAM API for subnets. I'm publishing them now for a blog post and a podcast episode on Hacker Public Radio.

This code is released under the Unlicense. Please feel free to reuse it at will, or extend it (and let me know if you do!)

from __future__ import annotations
from ansible.utils.display import Display
from ansible.plugins.lookup import LookupBase
from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError
from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.errors import AnsibleError
import json
import ipaddress
from urllib.error import HTTPError, URLError
# Option schema for this lookup. Ansible parses this YAML to back the
# set_options()/get_option() calls made in LookupModule.run(), so the nesting
# under ``options`` matters. ``version_added`` is quoted because a bare 2.10
# is read by YAML as the float 2.1.
DOCUMENTATION = """
options:
  _terms:
    description: One or more subnets, in CIDR notation, to look up in PHPIPAM
    required: True
  ipam_client:
    description: The URL, including the API client, to the API endpoint
    required: False
    type: str
    version_added: "2.10"
  ipam_token:
    description: The API token to access the endpoint
    required: False
    type: str
    version_added: "2.10"
  with_mac_address:
    description: Return only items with a MAC address
    required: False
    type: bool
    default: False
    version_added: "2.10"
  with_hostname:
    description: Return only items with a hostname
    required: False
    type: bool
    default: False
    version_added: "2.10"
  all_child_subnets:
    description: Descend into child subnets
    required: False
    type: bool
    default: True
    version_added: "2.10"
"""
EXAMPLES = """
"""
RETURN = """
"""
# Module-level Display instance; the plugin calls its vvvv()/vvvvv() methods
# to emit debug output.
display = Display()
def get_ipam_connection_params(variables, option_ipam_client=None, option_ipam_token=None):
    """Resolve the PHPIPAM endpoint URL and API token.

    Values supplied as plugin options take precedence over the ``ipam_client``
    and ``ipam_token`` entries found in ``variables``.

    Args:
        variables: Mapping of available variables, or ``None``.
        option_ipam_client: Endpoint URL supplied as a plugin option, if any.
        option_ipam_token: API token supplied as a plugin option, if any.

    Returns:
        An ``(ipam_client, ipam_token)`` tuple.

    Raises:
        AnsibleError: If either value is still empty after consulting both
            sources.
    """
    # run() declares ``variables=None``; tolerate that rather than failing
    # with AttributeError on ``None.get``.
    variables = variables or {}
    ipam_client = option_ipam_client or variables.get('ipam_client')
    ipam_token = option_ipam_token or variables.get('ipam_token')
    if not ipam_client or not ipam_token:
        raise AnsibleError(
            "Connection values are not complete. Requires at least ipam_client and ipam_token to be set.")
    return ipam_client, ipam_token
class LookupModule(LookupBase):
    """Lookup that returns the PHPIPAM address records found in given CIDRs."""

    def _ipam_get_json(self, api, headers, ignore_http_error=False):
        """Fetch ``api`` and return its decoded JSON body.

        Args:
            api: Full URL to request.
            headers: Request headers (carries the API token).
            ignore_http_error: When true, an HTTP error status is only logged
                and ``None`` is returned instead of raising.

        Raises:
            AnsibleError: On HTTP errors (unless ignored), URL failures,
                certificate validation failures and connection errors.
        """
        try:
            response = open_url(api, headers=headers)
            return json.loads(to_text(response.read()))
        # HTTPError must be tested before its parent class URLError.
        except HTTPError as e:
            if ignore_http_error:
                display.vvvv(f'HTTP Error received: {to_text(e)}')
                return None
            raise AnsibleError(
                "Received HTTP error for %s : %s" % (api, to_native(e)))
        except URLError as e:
            raise AnsibleError(
                "Failed lookup url for %s : %s" % (api, to_native(e)))
        except SSLValidationError as e:
            raise AnsibleError(
                "Error validating the server's certificate for %s: %s" % (api, to_native(e)))
        except ConnectionError as e:
            raise AnsibleError(
                "Error connecting to %s: %s" % (api, to_native(e)))

    @staticmethod
    def _address_record(item, subnet):
        """Build the result dict for one address ``item`` of ``subnet``."""
        ip = item.get('ip')
        # Use the fallback name both when the key is absent and when the API
        # reports a null/empty hostname; build it only when it is needed so a
        # record without an ``ip`` cannot crash the lookup.
        hostname = item.get('hostname')
        if not hostname:
            hostname = f'ip-{ip.replace(".", "-")}' if ip else 'undefined'
        return {
            'hostname': hostname.split('.')[0],
            'ip': item.get('ip', 'undefined'),
            'decimalip': int(ipaddress.ip_address(ip or '0.0.0.0')),
            'mac': item.get('mac', 'undefined'),
            'subnet_id': subnet,
        }

    def run(self, cidrs, variables=None, **kwargs):
        """Return, for each CIDR, the list of address records it contains.

        Each CIDR is resolved to a subnet ID, optionally expanded to all of
        its child subnets (``all_child_subnets``), and every address of those
        subnets is collected. ``with_mac_address`` / ``with_hostname`` drop
        records whose ``mac`` / ``hostname`` is null.

        Args:
            cidrs: Iterable of subnets in CIDR notation.
            variables: Available variables; may hold ``ipam_client`` and
                ``ipam_token``.
            **kwargs: Plugin options passed directly to the lookup.

        Returns:
            A list with one entry per CIDR; each entry is a list of dicts with
            the keys ``hostname``, ``ip``, ``decimalip``, ``mac`` and
            ``subnet_id``.

        Raises:
            AnsibleError: If connection details are incomplete, a request
                fails, or a CIDR does not resolve to a subnet.
        """
        self.set_options(var_options=variables, direct=kwargs)
        ipam_client, ipam_token = get_ipam_connection_params(
            variables, self.get_option('ipam_client'), self.get_option('ipam_token'))
        headers = {"token": ipam_token}
        # Options do not change during the call; read them once.
        descend = self.get_option('all_child_subnets')
        need_mac = self.get_option('with_mac_address')
        need_hostname = self.get_option('with_hostname')

        ret = []
        for cidr in cidrs:
            api = f"{ipam_client}/subnets/cidr/{cidr}"
            display.vvvv("url lookup connecting to %s" % api)
            data = self._ipam_get_json(api, headers)

            # Reset per CIDR so an empty result can never reuse the subnet
            # resolved for a previous CIDR (or hit an unbound local).
            subnet_id = None
            for item in data.get('data') or []:
                display.vvvv(
                    f'Found Subnet ID {item.get("id", "NONE")} from cidr: {cidr}')
                subnet_id = item.get('id', '')
            if subnet_id in (None, ''):
                raise AnsibleError(
                    "No subnet found for cidr %s using %s" % (cidr, api))

            subnets = [subnet_id]
            if descend:
                api = f"{ipam_client}/subnets/{subnet_id}/slaves_recursive/"
                display.vvvv(f'Accessing api: {api}')
                data = self._ipam_get_json(api, headers)
                display.vvvvv(to_text(data))
                for item in data.get('data') or []:
                    child_id = item.get('id')
                    if child_id not in subnets:
                        subnets.append(child_id)
            display.vvvv(f'Subnets to process: {subnets}')

            return_data = []
            for subnet in subnets:
                api = f"{ipam_client}/subnets/{subnet}/addresses/"
                display.vvvv(f'Accessing api: {api}')
                # An HTTP error for one subnet is logged and skipped, not fatal.
                data = self._ipam_get_json(api, headers, ignore_http_error=True)
                if data is None:
                    continue
                display.vvvvv(to_text(data))
                for item in data.get('data') or []:
                    display.vvvvv(f'Found address item: {item}')
                    if need_mac and item.get('mac') is None:
                        continue
                    if need_hostname and item.get('hostname') is None:
                        continue
                    result_item = self._address_record(item, subnet)
                    display.vvvvv(f'Adding item to ret: {result_item}')
                    return_data.append(result_item)
            ret.append(return_data)
        return ret
from __future__ import annotations
# Option schema for this lookup. Ansible parses this YAML to back the
# set_options()/get_option() calls made in LookupModule.run(), so the nesting
# under ``options`` matters. ``version_added`` is quoted because a bare 2.10
# is read by YAML as the float 2.1.
DOCUMENTATION = """
options:
  _terms:
    description: One or more PHPIPAM subnet IDs whose addresses are returned
    required: True
  ipam_client:
    description: The URL, including the API client, to the API endpoint
    required: False
    type: str
    version_added: "2.10"
  ipam_token:
    description: The API token to access the endpoint
    required: False
    type: str
    version_added: "2.10"
  with_mac_address:
    description: Return only items with a MAC address
    required: False
    type: bool
    default: False
    version_added: "2.10"
  with_hostname:
    description: Return only items with a hostname
    required: False
    type: bool
    default: False
    version_added: "2.10"
  all_child_subnets:
    description: Descend into child subnets
    required: False
    type: bool
    default: True
    version_added: "2.10"
"""
EXAMPLES = """
"""
RETURN = """
"""
from urllib.error import HTTPError, URLError
import json
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display
# Module-level Display instance; the plugin calls its vvvv()/vvvvv() methods
# to emit debug output.
display = Display()
def get_ipam_connection_params(variables, option_ipam_client=None, option_ipam_token=None):
    """Resolve the PHPIPAM endpoint URL and API token.

    Plugin options win over the ``ipam_client`` / ``ipam_token`` entries of
    ``variables``.

    Args:
        variables: Mapping of available variables, or ``None``.
        option_ipam_client: Endpoint URL supplied as a plugin option, if any.
        option_ipam_token: API token supplied as a plugin option, if any.

    Returns:
        An ``(ipam_client, ipam_token)`` tuple.

    Raises:
        AnsibleError: If either value is still empty after consulting both
            sources.
    """
    # run() declares ``variables=None``; treat that as "no variables" instead
    # of failing with AttributeError on ``None.get``.
    if variables is None:
        variables = {}
    ipam_client = option_ipam_client or variables.get('ipam_client', None)
    ipam_token = option_ipam_token or variables.get('ipam_token', None)
    if not (ipam_client and ipam_token):
        raise AnsibleError("Connection values are not complete. Requires at least ipam_client and ipam_token to be set.")
    return ipam_client, ipam_token
class LookupModule(LookupBase):
    """Lookup that returns PHPIPAM address records for the given subnet IDs."""

    def _ipam_get_json(self, api, headers, ignore_http_error=False):
        """Fetch ``api`` and return its decoded JSON body.

        With ``ignore_http_error`` an HTTP error status is only logged and
        ``None`` is returned; every other failure is raised as AnsibleError.
        """
        display.vvvv(f'Accessing api: {api}')
        try:
            response = open_url(api, headers=headers)
            return json.loads(to_text(response.read()))
        # HTTPError must be tested before its parent class URLError.
        except HTTPError as e:
            if ignore_http_error:
                display.vvvv(f'HTTP Error received: {to_text(e)}')
                return None
            raise AnsibleError("Received HTTP error for %s : %s" % (api, to_native(e)))
        except URLError as e:
            raise AnsibleError("Failed lookup url for %s : %s" % (api, to_native(e)))
        except SSLValidationError as e:
            raise AnsibleError("Error validating the server's certificate for %s: %s" % (api, to_native(e)))
        except ConnectionError as e:
            raise AnsibleError("Error connecting to %s: %s" % (api, to_native(e)))

    def run(self, terms, variables=None, **kwargs):
        """Return the address records of every subnet ID in ``terms``.

        Each term is optionally expanded to its child subnets
        (``all_child_subnets``); the addresses of all resulting subnets are
        collected. ``with_mac_address`` / ``with_hostname`` drop records whose
        ``mac`` / ``hostname`` is null.

        Args:
            terms: Iterable of PHPIPAM subnet IDs.
            variables: Available variables; may hold ``ipam_client`` and
                ``ipam_token``.
            **kwargs: Plugin options passed directly to the lookup.

        Returns:
            A flat list of text-rendered dicts with the keys ``hostname``,
            ``ip``, ``mac`` and ``subnet_id``, covering all terms.

        Raises:
            AnsibleError: If connection details are incomplete or a request
                fails.
        """
        # Option resolution does not depend on the term, and the accumulator
        # must outlive the loop: creating ``ret`` per term discarded the
        # results of all but one term.
        self.set_options(var_options=variables, direct=kwargs)
        ipam_client, ipam_token = get_ipam_connection_params(
            variables, self.get_option('ipam_client'), self.get_option('ipam_token'))
        headers = {"token": ipam_token}
        descend = self.get_option('all_child_subnets')
        need_mac = self.get_option('with_mac_address')
        need_hostname = self.get_option('with_hostname')

        ret = []
        for term in terms:
            subnets = [term]
            if descend:
                data = self._ipam_get_json(
                    f"{ipam_client}/subnets/{term}/slaves_recursive/", headers)
                display.vvvvv(to_text(data))
                for item in data.get('data', []):
                    child_id = item.get('id')
                    if child_id not in subnets:
                        subnets.append(child_id)
            display.vvvv(f'Subnets to process: {subnets}')

            for subnet in subnets:
                # An HTTP error for one subnet is logged and skipped, not fatal.
                data = self._ipam_get_json(
                    f"{ipam_client}/subnets/{subnet}/addresses/", headers,
                    ignore_http_error=True)
                if data is None:
                    continue
                display.vvvvv(to_text(data))
                for item in data.get('data', []):
                    if need_mac and item.get('mac', None) is None:
                        continue
                    if need_hostname and item.get('hostname', None) is None:
                        continue
                    ret.append(
                        to_text(
                            {
                                'hostname': item.get('hostname', f'ip-{item.get("ip").replace(".", "-")}'),
                                'ip': item.get('ip', 'undefined'),
                                'mac': item.get('mac', 'undefined'),
                                'subnet_id': subnet,
                            }
                        )
                    )
        return ret
from __future__ import annotations
# Option schema for this lookup. Ansible parses this YAML to back the
# set_options()/get_option() calls made in LookupModule.run(), so the nesting
# under ``options`` matters. ``version_added`` is quoted because a bare 2.10
# is read by YAML as the float 2.1.
DOCUMENTATION = """
options:
  _terms:
    description: One or more PHPIPAM subnet IDs whose child subnets are listed
    required: True
  ipam_client:
    description: The URL, including the API client, to the API endpoint
    required: False
    type: str
    version_added: "2.10"
  ipam_token:
    description: The API token to access the endpoint
    required: False
    type: str
    version_added: "2.10"
"""
EXAMPLES = """
"""
RETURN = """
"""
from urllib.error import HTTPError, URLError
import json
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display
# Module-level Display instance; the plugin calls its vvvv() method to emit
# debug output.
display = Display()
def get_ipam_connection_params(variables, option_ipam_client=None, option_ipam_token=None):
    """Resolve the PHPIPAM endpoint URL and API token.

    A value given as a plugin option overrides the matching ``ipam_client`` /
    ``ipam_token`` entry of ``variables``.

    Args:
        variables: Mapping of available variables, or ``None``.
        option_ipam_client: Endpoint URL supplied as a plugin option, if any.
        option_ipam_token: API token supplied as a plugin option, if any.

    Returns:
        An ``(ipam_client, ipam_token)`` tuple.

    Raises:
        AnsibleError: If either value is still empty after consulting both
            sources.
    """
    # run() declares ``variables=None``; fall back to an empty mapping so the
    # lookups below cannot raise AttributeError.
    source = variables if variables is not None else {}
    ipam_client = option_ipam_client if option_ipam_client else source.get('ipam_client', None)
    ipam_token = option_ipam_token if option_ipam_token else source.get('ipam_token', None)
    if not ipam_client or not ipam_token:
        raise AnsibleError("Connection values are not complete. Requires at least ipam_client and ipam_token to be set.")
    return ipam_client, ipam_token
class LookupModule(LookupBase):
    """Lookup that lists the child subnets of the given PHPIPAM subnet IDs."""

    def run(self, terms, variables=None, **kwargs):
        """Return the recursive child subnets of every subnet ID in ``terms``.

        Args:
            terms: Iterable of PHPIPAM subnet IDs.
            variables: Available variables; may hold ``ipam_client`` and
                ``ipam_token``.
            **kwargs: Plugin options passed directly to the lookup.

        Returns:
            A flat list of text-rendered dicts with the keys ``id``,
            ``description`` and ``cidr``, covering all terms.

        Raises:
            AnsibleError: If connection details are incomplete or a request
                fails.
        """
        # Option resolution does not depend on the term, and the accumulator
        # must outlive the loop: creating ``ret`` per term discarded the
        # results of all but one term.
        self.set_options(var_options=variables, direct=kwargs)
        ipam_client, ipam_token = get_ipam_connection_params(
            variables, self.get_option('ipam_client'), self.get_option('ipam_token'))
        headers = {"token": ipam_token}

        ret = []
        for subnet_id in terms:
            api = f"{ipam_client}/subnets/{subnet_id}/slaves_recursive/"
            display.vvvv("url lookup connecting to %s" % api)
            try:
                response = open_url(
                    api,
                    headers=headers,
                )
                data = json.loads(to_text(response.read()))
            # HTTPError must be tested before its parent class URLError.
            except HTTPError as e:
                raise AnsibleError("Received HTTP error for %s : %s" % (api, to_native(e)))
            except URLError as e:
                raise AnsibleError("Failed lookup url for %s : %s" % (api, to_native(e)))
            except SSLValidationError as e:
                raise AnsibleError("Error validating the server's certificate for %s: %s" % (api, to_native(e)))
            except ConnectionError as e:
                raise AnsibleError("Error connecting to %s: %s" % (api, to_native(e)))

            for item in data.get('data', []):
                ret.append(to_text({
                    "id": item.get('id', ''),
                    "description": item.get('description', ''),
                    "cidr": f"{item.get('subnet', '')}/{item.get('mask', '')}"
                }))
            # Render the list explicitly: the display helpers are meant to be
            # given text, not arbitrary objects.
            display.vvvv(to_text(ret))
        return ret
from __future__ import annotations
# Option schema for this lookup. Ansible parses this YAML to back the
# set_options()/get_option() calls made in LookupModule.run(), so the nesting
# under ``options`` matters. ``version_added`` is quoted because a bare 2.10
# is read by YAML as the float 2.1.
DOCUMENTATION = """
options:
  _terms:
    description: One or more subnets, in CIDR notation, to resolve to PHPIPAM subnet IDs
    required: True
  ipam_client:
    description: The URL, including the API client, to the API endpoint
    required: False
    type: str
    version_added: "2.10"
  ipam_token:
    description: The API token to access the endpoint
    required: False
    type: str
    version_added: "2.10"
"""
EXAMPLES = """
"""
RETURN = """
"""
from urllib.error import HTTPError, URLError
import json
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display
# Module-level Display instance; the plugin calls its vvvv() method to emit
# debug output.
display = Display()
def get_ipam_connection_params(variables, option_ipam_client=None, option_ipam_token=None):
    """Resolve the PHPIPAM endpoint URL and API token.

    Plugin options take precedence; the ``ipam_client`` / ``ipam_token``
    entries of ``variables`` are the fallback.

    Args:
        variables: Mapping of available variables, or ``None``.
        option_ipam_client: Endpoint URL supplied as a plugin option, if any.
        option_ipam_token: API token supplied as a plugin option, if any.

    Returns:
        An ``(ipam_client, ipam_token)`` tuple.

    Raises:
        AnsibleError: If either value is still empty after consulting both
            sources.
    """
    # run() declares ``variables=None``; guard against it so the fallback
    # lookups cannot raise AttributeError.
    fallback = {} if variables is None else variables
    ipam_client = option_ipam_client or fallback.get('ipam_client')
    ipam_token = option_ipam_token or fallback.get('ipam_token')
    if ipam_client and ipam_token:
        return ipam_client, ipam_token
    raise AnsibleError("Connection values are not complete. Requires at least ipam_client and ipam_token to be set.")
class LookupModule(LookupBase):
    """Lookup that resolves CIDRs to PHPIPAM subnet IDs."""

    def run(self, terms, variables=None, **kwargs):
        """Return the PHPIPAM subnet IDs matching every CIDR in ``terms``.

        Args:
            terms: Iterable of subnets in CIDR notation.
            variables: Available variables; may hold ``ipam_client`` and
                ``ipam_token``.
            **kwargs: Plugin options passed directly to the lookup.

        Returns:
            A flat list of subnet IDs (as text) covering all terms.

        Raises:
            AnsibleError: If connection details are incomplete or a request
                fails.
        """
        # Option resolution does not depend on the term, and the accumulator
        # must outlive the loop: creating ``ret`` per term discarded the
        # results of all but one term.
        self.set_options(var_options=variables, direct=kwargs)
        ipam_client, ipam_token = get_ipam_connection_params(
            variables, self.get_option('ipam_client'), self.get_option('ipam_token'))
        headers = {"token": ipam_token}

        ret = []
        for cidr in terms:
            api = f"{ipam_client}/subnets/cidr/{cidr}"
            display.vvvv("url lookup connecting to %s" % api)
            try:
                response = open_url(
                    api,
                    headers=headers,
                )
                data = json.loads(to_text(response.read()))
            # HTTPError must be tested before its parent class URLError.
            except HTTPError as e:
                raise AnsibleError("Received HTTP error for %s : %s" % (api, to_native(e)))
            except URLError as e:
                raise AnsibleError("Failed lookup url for %s : %s" % (api, to_native(e)))
            except SSLValidationError as e:
                raise AnsibleError("Error validating the server's certificate for %s: %s" % (api, to_native(e)))
            except ConnectionError as e:
                raise AnsibleError("Error connecting to %s: %s" % (api, to_native(e)))

            ret.extend(to_text(item.get('id', '')) for item in data.get('data', []))
            # Render the list explicitly: the display helpers are meant to be
            # given text, not arbitrary objects.
            display.vvvv(to_text(ret))
        return ret
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <https://unlicense.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment