Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@zelig
Last active April 27, 2018 08:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zelig/cad5b633438040840e61e2f61b3b73da to your computer and use it in GitHub Desktop.
Save zelig/cad5b633438040840e61e2f61b3b73da to your computer and use it in GitHub Desktop.
chunk refactor proposal
  • eliminate ChunkStore interface
  • eliminate dbapi, replace with netstore
  • netstore takes contexts
  • refactor chunker in its own package
  • localstore does nothing with requests; it just stores Chunk interface objects (this leaves open the possibility that requests are simply stored together with chunks)
  • handleRetrieveRequest just uses netstore.Get
  • needData uses HasStored()
  • chunk delivery loop uses netstore put
  • requesting peer communicated via context.value
// Address is the byte-slice reference under which a chunk is stored and
// looked up; it is the type of the ref argument of LocalStore.Get and
// NetStore.Get and of the value returned by Chunk.Address.
type Address []byte
// LocalStore just implements a combined memory cache and disk Store.
// It only stores values on disk if they are Marshalable.
type LocalStore interface {
// Get returns the chunk stored under ref, or an error.
// NetStore.Get treats any non-nil error from this call as "not available
// locally" and falls back to a network request.
Get(ref Address) (Chunk, error)
// Put stores ch. On success it also returns a func() error, which
// NetStore.Put hands back to its caller under the name waitToStore.
// NOTE(review): presumably that function blocks until the chunk has been
// persisted and reports the outcome — confirm.
Put(ch Chunk) (func() error, error)
}
// SyncerStore is an interface for an iterable db store used by the syncer.
// NOTE(review): po is declared as int in StorageCount but as int8 in
// Iterator — confirm which width is intended and make the two agree.
type SyncerStore interface {
// StorageCount returns a count for the given po.
// NOTE(review): presumably po is a proximity-order bin and the result is
// the number of items stored in it — confirm.
StorageCount(po int) int64
// Iterator takes a po and a from/to range.
// NOTE(review): as declared it returns nothing and takes no callback, so
// it is unclear how the iterated items reach the caller — confirm.
Iterator(po int8, from, to int64)
}
// Marshalable is implemented by values that can serialise themselves to
// bytes. According to the LocalStore comment in this file, only Marshalable
// values are stored on disk.
type Marshalable interface {
// Marshal returns the serialised form of the value, or an error.
Marshal() ([]byte, error)
}
// Chunk is the interface implemented by requests and data chunks.
//
// NOTE(review): Marshal here returns only []byte, whereas
// Marshalable.Marshal returns ([]byte, error). A Go type cannot have both
// methods, so as declared no Chunk can also be Marshalable — confirm which
// signature is intended.
type Chunk interface {
// Address returns the reference of the chunk.
Address() Address
// Data returns a byte slice or an error.
// NOTE(review): presumably the chunk payload — confirm.
Data() ([]byte, error)
// Span returns a byte slice or an error.
// NOTE(review): the chunk struct keeps span as an int64; confirm the
// intended representation.
Span() ([]byte, error)
// Marshal returns the serialised chunk (see the note on the interface).
Marshal() []byte
}
// chunk is the concrete chunk record created by NewChunk.
// No methods on chunk are visible in this part of the file.
type chunk struct {
addr Address // reference of the chunk; set by NewChunk
sdata []byte // set by NewChunk from its data argument; NOTE(review): presumably the serialised form — confirm
data []byte // not set by NewChunk
metadata []byte // not set by NewChunk
span int64 // not set by NewChunk; NOTE(review): Chunk.Span returns []byte, not int64 — confirm
}
// NewChunk creates a chunk for addr whose sdata field holds data; every
// other field is left at its zero value.
//
// It returns the Chunk interface rather than *Chunk: a pointer to an
// interface type cannot hold the *chunk value built here, so the previous
// signature could not compile.
// NOTE(review): this relies on *chunk implementing Chunk; those methods are
// not visible in this part of the file.
func NewChunk(addr Address, data []byte) Chunk {
	return &chunk{
		addr:  addr,
		sdata: data,
	}
}
// Request is created when a chunk is not found locally
// it starts a process of fetching once and keeps it
// alive until all active requests complete
// either by the chunk delivered or all cancelled/timed out
type Request struct {
chunk *chunk
deliveredC chan struct{}
quitC chan struct{}
init sync.Once
wg sync.WaitGroup
// Deliver([]byte) error
}
func NewRequest() *Request {
return &Request{
deliveredC: make(chan struct{}),
quitC: make(chan struct{}),
}
}
// NetStore is the interface for context-aware chunk retrieval and storage;
// every method takes a context.
//
// NOTE(review): further down this file declares methods with a *NetStore
// receiver that access fields (mu, LocalStore, requests). That is not valid
// for an interface type, so presumably the implementation is meant to be a
// struct under this or another name — confirm.
type NetStore interface {
// Get returns the chunk for ref if it is available locally. Otherwise the
// chunk is nil and the returned function must be called with a context to
// fetch it.
Get(ctx context.Context, ref Address) (Chunk, func(ctx context.Context) (Chunk, error), error)
// Put stores ch and returns a function to wait on, or an error.
Put(ctx context.Context, ch Chunk) (func(ctx context.Context) error, error)
// HasStored has no implementation in the visible part of the file.
// NOTE(review): presumably it reports whether ref is stored and the
// returned function waits for the chunk to be stored — confirm.
HasStored(ctx context.Context, ref Address) (func(context.Context) error, error)
}
// Get attempts to retrieve the chunk from the LocalStore.
// If it is not found there, Get looks up an existing request for ref and,
// if none exists, creates one and saves it in the requests cache. From then
// on every Get for ref hits that request until the chunk is delivered or
// all request contexts are done.
//
// It returns a chunk, a fetcher function and an error. If the chunk is nil
// and the error is nil, the fetcher must be called with a context to obtain
// the chunk.
//
// NOTE(review): any error from LocalStore.Get is treated as "not found";
// confirm that other failures should also fall back to a network request.
func (n *NetStore) Get(ctx context.Context, ref Address) (Chunk, func(ctx context.Context) (Chunk, error), error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	// LocalStore.Get is declared as Get(ref Address), so ctx is not passed
	// on. The result is named ch to avoid shadowing the chunk type.
	if ch, err := n.LocalStore.Get(ref); err == nil {
		return ch, nil, nil
	}

	request, err := n.getOrCreateRequest(ref)
	if err != nil {
		return nil, nil, err
	}
	return nil, request.Run, nil
}
// getOrCreateRequest returns the existing request for ref from the requests
// cache. If there is none, it creates one, records ref on it and saves it
// in the cache.
// The caller must hold the lock.
func (n *NetStore) getOrCreateRequest(ref Address) (*Request, error) {
	if r, err := n.requests.Get(ref); err == nil {
		return r, nil
	}

	r := NewRequest(n)
	// retrieve reads r.ref both to fetch the chunk and to remove the request
	// from the cache, so it has to be populated before the request is shared.
	r.ref = ref
	if err := n.requests.Add(ref, r); err != nil {
		return nil, err
	}
	return r, nil
}
// Run is the fetcher function handed out by NetStore.Get.
// The first call launches fetching through retrieve; later calls only wait.
// It blocks until the chunk is delivered or ctx is done, and returns the
// delivered chunk or ctx.Err() respectively.
//
// NOTE(review): a Run that starts after the counter has already dropped to
// zero calls wg.Add concurrently with the wg.Wait in retrieve, which
// sync.WaitGroup does not permit — confirm a request cannot be handed out
// again after its last waiter has left.
func (r *Request) Run(ctx context.Context) (Chunk, error) {
	// Register this waiter before retrieve may start its wg.Wait goroutine,
	// so the request stays alive at least as long as this call.
	r.wg.Add(1)
	defer r.wg.Done()

	r.init.Do(func() { r.retrieve(ctx) })

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-r.deliveredC:
		// deliveredC is the channel NetStore.Put closes after setting r.chunk.
		return r.chunk, nil
	}
}
// retrieve is called only once per request. It launches fetching by calling
// the netstore's retrieve function and keeps the request alive by
// re-requesting
//   - after serchTimeout if the request was sent successfully,
//   - after retryInterval if sending the request failed,
//   - as soon as the channel returned by the netstore's retrieve fires.
//
// It returns immediately; the work happens in two goroutines that end once
// every Run call on this request has returned.
//
// NOTE(review): ctx is the context of the first Run caller only. Once that
// caller's context is done, later re-requests are issued with an already
// cancelled context — confirm this is intended.
func (r *Request) retrieve(ctx context.Context) {
	// Close r.quitC once all Run calls have returned, whether the chunk was
	// delivered or their contexts ended.
	go func() {
		r.wg.Wait()
		close(r.quitC)
	}()

	// Loop that keeps the request alive.
	go func() {
		// Remove the request from the cache when the loop ends.
		defer func() {
			r.netstore.requests.Remove(r.ref)
		}()

		for {
			peerQuitC, err := r.netstore.retrieve(ctx, r.ref)

			// Wait for a response after a successful request; after a
			// retrieve error wait only the retry interval.
			delay := serchTimeout
			if err != nil {
				delay = retryInterval
			}

			// A fresh timer per round avoids calling Reset on a timer whose
			// channel still holds an old tick, which would make the wait
			// return immediately.
			wait := time.NewTimer(delay)
			select {
			case <-wait.C:
				// Search or retry timeout; re-request.
			case <-peerQuitC:
				// Requested downstream peer disconnected; re-request.
				wait.Stop()
			case <-r.quitC:
				// All request contexts closed; can quit.
				wait.Stop()
				return
			}
		}
	}()
}
// Put stores a chunk in the LocalStore and, if a request for the chunk
// exists, completes it: the chunk is attached to the request, the request's
// deliveredC channel is closed to wake all waiting Run calls, and the
// request is removed from the requests cache.
//
// The pending request is completed whether or not storing succeeded, as
// before. On a store error Put returns that error. Otherwise it returns a
// function that waits for the store operation to finish.
//
// NOTE(review): the returned function ignores its context because the
// LocalStore wait function takes none — confirm whether it should be
// cancellable.
func (n *NetStore) Put(ctx context.Context, ch Chunk) (func(ctx context.Context) error, error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	waitToStore, err := n.LocalStore.Put(ch)

	// Chunk exposes its reference through Address(). The lookup uses the
	// same two-value form of requests.Get as getOrCreateRequest.
	ref := ch.Address()
	if r, getErr := n.requests.Get(ref); getErr == nil && r != nil {
		r.chunk = ch
		close(r.deliveredC)
		n.requests.Remove(ref)
	}

	if err != nil {
		return nil, err
	}
	if waitToStore == nil {
		return nil, nil
	}
	// Adapt the context-free wait function of LocalStore.Put to the
	// context-taking signature this method promises.
	return func(context.Context) error { return waitToStore() }, nil
}
@gbalint
Copy link

gbalint commented Apr 27, 2018

Could this be in a separate package from the other storage code? As far as I can see, it doesn't have to depend on storage; it just needs an interface for NetStore.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment