Skip to content

Instantly share code, notes, and snippets.

@yosignals
Last active May 1, 2023 11:50
Show Gist options
  • Save yosignals/59163d074c2ed0edf1ce311db16d9c59 to your computer and use it in GitHub Desktop.
Save yosignals/59163d074c2ed0edf1ce311db16d9c59 to your computer and use it in GitHub Desktop.
something hits your port, you get a slack notification IP/NSLookup *added whois + time stuff + write out to local sqlite
import socket
import sys
import subprocess
import requests
import json
import time
import sqlite3
import select
# Check for required packages and install them if needed
try:
import dns.resolver
except ImportError:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "dnspython"])
import dns.resolver
try:
import whois
except ImportError:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "python-whois"])
import whois
try:
import pytz
except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "pytz"])
import pytz
try:
import ntplib
except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "ntplib"])
import ntplib
try:
from dateutil import tz
except ImportError:
subprocess.check_call([sys.executable, "-m", "pip",
"install", "python-dateutil"])
from dateutil import tz
# Set up Slack Webhook
# Replace with your webhook URL
slack_webhook_url = "YOURSLACKWEBHOOK"
# Set up SQLite database
db_name = "connections.db"
conn = sqlite3.connect(db_name)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS connections
(timestamp REAL, ip TEXT, nslookup TEXT, whois TEXT)''')
conn.commit()
# Listening sockets
ports = [50050, 50051] # Add more port numbers as needed
server_sockets = []
for port in ports:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('0.0.0.0', port))
server_socket.listen(1)
server_sockets.append(server_socket)
def nslookup(ip):
try:
result = dns.resolver.resolve_address(ip)
return result[0].to_text()
except Exception as e:
return "Not Found"
def whois_lookup(ip):
try:
w = whois.whois(ip)
whois_string = str(w).replace('\n', ' ').replace('\r', '')
return whois_string
except Exception as e:
return "WHOIS Lookup Failed"
def post_to_slack(text):
payload = {"text": text}
try:
response = requests.post(slack_webhook_url, json=payload)
if response.status_code == 200:
print("Posted to Slack.")
else:
print(f"Error posting to Slack: {response.status_code}")
except requests.exceptions.RequestException as e:
print("Error posting to Slack:", e)
def save_to_db(timestamp, ip, nslookup, whois):
c.execute("INSERT INTO connections (timestamp, ip, nslookup, whois) VALUES (?, ?, ?, ?)",
(timestamp, ip, nslookup, whois))
conn.commit()
def get_timezone_and_ntp():
# Get local timezone
local_timezone = str(tz.tzlocal())
# Get NTP server details
ntp_client = ntplib.NTPClient()
ntp_server = 'pool.ntp.org'
try:
response = ntp_client.request(ntp_server, version=3)
ntp_time = response.tx_time
except ntplib.NTPException as e:
ntp_time = "NTP request failed"
return local_timezone, ntp_time
# Debounce configuration
debounce_time = 5 # Time in seconds to group events from the same IP address
ip_debounce = {}
while True:
read_sockets, _, _ = select.select(server_sockets, [], [])
for sock in read_sockets:
client_socket, address = sock.accept()
ip = address[0]
current_time = time.time()
local_timezone, ntp_time = get_timezone_and_ntp()
# If the IP is not in the debounce dictionary or enough time has passed, send a message
if ip not in ip_debounce or current_time - ip_debounce[ip] >= debounce_time:
nslookup_result = nslookup(ip)
whois_result = whois_lookup(ip)
message = f"Timezone: {local_timezone}\nNTP Time: {ntp_time}\nConnection from IP: {ip}\nNSLookup: {nslookup_result}\nWHOIS: {whois_result}"
print(message)
post_to_slack(message)
save_to_db(current_time, ip, nslookup_result, whois_result)
# Update the debounce dictionary with the current timestamp for the IP address
ip_debounce[ip] = current_time
client_socket.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment