Skip to content

Instantly share code, notes, and snippets.

@arkadijs
Last active May 5, 2023 14:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arkadijs/172d9bda4e0d10a530add1faef942852 to your computer and use it in GitHub Desktop.
Save arkadijs/172d9bda4e0d10a530add1faef942852 to your computer and use it in GitHub Desktop.
Streaming Writer-based pipeline in Go
package main
import (
"fmt"
"io"
"strings"
)
// rate is the number of stream bytes that make up one millisecond of the
// simulated "audio". Stages divide a byte offset by rate to turn it into a
// timestamp offset.
const rate = 2 // bytes per millisec of "audio"
// Anon is a time interval to anonymize. main passes Start and End to
// NewAnonymizator, which masks every byte whose timestamp t satisfies
// Start <= t <= End.
type Anon struct {
Start, End int
}
// Writer is one stage of the pipeline. It receives a chunk of stream bytes
// together with the timestamp of the chunk; IoWriterAdapter starts that
// timestamp at 0 and advances it by the chunk length divided by rate after
// each call.
type Writer func(timestamp int, bytes []byte)
// overlap reports whether the chunk starting at timestamp contains at least
// one byte whose timestamp lies in the inclusive interval [start, end].
//
// Byte i of a chunk is stamped timestamp + i/rate, so the chunk covers the
// timestamps from timestamp through timestamp + (len(bytes)-1)/rate, both
// inclusive. An empty chunk carries no bytes and therefore never overlaps.
func overlap(start, end, timestamp int, bytes []byte) bool {
	if len(bytes) == 0 {
		return false
	}
	// Timestamp of the final byte. Using len(bytes)/rate here would point one
	// step past the chunk and report false overlaps with an interval that
	// begins right after it.
	last := timestamp + (len(bytes)-1)/rate
	// Two inclusive ranges intersect exactly when each begins no later than
	// the other one ends.
	return timestamp <= end && start <= last
}
// NewAnonymizator builds a pipeline stage that masks the bytes whose timestamp
// lies in the inclusive interval [start, end] and then passes the chunk on to
// next.
//
// Byte i of a chunk is stamped timestamp + i/rate. Matching bytes are
// overwritten with '_' directly in the slice that was passed in. Each chunk is
// traced to standard output before it is examined, and it is always forwarded
// to next whether or not anything was masked.
func NewAnonymizator(start, end int, next Writer) Writer {
	return func(timestamp int, bytes []byte) {
		traceEnd := timestamp + len(bytes)/rate - 1
		fmt.Printf("anon start=%d end=%d: ts=%02d endTs=%02d %s\n",
			start, end, timestamp, traceEnd, bytes)

		if overlap(start, end, timestamp, bytes) {
			// Placeholder: a real implementation would split the slice at
			// the interval boundaries instead of testing every byte.
			for offset := 0; offset < len(bytes); offset++ {
				ts := timestamp + offset/rate
				if ts < start || ts > end {
					continue
				}
				bytes[offset] = '_'
			}
		}

		next(timestamp, bytes)
	}
}
// NewEncoder returns the encoder stage of the pipeline. It performs no
// encoding: each chunk and its timestamp are handed to next unchanged.
func NewEncoder(next Writer) Writer {
	return func(timestamp int, bytes []byte) {
		next(timestamp, bytes)
	}
}
// upload drains body in reads of at most five bytes and prints every
// non-empty read to standard output. It returns as soon as a read reports an
// error of any kind; bytes delivered together with that error are still
// printed first. The error itself is not reported.
func upload(body io.Reader) {
	var chunk [5]byte
	for {
		got, readErr := body.Read(chunk[:])
		if got > 0 {
			fmt.Printf("upload: len=%d %s\n", got, chunk[:got])
		}
		if readErr != nil {
			return
		}
	}
}
// WriterReaderBridge exposes a channel of byte chunks as an io.Reader. Chunks
// are delivered in the order they are received; a chunk larger than the
// caller's buffer is handed out across several Read calls.
type WriterReaderBridge struct {
	ch   chan []byte // source of chunks; closing it marks the end of the stream
	tail []byte      // unread remainder of the chunk currently being served
}

// NewWriterReaderBridge returns a bridge that reads its chunks from ch.
func NewWriterReaderBridge(ch chan []byte) *WriterReaderBridge {
	return &WriterReaderBridge{ch: ch}
}

// Read copies pending bytes into out and returns how many were copied.
//
// When nothing is pending it blocks until a non-empty chunk arrives on the
// channel. Once the channel has been closed and every received byte has been
// handed out, it returns 0 and io.EOF. A zero-length out returns 0, nil
// immediately without touching the channel.
func (b *WriterReaderBridge) Read(out []byte) (int, error) {
	if len(out) == 0 {
		return 0, nil
	}
	// Skip empty chunks so that Read never returns 0 bytes with a nil error,
	// which io.Reader discourages.
	for len(b.tail) == 0 {
		chunk, ok := <-b.ch
		if !ok {
			return 0, io.EOF
		}
		b.tail = chunk
	}
	read := copy(out, b.tail)
	b.tail = b.tail[read:]
	if len(b.tail) == 0 {
		// Drop the reference so the chunk's backing array can be collected.
		b.tail = nil
	}
	return read, nil
}
// NewUploader returns the final stage of the pipeline. It starts a goroutine
// running upload on a WriterReaderBridge and returns a Writer that sends each
// chunk to that goroutine over an unbuffered channel. The timestamp is
// ignored.
//
// The chunk slice itself is sent, not a copy. The channel is never closed
// here, so the goroutine is not told when the stream ends.
func NewUploader() Writer {
	chunks := make(chan []byte)
	go upload(NewWriterReaderBridge(chunks))
	return func(_ int, bytes []byte) {
		chunks <- bytes
	}
}
// IoWriterAdapter lets a Writer pipeline be fed through the io.Writer
// interface. It stamps each chunk with the timestamp of its first byte,
// derived from the number of bytes written so far.
type IoWriterAdapter struct {
	writer    Writer
	timestamp int // timestamp of the first byte of the next chunk
	carry     int // bytes already written into the current timestamp unit; always < rate
}

// Write hands a private copy of p to the pipeline, stamped with the current
// timestamp, and then advances the timestamp by the number of whole units of
// rate bytes written so far. It always reports len(p) bytes written and a nil
// error.
//
// The copy is required by the io.Writer contract: Write must neither modify
// nor retain p, while pipeline stages rewrite bytes in place and may pass the
// slice to another goroutine.
func (adapter *IoWriterAdapter) Write(p []byte) (n int, err error) {
	chunk := make([]byte, len(p))
	copy(chunk, p)
	adapter.writer(adapter.timestamp, chunk)

	// Carry the remainder forward so that writes whose length is not a
	// multiple of rate do not make the timestamp fall behind the stream.
	pending := adapter.carry + len(p)
	adapter.timestamp += pending / rate
	adapter.carry = pending % rate
	return len(p), nil
}

// NewIoWriterAdapter returns an adapter that feeds writer, starting at
// timestamp 0.
func NewIoWriterAdapter(writer Writer) *IoWriterAdapter {
	return &IoWriterAdapter{writer: writer}
}
// main assembles the demo pipeline (anonymizers -> encoder -> uploader) and
// streams two sample inputs through it.
func main() {
	anon := []Anon{
		{2, 3},
		{7, 9},
	}

	pipeline := NewEncoder(NewUploader())
	// Each anonymizer wraps the pipeline built so far, so the last interval
	// in the list runs first. Anonymization doesn't care that the stages end
	// up applied in reverse order.
	for _, a := range anon {
		pipeline = NewAnonymizator(a.Start, a.End, pipeline)
	}

	// NOTE(review): nothing waits for the goroutine started by NewUploader,
	// so the process may exit before the last chunk has been fully printed.
	writer := NewIoWriterAdapter(pipeline)
	for _, input := range []string{"0123456789", "abcdefghijklmnopqrstxyz"} {
		if _, err := io.Copy(writer, strings.NewReader(input)); err != nil {
			fmt.Printf("copy %q: %v\n", input, err)
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment