Skip to content

Instantly share code, notes, and snippets.

@exec
Last active January 15, 2024 04:30
Show Gist options
  • Save exec/ad322888b95943b24c5d25cc3a997ef8 to your computer and use it in GitHub Desktop.
Save exec/ad322888b95943b24c5d25cc3a997ef8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import socket
import threading
import time
import random
import json
import os
import re
from loguru import logger
# Configure loguru logger
LOG_FILE = "/var/log/flak.log"
logger.add(LOG_FILE, rotation="10 MB", backtrace=True, diagnose=True)
# Ensure log file exists
def ensure_log_file():
if not os.path.exists(LOG_FILE):
open(LOG_FILE, 'a').close()
def sanitize_input(input_string):
"""
Sanitize the input string to prevent log injection and manipulation.
This function performs the following:
- Removes ANSI escape codes to prevent terminal color changes or cursor movements.
- Escapes potentially harmful characters like newlines, carriage returns, and tabs.
- Limits the length of the input string to prevent buffer overflow or DOS attacks.
- Optionally, add further checks as required by your application's context.
"""
# Remove ANSI escape codes
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
sanitized = ansi_escape.sub('', input_string)
# Escape newlines, carriage returns, and tabs
sanitized = sanitized.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
# Limit the length of the input (e.g., 1024 characters)
max_length = 1024
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + '...[truncated]'
return sanitized
# Ensure config directory and file exists
CONFIG_DIR = "/etc/flak"
CONFIG_FILE = os.path.join(CONFIG_DIR, "ports.json")
def ensure_config_file():
if not os.path.exists(CONFIG_DIR):
os.makedirs(CONFIG_DIR)
if not os.path.exists(CONFIG_FILE):
default_config = {
21: ["Who uses FTP in 2024?"],
1337: ["You aren't tho lol"]
}
with open(CONFIG_FILE, 'w') as file:
json.dump(default_config, file, indent=4)
# Load configuration from file
def load_config():
with open(CONFIG_FILE, 'r') as file:
return json.load(file)
# Function to handle client connections
def handle_client(client_socket, addr, port_messages):
try:
logger.info(f"Received connection from {addr[0]}:{addr[1]}")
client_socket.settimeout(1.0) # Set timeout for client response
while True:
message = "\n" + random.choice(port_messages).replace("[IP]", addr[0])
client_socket.send(message.encode())
try:
response = client_socket.recv(1024)
if response:
logger.info(f"Response from {addr[0]}:{addr[1]}: {sanitize_input(response.decode())}")
except socket.timeout:
pass
time.sleep(1)
except Exception as e:
logger.error(f"Error in handling client {addr}: {e}")
ensure_log_file()
finally:
client_socket.close()
# Function to start a dummy service
def start_dummy_service(port, port_messages):
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", port))
server.listen(5)
logger.info(f"Listening on 0.0.0.0:{port}")
while True:
client, addr = server.accept()
client_handler = threading.Thread(target=handle_client, args=(client, addr, port_messages))
client_handler.start()
except Exception as e:
logger.error(f"Error binding to port {port}: {e}")
ensure_log_file()
# Ensure config file and log file exist
ensure_config_file()
ensure_log_file()
# Load port responses from config file
port_responses = load_config()
# Start dummy services on specified ports
for port, messages in port_responses.items():
threading.Thread(target=start_dummy_service, args=(int(port), messages)).start()
#!/bin/bash
GITHUB_SCRIPT_URL="https://gist.githubusercontent.com/exec/ad322888b95943b24c5d25cc3a997ef8/raw/c97e763fcc303ae813582e511ab6eea4c30bd9fa/flak.py"
# Install loguru using pip with --break-system-packages
echo "Installing loguru..."
pip install loguru --break-system-packages
# Download the script and save it to /usr/bin/flak
echo "Downloading the flak script..."
wget -O /usr/bin/flak $GITHUB_SCRIPT_URL
chmod +x /usr/bin/flak
echo "Script downloaded and permissions set."
# Create a systemd service file for flak
echo "Creating systemd service for flak..."
cat <<EOF > /etc/systemd/system/flak.service
[Unit]
Description=Flak Service
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/flak
[Install]
WantedBy=multi-user.target
EOF
echo "Systemd service file created."
# Reload systemd daemon
echo "Would you like to refresh the systemd daemon now? [y/N]"
read -r refresh_answer
if [ "$refresh_answer" = "y" ] || [ "$refresh_answer" = "Y" ]; then
systemctl daemon-reload
echo "Systemd daemon reloaded."
else
echo "Skipping systemd daemon reload."
fi
# Enable flak service on boot
echo "Would you like to enable the Flak service on boot? [y/N]"
read -r enable_answer
if [ "$enable_answer" = "y" ] || [ "$enable_answer" = "Y" ]; then
systemctl enable flak.service
echo "Flak service enabled on boot."
else
echo "Flak service not enabled on boot."
fi
echo "Setup complete. You can start the service manually using 'systemctl start flak.service'."
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment