Created
May 8, 2025 10:02
-
-
Save 84634E1A607A/a28bbb19ad8386710f4a6f05ab981389 to your computer and use it in GitHub Desktop.
Honeypot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python3 | |
| import socket | |
| import subprocess | |
| import sys | |
| import time | |
| import os | |
| import ipaddress | |
| import random | |
# --- Honeypot configuration -------------------------------------------------
# TCP ports on which the decoy listeners accept connections.
LISTEN_PORTS = [1080, 7890]
# Path of the fail2ban client binary; in the visible code it is referenced
# only by the disabled direct-ban path inside notify_fail2ban.
FAIL2BAN_COMMAND = "/usr/bin/fail2ban-client"
# Source networks whose addresses are never reported.
EXEMPT_NETWORK = [
    ipaddress.ip_network(cidr)
    for cidr in ("192.168.0.0/16", "10.0.0.0/8")
]
# Ban requests are appended to this file, one FAIL_MSG line per address.
# NOTE(review): presumably a fail2ban filter watches this file - confirm.
LOG_FILE = "/var/log/fail2ban-honeypot.log"
FAIL_MSG = "Manual ban request for %s\n"
# Opened once at import time in append mode and shared by all listeners.
lf = open(LOG_FILE, "a")
def notify_fail2ban(ip):
    """Record a ban request for *ip* unless ``filter_ip`` exempts it.

    A non-exempt address is written to the shared log handle ``lf`` as a
    ``FAIL_MSG`` line and flushed right away; an exempt address is only
    mentioned on stdout.
    """
    should_report = filter_ip(ip)
    if should_report:
        # Calling fail2ban-client directly is disabled; the former call was:
        #   subprocess.run([FAIL2BAN_COMMAND, "set", "honeypot", "banip", ip], check=True)
        lf.write(FAIL_MSG % ip)
        lf.flush()
        print(f"fail2ban notified to ban IP: {ip}")
    else:
        print(f"{ip} exempted")
def filter_ip(ip):
    """Return True when *ip* should be reported, False when it is exempt.

    Args:
        ip: Address to check, in any form ``ipaddress.ip_address`` accepts.

    Returns:
        False if the address lies inside one of the ``EXEMPT_NETWORK``
        entries; True otherwise, including when *ip* cannot be parsed.
    """
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        # Only a parse failure is expected here. A bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        print(f"Failed to convert ip address {ip}")
        return True
    return not any(address in network for network in EXEMPT_NETWORK)
def handle_connection(conn, addr):
    """Inspect the first bytes sent by a client and report suspicious peers.

    HTTP and SOCKS requests are always reported through ``notify_fail2ban``;
    any other payload is reported with a probability of roughly 20%.

    Args:
        conn: Connected socket returned by ``accept()``. It is always closed
            before this function returns.
        addr: Peer address tuple; ``addr[0]`` is the IP that gets reported.
    """
    try:
        # Bound the wait so a client that never sends data cannot block the
        # caller indefinitely.
        conn.settimeout(10)
        # latin-1 maps every byte to exactly one code point, so binary
        # protocol bytes survive decoding and can be compared below.
        data = conn.recv(1024).decode('latin-1', 'ignore')
        if 'HTTP/' in data:
            print(f"HTTP request detected from {addr[0]}")
            notify_fail2ban(addr[0])
        elif 'SOCKS' in data or data[:1] in ('\x04', '\x05'):
            # Real SOCKS handshakes are binary and never contain the text
            # "SOCKS"; they start with the protocol version byte
            # (0x04 for SOCKS4, 0x05 for SOCKS5).
            print(f"SOCKS request detected from {addr[0]}")
            notify_fail2ban(addr[0])
        elif random.random() > 0.8:
            # Unrecognised traffic: report a random subset of peers.
            notify_fail2ban(addr[0])
    except Exception as e:
        print(f"Error handling connection: {e}")
    finally:
        # Single cleanup point for both the success and the error path.
        conn.close()
def start_server(port):
    """Listen on *port* and pass every accepted connection to handle_connection.

    Each connection is served on its own daemon thread so that one slow or
    silent client cannot stall the accept loop for this port.

    Args:
        port: TCP port to bind on all interfaces.

    Any exception raised while setting up or accepting is printed and
    followed by ``sys.exit(1)``. When this function itself runs in a
    non-main thread, that call terminates only this thread, not the process.
    """
    # Local import: the surrounding script does not import ``threading`` at
    # module level.
    import threading

    try:
        # The context manager closes the listening socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            # Allow an immediate restart while old connections are in TIME_WAIT.
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(('', port))
            server_socket.listen(5)
            print(f"Listening on port {port}")
            while True:
                conn, addr = server_socket.accept()
                print(f"Connection from {addr} on port {port}")
                # Handle the connection in a new thread.
                threading.Thread(
                    target=handle_connection,
                    args=(conn, addr),
                    daemon=True,
                ).start()
    except Exception as e:
        print(f"Error starting server on port {port}: {e}")
        sys.exit(1)
if __name__ == "__main__":
    from threading import Thread

    # Start one daemon listener thread per configured port.
    listeners = []
    for port in LISTEN_PORTS:
        thread = Thread(target=start_server, args=(port,))
        thread.daemon = True
        thread.start()
        listeners.append(thread)

    try:
        # Daemon threads die with the main thread, so keep it alive for as
        # long as every listener is still running.
        while all(listener.is_alive() for listener in listeners):
            time.sleep(1)
    except KeyboardInterrupt:
        print("Server shutting down.")
        sys.exit(0)

    # sys.exit() inside a listener thread ends only that thread. Without this
    # check the process would keep sleeping with a dead listener.
    print("A listener thread has stopped; exiting.")
    sys.exit(1)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python3 | |
| import socket | |
| import threading | |
| import paramiko | |
| from paramiko import RSAKey | |
| from paramiko.ssh_exception import SSHException | |
| import subprocess | |
# Deliberately small 1024-bit RSA host key so the server appears vulnerable.
# A fresh key is generated every time the module is loaded.
host_key = RSAKey.generate(bits=1024)
class FakeSSHServer(paramiko.ServerInterface):
    """Paramiko server interface that rejects every password login.

    Each attempt is passed to ``send_alert`` together with the client's IP
    before the failure is returned.
    """

    def __init__(self, client_ip):
        # Kept so the auth callback can tell send_alert which address to block.
        self.client_ip = client_ip

    def get_allowed_auths(self, username):
        """Advertise password authentication only, whatever the username."""
        return 'password'

    def check_auth_password(self, username, password):
        """Report the attempted credentials and always deny access."""
        attacker_ip = self.client_ip
        send_alert(username, password, attacker_ip)
        return paramiko.AUTH_FAILED
def send_alert(username, password, client_ip):
    """Ask fail2ban to ban *client_ip* and print the attempted credentials.

    Args:
        username: User name supplied by the client.
        password: Password supplied by the client.
        client_ip: Address passed to ``fail2ban-client ... banip``.

    Returns:
        None. Failures (non-zero exit status, or the command not being
        runnable at all) are printed and never raised, so an
        authentication callback is not interrupted by a ban problem.
    """
    # An argument list without a shell keeps the address from ever being
    # interpreted as shell syntax.
    command = ["fail2ban-client", "set", "honeypot", "banip", str(client_ip)]
    try:
        # Immediately block the attacking IP
        subprocess.run(command, check=True)
        print(f"Blocked {client_ip}: {username} attempted with password '{password}'")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable fail2ban-client, which
        # the shell previously turned into a non-zero exit status.
        print(f"Failed to block {client_ip}: {str(e)}")
def handle_connection(client_socket, client_addr):
    """Run a fake SSH session for one client until the transport goes inactive.

    Args:
        client_socket: Connected socket returned by ``accept()``.
        client_addr: Peer address tuple; ``client_addr[0]`` is the client IP
            handed to ``FakeSSHServer``.

    An ``SSHException`` raised during setup or negotiation is printed; the
    connection is always closed before returning.
    """
    client_ip = client_addr[0]
    print(f"Connection from {client_ip}")
    # Bound before the try block so the finally clause can tell whether the
    # transport was ever created.
    transport = None
    try:
        transport = paramiko.Transport(client_socket)
        transport.add_server_key(host_key)
        # Set older banner version to attract attackers
        transport.local_version = "SSH-2.0-OpenSSH_6.4p1"
        # Configure weak cryptographic settings
        security_options = transport.get_security_options()
        security_options.kex = [
            'diffie-hellman-group1-sha1',          # Weak DH group (1024-bit)
            'diffie-hellman-group14-sha1',         # Weak DH group (2048-bit)
            'diffie-hellman-group-exchange-sha1',  # Insecure if not properly implemented
        ]
        security_options.ciphers = [
            'aes128-cbc',  # CBC mode with 128-bit AES
            '3des-cbc',    # Legacy triple DES
        ]
        security_options.compression = [
            'zlib@openssh.com',  # Compression (potentially vulnerable to CRIME)
            'none',
        ]
        # Start SSH server
        transport.start_server(server=FakeSSHServer(client_ip))
        # Maintain connection to capture attack attempts
        while transport.is_active():
            transport.join(1)
    except SSHException as e:
        print(f"SSH negotiation failed: {str(e)}")
    finally:
        if transport is not None:
            transport.close()
        else:
            # Transport construction failed, so nothing else owns the socket.
            client_socket.close()
def main():
    """Accept TCP connections on port 22222 and serve each on its own thread."""
    bind_address = ('0.0.0.0', 22222)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(bind_address)
        listener.listen(100)
        print("Fake SSH server listening on port 22222...")
        while True:
            # accept() yields (socket, address), which is exactly the
            # argument tuple handle_connection expects.
            peer = listener.accept()
            worker = threading.Thread(target=handle_connection, args=peer)
            worker.start()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment