Skip to content

Instantly share code, notes, and snippets.

@a3r0id
Last active July 31, 2022 07:55
Show Gist options
  • Save a3r0id/4a40f4a5b7c52ef789066685911acf2a to your computer and use it in GitHub Desktop.
Save a3r0id/4a40f4a5b7c52ef789066685911acf2a to your computer and use it in GitHub Desktop.
Get IP + Geolocation info for all remote UDP/TCP connections of a process by name or substring.
import psutil
from requests import get
from sys import argv
from threading import Thread
from scapy.all import sniff
import IPy
from os import popen
# By A3R0 - Get IP + Geolocation info for all remote UDP/TCP connections of a process, matched by name or substring (case-insensitive).
# A process name (or a fragment of one) is the single required argument.
if len(argv) < 2:
    print("Usage: %s <process name or substring - not case sensitive>" % argv[0])
    # Example: python remote_tcp_udp.py chrome.exe
    # The bare `exit()` helper is injected by the `site` module for interactive
    # use and may be missing (e.g. `python -S`); SystemExit always works.
    raise SystemExit(1)

# Fragment compared, case-insensitively, against each candidate process name.
search_term = argv[1]

# Remote IP addresses that have already been reported. Both the TCP poller
# and the UDP packet handler consult it so each address is printed once.
positive_results = []
def get_ip_info(ip):
    """Look up ISP, location and usage flags for ``ip`` on ip-api.com.

    Args:
        ip: The address to look up, as a string.

    Returns:
        One display line of the form
        ``"<isp> | <city>, <region>, <country> | Proxy: .., Hosting: .., Mobile: .."``.

    Raises:
        requests.RequestException: On a network error, a timeout, or a
            non-success HTTP status.
        ValueError: If the response body is not valid JSON.
        KeyError: If the JSON reply lacks one of the fields read below
            (presumably the shape of a failed lookup - confirm against the
            ip-api documentation).
    """
    # Without a timeout a stalled connection would block the caller forever.
    # NOTE(review): `fields` looks like ip-api's numeric field selector; the
    # value is kept exactly as it was.
    response = get("http://ip-api.com/json/%s?fields=66846719" % ip, timeout=10)
    response.raise_for_status()
    data = response.json()

    isp = data["isp"]
    geo = ", ".join((data["city"], data["regionName"], data["country"]))
    usage_type = "Proxy: {}, Hosting: {}, Mobile: {}".format(
        data["proxy"], data["hosting"], data["mobile"]
    )
    return " | ".join((isp, geo, usage_type))
def processCurrentConnections(current):
if len(current) != 0:
for conn in current:
if conn.pid != 0:
try:
process = psutil.Process(conn.pid)
process_name = process.name()
if not search_term.lower() in process_name.lower(): continue
raddrIp = conn.raddr[0]
raddrPort = conn.raddr[1]
except:
continue
if raddrIp in positive_results: continue
if raddrIp in ["127.0.0.1", "localhost", "::1", "0.0.0.0"]: continue
if IPy.IP(raddrIp).iptype() != "PUBLIC": continue
# Add this remote address to the list of positive results
if raddrIp not in positive_results: positive_results.append(raddrIp)
try:
# Get the remote address's info
ip_info = get_ip_info(raddrIp)
except:
ip_info = "[Failed]"
print(f"[{conn.pid}] - {process_name} - {conn.status} - {raddrIp}:{raddrPort} - {ip_info}")
def getPIDSys(port):
    """Return the PID (as a string) of the process bound to local UDP ``port``.

    The previous implementation piped ``netstat`` through ``find "<port>"``,
    which matched the digits anywhere on a line (another port, an address, a
    PID) and then took the fourth token of the output, which is the state
    column on TCP rows. This version compares the local port exactly and
    only looks at UDP sockets.

    Args:
        port: Local UDP port number (anything ``int()`` accepts).

    Returns:
        The owning PID as a string, or ``"0"`` when no UDP socket with a
        known owner is bound to that port.

    Raises:
        ValueError: If ``port`` is not an integer value.
        psutil.Error: If the connection table cannot be read.
    """
    wanted = int(port)
    for conn in psutil.net_connections(kind="udp"):
        # laddr is (ip, port); it is empty for sockets that are not bound.
        if conn.pid and conn.laddr and conn.laddr[1] == wanted:
            return str(conn.pid)
    return "0"
def udphandler(x):
    """Per-packet callback: report the public peer of a matching process.

    Picks whichever end of the packet is a public address as the remote
    peer, resolves the process bound to the other end's UDP port, and - if
    that process name contains ``search_term`` (case-insensitive) and the
    peer has not been reported yet - prints one line with geolocation info.

    Args:
        x: A captured packet supporting ``x['IP']`` and ``x['UDP']`` lookups.

    Returns:
        None. Packets that cannot be attributed are silently skipped.
    """
    try:
        ip_layer = x['IP']
        udp_layer = x['UDP']
    except LookupError:
        # Not an IPv4/UDP packet.
        return

    try:
        # The public side is the remote peer; the opposite port is ours.
        # NOTE(review): if both ends are public the source is assumed to be
        # the peer, as before.
        if IPy.IP(ip_layer.src).iptype() == 'PUBLIC':
            ipToCheck, myPort = ip_layer.src, udp_layer.dport
        elif IPy.IP(ip_layer.dst).iptype() == 'PUBLIC':
            ipToCheck, myPort = ip_layer.dst, udp_layer.sport
        else:
            return
    except ValueError:
        # Address that IPy cannot parse.
        return

    # Check the cheap "already reported" condition before the PID lookup.
    if ipToCheck in positive_results:
        return

    try:
        pid = getPIDSys(myPort)
        if pid == "0":
            return
        process_name = psutil.Process(int(pid)).name()
    except (LookupError, ValueError, OSError, psutil.Error):
        # No owner found, unparsable PID, or the process is gone/inaccessible.
        return

    if search_term.lower() not in process_name.lower():
        return

    # Remember the address so it is reported only once.
    positive_results.append(ipToCheck)

    try:
        ip_info = get_ip_info(ipToCheck)
    except Exception:
        # The lookup is best-effort decoration; still report the address.
        ip_info = "[Failed]"
    print(f"[{pid}] - {process_name} - [UDP] - {ipToCheck} - {ip_info}")
def startSniffThread():
    """Capture UDP traffic and feed every packet to ``udphandler``.

    Blocks for as long as the capture runs, so it is meant to be used as a
    thread target.

    Packets are no longer retained: with scapy's default of storing every
    captured packet, an open-ended capture grows in memory without bound.
    The end-of-capture ``summary()`` was dropped with it, since nothing is
    stored to summarise.
    """
    sniff(filter="udp", prn=udphandler, store=False)
from time import sleep

print("\nUse CTRL + BREAK To Stop Sniffing\n")

# Starts UDP sniffing. The thread is a daemon so that it cannot keep the
# interpreter alive once the main loop below has been interrupted.
t = Thread(target=startSniffThread, daemon=True)
t.start()

# Starts TCP polling. A short pause between snapshots keeps the loop from
# pinning a CPU core; connections shorter than the pause may be missed.
try:
    while True:
        processCurrentConnections(psutil.net_connections(kind='tcp'))
        sleep(0.5)
except KeyboardInterrupt:
    # Ctrl+C ends the program quietly.
    pass
requirements.txt
requests==2.25.1
scapy==2.4.5
psutil==5.8.0
IPy==1.01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment