-
-
Save hatschky/83e0cdf1d2b6710c9f883ff20a629774 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from argparse import ArgumentParser | |
import asyncio | |
import codecs | |
import datetime | |
from enum import Enum, IntEnum, IntFlag | |
import random | |
import string | |
import struct | |
from threading import Thread | |
import time | |
class BGBLinkCommand(IntEnum): | |
Version = 1 | |
Joypad = 101 | |
Sync1 = 104 | |
Sync2 = 105 | |
Sync3 = 106 | |
Status = 108 | |
WantDisconnect = 109 | |
class BGBLinkServer(asyncio.Protocol): | |
def __init__(self, peripheral): | |
self.peripheral = peripheral | |
self.data_buffer = bytearray() | |
self.byte_to_send = 0xD2 | |
def connection_made(self, transport): | |
self.transport = transport | |
def data_received(self, data): | |
self.data_buffer += data | |
while len(self.data_buffer) >= 8: | |
packet = self.data_buffer[:8] | |
self.data_buffer = self.data_buffer[8:] | |
command = BGBLinkCommand(packet[0]) | |
if command is BGBLinkCommand.Version: | |
self.send_packet(BGBLinkCommand.Version, b2=1, b3=4) | |
elif command is BGBLinkCommand.Joypad: | |
pass | |
elif command in [BGBLinkCommand.Sync1, BGBLinkCommand.Sync2]: | |
self.send_packet(packet[0], b2=self.byte_to_send) | |
self.byte_to_send = self.peripheral.sent_byte(packet[1]) | |
elif command is BGBLinkCommand.Sync3: | |
if packet[1] == 0: | |
i1 = struct.unpack('<i', packet[4:8])[0] | |
self.send_packet(BGBLinkCommand.Sync3, i1=i1) | |
elif command is BGBLinkCommand.Status: | |
self.send_packet(BGBLinkCommand.Status, b2=0x01) | |
else: | |
print('Unknown BGB command {0:02X}'.format(packet[0])) | |
print(packet) | |
def send_packet(self, command, b2 = 0, b3 = 0, b4 = 0, i1 = 0): | |
packet = struct.pack('<BBBBi', command, b2, b3, b4, i1) | |
self.transport.write(packet) | |
class DeviceType(IntEnum): | |
PDC = 0x08 | |
cdmaOne = 0x09 | |
DoCoMoPHS = 0x0A | |
DDIPocket = 0x0B | |
Unused_0C = 0x0C | |
Unused_0D = 0x0D | |
Unused_0E = 0x0E | |
Unused_0F = 0x0F | |
class GBICommand(IntFlag): | |
SessionStart = 0x10 | |
SessionEnd = 0x11 | |
PlaceCall = 0x12 | |
EndCall = 0x13 | |
AwaitCall = 0x14 | |
TransferData = 0x15 | |
# = 0x16 | |
GetLineStatus = 0x17 | |
# = 0x18 | |
ReadConfig = 0x19 | |
WriteConfig = 0x1A | |
# = 0x1B | |
# = 0x1C | |
# = 0x1D | |
# = 0x1E | |
TransferEnd = 0x1F | |
# = 0x20 | |
ISPLogIn = 0x21 | |
ISPLogOut = 0x22 | |
TCPConnect = 0x23 | |
TCPDisconnect = 0x24 | |
# = 0x25 | |
# = 0x26 | |
# = 0x27 | |
DNSQuery = 0x28 | |
TCPConnected = 0x80 # adapter sets this flag in responses | |
class GBIPacket: | |
def __init__(self, command=None, body=b''): | |
if command is None: | |
self.data = bytearray() | |
else: | |
self.data = bytearray([command, 0, 0, len(body)]) + body | |
self.append_checksum() | |
def __getitem__(self, key): | |
return self.data[key] | |
def __len__(self): | |
return len(self.data) | |
def __repr__(self): | |
command = self.command() | |
command_name = str(command & ~GBICommand.TCPConnected) | |
if command & GBICommand.TCPConnected: | |
command_name += '|TCPConnected' | |
return '<{0}: {1}>'.format(command_name, codecs.encode(self.body(), 'hex')) | |
def append(self, b): | |
self.data.append(b) | |
def append_checksum(self): | |
checksum = sum(self.data) % 65536 | |
self.data += struct.pack('>H', checksum) | |
def body(self): | |
return self[4:self.body_len() + 4] if len(self) >= 4 + self.body_len() else None | |
def body_len(self): | |
return self[3] if len(self) > 3 else None | |
def checksum(self): | |
return struct.unpack('>H', self[-2:])[0] if self.is_complete() else None | |
def command(self): | |
return GBICommand(self[0]) if len(self) > 0 else None | |
def is_complete(self): | |
return len(self) == self.body_len() + 6 if self.body_len() is not None else False | |
def is_valid(self): | |
return self.is_complete() and self.checksum() == sum(self[:-2]) % 65536 | |
def set_body(self, body): | |
self.data = bytearray(self[0:2] + [len(body)] + body) | |
self.append_checksum() | |
def set_command(self, command): | |
self[0] = command | |
if self.is_complete(): | |
self.data = self[:-2] | |
self.append_checksum() | |
class TransferState(Enum): | |
Idle = 0 | |
ReceivePreamble = 1 | |
ReceivePacket = 2 | |
ReceiveDeviceID = 3 | |
ReceiveStatus = 4 | |
Processing = 5 | |
SendPreamble = 6 | |
SendPacket = 7 | |
SendDeviceID = 8 | |
SendStatus = 9 | |
Finish = 10 | |
class MobileAdapterGB: | |
def __init__(self, telephone, configuration_data = None, device_type = DeviceType.PDC): | |
self.device_type = device_type | |
if configuration_data is None: | |
configuration_data = bytearray([0] * 192) | |
self.configuration_data = configuration_data | |
self.adapter_state = TransferState.Idle | |
self.packet_future = None | |
self.telephone = telephone | |
def sent_byte(self, b): | |
"""Returns the next byte the peripheral will send.""" | |
if self.adapter_state == TransferState.Idle: | |
if b == 0x99: | |
self.adapter_state = TransferState.ReceivePreamble | |
elif self.adapter_state == TransferState.ReceivePreamble: | |
if b == 0x66: | |
self.packet = GBIPacket() | |
self.adapter_state = TransferState.ReceivePacket | |
else: | |
self.adapter_state = TransferState.Idle | |
return 0xF1 | |
elif self.adapter_state == TransferState.ReceivePacket: | |
self.packet.append(b) | |
if self.packet.is_complete(): | |
self.adapter_state = TransferState.ReceiveDeviceID | |
return 0x80 ^ self.device_type | |
elif self.adapter_state == TransferState.ReceiveDeviceID: | |
if self.packet.is_valid(): | |
self.adapter_state = TransferState.ReceiveStatus | |
return 0x80 ^ self.packet.command() | |
else: | |
self.adapter_state = TransferState.Idle | |
return 0xF1 | |
elif self.adapter_state == TransferState.ReceiveStatus: | |
self.adapter_state = TransferState.Processing | |
loop = asyncio.get_event_loop() | |
self.packet_future = loop.run_in_executor(None, self.respond, self.packet) | |
elif self.adapter_state == TransferState.Processing: | |
if self.packet_future.done(): | |
self.adapter_state = TransferState.SendPreamble | |
self.packet = self.packet_future.result() | |
return 0x99 | |
elif self.adapter_state == TransferState.SendPreamble: | |
self.adapter_state = TransferState.SendPacket | |
self.bytes_left = len(self.packet) | |
return 0x66 | |
elif self.adapter_state == TransferState.SendPacket: | |
b = self.packet[-self.bytes_left] | |
self.bytes_left -= 1 | |
if self.bytes_left == 0: | |
self.adapter_state = TransferState.SendDeviceID | |
return b | |
elif self.adapter_state == TransferState.SendDeviceID: | |
self.adapter_state = TransferState.SendStatus | |
return 0x80 ^ self.device_type | |
elif self.adapter_state == TransferState.SendStatus: | |
self.adapter_state = TransferState.Finish | |
return 0x00 | |
elif self.adapter_state == TransferState.Finish: | |
self.adapter_state = TransferState.Idle | |
# if nothing else, send 0xD2 | |
return 0xD2 | |
def respond(self, packet): | |
print('>> {0}'.format(packet)) | |
command = packet.command() | |
if command in [GBICommand.PlaceCall, GBICommand.EndCall, GBICommand.AwaitCall, GBICommand.ISPLogIn, GBICommand.ISPLogOut, GBICommand.TCPConnect, GBICommand.TCPDisconnect, GBICommand.DNSQuery]: | |
response_packet = self.telephone.exchange_packet(packet) | |
print('<< {0}'.format(response_packet)) | |
return response_packet | |
response = b'' | |
if command == GBICommand.SessionStart: | |
response = b'NINTENDO' | |
elif command == GBICommand.SessionEnd: | |
pass | |
# null response | |
elif command == GBICommand.TransferData: | |
self.telephone.write(packet.body()[1:]) | |
data = self.telephone.read(254) | |
if data is None: | |
command = GBICommand.TransferEnd | GBICommand.TCPConnected | |
response = b'\x00' + (data if data is not None else b'') | |
elif command == GBICommand.GetLineStatus: | |
response = b'\x05' if self.telephone.busy() else b'\x00' | |
elif command == GBICommand.ReadConfig: | |
offset, length = packet.body()[0:2] | |
response = bytearray([offset]) + self.configuration_data[offset : offset + length] | |
elif command == GBICommand.WriteConfig: | |
offset = ord(packet.body()[0]) | |
length = len(packet.body()) - 1 | |
self.configuration_data[offset : offset + length] = packet.body()[1:] | |
# null response | |
if self.telephone.tcp_connected(): | |
command |= GBICommand.TCPConnected | |
response_packet = GBIPacket(command, response) | |
print('<< {0}'.format(response_packet)) | |
return response_packet | |
class Telephone: | |
def __init__(self, isp): | |
self.peer = None | |
self.isp = isp | |
def busy(self): | |
return self.peer is not None | |
def exchange_packet(self, packet): | |
command = packet.command() | |
if command in [GBICommand.ISPLogIn, GBICommand.ISPLogOut, GBICommand.TCPConnect, GBICommand.TCPDisconnect, GBICommand.DNSQuery]: | |
return self.peer.exchange_packet(packet) | |
if command == GBICommand.PlaceCall: | |
if packet.body()[1:] in [b'#9677', b'0077487751']: | |
self.peer = self.isp | |
else: | |
# would connect to a peer here | |
pass | |
elif command == GBICommand.EndCall: | |
self.peer = None | |
elif command == GBICommand.AwaitCall: | |
# would wait for a peer to connect here | |
pass | |
# PlaceCall, EndCall, and AwaitCall all have empty responses | |
if self.tcp_connected(): | |
command |= GBICommand.TCPConnected | |
return GBIPacket(command) | |
def read(self, length): | |
return self.peer.read(length) if self.peer is not None else None | |
def tcp_connected(self): | |
return self.peer.tcp_connected() if self.peer is not None else False | |
def write(self, data): | |
if self.peer is not None: | |
self.peer.write(data) | |
class DummyISP: | |
def __init__(self, dns_list): | |
self.dns_list = dns_list | |
self.peer = None | |
def exchange_packet(self, packet): | |
command = packet.command() | |
response = bytearray() | |
if command == GBICommand.DNSQuery: | |
for ip_port, domain, server in self.dns_list: | |
if packet.body().endswith(domain): | |
response = ip_port[0:4] | |
elif command == GBICommand.TCPConnect: | |
for ip_port, domain, server in self.dns_list: | |
if packet.body() == ip_port: | |
self.peer = server | |
self.peer.reset() | |
response = b'\xFF' | |
elif command == GBICommand.TCPDisconnect: | |
self.peer = None | |
if self.tcp_connected(): | |
command |= GBICommand.TCPConnected | |
return GBIPacket(command, response) | |
def read(self, length): | |
return self.peer.read(length) if self.peer is not None else None | |
def tcp_connected(self): | |
return self.peer.tcp_connected() if self.peer is not None else False | |
def write(self, data): | |
if self.peer is not None: | |
self.peer.write(data) | |
class MailServer: | |
def __init__(self): | |
self.email = ( | |
b'From: MISSINGNO.\r\n' # The game requires the From and Date headers to be present, even though it doesn’t read them | |
b'Date: Sat, 27 Jan 2001 12:34:56 +0900\r\n' | |
b'X-Game-code: CGB-BXTJ-00\r\n' | |
b'X-Game-result: 1 8abe3fd7 0210 0310 1\r\n' # Change this to match your TID/SID and the trade you’re making | |
b'X-GBmail-type: exclusive\r\n' # This tells Mobile Trainer not to mess with it | |
b'\r\n' | |
# Base64-encoded Pokémon data | |
b'meOQroYQtyEcEACKvgAGTAAAAAAAAAAAAAB2OyMOIwBHAI4SDgAAACYAJgATABEA' | |
b'FQARABGZ45CuhkOsQ1BQoRmBpouMkp8L4xrjyX9/f05DrEOg46bJf5KMk6CsjeML' | |
b'meOQroaKvhC3\r\n') | |
self.reset() | |
def read(self, length): | |
response = self.response[:length] | |
self.response = self.response[length:] | |
return response | |
def reset(self): | |
self.has_greeted = False | |
self.partial_request = bytearray() | |
self.response = bytearray() | |
def tcp_connected(self): | |
return True | |
def write(self, data): | |
self.partial_request += data | |
response = b'' | |
if not self.has_greeted: | |
response = b'+OK\r\n' | |
self.has_greeted = True | |
elif b'\r\n' in self.partial_request: | |
request, self.partial_request = self.partial_request.split(b'\r\n', 1) | |
if request.startswith(b'STAT') or request.startswith(b'LIST 1'): | |
response = b'+OK 1 %i\r\n' % len(self.email) | |
elif request.startswith(b'LIST '): | |
response = b'-ERR\r\n' | |
elif request.startswith(b'LIST'): | |
response = b'+OK\r\n1 %i\r\n.\r\n' % len(self.email) | |
elif request.startswith(b'TOP 1 0'): | |
response = b'+OK\r\n%b\r\n\r\n.\r\n' % self.email.split(b'\r\n\r\n')[0] | |
elif request.startswith(b'RETR 1'): | |
response = b'+OK\r\n%b\r\n.\r\n' % self.email | |
elif len(request) > 0: | |
response = b'+OK\r\n' | |
else: | |
response = b'-ERR\r\n' | |
self.response += response | |
def http_response(status, headers = {}, content = b''): | |
statuses = {200: b'OK', 400: b'Bad Request', 401: b'Unauthorized', 404: b'Not Found'} | |
date = datetime.datetime.now(tz=datetime.timezone(datetime.timedelta(hours=9))) | |
days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] | |
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] | |
headers[b'Date'] = '{0}, {3:02d} {2} {1:04d} {4:02d}:{5:02d}:{6:02d} +0900'.format(days[date.weekday()], date.year, months[date.month - 1], date.day, date.hour, date.minute, date.second).encode('ascii') | |
response = bytearray() | |
response += b'HTTP/1.0 ' + str(status).encode('ascii') + b' ' + statuses[status] + b'\r\n' | |
for header, value in headers.items(): | |
response += header + b': ' + value + b'\r\n' | |
response += b'\r\n' + content | |
return response | |
class HTTPServer: | |
def __init__(self, resources): | |
self.resources = resources | |
self.reset() | |
def parse_request(self, request): | |
rval = {} | |
action, headers = request.split(b'\r\n', 1) | |
rval['method'], rval['resource'], rval['protocol'] = action.split(b' ', 2) | |
if b'?' in rval['resource']: | |
rval['resource'], query_string = rval['resource'].split(b'?', 1) | |
rval['query'] = {} | |
query_string = query_string.split(b'&') | |
for query in query_string: | |
key, value = query.split(b'=', 1) | |
rval['query'][bytes(key)] = bytes(value) | |
rval['headers'] = {} | |
headers = headers.split(b'\r\n') | |
for header in headers: | |
key, value = header.split(b': ', 1) | |
rval['headers'][bytes(key)] = value | |
return rval | |
def respond(self, request, content = b''): | |
self.responding = True | |
resource = bytes(request['resource']) | |
if resource in self.resources: | |
self.response = self.resources[resource](request, content) | |
else: | |
self.response = http_response(400) | |
self.eof = True | |
self.request = None | |
self.responding = False | |
def read(self, length): | |
response = self.response[:length] | |
self.response = self.response[length:] | |
if len(response) == 0 and self.eof: | |
return None | |
else: | |
return response | |
def reset(self): | |
self.eof = False | |
self.responding = False | |
self.partial_request = bytearray() | |
self.request = None | |
self.content = bytearray() | |
self.response = bytearray() | |
def tcp_connected(self): | |
return len(self.response) > 0 or not self.eof | |
def write(self, data): | |
if not self.responding and not self.eof: | |
self.partial_request += data | |
if self.request is None and b'\r\n\r\n' in self.partial_request: | |
request, self.partial_request = self.partial_request.split(b'\r\n\r\n', 1) | |
self.request = self.parse_request(request) | |
if self.request is not None: | |
if b'Content-Length' in self.request['headers']: | |
content_length = int(self.request['headers'][b'Content-Length']) | |
if len(self.partial_request) >= content_length: | |
self.content = self.partial_request[0:content_length] | |
self.partial_request = self.partial_request[content_length:] | |
Thread(target=self.respond, args=(self.request, self.content)).start() | |
elif self.request['method'] != b'POST': | |
Thread(target=self.respond, args=(self.request,)).start() | |
else: # POST request without Content-Length header | |
self.response = http_response(400) | |
self.request = None | |
class MobileDatacenter: | |
def __init__(self): | |
self.games = {} | |
def unauthenticated_request(self, request, content): | |
if b'name' in request['query']: | |
_, publisher, game_code, resource = bytes(request['query'][b'name']).split(b'/', 3) | |
resource = b'/' + resource # re-insert the leading slash | |
if (publisher, game_code) in self.games: | |
game = self.games[publisher, game_code] | |
if resource in game.resources: | |
return game.resources[resource](request, content) | |
return http_response(404) | |
def authenticated_request(self, request, content, is_upload = False): | |
response_headers = {} | |
if b'name' in request['query']: | |
if not b'Gb-Auth-ID' in request['headers']: | |
if not b'Authorization' in request['headers']: | |
response_headers[b'WWW-Authenticate'] = b'GB00 name="ABCDEFGHIJKLMNOPabcdefghijklmnopABCDEFGHIJKLMNOP"' | |
return http_response(401, headers=response_headers) | |
else: | |
response_headers[b'Gb-Auth-ID'] = b'HAIL GIOVANNI' | |
if(is_upload): # Don’t bite on the GET request, it’s just establishing authentication | |
return http_response(200) | |
return self.unauthenticated_request(request, content) | |
def authenticated_upload(self, request, content): | |
return self.authenticated_request(request, content, is_upload=True) | |
def register_game(self, game): | |
self.games[game.publisher, game.game_code] = game | |
class PokémonCrystal: | |
def __init__(self): | |
self.publisher = b'01' # Nintendo | |
self.game_code = b'CGB-BXTJ' | |
self.resources = {} | |
def generate_battle_tower(self, english_rom): | |
room_count = 20 * 10 # must be a multiple of 10 (one for each level from 10 to 100) | |
english_rom.seek(0x1F0000) | |
trainer_text = english_rom.read(70 * 3 * 12) | |
english_rom.seek(0x1F814E) | |
trainer_classes = english_rom.read(70 * 11) | |
english_rom.seek(0x1F8450) | |
trainer_parties = english_rom.read(70 * 3 * 59) | |
# The Japanese ROM (CGB-BXTJ-0) does not include any sample data for the | |
# Battle Tower. The English ROMs (CGB-BYTE-0, CGB-BYTE-1, CGB-BYTU-0) | |
# do, but the trainer names were localized for the single-player Battle | |
# Tower. The following list transcribes those names into katakana. Some | |
# of them will be truncated because the Japanese game allows only five | |
# characters for trainer names. | |
trainer_names = [ | |
'ハンソン', 'ソーヤー', 'マスダ', 'ニッケル', 'オルソン', 'ザボロブスキ', 'ライト', | |
'アレクサンダー', 'カワカミ', 'ビケット', 'サイトウ', 'クロフォード', 'ディアズ', 'エリクソン', | |
'フェアフィールド', 'ハンター', 'ヒル', 'ハビエル', 'カウフマン', 'ランカスター', 'マックメーヒル', | |
'オブライアン', 'フロスト', 'モース', 'ユフネ', 'ラジャン', 'ロドリゲス', 'サンティアゴ', | |
'ストック', 'サーマン', 'バレンティーノ', 'ワーグナー', 'イェーツ', 'アンドルーズ', 'バーン', | |
'モリ', 'バックマン', 'コブ', 'ヒューズ', 'アリタ', 'イーストン', 'フリーマン', | |
'ギーゼ', 'ハッチャー', 'ジャクソン', 'カーン', 'レオン', 'マリーノ', 'ニューマン', | |
'グエン', 'オグデン', 'パーク', 'レイン', 'セルズ', 'ロックウェル', 'ソーントン', | |
'ターナー', 'バン・ダイク', 'ウォーカー', 'マイヤー', 'ジョンソン', 'アダムズ', 'スミス', | |
'タジリ', 'ベイカー', 'コリンズ', 'スマート', 'ダイクストラ', 'イートン', 'ウォン' | |
] | |
self.resources[b'/battle/index.txt'] = self.serve_static_resource( | |
b'http://gameboy.datacenter.ne.jp/cgb/ranking?name=/01/CGB-BXTJ/battle/battle.cgi\r\n' | |
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/battle/rooms.cgb\r\n' | |
b'http://gameboy.datacenter.ne.jp/cgb/utility?name=/01/CGB-BXTJ/battle/10roomXXXX.cgb\r\n' | |
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/battle/leadersXXXX.cgb\r\n' | |
) | |
self.resources[b'/battle/rooms.cgb'] = self.serve_static_resource(struct.pack('>H', room_count)) | |
# A functioning Battle Tower would need to do something with the data uploaded here | |
self.resources[b'/battle/battle.cgi'] = self.serve_static_resource(b'') | |
self.battle_rooms = [] | |
self.room_leaders = [] | |
for i in range(room_count): | |
leader_list = bytearray() | |
room = bytearray() | |
level = i % 10 | |
# Initialize the leader list with the default leader | |
leader_list += self.unicode_to_bxtj(trainer_names[level * 7])[0:5].ljust(5, b'\x50') | |
self.room_leaders.append(leader_list) | |
leaders_path = '/battle/leaders{0:04d}.cgb'.format(i + 1).encode('ascii') | |
self.resources[leaders_path] = lambda request, content=b'', game=self, room=i: ( | |
http_response(200, content=game.room_leaders[room].ljust(150, b'\x00')) | |
) | |
for j in range(7): | |
index = level * 7 + j | |
room += self.unicode_to_bxtj(trainer_names[index])[0:5].ljust(5, b'\x50') | |
room.append(trainer_classes[index * 11 + 10]) | |
for k in range(3): | |
# The Pokémon structure in the English ROM is 59 bytes, | |
# including an unused 11-byte “nickname” that is a rōmaji | |
# conversion of the species’s Japanese name. The Japanese | |
# game expects a 54-byte structure with a 6-byte nickname. | |
# By setting the nickname to an invalid character, the | |
# game will automatically replace it with the correct | |
# Japanese species name. | |
room += trainer_parties[(index * 3 + k) * 59 : (index * 3 + k) * 59 + 48] | |
room += b'\x01'.ljust(6, b'\x50') | |
room += trainer_text[index * 3 * 12 : (index + 1) * 3 * 12] | |
self.battle_rooms.append(room) | |
room_path = '/battle/10room{0:04d}.cgb'.format(i + 1).encode('ascii') | |
self.resources[room_path] = lambda request, content=b'', game=self, room=i: ( | |
http_response(200, content=game.battle_rooms[room]) | |
) | |
def generate_eggs(self, english_rom): | |
english_rom.seek(0x1FB552) | |
probabilities = english_rom.read(14 * 2) | |
eggs = english_rom.read(14 * 59) | |
# The eggs are instead nicknamed “EGG” in the English ROM. | |
egg_name = self.unicode_to_bxtj('タマゴ').ljust(6, b'\x50') | |
# The probabilities are stored in the English ROM as little-endian | |
# words, but index.txt must use big-endian ASCII hex. | |
hex_probabilities = b'' | |
for i in range(14): | |
probability = probabilities[i * 2 : (i + 1) * 2] | |
probability = struct.unpack('<H', probability)[0] | |
probability = struct.pack('>H', probability) | |
hex_probabilities += codecs.encode(probability, 'hex') | |
self.resources[b'/tamago/index.txt'] = self.serve_static_resource( | |
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/tamago/tamagoXX.cgb\r\n' | |
+ hex_probabilities + b'\r\n' | |
) | |
for i in range(14): | |
egg = eggs[i * 59 : i * 59 + 48] + egg_name | |
egg_path = '/tamago/tamago{0:02x}.cgb'.format(i).encode('ascii') | |
self.resources[egg_path] = self.serve_static_resource(egg) | |
def generate_news(self, english_rom): | |
self.resources[b'/news/index.txt'] = self.serve_static_resource( | |
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/news/info.cgb\r\n' | |
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/news/0save.cgi\r\n' | |
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/news/0ranking.cgi\r\n' | |
b'http://gameboy.datacenter.ne.jp/cgb/utility?name=/01/CGB-BXTJ/news/100news.cgb\r\n' | |
) | |
self.resources[b'/news/info.cgb'] = self.serve_static_resource( | |
# 12-byte code used to identify this News | |
b''.join(bytes(random.choice(string.printable), 'ascii') for _ in range(12)) # 12-byte code used to identify this News | |
# Description displayed before downloading the News | |
+ self.unicode_to_bxtj('これは フェイクニュースですよ') | |
# Address in SRAM6 where rankings data will be downloaded | |
+ struct.pack('<H', 0xB000) | |
# Length of rankings table metadata | |
+ struct.pack('<H', 18) | |
# Record length of each rankings table | |
+ struct.pack('<HHHHHHHHH', 26, 26, 26, 26, 26, 26, 26, 26, 26) | |
# Save data upload instructions | |
+ struct.pack('<BHB', 0x00, 0xA000, 0x0D) | |
+ b'\xFF' | |
# Rankings data upload instructions | |
+ b'abc\x50' + struct.pack('<BHB', 0, 0xD802, 2) | |
+ b'\x50' | |
) | |
# A real Pokémon News agency would save the data uploaded here for analysis | |
self.resources[b'/news/0save.cgi'] = self.serve_static_resource(b'') | |
# A real Pokémon News agency would update the rankings with data uploaded here | |
self.resources[b'/news/0ranking.cgi'] = self.serve_static_resource( | |
bytes.fromhex( | |
'00 00 00 01' # total number of ranked players? | |
'00 00' # ??? | |
'00 00 00 01' # your ranking | |
'00 01' # number of entries in this table | |
'99 E3 90 AE 86 50 50' # name | |
'1A' # prefecture | |
'00 00' # postal code? | |
'19' # age | |
'00' # gender | |
'0D 02 00 05 00 00 22 02 01 05 00 00' # easy chat | |
'00 00' # score | |
) * 9 | |
) | |
english_rom.seek(0x1F4DD3) | |
news_issue = english_rom.read(0xFCC) | |
self.resources[b'/news/100news.cgb'] = self.serve_static_resource(news_issue) | |
def generate_trade_corner(self): | |
self.resources[b'/exchange/index.txt'] = self.serve_static_resource( | |
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi\r\n' | |
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/0cancel.cgi\r\n' | |
) | |
self.resources[b'/exchange/10upload.cgi'] = self.serve_static_resource(b'') | |
self.resources[b'/exchange/0cancel.cgi'] = self.serve_static_resource(b'') | |
def generate_mobile_stadium(self): | |
self.resources[b'/POKESTA/menu.cgb'] = self.serve_static_resource(b'\x00') | |
def serve_static_resource(self, response): | |
return lambda request, content=b'', resopnse=response: http_response(200, content=response) | |
def unicode_to_bxtj(self, s): | |
bxtj_encoding = ( | |
'�����ガギグゲゴザジズゼゾダヂヅデド�����バビブボ���' | |
'������がぎぐげござじずぜぞだぢづでど�����ばびぶべぼ�' | |
'パピプポぱぴぷぺぽ������\n����������������' | |
'�������������:ぃぅ「」『』・…ぁぇぉ������ ' | |
'アイウエオカキクケコサシスセソタチツテトナニヌネノハヒフホマミム' | |
'メモヤユヨラルレロワヲンッャュョィあいうえおかきくけこさしすせそ' | |
'たちつてとなにぬねのはひふへほまみむめもやゆよらりるれろわをんっ' | |
'ゃゅょー゜゛?!。ァゥェ▷▶▼♂円×./ォ♀0123456789' | |
) | |
overloaded = {'ヘ': 'へ', 'ベ': 'べ', 'ペ': 'ぺ', 'リ': 'り'} | |
rval = bytearray() | |
for char in s: | |
b = bxtj_encoding.find(overloaded[char] if char in overloaded else char) | |
rval.append(b if b in range(256) else 0) | |
rval.append(0x50) | |
return rval | |
arg_parser = ArgumentParser(description='Set BGB to “Link → Listen” and run this program to emulate a Mobile Adapter GB and a dummy Mobile System GB.') | |
arg_parser.add_argument('-c', '--config', default='mobilegb.cfg', help='Path to Mobile Adapter GB configuration data (default: mobilegb.cfg; will be created if missing, run Mobile Trainer to initialize it)') | |
arg_parser.add_argument('--crystal-rom', default='Pokemon - Crystal Version (USA, Europe).gbc', help='Path to an English-language Pokémon Crystal ROM file, containing data for the Battle Tower, Egg Ticket, and Pokémon News.') | |
arg_parser.add_argument('--host', default='127.0.0.1', help='IP address of the BGB instance (default: 127.0.0.1)') | |
arg_parser.add_argument('-p', '--port', default=8765, help='Port that BGB is listening on (default: 8765)') | |
args = arg_parser.parse_args() | |
configuration_data = bytearray() | |
try: | |
with open(args.config, 'rb') as f: | |
configuration_data = bytearray(f.read()) | |
f.closed | |
except FileNotFoundError: | |
pass | |
if(len(configuration_data) != 192): | |
print('Configuration data file “{0}” is invalid or does not exist.'.format(args['config'])) | |
print('Creating a blank configuration.\n') | |
loop = asyncio.get_event_loop() | |
loop.set_debug(True) | |
pokémon_crystal = PokémonCrystal() | |
pokémon_crystal.generate_trade_corner() | |
pokémon_crystal.generate_mobile_stadium() | |
try: | |
if args.crystal_rom is None: | |
raise FileNotFoundError() | |
with open(args.crystal_rom, 'rb') as english_rom: | |
pokémon_crystal.generate_battle_tower(english_rom) | |
pokémon_crystal.generate_eggs(english_rom) | |
pokémon_crystal.generate_news(english_rom) | |
f.closed | |
except FileNotFoundError: | |
print('English Pokémon Crystal ROM not found. Can’t load data for the Battle Tower, Egg Ticket, or Pokémon News.') | |
datacenter = MobileDatacenter() | |
datacenter.register_game(pokémon_crystal) | |
http_server = HTTPServer({b'/cgb/download': datacenter.unauthenticated_request, b'/cgb/upload': datacenter.authenticated_upload, b'/cgb/ranking': datacenter.unauthenticated_request, b'/cgb/utility': datacenter.authenticated_request}) | |
mail_server = MailServer() | |
isp = DummyISP([ | |
(b'\xFA\xCA\xDE\x01\x00\x50', b'gameboy.datacenter.ne.jp', http_server), | |
(b'\xFA\xCA\xDE\x02\x00\x6E', b'dion.ne.jp', mail_server)]) | |
telephone = Telephone(isp) | |
adapter = MobileAdapterGB(telephone, configuration_data, DeviceType.PDC) | |
server = BGBLinkServer(adapter) | |
coro = loop.create_connection(lambda: server, host=args.host, port=args.port) | |
(transport, _protocol) = loop.run_until_complete(coro) | |
try: | |
loop.run_forever() | |
except KeyboardInterrupt: | |
print('Saving configuration to “{0}”.'.format(args.config)) | |
with open(args.config, 'wb') as out: | |
out.write(adapter.configuration_data) | |
out.closed |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment