Skip to content

Instantly share code, notes, and snippets.

@84634E1A607A
Created May 8, 2025 10:02
Show Gist options
  • Select an option

  • Save 84634E1A607A/a28bbb19ad8386710f4a6f05ab981389 to your computer and use it in GitHub Desktop.

Select an option

Save 84634E1A607A/a28bbb19ad8386710f4a6f05ab981389 to your computer and use it in GitHub Desktop.
Honeypot
#! /usr/bin/env python3
import socket
import subprocess
import sys
import time
import os
import ipaddress
import random
# --- Honeypot settings -----------------------------------------------------
# TCP ports on which the decoy listeners are started.
LISTEN_PORTS = [1080, 7890]
# Location of the fail2ban client executable.
FAIL2BAN_COMMAND = "/usr/bin/fail2ban-client"
# Source networks whose addresses are never reported.
EXEMPT_NETWORK = [
    ipaddress.ip_network(cidr)
    for cidr in ("192.168.0.0/16", "10.0.0.0/8")
]
# File that ban requests are appended to.
LOG_FILE = "/var/log/fail2ban-honeypot.log"
# One line per offending address; %s is replaced by the IP.
FAIL_MSG = "Manual ban request for %s\n"
# Opened in append mode when the module loads and kept open afterwards.
lf = open(LOG_FILE, 'a')
def notify_fail2ban(ip):
    """Record a ban request for ``ip`` unless the address is exempt.

    A non-exempt address gets one ``FAIL_MSG`` line appended to the open
    honeypot log, which is flushed right away. No ``fail2ban-client``
    process is started from here; presumably a fail2ban jail watches the
    log file (NOTE(review): confirm against the jail/filter configuration).
    """
    should_report = filter_ip(ip)
    if should_report:
        lf.write(FAIL_MSG % ip)
        # Flush per entry so the line is on disk as soon as it is written.
        lf.flush()
        print(f"fail2ban notified to ban IP: {ip}")
    else:
        print(f"{ip} exempted")
def filter_ip(ip, exempt_networks=None):
    """Tell whether ``ip`` should be reported, i.e. is NOT exempt.

    Args:
        ip: Address to check, in any form ``ipaddress.ip_address`` accepts.
        exempt_networks: Optional iterable of ``ipaddress`` network objects
            to treat as exempt. When omitted, the module-level
            ``EXEMPT_NETWORK`` list is used.

    Returns:
        ``False`` when the address lies inside one of the exempt networks,
        ``True`` otherwise. An address that cannot be parsed is also
        answered with ``True`` (after printing a diagnostic), matching the
        previous behaviour.
    """
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        # ip_address() signals an unparsable value with ValueError; anything
        # else (e.g. KeyboardInterrupt) must not be swallowed here.
        print(f"Failed to convert ip address {ip}")
        return True
    networks = EXEMPT_NETWORK if exempt_networks is None else exempt_networks
    return not any(address in network for network in networks)
def handle_connection(conn, addr, timeout=5.0):
    """Inspect the first bytes a client sends and report suspicious peers.

    Args:
        conn: Connected socket for the client. It is always closed before
            this function returns.
        addr: Peer address tuple; ``addr[0]`` is the IP that gets reported.
        timeout: Seconds to wait for the first payload. Without a limit a
            client that connects and stays silent would block the caller
            indefinitely. A timeout is treated like an empty payload.

    Up to 1024 bytes are read. HTTP-looking and SOCKS-looking payloads are
    always passed to ``notify_fail2ban``; anything else is reported with a
    probability of roughly 20%. Any error is printed, never raised.
    """
    try:
        conn.settimeout(timeout)
        try:
            # latin-1 maps every byte to one character, so binary handshakes
            # survive the decode and can be inspected byte-by-byte below.
            payload = conn.recv(1024).decode('latin-1', 'ignore')
        except socket.timeout:
            payload = ''

        if 'HTTP/' in payload:
            print(f"HTTP request detected from {addr[0]}")
            notify_fail2ban(addr[0])
        elif 'SOCKS' in payload or payload[:1] in ('\x04', '\x05'):
            # Real SOCKS4/SOCKS5 handshakes are binary and start with the
            # protocol version byte; they never contain the text "SOCKS".
            print(f"SOCKS request detected from {addr[0]}")
            notify_fail2ban(addr[0])
        elif random.random() > 0.8:
            # Unrecognised traffic: report a random ~20% of peers.
            notify_fail2ban(addr[0])
    except Exception as e:
        print(f"Error handling connection: {e}")
    finally:
        # Single close on every path (success, timeout, or error).
        conn.close()
def start_server(port):
    """Listen on ``port`` and hand every accepted connection to a worker thread.

    Args:
        port: TCP port to bind on all IPv4 interfaces.

    Runs until an error occurs. On any error the message is printed and
    ``sys.exit(1)`` is called; note that ``sys.exit`` raises ``SystemExit``
    in the calling thread only, so when this function runs in a worker
    thread just that listener stops.
    """
    # Imported here because the file's top-level imports do not provide it.
    import threading

    try:
        # The context manager closes the listening socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            # Allow an immediate restart while old connections sit in TIME_WAIT.
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(('', port))
            server_socket.listen(5)
            print(f"Listening on port {port}")
            while True:
                conn, addr = server_socket.accept()
                print(f"Connection from {addr} on port {port}")
                # Handle each client on its own daemon thread so one slow or
                # silent peer cannot stall the accept loop.
                threading.Thread(
                    target=handle_connection,
                    args=(conn, addr),
                    daemon=True,
                ).start()
    except Exception as e:
        print(f"Error starting server on port {port}: {e}")
        sys.exit(1)
if __name__ == "__main__":
    from threading import Thread

    # Launch one daemon listener thread per configured port.
    for port in LISTEN_PORTS:
        thread = Thread(target=start_server, args=(port,), daemon=True)
        thread.start()

    # The listeners are daemon threads, so the main thread has to stay
    # alive; it idles here until interrupted from the keyboard.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Server shutting down.")
        sys.exit(0)
#! /usr/bin/env python3
import socket
import threading
import paramiko
from paramiko import RSAKey
from paramiko.ssh_exception import SSHException
import subprocess
# Generate weak RSA key (1024-bit) to appear vulnerable.
# The key is created when this module is loaded and is not saved anywhere
# in this file, so every run of the script presents a different host key.
host_key = RSAKey.generate(1024)
class FakeSSHServer(paramiko.ServerInterface):
    """Paramiko server handler that refuses every login and reports the peer."""

    def __init__(self, client_ip):
        # Kept so that login attempts can be attributed to the peer address.
        self.client_ip = client_ip

    def get_allowed_auths(self, username):
        """Advertise password authentication only, whatever the username."""
        return 'password'

    def check_auth_password(self, username, password):
        """Report the attempted credentials, then reject the login."""
        peer = self.client_ip
        send_alert(username, password, peer)
        return paramiko.AUTH_FAILED
def send_alert(username, password, client_ip):
    """Ban ``client_ip`` through fail2ban and print the attempted credentials.

    Args:
        username: User name the client tried to log in with.
        password: Password the client supplied.
        client_ip: Address to ban in the ``honeypot`` jail.

    Returns:
        None. A failing or missing ``fail2ban-client`` is reported on stdout
        instead of being raised.
    """
    # Pass an argument list and no shell: client_ip reaches fail2ban-client
    # as one literal argument and can never be interpreted as shell syntax.
    ban_command = ["fail2ban-client", "set", "honeypot", "banip", str(client_ip)]
    try:
        subprocess.run(ban_command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable fail2ban-client, which
        # the shell used to turn into a non-zero exit status.
        print(f"Failed to block {client_ip}: {str(e)}")
    else:
        print(f"Blocked {client_ip}: {username} attempted with password '{password}'")
def handle_connection(client_socket, client_addr):
    """Serve one client with a deliberately weak-looking SSH endpoint.

    Args:
        client_socket: Accepted TCP socket for the client.
        client_addr: Peer address tuple; ``client_addr[0]`` is the IP handed
            to ``FakeSSHServer``.

    The transport is configured with an old OpenSSH banner and legacy
    algorithms, then kept running until it is no longer active. A failed
    SSH negotiation is printed; the connection is closed on every path.
    """
    client_ip = client_addr[0]
    print(f"Connection from {client_ip}")
    # Bound before the try so the cleanup below is safe even when creating
    # the Transport itself fails.
    transport = None
    try:
        transport = paramiko.Transport(client_socket)
        transport.add_server_key(host_key)
        # Set older banner version to attract attackers
        transport.local_version = "SSH-2.0-OpenSSH_6.4p1"
        # Configure weak cryptographic settings
        security_options = transport.get_security_options()
        security_options.kex = [
            'diffie-hellman-group1-sha1',          # 1024-bit group, SHA-1
            'diffie-hellman-group14-sha1',         # 2048-bit group, SHA-1
            'diffie-hellman-group-exchange-sha1',  # group exchange, SHA-1
        ]
        security_options.ciphers = [
            'aes128-cbc',  # CBC mode with 128-bit AES
            '3des-cbc',    # Legacy triple DES
        ]
        security_options.compression = [
            'zlib@openssh.com',  # Compression (potentially vulnerable to CRIME)
            'none',
        ]
        # Start SSH server
        transport.start_server(server=FakeSSHServer(client_ip))
        # Maintain connection to capture attack attempts
        while transport.is_active():
            transport.join(1)
    except SSHException as e:
        print(f"SSH negotiation failed: {str(e)}")
    finally:
        if transport is not None:
            transport.close()
        else:
            # No transport was created, so close the raw socket directly.
            client_socket.close()
def main():
    """Accept TCP clients on port 22222 and serve each one on its own thread."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with listener:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(('0.0.0.0', 22222))
        listener.listen(100)
        print("Fake SSH server listening on port 22222...")
        while True:
            peer_socket, peer_addr = listener.accept()
            worker = threading.Thread(
                target=handle_connection,
                args=(peer_socket, peer_addr),
            )
            worker.start()


# Start the server only when the file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment