Skip to content

Instantly share code, notes, and snippets.

@jn0
Created May 2, 2017 15:41
Show Gist options
  • Save jn0/b308294d30313841e30759f114329e2a to your computer and use it in GitHub Desktop.
Save jn0/b308294d30313841e30759f114329e2a to your computer and use it in GitHub Desktop.
A tool to map sockets to processes. If you're lucky enough.
#!/usr/bin/python
# -*- coding: utf-8 -*- vim: et ts=4 sw=4
'''
A tool to replace netstat(8), which has gone (RIP) from CentOS and the like.
Yes, ss(8) should be there.
'''
# /proc/net/udp
# sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode ref pointer drops
# 120: 0100007F:0143 00000000:0000 07 00000000:00000000 00:00000000 00000000 997 0 15319 2 ffff8800f1d80000 0
# /proc/net/tcp
# sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
# 0: 00000000:0016 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 19630 1 ffff8800f2fa8000 100 0 0 10 0
# 1: 00000000:0CBC 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 23600 1 ffff8801076a5540 100 0 0 10 0
import os
import pwd
import sys
# Display filters, read by pprint() and overridden by the command-line
# handling at the bottom of the file.
# When False, pprint() skips records whose local address is the wildcard '*'.
listening = True
# When False, pprint() skips records whose remote endpoint is not '*:0'.
connected = False
# When set (a decimal port number as a string), pprint() keeps only records
# whose local or remote port equals it; None disables the port filter.
seek_port = None
# Memo of uid -> name lookups, keyed by the uid exactly as passed in.
_cache = {}


def uid2name(uid):
    '''
    Translate a numeric user id into an account name, memoising the result.

    uid -- the user id, normally the decimal string found in the ``uid``
           column of a /proc/net/* record (anything int() accepts works).

    Returns the name from the password database.  When the database has no
    entry for the uid, the uid itself is returned as a string, so that one
    unknown owner does not abort the whole listing with KeyError.
    '''
    try:
        return _cache[uid]
    except KeyError:
        pass  # not looked up yet
    try:
        name = pwd.getpwuid(int(uid)).pw_name
    except KeyError:
        # pwd.getpwuid() signals "no such entry" with KeyError.
        name = str(uid)
    _cache[uid] = name
    return name
def load(name):
    '''Return the whole content of the file called *name* as one string.'''
    with open(name) as source:
        content = source.read()
    return content
def split(s):
    '''Break *s* into the list of its whitespace-separated fields.'''
    return s.split()
def address(s):
    '''
    Render a /proc/net style ``HEXADDR:HEXPORT`` pair for display.

    The address part is decoded two hex digits at a time and the resulting
    octets are emitted in reverse order, dot-separated; an all-zero address
    is shown as '*'.  The port is converted from hex to decimal.

    Returns the string 'address:port' with the address right-aligned in 15
    columns and the port left-aligned in 5 columns.
    '''
    host_hex, port_hex = s.split(':', 1)
    port = int(port_hex, 16)
    if int(host_hex, 16) == 0:
        host = '*'
    else:
        # Walk the hex string from its last byte to its first.
        octets = [str(int(host_hex[pos:pos + 2], 16))
                  for pos in reversed(range(0, len(host_hex), 2))]
        host = '.'.join(octets)
    return '%15s:%-5d' % (host, port)
# Separator placed between the owners of one socket when several are found.
joiner = '\n : '


def pid4inode(i):
    '''
    Find the processes that hold the socket with inode number *i* open.

    i -- the inode number as a decimal string (the ``inode`` column of a
         /proc/net/* record).  The value '0' is answered with None without
         scanning anything.

    Only /proc/<pid>/fd of numeric <pid> entries is examined, instead of
    crawling all of /proc.  Entries that cannot be listed or read (no
    permission, process gone in the meantime) are skipped silently.

    Returns the matches formatted as 'pid:fd:cmdline' (NULs in the command
    line replaced by spaces) and joined with ``joiner``, or None when no
    process was found.
    '''
    if i == '0':
        return None
    wanted = 'socket:[%s]' % (i,)
    try:
        entries = os.listdir('/proc')
    except (OSError, IOError):
        return None
    res = []
    for pid in entries:
        if not pid.isdigit():
            continue
        fd_dir = os.path.join('/proc', pid, 'fd')
        try:
            fds = os.listdir(fd_dir)
        except (OSError, IOError):
            continue  # not ours to look at, or the process has exited
        for f in fds:
            try:
                target = os.readlink(os.path.join(fd_dir, f))
            except (OSError, IOError):
                continue  # descriptor was closed while we were scanning
            if target != wanted:
                continue
            try:
                with open('/proc/' + pid + '/cmdline') as source:
                    cmd = source.read().replace('\0', ' ')
            except (OSError, IOError):
                # The process vanished after its descriptor was seen; still
                # report the pid and descriptor.
                cmd = ''
            res.append(pid + ':' + f + ':' + cmd)
    return joiner.join(res) if res else None
def pprint(lst):
    '''
    Print the records of one /proc/net/* table, filtered by the global flags.

    lst -- list of records, each a list of whitespace-split fields; the
           first element is the column-header row and is removed from the
           list in place.

    A record is skipped when
      * its remote endpoint is not '*:0' and ``connected`` is false, or
      * its local address is the wildcard and ``listening`` is false, or
      * ``seek_port`` is set and matches neither the local nor remote port.
    For each remaining record one line is printed (slot, local, remote,
    owner name, inode and, for long records, field 10), followed by the
    owning processes when pid4inode() finds any.
    '''
    lst.pop(0)  # discard the header row
    for fields in lst:
        (slot, local_hex, remote_hex, _state,
         _queues, _timer, _retrans,
         owner, _timeout, inode) = fields[:10]
        rem = address(remote_hex)
        if not connected and rem.strip() != '*:0':
            continue
        loc = address(local_hex)
        if not listening and loc.strip().startswith('*:'):
            continue
        if seek_port:
            ports = (rem.split(':')[-1].strip(), loc.split(':')[-1].strip())
            if seek_port not in ports:
                continue
        # NOTE(review): the guard asks for more than 11 fields although only
        # index 10 is read -- kept as is; confirm whether '> 10' was meant.
        extra = ' #%s' % (fields[10],) if len(fields) > 11 else ''
        print('%5s %s %s <%s> [%s]%s' % (
            slot, loc, rem, uid2name(owner), inode, extra))
        holders = pid4inode(inode)
        if holders:
            # joiner starts with a newline; drop it for the first owner.
            print(joiner[1:] + holders)
def print_type(t):
    '''
    Print the socket table /proc/net/<t> preceded by a one-line header.

    t -- table name below /proc/net, e.g. 'tcp', 'udp' or 'raw'.

    The header shows the upper-cased table name and the number of records
    (lines after the column-header row).  It is emitted only after the file
    has been read, so a read failure does not leave a partial line behind.
    Errors from reading the file propagate to the caller.
    '''
    # Build a real list: its length is needed and pprint() pops from it.
    lst = [split(line)
           for line in load(os.path.join('/proc/net', t)).splitlines()]
    print('%s %d :' % (t.upper(), max(len(lst) - 1, 0)))
    if lst:
        # An empty file has no header row for pprint() to discard.
        pprint(lst)
# Command-line handling.  Arguments are processed left to right, so later
# ones override earlier ones:
#   -l  wildcard-bound (listening) sockets only
#   -a  listening and connected sockets
#   -c  connected sockets only
#   anything else is taken as the port to look for, and shows both kinds
for arg in sys.argv[1:]:
    if arg == '-l':
        listening, connected = True, False
    elif arg == '-a':
        listening, connected = True, True
    elif arg == '-c':
        listening, connected = False, True
    else:
        seek_port = arg[:]
        listening = connected = True

# Banner summarising the active filters.
print('# %s %s %s' % (
    'LISTENING' if listening else '',
    'CONNECTED' if connected else '',
    seek_port or ''))
for table in ('tcp', 'raw', 'udp'):
    print_type(table)
# EOF #
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment