Skip to content

Instantly share code, notes, and snippets.

@Arachnid
Created November 23, 2009 09:43
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save Arachnid/240988 to your computer and use it in GitHub Desktop.
Save Arachnid/240988 to your computer and use it in GitHub Desktop.
package kademlia
import (
"fmt";
)
type Contact struct {
id NodeID;
address string;
}
func (contact *Contact) String() string {
return fmt.Sprintf("Contact(\"%s\", \"%s\")", contact.id, contact.address);
}
package kademlia
import (
"container/heap";
"container/vector";
"fmt";
"log";
"http";
"net";
"os";
"rpc";
"sort";
)
type Kademlia struct {
routes *RoutingTable;
NetworkId string;
}
func NewKademlia(self *Contact, networkId string) (ret *Kademlia) {
ret = new(Kademlia);
ret.routes = NewRoutingTable(self);
ret.NetworkId = networkId;
return;
}
func (k *Kademlia) Serve() (err os.Error) {
rpc.Register(&KademliaCore{k});
rpc.HandleHTTP();
if l, err := net.Listen("tcp", k.routes.node.address); err == nil {
go http.Serve(l, nil);
}
return;
}
func (k *Kademlia) Call(contact *Contact, method string, args, reply interface{}) (err os.Error) {
if client, err := rpc.DialHTTP("tcp", contact.address); err == nil {
err = client.Call(method, args, reply);
if err == nil {
k.routes.Update(contact);
}
}
return;
}
func (k *Kademlia) sendQuery(node *Contact, target NodeID, done chan []Contact) {
args := FindNodeRequest{RPCHeader{&k.routes.node, k.NetworkId}, target};
reply := FindNodeResponse{};
if err := k.Call(node, "KademliaCore.FindNode", &args, &reply); err == nil {
done <- reply.contacts;
} else {
done <- []Contact{};
}
}
func (k *Kademlia) IterativeFindNode(target NodeID, delta int) (ret *vector.Vector) {
done := make(chan []Contact);
// A vector of *ContactRecord structs
ret = new(vector.Vector).Resize(0, BucketSize);
// A heap of not-yet-queried *Contact structs
frontier := new(vector.Vector).Resize(0, BucketSize);
// A map of client values we've seen so far
seen := make(map[string] bool);
// Initialize the return list, frontier heap, and seen list with local nodes
for node := range k.routes.FindClosest(target, delta).Iter() {
record := node.(*ContactRecord);
ret.Push(record);
heap.Push(frontier, record.node);
seen[record.node.id.String()] = true;
}
// Start off delta queries
pending := 0;
for i := 0; i < delta && frontier.Len() > 0; i++ {
pending++;
go k.sendQuery(frontier.Pop().(*Contact), target, done);
}
// Iteratively look for closer nodes
for pending > 0 {
nodes := <-done;
pending--;
for _, node := range nodes {
// If we haven't seen the node before, add it
if _, ok := seen[node.id.String()]; ok == false {
ret.Push(&ContactRecord{&node, node.id.Xor(target)});
heap.Push(frontier, node);
seen[node.id.String()] = true;
}
}
for pending < delta && frontier.Len() > 0 {
go k.sendQuery(frontier.Pop().(*Contact), target, done);
pending++;
}
}
sort.Sort(ret);
if ret.Len() > BucketSize {
ret.Cut(BucketSize, ret.Len());
}
return;
}
type RPCHeader struct {
Sender *Contact;
NetworkId string;
}
func (k *Kademlia) HandleRPC(request, response *RPCHeader) os.Error {
if request.NetworkId != k.NetworkId {
return os.NewError(fmt.Sprintf("Expected network ID %s, got %s",
k.NetworkId, request.NetworkId));
}
if request.Sender != nil {
k.routes.Update(request.Sender);
}
response.Sender = &k.routes.node;
return nil;
}
type KademliaCore struct {
kad *Kademlia;
}
type PingRequest struct {
RPCHeader;
}
type PingResponse struct {
RPCHeader;
}
func (kc *KademliaCore) Ping(args *PingRequest, response *PingResponse) (err os.Error) {
if err = kc.kad.HandleRPC(&args.RPCHeader, &response.RPCHeader); err == nil {
log.Stderr("Ping from %s\n", args.RPCHeader);
}
return;
}
type FindNodeRequest struct {
RPCHeader;
target NodeID;
}
type FindNodeResponse struct {
RPCHeader;
contacts []Contact;
}
func (kc *KademliaCore) FindNode(args *FindNodeRequest, response *FindNodeResponse) (err os.Error) {
if err = kc.kad.HandleRPC(&args.RPCHeader, &response.RPCHeader); err == nil {
contacts := kc.kad.routes.FindClosest(args.target, BucketSize);
response.contacts = make([]Contact, contacts.Len());
for i := 0; i < contacts.Len(); i++ {
response.contacts[i] = *contacts.At(i).(*ContactRecord).node;
}
}
return;
}
package kademlia
import "testing"
func TestPing(t *testing.T) {
me := Contact{NewRandomNodeID(), "127.0.0.1:8989"};
k := NewKademlia(&me, "test");
k.Serve();
someone := Contact{NewRandomNodeID(), "127.0.0.1:8989"};
if err := k.Call(
&someone,
"KademliaCore.Ping",
&PingRequest{RPCHeader{&someone, k.NetworkId}},
&PingResponse{}); err != nil {
t.Error(err);
}
}
func TestFindNode(t *testing.T) {
me := Contact{NewRandomNodeID(), "127.0.0.1:8989"};
k := NewKademlia(&me, "test");
kc := KademliaCore{k};
var contacts [100]Contact;
for i := 0; i < len(contacts); i++ {
contacts[i] = Contact{NewRandomNodeID(), "127.0.0.1:8989"};
if err := kc.Ping(&PingRequest{RPCHeader{&contacts[i], k.NetworkId}},
&PingResponse{}); err != nil {
t.Error(err);
}
}
args := FindNodeRequest{RPCHeader{&contacts[0], k.NetworkId}, contacts[0].id};
response := FindNodeResponse{};
if err := kc.FindNode(&args, &response); err != nil {
t.Error(err);
}
if len(response.contacts) != BucketSize {
t.Fail();
}
}
include $(GOROOT)/src/Make.$(GOARCH)
TARG=kademlia
GOFILES=\
nodeid.go\
routingtable.go\
contact.go\
kademlia.go
include $(GOROOT)/src/Make.pkg
package kademlia
import (
"encoding/hex";
"rand";
)
const IdLength = 20
type NodeID [IdLength]byte
func NewNodeID(data string) (ret NodeID) {
decoded, _ := hex.DecodeString(data);
for i := 0; i < IdLength; i++ {
ret[i] = decoded[i];
}
return;
}
func NewRandomNodeID() (ret NodeID) {
for i := 0; i < IdLength; i++ {
ret[i] = uint8(rand.Intn(256));
}
return;
}
func (node NodeID) String() string {
return hex.EncodeToString(node[0:IdLength]);
}
func (node NodeID) Equals(other NodeID) bool {
for i := 0; i < IdLength; i++ {
if node[i] != other[i] {
return false;
}
}
return true;
}
func (node NodeID) Xor(other NodeID) (ret NodeID) {
for i := 0; i < IdLength; i++ {
ret[i] = node[i] ^ other[i];
}
return;
}
func (node NodeID) PrefixLen() (ret int) {
for i := 0; i < IdLength; i++ {
for j := 0; j < 8; j++ {
if (node[i] >> uint8(7 - j)) & 0x1 != 0 {
return i * 8 + j;
}
}
}
return IdLength * 8 - 1;
}
func (node NodeID) Less(other interface{}) bool {
for i := 0; i < IdLength; i++ {
if node[i] != other.(NodeID)[i] {
return node[i] < other.(NodeID)[i];
}
}
return false;
}
package kademlia
import (
"os";
"testing";
"fmt";
)
func TestNodeID(t *testing.T) {
a := NodeID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
b := NodeID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 18};
c := NodeID{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1};
if !a.Equals(a) {
t.Fail();
}
if a.Equals(b) {
t.Fail();
}
if !a.Xor(b).Equals(c) {
t.Error(a.Xor(b));
}
if c.PrefixLen() != 151 {
t.Error(c.PrefixLen());
}
if b.Less(a) {
t.Fail();
}
str_id := "0123456789abcdef0123456789abcdef01234567";
if NewNodeID(str_id).String() != str_id {
t.Error(NewNodeID(str_id).String());
}
}
type Foo struct {
a int;
}
func (f Foo) String() string {
return "foo";
}
type FmtWriter struct {
}
func (fw FmtWriter) Write(p []byte) (n int, err os.Error) {
fmt.Printf("gob(%d): %x\n", len(p), p);
return len(p), nil;
}
package kademlia
import (
"container/list";
"container/vector";
"exp/iterable";
"sort";
)
const BucketSize = 20;
type RoutingTable struct {
node Contact;
buckets [IdLength*8]*list.List;
}
type ContactRecord struct {
node *Contact;
sortKey NodeID;
}
func (rec *ContactRecord) Less(other interface{}) bool {
return rec.sortKey.Less(other.(*ContactRecord).sortKey);
}
func NewRoutingTable(node *Contact) (ret *RoutingTable) {
ret = new(RoutingTable);
for i := 0; i < IdLength * 8; i++ {
ret.buckets[i] = list.New();
}
ret.node = *node;
return;
}
func (table *RoutingTable) Update(contact *Contact) {
prefix_length := contact.id.Xor(table.node.id).PrefixLen();
bucket := table.buckets[prefix_length];
element := iterable.Find(bucket, func(x interface{}) bool {
return x.(*Contact).id.Equals(table.node.id);
});
if element == nil {
if bucket.Len() <= BucketSize {
bucket.PushFront(contact);
}
// TODO: Handle insertion when the list is full by evicting old elements if
// they don't respond to a ping.
} else {
bucket.MoveToFront(element.(*list.Element));
}
}
func copyToVector(start, end *list.Element, vec *vector.Vector, target NodeID) {
for elt := start; elt != end; elt = elt.Next() {
contact := elt.Value.(*Contact);
vec.Push(&ContactRecord{contact, contact.id.Xor(target)});
}
}
func (table *RoutingTable) FindClosest(target NodeID, count int) (ret *vector.Vector) {
ret = new(vector.Vector).Resize(0, count);
bucket_num := target.Xor(table.node.id).PrefixLen();
bucket := table.buckets[bucket_num];
copyToVector(bucket.Front(), nil, ret, target);
for i := 1; (bucket_num-i >= 0 || bucket_num+i < IdLength*8) && ret.Len() < count; i++ {
if bucket_num - i >= 0 {
bucket = table.buckets[bucket_num - i];
copyToVector(bucket.Front(), nil, ret, target);
}
if bucket_num + i < IdLength * 8 {
bucket = table.buckets[bucket_num + i];
copyToVector(bucket.Front(), nil, ret, target);
}
}
sort.Sort(ret);
if ret.Len() > count {
ret.Cut(count, ret.Len());
}
return;
}
package kademlia
import "testing"
func TestRoutingTable(t *testing.T) {
n1 := NewNodeID("FFFFFFFF00000000000000000000000000000000");
n2 := NewNodeID("FFFFFFF000000000000000000000000000000000");
n3 := NewNodeID("1111111100000000000000000000000000000000");
rt := NewRoutingTable(&Contact{n1, "localhost:8000"});
rt.Update(&Contact{n2, "localhost:8001"});
rt.Update(&Contact{n3, "localhost:8002"});
vec := rt.FindClosest(NewNodeID("2222222200000000000000000000000000000000"), 1);
if vec.Len() != 1 {
t.Fail();
return;
}
if !vec.At(0).(*ContactRecord).node.id.Equals(n3) {
t.Error(vec.At(0));
}
vec = rt.FindClosest(n2, 10);
if vec.Len() != 2 {
t.Error(vec.Len());
return;
}
if !vec.At(0).(*ContactRecord).node.id.Equals(n2) {
t.Error(vec.At(0));
}
if !vec.At(1).(*ContactRecord).node.id.Equals(n3) {
t.Error(vec.At(1));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment