-
-
Save jvanderaa/f3a44cd22f72024b4ad9f24f14d2731a to your computer and use it in GitHub Desktop.
Worker.py for Nautobot ChatOps Plugin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Demo netchat addition to Nautobot.""" | |
| import logging | |
| from django_rq import job | |
| from nautobot_chatops.workers import subcommand_of, handle_subcommands | |
| from nautobot_chatops.choices import CommandStatusChoices | |
| import meraki | |
| logger = logging.getLogger("rq.worker") | |
| @job("default") | |
| def netchat(subcommand, **kwargs): | |
| """Interact with netchat.""" | |
| return handle_subcommands("netchat", subcommand, **kwargs) | |
| def get_meraki_orgs(): | |
| dashboard = meraki.DashboardAPI(suppress_logging=True) | |
| return dashboard.organizations.getOrganizations() | |
| def meraki_devices(org_name): | |
| # Get the org ID from the Get Meraki Orgs | |
| org_list = get_meraki_orgs() | |
| dashboard = meraki.DashboardAPI(suppress_logging=True) | |
| for org in org_list: | |
| if org["name"] == org_name: | |
| device_list = dashboard.organizations.getOrganizationDevices(organizationId=org["id"]) | |
| return device_list | |
| @subcommand_of("netchat") | |
| def get_meraki_devices(dispatcher, org_name=None): | |
| """Gathers devices from Meraki API endpoint.""" | |
| logger.info(f"ORG NAME: {org_name}") | |
| if not org_name: | |
| org_list = get_meraki_orgs() | |
| # Build the list of sites | |
| choices = [(x["name"], x["name"]) for x in org_list] | |
| logger.info(f"LENGTH OF CHOICES: {len(choices)}") | |
| dispatcher.prompt_from_menu(f"netchat get-meraki-devices", "Select Organization", choices) | |
| return False | |
| # Handle checking if the network was provided | |
| dispatcher.send_markdown( | |
| f"Standby {dispatcher.user_mention()}, I'm getting the devices at the Organization {org_name}!" | |
| ) | |
| devices = meraki_devices(org_name) | |
| blocks = [ | |
| dispatcher.markdown_block(f"{dispatcher.user_mention()} here are the devices are {org_name}"), | |
| dispatcher.markdown_block("\n".join([x["name"] for x in devices])), | |
| ] | |
| dispatcher.send_blocks(blocks) | |
| return CommandStatusChoices.STATUS_SUCCEEDED |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment