Skip to content

Instantly share code, notes, and snippets.

@IAD
Created January 18, 2018 01:48
Show Gist options
  • Save IAD/f42cb383f046b24b31bf608882d99617 to your computer and use it in GitHub Desktop.
Save IAD/f42cb383f046b24b31bf608882d99617 to your computer and use it in GitHub Desktop.
package nats
import (
"errors"
"fmt"
"reflect"
"strings"
"time"
"github.com/nats-io/go-nats"
)
func NewNats(c *nats.EncodedConn) *Nats {
return &Nats{
c: c,
}
}
type Nats struct {
c *nats.EncodedConn
}
func (n *Nats) Publish(v interface{}) error {
t := reflect.TypeOf(v)
subject, err := toSubject(t)
if err != nil {
return err
}
return n.c.Publish(subject, v)
}
func (n *Nats) Reply(subject string, v interface{}) error {
//log.Printf("Reply Called with %s", spew.Sdump(v))
return n.c.Publish(subject, v)
}
func (n *Nats) Request(v interface{}, vPtr interface{}, timeout time.Duration) error {
t := reflect.TypeOf(v)
subject, err := toSubject(t)
if err != nil {
return err
}
return n.c.Request(subject, v, vPtr, timeout)
}
func (n *Nats) Subscribe(cb nats.Handler) (*nats.Subscription, error) {
if cb == nil {
return nil, errors.New("handler required for EncodedConn Subscription")
}
cbType := reflect.TypeOf(cb)
if cbType.Kind() != reflect.Func {
return nil, errors.New("handler needs to be a func")
}
numArgs := cbType.NumIn()
if numArgs == 0 {
return nil, errors.New("handled require at least one argument")
}
target := cbType.In(numArgs - 1)
subject, err := toSubject(target)
if err != nil {
return nil, err
}
return n.c.Subscribe(subject, cb)
}
func (n *Nats) QueueSubscribe(queue string, cb nats.Handler) (*nats.Subscription, error) {
if cb == nil {
return nil, errors.New("handler required for EncodedConn Subscription")
}
cbType := reflect.TypeOf(cb)
if cbType.Kind() != reflect.Func {
return nil, errors.New("handler needs to be a func")
}
numArgs := cbType.NumIn()
if numArgs == 0 {
return nil, errors.New("handled require at least one argument")
}
target := cbType.In(numArgs - 1)
subject, err := toSubject(target)
if err != nil {
return nil, err
}
return n.c.QueueSubscribe(subject, queue, cb)
}
func toSubject(t reflect.Type) (string, error) {
target := fmt.Sprintf("Target :%+v", t)
if strings.Contains(target, "Command") {
return strings.Replace(fmt.Sprintf("command.%s", t), "*", "", -1), nil
}
if strings.Contains(target, "Event") {
return strings.Replace(fmt.Sprintf("event.%s", t), "*", "", -1), nil
}
return strings.Replace(fmt.Sprintf("data.%s", t), "*", "", -1), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment