@lachesis
Created March 2, 2024 22:38
Adit 600 failover
#!/usr/bin/env python3
"""
Failover between T-1s for an Adit 600.

When called with no arguments, connect to Adit 600 mgmt
via Telnet, introspect the current status of both A T-1s,
and update connections to PRIMARY/SECONDARY T-1.

When called with "snmptrap" as argument, first update the
state internally based on the issued alarms, then blindly
discard it and do the thing above. Silly? Yea.

Assumes that you want the same timeslots on both T-1s.
e.g. connect 1:4 to timeslot 2 on whichever T-1 is up

Put a line like this in /etc/snmp/snmptrapd.conf:
traphandle default /usr/local/bin/adit-failover.py snmptrap

TODO: add some kind of debouncing. flock suffices for now I guess.
TODO: less jank, fewer globals, less dead code, whatever
"""
import contextlib
import fcntl
import json
import re
import sys
import telnetlib  # gonna be removed in Python 3.13, jerks
import time

LOCK_FILE = '/tmp/adit-lock'
STATE_FILE = '/tmp/adit-state'
STATE = {
    "a:1": None,
    "a:2": None,
}
PRIMARY = "a:2"
SECONDARY = "a:1"
ADIT_IP = 'adit600-mgmt'
BASE_CONNECTIONS = [
    # (timeslot, other_conn)
    # timeslot will be prefaced with either a:1: or a:2:
    ('1', '1:1'),  # connect a:1:1 1:1
    ('2', '1:3'),  # connect a:1:2 1:3
    ('3', '2:1'),
    ('4', '2:5'),
]


@contextlib.contextmanager
def flock_context(fn, timeout=30):
    # get a file lock or throw an error if timeout
    deadline = time.time() + timeout
    with open(fn, 'w') as lock_file:
        while True:
            try:
                # Acquire an exclusive lock on the file without blocking
                fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                if time.time() > deadline:
                    raise TimeoutError("Unable to acquire lock")
                time.sleep(0.25)
                continue
            try:
                yield
            finally:
                # Release the lock even if the body raised; the file is
                # closed when the enclosing with block exits
                fcntl.flock(lock_file, fcntl.LOCK_UN)
            break


def load_state():
    global STATE
    try:
        with open(STATE_FILE) as inp:
            s = json.load(inp)
        if PRIMARY in s and SECONDARY in s and all(v in (True, False, None) for v in s.values()):
            STATE = s
    except Exception:
        pass
    return STATE


def save_state():
    try:
        with open(STATE_FILE, 'w') as out:
            json.dump(STATE, out)
    except Exception:
        pass


def change_state(key, val):
    assert val in (True, False, None)
    assert key in STATE
    STATE[key] = val


def decide_correct_connections():
    if STATE[PRIMARY]:
        return [(PRIMARY + ":" + k, v) for (k, v) in BASE_CONNECTIONS]
    elif STATE[SECONDARY]:
        return [(SECONDARY + ":" + k, v) for (k, v) in BASE_CONNECTIONS]
    else:
        # if both are down, fail back to connecting to the primary
        return [(PRIMARY + ":" + k, v) for (k, v) in BASE_CONNECTIONS]


def get_current_connections():
    txt = send_command('print config')
    txt = txt.replace('\r', '')
    return [
        (m.group(1).strip(), m.group(2).strip())
        for m in re.finditer(r'(?mis)^connect ([^ ]*) ([^ ]*)$', txt)
    ]


def get_current_state():
    # example:
    '''
    SLOT A:
    Status for DS1 1:
    Receive: Traffic
    Transmit: Traffic
    Loopback: OFF
    '''
    new_state = {}
    for key in STATE:
        txt = send_command('status ' + key)
        txt = txt.replace('\r', '')
        # a T-1 counts as up only if both directions report Traffic
        state = bool(re.search(r'Receive:\s*Traffic', txt) and re.search(r'Transmit:\s*Traffic', txt))
        new_state[key] = state
    return new_state


def handle_snmp_alarm(txt):
    # example stdin
    '''
    adit600-mgmt.lan
    UDP: [192.168.65.252]:161->[192.168.65.11]:162
    iso.3.6.1.2.1.1.3.0 116:0:18:26.63
    iso.3.6.1.6.3.1.1.4.1.0 iso.3.6.1.4.1.964.3.4.1.100.0.4
    iso.3.6.1.4.1.964.3.4.1.2.4.3.1.4.188 3
    iso.3.6.1.4.1.964.3.4.1.2.4.3.1.5.188 205
    iso.3.6.1.2.1.2.2.1.1.151003136 151003136
    iso.3.6.1.4.1.964.3.4.1.1.1.1.2.1 3
    iso.3.6.1.4.1.964.3.4.1.100.3.0 "A:2"
    iso.3.6.1.4.1.964.3.4.1.2.4.3.1.10.188 "CAC DS1# A:2-DS1"
    '''
    if 'iso.3.6.1.6.3.1.1.4.1.0 iso.3.6.1.4.1.964.3.4.1.100.0.4' in txt:
        new_state = True
    elif 'iso.3.6.1.6.3.1.1.4.1.0 iso.3.6.1.4.1.964.3.4.1.100.0.2' in txt:
        new_state = False
    else:
        return
    # raw string with escaped dots so the OID is matched literally
    m = re.search(r'(?mis)iso\.3\.6\.1\.4\.1\.964\.3\.4\.1\.100\.3\.0 "(A:[12])"$', txt)
    if not m:
        return
    what = m.group(1).lower()
    change_state(what, new_state)


def make_connections(conns):
    send_command('set verification off')
    for a, b in conns:
        send_command('disconnect ' + a)
        send_command('disconnect ' + b)
        send_command('connect {} {}'.format(a, b))


_conn = None


def _connect():
    global _conn
    if not _conn:
        _conn = telnetlib.Telnet()
        _conn.open(ADIT_IP, 23)
    return _conn


def _disconnect():
    global _conn
    if not _conn:
        return
    _conn.close()
    _conn = None


def send_command(cmd):
    print('>>', cmd)
    conn = _connect()
    conn.read_very_eager()
    conn.write((cmd + "\r\n").encode())
    # Return a tuple of three items: the index in the list of the first
    # regular expression that matches; the match object returned; and the
    # bytes read up till and including the match.
    # if nothing, returns (-1, None, data)
    _, m, txt = conn.expect([br'\n\r> $'], timeout=10)
    return txt.decode()


def main():
    with flock_context(LOCK_FILE, 20):
        load_state()
        print('initial state', STATE)
        if sys.argv[-1] == "snmptrap":
            handle_snmp_alarm(sys.stdin.read())
            print('state after alarm', STATE)
        get_current_state()  # first command we run no workie
        state = get_current_state()  # hahaha
        STATE.update(state)
        print('state after telnet', STATE)
        old_conns = get_current_connections()
        print('old conns', old_conns)
        new_conns = decide_correct_connections()
        print('new conns', new_conns)
        if new_conns != old_conns:
            make_connections(new_conns)
        save_state()
        _disconnect()


if __name__ == "__main__":
    main()

lachesis commented Mar 4, 2024

This needs to have better error handling and logging around a number of edge cases.

The whole thing could probably be done with SNMP (TRAP, GET, and SET), but this is my first time using it and I cannot understand the docs for SET. Maybe it's an enterprise feature that I should be buying a license for? For now I just use SNMP traps as a tripwire to update state, and change connections via Telnet.

The state storage and alarm processing are completely unneeded. We always check the state over Telnet on each run, so there's no need to track the alarms and compute it, or to store it to disk.

The global STATE variable is silly and unneeded. Parts of this are generic to many connections, while other parts (notably the regex in alarm processing and the single SECONDARY) are hard coded.
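
A minimal sketch of what dropping the global could look like: the decision becomes a pure function of whatever state was just read over Telnet. The name `pick_connections` and its parameters are hypothetical and not part of the script above; the fall-back-to-primary rule is copied from `decide_correct_connections`.

```python
def pick_connections(state, primary, secondary, base_connections):
    """Return the (t1_timeslot, other_conn) pairs that should be active.

    state maps T-1 names such as "a:1" to True (up), False (down)
    or None (unknown). All parameter names here are illustrative.
    """
    # Prefer the primary. Use the secondary only when the primary is not up
    # and the secondary is. If neither is up, fall back to the primary.
    if state.get(primary) or not state.get(secondary):
        active = primary
    else:
        active = secondary
    return [(active + ":" + slot, other) for slot, other in base_connections]


base = [("1", "1:1"), ("2", "1:3")]
print(pick_connections({"a:1": True, "a:2": False}, "a:2", "a:1", base))
```

Passing `primary`, `secondary` and the connection list in as arguments also removes the hard-coded single SECONDARY from the decision step, though the alarm regex would still need the same treatment.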

This should probably have a proper debounce, as the Adit sends more than one alarm for each T-1 link state change. Sometimes it will send Down, Up, Down or Up, Down, Up in quick succession. The flock (file lock) is intended to ensure that these messages are processed one at a time but may not guarantee order. A better idea would be to wait until the storm of events quieted for a moment before verifying the state with telnet and updating connections as needed.
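
One way the "wait until the storm quiets" idea could be sketched, assuming each trap still spawns its own process: every invocation stamps a shared file with a unique token, sleeps for the quiet period, and only carries on if nobody has overwritten the stamp in the meantime. The function name `is_last_event`, the stamp file, and the two-second default are all assumptions, not something the script does today.

```python
import os
import time
import uuid


def is_last_event(stamp_file, quiet_secs=2.0, sleep=time.sleep):
    """Return True if no other invocation stamped the file during the quiet period."""
    token = uuid.uuid4().hex
    tmp = stamp_file + "." + token
    # Write to a temp file and rename it into place, so readers never see
    # a half-written stamp (os.replace is atomic on POSIX filesystems).
    with open(tmp, "w") as out:
        out.write(token)
    os.replace(tmp, stamp_file)

    sleep(quiet_secs)

    try:
        with open(stamp_file) as inp:
            # Still our token: we were the last event of the burst.
            return inp.read() == token
    except OSError:
        return False
```

In `main()` this would run before taking the flock, roughly `if not is_last_event('/tmp/adit-stamp'): return`, so that a Down, Up, Down burst results in a single Telnet check by the last process. The price is that every failover is delayed by at least the quiet period.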

telnetlib was deprecated in Python 3.11 and will be removed in 3.13. The #python folks suggest copying it forward with my program. Gross. We could use expect against the telnet binary, but that is even more gross.
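
A third option, sketched here under a big assumption: that the Adit is happy with a client that ignores Telnet option negotiation entirely. In that case a plain socket plus a small filter for IAC sequences might be enough to replace the `expect` call. `strip_iac` and `read_until` are hypothetical helpers, untested against real hardware.

```python
import socket
import time

# Telnet command bytes (RFC 854)
IAC, DONT, DO, WONT, WILL, SB, SE = 255, 254, 253, 252, 251, 250, 240


def strip_iac(data):
    """Remove Telnet IAC sequences from data. This sketch never replies to them."""
    out = bytearray()
    i = 0
    while i < len(data):
        b = data[i]
        nxt = data[i + 1] if i + 1 < len(data) else None
        if b != IAC:
            out.append(b)
            i += 1
        elif nxt == IAC:
            out.append(IAC)  # escaped literal 0xFF byte
            i += 2
        elif nxt in (DO, DONT, WILL, WONT):
            i += 3  # IAC + verb + option byte
        elif nxt == SB:
            # Subnegotiation runs until IAC SE; skip all of it
            end = data.find(bytes([IAC, SE]), i + 2)
            i = len(data) if end == -1 else end + 2
        else:
            i += 2  # any other two-byte command
    return bytes(out)


def read_until(sock, prompt, timeout=10.0):
    """Read until the cleaned output ends with prompt, or the timeout or EOF hits."""
    deadline = time.monotonic() + timeout
    raw = b""
    while True:
        text = strip_iac(raw)
        remaining = deadline - time.monotonic()
        if text.endswith(prompt) or remaining <= 0:
            return text
        sock.settimeout(remaining)
        try:
            chunk = sock.recv(4096)
        except socket.timeout:
            return text
        if not chunk:  # peer closed the connection
            return text
        raw += chunk
```

With these, `send_command` would become roughly `sock.sendall((cmd + "\r\n").encode())` followed by `read_until(sock, b"\n\r> ")`, where `sock` comes from `socket.create_connection((ADIT_IP, 23))`. If the device turns out to need replies to its DO/WILL requests, this will not be sufficient.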

lachesis commented Mar 4, 2024

I have a slightly different version locally that actually uses the alarm tracking / state simulation to exit early if the alarm did not change the state. That at least avoids the problem of just running the thing blindly on every alarm. It is probably better to throw away all of that code and simply debounce.
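
The local version is not shown here, so the following is only a guess at the shape of that early exit. It reuses the two trap OIDs and the `100.3.0` name field that `handle_snmp_alarm` already checks; `parse_alarm` and `alarm_changes_state` are hypothetical names.

```python
import re

# Trap OID lines exactly as handle_snmp_alarm matches them:
# ...100.0.4 is treated as "link up", ...100.0.2 as "link down".
TRAP_PREFIX = 'iso.3.6.1.6.3.1.1.4.1.0 iso.3.6.1.4.1.964.3.4.1.100.0.'
UP_TRAP = TRAP_PREFIX + '4'
DOWN_TRAP = TRAP_PREFIX + '2'
T1_NAME_RE = re.compile(
    r'(?mi)^\s*iso\.3\.6\.1\.4\.1\.964\.3\.4\.1\.100\.3\.0 "(A:[12])"\s*$'
)


def parse_alarm(txt):
    """Return (t1_name, is_up) for a recognised trap, or None otherwise."""
    if UP_TRAP in txt:
        is_up = True
    elif DOWN_TRAP in txt:
        is_up = False
    else:
        return None
    m = T1_NAME_RE.search(txt)
    if not m:
        return None
    return m.group(1).lower(), is_up


def alarm_changes_state(state, alarm):
    """True if the alarm reports something the stored state does not already say."""
    if alarm is None:
        return False
    key, is_up = alarm
    # An unknown (None) or missing entry always counts as a change.
    return state.get(key) is not is_up
```

`main()` could then bail out before opening the Telnet session when `alarm_changes_state(STATE, parse_alarm(sys.stdin.read()))` is false. This depends on the state file being trustworthy, which is exactly the part a debounce would make unnecessary.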

If flock allows reordering subsequent events, we might have problems if we have very fast link flashing. The Adit seems to debounce its own alarms a bit. Maybe this whole thing can be avoided by studying SNMP a bit further. I'm processing every SNMP alarm with this script, which is probably not right. This is my only SNMP device at the moment.
