-
-
Save GoobyCorp/088ea82a48a4ac099d5b11d28cbad4a0 to your computer and use it in GitHub Desktop.
XBDM Server Emulator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import asyncio | |
from io import BytesIO | |
from shlex import shlex | |
from shutil import rmtree | |
from json import load, dump | |
from calendar import timegm | |
from struct import unpack, pack | |
from typing import Any, BinaryIO | |
from collections import OrderedDict | |
from os import walk, rename, remove, makedirs | |
from os.path import isfile, isdir, join, getsize | |
from datetime import datetime, timedelta, tzinfo | |
from ctypes import Structure, Union, c_ulong, c_uint32, c_int32, c_uint64, c_int64 | |
# constants | |
D3DFMT_A8R8G8B8 = 0x18280186 | |
D3DFMT_A2R10G10B10 = 0x18280192 | |
# ctypes aliases | |
c_dword = c_ulong | |
# xbdm variables | |
XBDM_PORT = 730 | |
# config variables | |
CONFIG_FILE = "config.json" | |
# JRPC2 variables - Byrom | |
JRPC2_CONFIG_FILE = "jrpc2_config.json" | |
# time variables | |
EPOCH_AS_FILETIME = 116444736000000000 | |
HUNDREDS_OF_NANOSECONDS = 10000000 | |
ZERO = timedelta(0) | |
HOUR = timedelta(hours=1) | |
# config | |
cfg: list | dict = {} | |
jrpc2cfg: list | dict = {} | |
# Responses from the console are high to low | |
# PC data is read from low to high | |
# Big Endian = high -> low | |
# Little Endian = low -> high | |
class INT64_PART(Structure): | |
_pack_ = 1 | |
_fields_ = [ | |
("Low", c_int32), | |
("High", c_int32) | |
] | |
class UINT64_PART(Structure): | |
_pack_ = 1 | |
_fields_ = [ | |
("Low", c_uint32), | |
("High", c_uint32) | |
] | |
class INT64(Union): | |
_pack_ = 1 | |
_fields_ = [ | |
("a", INT64_PART), | |
("Quad", c_int64) | |
] | |
class UINT64(Union): | |
_pack_ = 1 | |
_fields_ = [ | |
("a", UINT64_PART), | |
("Quad", c_uint64) | |
] | |
class FileInfo(Structure): | |
_fields_ = [ | |
("dwSize", c_uint32), | |
("CreateTime", INT64), | |
("ChangeTime", INT64), | |
("FileSize", INT64), | |
("FileAttributes", c_uint32) | |
] | |
def list_dirs(path: str) -> tuple | list: | |
return next(walk(path))[1] | |
def list_files(path: str) -> tuple | list: | |
return next(walk(path))[2] | |
def list_drives() -> (list, tuple): | |
return list_dirs(cfg["xbdm_dir"]) | |
def xbdm_to_local_path(path: str) -> str: | |
return join(cfg["xbdm_dir"], path.replace(":\\", "/").replace("\\", "/")).replace("\\", "/") | |
def format_command(command: bytes | bytearray, lowercase: bool = False): | |
command = command.decode("utf8").rstrip() | |
if lowercase: | |
command = command.lower() | |
return command | |
def bswap32(b: bytes | bytearray) -> bytes | bytearray: | |
if len(b) % 4 == 0: | |
return b"".join([bytes([b[x + 3], b[x + 2], b[x + 1], b[x]]) for x in range(0, len(b), 4)]) | |
def uint32_to_uint64(low: str | int, high: str | int) -> int: | |
if isinstance(low, str): | |
low = unpack("!I", bytes.fromhex(low.replace("0x", "")))[0] | |
if isinstance(high, str): | |
high = unpack("!I", bytes.fromhex(high.replace("0x", "")))[0] | |
return unpack("<Q", pack("<II", low, high))[0] | |
def uint64_to_uint32(num: int, as_hex: bool = False, as_bytes: bool = False) -> tuple | list: | |
i = unpack("<II", pack("<Q", num)) | |
if as_hex: | |
low = "0x" + pack("!I", i[0]).hex() | |
high = "0x" + pack("!I", i[1]).hex() | |
if as_bytes: | |
return [bytes(low, "utf8"), bytes(high, "utf8")] | |
return [low, high] | |
return i | |
def dt_to_filetime(dt): | |
if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None): | |
dt = dt.replace(tzinfo=UTC()) | |
ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS) | |
return ft + (dt.microsecond * 10) | |
def filetime_to_dt(ft) -> datetime: | |
# Get seconds and remainder in terms of Unix epoch | |
(s, ns100) = divmod(ft - EPOCH_AS_FILETIME, HUNDREDS_OF_NANOSECONDS) | |
# Convert to datetime object | |
dt = datetime.utcfromtimestamp(s) | |
# Add remainder in as microseconds. Python 3.2 requires an integer | |
dt = dt.replace(microsecond=(ns100 // 10)) | |
return dt | |
def creation_time_to_file_time(path: str) -> int: | |
#dt = datetime.utcfromtimestamp(getctime(path)) | |
return dt_to_filetime(datetime.utcnow()) | |
def modify_time_to_file_time(path: str) -> int: | |
#dt = datetime.utcfromtimestamp(getmtime(path)) | |
return dt_to_filetime(datetime.utcnow()) | |
def system_time() -> int: | |
dt1 = datetime(1, 1, 1, 23, 0, 0) | |
dt2 = datetime.utcnow() | |
return int(abs(dt2 - dt1).total_seconds()) * 10000000 | |
def is_int(s: str) -> str | int: | |
try: | |
return int(s) | |
except: | |
return s | |
class UTC(tzinfo): | |
def utcoffset(self, dt): | |
return ZERO | |
def tzname(self, dt): | |
return "UTC" | |
def dst(self, dt): | |
return ZERO | |
class XBDMShlex(shlex): | |
def __init__(self, *args, **kwargs): | |
super(XBDMShlex, self).__init__(*args, **kwargs) | |
self.escape = "" #remove the \ escape | |
self.whitespace_split = True | |
class XBDMParam: | |
def __init__(self, value: Any): | |
self.value = value | |
def __int__(self) -> int: | |
return self.as_int() | |
def __str__(self) -> str: | |
return self.as_str() | |
def __bytes__(self) -> bytes: | |
return self.as_bytes() | |
def is_none(self) -> bool: | |
return self.value is None | |
def as_int(self) -> int: | |
if isinstance(self.value, str): | |
if self.value.startswith("0x"): | |
return unpack(">I", bytes.fromhex(self.value[2:]))[0] | |
return int(self.value) | |
def as_str(self) -> str: | |
return str(self.value) | |
def as_bytes(self) -> bytes: | |
return bytes.fromhex(self.value) | |
class XBDMCommand(object): | |
name = None | |
args = OrderedDict() | |
flags = [] | |
formatted = None | |
def __init__(self): | |
self.reset() | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
pass | |
def reset(self) -> None: | |
self.name = None | |
self.args = OrderedDict() | |
self.flags = [] | |
self.formatted = None | |
@staticmethod | |
def parse(command: str): | |
sh = XBDMShlex(command, posix=True) | |
command = list(sh) | |
cmd = XBDMCommand() | |
cmd.set_name(command[0]) | |
if len(command) > 1: | |
for single in command[1:]: | |
if "=" in single: | |
(key, value) = single.split("=", 1) | |
cmd.add_param(key, value) | |
else: | |
if not cmd.flag_exists(single): | |
cmd.add_flag(single) | |
return cmd | |
def set_name(self, name: str) -> None: | |
self.name = name.lower() | |
def set_response_code(self, code: int) -> None: | |
self.name = str(code) + "-" | |
def flag_exists(self, key: str) -> bool: | |
return key.lower() in self.flags | |
def param_exists(self, key: str, lc_check: bool = False) -> bool: | |
return not self.get_param(key, lc_check).is_none() | |
def add_flag(self, key: str) -> Any: | |
return self.flags.append(key.lower()) | |
def add_param(self, key: str, value: str | int | bytes | bytearray, quoted: bool = False) -> XBDMParam: | |
key = key.lower() | |
if isinstance(value, bytes) or isinstance(value, bytearray): | |
value = str(value, "utf8") | |
if quoted: | |
value = "\"" + value + "\"" | |
if isinstance(value, int): | |
value = "0x" + pack(">I", value).hex() | |
self.args[key] = value | |
return XBDMParam(value) | |
def get_param(self, key: str, lc_check: bool = False) -> XBDMParam: | |
key = key.lower() | |
val = self.args.get(key) | |
if lc_check and val is None: | |
val = self.args.get(key) | |
return XBDMParam(val) | |
def get_output(self, as_bytes: bool = False, line_ending: bool = True) -> str | bytes | bytearray: | |
out_str = " ".join([(key + "=" + value) for (key, value) in self.args.items()]) | |
if self.name is not None: | |
out_str = self.name + " " + out_str | |
if line_ending: | |
out_str += "\r\n" | |
if as_bytes: | |
return bytes(out_str, "utf8") | |
self.reset() | |
return out_str | |
class XBDMServerProtocol(asyncio.Protocol): | |
# file transfer variables | |
receiving_files: bool = False | |
num_files_total: int = 0 | |
num_files_left: int = 0 | |
file_data_left: int = 0 | |
file_path: str = "" | |
file_handle: BinaryIO | None = None | |
client_addr: str = None | |
client_port: int = None | |
def send_single_line(self, line: str | bytes | bytearray) -> None: | |
if isinstance(line, str): | |
line = line.encode("ASCII") | |
if not line.endswith(b"\r\n"): | |
line += b"\r\n" | |
self.transport.write(line) | |
def start_multi_line(self) -> None: | |
self.send_single_line("202- multiline response follows") | |
def end_multi_line(self) -> None: | |
self.send_single_line(".") | |
def send_multi_line(self, lines: list[str | bytes | bytearray], start_stop: bool = True) -> None: | |
if start_stop: | |
self.start_multi_line() | |
for line in lines: | |
if isinstance(line, str): | |
line = line.encode("UTF8") | |
self.send_single_line(line) | |
if start_stop: | |
self.end_multi_line() | |
def connection_made(self, transport: asyncio.ReadTransport | asyncio.WriteTransport): | |
(self.client_addr, self.client_port) = transport.get_extra_info("peername") | |
print(f"Incoming connection from {self.client_addr}:{self.client_port}") | |
self.transport = transport | |
self.send_single_line("201- connected") | |
def connection_lost(self, ex): | |
# print(ex) | |
# print(f"Lost connection to {self.client_addr}:{self.client_port}") | |
self.transport.close() | |
def eof_received(self) -> bool: | |
self.transport.close() | |
return True | |
def data_received(self, raw_command: bytes) -> None: | |
if raw_command: | |
if raw_command.endswith(b"\r\n") and not self.receiving_files: | |
if cfg["debug"]: | |
print(bytes(raw_command)) | |
print(raw_command.hex().upper()) | |
parsed = XBDMCommand.parse(format_command(raw_command)) | |
if parsed.name == "boxid": | |
print("Sending box ID...") | |
self.send_single_line("420- box is not locked") | |
elif parsed.name == "xbupdate!drawtext": | |
self.send_single_line("200- OK") | |
elif parsed.name == "xbupdate!version": | |
self.send_single_line("200- OK") | |
elif parsed.name == "xbupdate!validatehddpartitions": | |
self.send_single_line("200- OK") | |
elif parsed.name == "xbupdate!isflashclean": | |
self.send_single_line("200- OK") | |
elif parsed.name == "xbupdate!instrecoverytype": | |
self.send_single_line("200- OK") | |
elif parsed.name == "xbupdate!validdevice": | |
self.send_single_line("200- OK") | |
elif parsed.name == "recovery": | |
print("Booting recovery...") | |
self.send_single_line("200- OK") | |
elif parsed.name == "dbgname": | |
print("Sending console name...") | |
self.send_single_line("200- " + cfg["console_name"]) | |
elif parsed.name == "consoletype": | |
print("Sending console type...") | |
self.send_single_line("200- " + cfg["console_type"]) | |
elif parsed.name == "consolefeatures": | |
# Basic JRPC2 Support - Byrom | |
if parsed.param_exists("ver") and parsed.param_exists("type"): # is jrpc2 command | |
type_param = parsed.get_param("type").as_int() | |
# type 0 to 8 are related to call function by the look of it | |
#if type_param == "1": # example when loading a plugin | |
# print("JRPC2 - One of the call function commands received! Responding...") | |
# self.transport.write(b"200- 0\r\n") # 0 for load success | |
if type_param == 9: | |
print("JRPC2 - Resolve function command received! Responding...") | |
self.send_single_line("200- 80067F48") | |
elif type_param == 10: | |
print("JRPC2 - Get CPUKey command received! Responding...") | |
self.send_single_line("200- " + jrpc2cfg["CPUKey"]) | |
elif type_param == 11: | |
print("JRPC2 - Shutdown console command received! Responding...") | |
self.send_single_line("200- S_OK") | |
elif type_param == 12: | |
print("JRPC2 - XNotify command received! Responding...") | |
# consolefeatures ver=2 type=12 params=\"A\0\A\2\2/37\53696D706C6520546F6F6C20436F6E6E656374656420546F20596F7572205472696E697479\1\34\" | |
# 53696D706C6520546F6F6C20436F6E6E656374656420546F20596F7572205472696E697479 -> HexToText = Simple Tool Connected To Your Trinity | |
self.send_single_line("200- S_OK") | |
elif type_param == 13: | |
print("JRPC2 - Get Kern Version command received! Responding...") | |
self.send_single_line("200- " + jrpc2cfg["KernelVers"]) | |
elif type_param == 14: | |
print("JRPC2 - Set ROL LED command received! Responding...") # multple options for this green red orange topleft topright bottomleft bottomright | |
self.send_single_line("200- S_OK") | |
elif type_param == 15: | |
gettemp_params = parsed.get_param("params") | |
print(gettemp_params) | |
if gettemp_params == "A\\0\\A\\1\\1\\0\\": | |
print("JRPC2 - Get CPU Temperature command received! Responding...") | |
self.send_single_line("200- " + hex(jrpc2cfg["CPUTemp"]).replace("0x", "")) | |
elif gettemp_params == "A\\0\\A\\1\\1\\1\\": | |
print("JRPC2 - Get GPU Temperature command received! Responding...") | |
self.send_single_line("200- " + hex(jrpc2cfg["GPUTemp"]).replace("0x", "")) | |
elif gettemp_params == "A\\0\\A\\1\\1\\2\\": | |
print("JRPC2 - Get EDRAM Temperature command received! Responding...") | |
self.send_single_line("200- " + hex(jrpc2cfg["EDRAMTemp"]).replace("0x", "")) | |
elif gettemp_params == "A\\0\\A\\1\\1\\3\\": | |
print("JRPC2 - Get MOBO Temperature command received! Responding...") | |
self.send_single_line("200- " + hex(jrpc2cfg["MOBOTemp"]).replace("0x", "")) | |
elif type_param == 16: | |
print("JRPC2 - Get TitleID command received! Responding...") | |
self.send_single_line("200- " + jrpc2cfg["TitleID"]) | |
elif type_param == 17: | |
print("JRPC2 - Get Mobo Type command received! Responding...") | |
self.send_single_line("200- " + jrpc2cfg["MoboType"]) | |
elif type_param == 18: | |
print("JRPC2 - Constant memory setting command received! Responding...") | |
self.send_single_line("200- S_OK") | |
else: | |
print("JRPC2 - Unknown command received! Responding...") # catch any unknowns | |
self.send_single_line("200- 0") # better than nothing / is the return when load plugin is called | |
# end of jrpc2 commands | |
else: | |
print("Sending console features...") | |
if parsed.param_exists("params"): #extended query | |
print("Feature Params: " + parsed.get_param("params").as_str()) | |
self.send_single_line("200- S_OK") | |
else: #simple query | |
self.send_single_line("200- " + cfg["console_type"]) | |
elif parsed.name == "advmem" and parsed.flag_exists("status"): | |
print("Sending memory properties...") | |
self.send_single_line("200- enabled") | |
elif parsed.name == "altaddr": | |
print("Sending title IP address...") | |
addr = bytes(map(int, cfg["alternate_address"].split('.'))).hex() | |
self.send_single_line("200- addr=0x" + addr) | |
elif parsed.name == "systime": | |
print("Sending system time...") | |
(time_low, time_high) = uint64_to_uint32(system_time(), True) | |
with XBDMCommand() as cmd: | |
cmd.set_response_code(200) | |
cmd.add_param("high", time_high) | |
cmd.add_param("low", time_low) | |
cmd_data = cmd.get_output(True) | |
self.transport.write(cmd_data) | |
elif parsed.name == "systeminfo": | |
print("Sending system info...") | |
lines = [ | |
"HDD=" + "Enabled" if cfg["hdd_enabled"] else "Disabled", | |
"Type=" + cfg["console_type"], | |
f"Platform={cfg['platform']} System={cfg['system']}", | |
f"BaseKrnl={cfg['base_kernel']} Krnl={cfg['kernel']} XDK={cfg['xdk']}" | |
] | |
self.send_multi_line(lines) | |
elif parsed.name == "xbeinfo" and parsed.flag_exists("RUNNING"): | |
print("Sending current title info...") | |
lines = [ | |
"timestamp=0x00000000 checksum=0x00000000", | |
f"name=\"{cfg['current_title_path']}\"" | |
] | |
self.send_multi_line(lines) | |
elif parsed.name == "screenshot": | |
print("Sending screenshot...") | |
self.transport.write(b"203- binary response follows\r\n") | |
# output resolution | |
ow = 1280 # 1280 | |
oh = 720 # 720 | |
# screen resolution | |
sw = 1920 # 1920 | |
sh = 1080 # 1080 | |
vw = ow | |
vh = oh | |
if vw % 128 != 0: | |
vw += (128 - vw % 128) | |
if vh % 128 != 0: | |
vh += (128 - vh % 128) | |
# pitch | |
p = ow * 4 | |
with open(cfg["screenshot_file"], "rb") as f: | |
# read and send the file | |
data = f.read() | |
with XBDMCommand() as cmd: | |
cmd.add_param("pitch", p) | |
cmd.add_param("width", ow) | |
cmd.add_param("height", oh) | |
cmd.add_param("format", D3DFMT_A8R8G8B8) | |
cmd.add_param("offsetx", 0) | |
cmd.add_param("offsety", 0) | |
cmd.add_param("framebuffersize", len(data)) # 0x398000 | |
cmd.add_param("sw", sw) | |
cmd.add_param("sh", sh) | |
cmd.add_param("colorspace", 0) | |
self.transport.write(cmd.get_output(True, True)) | |
self.transport.write(data) | |
elif parsed.name == "drivelist": | |
print("Sending drive list...") | |
self.send_multi_line([f"drivename=\"{x}\"" for x in list_drives()]) | |
elif parsed.name == "isdebugger": | |
print("Requesting is debugger...") | |
self.send_single_line("410- name=\"XRPC\" user=" + cfg["username"]) | |
elif parsed.name == "break" and parsed.flag_exists("clearall"): | |
print("Removing all breakpoints...") | |
self.send_single_line("200- OK") | |
elif parsed.name == "modules": | |
print("Sending module listing...") | |
lines = [] | |
for single in cfg["modules"]: | |
with XBDMCommand() as cmd: | |
cmd.add_param("name", single["name"], True) | |
cmd.add_param("base", single["base"]) | |
cmd.add_param("size", single["size"]) | |
cmd.add_param("check", 0) | |
cmd.add_param("timestamp", 0) | |
cmd.add_param("pdata", 0) | |
cmd.add_param("psize", 0) | |
cmd.add_param("thread", 0) | |
cmd.add_param("osize", 0) | |
cmd_data = cmd.get_output(True) | |
lines.append(cmd_data) | |
self.send_multi_line(lines) | |
elif parsed.name == "kdnet": # kdnet config commands | |
if parsed.flag_exists("set"): # set kdnet settings | |
if parsed.param_exists("IP") and parsed.param_exists("Port"): | |
kdnet_addr = parsed.get_param("IP") | |
kdnet_port = parsed.get_param("Port") | |
print(f"Attempted to configure KDNET to talk to {kdnet_addr}:{kdnet_port}...") | |
self.send_single_line("200- kdnet set succeeded.") | |
elif parsed.flag_exists("show"): # show settings | |
self.send_single_line("200- kdnet settings:\x1E\tEnable=1\x1E\tTarget IP: 192.168.0.43\x1E\tTarget MAC: 00-25-AE-E4-43-87\x1E\tHost IP: 192.168.0.2\x1E\tHost Port: 50001\x1E\tEncrypted: 0\x1E") | |
elif parsed.name == "debugger": | |
if parsed.flag_exists("DISCONNECT"): | |
print("Debugger disconnecting...") | |
elif parsed.flag_exists("CONNECT"): | |
print("Debugger connecting...") | |
#dbg_port = int(parsed.get_param("PORT")) | |
#dbg_name = parsed.get_param("user") | |
self.send_single_line("200- OK") | |
elif parsed.name == "drivefreespace": | |
if parsed.get_param("NAME"): | |
drive_label = parsed.get_param("NAME") | |
print(f"Requesting free space for drive label {drive_label}...") | |
(low, high) = uint64_to_uint32(cfg["console_hdd_size"], True, True) | |
with XBDMCommand() as cmd: | |
cmd.add_param("freetocallerlo", low) | |
cmd.add_param("freetocallerhi", high) | |
cmd.add_param("totalbyteslo", low) | |
cmd.add_param("totalbyteshi", high) | |
cmd.add_param("totalfreebyteslo", low) | |
cmd.add_param("totalfreebyteshi", high) | |
cmd_data = cmd.get_output(True) | |
self.send_multi_line([cmd_data]) | |
elif parsed.name == "dirlist": | |
if parsed.param_exists("NAME"): | |
phys_path = xbdm_to_local_path(parsed.get_param("NAME").as_str()) | |
if isdir(phys_path): | |
print(f"Requesting directory listing for {phys_path}...") | |
self.start_multi_line() | |
lines = [] | |
for single in list_files(phys_path): | |
single_path = join(phys_path, single) | |
single_size = getsize(single_path) | |
(ctime_low, ctime_high) = uint64_to_uint32(creation_time_to_file_time(single_path), True) | |
(mtime_low, mtime_high) = uint64_to_uint32(modify_time_to_file_time(single_path), True) | |
(size_low, size_high) = uint64_to_uint32(single_size, True) | |
with XBDMCommand() as cmd: | |
cmd.add_param("name", single, True) | |
cmd.add_param("sizehi", size_high) | |
cmd.add_param("sizelo", size_low) | |
cmd.add_param("createhi", ctime_high) | |
cmd.add_param("createlo", ctime_low) | |
cmd.add_param("changehi", mtime_high) | |
cmd.add_param("changelo", mtime_low) | |
cmd_data = cmd.get_output(True, True) | |
lines.append(cmd_data) | |
self.send_multi_line(lines, False) | |
lines = [f"name=\"{x}\" sizehi=0x0 sizelo=0x0 createhi=0x01d3c0d2 createlo=0x40667d00 changehi=0x01d3c0d2 changelo=0x40667d00 directory" for x in list_dirs(phys_path)] | |
self.send_multi_line(lines, False) | |
self.end_multi_line() | |
else: | |
self.send_single_line("402- directory not found") | |
elif parsed.name == "setfileattributes": | |
self.send_single_line("200- OK") | |
elif parsed.name == "getfileattributes": | |
if parsed.param_exists("NAME"): | |
phys_path = xbdm_to_local_path(parsed.get_param("NAME").as_str()) | |
print(f"Requesting file attributes for \"{phys_path}\"...") | |
if isfile(phys_path): | |
print("File exists...") | |
file_size = getsize(phys_path) | |
(ctime_low, ctime_high) = uint64_to_uint32(creation_time_to_file_time(phys_path), True) | |
(mtime_low, mtime_high) = uint64_to_uint32(modify_time_to_file_time(phys_path), True) | |
(size_low, size_high) = uint64_to_uint32(file_size, True) | |
with XBDMCommand() as cmd: | |
cmd.add_param("sizehi", size_high) | |
cmd.add_param("sizelo", size_low) | |
cmd.add_param("createhi", ctime_high) | |
cmd.add_param("createlo", ctime_low) | |
cmd.add_param("changehi", mtime_high) | |
cmd.add_param("changelo", mtime_low) | |
cmd_data = cmd.get_output(True) | |
self.send_multi_line([cmd_data]) | |
else: | |
print("File doesn't exist...") | |
self.send_single_line("402- file not found") | |
elif parsed.name == "mkdir": | |
if parsed.param_exists("NAME"): | |
phys_path = xbdm_to_local_path(parsed.get_param("NAME").as_str()) | |
if not isfile(phys_path) and not isdir(phys_path): | |
print(f"Created directory \"{phys_path}\"...") | |
makedirs(phys_path, exist_ok=True) | |
self.send_single_line("200- OK") | |
elif parsed.name == "getfile": | |
if parsed.param_exists("NAME"): | |
phys_path = xbdm_to_local_path(parsed.get_param("NAME").as_str()) | |
if isfile(phys_path): | |
print(f"Sending file @ \"{phys_path}\"...") | |
with open(phys_path, "rb") as f: | |
data = f.read() | |
self.transport.write(b"203- binary response follows\r\n") | |
self.transport.write(pack("<I", len(data))) | |
self.transport.write(data) | |
elif parsed.name == "sendvfile": | |
if parsed.param_exists("COUNT"): | |
file_count = parsed.get_param("COUNT").as_int() | |
print(f"Receiving {file_count} file(s)...") | |
if file_count > 0: | |
self.send_single_line("204- send binary data") | |
self.num_files_total = file_count | |
self.num_files_left = file_count | |
self.receiving_files = True | |
self.send_single_line("203- binary response follows") | |
self.transport.write((b"\x00" * 4) * self.num_files_total) | |
elif parsed.name == "sendfile": | |
print("Receiving single file...") | |
self.send_single_line("203- binary response follows") | |
self.transport.write((b"\x00" * 4)) | |
elif parsed.name == "rename": | |
if parsed.param_exists("NAME") and parsed.param_exists("NEWNAME"): | |
old_file_path = xbdm_to_local_path(parsed.get_param("NAME").as_str()) | |
new_file_path = xbdm_to_local_path(parsed.get_param("NEWNAME").as_str()) | |
if isfile(old_file_path) or isdir(old_file_path): | |
print(f"Renaming \"{old_file_path}\" to \"{new_file_path}\"...") | |
rename(old_file_path, new_file_path) | |
self.send_single_line("200- OK") | |
elif parsed.name == "delete": | |
if parsed.param_exists("NAME"): | |
phys_path = xbdm_to_local_path(parsed.get_param("NAME").as_str()) | |
if parsed.flag_exists("DIR"): | |
print(f"Deleting folder @ \"{phys_path}\"...") | |
rmtree(phys_path, True) | |
else: | |
print(f"Deleting file @ \"{phys_path}\"...") | |
remove(phys_path) | |
self.send_single_line("200- OK") | |
elif parsed.name == "setmem": | |
if parsed.param_exists("addr") and parsed.param_exists("data"): | |
print(parsed.get_param("addr")) | |
setmem_addr = parsed.get_param("addr") | |
setmem_data = parsed.get_param("data").as_bytes() | |
print(f"Attempted to set {len(setmem_data)} byte(s) @ {setmem_addr}...") | |
self.send_single_line(f"200- set {str(len(setmem_data))} bytes") | |
elif parsed.name == "getmem" or parsed.name == "getmemex": | |
if parsed.param_exists("ADDR") and parsed.param_exists("LENGTH"): | |
addr = parsed.get_param("ADDR").as_int() | |
length = parsed.get_param("LENGTH").as_int() | |
# length = unpack("!I", bytes.fromhex(length.replace("0x", "")))[0] | |
print(f"Attempted to get {length} byte(s) @ {addr}...") | |
self.send_single_line("203- binary response follows") | |
self.transport.write(pack("<H", 1024) + (b"suckcock" * 128)) | |
self.transport.write(pack("<H", 1024) + (b"suckcock" * 128)) | |
self.transport.write(pack("<H", 1024) + (b"suckcock" * 128)) | |
self.transport.write(pack("<H", 1024) + (b"suckcock" * 128)) | |
elif parsed.name == "setsystime": | |
if parsed.param_exists("clocklo") and parsed.param_exists("clockhi") and parsed.param_exists("tz"): | |
sys_time_low = parsed.get_param("clocklo").as_int() | |
sys_time_high = parsed.get_param("clockhi").as_int() | |
#timezone = bool(parsed.get_param("tz")) | |
sys_time = uint32_to_uint64(sys_time_low, sys_time_high) | |
print(f"Setting system time to {sys_time}...") | |
self.send_single_line("200- OK") | |
elif parsed.name == "notify" and parsed.param_exists("reconnectport") and parsed.flag_exists("reverse"): | |
reconnect_port = int(parsed.get_param("reconnectport")) | |
print(f"Requesting reconnect on TCP port {reconnect_port}...") | |
self.send_single_line("205- now a notification channel") | |
elif parsed.name == "notifyat": | |
if parsed.flag_exists("drop"): | |
self.send_single_line("200- OK") | |
elif parsed.name == "lockmode": | |
if parsed.param_exists("BOXID"): | |
box_id = parsed.get_param("BOXID") | |
print(f"Attempted to lock system with box ID {box_id}...") | |
self.send_single_line("200- OK") | |
elif parsed.flag_exists("unlock"): | |
print("Attempted to unlock system...") | |
self.send_single_line("200- OK") | |
elif parsed.name == "user" and parsed.param_exists("name"): | |
priv_user = parsed.get_param("NAME") | |
print(f"Attempted to add user {priv_user} to locked system with the privilege string \"{' '.join(parsed.flags)}\"...") | |
self.send_single_line("200- OK") | |
elif parsed.name == "userlist": | |
self.send_multi_line([ | |
"name=\"John\" read write control config manage" | |
]) | |
elif parsed.name == "keyxchg": | |
self.send_single_line("200- OK") | |
elif parsed.name == "magicboot": | |
if parsed.param_exists("title") and parsed.param_exists("directory"): | |
magicboot_exe = parsed.get_param("title") | |
magicboot_dir = parsed.get_param("directory") | |
print(f"Magic Boot attempted to run \"{magicboot_exe}\"") | |
self.send_single_line("200- OK") | |
else: | |
print("Reboot attempted!") | |
self.send_single_line("200- OK") | |
elif parsed.name == "getuserpriv": | |
print("Sending user privilege") | |
self.send_single_line("402- file not found") | |
elif parsed.name == "bye": | |
# print("Closing the socket...") | |
self.send_single_line("200- bye") | |
self.transport.close() | |
else: | |
if parsed.name is not None: | |
print(f"UNHANDLED COMMAND \"{parsed.name}\"") | |
elif raw_command == bytes.fromhex("020405B40103030801010402"): | |
print("Sending unknown?") | |
self.transport.write(raw_command) | |
elif self.receiving_files: | |
if self.num_files_left > 0 and self.file_handle is None and self.file_data_left == 0: | |
# print("Size:", len(raw_command)) | |
# print(raw_command.hex()) | |
#receive file header | |
with BytesIO(raw_command) as bio: | |
(header_size,) = unpack(">I", bio.read(4)) | |
# print("Header Size:", header_size) | |
header = bio.read(header_size - 4) # exclude header size | |
(create_hi, create_lo, modify_hi, modify_lo, file_size_hi, file_size_lo, file_attrbs) = unpack(">6IL", header[:28]) | |
file_size = uint32_to_uint64(file_size_lo, file_size_hi) | |
self.file_path = xbdm_to_local_path(header[28:-1].decode("UTF8")) | |
self.file_handle = open(self.file_path, "wb") | |
self.file_handle.write(bio.read()) | |
self.file_data_left = file_size - self.file_handle.tell() | |
elif self.num_files_left > 0 and self.file_handle is not None and self.file_data_left > 0: | |
if self.file_data_left < len(raw_command): # fragmented packet | |
self.file_handle.write(raw_command[:self.file_data_left]) | |
self.file_handle.close() | |
self.file_handle = None | |
self.num_files_left -= 1 | |
self.file_path = "" | |
raw_command = raw_command[self.file_data_left:] | |
self.file_data_left = 0 | |
# send to data_received to process the packet | |
self.data_received(raw_command) | |
return | |
else: # unfragmented packet | |
self.file_handle.write(raw_command) | |
self.file_data_left -= len(raw_command) | |
if self.file_data_left == 0: | |
self.file_handle.close() | |
self.file_handle = None | |
self.num_files_left -= 1 | |
self.file_data_left = 0 | |
self.file_path = "" | |
if self.num_files_left == 0: | |
self.send_single_line("203- binary response follows") | |
self.transport.write((b"\x00" * 4) * self.num_files_total) | |
self.num_files_total = 0 | |
self.receiving_files = False | |
async def run_server(): | |
loop = asyncio.get_running_loop() | |
server = await loop.create_server(XBDMServerProtocol, "0.0.0.0", XBDM_PORT) | |
async with server: | |
await server.serve_forever() | |
def main() -> int: | |
global cfg, jrpc2cfg | |
if isfile(CONFIG_FILE): | |
cfg = load(open(CONFIG_FILE, "r")) | |
else: | |
cfg = {"xbdm_dir": "XBDM"} | |
dump(cfg, open(CONFIG_FILE, "w")) | |
if isfile(JRPC2_CONFIG_FILE): | |
jrpc2cfg = load(open(JRPC2_CONFIG_FILE, "r")) | |
else: | |
jrpc2cfg = { | |
"MoboType": "Trinity", | |
"CPUKey": "13371337133713371337133713371337", | |
"KernelVers": "17559", | |
"TitleID": "FFFE07D1", | |
"CPUTemp": 45, | |
"GPUTemp": 44, | |
"EDRAMTemp": 43, | |
"MOBOTemp": 39 | |
} | |
dump(jrpc2cfg, open(JRPC2_CONFIG_FILE, "w")) | |
asyncio.run(run_server()) | |
return 0 | |
if __name__ == "__main__": | |
exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment