-
-
Save whyrusleeping/323304f8113183841909 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package blockstore | |
import ( | |
"time" | |
blocks "github.com/ipfs/go-ipfs/blocks" | |
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" | |
) | |
const longTimeout = time.Second * 4 | |
const shortTimeout = time.Millisecond * 500 | |
const maxBatchSize = 30 | |
type Coalescing struct { | |
Blockstore | |
puts chan *putReq | |
ctx context.Context | |
} | |
func NewCoalescing(ctx context.Context, bs Blockstore) *Coalescing { | |
c := &Coalescing{ | |
Blockstore: bs, | |
puts: make(chan *putReq), | |
ctx: ctx, | |
} | |
return c | |
} | |
type putReq struct { | |
blk *blocks.Block | |
resp chan error | |
} | |
func makePutReq(blk *blocks.Block) *putReq { | |
return &putReq{ | |
blk: blk, | |
resp: make(chan error, 1), | |
} | |
} | |
func (c *Coalescing) startCoalescer() { | |
var reqs []*putReq | |
var long <-chan time.Time | |
var short <-chan time.Time | |
for { | |
select { | |
case pr := <-c.puts: | |
if long == nil { | |
long = time.After(longTimeout) | |
} | |
short = time.After(shortTimeout) | |
reqs = append(reqs, pr) | |
if len(reqs) > maxBatchSize { | |
short = time.After(0) | |
} | |
case <-long: | |
c.putBlocks(reqs) | |
reqs = nil | |
long = nil | |
short = nil | |
case <-short: | |
c.putBlocks(reqs) | |
reqs = nil | |
long = nil | |
short = nil | |
case <-c.ctx.Done(): | |
c.putBlocks(reqs) | |
return | |
} | |
} | |
} | |
func (c *Coalescing) putBlocks(reqs []*putReq) { | |
blks := make([]*blocks.Block, 0, len(reqs)) | |
for _, req := range reqs { | |
blks = append(blks, req.blk) | |
} | |
err := c.PutMany(blks) | |
for _, req := range reqs { | |
req.resp <- err | |
} | |
} | |
func (c *Coalescing) Put(b *blocks.Block) error { | |
pr := makePutReq(b) | |
select { | |
case c.puts <- pr: | |
select { | |
case err := <-pr.resp: | |
return err | |
case <-c.ctx.Done(): | |
return c.ctx.Err() | |
} | |
case <-c.ctx.Done(): | |
return c.ctx.Err() | |
} | |
} | |
var _ Blockstore = (*Coalescing)(nil) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment