Skip to content

Instantly share code, notes, and snippets.

@hatschky
Created May 15, 2020 05:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hatschky/83e0cdf1d2b6710c9f883ff20a629774 to your computer and use it in GitHub Desktop.
Save hatschky/83e0cdf1d2b6710c9f883ff20a629774 to your computer and use it in GitHub Desktop.
from argparse import ArgumentParser
import asyncio
import codecs
import datetime
from enum import Enum, IntEnum, IntFlag
import random
import string
import struct
from threading import Thread
import time
class BGBLinkCommand(IntEnum):
Version = 1
Joypad = 101
Sync1 = 104
Sync2 = 105
Sync3 = 106
Status = 108
WantDisconnect = 109
class BGBLinkServer(asyncio.Protocol):
def __init__(self, peripheral):
self.peripheral = peripheral
self.data_buffer = bytearray()
self.byte_to_send = 0xD2
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
self.data_buffer += data
while len(self.data_buffer) >= 8:
packet = self.data_buffer[:8]
self.data_buffer = self.data_buffer[8:]
command = BGBLinkCommand(packet[0])
if command is BGBLinkCommand.Version:
self.send_packet(BGBLinkCommand.Version, b2=1, b3=4)
elif command is BGBLinkCommand.Joypad:
pass
elif command in [BGBLinkCommand.Sync1, BGBLinkCommand.Sync2]:
self.send_packet(packet[0], b2=self.byte_to_send)
self.byte_to_send = self.peripheral.sent_byte(packet[1])
elif command is BGBLinkCommand.Sync3:
if packet[1] == 0:
i1 = struct.unpack('<i', packet[4:8])[0]
self.send_packet(BGBLinkCommand.Sync3, i1=i1)
elif command is BGBLinkCommand.Status:
self.send_packet(BGBLinkCommand.Status, b2=0x01)
else:
print('Unknown BGB command {0:02X}'.format(packet[0]))
print(packet)
def send_packet(self, command, b2 = 0, b3 = 0, b4 = 0, i1 = 0):
packet = struct.pack('<BBBBi', command, b2, b3, b4, i1)
self.transport.write(packet)
class DeviceType(IntEnum):
PDC = 0x08
cdmaOne = 0x09
DoCoMoPHS = 0x0A
DDIPocket = 0x0B
Unused_0C = 0x0C
Unused_0D = 0x0D
Unused_0E = 0x0E
Unused_0F = 0x0F
class GBICommand(IntFlag):
SessionStart = 0x10
SessionEnd = 0x11
PlaceCall = 0x12
EndCall = 0x13
AwaitCall = 0x14
TransferData = 0x15
# = 0x16
GetLineStatus = 0x17
# = 0x18
ReadConfig = 0x19
WriteConfig = 0x1A
# = 0x1B
# = 0x1C
# = 0x1D
# = 0x1E
TransferEnd = 0x1F
# = 0x20
ISPLogIn = 0x21
ISPLogOut = 0x22
TCPConnect = 0x23
TCPDisconnect = 0x24
# = 0x25
# = 0x26
# = 0x27
DNSQuery = 0x28
TCPConnected = 0x80 # adapter sets this flag in responses
class GBIPacket:
def __init__(self, command=None, body=b''):
if command is None:
self.data = bytearray()
else:
self.data = bytearray([command, 0, 0, len(body)]) + body
self.append_checksum()
def __getitem__(self, key):
return self.data[key]
def __len__(self):
return len(self.data)
def __repr__(self):
command = self.command()
command_name = str(command & ~GBICommand.TCPConnected)
if command & GBICommand.TCPConnected:
command_name += '|TCPConnected'
return '<{0}: {1}>'.format(command_name, codecs.encode(self.body(), 'hex'))
def append(self, b):
self.data.append(b)
def append_checksum(self):
checksum = sum(self.data) % 65536
self.data += struct.pack('>H', checksum)
def body(self):
return self[4:self.body_len() + 4] if len(self) >= 4 + self.body_len() else None
def body_len(self):
return self[3] if len(self) > 3 else None
def checksum(self):
return struct.unpack('>H', self[-2:])[0] if self.is_complete() else None
def command(self):
return GBICommand(self[0]) if len(self) > 0 else None
def is_complete(self):
return len(self) == self.body_len() + 6 if self.body_len() is not None else False
def is_valid(self):
return self.is_complete() and self.checksum() == sum(self[:-2]) % 65536
def set_body(self, body):
self.data = bytearray(self[0:2] + [len(body)] + body)
self.append_checksum()
def set_command(self, command):
self[0] = command
if self.is_complete():
self.data = self[:-2]
self.append_checksum()
class TransferState(Enum):
Idle = 0
ReceivePreamble = 1
ReceivePacket = 2
ReceiveDeviceID = 3
ReceiveStatus = 4
Processing = 5
SendPreamble = 6
SendPacket = 7
SendDeviceID = 8
SendStatus = 9
Finish = 10
class MobileAdapterGB:
def __init__(self, telephone, configuration_data = None, device_type = DeviceType.PDC):
self.device_type = device_type
if configuration_data is None:
configuration_data = bytearray([0] * 192)
self.configuration_data = configuration_data
self.adapter_state = TransferState.Idle
self.packet_future = None
self.telephone = telephone
def sent_byte(self, b):
"""Returns the next byte the peripheral will send."""
if self.adapter_state == TransferState.Idle:
if b == 0x99:
self.adapter_state = TransferState.ReceivePreamble
elif self.adapter_state == TransferState.ReceivePreamble:
if b == 0x66:
self.packet = GBIPacket()
self.adapter_state = TransferState.ReceivePacket
else:
self.adapter_state = TransferState.Idle
return 0xF1
elif self.adapter_state == TransferState.ReceivePacket:
self.packet.append(b)
if self.packet.is_complete():
self.adapter_state = TransferState.ReceiveDeviceID
return 0x80 ^ self.device_type
elif self.adapter_state == TransferState.ReceiveDeviceID:
if self.packet.is_valid():
self.adapter_state = TransferState.ReceiveStatus
return 0x80 ^ self.packet.command()
else:
self.adapter_state = TransferState.Idle
return 0xF1
elif self.adapter_state == TransferState.ReceiveStatus:
self.adapter_state = TransferState.Processing
loop = asyncio.get_event_loop()
self.packet_future = loop.run_in_executor(None, self.respond, self.packet)
elif self.adapter_state == TransferState.Processing:
if self.packet_future.done():
self.adapter_state = TransferState.SendPreamble
self.packet = self.packet_future.result()
return 0x99
elif self.adapter_state == TransferState.SendPreamble:
self.adapter_state = TransferState.SendPacket
self.bytes_left = len(self.packet)
return 0x66
elif self.adapter_state == TransferState.SendPacket:
b = self.packet[-self.bytes_left]
self.bytes_left -= 1
if self.bytes_left == 0:
self.adapter_state = TransferState.SendDeviceID
return b
elif self.adapter_state == TransferState.SendDeviceID:
self.adapter_state = TransferState.SendStatus
return 0x80 ^ self.device_type
elif self.adapter_state == TransferState.SendStatus:
self.adapter_state = TransferState.Finish
return 0x00
elif self.adapter_state == TransferState.Finish:
self.adapter_state = TransferState.Idle
# if nothing else, send 0xD2
return 0xD2
def respond(self, packet):
print('>> {0}'.format(packet))
command = packet.command()
if command in [GBICommand.PlaceCall, GBICommand.EndCall, GBICommand.AwaitCall, GBICommand.ISPLogIn, GBICommand.ISPLogOut, GBICommand.TCPConnect, GBICommand.TCPDisconnect, GBICommand.DNSQuery]:
response_packet = self.telephone.exchange_packet(packet)
print('<< {0}'.format(response_packet))
return response_packet
response = b''
if command == GBICommand.SessionStart:
response = b'NINTENDO'
elif command == GBICommand.SessionEnd:
pass
# null response
elif command == GBICommand.TransferData:
self.telephone.write(packet.body()[1:])
data = self.telephone.read(254)
if data is None:
command = GBICommand.TransferEnd | GBICommand.TCPConnected
response = b'\x00' + (data if data is not None else b'')
elif command == GBICommand.GetLineStatus:
response = b'\x05' if self.telephone.busy() else b'\x00'
elif command == GBICommand.ReadConfig:
offset, length = packet.body()[0:2]
response = bytearray([offset]) + self.configuration_data[offset : offset + length]
elif command == GBICommand.WriteConfig:
offset = ord(packet.body()[0])
length = len(packet.body()) - 1
self.configuration_data[offset : offset + length] = packet.body()[1:]
# null response
if self.telephone.tcp_connected():
command |= GBICommand.TCPConnected
response_packet = GBIPacket(command, response)
print('<< {0}'.format(response_packet))
return response_packet
class Telephone:
def __init__(self, isp):
self.peer = None
self.isp = isp
def busy(self):
return self.peer is not None
def exchange_packet(self, packet):
command = packet.command()
if command in [GBICommand.ISPLogIn, GBICommand.ISPLogOut, GBICommand.TCPConnect, GBICommand.TCPDisconnect, GBICommand.DNSQuery]:
return self.peer.exchange_packet(packet)
if command == GBICommand.PlaceCall:
if packet.body()[1:] in [b'#9677', b'0077487751']:
self.peer = self.isp
else:
# would connect to a peer here
pass
elif command == GBICommand.EndCall:
self.peer = None
elif command == GBICommand.AwaitCall:
# would wait for a peer to connect here
pass
# PlaceCall, EndCall, and AwaitCall all have empty responses
if self.tcp_connected():
command |= GBICommand.TCPConnected
return GBIPacket(command)
def read(self, length):
return self.peer.read(length) if self.peer is not None else None
def tcp_connected(self):
return self.peer.tcp_connected() if self.peer is not None else False
def write(self, data):
if self.peer is not None:
self.peer.write(data)
class DummyISP:
def __init__(self, dns_list):
self.dns_list = dns_list
self.peer = None
def exchange_packet(self, packet):
command = packet.command()
response = bytearray()
if command == GBICommand.DNSQuery:
for ip_port, domain, server in self.dns_list:
if packet.body().endswith(domain):
response = ip_port[0:4]
elif command == GBICommand.TCPConnect:
for ip_port, domain, server in self.dns_list:
if packet.body() == ip_port:
self.peer = server
self.peer.reset()
response = b'\xFF'
elif command == GBICommand.TCPDisconnect:
self.peer = None
if self.tcp_connected():
command |= GBICommand.TCPConnected
return GBIPacket(command, response)
def read(self, length):
return self.peer.read(length) if self.peer is not None else None
def tcp_connected(self):
return self.peer.tcp_connected() if self.peer is not None else False
def write(self, data):
if self.peer is not None:
self.peer.write(data)
class MailServer:
def __init__(self):
self.email = (
b'From: MISSINGNO.\r\n' # The game requires the From and Date headers to be present, even though it doesn’t read them
b'Date: Sat, 27 Jan 2001 12:34:56 +0900\r\n'
b'X-Game-code: CGB-BXTJ-00\r\n'
b'X-Game-result: 1 8abe3fd7 0210 0310 1\r\n' # Change this to match your TID/SID and the trade you’re making
b'X-GBmail-type: exclusive\r\n' # This tells Mobile Trainer not to mess with it
b'\r\n'
# Base64-encoded Pokémon data
b'meOQroYQtyEcEACKvgAGTAAAAAAAAAAAAAB2OyMOIwBHAI4SDgAAACYAJgATABEA'
b'FQARABGZ45CuhkOsQ1BQoRmBpouMkp8L4xrjyX9/f05DrEOg46bJf5KMk6CsjeML'
b'meOQroaKvhC3\r\n')
self.reset()
def read(self, length):
response = self.response[:length]
self.response = self.response[length:]
return response
def reset(self):
self.has_greeted = False
self.partial_request = bytearray()
self.response = bytearray()
def tcp_connected(self):
return True
def write(self, data):
self.partial_request += data
response = b''
if not self.has_greeted:
response = b'+OK\r\n'
self.has_greeted = True
elif b'\r\n' in self.partial_request:
request, self.partial_request = self.partial_request.split(b'\r\n', 1)
if request.startswith(b'STAT') or request.startswith(b'LIST 1'):
response = b'+OK 1 %i\r\n' % len(self.email)
elif request.startswith(b'LIST '):
response = b'-ERR\r\n'
elif request.startswith(b'LIST'):
response = b'+OK\r\n1 %i\r\n.\r\n' % len(self.email)
elif request.startswith(b'TOP 1 0'):
response = b'+OK\r\n%b\r\n\r\n.\r\n' % self.email.split(b'\r\n\r\n')[0]
elif request.startswith(b'RETR 1'):
response = b'+OK\r\n%b\r\n.\r\n' % self.email
elif len(request) > 0:
response = b'+OK\r\n'
else:
response = b'-ERR\r\n'
self.response += response
def http_response(status, headers = {}, content = b''):
statuses = {200: b'OK', 400: b'Bad Request', 401: b'Unauthorized', 404: b'Not Found'}
date = datetime.datetime.now(tz=datetime.timezone(datetime.timedelta(hours=9)))
days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
headers[b'Date'] = '{0}, {3:02d} {2} {1:04d} {4:02d}:{5:02d}:{6:02d} +0900'.format(days[date.weekday()], date.year, months[date.month - 1], date.day, date.hour, date.minute, date.second).encode('ascii')
response = bytearray()
response += b'HTTP/1.0 ' + str(status).encode('ascii') + b' ' + statuses[status] + b'\r\n'
for header, value in headers.items():
response += header + b': ' + value + b'\r\n'
response += b'\r\n' + content
return response
class HTTPServer:
def __init__(self, resources):
self.resources = resources
self.reset()
def parse_request(self, request):
rval = {}
action, headers = request.split(b'\r\n', 1)
rval['method'], rval['resource'], rval['protocol'] = action.split(b' ', 2)
if b'?' in rval['resource']:
rval['resource'], query_string = rval['resource'].split(b'?', 1)
rval['query'] = {}
query_string = query_string.split(b'&')
for query in query_string:
key, value = query.split(b'=', 1)
rval['query'][bytes(key)] = bytes(value)
rval['headers'] = {}
headers = headers.split(b'\r\n')
for header in headers:
key, value = header.split(b': ', 1)
rval['headers'][bytes(key)] = value
return rval
def respond(self, request, content = b''):
self.responding = True
resource = bytes(request['resource'])
if resource in self.resources:
self.response = self.resources[resource](request, content)
else:
self.response = http_response(400)
self.eof = True
self.request = None
self.responding = False
def read(self, length):
response = self.response[:length]
self.response = self.response[length:]
if len(response) == 0 and self.eof:
return None
else:
return response
def reset(self):
self.eof = False
self.responding = False
self.partial_request = bytearray()
self.request = None
self.content = bytearray()
self.response = bytearray()
def tcp_connected(self):
return len(self.response) > 0 or not self.eof
def write(self, data):
if not self.responding and not self.eof:
self.partial_request += data
if self.request is None and b'\r\n\r\n' in self.partial_request:
request, self.partial_request = self.partial_request.split(b'\r\n\r\n', 1)
self.request = self.parse_request(request)
if self.request is not None:
if b'Content-Length' in self.request['headers']:
content_length = int(self.request['headers'][b'Content-Length'])
if len(self.partial_request) >= content_length:
self.content = self.partial_request[0:content_length]
self.partial_request = self.partial_request[content_length:]
Thread(target=self.respond, args=(self.request, self.content)).start()
elif self.request['method'] != b'POST':
Thread(target=self.respond, args=(self.request,)).start()
else: # POST request without Content-Length header
self.response = http_response(400)
self.request = None
class MobileDatacenter:
def __init__(self):
self.games = {}
def unauthenticated_request(self, request, content):
if b'name' in request['query']:
_, publisher, game_code, resource = bytes(request['query'][b'name']).split(b'/', 3)
resource = b'/' + resource # re-insert the leading slash
if (publisher, game_code) in self.games:
game = self.games[publisher, game_code]
if resource in game.resources:
return game.resources[resource](request, content)
return http_response(404)
def authenticated_request(self, request, content, is_upload = False):
response_headers = {}
if b'name' in request['query']:
if not b'Gb-Auth-ID' in request['headers']:
if not b'Authorization' in request['headers']:
response_headers[b'WWW-Authenticate'] = b'GB00 name="ABCDEFGHIJKLMNOPabcdefghijklmnopABCDEFGHIJKLMNOP"'
return http_response(401, headers=response_headers)
else:
response_headers[b'Gb-Auth-ID'] = b'HAIL GIOVANNI'
if(is_upload): # Don’t bite on the GET request, it’s just establishing authentication
return http_response(200)
return self.unauthenticated_request(request, content)
def authenticated_upload(self, request, content):
return self.authenticated_request(request, content, is_upload=True)
def register_game(self, game):
self.games[game.publisher, game.game_code] = game
class PokémonCrystal:
def __init__(self):
self.publisher = b'01' # Nintendo
self.game_code = b'CGB-BXTJ'
self.resources = {}
def generate_battle_tower(self, english_rom):
room_count = 20 * 10 # must be a multiple of 10 (one for each level from 10 to 100)
english_rom.seek(0x1F0000)
trainer_text = english_rom.read(70 * 3 * 12)
english_rom.seek(0x1F814E)
trainer_classes = english_rom.read(70 * 11)
english_rom.seek(0x1F8450)
trainer_parties = english_rom.read(70 * 3 * 59)
# The Japanese ROM (CGB-BXTJ-0) does not include any sample data for the
# Battle Tower. The English ROMs (CGB-BYTE-0, CGB-BYTE-1, CGB-BYTU-0)
# do, but the trainer names were localized for the single-player Battle
# Tower. The following list transcribes those names into katakana. Some
# of them will be truncated because the Japanese game allows only five
# characters for trainer names.
trainer_names = [
'ハンソン', 'ソーヤー', 'マスダ', 'ニッケル', 'オルソン', 'ザボロブスキ', 'ライト',
'アレクサンダー', 'カワカミ', 'ビケット', 'サイトウ', 'クロフォード', 'ディアズ', 'エリクソン',
'フェアフィールド', 'ハンター', 'ヒル', 'ハビエル', 'カウフマン', 'ランカスター', 'マックメーヒル',
'オブライアン', 'フロスト', 'モース', 'ユフネ', 'ラジャン', 'ロドリゲス', 'サンティアゴ',
'ストック', 'サーマン', 'バレンティーノ', 'ワーグナー', 'イェーツ', 'アンドルーズ', 'バーン',
'モリ', 'バックマン', 'コブ', 'ヒューズ', 'アリタ', 'イーストン', 'フリーマン',
'ギーゼ', 'ハッチャー', 'ジャクソン', 'カーン', 'レオン', 'マリーノ', 'ニューマン',
'グエン', 'オグデン', 'パーク', 'レイン', 'セルズ', 'ロックウェル', 'ソーントン',
'ターナー', 'バン・ダイク', 'ウォーカー', 'マイヤー', 'ジョンソン', 'アダムズ', 'スミス',
'タジリ', 'ベイカー', 'コリンズ', 'スマート', 'ダイクストラ', 'イートン', 'ウォン'
]
self.resources[b'/battle/index.txt'] = self.serve_static_resource(
b'http://gameboy.datacenter.ne.jp/cgb/ranking?name=/01/CGB-BXTJ/battle/battle.cgi\r\n'
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/battle/rooms.cgb\r\n'
b'http://gameboy.datacenter.ne.jp/cgb/utility?name=/01/CGB-BXTJ/battle/10roomXXXX.cgb\r\n'
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/battle/leadersXXXX.cgb\r\n'
)
self.resources[b'/battle/rooms.cgb'] = self.serve_static_resource(struct.pack('>H', room_count))
# A functioning Battle Tower would need to do something with the data uploaded here
self.resources[b'/battle/battle.cgi'] = self.serve_static_resource(b'')
self.battle_rooms = []
self.room_leaders = []
for i in range(room_count):
leader_list = bytearray()
room = bytearray()
level = i % 10
# Initialize the leader list with the default leader
leader_list += self.unicode_to_bxtj(trainer_names[level * 7])[0:5].ljust(5, b'\x50')
self.room_leaders.append(leader_list)
leaders_path = '/battle/leaders{0:04d}.cgb'.format(i + 1).encode('ascii')
self.resources[leaders_path] = lambda request, content=b'', game=self, room=i: (
http_response(200, content=game.room_leaders[room].ljust(150, b'\x00'))
)
for j in range(7):
index = level * 7 + j
room += self.unicode_to_bxtj(trainer_names[index])[0:5].ljust(5, b'\x50')
room.append(trainer_classes[index * 11 + 10])
for k in range(3):
# The Pokémon structure in the English ROM is 59 bytes,
# including an unused 11-byte “nickname” that is a rōmaji
# conversion of the species’s Japanese name. The Japanese
# game expects a 54-byte structure with a 6-byte nickname.
# By setting the nickname to an invalid character, the
# game will automatically replace it with the correct
# Japanese species name.
room += trainer_parties[(index * 3 + k) * 59 : (index * 3 + k) * 59 + 48]
room += b'\x01'.ljust(6, b'\x50')
room += trainer_text[index * 3 * 12 : (index + 1) * 3 * 12]
self.battle_rooms.append(room)
room_path = '/battle/10room{0:04d}.cgb'.format(i + 1).encode('ascii')
self.resources[room_path] = lambda request, content=b'', game=self, room=i: (
http_response(200, content=game.battle_rooms[room])
)
def generate_eggs(self, english_rom):
english_rom.seek(0x1FB552)
probabilities = english_rom.read(14 * 2)
eggs = english_rom.read(14 * 59)
# The eggs are instead nicknamed “EGG” in the English ROM.
egg_name = self.unicode_to_bxtj('タマゴ').ljust(6, b'\x50')
# The probabilities are stored in the English ROM as little-endian
# words, but index.txt must use big-endian ASCII hex.
hex_probabilities = b''
for i in range(14):
probability = probabilities[i * 2 : (i + 1) * 2]
probability = struct.unpack('<H', probability)[0]
probability = struct.pack('>H', probability)
hex_probabilities += codecs.encode(probability, 'hex')
self.resources[b'/tamago/index.txt'] = self.serve_static_resource(
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/tamago/tamagoXX.cgb\r\n'
+ hex_probabilities + b'\r\n'
)
for i in range(14):
egg = eggs[i * 59 : i * 59 + 48] + egg_name
egg_path = '/tamago/tamago{0:02x}.cgb'.format(i).encode('ascii')
self.resources[egg_path] = self.serve_static_resource(egg)
def generate_news(self, english_rom):
self.resources[b'/news/index.txt'] = self.serve_static_resource(
b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/news/info.cgb\r\n'
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/news/0save.cgi\r\n'
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/news/0ranking.cgi\r\n'
b'http://gameboy.datacenter.ne.jp/cgb/utility?name=/01/CGB-BXTJ/news/100news.cgb\r\n'
)
self.resources[b'/news/info.cgb'] = self.serve_static_resource(
# 12-byte code used to identify this News
b''.join(bytes(random.choice(string.printable), 'ascii') for _ in range(12)) # 12-byte code used to identify this News
# Description displayed before downloading the News
+ self.unicode_to_bxtj('これは フェイクニュースですよ')
# Address in SRAM6 where rankings data will be downloaded
+ struct.pack('<H', 0xB000)
# Length of rankings table metadata
+ struct.pack('<H', 18)
# Record length of each rankings table
+ struct.pack('<HHHHHHHHH', 26, 26, 26, 26, 26, 26, 26, 26, 26)
# Save data upload instructions
+ struct.pack('<BHB', 0x00, 0xA000, 0x0D)
+ b'\xFF'
# Rankings data upload instructions
+ b'abc\x50' + struct.pack('<BHB', 0, 0xD802, 2)
+ b'\x50'
)
# A real Pokémon News agency would save the data uploaded here for analysis
self.resources[b'/news/0save.cgi'] = self.serve_static_resource(b'')
# A real Pokémon News agency would update the rankings with data uploaded here
self.resources[b'/news/0ranking.cgi'] = self.serve_static_resource(
bytes.fromhex(
'00 00 00 01' # total number of ranked players?
'00 00' # ???
'00 00 00 01' # your ranking
'00 01' # number of entries in this table
'99 E3 90 AE 86 50 50' # name
'1A' # prefecture
'00 00' # postal code?
'19' # age
'00' # gender
'0D 02 00 05 00 00 22 02 01 05 00 00' # easy chat
'00 00' # score
) * 9
)
english_rom.seek(0x1F4DD3)
news_issue = english_rom.read(0xFCC)
self.resources[b'/news/100news.cgb'] = self.serve_static_resource(news_issue)
def generate_trade_corner(self):
self.resources[b'/exchange/index.txt'] = self.serve_static_resource(
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi\r\n'
b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/0cancel.cgi\r\n'
)
self.resources[b'/exchange/10upload.cgi'] = self.serve_static_resource(b'')
self.resources[b'/exchange/0cancel.cgi'] = self.serve_static_resource(b'')
def generate_mobile_stadium(self):
self.resources[b'/POKESTA/menu.cgb'] = self.serve_static_resource(b'\x00')
def serve_static_resource(self, response):
return lambda request, content=b'', resopnse=response: http_response(200, content=response)
def unicode_to_bxtj(self, s):
bxtj_encoding = (
'�����ガギグゲゴザジズゼゾダヂヅデド�����バビブボ���'
'������がぎぐげござじずぜぞだぢづでど�����ばびぶべぼ�'
'パピプポぱぴぷぺぽ������\n����������������'
'�������������:ぃぅ「」『』・…ぁぇぉ������ '
'アイウエオカキクケコサシスセソタチツテトナニヌネノハヒフホマミム'
'メモヤユヨラルレロワヲンッャュョィあいうえおかきくけこさしすせそ'
'たちつてとなにぬねのはひふへほまみむめもやゆよらりるれろわをんっ'
'ゃゅょー゜゛?!。ァゥェ▷▶▼♂円×./ォ♀0123456789'
)
overloaded = {'ヘ': 'へ', 'ベ': 'べ', 'ペ': 'ぺ', 'リ': 'り'}
rval = bytearray()
for char in s:
b = bxtj_encoding.find(overloaded[char] if char in overloaded else char)
rval.append(b if b in range(256) else 0)
rval.append(0x50)
return rval
arg_parser = ArgumentParser(description='Set BGB to “Link → Listen” and run this program to emulate a Mobile Adapter GB and a dummy Mobile System GB.')
arg_parser.add_argument('-c', '--config', default='mobilegb.cfg', help='Path to Mobile Adapter GB configuration data (default: mobilegb.cfg; will be created if missing, run Mobile Trainer to initialize it)')
arg_parser.add_argument('--crystal-rom', default='Pokemon - Crystal Version (USA, Europe).gbc', help='Path to an English-language Pokémon Crystal ROM file, containing data for the Battle Tower, Egg Ticket, and Pokémon News.')
arg_parser.add_argument('--host', default='127.0.0.1', help='IP address of the BGB instance (default: 127.0.0.1)')
arg_parser.add_argument('-p', '--port', default=8765, help='Port that BGB is listening on (default: 8765)')
args = arg_parser.parse_args()
configuration_data = bytearray()
try:
with open(args.config, 'rb') as f:
configuration_data = bytearray(f.read())
f.closed
except FileNotFoundError:
pass
if(len(configuration_data) != 192):
print('Configuration data file “{0}” is invalid or does not exist.'.format(args['config']))
print('Creating a blank configuration.\n')
loop = asyncio.get_event_loop()
loop.set_debug(True)
pokémon_crystal = PokémonCrystal()
pokémon_crystal.generate_trade_corner()
pokémon_crystal.generate_mobile_stadium()
try:
if args.crystal_rom is None:
raise FileNotFoundError()
with open(args.crystal_rom, 'rb') as english_rom:
pokémon_crystal.generate_battle_tower(english_rom)
pokémon_crystal.generate_eggs(english_rom)
pokémon_crystal.generate_news(english_rom)
f.closed
except FileNotFoundError:
print('English Pokémon Crystal ROM not found. Can’t load data for the Battle Tower, Egg Ticket, or Pokémon News.')
datacenter = MobileDatacenter()
datacenter.register_game(pokémon_crystal)
http_server = HTTPServer({b'/cgb/download': datacenter.unauthenticated_request, b'/cgb/upload': datacenter.authenticated_upload, b'/cgb/ranking': datacenter.unauthenticated_request, b'/cgb/utility': datacenter.authenticated_request})
mail_server = MailServer()
isp = DummyISP([
(b'\xFA\xCA\xDE\x01\x00\x50', b'gameboy.datacenter.ne.jp', http_server),
(b'\xFA\xCA\xDE\x02\x00\x6E', b'dion.ne.jp', mail_server)])
telephone = Telephone(isp)
adapter = MobileAdapterGB(telephone, configuration_data, DeviceType.PDC)
server = BGBLinkServer(adapter)
coro = loop.create_connection(lambda: server, host=args.host, port=args.port)
(transport, _protocol) = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
print('Saving configuration to “{0}”.'.format(args.config))
with open(args.config, 'wb') as out:
out.write(adapter.configuration_data)
out.closed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment