Skip to content

Instantly share code, notes, and snippets.

@stackdump
Created January 15, 2020 21:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stackdump/c0c12177cbf2f38bb351c60cddb53651 to your computer and use it in GitHub Desktop.
Save stackdump/c0c12177cbf2f38bb351c60cddb53651 to your computer and use it in GitHub Desktop.
package pubsub_test
import (
"github.com/FactomProject/factomd/common/constants"
"github.com/FactomProject/factomd/common/interfaces"
"github.com/FactomProject/factomd/common/messages"
. "github.com/FactomProject/factomd/pubsub"
"testing"
"time"
)
func TestSubUnSub(t *testing.T) {
ResetGlobalRegistry()
nodeName := "FNode0"
type Module struct {
Path string
MsgOut IPublisher
MsgIn *SubChannel
Count int
}
p := &Module{Path: GetPath(nodeName, "test")}
p.MsgOut = PubFactory.Threaded(1).Publish(p.Path)
go p.MsgOut.Start()
p.MsgIn = SubFactory.Channel(1)
p.MsgIn.Subscribe(GetPath(nodeName, "test"))
exit := make(chan interface{})
drain := func() { // purge channel
for {
select {
case <-p.MsgIn.Updates:
// drain
default:
return
}
}
}
_ = drain
go func() { // Reader
timeOut := time.After(1 * time.Second)
for {
select {
case <-timeOut:
t.Log("Successfully unsubscribed")
// pass
close(exit)
return
case v := <-p.MsgIn.Updates:
m := v.(messages.Bounce)
p.Count += 1
t.Logf("%v: %v - %v", p.Count, m.Name, m.Number)
if m.Number < 0 {
p.MsgIn.Unsubscribe()
t.Logf("%v: unsubscribed len: %v", p.Count, len(p.MsgIn.Updates))
drain() // purge remaining updates
}
if p.Count > 10 {
t.Errorf("Failed to unsubscribe")
close(exit)
return
}
}
}
}()
go func() { // writer
var i int32 = 0
m := messages.Bounce{Name: "Test"}
for {
select {
case <-exit:
return
default:
i += 1
if i == 5 {
m.Number = -1 // send disconnect
} else {
m.Number = i
}
p.MsgOut.Write(m)
time.Sleep(time.Millisecond*50)
}
}
}()
<-exit
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment