Skip to content

Instantly share code, notes, and snippets.

@Tes3awy
Forked from consentfactory/netmiko_threading_queuing.py
Last active February 14, 2024 11:40
Show Gist options
  • Save Tes3awy/5538da7e54f4d9dff4925e6832e941ce to your computer and use it in GitHub Desktop.
Save Tes3awy/5538da7e54f4d9dff4925e6832e941ce to your computer and use it in GitHub Desktop.
Script to perform multithreading with Netmiko.
#!/usr/bin/python3
# Jimmy Taylor and Osama Abbas
# https://www.consentfactory.com/python-threading-queuing-netmiko/
# This method will spin up threads and process IP addresses in a queue
import os
import signal
# threading library
import threading
# Additional modules imported for getting password, pretty print
from getpass import getpass
from pprint import pprint
# Queuing
from queue import Queue
# Importing Netmiko modules
from netmiko import ConnectHandler
from netmiko.exceptions import NetmikoAuthenticationException, NetmikoTimeoutException
# Restore the default OS handling for these signals so that Ctrl-C and a
# closed output pipe terminate the script instead of raising inside Python.
# SIGPIPE is only defined on Unix, so guard it to keep the script importable
# on platforms that lack it.
if hasattr(signal, "SIGPIPE"):
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # IOError: Broken pipe
signal.signal(signal.SIGINT, signal.SIG_DFL)  # KeyboardInterrupt: Ctrl-C
# Prompt for the credentials used for every device connection
username = input("Username: ")
password = getpass()
# Switch IP addresses from a text file that has one IP per line.
# The context manager closes the file once it has been read; surrounding
# whitespace is stripped and blank lines are skipped so that an empty string
# is never queued as a host.
with open("ips.txt", mode="r") as ip_addrs_file:
    ip_addrs = [
        entry
        for entry in (line.strip() for line in ip_addrs_file.read().splitlines())
        if entry
    ]
# CLI command that every worker sends to its device. It is kept near the top
# of the script so it is easy to find and change (it could equally be passed
# in as a parameter).
command = "show inventory"  # or show running-config
# Lock used so that only one thread prints at a time
print_lock = threading.Lock()
# Queue that holds the IP addresses waiting to be processed
enclosure_queue = Queue()
# Number of worker threads to spin up
num_threads = 8
# Function used in threads to connect to devices, passing in the thread # and queue
def device_connector(i: int, q):
    """Worker loop: take IP addresses from *q* and run ``command`` on each device.

    Reads the module-level ``username``, ``password``, ``command`` and
    ``print_lock``.

    Args:
        i: Number of this worker thread; used to prefix every message.
        q: Queue of device IP addresses. Every item taken with ``get()`` is
            acknowledged with ``task_done()``, even when processing fails, so
            that ``q.join()`` in the launcher can always return.
    """
    # Runs indefinitely; q.get() blocks while the queue is empty
    while True:
        # These print statements are largely for the user indicating where the
        # process is at and aren't required
        print(f"{i:,}: Waiting for IP...")
        ip = q.get()
        print(f"{i:,}: Acquired IP Address: {ip}")
        # k,v passed to ConnectHandler
        device = {
            "device_type": "cisco_ios",
            "host": ip,
            "username": username,
            "password": password,
        }
        try:
            # Connect to the device, and print out auth or timeout errors
            try:
                conn = ConnectHandler(**device)
            except NetmikoTimeoutException:
                with print_lock:
                    print(f"\n{i:,}: ERROR: Connection to {ip} timed-out.", end="\n\n")
                continue
            except NetmikoAuthenticationException:
                with print_lock:
                    print(
                        f"\n{i:,}: ERROR: Authentication failed for {ip}. Stopping script.",
                        end="\n\n",
                    )
                # Abort the whole script. SIGUSR1 only exists on Unix, so fall
                # back to SIGTERM on platforms that do not define it.
                os.kill(os.getpid(), getattr(signal, "SIGUSR1", signal.SIGTERM))
                # If the process survives the signal, never fall through to
                # the command step without a usable connection.
                return
            try:
                # Capture the output, and use TextFSM (in this case) to parse data
                output = conn.send_command(command, use_textfsm=True)
            except Exception as exc:
                # Worker-thread boundary: report the failure and keep serving
                # the queue instead of letting the thread die silently.
                with print_lock:
                    print(f"\n{i:,}: ERROR: Command failed on {ip}: {exc}", end="\n\n")
            else:
                with print_lock:
                    print(f"{i:,}: Printing output...")
                    pprint(output)
            finally:
                # Always release the session, even when the command failed
                conn.disconnect()
        finally:
            # Always mark the item as processed; otherwise the launcher's
            # queue.join() would block forever after a failure
            q.task_done()
# Main function that launches the worker threads and manages the queue
def main():
    """Start the worker threads, queue every IP address and wait for completion.

    Spawns ``num_threads`` daemon threads running ``device_connector``, puts
    each entry of ``ip_addrs`` on ``enclosure_queue`` and blocks until every
    queued item has been marked as done.
    """
    # Setting up threads based on the number set above
    for i in range(num_threads):
        # Create the thread using 'device_connector' as the function, passing
        # in the thread number and queue object as parameters. daemon=True
        # replaces the deprecated Thread.setDaemon(True) call.
        thread = threading.Thread(
            target=device_connector,
            args=(
                i,
                enclosure_queue,
            ),
            daemon=True,
        )
        # Start the thread
        thread.start()
    # For each IP address in "ip_addrs", add that IP address to the queue
    for ip_addr in ip_addrs:
        enclosure_queue.put(ip_addr)
    # Wait for all tasks in the queue to be marked as completed (task_done)
    enclosure_queue.join()
    print("*** Script complete ***")
# Entry point: launch the thread manager only when this file is run as a script
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment