Skip to content

Instantly share code, notes, and snippets.

@Jerrylum
Created April 17, 2022 02:00
Show Gist options
  • Save Jerrylum/fc8ac643d63dd0e7716046f59bb8f8c3 to your computer and use it in GitHub Desktop.
Save Jerrylum/fc8ac643d63dd0e7716046f59bb8f8c3 to your computer and use it in GitHub Desktop.
import json
import random
from typing import Dict, Tuple
from xmlrpc.client import ProtocolError
import requests
from twisted.python import failure
from twisted.internet import reactor
from quarry.types.uuid import UUID
from quarry.net.proxy import UpstreamFactory, Upstream, DownstreamFactory, Downstream, Bridge
from quarry.net import auth, crypto
from twisted.internet import reactor
class MyUpstream(Upstream):
    """Proxy-to-server protocol that performs the session "join" with `requests`.

    Overrides the Encryption Request handler so that the join call to Mojang's
    session server is made with ``requests`` instead of the profile's own
    ``join`` method (kept below as a commented-out reference).
    """

    # Seconds to wait for the session server before the join attempt fails.
    join_timeout = 30

    def packet_login_encryption_request(self, buff):
        """Handle the Encryption Request packet sent by the remote server.

        Reads the server id, public key and verify token from ``buff``,
        generates a shared secret, computes the login digest and POSTs it to
        the session server. ``auth_ok`` is called with the response text on
        HTTP 204; ``auth_failed`` is called with a ``Failure`` for any other
        status or when the HTTP request itself fails.

        Raises:
            ProtocolError: if the factory's profile is not an online profile.
        """
        # The module-level ``ProtocolError`` comes from ``xmlrpc.client`` and
        # cannot be constructed from a single message; use quarry's own class.
        from quarry.net.protocol import ProtocolError
        from twisted.internet import threads

        p_server_id = buff.unpack_string()

        # 1.7.x
        if self.protocol_version <= 5:
            def unpack_array(b): return b.read(b.unpack('h'))
        # 1.8.x
        else:
            def unpack_array(b): return b.read(b.unpack_varint(max_bits=16))

        p_public_key = unpack_array(buff)
        p_verify_token = unpack_array(buff)

        if not self.factory.profile.online:
            raise ProtocolError("Can't log into online-mode server while using"
                                " offline profile")

        self.shared_secret = crypto.make_shared_secret()
        self.public_key = crypto.import_public_key(p_public_key)
        self.verify_token = p_verify_token

        # make digest
        digest = crypto.make_digest(
            p_server_id.encode('ascii'),
            self.shared_secret,
            p_public_key)

        # do auth
        # deferred = self.factory.profile.join(digest)
        # deferred.addCallbacks(self.auth_ok, self.auth_failed)
        url = "https://sessionserver.mojang.com/session/minecraft/join"
        payload = json.dumps({
            "accessToken": self.factory.profile.access_token,
            "selectedProfile": self.factory.profile.uuid.to_hex(False),
            "serverId": digest,
        })
        headers = {
            'Content-Type': 'application/json'
        }

        def on_response(response):
            # The handler only treats 204 as a successful join.
            if response.status_code == 204:
                self.auth_ok(response.text)
            else:
                self.auth_failed(failure.Failure(
                    auth.AuthException('unverified', 'unverified username')))

        # Run the blocking HTTP call in a worker thread so the reactor keeps
        # servicing other connections; request errors arrive at auth_failed
        # as a Failure through the errback.
        deferred = threads.deferToThread(
            requests.post, url, headers=headers, data=payload,
            timeout=self.join_timeout)
        deferred.addCallbacks(on_response, self.auth_failed)
class MyDownstream(Downstream):
    """Client-to-proxy protocol that verifies sessions with `requests`.

    Overrides the Encryption Response handler so that the ``hasJoined`` check
    against Mojang's session server is made with ``requests`` instead of
    ``auth.has_joined`` (kept below as a commented-out reference).
    """

    def packet_login_encryption_response(self, buff):
        """Handle the Encryption Response packet sent by the connecting client.

        Decrypts the shared secret and verify token with the factory keypair,
        enables encryption, computes the login digest and asks the session
        server whether the player has joined. ``auth_ok`` is called with the
        decoded JSON body on HTTP 200; ``auth_failed`` is called with a
        ``Failure`` for any other status or when the HTTP request fails.

        Raises:
            ProtocolError: if the packet arrives out of order or the decrypted
                verify token does not match the one stored on this protocol.
        """
        # The module-level ``ProtocolError`` comes from ``xmlrpc.client`` and
        # cannot be constructed from a single message; use quarry's own class.
        from quarry.net.protocol import ProtocolError
        from twisted.internet import threads

        if self.login_expecting != 1:
            raise ProtocolError("Out-of-order login")

        # 1.7.x
        if self.protocol_version <= 5:
            def unpack_array(b): return b.read(b.unpack('h'))
        # 1.8.x
        else:
            def unpack_array(b): return b.read(b.unpack_varint(max_bits=16))

        p_shared_secret = unpack_array(buff)
        p_verify_token = unpack_array(buff)

        shared_secret = crypto.decrypt_secret(
            self.factory.keypair,
            p_shared_secret)

        verify_token = crypto.decrypt_secret(
            self.factory.keypair,
            p_verify_token)

        self.login_expecting = None

        if verify_token != self.verify_token:
            raise ProtocolError("Verify token incorrect")

        # enable encryption
        self.cipher.enable(shared_secret)
        self.logger.debug("Encryption enabled")

        # make digest
        digest = crypto.make_digest(
            self.server_id.encode('ascii'),
            shared_secret,
            self.factory.public_key)

        # do auth
        remote_host = None
        if self.factory.prevent_proxy_connections:
            remote_host = self.remote_addr.host

        # deferred = auth.has_joined(
        #     self.factory.auth_timeout,
        #     digest,
        #     self.display_name,
        #     remote_host)
        # deferred.addCallbacks(self.auth_ok, self.auth_failed)
        url = 'https://sessionserver.mojang.com/session/minecraft/hasJoined'
        # requests omits parameters whose value is None, so 'ip' is only sent
        # when proxy-connection prevention is enabled.
        params = {
            'username': self.display_name,
            'serverId': digest,
            'ip': remote_host,
        }

        def on_response(response):
            # The handler only treats 200 as a verified session.
            if response.status_code == 200:
                self.auth_ok(response.json())
            else:
                self.auth_failed(failure.Failure(
                    auth.AuthException('invalid', 'invalid session')))

        # Run the blocking HTTP call in a worker thread so the reactor keeps
        # servicing other connections; request errors arrive at auth_failed
        # as a Failure through the errback.
        deferred = threads.deferToThread(
            requests.get, url, params=params,
            timeout=self.factory.auth_timeout)
        deferred.addCallbacks(on_response, self.auth_failed)
class MyUpstreamFactory(UpstreamFactory):
    """Upstream factory that builds ``MyUpstream`` protocol instances."""

    # Protocol class used for the proxy-to-server connection.
    protocol = MyUpstream

    # Timeout value for the upstream connection (presumably seconds).
    connection_timeout = 10
class MyBridge(Bridge):
    """Bridge that tracks other players' names, entity ids and equipment.

    Server-to-client packets are inspected to fill three lookup tables and
    are then forwarded unchanged. Client chat messages starting with
    ``/equip `` are intercepted and not forwarded to the server.
    """

    upstream_factory_class = MyUpstreamFactory

    # Per-bridge lookup tables, created in __init__ so that separate
    # connections never share (or leak) each other's state.
    uuid_name_map: Dict[UUID, str]   # player uuid -> player name
    eid_uuid_map: Dict[int, UUID]    # entity id -> player uuid
    eid_eqslot_map: Dict[int, list]  # entity id -> six equipment slot entries

    def __init__(self, *args, **kwargs):
        """Create empty lookup tables, then defer to ``Bridge.__init__``."""
        self.uuid_name_map = {}
        self.eid_uuid_map = {}
        self.eid_eqslot_map = {}
        super().__init__(*args, **kwargs)

    def packet_downstream_spawn_player(self, buff):
        """Record the entity id and uuid of a spawned player, then forward."""
        buff.save()

        eid = buff.unpack_varint()
        uuid = buff.unpack_uuid()

        self.eid_uuid_map[eid] = uuid
        self.eid_eqslot_map[eid] = [None] * 6

        buff.restore()
        self.packet_unhandled(buff, "downstream", "spawn_player")

    def packet_downstream_entity_equipment(self, buff):
        """Record equipment changes of known players, then forward."""
        buff.save()

        entity_id = buff.unpack_varint()
        if entity_id in self.eid_uuid_map:  # a player
            slots = self.eid_eqslot_map[entity_id]
            while True:
                eq_slot_raw = buff.read(1)[0]
                # Low 7 bits select the slot; the top bit flags another entry.
                eq_slot_enum = eq_slot_raw & 0b01111111
                slot_data = buff.unpack_slot()
                # Ignore slot numbers outside the table instead of raising.
                if eq_slot_enum < len(slots):
                    slots[eq_slot_enum] = slot_data
                if not eq_slot_raw & 0b10000000:
                    break

        buff.restore()
        self.packet_unhandled(buff, "downstream", "entity_equipment")

    def packet_downstream_player_list_item(self, buff):
        """Record uuid -> name for players added to the list, then forward.

        Only action 0 is parsed. NOTE(review): the field layout below assumes
        the pre-1.19 "add player" entry format; confirm against the protocol
        version this proxy is used with.
        """
        buff.save()

        action_id = buff.unpack_varint()
        if action_id == 0:
            count = buff.unpack_varint()
            for _ in range(count):
                uuid = buff.unpack_uuid()
                name = buff.unpack_string()
                num_properties = buff.unpack_varint()
                for _ in range(num_properties):
                    buff.unpack_string()  # property name
                    buff.unpack_string()  # property value
                    # Signature is present only when the preceding flag is set.
                    buff.unpack_optional(buff.unpack_string)
                buff.unpack_varint()  # presumably gamemode
                buff.unpack_varint()  # presumably ping
                # Consume the optional display name so that the next entry
                # starts at the right offset.
                buff.unpack_optional(buff.unpack_chat)

                self.uuid_name_map[uuid] = name

        buff.restore()
        self.packet_unhandled(buff, "downstream", "player_list_item")

    def packet_upstream_chat_message(self, buff):
        """Intercept ``/equip <name>`` chat commands; forward everything else."""
        buff.save()

        chat_message = buff.unpack_string()
        if chat_message.startswith("/equip "):
            args = chat_message.split(" ")
            name = args[1]

            # Reverse lookups: name -> uuid -> entity id.
            uuid = next(
                (k for k, v in self.uuid_name_map.items() if v == name), None)
            eid = next(
                (k for k, v in self.eid_uuid_map.items() if v == uuid), None)
            if eid is None:
                print('player not found')
            else:
                eqslots = self.eid_eqslot_map[eid]
                # TODO

            # The command is consumed here and never reaches the server.
            buff.discard()
            return

        buff.restore()
        self.packet_unhandled(buff, "upstream", "chat_message")

    def make_profile(self):
        """Build an online profile from a Minecraft bearer token.

        The token is taken from a module-level ``accessToken`` variable when
        one is defined, otherwise from the ``MINECRAFT_ACCESS_TOKEN``
        environment variable. To obtain one, follow
        https://kqzz.github.io/mc-bearer-token/

        Returns:
            An ``auth.Profile`` built from the token and the id/name reported
            by the profile endpoint.

        Raises:
            RuntimeError: if no access token is configured.
            requests.RequestException: if the profile request fails, times
                out, or returns an error status.
        """
        import os

        access_token = (globals().get("accessToken")
                        or os.environ.get("MINECRAFT_ACCESS_TOKEN"))
        if not access_token:
            raise RuntimeError(
                "No Minecraft access token configured: define a module-level "
                "`accessToken` or set the MINECRAFT_ACCESS_TOKEN environment "
                "variable")

        url = "https://api.minecraftservices.com/minecraft/profile"
        headers = {'Authorization': 'Bearer ' + access_token}

        response = requests.get(url, headers=headers, timeout=10)
        # Fail with the HTTP error instead of a KeyError on the body below.
        response.raise_for_status()
        result = response.json()

        my_uuid = UUID.from_hex(result['id'])
        my_username = result['name']
        return auth.Profile('(skip)', access_token, my_username, my_uuid)
class MyDownstreamFactory(DownstreamFactory):
    """Downstream factory wiring together ``MyDownstream`` and ``MyBridge``."""

    # Protocol class used for each client-to-proxy connection.
    protocol = MyDownstream

    # Bridge class that relays packets between client and server.
    bridge_class = MyBridge

    # MOTD string for this proxy.
    motd = "Proxy Server"
def main(argv):
    """Parse command-line options, start the proxy listener and run the reactor.

    Args:
        argv: command-line arguments, without the program name.
    """
    import argparse

    # (flags, keyword arguments) for every supported option.
    option_specs = (
        (("-a1", "--listen-host1"),
         {"default": "", "help": "address to listen on"}),
        (("-p1", "--listen-port1"),
         {"default": 25566, "type": int, "help": "port to listen on"}),
        (("-b", "--connect-host"),
         {"default": "127.0.0.1", "help": "address to connect to"}),
        (("-q", "--connect-port"),
         {"default": 25565, "type": int, "help": "port to connect to"}),
    )

    arg_parser = argparse.ArgumentParser()
    for flags, settings in option_specs:
        arg_parser.add_argument(*flags, **settings)
    options = arg_parser.parse_args(argv)

    # Point the downstream factory at the real server.
    proxy_factory = MyDownstreamFactory()
    proxy_factory.connect_host = options.connect_host
    proxy_factory.connect_port = options.connect_port

    # Accept client connections, then hand control to the reactor.
    proxy_factory.listen(options.listen_host1, options.listen_port1)
    reactor.run()
if __name__ == "__main__":
    # Script entry point: pass everything after the program name to main().
    import sys

    cli_arguments = sys.argv[1:]
    main(cli_arguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment