Skip to content

Instantly share code, notes, and snippets.

@supershabam
Last active December 30, 2015 08:17
Show Gist options
  • Save supershabam/6a2222556ffd2ae1a381 to your computer and use it in GitHub Desktop.
Save supershabam/6a2222556ffd2ae1a381 to your computer and use it in GitHub Desktop.
package main
import "fmt"
import "sync"
import "time"
import "log"
// IDMesg is a text message tagged with the identifier of the connection it
// belongs to. ID names the connection and Mesg carries the payload.
type IDMesg struct {
	ID, Mesg string
}
// Prog is the shared state of a running program and the connections attached
// to it.
//
// A Prog contains a sync.RWMutex and therefore must not be copied after first
// use; share it by pointer.
type Prog struct {
	// Done records that the program has shut down. Run sets it on shutdown
	// and Connect rejects new connections once it is set.
	Done bool

	// Timer triggers shutdown: Run stops when it fires. Connect re-arms it
	// far into the future, and it is re-armed to one second when the last
	// connection is removed.
	Timer *time.Timer

	// In carries messages from connected callers to the program; Out carries
	// messages from the program that Run routes to connections by ID.
	In, Out chan IDMesg

	// Ch maps a connection id to the channel on which Run delivers that
	// connection's messages.
	Ch map[string]chan string

	// M guards Done and Ch.
	M sync.RWMutex
}
// MakeProg allocates a Prog with unbuffered In and Out channels, an empty
// connection table, and a timer armed to fire after one millisecond.
func MakeProg() *Prog {
	prog := new(Prog)
	prog.Timer = time.NewTimer(time.Millisecond)
	prog.In = make(chan IDMesg)
	prog.Out = make(chan IDMesg)
	prog.Ch = make(map[string]chan string)
	return prog
}
// Connect registers a new connection named id with the program.
//
// Every string received from in is forwarded to p.In tagged with id. Every
// message the program delivers on the connection's entry in p.Ch is forwarded
// to the returned channel, also tagged with id. The connection ends when in is
// closed; the returned channel is then closed and id is removed from p.Ch.
// When the last connection is removed, p.Timer is re-armed to fire after one
// second.
//
// Connect returns a nil channel and an error if the program has already
// terminated or if id is already connected. In both cases p.Timer is left
// untouched, so a rejected call cannot postpone shutdown.
//
// The receiver is a pointer because Prog contains a sync.RWMutex: with a value
// receiver each call would lock its own copy of the mutex and read its own
// copy of Done.
func (p *Prog) Connect(id string, in <-chan string) (<-chan IDMesg, error) {
	// keepAlive pushes the shutdown timer far into the future while at least
	// one connection exists (8760 hours = 365 days).
	const keepAlive = 8760 * time.Hour

	p.M.Lock()
	defer p.M.Unlock()

	if p.Done {
		return nil, fmt.Errorf("program has terminated")
	}
	if _, ok := p.Ch[id]; ok {
		return nil, fmt.Errorf("id already connected")
	}

	// Only a successful registration may postpone shutdown.
	p.Timer.Reset(keepAlive)

	ch := make(chan string)
	p.Ch[id] = ch
	out := make(chan IDMesg)

	// done is closed exactly once to tell both forwarders to stop.
	done := make(chan struct{})
	var once sync.Once
	stop := func() {
		once.Do(func() {
			close(done)
		})
	}

	go func() {
		var wg sync.WaitGroup
		wg.Add(2)

		// Forward caller input to the program.
		go func() {
			defer wg.Done()
			for {
				select {
				case <-done:
					return
				case m, ok := <-in:
					if !ok {
						// The caller closed in: tear the connection down.
						stop()
						return
					}
					select {
					case <-done:
						return
					case p.In <- IDMesg{ID: id, Mesg: m}:
					}
				}
			}
		}()

		// Forward program output to the caller.
		go func() {
			defer wg.Done()
			for {
				select {
				case <-done:
					return
				case m, ok := <-ch:
					if !ok {
						// ch was closed out from under us: tear down.
						stop()
						return
					}
					select {
					case <-done:
						return
					case out <- IDMesg{ID: id, Mesg: m}:
					}
				}
			}
		}()

		wg.Wait()

		// Unregister under the write lock so that nobody holding the read
		// lock can be sending on ch while it is closed.
		p.M.Lock()
		close(ch)
		delete(p.Ch, id)
		// Set up the cleanup timer if this was the last connection.
		if len(p.Ch) == 0 {
			p.Timer.Reset(time.Second)
		}
		p.M.Unlock()

		close(out)
	}()

	return out, nil
}
// Run executes the program's demo workload and blocks until shutdown.
//
// Two goroutines are started:
//   - a producer that sends three fixed messages (IDs "1", "2", "1") on p.Out
//     and then closes it;
//   - a router that, for every message read from p.Out, looks up the channel
//     registered in p.Ch under the message's ID and attempts a non-blocking
//     delivery. Messages whose ID has no registered channel are discarded.
//
// The calling goroutine prints every message received on p.In. When p.Timer
// fires and no connection is registered, Run sets p.Done, closes p.In and
// returns. If a connection is registered at that moment the timer event is
// stale and Run keeps going. Run always returns nil.
//
// The receiver is a pointer because Prog contains a sync.RWMutex and the Done
// flag: with a value receiver Run would lock, and set Done on, a private copy
// that no other method can see.
func (p *Prog) Run() error {
	// Producer: emit the fixed demo sequence, then signal end of output.
	go func() {
		p.Out <- IDMesg{ID: "1", Mesg: "HI"}
		p.Out <- IDMesg{ID: "2", Mesg: "HI"}
		p.Out <- IDMesg{ID: "1", Mesg: "HI2"}
		close(p.Out)
	}()

	// Router: hand each output message to the connection it is addressed to.
	go func() {
		for m := range p.Out {
			p.M.RLock()
			if ch, ok := p.Ch[m.ID]; ok {
				select {
				case ch <- m.Mesg:
				default: // ch cannot take the message right now
					select {
					case drop := <-ch: // pull the oldest item off the front of ch
						log.Printf("dropping message (%s) because ch is busy", drop)
						// Per the original author's note this goroutine is the
						// only writer to ch, so the slot just freed is still free.
						ch <- m.Mesg
					default:
						// Nothing was buffered (always the case for an
						// unbuffered ch): the message is discarded.
					}
				}
			}
			p.M.RUnlock()
		}
	}()

	for {
		select {
		case m, ok := <-p.In:
			if !ok {
				return nil
			}
			fmt.Printf("writing to program: %+v\n", m)
		case <-p.Timer.C:
			p.M.Lock()
			if len(p.Ch) > 0 {
				// A connection registered after the timer fired. Closing p.In
				// now would make its forwarder panic on send, so treat the
				// event as stale; the timer is re-armed when that connection
				// goes away.
				p.M.Unlock()
				continue
			}
			p.Done = true
			close(p.In)
			p.M.Unlock()
			return nil
		}
	}
}
// main starts two delayed demo clients in the background, runs the program
// in the foreground, and prints a final line once Run returns. It does not
// wait for the client goroutines to finish.
func main() {
	prog := MakeProg()

	// client sleeps for delay, connects under id while feeding two fixed
	// messages into the program, and prints everything the program sends back
	// until the connection's output channel is closed.
	client := func(delay time.Duration, id string) {
		time.Sleep(delay)

		input := make(chan string)
		go func() {
			defer close(input)
			input <- "hi there"
			input <- "don't do that"
		}()

		replies, err := prog.Connect(id, input)
		if err != nil {
			log.Fatal(err)
		}
		for reply := range replies {
			fmt.Printf("receiving from program: %+v\n", reply)
		}
	}

	go client(20*time.Millisecond, "1")
	go client(500*time.Millisecond, "2")

	prog.Run()
	fmt.Println("Hello, playground")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment