Skip to content

Instantly share code, notes, and snippets.

@maiah
Last active April 7, 2016 05:41
Show Gist options
  • Save maiah/28652fd2b121ef2634e4b9911338bf44 to your computer and use it in GitHub Desktop.
Save maiah/28652fd2b121ef2634e4b9911338bf44 to your computer and use it in GitHub Desktop.
Syncing and Cancellable gophers
package main
import (
"errors"
"sync"
"time"
)
// main demonstrates Process with three filters that work on one shared
// message. Each filter waits for a value produced by a helper goroutine and
// then updates the message: the first and third append their value, the
// second reports an error, which makes Process cancel whatever is still
// waiting. The final message and the error (if any) are printed.
func main() {
	sm := NewSyncedMessage("abc")

	// newFilter builds a filter that waits up to delay for value to be
	// produced and then applies update to the shared message, unless it is
	// cancelled first.
	newFilter := func(delay time.Duration, value string, update func(message, result string) (string, error)) *Filter {
		f := NewFilter()
		f.Do = func() {
			// Buffered and never closed: the producer can always complete
			// its send and exit, even when the filter was cancelled and
			// nobody receives. Closing the channel from this (receiving)
			// side would make a late producer panic with "send on closed
			// channel".
			resultCh := make(chan string, 1)
			go func() {
				time.Sleep(delay) // simulate delay
				resultCh <- value
			}()

			select {
			case result := <-resultCh:
				// From here on the filter no longer receives from CancelCh,
				// so flag it as done before publishing the result; anyone
				// who sees the result also sees Done == true.
				f.Done = true
				sm.Update(func(message string) (string, error) {
					return update(message, result)
				})
			case <-f.CancelCh:
				f.Done = true
			}
		}
		return f
	}

	appendResult := func(message, result string) (string, error) {
		return message + result, nil
	}

	f1 := newFilter(0, "def", appendResult)
	f2 := newFilter(1*time.Millisecond, "ghi", func(message, _ string) (string, error) {
		// A successful filter would return message + result here.
		return message, errors.New("ERROR ON GHI") // Simulate error
	})
	f3 := newFilter(2*time.Millisecond, "jkl", appendResult)

	result, err := Process(sm, f1, f2, f3)
	println("Final result is " + result)
	if err != nil {
		println("Got error: " + err.Error())
	}
}
// MessageResult is the outcome of one update function: the message text it
// returned together with the error it returned.
type MessageResult struct {
	message string // text returned by the update function
	err     error  // error returned by the update function; nil on success
}
// SyncedMessage is a string shared between goroutines. Changes go through
// Update, which serialises them with mtx and publishes each outcome on
// resultCh.
type SyncedMessage struct {
	message  string             // current message text
	mtx      *sync.Mutex        // held by Update while the message is being changed
	resultCh chan MessageResult // receives one MessageResult per Update call
}
// NewSyncedMessage returns a SyncedMessage whose text starts out as msg. It
// gets its own mutex and an unbuffered result channel.
func NewSyncedMessage(msg string) *SyncedMessage {
	sm := new(SyncedMessage)
	sm.message = msg
	sm.mtx = new(sync.Mutex)
	sm.resultCh = make(chan MessageResult)
	return sm
}
// Update applies fn to the current message while holding the mutex, stores
// the string fn returns as the new message (also when fn returns an error)
// and publishes the outcome on the result channel.
//
// The send happens inside the critical section, so outcomes are published in
// the same order in which the message was modified. With the unbuffered
// channel created by NewSyncedMessage, Update therefore blocks until a
// receiver takes the result.
func (sm *SyncedMessage) Update(fn func(string) (string, error)) {
	sm.mtx.Lock()
	// Deferred so the mutex is released even if fn panics; otherwise every
	// later Update would block forever on a lock nobody can release.
	defer sm.mtx.Unlock()

	result, err := fn(sm.message)
	sm.message = result
	sm.resultCh <- MessageResult{message: result, err: err}
}
// Filter is one cancellable unit of work that Process runs in its own
// goroutine.
type Filter struct {
	// Do is the work to run; Process calls it in a new goroutine.
	Do func()
	// Done is set to true by the filter's own Do once it has stopped waiting
	// for cancellation.
	Done bool
	// CancelCh carries the cancellation signal: Process sends true on it to
	// ask a filter that is still running to stop.
	CancelCh chan bool
}
// NewFilter returns a Filter with an unbuffered cancellation channel. Do is
// left nil and Done false; the caller assigns Do before the filter is run.
func NewFilter() *Filter {
	f := new(Filter)
	f.CancelCh = make(chan bool)
	return f
}
// Process runs every filter's Do in its own goroutine and consumes the
// results that the filters publish through sm.Update.
//
// It returns once len(filters) results have arrived without an error, or as
// soon as one result carries an error. In the error case the remaining
// filters are asked to stop through their CancelCh before Process returns.
// The returned message and error are those of the last result received.
// With no filters there is nothing to wait for, so the current message is
// returned unchanged with a nil error.
//
// Completion of a filter is tracked by Do returning, not by the filter's
// Done flag: that flag is written by the filter goroutine without
// synchronisation, and relying on it made Process block forever on CancelCh
// when a filter had already left its select but not yet set the flag.
func Process(sm *SyncedMessage, filters ...*Filter) (string, error) {
	if len(filters) == 0 {
		// Nobody would ever publish to sm.resultCh; receiving from it would
		// block forever.
		return sm.message, nil
	}

	// finished[i] is closed when filters[i].Do has returned.
	finished := make([]chan struct{}, len(filters))
	for i, f := range filters {
		done := make(chan struct{})
		finished[i] = done
		go func(f *Filter) {
			defer close(done)
			f.Do()
		}(f)
	}

	var final MessageResult
	received := 0
	for result := range sm.resultCh {
		final = result
		received++
		if result.err != nil {
			for i, f := range filters {
				cancelFilter(sm, f, finished[i])
			}
			break
		}
		if received == len(filters) {
			break
		}
	}
	return final.message, final.err
}

// cancelFilter delivers the cancellation signal to f and returns as soon as
// f has accepted it or f's Do has returned (finished is closed). While
// waiting it discards results that are still being published on sm.resultCh,
// so a filter that is blocked inside sm.Update can finish instead of
// stalling the cancellation.
func cancelFilter(sm *SyncedMessage, f *Filter, finished <-chan struct{}) {
	results := sm.resultCh
	for {
		select {
		case f.CancelCh <- true:
			return
		case <-finished:
			return
		case _, ok := <-results:
			if !ok {
				// A closed channel is always ready; stop selecting on it.
				results = nil
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment