Gist by @zsfelfoldi, created October 4, 2015.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package state provides a caching layer atop the Ethereum state trie.
package state

import (
	"bytes"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto/sha3"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/logger/glog"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/trie"
)

// StateSync schedules the download of a state trie together with all the
// contract code referenced by its accounts.
type StateSync struct {
	db          ethdb.Database
	sync        *trie.TrieSync
	codeReqs    map[common.Hash]struct{} // requested but not yet written to database
	codeReqList []common.Hash            // requested since last GetHashes
}

// sha3_nil is the known hash of the empty code blob; it is never requested.
var sha3_nil = common.BytesToHash(sha3.NewKeccak256().Sum(nil))

// NewStateSync creates a new state sync scheduler rooted at the given state root.
func NewStateSync(root common.Hash, db ethdb.Database) *StateSync {
	ss := &StateSync{
		db:       db,
		codeReqs: make(map[common.Hash]struct{}),
	}
	ss.codeReqs[sha3_nil] = struct{}{} // never request the nil hash
	ss.sync = trie.NewTrieSync(root, db, ss.KeyValueCallBack)
	return ss
}

// KeyValueCallBack is invoked by the trie scheduler for every account leaf.
// It decodes the account, schedules its storage trie for retrieval and queues
// the contract code if it is not already known or requested.
func (self *StateSync) KeyValueCallBack(key, value []byte) {
	var obj struct {
		Nonce    uint64
		Balance  *big.Int
		Root     common.Hash
		CodeHash []byte
	}
	err := rlp.Decode(bytes.NewReader(value), &obj)
	if err != nil {
		glog.Errorf("can't decode state object %x: %v", key, err)
		return
	}
	self.sync.AddRoot(obj.Root, 64, nil)
	codehash := common.BytesToHash(obj.CodeHash)
	if _, ok := self.codeReqs[codehash]; !ok { // skip the empty hash and already requested code
		code, _ := self.db.Get(obj.CodeHash)
		if code == nil {
			self.codeReqs[codehash] = struct{}{}
			self.codeReqList = append(self.codeReqList, codehash)
		}
	}
}

// GetHashes returns at most max hashes to be retrieved next, mixing pending
// contract code hashes with trie node hashes from the scheduler. A max of
// zero means "everything currently pending".
func (self *StateSync) GetHashes(max int) []common.Hash {
	cr := len(self.codeReqList)
	gh := 0
	if max != 0 {
		if cr > max {
			cr = max
		}
		gh = max - cr
	}
	list := append(self.sync.GetHashes(gh), self.codeReqList[:cr]...)
	self.codeReqList = self.codeReqList[cr:]
	return list
}

// ProcessSyncData stores the retrieved contract code blobs and forwards the
// remaining items (trie nodes) to the trie scheduler.
func (self *StateSync) ProcessSyncData(list []trie.RawSyncData) {
	for i := 0; i < len(list); i++ {
		if _, ok := self.codeReqs[list[i].Hash]; ok { // code data, not a node
			self.db.Put(list[i].Hash[:], list[i].Data)
			delete(self.codeReqs, list[i].Hash)
			// remove the entry by swapping in the last element, then revisit
			// index i so the swapped-in element is not skipped
			list[i] = list[len(list)-1]
			list = list[:len(list)-1]
			i--
		}
	}
	self.sync.ProcessSyncData(list)
}
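
// The snippet below is an added illustration, not part of the original gist:
// a minimal sketch of how a caller (for example the block downloader) might
// drive StateSync synchronously. fetchFromPeer is a hypothetical stand-in for
// the caller's network retrieval and is assumed to return the preimages of
// the requested hashes in matching order; the batch size of 384 is arbitrary.
func syncStateExample(root common.Hash, db ethdb.Database, fetchFromPeer func([]common.Hash) [][]byte) {
	ss := NewStateSync(root, db)
	for {
		// request a batch of trie node and contract code hashes
		hashes := ss.GetHashes(384)
		if len(hashes) == 0 {
			return // nothing pending: the state (and all code) is complete
		}
		data := fetchFromPeer(hashes)
		items := make([]trie.RawSyncData, len(hashes))
		for i, hash := range hashes {
			items[i] = trie.RawSyncData{Hash: hash, Data: data[i]}
		}
		// code blobs are stored directly, trie nodes are decoded and may
		// schedule further requests for the next iteration
		ss.ProcessSyncData(items)
	}
}
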
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
	"github.com/ethereum/go-ethereum/common"
)

// requestQueue is a FIFO of request hashes for a single depth level.
type requestQueue struct {
	req  []common.Hash
	sent int // number of entries already handed out by get
}

func (self *requestQueue) add(req common.Hash) {
	self.req = append(self.req, req)
}

// get returns at most max not-yet-returned hashes; max == 0 means all of them.
func (self *requestQueue) get(max int) []common.Hash {
	rem := len(self.req) - self.sent
	cnt := max
	if rem < max || max == 0 {
		cnt = rem
	}
	ret := self.req[self.sent : self.sent+cnt]
	self.sent += cnt
	if self.sent > 100 {
		// compact the queue occasionally to release already returned entries
		self.req = self.req[self.sent:]
		self.sent = 0
	}
	return ret
}
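
// Added illustration, not part of the original gist: a tiny sketch of the
// requestQueue contract assumed above. get(n) hands out at most n hashes that
// have not been returned yet, and get(0) drains everything still pending.
func exampleRequestQueue() ([]common.Hash, []common.Hash) {
	var q requestQueue
	q.add(common.Hash{1})
	q.add(common.Hash{2})
	q.add(common.Hash{3})
	first := q.get(2) // hashes 1 and 2
	rest := q.get(0)  // hash 3: max == 0 means "all remaining"
	return first, rest
}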

// syncTrieId identifies one trie being synchronised, together with its
// optional leaf callback and the depth at which its root was scheduled.
type syncTrieId struct {
	trie      *Trie
	kvcb      TrieSyncKeyValueCallBack
	rootDepth int
}

// nodeReq represents one pending node retrieval.
type nodeReq struct {
	nodeRef            *node // hash node is replaced with the actual node when received
	hash               common.Hash
	rlp                []byte
	parent             *nodeReq
	prefix             []byte // nibbles
	depth, reqChildCnt int
	id                 *syncTrieId
}

// TrieSyncKeyValueCallBack is called for every leaf (key/value pair) found
// while reconstructing a trie.
type TrieSyncKeyValueCallBack func(key, value []byte)

// TrieSync is the scheduler that reconstructs one or more tries node by node,
// always handing out the deepest pending requests first.
type TrieSync struct {
	db       Database
	rqList   []requestQueue // per-depth request queues
	maxDepth int
	reqs     map[common.Hash]*nodeReq
}

// NewTrieSync creates a new trie sync scheduler for the trie at root, with an
// optional key/value callback invoked for every leaf encountered.
func NewTrieSync(root common.Hash, db Database, kvcb TrieSyncKeyValueCallBack) *TrieSync {
	ts := &TrieSync{
		db:   db,
		reqs: make(map[common.Hash]*nodeReq),
	}
	ts.AddRoot(root, 0, kvcb)
	return ts
}

// AddRoot schedules an additional trie root for retrieval. The depth argument
// places the new root in the priority queues (e.g. a storage trie root hangs
// below an account leaf, so it is scheduled deep and retrieved early).
func (self *TrieSync) AddRoot(root common.Hash, depth int, kvcb TrieSyncKeyValueCallBack) {
	id := &syncTrieId{
		trie: &Trie{
			db:   self.db,
			root: hashNode(root.Bytes()),
		},
		kvcb:      kvcb,
		rootDepth: depth,
	}
	req := &nodeReq{
		nodeRef: &id.trie.root,
		hash:    root,
		depth:   depth, // start this subtree at the requested depth
		id:      id,
	}
	self.addReq(req)
}

func (self *TrieSync) addReq(req *nodeReq) {
	for req.depth >= len(self.rqList) {
		self.rqList = append(self.rqList, requestQueue{})
	}
	self.rqList[req.depth].add(req.hash)
	if self.maxDepth < req.depth {
		self.maxDepth = req.depth
	}
	self.reqs[req.hash] = req
}

// GetHashes returns at most max hashes to be retrieved next, starting with the
// deepest pending requests so that subtrees are completed (and written to the
// database) as early as possible. A max of zero returns everything pending.
func (self *TrieSync) GetHashes(max int) []common.Hash {
	res := []common.Hash{}
	for (self.maxDepth >= 0) && (max == 0 || len(res) < max) {
		mm := 0
		if max != 0 {
			mm = max - len(res)
		}
		r := self.rqList[self.maxDepth].get(mm)
		if len(r) == 0 {
			self.maxDepth--
		} else {
			res = append(res, r...)
		}
	}
	return res
}

// RawSyncData is a hash/data pair delivered back to the scheduler.
type RawSyncData struct {
	Hash common.Hash
	Data []byte
}

// ProcessSyncData injects retrieved nodes into the scheduler: each node is
// decoded, hooked into its trie and its missing children are scheduled. Data
// for hashes that were never requested is ignored.
func (self *TrieSync) ProcessSyncData(nodes []RawSyncData) {
	for _, nd := range nodes {
		nr := self.reqs[nd.Hash]
		if nr == nil {
			continue
		}
		n := mustDecodeNode(nd.Hash[:], nd.Data)
		*nr.nodeRef = n
		nr.rlp = nd.Data
		crs := self.createChildReqs(nr)
		nr.reqChildCnt = len(crs)
		if nr.reqChildCnt == 0 {
			self.subTreeFinished(nr)
		}
		for _, cr := range crs {
			self.addReq(cr)
		}
	}
}

// createChildReqs inspects a freshly decoded node and builds retrieval
// requests for every child that is still missing from the database. Leaf
// values are reported through the trie's key/value callback on the way.
func (self *TrieSync) createChildReqs(nr *nodeReq) []*nodeReq {
	tn := *nr.nodeRef
	var cnodes []*node
	var cdepth int
	var nibbles [][]byte
	switch n := tn.(type) {
	case shortNode:
		cnodes = []*node{&n.Val}
		cdepth = nr.depth + len(n.Key)
		nibbles = [][]byte{n.Key}
	case fullNode:
		for i := 0; i < 17; i++ {
			if n[i] != nil {
				cnodes = append(cnodes, &n[i])
				nibbles = append(nibbles, []byte{byte(i)})
			}
		}
		cdepth = nr.depth + 1
	default:
		panic("unknown node type")
	}
	var reqs []*nodeReq
	for i, cn := range cnodes {
		// copy the parent prefix so siblings do not share a backing array
		prefix := append(append([]byte{}, nr.prefix...), nibbles[i]...)
		if nr.id.kvcb != nil {
			value, isvalue := (*cn).(valueNode)
			if isvalue {
				nr.id.kvcb(prefix, value)
			}
		}
		hash, ishash := (*cn).(hashNode)
		if ishash {
			nn := nr.id.trie.resolveHash(hash)
			if nn != nil {
				ishash = false
				*cn = nn
			}
		}
		if ishash && (self.reqs[common.BytesToHash(hash)] == nil) { // node not found in db, subtree is missing or incomplete
			req := &nodeReq{
				nodeRef: cn,
				hash:    common.BytesToHash(hash),
				parent:  nr,
				depth:   cdepth,
				prefix:  prefix,
				id:      nr.id,
			}
			reqs = append(reqs, req)
		}
	}
	return reqs
}

func (self *TrieSync) subTreeFinished(nr *nodeReq) {
	// write node to disk
	self.db.Put(nr.hash[:], nr.rlp)
	// check if parent is finished too
	delete(self.reqs, nr.hash)
	pr := nr.parent
	if pr != nil {
		pr.reqChildCnt--
		if pr.reqChildCnt == 0 {
			self.subTreeFinished(pr)
		}
	}
}
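
// Added illustration, not part of the original gist: a minimal sketch of how
// the key/value callback can be used to chain further tries below the one
// being synchronised, which is the mechanism StateSync relies on for the
// per-account storage tries. For simplicity every 32-byte leaf value is
// treated as another trie root here; a real caller would RLP-decode the leaf
// first (see StateSync.KeyValueCallBack above).
func exampleChainedSync(root common.Hash, db Database) *TrieSync {
	var ts *TrieSync
	ts = NewTrieSync(root, db, func(key, value []byte) {
		if len(value) == 32 {
			// schedule the nested trie deep in the priority queues so its
			// nodes are retrieved together with the other deep requests
			ts.AddRoot(common.BytesToHash(value), 64, nil)
		}
	})
	return ts
}
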
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
	"bytes"
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
)

// makeTestTrie creates a sample test trie to test node-wise reconstruction on.
func makeTestTrie() (ethdb.Database, *Trie, map[string][]byte) {
	// Create an empty trie
	db, _ := ethdb.NewMemDatabase()
	trie, _ := New(common.Hash{}, db)

	// Fill it with some arbitrary data
	content := make(map[string][]byte)
	for i := byte(0); i < 255; i++ {
		key, val := common.LeftPadBytes([]byte{i}, 32), []byte{i}
		content[string(key)] = val
		trie.Update(key, val)
	}
	trie.Commit()

	// Return the generated trie
	return db, trie, content
}

// checkTrieContents cross references a reconstructed trie with an expected data
// content map.
func checkTrieContents(t *testing.T, db Database, root []byte, content map[string][]byte) {
	trie, err := New(common.BytesToHash(root), db)
	if err != nil {
		t.Fatalf("failed to create trie at %x: %v", root, err)
	}
	for key, val := range content {
		if have := trie.Get([]byte(key)); bytes.Compare(have, val) != 0 {
			t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val)
		}
	}
}

// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go.
func TestIterativeTrieSyncIndividual(t *testing.T) { testIterativeTrieSync(t, 1) }
func TestIterativeTrieSyncBatched(t *testing.T)    { testIterativeTrieSync(t, 100) }

func testIterativeTrieSync(t *testing.T, batch int) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	dstDb, _ := ethdb.NewMemDatabase()
	sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)

	queue := append([]common.Hash{}, sched.GetHashes(batch)...)
	for len(queue) > 0 {
		results := make([]RawSyncData, len(queue))
		for i, hash := range queue {
			data, err := srcDb.Get(hash.Bytes())
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = RawSyncData{hash, data}
		}
		sched.ProcessSyncData(results)
		queue = append(queue[:0], sched.GetHashes(batch)...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}

// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned, and the others are sent only later.
func TestIterativeDelayedTrieSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	dstDb, _ := ethdb.NewMemDatabase()
	sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)

	queue := append([]common.Hash{}, sched.GetHashes(10000)...)
	for len(queue) > 0 {
		// Sync only half of the scheduled nodes
		results := make([]RawSyncData, len(queue)/2+1)
		for i, hash := range queue[:len(results)] {
			data, err := srcDb.Get(hash.Bytes())
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = RawSyncData{hash, data}
		}
		sched.ProcessSyncData(results)
		queue = append(queue[len(results):], sched.GetHashes(10000)...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}

// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go, however in a
// random order.
func TestIterativeRandomTrieSyncIndividual(t *testing.T) { testIterativeRandomTrieSync(t, 1) }
func TestIterativeRandomTrieSyncBatched(t *testing.T)    { testIterativeRandomTrieSync(t, 100) }

func testIterativeRandomTrieSync(t *testing.T, batch int) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	dstDb, _ := ethdb.NewMemDatabase()
	sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)

	queue := make(map[common.Hash]struct{})
	for _, hash := range sched.GetHashes(batch) {
		queue[hash] = struct{}{}
	}
	for len(queue) > 0 {
		// Fetch all the queued nodes in a random order
		results := make([]RawSyncData, 0, len(queue))
		for hash := range queue {
			data, err := srcDb.Get(hash.Bytes())
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results = append(results, RawSyncData{hash, data})
		}
		// Feed the retrieved results back and queue new tasks
		sched.ProcessSyncData(results)

		queue = make(map[common.Hash]struct{})
		for _, hash := range sched.GetHashes(batch) {
			queue[hash] = struct{}{}
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}

// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned (even those in random order), with the others
// sent only later.
func TestIterativeRandomDelayedTrieSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	dstDb, _ := ethdb.NewMemDatabase()
	sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)

	queue := make(map[common.Hash]struct{})
	for _, hash := range sched.GetHashes(10000) {
		queue[hash] = struct{}{}
	}
	for len(queue) > 0 {
		// Sync only half of the scheduled nodes, even those in random order
		results := make([]RawSyncData, 0, len(queue)/2+1)
		for hash := range queue {
			data, err := srcDb.Get(hash.Bytes())
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results = append(results, RawSyncData{hash, data})
			if len(results) >= cap(results) {
				break
			}
		}
		// Feed the retrieved results back and queue new tasks
		sched.ProcessSyncData(results)
		for _, result := range results {
			delete(queue, result.Hash)
		}
		for _, hash := range sched.GetHashes(10000) {
			queue[hash] = struct{}{}
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}