There's a subtle bug here:
```go
// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
	ctx := childContext(proc) // shut down in-progress HasBlock when time to die
	limiter := ratelimit.NewRateLimiter(proc, c.NumWorkers)
	for {
		select {
		case <-proc.Closing():
			return
		case block, ok := <-workerChan:
			if !ok {
				return
			}
			limiter.LimitedGo(func(proc process.Process) {
				if err := w.exchange.HasBlock(ctx, block); err != nil {
					log.Infof("blockservice worker error: %s", err)
				}
			})
		}
	}
})
```
It causes the panic: https://travis-ci.org/jbenet/go-ipfs/jobs/49011475#L344
The flow is this:
- w.process spawns child proc
- proc spawns child limiter
- proc selects on closing and next work item
- client calls w.process.Close(), which calls proc.Close(), which calls limiter.Close(). limiter closes; proc waits on the function's return to exit.
- the select randomizes across ready channels and may jump into the next-work-item case.
- it tries to call limiter.LimitedGo(), which is an error because limiter is now closed.
The problem is that limiter is a child of proc, yet at the same time proc depends on limiter's lifetime exceeding its own, as a child's would (sort of a close cycle). If proc wants to continue using limiter, it must make sure limiter stays alive until proc is done. But adding limiter as a child of proc forwards the kill signal to it immediately, and since limiter isn't waiting on proc, limiter will die before proc is done.
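To make the close cycle concrete, here is a minimal, self-contained sketch. toyProc is a hypothetical stand-in for illustration, not the real goprocess API: it only models the two behaviors that matter here -- Close() forwarded to children immediately, and submitting work to a closed process being an error.

```go
package main

import (
	"errors"
	"fmt"
)

// toyProc is a hypothetical stand-in for a process: it records whether it
// has been closed and forwards Close() to its children immediately, the
// way AddChild forwards the kill signal.
type toyProc struct {
	closed   bool
	children []*toyProc
}

func (p *toyProc) AddChild(c *toyProc) { p.children = append(p.children, c) }

// Close marks p closed and immediately forwards the signal to children.
func (p *toyProc) Close() {
	p.closed = true
	for _, c := range p.children {
		c.Close()
	}
}

// LimitedGo models the rate limiter: work cannot be submitted once closed.
func (p *toyProc) LimitedGo(f func()) error {
	if p.closed {
		return errors.New("process already closed")
	}
	f()
	return nil
}

func main() {
	proc := &toyProc{}
	limiter := &toyProc{}
	proc.AddChild(limiter) // limiter is a child of proc

	// proc is closed while its loop still wants to hand work to limiter;
	// the kill signal has already reached the child.
	proc.Close()
	if err := limiter.LimitedGo(func() {}); err != nil {
		fmt.Println("error:", err) // prints "error: process already closed"
	}
}
```

The sketch shows the cycle in miniature: the parent's shutdown reaches the child first, yet the parent's own shutdown path still wants to use the child.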
The goprocess API doesn't make this problem easy to detect or fix. I wonder how the API could change so this isn't a class of problems. One thought I had was to make adding children to closed processes OK -- they just close immediately -- but this violates the WaitFor(...) semantics:
```go
foo = goprocess.WithParent(goprocess.Background())
bar = goprocess.WithParent(goprocess.Background())
foo.Close()
foo.WaitFor(bar) // <--- should be an error.
```
So it could be done only for AddChildNoWait, but making AddChildNoWait and AddChild callable at different times may be worse. :/
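For illustration, here is a sketch of the "closed processes accept children, which just close immediately" idea, again with a hypothetical toy type rather than the real goprocess implementation:

```go
package main

import "fmt"

// sketchProc is a hypothetical toy model of a process with a closing
// channel, used only to sketch the proposed AddChildNoWait semantics.
type sketchProc struct {
	closing chan struct{}
	closed  bool
}

func newSketchProc() *sketchProc {
	return &sketchProc{closing: make(chan struct{})}
}

func (p *sketchProc) Close() {
	if !p.closed {
		p.closed = true
		close(p.closing)
	}
}

// AddChildNoWait under the proposed semantics: adding a child to an
// already-closed parent is OK -- the child is simply closed immediately.
// Since the parent never waits for a no-wait child, the WaitFor(...)
// semantics are untouched.
func (p *sketchProc) AddChildNoWait(c *sketchProc) {
	if p.closed {
		c.Close()
		return
	}
	// (a real implementation would forward the close signal to c when
	// p eventually closes; omitted in this sketch)
}

func main() {
	parent := newSketchProc()
	parent.Close()

	child := newSketchProc()
	parent.AddChildNoWait(child) // no error: child just closes at once
	fmt.Println("child closed:", child.closed) // prints "child closed: true"
}
```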
Separately, the above code can be fixed in two ways:
- defer-closing the limiter.
  limiter no longer receives the close signal from proc; it is closed only once proc will give it no more children. proc still waits for limiter to close before exiting (the deferred limiter.Close() blocks until limiter is fully closed).
```go
// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
	ctx := childContext(proc) // shut down in-progress HasBlock when time to die
	limiter := ratelimit.NewRateLimiter(process.Background(), c.NumWorkers)
	defer limiter.Close()
	for {
		select {
		case <-proc.Closing():
			return
		case block, ok := <-workerChan:
			if !ok {
				return
			}
			limiter.LimitedGo(func(proc process.Process) {
				if err := w.exchange.HasBlock(ctx, block); err != nil {
					log.Infof("blockservice worker error: %s", err)
				}
			})
		}
	}
})
```
- define limiter outside.
  Since limiter's lifetime should be as long as or longer than proc's, define the rate limiter outside.
```go
// reads from |workerChan| until process closes
limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
proc := w.process.Go(func(proc process.Process) {
	ctx := childContext(proc) // shut down in-progress HasBlock when time to die
	for {
		select {
		case <-proc.Closing():
			return
		case block, ok := <-workerChan:
			if !ok {
				return
			}
			limiter.LimitedGo(func(proc process.Process) {
				if err := w.exchange.HasBlock(ctx, block); err != nil {
					log.Infof("blockservice worker error: %s", err)
				}
			})
		}
	}
})
limiter.AddChild(proc)
```
I dislike both of these and want to make the API clearer in this case.