Skip to content

Instantly share code, notes, and snippets.

@fredo
Created November 18, 2020 15:38
Show Gist options
  • Save fredo/c8c242c2ca5ba3bba57dcd11d4e42f63 to your computer and use it in GitHub Desktop.
Save fredo/c8c242c2ca5ba3bba57dcd11d4e42f63 to your computer and use it in GitHub Desktop.
message_federation_penetrator.py
from gevent import monkey # isort:skip
monkey.patch_all() # isort:skip
from collections import defaultdict
from typing import List
import gevent
from gevent.event import Event
from matrix_client.errors import MatrixRequestError
from raiden.constants import (
DISCOVERY_DEFAULT_ROOM,
PATH_FINDING_BROADCASTING_ROOM,
Environment,
Networks,
)
from raiden.network.transport import MatrixTransport
from raiden.network.transport.matrix.client import MatrixSyncMessages, Room
from raiden.network.transport.matrix.rtc.utils import setup_asyncio_event_loop
from raiden.network.transport.matrix.utils import my_place_or_yours
from raiden.settings import CapabilitiesConfig, MatrixTransportConfig
from raiden.tests.utils.mocks import MockRaidenService
from raiden.utils.typing import RoomID
# Run raiden's asyncio event-loop setup helper once, at import time.
setup_asyncio_event_loop()

# login_all() builds each homeserver URL as
# SERVER_PREFIX + <server number> + SERVER_SUFFIX.
SERVER_PREFIX = "https://transport.transport0"
SERVER_SUFFIX = ".raiden.network"

# Server numbers listed here are skipped by login_all().
RSB_ID_OUT_OF_ORDER = []
NUMBER_OF_USERS = 5

# Not referenced by any other code visible in this file.
signers = []

# Two-level bags whose leaves are lists of message texts.
# Received: keyed by a (sender, receiver) user-id tuple, then by sender.
message_received_bag = defaultdict(lambda: defaultdict(list))
# Sent / failed: keyed by room id, then by the sending user id.
message_sent_bag = defaultdict(lambda: defaultdict(list))
message_failed_bag = defaultdict(lambda: defaultdict(list))
def _handle_invite(self, room_id: RoomID, state: dict) -> None:
    """Join the room named by an invite, printing a notice if the join fails.

    Written to be assigned onto ``MatrixTransport`` as a method, so ``self``
    is expected to provide ``_client``. ``state`` is accepted but not used.
    A ``MatrixRequestError`` from the join is printed and swallowed.
    """
    try:
        self._client.join_room(room_id_or_alias=room_id)
    except MatrixRequestError as join_error:
        # Best effort: report the failure and carry on.
        print(join_error)
        print(f"COULD NOT JOIN {room_id}")
def _handle_message_callback(self, message_batch: MatrixSyncMessages):
    """Log every message in the batch and record those sent by other users.

    ``message_batch`` is iterated as ``(room, messages)`` pairs. Each message
    is read as a dict with ``"sender"`` and ``["content"]["body"]``. Texts
    from other users are appended to
    ``message_received_bag[(sender, own_user_id)][sender]``. Messages sent by
    this client itself are printed but not recorded.
    """
    for _room, events in message_batch:
        for event in events:
            sender = event["sender"]
            body = event["content"]["body"]
            own_user_id = self._client.user_id
            print(f"receiver: {own_user_id}, sender: {sender}")
            if own_user_id == sender:
                # Our own echo: nothing to record.
                continue
            message_received_bag[(sender, own_user_id)][sender].append(body)
def login_all() -> List[MatrixTransport]:
    """Construct one ``MatrixTransport`` per user index and return them.

    User index ``i`` is mapped to server number ``i % 5 + 1``. Indices whose
    server number appears in ``RSB_ID_OUT_OF_ORDER`` get no transport. This
    function only constructs the transports; it does not call ``start`` on
    them.
    """
    created: List[MatrixTransport] = []
    for user_index in range(NUMBER_OF_USERS):
        server_number = user_index % 5 + 1
        if server_number not in RSB_ID_OUT_OF_ORDER:
            homeserver = f"{SERVER_PREFIX}{server_number}{SERVER_SUFFIX}"
            # Each transport is pinned to exactly one homeserver.
            transport_config = MatrixTransportConfig(
                broadcast_rooms=[
                    DISCOVERY_DEFAULT_ROOM,
                    PATH_FINDING_BROADCASTING_ROOM,
                ],
                retries_before_backoff=2,
                retry_interval_initial=2,
                retry_interval_max=2,
                server=homeserver,
                available_servers=[homeserver],
                capabilities_config=CapabilitiesConfig(web_rtc=False),
            )
            created.append(
                MatrixTransport(
                    config=transport_config,
                    environment=Environment.PRODUCTION,
                )
            )
    return created
def monkey_patch_listeners():
    """Replace two ``MatrixTransport`` handlers with this module's versions.

    ``_handle_invite`` and ``_handle_sync_messages`` are overwritten on the
    class itself, so every ``MatrixTransport`` instance is affected.
    """
    replacements = {
        "_handle_invite": _handle_invite,
        "_handle_sync_messages": _handle_message_callback,
    }
    for attribute_name, handler in replacements.items():
        setattr(MatrixTransport, attribute_name, handler)
def start_all(transports: List[MatrixTransport]) -> None:
    """Start every transport concurrently and wait until all have started.

    Each transport gets its own ``MockRaidenService`` whose RPC client chain
    id is set to the Goerli network value. Any exception raised by a start
    greenlet is re-raised here (``raise_error=True``).
    """

    def spawn_start(transport: MatrixTransport):
        # One fresh mock service per transport.
        service = MockRaidenService()
        service.rpc_client.chain_id = Networks.GOERLI.value
        return gevent.spawn(transport.start, service, [], None)

    start_greenlets = [spawn_start(transport) for transport in transports]
    gevent.joinall(start_greenlets, raise_error=True)
def create_rooms(transports: List[MatrixTransport]):
    """Create a room between each transport and every not-yet-visited partner.

    Transports are visited in order. A visited transport's address is removed
    from the pending list, so it is paired only with transports whose address
    is still pending. For each pairing, ``my_place_or_yours`` returns the
    address that acts as inviter; that side creates the room and invites the
    other side's user id. Room creation runs in greenlets, and any failure is
    re-raised (``raise_error=True``).
    """
    pending_addresses = [
        transport._raiden_service.address for transport in transports
    ]
    creation_greenlets = []
    for first_transport in transports:
        first_address = first_transport._raiden_service.address
        pending_addresses.remove(first_address)
        for second_transport in transports:
            second_address = second_transport._raiden_service.address
            if second_address not in pending_addresses:
                # Already handled (or it is first_transport itself).
                continue
            inviter = my_place_or_yours(first_address, second_address)
            if first_address == inviter:
                inviter_transport = first_transport
            else:
                inviter_transport = second_transport
            if second_address == inviter:
                invitee_transport = first_transport
            else:
                invitee_transport = second_transport
            creation_greenlets.append(
                gevent.spawn(
                    inviter_transport._client.create_room,
                    invitees=[invitee_transport._client.user_id],
                )
            )
    gevent.joinall(creation_greenlets, raise_error=True)
def send_messages(transports: List[MatrixTransport], number_of_messages: int = 10):
    """Send ``number_of_messages`` texts into every room of every transport.

    All sends are spawned first and then joined. The join does not pass
    ``raise_error``, so errors inside the send greenlets are not re-raised
    here.
    """
    pending = []
    for transport in transports:
        for room in transport._client.rooms.values():
            pending += send_batch(room, number_of_messages)
    gevent.joinall(pending)
def send_messages_to_device(
    transports: List[MatrixTransport], number_of_messages: int = 10
):
    """Send ``number_of_messages`` to-device messages between all transports.

    Every transport sends to the user id of every other transport. All sends
    are spawned first and then joined. The join does not pass
    ``raise_error``, so errors inside the send greenlets are not re-raised
    here.
    """
    pending = []
    for sender in transports:
        for candidate in transports:
            if candidate != sender:
                pending += send_batch_to_device(
                    sender, candidate._client.user_id, number_of_messages
                )
    gevent.joinall(pending)
def send_batch(room: Room, number_of_messages: int):
    """Spawn one ``send_text`` greenlet per message and return the greenlets.

    The texts are ``"message 0"`` to ``"message {number_of_messages - 1}"``.
    The greenlets are returned without being joined.
    """
    greenlets = []
    for index in range(number_of_messages):
        greenlets.append(gevent.spawn(send_text, room=room, text=f"message {index}"))
    return greenlets
def send_batch_to_device(transport, user_id, number_of_messages=10):
    """Spawn one ``send_to_device`` greenlet per message and return them.

    The texts are ``"message 0"`` to ``"message {number_of_messages - 1}"``,
    sent from ``transport`` to ``user_id``. The greenlets are returned without
    being joined.
    """
    greenlets = []
    for index in range(number_of_messages):
        greenlets.append(
            gevent.spawn(
                send_to_device,
                transport=transport,
                user_id=user_id,
                text=f"message {index}",
            )
        )
    return greenlets
def send_text(room: Room, text: str) -> None:
    """Send ``text`` into ``room`` and record the outcome.

    On success the text is appended to ``message_sent_bag``. If the send
    raises ``MatrixRequestError`` it goes to ``message_failed_bag`` instead.
    Both bags are keyed by room id, then by the sending client's user id.
    Other exception types propagate.
    """
    outcome_bag = message_sent_bag
    try:
        room.send_text(text=text)
    except MatrixRequestError:
        outcome_bag = message_failed_bag
    outcome_bag[room.room_id][room.client.user_id].append(text)
def send_to_device(transport, user_id, text):
    """Send ``text`` to ``user_id`` as a to-device ``m.room.message`` event.

    The request goes through ``transport._client.api.send_to_device``.
    Nothing is returned and no exception is handled here.
    """
    content = {"msgtype": "m.text", "body": text}
    # NOTE(review): "*" is presumably the Matrix wildcard device id,
    # addressing all of the user's devices. Confirm against the sendToDevice
    # API documentation.
    per_user_messages = {user_id: {"*": content}}
    transport._client.api.send_to_device(
        event_type="m.room.message", messages=per_user_messages
    )
def main() -> None:
    """Run the full scenario and print per-bag message counts.

    Steps, in order: patch the ``MatrixTransport`` listeners, construct and
    start the transports, create rooms between them, send 100 text messages
    into every room of every transport, then print how many messages were
    sent, failed and received.
    """
    stop_event = Event()
    monkey_patch_listeners()
    transports = login_all()
    start_all(transports)
    create_rooms(transports)
    # NOTE(review): presumably gives the invites/joins time to settle; confirm.
    gevent.sleep(5)
    send_messages(transports, 100)
    # send_messages_to_device(transports, 100)
    # NOTE(review): presumably leaves time for sent messages to arrive; confirm.
    gevent.sleep(30)

    # The sent and failed bags are keyed by room id, then by sending user id.
    outcome_bags = (("sent", message_sent_bag), ("failed", message_failed_bag))
    for outcome, bag in outcome_bags:
        for room_id, sender_to_messages in bag.items():
            for sender, sender_bag in sender_to_messages.items():
                print(
                    f"room_id {room_id} {outcome} {len(sender_bag)} messages by {sender}"
                )

    # The received bag is keyed by a (sender, receiver) user-id tuple, not by
    # a room id, so the line is labelled with the receiver rather than
    # "room_id".
    for (_, receiver), sender_to_messages in message_received_bag.items():
        for sender, sender_bag in sender_to_messages.items():
            print(
                f"receiver {receiver} received {len(sender_bag)} messages by {sender}"
            )

    # stop_event is never set anywhere in this file, so this blocks until the
    # 30 second timeout expires.
    gevent.wait([stop_event], timeout=30)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment