Skip to content

Instantly share code, notes, and snippets.

@daholino
Created August 8, 2023 21:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save daholino/f3c4e2ddaf15e03aa4a2a22d1228a8c5 to your computer and use it in GitHub Desktop.
Save daholino/f3c4e2ddaf15e03aa4a2a22d1228a8c5 to your computer and use it in GitHub Desktop.
Non-blocking sequential processing in Go, infinite (unbounded) buffered channel
package executor
// ExecHandler is the callback type an Executor invokes with each value it
// takes off its queue.
type ExecHandler[T any] func(data T)
// Executor queues dispatched values in an in-memory slice and hands them, one
// at a time and in arrival order, to a single handler goroutine.
type Executor[T any] struct {
	// reader carries queued values to the handler goroutine; writer receives
	// new values from Dispatch. New creates both as unbuffered channels.
	reader, writer chan T

	// buffer holds values taken from writer that have not yet been sent on
	// reader. It is only touched by the run goroutine.
	buffer []T

	// execHandler is called once for every value received from reader.
	execHandler ExecHandler[T]
}
// New builds an Executor that passes every dispatched value to execHandler
// and starts its background run loop before returning.
//
// The run loop has no exit path in this file, so the goroutines started here
// live for as long as the process does.
func New[T any](execHandler ExecHandler[T]) *Executor[T] {
	exec := new(Executor[T])
	exec.execHandler = execHandler
	exec.reader = make(chan T)
	exec.writer = make(chan T)
	exec.buffer = []T{}

	go exec.run()

	return exec
}
// Dispatch enqueues data for the handler. The send on the unbuffered writer
// channel completes as soon as the run loop receives the value, so the caller
// does not wait for the handler to process it.
func (e *Executor[T]) Dispatch(data T) {
	inbox := e.writer
	inbox <- data
}
// run is the queueing loop behind Dispatch. It starts the handler goroutine
// and then loops forever, doing whichever of these is ready first:
//
//   - accept a new value from writer and append it to buffer, or
//   - hand the oldest buffered value to the handler goroutine via reader.
//
// Because accepting new values never waits on the handler, Dispatch stays
// non-blocking no matter how slow the handler is; the buffer simply grows.
func (e *Executor[T]) run() {
	go e.listenForReading()

	for {
		// A send on a nil channel is never ready in a select, so leaving out
		// nil while the buffer is empty disables the send case and the loop
		// just waits for the next dispatched value.
		var (
			out  chan T
			head T
		)
		if len(e.buffer) > 0 {
			out = e.reader
			head = e.buffer[0]
		}

		select {
		case out <- head:
			// Clear the vacated slot before re-slicing: otherwise the backing
			// array keeps the delivered value (and anything it points to)
			// reachable until the next reallocation.
			var zero T
			e.buffer[0] = zero
			e.buffer = e.buffer[1:]
			if len(e.buffer) == 0 {
				// Drop the drained backing array so a past burst does not
				// pin its memory while the queue sits idle.
				e.buffer = nil
			}
		case data := <-e.writer:
			e.buffer = append(e.buffer, data)
		}
	}
}
// listenForReading is the consumer side of the executor: it receives values
// from reader one at a time and calls execHandler on each, so handler calls
// never overlap and happen in the order the values are received. It returns
// only if reader is closed.
func (e *Executor[T]) listenForReading() {
	for {
		item, open := <-e.reader
		if !open {
			return
		}
		e.execHandler(item)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment