Created
January 18, 2018 01:49
-
-
Save IAD/bae0d935cbd856f1fa189df118f65d00 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package nats | |
import ( | |
"errors" | |
"fmt" | |
"reflect" | |
"strings" | |
"time" | |
"github.com/nats-io/go-nats" | |
) | |
func NewNats(c *nats.EncodedConn) *Nats { | |
return &Nats{ | |
c: c, | |
} | |
} | |
type Nats struct { | |
c *nats.EncodedConn | |
} | |
func (n *Nats) Publish(v interface{}) error { | |
t := reflect.TypeOf(v) | |
subject, err := toSubject(t) | |
if err != nil { | |
return err | |
} | |
return n.c.Publish(subject, v) | |
} | |
func (n *Nats) Reply(subject string, v interface{}) error { | |
//log.Printf("Reply Called with %s", spew.Sdump(v)) | |
return n.c.Publish(subject, v) | |
} | |
func (n *Nats) Request(v interface{}, vPtr interface{}, timeout time.Duration) error { | |
t := reflect.TypeOf(v) | |
subject, err := toSubject(t) | |
if err != nil { | |
return err | |
} | |
return n.c.Request(subject, v, vPtr, timeout) | |
} | |
func (n *Nats) Subscribe(cb nats.Handler) (*nats.Subscription, error) { | |
if cb == nil { | |
return nil, errors.New("handler required for EncodedConn Subscription") | |
} | |
cbType := reflect.TypeOf(cb) | |
if cbType.Kind() != reflect.Func { | |
return nil, errors.New("handler needs to be a func") | |
} | |
numArgs := cbType.NumIn() | |
if numArgs == 0 { | |
return nil, errors.New("handled require at least one argument") | |
} | |
target := cbType.In(numArgs - 1) | |
subject, err := toSubject(target) | |
if err != nil { | |
return nil, err | |
} | |
return n.c.Subscribe(subject, cb) | |
} | |
func (n *Nats) QueueSubscribe(queue string, cb nats.Handler) (*nats.Subscription, error) { | |
if cb == nil { | |
return nil, errors.New("handler required for EncodedConn Subscription") | |
} | |
cbType := reflect.TypeOf(cb) | |
if cbType.Kind() != reflect.Func { | |
return nil, errors.New("handler needs to be a func") | |
} | |
numArgs := cbType.NumIn() | |
if numArgs == 0 { | |
return nil, errors.New("handled require at least one argument") | |
} | |
target := cbType.In(numArgs - 1) | |
subject, err := toSubject(target) | |
if err != nil { | |
return nil, err | |
} | |
return n.c.QueueSubscribe(subject, queue, cb) | |
} | |
func toSubject(t reflect.Type) (string, error) { | |
target := fmt.Sprintf("Target :%+v", t) | |
if strings.Contains(target, "Command") { | |
return strings.Replace(fmt.Sprintf("command.%s", t), "*", "", -1), nil | |
} | |
if strings.Contains(target, "Event") { | |
return strings.Replace(fmt.Sprintf("event.%s", t), "*", "", -1), nil | |
} | |
return strings.Replace(fmt.Sprintf("data.%s", t), "*", "", -1), nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment