Last active
July 14, 2024 11:14
-
-
Save mertcangokgoz/7db949a0e13ad77b68140c0ff3a16985 to your computer and use it in GitHub Desktop.
This script decodes MikroTik Neighbor Discovery Protocol (MNDP) packets.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Description: This script decodes MikroTik Neighbor Discovery Protocol (MNDP) packets. | |
# Author: Mertcan GÖKGÖZ | |
# Date: 2024-07-14 | |
# Version: 1.0 | |
import io | |
import struct | |
from enum import Enum | |
class UnpackTypes(Enum): | |
"""Enumeration of Unpack types. | |
The Unpack type is used to indicate the type of the Unpack part. | |
Attributes: | |
NULL: Null | |
NONE: None | |
SIMPLE: Simple | |
UNCOMPRESSED_HEADERS: Uncompressed headers | |
UNCOMPRESSED_ALL: Uncompressed all | |
""" | |
NULL = 0 | |
NONE = "none" | |
SIMPLE = "simple" | |
UNCOMPRESSED_HEADERS = "uncompressed headers" | |
UNCOMPRESSED_ALL = "uncompressed all" | |
def __str__(self): | |
"""Returns a human-readable representation of the Unpack type.""" | |
match self: | |
case UnpackTypes.NULL: | |
return "null" | |
case UnpackTypes.NONE: | |
return "none" | |
case UnpackTypes.SIMPLE: | |
return "simple" | |
case UnpackTypes.UNCOMPRESSED_HEADERS: | |
return "uncompressed-headers" | |
case UnpackTypes.UNCOMPRESSED_ALL: | |
return "uncompressed-all" | |
case _: | |
return "Unknown %s" % self.value | |
class MNDPTLVTypes(Enum): | |
"""Enumeration of MNDP TLV types.""" | |
NULL = 0 | |
MACAddress = 1 | |
Identity = 5 | |
Version = 7 | |
Platform = 8 | |
Uptime = 10 | |
SoftwareID = 11 | |
Board = 12 | |
Unpack = 14 | |
IPv6Address = 15 | |
InterfaceName = 16 | |
IPv4Address = 17 | |
def __str__(self): | |
"""Returns a human-readable representation of the TLV type.""" | |
match self: | |
case MNDPTLVTypes.NULL: | |
return "NULL" | |
case MNDPTLVTypes.MACAddress: | |
return "MAC Address" | |
case MNDPTLVTypes.Identity: | |
return "Identity" | |
case MNDPTLVTypes.Version: | |
return "Version" | |
case MNDPTLVTypes.Platform: | |
return "Platform" | |
case MNDPTLVTypes.Uptime: | |
return "Uptime" | |
case MNDPTLVTypes.SoftwareID: | |
return "Software ID" | |
case MNDPTLVTypes.Board: | |
return "Board" | |
case MNDPTLVTypes.Unpack: | |
return "Unpack" | |
case MNDPTLVTypes.IPv6Address: | |
return "IPv6 Address" | |
case MNDPTLVTypes.InterfaceName: | |
return "Interface Name" | |
case MNDPTLVTypes.IPv4Address: | |
return "IPv4 Address" | |
case _: | |
return "Unknown %s" % self.value | |
class MNDPPart: | |
def __init__(self, type_, value): | |
"""Initializes a MNDP part.""" | |
self.type = type_ | |
self.value = value | |
class MNDPPacket: | |
def __init__(self, seq_no=0, parts=None): | |
"""Initializes a MNDP packet.""" | |
# Sequence number | |
self.seq_no = seq_no | |
# TLV parts | |
self.parts = parts if parts is not None else [] | |
@staticmethod | |
def bytes_to_mac_address(mac: bytes): | |
"""Converts a byte array to a MAC address string. | |
Args: | |
mac (bytes): The MAC address | |
Returns: | |
str: The MAC address in human-readable format | |
""" | |
return ':'.join([f"{x:02x}" for x in mac]) | |
@staticmethod | |
def bytes_to_ipv4_address(ip: bytes): | |
"""Converts a byte array to an IPv4 address string. | |
Args: | |
ip (bytes): The IPv4 address | |
Returns: | |
str: The IPv4 address in human-readable format | |
""" | |
return '.'.join([str(x) for x in ip]) | |
@staticmethod | |
def bytes_to_ipv6_address(ip: bytes): | |
"""Converts a byte array to an IPv6 address string. | |
Args: | |
ip (bytes): The IPv6 address | |
Returns: | |
str: The IPv6 address in human-readable format | |
""" | |
return ':'.join([f"{ip[i]:02x}{ip[i + 1]:02x}" for i in range(0, len(ip), 2)]) | |
@staticmethod | |
def uptime_duration_calc(t: int): | |
"""Converts uptime in seconds to human-readable format. | |
Args: | |
t (int): Uptime in seconds | |
Returns: | |
str: Uptime in human-readable format | |
""" | |
days = t // 86400 | |
t %= 86400 | |
hours = t // 3600 | |
t %= 3600 | |
minutes = t // 60 | |
t %= 60 | |
seconds = t | |
return f"{days}d {hours}h {minutes}m {seconds}s" | |
@staticmethod | |
def decode(content: bytes): | |
"""Decodes a MNDP packet. | |
The packet format is as follows: | |
- 4 bytes: Sequence number | |
- TLV parts: | |
- 2 bytes: Type | |
- 2 bytes: Length | |
- Length bytes: Value | |
The TLV parts are repeated until the end of the packet. | |
I = unsigned int | |
H = unsigned short | |
< = little-endian | |
> = big-endian | |
! = network (big-endian) | |
Args: | |
content (bytes): The packet content | |
Returns: | |
MNDPPacket: The decoded MNDP packet | |
""" | |
packet = MNDPPacket() | |
byte = io.BytesIO(content) | |
# Packet maximum size is 1500 bytes | |
if len(content) > 1500: | |
return packet | |
packet.seq_no, = struct.unpack('<I', byte.read(4)) | |
while True: | |
try: | |
part_type, = struct.unpack('>H', byte.read(2)) | |
contents_length, = struct.unpack('>H', byte.read(2)) | |
except struct.error: | |
break | |
part_type = MNDPTLVTypes(part_type) | |
if part_type == MNDPTLVTypes.MACAddress: | |
packet.parts.append(MNDPPart(part_type, MNDPPacket.bytes_to_mac_address(byte.read(contents_length)))) | |
elif part_type in {MNDPTLVTypes.Identity, MNDPTLVTypes.Version, MNDPTLVTypes.Platform, | |
MNDPTLVTypes.SoftwareID, MNDPTLVTypes.Board, MNDPTLVTypes.InterfaceName}: | |
packet.parts.append(MNDPPart(part_type, byte.read(contents_length).decode('utf-8'))) | |
elif part_type == MNDPTLVTypes.Uptime: | |
uptime, = struct.unpack('<I', byte.read(4)) | |
packet.parts.append(MNDPPart(part_type, MNDPPacket.uptime_duration_calc(uptime))) | |
elif part_type == MNDPTLVTypes.Unpack: | |
packet.parts.append(MNDPPart(part_type, UnpackTypes(struct.unpack('<B', byte.read(1))[0]))) | |
elif part_type == MNDPTLVTypes.IPv6Address: | |
packet.parts.append(MNDPPart(part_type, MNDPPacket.bytes_to_ipv6_address(byte.read(contents_length)))) | |
elif part_type == MNDPTLVTypes.IPv4Address: | |
packet.parts.append(MNDPPart(part_type, MNDPPacket.bytes_to_ipv4_address(byte.read(contents_length)))) | |
else: | |
packet.parts.append(MNDPPart(part_type, byte.read(contents_length))) | |
return packet | |
# How to use the code | |
# ma = (b"\x63\x2f\x01\x00\x00\x01\x00\x06\xb8\x69\xf4\xbe\x3c\x29\x00\x05" | |
# b"\x00\x15\x47\x4e\x53\x5f\x53\x41\x49\x58\x5f\x54\x45\x52\x41\x43" | |
# b"\x4f\x5f\x43\x43\x52\x30\x31\x00\x07\x00\x13\x36\x2e\x34\x37\x2e" | |
# b"\x31\x30\x20\x28\x6c\x6f\x6e\x67\x2d\x74\x65\x72\x6d\x29\x00\x08" | |
# b"\x00\x08\x4d\x69\x6b\x72\x6f\x54\x69\x6b\x00\x0a\x00\x04\x4d\x25" | |
# b"\x47\x00\x00\x0b\x00\x09\x33\x47\x4e\x58\x2d\x41\x37\x53\x47\x00" | |
# b"\x0c\x00\x0f\x43\x43\x52\x31\x30\x31\x36\x2d\x31\x32\x53\x2d\x31" | |
# b"\x53\x2b\x00\x0e\x00\x01\x00\x00\x10\x00\x04\x73\x66\x70\x35\x00" | |
# b"\x11\x00\x04\xc4\xdf\x1f\x2a") | |
# | |
# packets = MNDPPacket.decode(ma) | |
# for part in packets.parts: | |
# print(f"{part.type}: {part.value}") | |
# | |
# Output | |
# | |
# MAC Address: b8:69:f4:be:3c:29 | |
# Identity: GNS_SAIX_TERACO_CCR01 | |
# Version: 6.47.10 (long-term) | |
# Platform: MikroTik | |
# Uptime: 53d 23h 10m 5s | |
# Software ID: 3GNX-A7SG | |
# Board: CCR1016-12S-1S+ | |
# Unpack: null | |
# Interface Name: sfp5 | |
# IPv4 Address: 196.223.31.42 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment