Skip to content

Instantly share code, notes, and snippets.

@twisteroidambassador
Created June 24, 2018 14:02
Show Gist options
  • Save twisteroidambassador/f90257e8a1ad3df5c8f5265c00e88d0a to your computer and use it in GitHub Desktop.
Save twisteroidambassador/f90257e8a1ad3df5c8f5265c00e88d0a to your computer and use it in GitHub Desktop.
Getting tcp_metrics with gnlpy
"""Accessing TCP metrics in Linux using Generic Netlink via gnlpy."""
import array
import ipaddress
import struct
from pprint import pprint
from typing import Union
import gnlpy.netlink as netlink
# Either kind of address object accepted by the address-taking helpers below.
IPAddressType = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
# Fixed-size 16-byte string field ('=16s'); used for the FOPEN_COOKIE attribute.
TFOCookieType = netlink.create_struct_fmt_type('=16s')
class IPAddrType:
    """Attribute packer converting ``ipaddress`` objects to/from raw bytes.

    Subclasses set ``ipaddr_type`` to the concrete address class they handle.
    """

    # Concrete ``ipaddress`` class; overridden by each subclass.
    ipaddr_type = None

    @classmethod
    def pack(cls, val):
        """Return the packed form of ``val`` as an unsigned-byte array.

        Raises:
            TypeError: if ``val`` is not an instance of ``cls.ipaddr_type``.
        """
        # Explicit raise rather than ``assert`` so the check survives ``-O``.
        if not isinstance(val, cls.ipaddr_type):
            raise TypeError(
                'expected %s, got %s'
                % (cls.ipaddr_type.__name__, type(val).__name__))
        return array.array('B', val.packed)

    @classmethod
    def unpack(cls, data):
        """Build a ``cls.ipaddr_type`` instance from raw address bytes.

        ``data`` may be ``bytes`` or another byte buffer (``bytearray``,
        ``memoryview``, ``array.array``), so the output of ``pack`` is
        accepted as input here.
        """
        # ipaddress only treats real ``bytes`` as a packed address; any other
        # buffer would be stringified and rejected, so normalize it first.
        if isinstance(data, (bytearray, memoryview, array.array)):
            data = bytes(data)
        return cls.ipaddr_type(data)
class IPv4AddrType(IPAddrType):
    """Packer for attributes carrying an ``ipaddress.IPv4Address``."""

    ipaddr_type = ipaddress.IPv4Address
class IPv6AddrType(IPAddrType):
    """Packer for attributes carrying an ``ipaddress.IPv6Address``."""

    ipaddr_type = ipaddress.IPv6Address
class ShiftRightU32Type:
    """Decode an unsigned 32-bit value and shift it right by a fixed amount.

    Subclasses override ``shift_right`` with the number of bits to drop.
    Only decoding is implemented; there is no ``pack`` counterpart.
    """

    # Number of low-order bits discarded on decode.
    shift_right = 0

    @classmethod
    def unpack(cls, data):
        """Return the native-byte-order u32 in ``data``, right-shifted."""
        (raw_value,) = struct.unpack('=I', data)
        return raw_value >> cls.shift_right
class RTTType(ShiftRightU32Type):
    """u32 attribute whose decoded value is shifted right by 3 bits."""

    # NOTE(review): presumably undoes the kernel's 8x scaling of smoothed
    # RTT values -- confirm against the kernel / iproute2 sources.
    shift_right = 3
class RTTVarType(ShiftRightU32Type):
    """u32 attribute whose decoded value is shifted right by 2 bits."""

    # NOTE(review): presumably undoes the kernel's 4x scaling of RTT
    # variance values -- confirm against the kernel / iproute2 sources.
    shift_right = 2
# Nested attribute list carried in the VALS attribute of a tcp_metrics entry.
# NOTE(review): the order of the entries presumably determines the netlink
# attribute ids assigned by gnlpy -- do not reorder without checking.
TcpMetricsAttrValsList = netlink.create_attr_list_type(
    'TcpMetricsAttrValsList',  # was misspelled 'TcpMetricxAttrValsList'
    ('RTT', RTTType),  # milliseconds
    ('RTTVAR', RTTVarType),  # milliseconds
    ('SSTHRESH', netlink.U32Type),
    ('CWND', netlink.U32Type),
    ('REORDERING', netlink.U32Type),
    ('RTT_US', RTTType),  # microseconds
    ('RTTVAR_US', RTTVarType),  # microseconds
)
# Top-level attribute list used by the tcp_metrics GET and DEL commands.
# NOTE(review): the order of the entries presumably determines the netlink
# attribute ids assigned by gnlpy -- do not reorder without checking.
TcpMetricsAttrList = netlink.create_attr_list_type(
    'TcpMetricsAttrList',
    ('ADDR_IPV4', IPv4AddrType),
    ('ADDR_IPV6', IPv6AddrType),
    ('AGE', netlink.U64Type),  # milliseconds
    ('TW_TSVAL', netlink.U32Type),
    ('TW_TS_STAMP', netlink.I32Type),
    ('VALS', TcpMetricsAttrValsList),
    ('FOPEN_MSS', netlink.U16Type),
    ('FOPEN_SYN_DROPS', netlink.U16Type),
    ('FOPEN_SYN_DROP_TS', netlink.U64Type),
    ('FOPEN_COOKIE', TFOCookieType),
    ('SADDR_IPV4', IPv4AddrType),
    # Upper-cased like every other name (was 'SADDR_IPv6').
    ('SADDR_IPV6', IPv6AddrType),
)
# Generic Netlink message type for the 'tcp_metrics' family; both commands
# carry the same attribute list.
TcpMetricsMsg = netlink.create_genl_message_type(
    'TcpMetricsMsg',
    'tcp_metrics',
    ('GET', TcpMetricsAttrList),
    ('DEL', TcpMetricsAttrList),
)
def attr_list_from_addr(addr: IPAddressType):
    """Return a ``TcpMetricsAttrList`` holding only the given address.

    The address is stored as ``addr_ipv4`` when ``addr.version`` is 4 and as
    ``addr_ipv6`` otherwise.
    """
    attr_name = 'addr_ipv4' if addr.version == 4 else 'addr_ipv6'
    return TcpMetricsAttrList(**{attr_name: addr})
def dump(netlinksocket: netlink.NetlinkSocket):
    """Pretty-print the reply to a tcp_metrics GET sent with the DUMP flag."""
    dump_flags = netlink.MessageFlags.REQUEST | netlink.MessageFlags.DUMP
    request = TcpMetricsMsg('GET', flags=dump_flags)
    reply = netlinksocket.query(request)
    pprint(reply)
def get(netlinksocket: netlink.NetlinkSocket, addr: IPAddressType):
    """Pretty-print the reply to a tcp_metrics GET for the address ``addr``."""
    request = TcpMetricsMsg('GET', attr_list=attr_list_from_addr(addr))
    reply = netlinksocket.query(request)
    pprint(reply)
def delete(netlinksocket: netlink.NetlinkSocket, addr: IPAddressType):
    """Send a tcp_metrics DEL request for the address ``addr``."""
    request = TcpMetricsMsg('DEL', attr_list=attr_list_from_addr(addr))
    netlinksocket.execute(request)
if __name__ == '__main__':
    # Script entry point: open a verbose netlink socket and print every
    # tcp_metrics entry returned by the dump request.
    sock = netlink.NetlinkSocket(verbose=True)
    dump(sock)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment