Skip to content

Instantly share code, notes, and snippets.

@qstokkink
Created October 3, 2020 07:35
Show Gist options
  • Save qstokkink/32488e9a250649333b95aaa10886c4cb to your computer and use it in GitHub Desktop.
Save qstokkink/32488e9a250649333b95aaa10886c4cb to your computer and use it in GitHub Desktop.
Documentation checks
from asyncio import ensure_future, get_event_loop
from pyipv8.ipv8_service import IPv8
from pyipv8.ipv8.configuration import get_default_configuration
async def start_ipv8():
    # Build an IPv8 instance from the default configuration and wait for
    # its start() coroutine to complete.
    default_settings = get_default_configuration()
    await IPv8(default_settings).start()
# Queue the coroutine that starts IPv8 as a task. It is only scheduled
# here; it begins executing once the event loop is running.
ensure_future(start_ipv8())
# Obtain the asyncio event loop and run it without a stop condition:
# the loop is what schedules every asynchronous call.
event_loop = get_event_loop()
event_loop.run_forever()
from asyncio import ensure_future, get_event_loop
from pyipv8.ipv8_service import IPv8
from pyipv8.ipv8.configuration import get_default_configuration
async def start_ipv8():
    # Start two IPv8 instances, one after the other, each built from a
    # fresh default configuration.
    # Every instance attempts to claim a port: the second one cannot claim
    # the port taken by the first and ends up claiming a different one.
    for _ in range(2):
        await IPv8(get_default_configuration()).start()
# Schedule the startup coroutine, then run the event loop indefinitely.
ensure_future(start_ipv8())
event_loop = get_event_loop()
event_loop.run_forever()
from asyncio import ensure_future, get_event_loop
from pyipv8.ipv8.community import Community
from pyipv8.ipv8_service import IPv8
from pyipv8.ipv8.configuration import ConfigBuilder, Strategy, WalkerDefinition
from pyipv8.ipv8.keyvault.crypto import ECCrypto
from pyipv8.ipv8.peer import Peer
class MyCommunity(Community):
    # The master peer this community is registered with.
    # It defines the community's service identifier: other peers connect
    # to this community based on the sha-1 hash of this peer's public key.
    # A new "medium" key is generated each time this class body executes.
    master_peer = Peer(ECCrypto().generate_key(u"medium"))
async def start_communities():
    # Start two IPv8 instances (peer 1 and peer 2), each loading only the
    # MyCommunity overlay under its own key.
    for i in range(1, 3):
        # Begin from a configuration without any keys or overlays.
        config_builder = ConfigBuilder().clear_keys().clear_overlays()
        # Two peers can only communicate as different peers if they have
        # different keys. Register an EC key under the alias 'my peer' with
        # 'medium' security, stored in 'ec1.pem' or 'ec2.pem' depending on
        # the peer number.
        config_builder.add_key("my peer", "medium", f"ec{i}.pem")
        # Look for other peers in the overlay with the RandomWalk strategy,
        # until 10 peers have been found.
        random_walk = WalkerDefinition(Strategy.RandomWalk, 10, {'timeout': 3.0})
        # Load the overlay named "MyCommunity" using the 'my peer' key.
        # The empty dict and empty list mean: no additional startup
        # arguments and no function to run after initialization.
        config_builder.add_overlay("MyCommunity", "my peer", [random_walk], {}, [])
        # The overlay name is resolved to our class through the
        # extra_communities mapping handed to IPv8.
        overlay_classes = {'MyCommunity': MyCommunity}
        instance = IPv8(config_builder.finalize(), extra_communities=overlay_classes)
        await instance.start()
# Schedule the startup coroutine, then run the event loop indefinitely.
ensure_future(start_communities())
event_loop = get_event_loop()
event_loop.run_forever()
from asyncio import ensure_future, get_event_loop
from pyipv8.ipv8.community import Community
from pyipv8.ipv8_service import IPv8
from pyipv8.ipv8.configuration import ConfigBuilder, Strategy, WalkerDefinition
from pyipv8.ipv8.keyvault.crypto import ECCrypto
from pyipv8.ipv8.peer import Peer
class MyCommunity(Community):
    # Master peer of this community, created from a newly generated
    # "medium" key.
    master_peer = Peer(ECCrypto().generate_key(u"medium"))

    def started(self):
        # Set up the periodic report of who we are and which peers we know.
        async def print_peers():
            me = self.my_peer
            known_peers = list(map(str, self.get_peers()))
            print("I am:", me, "\nI know:", known_peers)

        # Register the coroutine as an asyncio task owned by this overlay,
        # so that the task ends when the overlay is unloaded.
        # It is called every 5.0 seconds, starting immediately (delay=0).
        self.register_task("print_peers", print_peers, interval=5.0, delay=0)
async def start_communities():
    # Start two IPv8 instances, each with its own key file and with the
    # MyCommunity overlay configured to call 'started' after startup.
    for i in range(1, 3):
        config_builder = ConfigBuilder().clear_keys().clear_overlays()
        config_builder.add_key("my peer", "medium", f"ec{i}.pem")
        random_walk = WalkerDefinition(Strategy.RandomWalk, 10, {'timeout': 3.0})
        # The 'on_start' list names the overlay's 'started' function: it is
        # called without any arguments once IPv8 is initialized.
        on_start = [('started', )]
        config_builder.add_overlay("MyCommunity", "my peer", [random_walk], {}, on_start)
        overlay_classes = {'MyCommunity': MyCommunity}
        instance = IPv8(config_builder.finalize(), extra_communities=overlay_classes)
        await instance.start()
# Schedule the startup coroutine, then run the event loop indefinitely.
ensure_future(start_communities())
event_loop = get_event_loop()
event_loop.run_forever()
from asyncio import ensure_future, get_event_loop
from pyipv8.ipv8.community import Community
from pyipv8.ipv8.configuration import ConfigBuilder, Strategy, WalkerDefinition
from pyipv8.ipv8.keyvault.crypto import ECCrypto
from pyipv8.ipv8.lazy_community import lazy_wrapper
from pyipv8.ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from pyipv8.ipv8.peer import Peer
from pyipv8.ipv8_service import IPv8
# Payload carrying a single clock value, declared through VariablePayload
# and passed through the vp_compile decorator.
@vp_compile
class MyMessage(VariablePayload):
    # Byte that identifies this message; it must be unique per community.
    msg_id = 1
    # Wire format: one unsigned integer is unpacked when reading data.
    format_list = ['I']
    # Attribute name given to that unsigned integer.
    names = ["clock"]
class MyCommunity(Community):
    # Example overlay in which peers keep exchanging a Lamport clock value
    # as an illustration of global clock synchronization.
    master_peer = Peer(ECCrypto().generate_key(u"medium"))

    def __init__(self, my_peer, endpoint, network):
        super().__init__(my_peer, endpoint, network)
        # Route incoming messages with identifier "1" to on_message.
        self.add_message_handler(1, self.on_message)
        # The Lamport clock maintained by this peer; 0 means that we have
        # not started counting yet.
        self.lamport_clock = 0

    def started(self):
        # Periodically try to kick off the exchange with known peers.
        async def start_communication():
            if self.lamport_clock:
                # Counting has begun, so this periodic task is done.
                self.cancel_pending_task("start_communication")
                return
            # Not counting yet: try bootstrapping communication by sending
            # our clock to every peer we currently know.
            for known_peer in self.get_peers():
                self.ez_send(known_peer, MyMessage(self.lamport_clock))

        # Run every 5.0 seconds, starting immediately (delay=0).
        self.register_task("start_communication", start_communication, interval=5.0, delay=0)

    @lazy_wrapper(MyMessage)
    def on_message(self, peer, payload):
        # Advance our Lamport clock: one past the larger of our own value
        # and the value we received.
        highest_seen = max(self.lamport_clock, payload.clock)
        self.lamport_clock = highest_seen + 1
        print(self.my_peer, "current clock:", self.lamport_clock)
        # Reply to the sender with the updated clock so that the
        # synchronization continues.
        reply = MyMessage(self.lamport_clock)
        self.ez_send(peer, reply)
async def start_communities():
    # Start three IPv8 instances, each with its own key file and with the
    # MyCommunity overlay configured to call 'started' after startup.
    for i in range(1, 4):
        config_builder = ConfigBuilder().clear_keys().clear_overlays()
        config_builder.add_key("my peer", "medium", f"ec{i}.pem")
        random_walk = WalkerDefinition(Strategy.RandomWalk, 10, {'timeout': 3.0})
        on_start = [('started', )]
        config_builder.add_overlay("MyCommunity", "my peer", [random_walk], {}, on_start)
        overlay_classes = {'MyCommunity': MyCommunity}
        instance = IPv8(config_builder.finalize(), extra_communities=overlay_classes)
        await instance.start()
# Schedule the startup coroutine, then run the event loop indefinitely.
ensure_future(start_communities())
event_loop = get_event_loop()
event_loop.run_forever()
import timeit
from pyipv8.ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from pyipv8.ipv8.messaging.payload import Payload
from pyipv8.ipv8.messaging.serialization import Serializable
class MySerializable(Serializable):
    # Hand-written Serializable with two fields, packed as 'I' and 'H'.
    format_list = ['I', 'H']

    def __init__(self, field1, field2):
        self.field1, self.field2 = field1, field2

    def to_pack_list(self):
        # One (format, value) pair per field, in declaration order.
        first_entry = ('I', self.field1)
        second_entry = ('H', self.field2)
        return [first_entry, second_entry]

    @classmethod
    def from_unpack_list(cls, *args):
        # The unpacked values are handed to the constructor positionally.
        instance = cls(*args)
        return instance
class MyPayload(Payload):
    # Hand-written Payload with two fields, packed as 'I' and 'H'.
    format_list = ['I', 'H']

    def __init__(self, field1, field2):
        self.field1, self.field2 = field1, field2

    def to_pack_list(self):
        # One (format, value) pair per field, in declaration order.
        first_entry = ('I', self.field1)
        second_entry = ('H', self.field2)
        return [first_entry, second_entry]

    @classmethod
    def from_unpack_list(cls, *args):
        # The unpacked values are handed to the constructor positionally.
        instance = cls(*args)
        return instance
class MyVariablePayload(VariablePayload):
    # Declarative variant: only the formats and the field names are given,
    # no constructor or (un)pack methods are written by hand.
    format_list = ['I', 'H']
    names = ['field1', 'field2']
# Same declaration as MyVariablePayload, additionally passed through the
# vp_compile decorator.
@vp_compile
class MyCVariablePayload(VariablePayload):
    format_list = ['I', 'H']
    names = ['field1', 'field2']
# One instance of each of the four payload flavours, all holding (1, 2).
serializable1 = MySerializable(1, 2)
serializable2 = MyPayload(1, 2)
serializable3 = MyVariablePayload(1, 2)
serializable4 = MyCVariablePayload(1, 2)
instances = (serializable1, serializable2, serializable3, serializable4)

print("As string:")
for instance in instances:
    print(instance)

print("Field values:")
# The hand-written classes set both attributes in their constructor, so
# they are read directly.
for instance in instances[:2]:
    print(instance.field1, instance.field2)
# For the VariablePayload-based classes 'field2' is read with a fallback,
# so a missing attribute prints '<undefined>' instead of raising.
for instance in instances[2:]:
    print(instance.field1, getattr(instance, 'field2', '<undefined>'))

# Each statement below is timed over 1000 executions. The statements refer
# to the module-level names serializable1..serializable4 defined above.
print("Serialization speed:")
for statement in (
    'serializable1.to_pack_list()',
    'serializable2.to_pack_list()',
    'serializable3.to_pack_list()',
    'serializable4.to_pack_list()',
):
    print(timeit.timeit(statement, number=1000, globals=locals()))

print("Unserialization speed:")
for statement in (
    'serializable1.from_unpack_list(1, 2)',
    'serializable2.from_unpack_list(1, 2)',
    'serializable3.from_unpack_list(1, 2)',
    'serializable4.from_unpack_list(1, 2)',
):
    print(timeit.timeit(statement, number=1000, globals=locals()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment