Skip to content

Instantly share code, notes, and snippets.

@raimannma
Last active October 18, 2022 17:21
Show Gist options
  • Save raimannma/46d41a6f4c060081d2f5f2b491e9d810 to your computer and use it in GitHub Desktop.
Save raimannma/46d41a6f4c060081d2f5f2b491e9d810 to your computer and use it in GitHub Desktop.
from typing import Union
from xml.sax.saxutils import escape

import socketio
from socketio import Client
class XMPPClient:
    """Small wrapper around a ``socketio.Client`` bound to one server address.

    Each instance owns its own Socket.IO client. The event handlers are
    registered as *bound methods* in ``__init__``, so the library invokes them
    with ``self`` filled in and subclass overrides (``on_connect``,
    ``on_receive``, ...) are the ones that actually run.

    Args:
        address: Host name of the server to connect to.
        port: TCP port of the server.
        client: Optional pre-built client object exposing the
            ``socketio.Client`` methods used here (``on``, ``connect``,
            ``disconnect``, ``emit``, ``connected``). When omitted, a fresh
            ``socketio.Client()`` is created for this instance.
    """

    # Per-instance Socket.IO client; assigned in ``__init__``.
    client: "Client"

    def __init__(
        self,
        address: str,
        port: int,
        client: Union["Client", None] = None,
    ):
        self.address = address
        self.port = port
        # One client per instance: a class-level client would be shared by
        # every XMPPClient (and subclass) object in the process.
        self.client = socketio.Client() if client is None else client
        self._register_handlers()

    def _register_handlers(self) -> None:
        """Attach this instance's bound handler methods to the client."""
        self.client.on("connect", self.on_connect)
        self.client.on("disconnect", self.on_disconnect)
        self.client.on("message", self.message)
        self.client.on("*", self.catch_all)

    def connect(self):
        """Open the connection to ``https://<address>:<port>``."""
        print("Try connecting to XMPP server...")
        self.client.connect(f"https://{self.address}:{self.port}")
        print("Connected to XMPP server")

    def disconnect(self):
        """Close the connection held by the underlying client."""
        self.client.disconnect()

    @property
    def is_alive(self):
        """Return the underlying client's ``connected`` flag."""
        return self.client.connected

    def on_connect(self):
        """Handler for the ``connect`` event; subclasses may extend it."""
        print("I'm connected!")

    def on_disconnect(self):
        """Handler for the ``disconnect`` event."""
        print("I'm disconnected!")

    def send_message(self, message: Union[str, dict, bytes, list]):
        """Emit ``message`` to the server as a ``message`` event."""
        self.client.emit("message", message)

    def catch_all(self, event, data=None):
        """Log any event that has no dedicated handler.

        ``data`` defaults to ``None`` so events emitted without a payload do
        not fail with a missing-argument error.
        """
        print("-" * 20)
        print("Received event: ", event)
        print("Received data: ", data)

    def message(self, data: Union[str, dict, bytes, list]):
        """Handler for the ``message`` event; forwards to ``on_receive``."""
        self.on_receive(data)

    def on_receive(self, data: Union[str, dict, bytes, list]):
        """Process an incoming message payload.

        Raises:
            NotImplementedError: Always; subclasses must override this hook.
        """
        raise NotImplementedError
class RiotClient(XMPPClient):
    """``XMPPClient`` preconfigured for Riot's chat endpoint of one region.

    The server address is ``<region>.chat.si.riotgames.com`` on port 5223.

    Args:
        region: Region/shard identifier used in the host name and in the
            ``to`` attribute of the stream header (e.g. ``"tr1"``).
        access_token: Token sent as ``<rso_token>`` in the auth stanza.
        entitlements_token: Token sent in the entitlements stanza.
        pas_token: Token sent as ``<pas_token>`` in the auth stanza.
    """

    def __init__(
        self, region: str, access_token: str, entitlements_token: str, pas_token: str
    ):
        self.region = region
        self.access_token = access_token
        self.entitlements_token = entitlements_token
        self.pas_token = pas_token
        super().__init__(f"{self.region}.chat.si.riotgames.com", 5223)

    @staticmethod
    def _attr(value: str) -> str:
        """Escape ``value`` for use inside a double-quoted XML attribute."""
        # escape() covers &, < and >; the extra entity keeps a literal double
        # quote from terminating the attribute early.
        return escape(value, {'"': "&quot;"})

    def on_connect(self):
        """Run the base connect handler, then send the opening stanzas."""
        super().on_connect()
        self.send_initial_presence()

    def send_initial_presence(self):
        """Send the opening sequence of stanzas, one ``message`` event each.

        Order: stream header, auth, stream header again, resource bind,
        session, and finally one payload holding the entitlements, rxep and
        session stanzas.
        """
        stream_header = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            f'<stream:stream to="{self.region}.pvp.net" '
            'xml:lang="en" version="1.0" '
            'xmlns="jabber:client" xmlns:stream="http://etherx.jabber.org/streams">'
        )
        auth = (
            '<auth mechanism="X-Riot-RSO-PAS" xmlns="urn:ietf:params:xml:ns:xmpp-sasl">'
            f"<rso_token>{self.access_token}</rso_token>"
            f"<pas_token>{self.pas_token}</pas_token>"
            "</auth>"
        )
        bind = (
            '<iq id="_xmpp_bind1" type="set">'
            '<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
            '<puuid-mode enabled="true"/>'
            "<resource>RC-2709252368</resource>"
            "</bind>"
            "</iq>"
        )
        session = (
            '<iq id="_xmpp_session1" type="set">'
            '<session xmlns="urn:ietf:params:xml:ns:xmpp-session"/>'
            "</iq>"
        )
        # NOTE(review): the three stanzas below go out as ONE payload and the
        # session request is repeated here; kept exactly as before - confirm
        # that this is intended.
        entitlements_and_state = (
            '<iq type="set" id="xmpp_entitlements_0">'
            '<entitlements xmlns="urn:riotgames:entitlements">'
            f"<token>{self.entitlements_token}</token>"
            "</entitlements>"
            "</iq>"
            '<iq type="set" id="set_rxep_1">'
            '<rxep xmlns="urn:riotgames:rxep">'
            '&lt;last-online-state enabled="true" /&gt;'
            "</rxep>"
            "</iq>"
            '<iq id="_xmpp_session1" type="set">'
            '<session xmlns="urn:ietf:params:xml:ns:xmpp-session"/>'
            "</iq>"
        )
        # The stream header is sent a second time right after the auth stanza.
        handshake = (
            stream_header,
            auth,
            stream_header,
            bind,
            session,
            entitlements_and_state,
        )
        for stanza in handshake:
            self.send_message(stanza)

    def on_receive(self, data: Union[str, dict, bytes, list]):
        """Print every incoming message payload."""
        print("-" * 20)
        print("Received data: ", data)

    def send_friend_request(self, username: str, tagline: str):
        """Send a roster-add stanza for the player ``username#tagline``.

        Both values are XML-escaped before being placed in attributes, so
        names containing ``"``, ``&`` or ``<`` cannot break the stanza.
        """
        self.send_message(
            '<iq type="set" id="roster_add_10">'
            '<query xmlns="jabber:iq:riotgames:roster">'
            '<item subscription="pending_out">'
            f'<id name="{self._attr(username)}" tagline="{self._attr(tagline)}"/>'
            "</item>"
            "</query>"
            "</iq>"
        )

    def delete_friend_request(self, user_jid: str):
        """Send a roster-remove stanza for ``user_jid`` (XML-escaped)."""
        self.send_message(
            '<iq type="set" id="roster_remove_1">'
            '<query xmlns="jabber:iq:riotgames:roster">'
            f'<item subscription="remove" jid="{self._attr(user_jid)}">'
            "</item>"
            "</query>"
            "</iq>"
        )
if __name__ == "__main__":
    # Placeholder credentials: fill in real tokens before running.
    credentials = {
        "region": "tr1",
        "access_token": "",
        "entitlements_token": "",
        "pas_token": "",
    }
    client = RiotClient(**credentials)
    client.connect()
    # Example follow-up call once connected:
    # client.send_friend_request("max1ne", "3131")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment