Skip to content

Instantly share code, notes, and snippets.

@asterite3
Created December 20, 2020 10:29
Show Gist options
  • Save asterite3/95f5f7adabce0e60526de946c953f158 to your computer and use it in GitHub Desktop.
Save asterite3/95f5f7adabce0e60526de946c953f158 to your computer and use it in GitHub Desktop.
Unbuffered with CleanContext with a second struct
package broadcaster
import (
"context"
"io"
"sync"
"sync/atomic"
)
// Unbuffered broadcasts every Write to a set of io.WriteCloser values,
// handing the data straight to each of them without buffering it.
//
// The zero value is ready to use.
type Unbuffered struct {
// mu serializes Add, Write and Clean (and the regular path of CleanContext).
mu sync.Mutex
// writers holds the registered writers behind its own, separate lock.
writers writers
}
// Add registers writer so that it receives the data of subsequent Write
// calls.
//
// It takes w.mu, so it waits for a Write that is currently in progress.
func (w *Unbuffered) Add(writer io.WriteCloser) {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.writers.add(writer)
}
// Write hands p to every registered writer and evicts the ones that fail.
//
// A writer is evicted (it is not closed) when its Write returns an error or
// accepts fewer than len(p) bytes. Failures of individual writers are never
// reported to the caller: Write always returns len(p) and a nil error. That
// includes the case where no writers are registered, because returning a
// short count together with a nil error would break the io.Writer contract.
func (w *Unbuffered) Write(p []byte) (n int, err error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	list := w.writers.get()
	if list == nil {
		// Nothing to broadcast to: the data is dropped, which still counts
		// as a complete write.
		return len(p), nil
	}

	// w.writers.mu is not held while writing; it is only taken below for the
	// slice mutation. CleanContext may run clean, which needs that lock,
	// while a Write is still in progress.
	var evict []int
	for i, sw := range *list {
		if written, werr := sw.Write(p); werr != nil || written != len(p) {
			evict = append(evict, i)
		}
	}
	if len(evict) == 0 {
		return len(p), nil
	}

	w.writers.mu.Lock()
	// evict holds ascending indexes. Each removal shifts the remaining
	// elements left by one, hence the "- removed" correction.
	for removed, i := range evict {
		at := i - removed
		*list = append((*list)[:at], (*list)[at+1:]...)
	}
	w.writers.mu.Unlock()

	return len(p), nil
}
// Clean closes every registered writer and forgets them all.
//
// It takes w.mu, so it waits for a Write that is currently in progress. The
// returned error is always nil.
func (w *Unbuffered) Clean() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.writers.clean()
	return nil
}
// CleanContext closes and removes all writers, giving up on the orderly path
// once ctx is done.
//
// A background goroutine performs the regular clean, which has to acquire
// w.mu and therefore waits for any Write in progress. If it finishes before
// ctx is done, CleanContext returns. Otherwise CleanContext closes the
// writers itself without taking w.mu, i.e. possibly while a Write is still
// running. It must therefore only be used when every writer added to
// Unbuffered supports concurrent calls to Close and Write.
//
// An atomic flag ensures that only one of the two paths starts the clean. If
// ctx is done after the background goroutine has already claimed it,
// CleanContext returns without waiting for that goroutine, which may outlive
// the call. The returned error is always nil.
func (w *Unbuffered) CleanContext(ctx context.Context) error {
	// claimed flips from 0 to 1 exactly once; whoever flips it does the clean.
	var claimed int32
	claim := func() bool {
		return atomic.CompareAndSwapInt32(&claimed, 0, 1)
	}

	orderlyDone := make(chan struct{}, 1)
	go func() {
		defer close(orderlyDone)
		w.mu.Lock()
		defer w.mu.Unlock()
		if claim() {
			w.writers.clean()
		}
	}()

	select {
	case <-orderlyDone:
		return nil
	case <-ctx.Done():
		// Fall through to the forced path below.
	}

	if claim() {
		// Forced path: w.mu is intentionally not taken here.
		w.writers.clean()
	}
	return nil
}
// writers is a mutex-guarded list of io.WriteCloser values.
//
// The list is kept behind a pointer. get hands that pointer out, so a caller
// can keep working on the slice it obtained even after clean has reset the
// field to nil. The zero value is an empty, usable set.
type writers struct {
	mu      sync.Mutex
	writers *[]io.WriteCloser
}

// add appends writer to the list, allocating the list on first use.
func (w *writers) add(writer io.WriteCloser) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.writers == nil {
		w.writers = new([]io.WriteCloser)
	}
	*w.writers = append(*w.writers, writer)
}

// get returns the pointer to the current list, or nil if nothing has been
// added since the set was created or last cleaned.
func (w *writers) get() *[]io.WriteCloser {
	w.mu.Lock()
	defer w.mu.Unlock()

	return w.writers
}

// clean closes every writer in the list and then drops the list. Calling it
// on an empty set is a no-op.
func (w *writers) clean() {
	w.mu.Lock()
	// Deferred so that the early return below releases the lock as well;
	// returning with the mutex held would block every later add, get and
	// clean forever.
	defer w.mu.Unlock()

	if w.writers == nil {
		return
	}
	for _, sw := range *w.writers {
		// clean has no error return, so Close errors are dropped.
		_ = sw.Close()
	}
	w.writers = nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment