Skip to content

Instantly share code, notes, and snippets.

@trusch
Created December 6, 2018 14:45
Show Gist options
  • Save trusch/0b807ecc84b7fd4a4f2b49e5622f70e0 to your computer and use it in GitHub Desktop.
Save trusch/0b807ecc84b7fd4a4f2b49e5622f70e0 to your computer and use it in GitHub Desktop.
package events
import (
"io"
"github.com/joomcode/errorx"
"github.com/nats-io/go-nats-streaming"
uuid "github.com/satori/go.uuid"
)
type controller struct {
natsClient stan.Conn
}
// NewController creates a new event controller
func NewController(natsAddr, clusterID string) (ControllerServer, error) {
cli, err := stan.Connect(clusterID, uuid.NewV4().String(), stan.NatsURL(natsAddr))
if err != nil {
return nil, errorx.Decorate(err, "failed to connect to nats")
}
return &controller{cli}, nil
}
// Subscribe subscribes a client to a stream
func (c *controller) Subscribe(req *SubscribeRequest, resp Controller_SubscribeServer) error {
done := make(chan error, 1)
s, err := c.natsClient.Subscribe(req.Channel, func(m *stan.Msg) {
evt := &Event{
Seq: m.Sequence,
Data: m.Data,
}
if err := resp.Send(evt); err != nil {
if err == io.EOF {
done <- nil
} else {
done <- err
}
return
}
}, stan.StartAtSequence(req.Seq))
if err != nil {
return err
}
defer s.Close()
select {
case <-resp.Context().Done():
return resp.Context().Err()
case err := <-done:
return err
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment