Skip to content

Instantly share code, notes, and snippets.

@qstokkink
Created December 25, 2023 12:55
Show Gist options
  • Save qstokkink/12ad77031c18bf234917a12c3e859260 to your computer and use it in GitHub Desktop.
Save qstokkink/12ad77031c18bf234917a12c3e859260 to your computer and use it in GitHub Desktop.
Default asyncio TCP over IPv8 example
from __future__ import annotations
from asyncio import StreamReader, StreamWriter, StreamReaderProtocol, get_running_loop, Transport
from typing import Mapping, Any
from ipv8.community import Community
from ipv8.messaging.lazy_payload import vp_compile, VariablePayload
from ipv8.peer import Peer
@vp_compile
class Frame(VariablePayload):
msg_id = 230
names = ["eof", "payload"]
format_list = ["B", "raw"]
class PeerTransport(Transport):
"""
Transport implementation using an IPv8 Community and Peer.
"""
def __init__(self, community: Community, peer: Peer, reader: StreamReader, extra: Mapping[str, Any] | None = None):
super().__init__(extra)
self.community = community
self.peer = peer
self.reader = reader
self.community.add_message_handler(Frame, self.on_packet)
def write(self, data: bytes) -> None:
self.community.ez_send(self.peer, Frame(False, data), sig=False)
def write_eof(self) -> None:
self.community.ez_send(self.peer, Frame(True, b""), sig=False)
def on_packet(self, source_address, data):
frame = self.community._ez_unpack_noauth(Frame, data, False)
if self.community.network.get_verified_by_address(source_address) == self.peer:
if frame.payload:
self.reader.feed_data(frame.payload)
if frame.eof:
self.reader.feed_eof()
async def open_connection(community: Community, peer: Peer) -> tuple[StreamReader, StreamWriter]:
"""
Asyncio-like open_connection but with IPv8, to a given Peer over a certain Community.
"""
loop = get_running_loop()
reader = StreamReader()
protocol = StreamReaderProtocol(reader)
transport = PeerTransport(community, peer, reader)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
from __future__ import annotations
import typing
from asyncio import sleep, run
from ipv8.community import CommunitySettings, Community
from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.peerdiscovery.network import Network
from ipv8.test.mocking.endpoint import AutoMockEndpoint
from tcp_over_ipv8 import open_connection
class TestCommunity(Community):
community_id = b'\x20' * 20
class TestPeerEnv(typing.NamedTuple):
settings: CommunitySettings
community: Community
@classmethod
def create(cls):
settings = CommunitySettings(my_peer=Peer(default_eccrypto.generate_key("low")), network=Network(),
endpoint=AutoMockEndpoint())
out = cls(settings=settings, community=TestCommunity(settings))
out.community.cancel_all_pending_tasks()
settings.endpoint.open()
return out
def public_peer(self):
"""
How others see this peer (using a public key).
"""
return Peer(self.settings.my_peer.public_key.key_to_bin(), self.settings.endpoint.wan_address)
def introduce_to(self, other: TestPeerEnv):
"""
Normally, this is bi-directional and happens as the result of a walk.
"""
other.community.network.add_verified_peer(self.public_peer())
async def main():
# Setup, normally this happens in an IPv8 instance when peers connect.
env1 = TestPeerEnv.create()
env2 = TestPeerEnv.create()
env1.introduce_to(env2)
env2.introduce_to(env1)
# The actual usage: do some synchronous TCP-like stuff.
# Normally, you would do this in some message handler or something.
reader1, writer1 = await open_connection(env1.community, env2.public_peer())
reader2, writer2 = await open_connection(env2.community, env1.public_peer())
writer1.write(b"hello!")
writer1.write_eof()
await sleep(0)
print("We got our", (await reader2.read()).decode(), "Now, we wait for the responses.")
# This definitely does not fit in a single UDP packet
for _ in range(4000):
writer2.write(b"hello")
await sleep(0)
writer2.write(b" to")
await sleep(0)
writer2.write(b" you")
await sleep(0)
writer2.write(b" too!")
await sleep(0)
writer2.write(b"\n")
await sleep(0)
writer2.write_eof()
await sleep(0)
buf = await reader1.read()
print("Peer 2 sent us", len(buf), "bytes:")
print(buf.decode())
if __name__ == "__main__":
run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment