Skip to content

Instantly share code, notes, and snippets.

@whyrusleeping
Created August 11, 2015 17:52
Show Gist options
  • Save whyrusleeping/323304f8113183841909 to your computer and use it in GitHub Desktop.
Save whyrusleeping/323304f8113183841909 to your computer and use it in GitHub Desktop.
package blockstore
import (
"time"
blocks "github.com/ipfs/go-ipfs/blocks"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
const longTimeout = time.Second * 4
const shortTimeout = time.Millisecond * 500
const maxBatchSize = 30
type Coalescing struct {
Blockstore
puts chan *putReq
ctx context.Context
}
func NewCoalescing(ctx context.Context, bs Blockstore) *Coalescing {
c := &Coalescing{
Blockstore: bs,
puts: make(chan *putReq),
ctx: ctx,
}
return c
}
type putReq struct {
blk *blocks.Block
resp chan error
}
func makePutReq(blk *blocks.Block) *putReq {
return &putReq{
blk: blk,
resp: make(chan error, 1),
}
}
func (c *Coalescing) startCoalescer() {
var reqs []*putReq
var long <-chan time.Time
var short <-chan time.Time
for {
select {
case pr := <-c.puts:
if long == nil {
long = time.After(longTimeout)
}
short = time.After(shortTimeout)
reqs = append(reqs, pr)
if len(reqs) > maxBatchSize {
short = time.After(0)
}
case <-long:
c.putBlocks(reqs)
reqs = nil
long = nil
short = nil
case <-short:
c.putBlocks(reqs)
reqs = nil
long = nil
short = nil
case <-c.ctx.Done():
c.putBlocks(reqs)
return
}
}
}
func (c *Coalescing) putBlocks(reqs []*putReq) {
blks := make([]*blocks.Block, 0, len(reqs))
for _, req := range reqs {
blks = append(blks, req.blk)
}
err := c.PutMany(blks)
for _, req := range reqs {
req.resp <- err
}
}
func (c *Coalescing) Put(b *blocks.Block) error {
pr := makePutReq(b)
select {
case c.puts <- pr:
select {
case err := <-pr.resp:
return err
case <-c.ctx.Done():
return c.ctx.Err()
}
case <-c.ctx.Done():
return c.ctx.Err()
}
}
var _ Blockstore = (*Coalescing)(nil)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment