Skip to content

Instantly share code, notes, and snippets.

@mildred
Created February 22, 2017 11:27
Show Gist options
  • Save mildred/19be643f54e84e31220627183f508620 to your computer and use it in GitHub Desktop.
Save mildred/19be643f54e84e31220627183f508620 to your computer and use it in GitHub Desktop.
routing.patch
diff --git a/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go b/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go
index 4f2be3a..9f3e59e 100644
--- a/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go
+++ b/src/gx/ipfs/QmRG9fdibExi5DFy8kzyxF76jvZVUb2mQBUSMNP1YaYn9M/go-libp2p-kad-dht/routing.go
@@ -143,8 +143,51 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) ([]byte, error) {
}
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]routing.RecvdVal, error) {
+ var resChan chan *routing.RecvdVal = make(chan *routing.RecvdVal, 0)
+ var errChan chan error = make(chan error, 0)
+ go dht.getValuesAsyncRoutine(ctx, key, nvals, resChan, errChan)
+
var vals []routing.RecvdVal
+ for {
+ select {
+ case err := <-errChan:
+ if err == nil {
+ break
+ }
+ return nil, err
+ case res := <-resChan:
+ if res == nil {
+ break
+ }
+ vals = append(vals, *res)
+ }
+ }
+
+ return vals, nil
+}
+
+func (dht *IpfsDHT) GetValuesAsync(ctx context.Context, key string, nvals int) <-chan *routing.RecvdVal {
+ var resChan chan *routing.RecvdVal = make(chan *routing.RecvdVal, 0)
+ var errChan chan error = make(chan error, 0)
+ go dht.getValuesAsyncRoutine(ctx, key, nvals, resChan, errChan)
+ go func() {
+ for err := range errChan {
+ log.Debugf("Query error: %s", err)
+ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
+ Type: notif.QueryError,
+ Extra: err.Error(),
+ })
+ }
+ }()
+ return resChan
+}
+
+func (dht *IpfsDHT) getValuesAsyncRoutine(ctx context.Context, key string, nvals int, resChan chan<- *routing.RecvdVal, errChan chan<- error) {
var valslock sync.Mutex
+ var sentRes int
+
+ defer close(errChan)
+ defer close(resChan)
// If we have it local, dont bother doing an RPC!
lrec, err := dht.getLocal(key)
@@ -152,16 +195,18 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
// TODO: this is tricky, we dont always want to trust our own value
// what if the authoritative source updated it?
log.Debug("have it locally")
- vals = append(vals, routing.RecvdVal{
+ sentRes = sentRes + 1
+ resChan <- &routing.RecvdVal{
Val: lrec.GetValue(),
From: dht.self,
- })
+ }
- if nvals <= 1 {
- return vals, nil
+ if nvals == 0 || nvals == 1 {
+ return
}
} else if nvals == 0 {
- return nil, err
+ errChan <- err
+ return
}
// get closest peers in the routing table
@@ -169,7 +214,8 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
log.Debugf("peers in rt: %d %s", len(rtp), rtp)
if len(rtp) == 0 {
log.Warning("No peers from routing table!")
- return nil, kb.ErrLookupFailure
+ errChan <- kb.ErrLookupFailure
+ return
}
// setup the Query
@@ -205,11 +251,11 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
Val: rec.GetValue(),
From: p,
}
+ resChan <- &rv
valslock.Lock()
- vals = append(vals, rv)
// If weve collected enough records, we're done
- if len(vals) >= nvals {
+ if sentRes >= nvals || nvals >= 0 {
res.success = true
}
valslock.Unlock()
@@ -226,14 +272,12 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
// run it!
_, err = query.Run(ctx, rtp)
- if len(vals) == 0 {
+ if sentRes == 0 {
if err != nil {
- return nil, err
+ errChan <- err
}
}
- return vals, nil
-
}
// Value provider layer of indirection.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment