Skip to content

Instantly share code, notes, and snippets.

@Kalki5
Created March 12, 2020 13:21
Show Gist options
  • Save Kalki5/b0af050fcbc06b67f174182df2a7ee1d to your computer and use it in GitHub Desktop.
Save Kalki5/b0af050fcbc06b67f174182df2a7ee1d to your computer and use it in GitHub Desktop.
Monitor changes in listening TCP ports
from socket import SOCK_STREAM
from time import sleep
import psutil
# Ports that get_listening_tcp_ports() excludes from its result, so a listener
# on any of them never produces an add/delete event.
# NOTE(review): the service names below are the conventional assignments for
# these port numbers, not something this file establishes -- confirm they match
# what actually runs on the monitored host.
standard_ports = {
    # Connections (conventionally FTP, SSH, SMTP, DNS, RDP)
    21, 22, 25, 53, 3389,
    # Databases (conventionally MySQL, PostgreSQL, MS SQL Server, Redis, memcached)
    3306, 5432, 1433, 6379, 11211,
    # Unknowns (presumably SSH X11-forwarding displays -- TODO confirm)
    6010, 6011,
}
def get_listening_tcp_ports():
    """Return the externally reachable, non-standard TCP ports in LISTEN state.

    Takes a snapshot via ``psutil.net_connections(kind='inet')`` and keeps the
    sockets that are TCP, listening, not bound to a loopback address
    (``127.0.0.1`` / ``::1``) and not on a port listed in ``standard_ports``.

    Returns:
        dict: maps each port number to a tuple
        ``(ip, port, pid, process_name)``. ``process_name`` is ``None`` when
        the owning process cannot be identified (unknown pid, process already
        gone, or access denied). If several sockets listen on the same port,
        the last one in the snapshot wins.
    """
    tcps = {}
    for connection in psutil.net_connections(kind='inet'):
        if connection.type != SOCK_STREAM or connection.status != 'LISTEN':
            continue

        ip, port = connection.laddr.ip, connection.laddr.port
        if ip in {'127.0.0.1', '::1'} or port in standard_ports:
            continue

        pid = connection.pid
        name = None
        # pid is None when psutil could not determine the owner; passing None
        # to psutil.Process() would silently resolve to *this* process, so the
        # lookup is skipped in that case.
        if pid is not None:
            try:
                name = psutil.Process(pid).name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited after the snapshot was taken, or it is
                # not inspectable by this user; still report the port.
                name = None

        tcps[port] = (ip, port, pid, name)
    return tcps
def update_mappings(tcps_done, tcps_new):
    """Bring ``tcps_done`` in line with ``tcps_new``, announcing each change.

    Ports present in ``tcps_done`` but missing from ``tcps_new`` are reported
    with a "Delete" message and removed; ports present only in ``tcps_new``
    are reported with an "Add" message and copied over. Entries whose port
    exists in both mappings are left untouched, even if their values differ.

    Args:
        tcps_done: dict of already-handled ports; modified in place.
        tcps_new: dict of currently listening ports; only read.

    Returns:
        None.
    """
    # The key-view difference is a fresh set, so deleting while looping is safe.
    closed_ports = tcps_done.keys() - tcps_new.keys()
    for closed_port in closed_ports:
        print('Make Delete call - Port: {}'.format(closed_port))
        del tcps_done[closed_port]

    opened_ports = tcps_new.keys() - tcps_done.keys()
    for opened_port in opened_ports:
        print('Make Add call - Port: {}'.format(opened_port))
        tcps_done[opened_port] = tcps_new[opened_port]
def main():
    """Poll the listening TCP ports once per second and report changes.

    Keeps a mapping of the ports already reported and, on every iteration,
    reconciles it against a fresh snapshot via ``update_mappings``. Runs
    until the process is interrupted; never returns normally.
    """
    known_ports = {}
    while True:
        update_mappings(known_ports, get_listening_tcp_ports())
        sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment