Skip to content

Instantly share code, notes, and snippets.

@qstokkink
Last active March 4, 2024 10:26
Show Gist options
  • Save qstokkink/29422919c1f28a5d0774b6447a582280 to your computer and use it in GitHub Desktop.
Save qstokkink/29422919c1f28a5d0774b6447a582280 to your computer and use it in GitHub Desktop.
IPv8 online time crawler
import logging
import sys
import time
from asyncio import run
from binascii import hexlify
from pathlib import Path
from ipv8.configuration import ConfigBuilder, WalkerDefinition, Strategy, default_bootstrap_defs
from ipv8.peerdiscovery.community import DiscoveryCommunity
from ipv8.peerdiscovery.network import PeerObserver
from ipv8.types import Peer, Network
from ipv8.util import run_forever
from ipv8_service import IPv8
class CSVPeerObserver(PeerObserver):
"""
Write observed events to CSV.
"""
def __init__(self):
super().__init__()
directory_path = Path("data")
directory_path.mkdir(parents=True, exist_ok=True)
def write(self, peer: Peer, timestamp_start: float, timestamp_stop: float):
file_path = Path("data") / f"{hexlify(peer.mid).decode()}.txt"
if not file_path.exists():
# Create the header
file_path.write_text("start,stop,address,port\n")
with file_path.open("a") as file_handle:
file_handle.write(f"{timestamp_start},{timestamp_stop},{peer.address[0]},{peer.address[1]}\n")
def shutdown(self, network: Network) -> None:
for peer in network.verified_peers:
self.on_peer_removed(peer)
def on_peer_added(self, peer: Peer) -> None:
print("[CONNECT]", peer)
def on_peer_removed(self, peer: Peer) -> None:
print("[DISCONNECT]", peer)
if time.time() >= peer.creation_time:
self.write(peer, peer.creation_time, time.time())
else:
logging.exception("%s was first seen in the future! Something is seriously wrong!", peer)
async def main() -> None:
"""
Just CTRL-C when you're done :-)
"""
config_builder = ConfigBuilder()
config_builder.clear_overlays()
config_builder.add_overlay(
"DiscoveryCommunity",
"anonymous id",
[
WalkerDefinition(Strategy.EdgeWalk, peers=10000, init={
"edge_length": 10,
"neighborhood_size": 1000,
"edge_timeout": 27.5
}),
WalkerDefinition(Strategy.RandomWalk, peers=-1, init={
"timeout": 27.5,
"window_size": 1000
}),
WalkerDefinition(Strategy.RandomChurn, peers=-1, init={
"sample_size": 8,
"ping_interval": 10.0,
"inactive_time": 27.5,
"drop_time": 57.5
})
],
default_bootstrap_defs,
{},
[]
)
rendevous_hook = CSVPeerObserver()
ipv8 = IPv8(config_builder.finalize())
ipv8.get_overlay(DiscoveryCommunity).max_peers = sys.maxsize
ipv8.network.add_peer_observer(rendevous_hook)
await ipv8.start()
print("IPv8 started! Collecting data!")
try:
await run_forever()
except KeyboardInterrupt:
print("STOP!! ONLY PRESS CTRL-C ONCE!!")
rendevous_hook.shutdown(ipv8.network)
await ipv8.stop()
run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment