Created
January 18, 2018 01:48
-
-
Save IAD/f42cb383f046b24b31bf608882d99617 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package nats | |
import ( | |
"errors" | |
"fmt" | |
"reflect" | |
"strings" | |
"time" | |
"github.com/nats-io/go-nats" | |
) | |
func NewNats(c *nats.EncodedConn) *Nats { | |
return &Nats{ | |
c: c, | |
} | |
} | |
type Nats struct { | |
c *nats.EncodedConn | |
} | |
func (n *Nats) Publish(v interface{}) error { | |
t := reflect.TypeOf(v) | |
subject, err := toSubject(t) | |
if err != nil { | |
return err | |
} | |
return n.c.Publish(subject, v) | |
} | |
func (n *Nats) Reply(subject string, v interface{}) error { | |
//log.Printf("Reply Called with %s", spew.Sdump(v)) | |
return n.c.Publish(subject, v) | |
} | |
func (n *Nats) Request(v interface{}, vPtr interface{}, timeout time.Duration) error { | |
t := reflect.TypeOf(v) | |
subject, err := toSubject(t) | |
if err != nil { | |
return err | |
} | |
return n.c.Request(subject, v, vPtr, timeout) | |
} | |
func (n *Nats) Subscribe(cb nats.Handler) (*nats.Subscription, error) { | |
if cb == nil { | |
return nil, errors.New("handler required for EncodedConn Subscription") | |
} | |
cbType := reflect.TypeOf(cb) | |
if cbType.Kind() != reflect.Func { | |
return nil, errors.New("handler needs to be a func") | |
} | |
numArgs := cbType.NumIn() | |
if numArgs == 0 { | |
return nil, errors.New("handled require at least one argument") | |
} | |
target := cbType.In(numArgs - 1) | |
subject, err := toSubject(target) | |
if err != nil { | |
return nil, err | |
} | |
return n.c.Subscribe(subject, cb) | |
} | |
func (n *Nats) QueueSubscribe(queue string, cb nats.Handler) (*nats.Subscription, error) { | |
if cb == nil { | |
return nil, errors.New("handler required for EncodedConn Subscription") | |
} | |
cbType := reflect.TypeOf(cb) | |
if cbType.Kind() != reflect.Func { | |
return nil, errors.New("handler needs to be a func") | |
} | |
numArgs := cbType.NumIn() | |
if numArgs == 0 { | |
return nil, errors.New("handled require at least one argument") | |
} | |
target := cbType.In(numArgs - 1) | |
subject, err := toSubject(target) | |
if err != nil { | |
return nil, err | |
} | |
return n.c.QueueSubscribe(subject, queue, cb) | |
} | |
func toSubject(t reflect.Type) (string, error) { | |
target := fmt.Sprintf("Target :%+v", t) | |
if strings.Contains(target, "Command") { | |
return strings.Replace(fmt.Sprintf("command.%s", t), "*", "", -1), nil | |
} | |
if strings.Contains(target, "Event") { | |
return strings.Replace(fmt.Sprintf("event.%s", t), "*", "", -1), nil | |
} | |
return strings.Replace(fmt.Sprintf("data.%s", t), "*", "", -1), nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment