Skip to content

Instantly share code, notes, and snippets.

@machinaut
Created August 14, 2017 17:18
Show Gist options
  • Save machinaut/6af23043aae14528ba1c06f0b9e01cfc to your computer and use it in GitHub Desktop.
Save machinaut/6af23043aae14528ba1c06f0b9e01cfc to your computer and use it in GitHub Desktop.
kademlia implementation (work in progress)
#!/usr/bin/env python
# https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf
import hashlib
import json
import random
import requests
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from http.server import BaseHTTPRequestHandler
from requests_futures.sessions import FuturesSession
from socketserver import ThreadingTCPServer
from threading import Thread
from urllib.parse import parse_qs, urlparse
k = 20 # system-wide replication parameter
a = 3 # system-wide concurrency parameter
def sha1(b=b''):
return int(hashlib.sha1(b).hexdigest(), 16)
class NodeHandler(BaseHTTPRequestHandler):
def see(self, node):
bucket = self.buckets[(node.id ^ self.id).bit_length()]
if node in bucket:
bucket.remove(node)
bucket.append(node)
def send(self, message=dict(status='ok'), status=200):
message['time'] = time.time()
msg_bytes = json.dumps(message).encode()
self.send_response(status)
self.send_header('Content-type', 'application/json')
self.send_header('Content-length', len(msg_bytes))
self.end_headers()
self.wfile.write(msg_bytes)
def error(self, msg='Invalid path!'):
self.send(dict(error=msg), status=404)
def do_GET(self):
if self.path.startswith('/ping'):
return self.send()
if self.path.startswith('/node'):
return self.find_node()
if self.path.startswith('/value'):
return self.find_value()
self.error()
def do_POST(self):
if self.path.startswith('/store'):
return self.store()
self.error()
def iter_closest(self, node_id):
l = (node_id ^ self.node_id).bit_length()
for b in sorted(range(160), key=lambda x: abs(x - l)):
for n in sorted(self.buckets[b], key=lambda x: x['node_id'] ^ node_id):
yield n
def store(self):
msg_bytes = self.rfile.read(int(self.headers.get('content-length')))
msg = json.loads(msg_bytes.decode())
self.values[msg['value_id']] = msg['value']
self.send()
def find_node(self):
session = FuturesSession(executor=ThreadPoolExecutor(max_workers=a))
results = []
# XXX TODO: this
self.send()
def find_value(self):
value_id = int(parse_qs(urlparse(self.path).query)['value_id'][0])
if value_id in self.values:
return self.send({'value': self.values[value_id]})
self.find_node(value_id)
def node_handler(my_node_id):
class SubNodeHandler(NodeHandler):
buckets = [deque(maxlen=k) for _ in range(160)]
node_id = my_node_id
values = {}
return SubNodeHandler
class Node:
def __init__(self, port=None):
if port is None:
port = random.randint(1000, 2000)
self.port = port
self.node_id = sha1((str(self.port) + str(time.time())).encode())
self.handler = node_handler(self.node_id)
self.server = ThreadingTCPServer(('', self.port), self.handler)
self.server.daemon_threads = True
self.thread = Thread(target=self.server.serve_forever)
self.thread.daemon = True
self.thread.start()
self.url = 'http://localhost:{}'.format(self.port)
if __name__ == '__main__':
n1 = Node()
data = json.dumps(dict(value_id=sha1(b'a'), value='b')).encode()
requests.post(n1.url + '/store', data=data)
r = requests.get(n1.url + '/value', params={'value_id': sha1(b'a')})
print(r.json())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment