-
-
Save nezza/34b89eb664355515b86f69dd2ea0a7ec to your computer and use it in GitHub Desktop.
A fake telnet server compatible with Mirai - Mirai will detect this Telnet server as a vulnerable target and report it to the scan listener.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import threading | |
# Settings for the Mirai-compatible fake Telnet server.
# The only username/password pair accepted while ALLOW_ALL_CREDENTIALS is False.
USER = "root"
PASS = "default"
# Set to True to accept any username/password combination.
ALLOW_ALL_CREDENTIALS = False
# Function to handle client connection | |
def _recv_nonblank(sock):
    """Return the next non-blank chunk read from *sock*, whitespace-stripped.

    Chunks that are empty after stripping (e.g. a bare CR/LF) are skipped.
    Returns ``None`` when the peer has closed the connection, so callers can
    stop instead of polling a dead socket forever.
    """
    while True:
        chunk = sock.recv(1024)
        if not chunk:
            # recv() returning b"" signals EOF, not "an empty line".
            return None
        chunk = chunk.strip()
        if chunk:
            return chunk


def handle_client_connection(client_socket):
    """Run one fake Telnet session on *client_socket*, then close it.

    The session sends a minimal Telnet negotiation, prompts for a login and
    password, checks them against ``USER``/``PASS`` (unless
    ``ALLOW_ALL_CREDENTIALS`` is set), and then presents a ``#`` prompt until
    the client sends ``/bin/busybox MIRAI``, which is answered with the
    "applet not found" line.

    The socket is always closed before returning, including when the client
    disconnects early or a socket error occurs. Socket errors are logged,
    not raised.
    """
    # Telnet command bytes (RFC 854).
    IAC = bytes([255])  # Interpret As Command
    DONT = bytes([254])
    WILL = bytes([251])

    try:
        # sendall() is used throughout so a partial write cannot truncate a prompt.
        client_socket.sendall(IAC + DONT + bytes([1]))  # option 1: Echo
        client_socket.sendall(IAC + WILL + bytes([3]))  # option 3: Suppress Go Ahead

        # Discard up to 3 bytes, expected to be the client's negotiation reply.
        client_socket.recv(3)

        client_socket.sendall(b"login: ")
        username = _recv_nonblank(client_socket)
        if username is None:
            print("Client disconnected before sending a username.")
            return

        client_socket.sendall(b"password: ")
        password = _recv_nonblank(client_socket)
        if password is None:
            print("Client disconnected before sending a password.")
            return

        # Decode once; errors="replace" keeps non-UTF-8 credentials loggable
        # instead of aborting the session with UnicodeDecodeError.
        user_text = username.decode(errors="replace")
        pass_text = password.decode(errors="replace")

        if not ALLOW_ALL_CREDENTIALS and (user_text != USER or pass_text != PASS):
            print(f"Login denied for {user_text}:{pass_text}\n")
            return

        # Log the accepted credentials.
        print(f"Logged in with Username: {user_text} and Password: {pass_text}\n")

        while True:
            client_socket.sendall(b"# \n")
            data = client_socket.recv(1024)
            if not data:
                # Peer closed the connection; stop prompting.
                break
            if b'/bin/busybox MIRAI' in data:
                print("Sending MIRAI confirm.")
                client_socket.sendall(b"MIRAI: applet not found\n")
                break
    except OSError as e:
        print(f"An error occurred: {e}")
    finally:
        # Single cleanup point for every exit path.
        client_socket.close()
def start_telnet_server(host, port):
    """Listen on (*host*, *port*) and serve each client in its own thread.

    Every accepted connection is handed to ``handle_client_connection`` on a
    daemon thread. The function blocks until interrupted with Ctrl-C
    (``KeyboardInterrupt``); the listening socket is closed on every exit
    path, including unexpected errors from ``accept()``.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow an immediate restart without "address already in use" errors.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(5)  # Backlog: queue up to 5 not-yet-accepted connections
        print(f"Telnet server running on {host}:{port}")
        try:
            while True:
                client_sock, address = server.accept()
                print(f"Accepted connection from {address[0]}:{address[1]}")
                client_handler = threading.Thread(
                    target=handle_client_connection,
                    args=(client_sock,),
                    # Daemon threads do not keep the process alive after
                    # Ctrl-C while a client is still blocked in recv().
                    daemon=True,
                )
                client_handler.start()
        except KeyboardInterrupt:
            print("Shutting down the server.")
# Script entry point: launch the fake Telnet server.
if __name__ == "__main__":
    HOST = "0.0.0.0"  # all network interfaces
    PORT = 23  # standard Telnet port
    start_telnet_server(HOST, PORT)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment