Created
December 25, 2023 12:55
-
-
Save qstokkink/12ad77031c18bf234917a12c3e859260 to your computer and use it in GitHub Desktop.
Default asyncio TCP over IPv8 example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from asyncio import StreamReader, StreamWriter, StreamReaderProtocol, get_running_loop, Transport | |
from typing import Mapping, Any | |
from ipv8.community import Community | |
from ipv8.messaging.lazy_payload import vp_compile, VariablePayload | |
from ipv8.peer import Peer | |
@vp_compile
class Frame(VariablePayload):
    """
    One chunk of stream data, optionally flagged as the end of the stream.
    """

    # Message number of this payload within a community.
    msg_id = 230
    # Field names, in constructor order (this file builds frames as ``Frame(False, data)``).
    names = ["eof", "payload"]
    # Serializer format per field, in the same order as ``names``.
    # NOTE(review): presumably "B" is a single unsigned byte and "raw" the
    # remaining bytes of the packet - confirm against the IPv8 serializer.
    format_list = ["B", "raw"]
class PeerTransport(Transport):
    """
    Transport implementation using an IPv8 Community and Peer.

    Every ``write()`` is wrapped in one unsigned ``Frame`` message and sent to
    ``peer`` through ``community``. ``Frame`` messages that arrive from that
    same peer are fed into ``reader``. No sequencing or retransmission is
    implemented in this class.

    The closing-related parts of the transport interface are implemented so
    that ``StreamWriter.drain()``, ``StreamWriter.close()`` and
    ``StreamWriter.can_write_eof()`` work instead of hitting the
    ``NotImplementedError`` of the ``asyncio`` base classes.
    """

    def __init__(self, community: Community, peer: Peer, reader: StreamReader,
                 extra: Mapping[str, Any] | None = None) -> None:
        """
        Bind the transport to a peer and register the ``Frame`` handler.

        :param community: the Community used to send and receive frames.
        :param peer: the remote Peer this transport talks to.
        :param reader: the StreamReader that receives incoming payload bytes.
        :param extra: optional transport information for ``get_extra_info()``.
        """
        super().__init__(extra)
        self.community = community
        self.peer = peer
        self.reader = reader
        self._closing = False
        self.community.add_message_handler(Frame, self.on_packet)

    def is_closing(self) -> bool:
        """
        Return True once ``close()`` or ``abort()`` has been called.
        """
        return self._closing

    def close(self) -> None:
        """
        Close the transport locally.

        After closing, writes are dropped and incoming frames are ignored.
        The reader is fed EOF so that pending reads finish. Nothing is sent to
        the remote peer; use ``write_eof()`` beforehand to signal the end of
        the outgoing stream.
        """
        if self._closing:
            return
        self._closing = True
        self.reader.feed_eof()

    def abort(self) -> None:
        """
        Close the transport immediately (nothing is buffered, so same as ``close()``).
        """
        self.close()

    def can_write_eof(self) -> bool:
        """
        Return True: ``write_eof()`` is supported.
        """
        return True

    def write(self, data: bytes) -> None:
        """
        Send ``data`` to the peer as a single non-EOF frame.
        """
        if self._closing:
            return
        self.community.ez_send(self.peer, Frame(False, data), sig=False)

    def write_eof(self) -> None:
        """
        Tell the peer that no more data will follow, using an empty EOF frame.
        """
        if self._closing:
            return
        self.community.ez_send(self.peer, Frame(True, b""), sig=False)

    def on_packet(self, source_address, data) -> None:
        """
        Handle an incoming ``Frame`` packet.

        :param source_address: the address the packet was received from.
        :param data: the raw packet bytes.
        """
        if self._closing:
            return
        # Only accept frames from our own peer; check this before parsing so
        # packets from unrelated senders are never unpacked.
        if self.community.network.get_verified_by_address(source_address) == self.peer:
            frame = self.community._ez_unpack_noauth(Frame, data, False)
            if frame.payload:
                self.reader.feed_data(frame.payload)
            if frame.eof:
                self.reader.feed_eof()
async def open_connection(community: Community, peer: Peer) -> tuple[StreamReader, StreamWriter]:
    """
    Open an asyncio-style stream pair to a given Peer over a certain Community.

    :param community: the Community that carries the traffic.
    :param peer: the remote Peer to exchange data with.
    :returns: the ``(reader, writer)`` pair for this connection.
    """
    stream_reader = StreamReader()
    stream_writer = StreamWriter(
        PeerTransport(community, peer, stream_reader),
        StreamReaderProtocol(stream_reader),
        stream_reader,
        get_running_loop(),
    )
    return stream_reader, stream_writer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import typing | |
from asyncio import sleep, run | |
from ipv8.community import CommunitySettings, Community | |
from ipv8.keyvault.crypto import default_eccrypto | |
from ipv8.peer import Peer | |
from ipv8.peerdiscovery.network import Network | |
from ipv8.test.mocking.endpoint import AutoMockEndpoint | |
from tcp_over_ipv8 import open_connection | |
class TestCommunity(Community):
    """
    Bare Community for the demo; it only pins a fixed 20-byte community id.
    """

    community_id = 20 * b"\x20"
class TestPeerEnv(typing.NamedTuple):
    """
    The settings of one demo peer together with the community built from them.
    """

    settings: CommunitySettings
    community: Community

    @classmethod
    def create(cls):
        """
        Build a new environment with its own key, Network and mock endpoint.
        """
        private_key = default_eccrypto.generate_key("low")
        peer_settings = CommunitySettings(
            my_peer=Peer(private_key),
            network=Network(),
            endpoint=AutoMockEndpoint(),
        )
        env = cls(settings=peer_settings, community=TestCommunity(peer_settings))
        # Drop any tasks pending on the new community, then open the endpoint.
        env.community.cancel_all_pending_tasks()
        peer_settings.endpoint.open()
        return env

    def public_peer(self):
        """
        How others see this peer (using a public key).
        """
        own = self.settings
        return Peer(own.my_peer.public_key.key_to_bin(), own.endpoint.wan_address)

    def introduce_to(self, other: TestPeerEnv):
        """
        Register this peer as verified in the other environment's network.

        Normally, this is bi-directional and happens as the result of a walk.
        """
        visible_self = self.public_peer()
        other.community.network.add_verified_peer(visible_self)
async def main():
    """
    Run the demo: two mock peers exchange data over stream pairs.
    """
    # Setup, normally this happens in an IPv8 instance when peers connect.
    first, second = TestPeerEnv.create(), TestPeerEnv.create()
    first.introduce_to(second)
    second.introduce_to(first)

    # The actual usage: stream-style reads and writes between the peers.
    # Normally, you would do this in some message handler or something.
    reader1, writer1 = await open_connection(first.community, second.public_peer())
    reader2, writer2 = await open_connection(second.community, first.public_peer())

    writer1.write(b"hello!")
    writer1.write_eof()
    await sleep(0)
    greeting = await reader2.read()
    print("We got our", greeting.decode(), "Now, we wait for the responses.")

    # This definitely does not fit in a single UDP packet
    reply_chunks = (b"hello", b" to", b" you", b" too!", b"\n")
    for _ in range(4000):
        for chunk in reply_chunks:
            writer2.write(chunk)
            # Yield to the event loop after every write.
            await sleep(0)
    writer2.write_eof()
    await sleep(0)

    received = await reader1.read()
    print("Peer 2 sent us", len(received), "bytes:")
    print(received.decode())


if __name__ == "__main__":
    run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment