Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
geth_p2p.patch
diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go
index 3177a877e..6b77b3e58 100644
--- a/eth/fetcher/block_fetcher.go
+++ b/eth/fetcher/block_fetcher.go
@@ -97,6 +97,9 @@ type chainInsertFn func(types.Blocks) (int, error)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
+// peerStatsUpdateFn is a callback type for reporting peer events for stats update.
+type peerStatsUpdateFn func(peer string, event string)
+
// blockAnnounce is the hash notification of the availability of a new block in the
// network.
type blockAnnounce struct {
@@ -187,6 +190,7 @@ type BlockFetcher struct {
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving
+ updatePeerStats peerStatsUpdateFn // Callback to update peer stats
// Testing hooks
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
@@ -197,7 +201,7 @@ type BlockFetcher struct {
}
// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
-func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
+func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn, updatePeerStats peerStatsUpdateFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
@@ -222,6 +226,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
+ updatePeerStats: updatePeerStats,
}
}
@@ -788,6 +793,11 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
+
+ if f.updatePeerStats != nil {
+ f.updatePeerStats(peer, "importBlocks")
+ }
+
go func() {
defer func() { f.done <- hash }()
diff --git a/eth/handler.go b/eth/handler.go
index aff4871af..7e449fb59 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -108,6 +108,7 @@ type handler struct {
blockFetcher *fetcher.BlockFetcher
txFetcher *fetcher.TxFetcher
peers *peerSet
+ peersStats *peerSetStats
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
@@ -143,6 +144,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
+ h.peersStats = newPeerSetStats(h.peers)
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
// block is ahead, so fast sync was enabled for this node at a certain point.
@@ -219,7 +221,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return n, err
}
- h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
+ h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer, h.peersStats.UpdatePeerStats)
fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
@@ -438,6 +440,14 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := h.peers.peersWithoutBlock(hash)
+ if propagate {
+ h.peersStats.ReportPeersWithoutBlock(peers)
+ }
+ // Print peers stats every 100 blocks
+ if block.NumberU64() % 100 == 0 {
+ h.peersStats.LogStats()
+ }
+
// If propagation is requested, send to a subset of the peer
if propagate {
// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
@@ -448,12 +458,20 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
- // Send the block to a subset of our peers
- transfer := peers[:int(math.Sqrt(float64(len(peers))))]
- for _, peer := range transfer {
- peer.AsyncSendNewBlock(block, td)
+ // Prioritize sending to trusted peers
+ log.Warn("Broadcasting block to trusted peers", "number", block.Number(), "hash", hash)
+ for _, peer := range peers {
+ if peer.Peer.Info().Network.Trusted {
+ peer.AsyncSendNewBlock(block, td)
+ }
+ }
+ // Send to all remaining peers as well
+ for _, peer := range peers {
+ if !peer.Peer.Info().Network.Trusted {
+ peer.AsyncSendNewBlock(block, td)
+ }
}
- log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ log.Trace("Propagated block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
// Otherwise if the block is indeed in out own chain, announce it
diff --git a/eth/peerset.go b/eth/peerset.go
index 1e864a8e4..8d5293e41 100644
--- a/eth/peerset.go
+++ b/eth/peerset.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
)
@@ -257,3 +258,54 @@ func (ps *peerSet) close() {
}
ps.closed = true
}
+
+type peerSetStats struct {
+ ps *peerSet
+ importedBlocksFrom map[string]int
+ totalBroadcastsTo map[string]int
+ broadcastsToWithoutBlock map[string]int
+ lock sync.RWMutex
+}
+
+func newPeerSetStats(ps *peerSet) *peerSetStats {
+ return &peerSetStats{
+ ps: ps,
+ importedBlocksFrom: make(map[string]int),
+ totalBroadcastsTo: make(map[string]int),
+ broadcastsToWithoutBlock: make(map[string]int),
+ }
+}
+
+func (s *peerSetStats) ReportPeersWithoutBlock(peers []*ethPeer) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ for _, p := range s.ps.peers {
+ s.totalBroadcastsTo[p.Peer.ID()] += 1
+ }
+ for _, p := range peers {
+ s.broadcastsToWithoutBlock[p.Peer.ID()] += 1
+ }
+}
+
+func (s *peerSetStats) UpdatePeerStats(peer string, event string) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if event == "importBlocks" {
+ s.importedBlocksFrom[peer] += 1
+ }
+}
+
+func (s *peerSetStats) LogStats() {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ log.Info("Peers:")
+ for _, p := range s.ps.peers {
+ log.Info("Peer: ", "id", p.Peer.ID(), "enode", p.Peer.Node().URLv4(),
+ "importedBlocksFrom", s.importedBlocksFrom[p.Peer.ID()],
+ "totalBroadcastsTo", s.totalBroadcastsTo[p.Peer.ID()],
+ "broadcastsToWithoutBlock", s.broadcastsToWithoutBlock[p.Peer.ID()])
+ }
+}
diff --git a/les/fetcher.go b/les/fetcher.go
index a6d1c93c4..38a4f8417 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -181,7 +181,7 @@ func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *se
chaindb: chaindb,
chain: chain,
reqDist: reqDist,
- fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper),
+ fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper, nil),
peers: make(map[enode.ID]*fetcherPeer),
synchronise: syncFn,
announceCh: make(chan *announce),
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment