Skip to content

Instantly share code, notes, and snippets.

@lachesis
Created July 17, 2023 15:10
Show Gist options
  • Save lachesis/2e481f695c21de0c3f6d5a67e6461f23 to your computer and use it in GitHub Desktop.
Save lachesis/2e481f695c21de0c3f6d5a67e6461f23 to your computer and use it in GitHub Desktop.
Python client for https://github.com/losvedir/ephemeral2, just playing with WebSockets
#!/usr/bin/python2
import collections
import hashlib
import json
import logging
import os
import time
import websocket
# Interval in seconds: used as the rate of the built-in Phoenix heartbeat and
# as the websocket ping interval passed to run_forever().
MAX_HEARTBEAT_TIME = 30
class Heartbeatable(object):
    """Rate-limit a zero-argument callable to one run per ``rate`` seconds.

    Attributes:
        func: the callable invoked by :meth:`call`.
        rate: minimum number of seconds that must elapse between two runs.
        last_call: clock reading taken right after the most recent run, or
            ``None`` if ``func`` has never been run through this wrapper.
    """

    # Elapsed-time checks must not jump when the wall clock is adjusted, so
    # prefer the monotonic clock; fall back to time.time() on interpreters
    # that do not provide it (Python 2).
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, func, rate):
        self.func = func
        self.rate = rate
        self.last_call = None

    def call(self):
        """Run ``func`` if it never ran, or if more than ``rate`` seconds passed.

        ``last_call`` is only updated after ``func`` returns, so a call that
        raises is retried on the next invocation.
        """
        if self.last_call is None or self._clock() - self.last_call > self.rate:
            self.func()
            self.last_call = self._clock()
class PhoenixState(object):
    """An object that tracks the state of multiple phoenix channels.

    The class keeps per-topic message callbacks and a set of rate-limited
    heartbeat functions. It performs no I/O itself: every outgoing message
    dict is handed to ``self.raw_send(message)``, which must be provided by a
    subclass or sibling base class (``WSClient`` in this file).
    """

    def __init__(self):
        # map of topics (str) to their message callbacks (list)
        self.callbacks = collections.defaultdict(list)
        # map of heartbeat functions to the Heartbeatable wrapping them
        self.heartbeats = {}
        self.register_heartbeat(self._phoenix_heartbeat, MAX_HEARTBEAT_TIME)

    def _generate_message(self, topic, event, payload=None):
        """Build the message dict for ``topic``/``event``; payload defaults to {}."""
        if payload is None:
            payload = {}
        return {
            'topic': topic,
            'event': event,
            'payload': payload
        }

    def process_message(self, message):
        """Call this with a dictionary for message to process that message.

        Every callback registered for ``message['topic']`` is called with the
        message. Raises ``KeyError`` if the message has no ``'topic'`` key.
        """
        topic = message['topic']
        # .get() keeps unknown topics from leaving empty entries behind in the
        # defaultdict; the list() snapshot lets a callback register or leave
        # without affecting the callbacks run for this message.
        for callback in list(self.callbacks.get(topic, ())):
            callback(message)

    def _phoenix_heartbeat(self):
        """Send the heartbeat event on the "phoenix" topic."""
        self.send("phoenix", "heartbeat")

    def register_callback(self, topic, callback):
        """Register a callback for a given topic."""
        self.callbacks[topic].append(callback)

    def register_heartbeat(self, func, rate):
        """Have ``func`` run from heartbeat(), at most once per ``rate`` seconds.

        Registering the same ``func`` again replaces its previous entry.
        """
        self.heartbeats[func] = Heartbeatable(func, rate)

    def deregister_heartbeat(self, func):
        """Stop running ``func`` from heartbeat().

        Raises ``KeyError`` if ``func`` is not currently registered.
        """
        del self.heartbeats[func]

    def heartbeat(self):
        """Call this method at least once every 30 seconds to heartbeat the connection."""
        # Iterate over a snapshot: on Python 3 a heartbeat function that
        # registers or deregisters another heartbeat would otherwise raise
        # "dictionary changed size during iteration".
        for hb in list(self.heartbeats.values()):
            hb.call()

    def join(self, topic, callback):
        """Join a topic (and register a callback)"""
        self.register_callback(topic, callback)
        self.raw_send(self._generate_message(
            topic=topic,
            event='phx_join'
        ))

    def leave(self, topic):
        """Leave a topic (and deregister all callbacks)"""
        self.raw_send(self._generate_message(
            topic=topic,
            event='phx_leave'
        ))
        # Drop the entry entirely rather than keeping an empty list around.
        self.callbacks.pop(topic, None)

    def send(self, topic, event, payload=None):
        """Send a message to a topic."""
        self.raw_send(self._generate_message(topic, event, payload))
class WSClient(object):
    """Websocket transport: owns the connection and JSON-encodes messages.

    Incoming traffic is forwarded to ``self.heartbeat()`` and
    ``self.process_message()``; neither is defined here, so they must come
    from a subclass or sibling base class (``PhoenixState`` in this file).
    """

    def __init__(self, url):
        self.url = url
        # Set by raw_connect(); None until then.
        self.conn = None
        # Next value stamped as 'ref' on an outgoing message.
        self.ref = 1

    def __del__(self):
        # Best-effort close: conn may still be None or already closed.
        try:
            self.conn.close()
        except Exception:
            pass

    def raw_connect(self, on_open=None):
        """Create the WebSocketApp for ``self.url`` (does not run it)."""
        handlers = {
            'on_message': self.raw_process_message,
            'on_ping': self.raw_pingpong,
            'on_pong': self.raw_pingpong,
            'on_open': on_open,
        }
        self.conn = websocket.WebSocketApp(self.url, **handlers)

    def raw_send(self, body):
        """JSON-encode ``body`` and send it over the connection.

        If ``body`` has no 'ref' key, the current counter is stored in it
        (the caller's dict is modified) and the counter is advanced.
        """
        if 'ref' not in body:
            body['ref'], self.ref = self.ref, self.ref + 1
        encoded = json.dumps(body)
        logging.debug("SEND %r", encoded)
        self.conn.send(encoded)

    def raw_pingpong(self, *args, **kwargs):
        """Ping/pong handler: use the tick to run due heartbeats."""
        self.heartbeat()

    def raw_process_message(self, ws, message):
        """Message handler: run due heartbeats, then dispatch the decoded JSON."""
        logging.debug("GOT %r", message)
        self.heartbeat()
        decoded = json.loads(message)
        self.process_message(decoded)

    def raw_run_forever(self):
        """Run the connection's loop, pinging every MAX_HEARTBEAT_TIME seconds."""
        self.conn.run_forever(ping_interval=MAX_HEARTBEAT_TIME)
class WSPhoenix(PhoenixState, WSClient):
    """Phoenix channel state combined with the websocket transport.

    ``PhoenixState`` supplies the topic/heartbeat bookkeeping and
    ``WSClient`` supplies ``raw_send`` and the connection handling.
    """

    def __init__(self, url):
        # The two bases take different arguments, so each initialiser is
        # invoked explicitly instead of through super().
        WSClient.__init__(self, url)
        PhoenixState.__init__(self)
class EphemeralWS(object):
    """Fetch and serve content by hash over 'want:<hash>' / 'have:<hash>' topics.

    All traffic goes through ``self.client``, a ``WSPhoenix`` instance.
    """

    # Server used when no URL is passed to the constructor.
    DEFAULT_URL = "ws://ephemeralp2p.durazo.us/ws"

    def __init__(self, url=None):
        """Create the underlying client.

        ``url`` is the websocket endpoint; when omitted, ``DEFAULT_URL`` is used.
        """
        self.client = WSPhoenix(url or self.DEFAULT_URL)

    def get_content(self, chash, callback):
        """Get the given hash's content from EphemeralP2P (callback)

        Joins 'want:<chash>' and re-sends a 'content_request' from the
        heartbeat (at most every 5 seconds) until a message carrying a
        non-empty ``payload['content']`` arrives. ``callback`` is then called
        with that content and the topic is left.
        """
        topic = 'want:' + chash

        def ask_func():
            # Ask for the content to be sent to us
            self.client.send(topic, 'content_request', {'hash': chash})

        def want_cb(message):
            if message.get('payload', {}).get('content'):
                # Deregister the ask func
                self.client.deregister_heartbeat(ask_func)
                # Call our callback
                callback(message['payload']['content'])
                # Leave the topic
                self.client.leave(topic)

        # Join the topic
        self.client.join(topic, want_cb)
        # Register the ask func to be retried at heartbeat
        self.client.register_heartbeat(ask_func, 5)
        # Heartbeat (sends the first request right away)
        self.client.heartbeat()

    def serve_content(self, chash, content):
        """Register to serve content.

        Joins 'have:<chash>' and answers every 'content_request' event on it
        with a 'content' event carrying ``content`` and ``chash``.
        """
        topic = 'have:' + chash

        def have_cb(message):
            # .get() so that a message without an 'event' key is ignored
            # instead of raising KeyError (same tolerance as get_content).
            if message.get('event') == 'content_request':
                logging.info('serving content for %s', chash)
                self.client.send(topic, 'content', {'content': content, 'hash': chash})

        self.client.join(topic, have_cb)

    def get_and_serve_content(self, chash, callback=None):
        """Fetch, then serve a hash.

        If given, ``callback`` is called with the fetched content before
        serving starts.
        """
        def got_func(content):
            logging.info('got content for hash %s (len: %d bytes)', chash, len(content))
            if callback:
                callback(content)
            self.serve_content(chash, content)

        self.get_content(chash, got_func)

    def connect(self, on_open):
        """Create the connection with ``on_open`` and run its loop."""
        self.client.raw_connect(on_open)
        self.client.raw_run_forever()
class CachingEphemeralServer(object):
    """Serve hashes through an ``EphemeralWS`` client with an on-disk cache.

    Each cached document is stored in ``cache_dir`` in a file named after its
    SHA-256 hex digest.
    """

    def __init__(self):
        self.client = EphemeralWS()
        self.cache_dir = '.ephem_cache'

    def connect(self, on_open):
        """Make sure the cache directory exists, then connect the client.

        Raises ``OSError`` if the directory cannot be created.
        """
        try:
            os.mkdir(self.cache_dir)
        except OSError:
            # An already-existing directory is fine; anything else (missing
            # parent, permissions, a file in the way) is a real error.
            if not os.path.isdir(self.cache_dir):
                raise
        self.client.connect(on_open)

    @staticmethod
    def _as_bytes(content):
        """Return ``content`` as bytes, encoding text as UTF-8."""
        # NOTE(review): assumes the network hashes the UTF-8 encoding of the
        # document text -- confirm against the server/browser client.
        if isinstance(content, bytes):
            return content
        return content.encode('utf-8')

    @staticmethod
    def _check_hash(data, chash, source):
        """Raise ``ValueError`` unless the SHA-256 hex digest of ``data`` is ``chash``."""
        calc_hash = hashlib.sha256(data).hexdigest()
        if calc_hash != chash:
            raise ValueError("%s: Calculated hash of %s not equal to passed in hash of %s" % (source, calc_hash, chash))

    def serve(self, chash):
        """Serve a given hash.

        If the content is available in the cache dir, get it from there.
        If it's not, fetch it from the server, cache it, and serve.

        Raises ``ValueError`` if the cached content does not match ``chash``.
        For fetched content the same error is raised from inside the fetch
        callback, before anything is written to the cache.
        """
        cache_path = os.path.join(self.cache_dir, chash)
        if os.path.exists(cache_path):
            # Binary mode: the hash is computed over the exact bytes on disk.
            with open(cache_path, 'rb') as inp:
                data = inp.read()
            # Make sure the hash matches
            self._check_hash(data, chash, 'CACHED')
            logging.info('serving hash %s from cache...', chash)
            # Serve! (as text, because the content is sent inside JSON)
            self.client.serve_content(chash, data.decode('utf-8'))
        else:
            def callback(content):
                data = self._as_bytes(content)
                # Make sure the hash matches
                self._check_hash(data, chash, 'NET')
                logging.info('writing hash %s to cache...', chash)
                # Write it out to the cache
                with open(cache_path, 'wb') as out:
                    out.write(data)

            # Fetch the content from the server (p2p network) and serve it
            self.client.get_and_serve_content(chash, callback)
def basic_test():
    """Manual demo: fetch-and-serve two documents and serve one literal.

    Requires network access; connect() also runs the websocket loop.
    """
    # enable logging
    logging.basicConfig()
    ws = EphemeralWS()

    def on_open(*_args, **_kwargs):
        # "test123" doc: fetch it, then start serving it
        ws.get_and_serve_content('ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae')
        # "ABCdef" doc: serve it directly, without fetching
        ws.serve_content('057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996', 'ABCdef')
        # intro doc: fetch and serve
        ws.get_and_serve_content('2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838')

    # connect and start the mainloop
    ws.connect(on_open)
def caching_test():
    """Manual demo: serve three known hashes through the caching server.

    Requires network access; connect() also runs the websocket loop.
    """
    # enable logging at INFO level with timestamps
    logging.basicConfig(format="[%(asctime)s] %(message)s")
    logging.getLogger().setLevel(logging.INFO)

    server = CachingEphemeralServer()
    hashes = (
        'ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae',  # "test123"
        '057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996',  # "ABCdef"
        '2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838',  # the intro doc for the service
    )

    def on_open(*_args, **_kwargs):
        # Start serving every hash once the socket is open
        for digest in hashes:
            server.serve(digest)

    # Connect
    server.connect(on_open)
# Run the caching demo when this file is executed as a script.
if __name__ == '__main__':
    caching_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment