Last active
January 15, 2024 04:30
-
-
Save exec/ad322888b95943b24c5d25cc3a997ef8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import socket | |
import threading | |
import time | |
import random | |
import json | |
import os | |
import re | |
from loguru import logger | |
# Path of the log file shared by the logger sink and ensure_log_file().
LOG_FILE = "/var/log/flak.log"

# Register LOG_FILE as a loguru sink; the file is rotated at 10 MB.
logger.add(
    LOG_FILE,
    rotation="10 MB",
    backtrace=True,
    diagnose=True,
)
def ensure_log_file():
    """Create an empty file at LOG_FILE when nothing exists at that path."""
    if os.path.exists(LOG_FILE):
        return
    # Append mode creates the file without truncating any content.
    with open(LOG_FILE, 'a'):
        pass
# Matches either a complete CSI sequence (ESC [ params intermediates final)
# or a two-character "ESC Fe" sequence. '[' is excluded from the second
# alternative so an ordinary character following ESC is never swallowed.
_ANSI_ESCAPE_RE = re.compile(r'\x1B(?:\[[0-?]*[ -/]*[@-~]|[@-Z\\-_])')

# Every C0 control character plus DEL.
_CONTROL_CHAR_RE = re.compile(r'[\x00-\x1f\x7f]')

# Control characters that get a readable escape instead of a hex escape.
_NAMED_ESCAPES = {"\n": "\\n", "\r": "\\r", "\t": "\\t"}


def _escape_control_char(match):
    """Return the printable escape for the control character in *match*."""
    char = match.group(0)
    return _NAMED_ESCAPES.get(char, f"\\x{ord(char):02x}")


def sanitize_input(input_string):
    """
    Sanitize the input string to prevent log injection and manipulation.

    This function performs the following:
    - Removes ANSI escape sequences (CSI and two-character ESC sequences) to
      prevent terminal color changes or cursor movements.
    - Escapes newlines, carriage returns and tabs as ``\\n``, ``\\r``, ``\\t``.
    - Escapes every other control character (including a stray ESC left over
      from an incomplete sequence) as ``\\xNN`` so it cannot reach a terminal.
    - Limits the length of the result to 1024 characters, appending
      ``...[truncated]`` when the limit is exceeded.

    Args:
        input_string: The untrusted text to clean.

    Returns:
        The sanitized text, safe to embed in a single log line.
    """
    # Strip well-formed escape sequences first so their payload bytes
    # (e.g. "[31m") do not survive as visible text.
    sanitized = _ANSI_ESCAPE_RE.sub('', input_string)

    # Anything still non-printable is made visible instead of being emitted raw.
    sanitized = _CONTROL_CHAR_RE.sub(_escape_control_char, sanitized)

    # Limit the length of the input (e.g., 1024 characters)
    max_length = 1024
    if len(sanitized) > max_length:
        sanitized = sanitized[:max_length] + '...[truncated]'
    return sanitized
# Directory and JSON file that hold the per-port message configuration.
CONFIG_DIR = "/etc/flak"
CONFIG_FILE = os.path.join(
    CONFIG_DIR,
    "ports.json",
)
def ensure_config_file():
    """Create CONFIG_DIR and a default CONFIG_FILE when they are missing.

    An existing configuration file is left untouched. The default maps two
    ports to a single message each.
    """
    # exist_ok avoids the check-then-create race of testing for the
    # directory first and creating it afterwards.
    os.makedirs(CONFIG_DIR, exist_ok=True)

    if os.path.exists(CONFIG_FILE):
        return

    # json.dump writes the integer keys as JSON strings ("21", "1337").
    default_config = {
        21: ["Who uses FTP in 2024?"],
        1337: ["You aren't tho lol"]
    }
    with open(CONFIG_FILE, 'w') as file:
        json.dump(default_config, file, indent=4)
def load_config():
    """Read CONFIG_FILE and return its parsed JSON content."""
    with open(CONFIG_FILE, 'r') as config_handle:
        raw_text = config_handle.read()
    return json.loads(raw_text)
def handle_client(client_socket, addr, port_messages):
    """Serve one accepted connection until the peer leaves or an error occurs.

    Roughly once per second a random entry of *port_messages* is sent to the
    client (with the ``[IP]`` placeholder replaced by the client's address),
    and anything the client sends back is logged after sanitization.

    Args:
        client_socket: The connected socket returned by ``accept()``.
        addr: The ``(host, port)`` pair of the remote peer.
        port_messages: Sequence of message strings configured for this port.

    The socket is always closed before returning. Any exception is logged
    rather than propagated, so a failing client never takes down its thread
    with an unhandled error.
    """
    try:
        logger.info(f"Received connection from {addr[0]}:{addr[1]}")
        client_socket.settimeout(1.0)  # Set timeout for client response
        while True:
            message = "\n" + random.choice(port_messages).replace("[IP]", addr[0])
            # sendall() keeps writing until the whole message is out;
            # send() may transmit only part of it.
            client_socket.sendall(message.encode())
            try:
                response = client_socket.recv(1024)
            except socket.timeout:
                # The client simply said nothing within the timeout.
                response = None
            if response == b"":
                # An empty read means the peer closed the connection; stop
                # instead of sending until the socket raises.
                logger.info(f"Connection closed by {addr[0]}:{addr[1]}")
                break
            if response:
                # Clients may send arbitrary bytes; never fail on invalid UTF-8.
                text = response.decode(errors="replace")
                logger.info(f"Response from {addr[0]}:{addr[1]}: {sanitize_input(text)}")
            time.sleep(1)
    except Exception as e:
        logger.error(f"Error in handling client {addr}: {e}")
        ensure_log_file()
    finally:
        client_socket.close()
def start_dummy_service(port, port_messages):
    """Listen on *port* on all IPv4 interfaces and hand clients to threads.

    Every accepted connection is served by ``handle_client`` in its own
    thread. The function only returns when binding, listening or accepting
    fails; the failure is logged and the listening socket is closed.

    Args:
        port: TCP port number to bind to.
        port_messages: Sequence of message strings passed on to each client
            handler.
    """
    try:
        # The context manager closes the listening socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            # Allow an immediate rebind after a restart while old connections
            # are still in TIME_WAIT.
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(("0.0.0.0", port))
            server.listen(5)
            logger.info(f"Listening on 0.0.0.0:{port}")
            while True:
                client, addr = server.accept()
                client_handler = threading.Thread(
                    target=handle_client,
                    args=(client, addr, port_messages),
                )
                try:
                    client_handler.start()
                except Exception:
                    # No handler will ever own this socket, so close it here.
                    client.close()
                    raise
    except Exception as e:
        # NOTE: this also reports failures from the accept loop, not only
        # from bind(); the message text is kept for log compatibility.
        logger.error(f"Error binding to port {port}: {e}")
        ensure_log_file()
# Make sure the configuration and the log file are in place before starting.
ensure_config_file()
ensure_log_file()

# Mapping of port (JSON string key) to the list of messages for that port.
port_responses = load_config()

# One listener thread per configured port.
for port_key, banner_list in port_responses.items():
    listener = threading.Thread(
        target=start_dummy_service,
        args=(int(port_key), banner_list),
    )
    listener.start()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Installer for the flak script: installs loguru, downloads flak to
# /usr/bin/flak, writes a systemd unit for it and optionally reloads
# systemd and enables the service.

# Treat references to unset variables as errors.
set -u

GITHUB_SCRIPT_URL="https://gist.githubusercontent.com/exec/ad322888b95943b24c5d25cc3a997ef8/raw/c97e763fcc303ae813582e511ab6eea4c30bd9fa/flak.py"
INSTALL_PATH="/usr/bin/flak"
SERVICE_FILE="/etc/systemd/system/flak.service"

# Print the question in $1 and succeed only when the answer is "y" or "Y".
# End of input (e.g. a non-interactive run) counts as "no".
confirm() {
    local answer=""
    echo "$1"
    read -r answer || answer=""
    case "$answer" in
        y|Y) return 0 ;;
        *) return 1 ;;
    esac
}

# Install loguru using pip with --break-system-packages
echo "Installing loguru..."
if ! pip install loguru --break-system-packages; then
    # Not fatal: loguru may already be available through another installer.
    echo "Warning: pip could not install loguru; flak needs it to run." >&2
fi

# Download the script and save it to /usr/bin/flak
echo "Downloading the flak script..."
# Download to a temporary file first so a failed or partial download never
# replaces an existing, working /usr/bin/flak.
tmp_file="$(mktemp)" || { echo "Error: could not create a temporary file." >&2; exit 1; }
trap 'rm -f "$tmp_file"' EXIT
if ! wget -O "$tmp_file" "$GITHUB_SCRIPT_URL"; then
    echo "Error: failed to download $GITHUB_SCRIPT_URL" >&2
    exit 1
fi
if ! install -m 0755 "$tmp_file" "$INSTALL_PATH"; then
    echo "Error: could not install the script to $INSTALL_PATH" >&2
    exit 1
fi
echo "Script downloaded and permissions set."

# Create a systemd service file for flak
echo "Creating systemd service for flak..."
# The quoted delimiter keeps the unit text literal (no shell expansion).
if ! cat <<'EOF' > "$SERVICE_FILE"
[Unit]
Description=Flak Service
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/flak
[Install]
WantedBy=multi-user.target
EOF
then
    echo "Error: could not write $SERVICE_FILE" >&2
    exit 1
fi
echo "Systemd service file created."

# Reload systemd daemon
if confirm "Would you like to refresh the systemd daemon now? [y/N]"; then
    if systemctl daemon-reload; then
        echo "Systemd daemon reloaded."
    else
        echo "Warning: systemctl daemon-reload failed." >&2
    fi
else
    echo "Skipping systemd daemon reload."
fi

# Enable flak service on boot
if confirm "Would you like to enable the Flak service on boot? [y/N]"; then
    if systemctl enable flak.service; then
        echo "Flak service enabled on boot."
    else
        echo "Warning: could not enable flak.service." >&2
    fi
else
    echo "Flak service not enabled on boot."
fi

echo "Setup complete. You can start the service manually using 'systemctl start flak.service'."
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment