Skip to content

Instantly share code, notes, and snippets.

@renatoalmeidaoliveira
Last active December 18, 2022 14:06
Show Gist options
  • Save renatoalmeidaoliveira/d557079dd4460dfab4a29bb0109b11b5 to your computer and use it in GitHub Desktop.
Save renatoalmeidaoliveira/d557079dd4460dfab4a29bb0109b11b5 to your computer and use it in GitHub Desktop.
Netbox GET aiohttp sample
NETBOX_TOKEN = "<token>"
NETBOX_URL = "https://netbox.sample.com/api/"
async def netbox_get(endpoint, params={}):
api_token = NETBOX_TOKEN
api_url = NETBOX_URL
url = f"{api_url}{endpoint}"
headers = {
"accept": "application/json",
"Authorization": f"Token {api_token}",
}
if "limit" not in params:
params["limit"] = 1000
output = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, params=params) as r:
response = await r.json()
if r.status != 200:
raise LookupError(response)
response = await r.json()
output["count"] = response["count"]
output["results"] = []
output["results"] = output["results"] + response["results"]
while response["next"] is not None:
async with session.get(
response["next"], headers=headers, params=params
) as r:
response = await r.json()
output["results"] = output["results"] + response["results"]
return output
except Exception as e:
logger.error(f"{e}")
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment