@bruth
Created June 19, 2017 17:02
Transport

This is a Go library that provides a thin, but opinionated abstraction over the go-nats API. The use case is for writing services that use NATS as a transport layer.

The NATS API expects a slice of bytes as the representation of a message. In practice, it is often necessary to standardize on a serialization format to simplify designing and interacting with messages.

This library standardizes on Protocol Buffers as the message serialization format and it provides a few conveniences when working with Protobuf messages.

The two main value-add features this library provides are the Transport interface and implementation and the Message type.

The Transport interface describes a set of API methods that take Protobuf messages rather than byte slices. Being an interface, it enables implementing wrapper middleware to the API itself for the purpose of instrumentation. For example:

// Declare a transport and initialize the core implementation using an
// existing NATS connection.
var tp transport.Transport
tp = transport.New(nc)

// Define a struct that satisfies the Transport interface and takes another
// Transport value to wrap.
type timerTransport struct {
  tp transport.Transport
}

func (t *timerTransport) Request(sub string, req proto.Message, rep proto.Message, opts ...transport.RequestOption) (*transport.Message, error) {
  t0 := time.Now()
  msg, err := t.tp.Request(sub, req, rep, opts...)
  log.Printf("request to %s took %s", sub, time.Since(t0))
  return msg, err
}

// Implement remaining methods..

// Wrap the base transport.
tp = &timerTransport{tp}

// Using this wrapped transport will now automatically log the duration of the call.
tp.Request(...)
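Implementing every remaining method by hand gets tedious as the interface grows. One option, sketched below, is to embed the wrapped value so that Go promotes the methods the wrapper does not override. To run without NATS, the sketch uses a hypothetical two-method `requester` interface and a `base` type in place of transport.Transport; these names are illustrative and not part of this library.

```go
package main

import (
	"fmt"
	"time"
)

// requester is a hypothetical stand-in for transport.Transport, reduced to
// two methods so the sketch runs without a NATS server.
type requester interface {
	Request(sub string) (string, error)
	Name() string
}

// base plays the role of the core implementation.
type base struct{}

func (base) Request(sub string) (string, error) { return "reply to " + sub, nil }
func (base) Name() string                       { return "base" }

// timed embeds the wrapped requester. Name is promoted from the embedded
// value; only Request is overridden.
type timed struct {
	requester
	last time.Duration // duration of the most recent Request call
}

func (t *timed) Request(sub string) (string, error) {
	t0 := time.Now()
	rep, err := t.requester.Request(sub)
	t.last = time.Since(t0)
	return rep, err
}

func main() {
	var r requester = &timed{requester: base{}}
	rep, err := r.Request("query.execute")
	fmt.Println(rep, err, r.Name())
}
```

Applied to this library, the wrapper struct would embed transport.Transport instead of holding it in a named field, assuming only a subset of methods needs instrumentation.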

In the Request method shown above, a transport.Message value is returned. Message is a Protobuf message which wraps all messages sent through the API. It annotates the message with additional metadata such as:

  • id - a unique message ID.
  • timestamp - timestamp in nanoseconds.
  • cause - the causal upstream message.
  • subject - the subject of the message.
  • reply - the reply subject of a request message.
  • queue - the queue that handled the message.
  • error - a handling error if one occurred.

This provides additional metadata on the message which can be useful for logging or instrumentation.
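The cause field makes it possible to reconstruct a chain of related messages. The sketch below illustrates the idea with a plain struct standing in for the generated Message type. The names `envelope`, `newEnvelope`, and `chain`, as well as the counter-based IDs, are assumptions made for illustration; the library itself generates IDs with nuid.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// envelope is a simplified, hypothetical stand-in for transport.Message.
type envelope struct {
	ID        string
	Timestamp uint64 // nanoseconds since the Unix epoch
	Cause     string // ID of the upstream message, empty for a root message
}

// counter backs the IDs only to keep the sketch deterministic.
var counter int

// newEnvelope creates an envelope caused by parent (nil for a root message).
func newEnvelope(parent *envelope) *envelope {
	counter++
	e := &envelope{
		ID:        "msg-" + strconv.Itoa(counter),
		Timestamp: uint64(time.Now().UnixNano()),
	}
	if parent != nil {
		e.Cause = parent.ID
	}
	return e
}

// chain walks the cause links from e back to the root using an index of
// previously seen envelopes, returning IDs from newest to oldest.
func chain(e *envelope, seen map[string]*envelope) []string {
	var ids []string
	for e != nil {
		ids = append(ids, e.ID)
		e = seen[e.Cause] // missing or empty cause yields nil and ends the walk
	}
	return ids
}

func main() {
	root := newEnvelope(nil)
	child := newEnvelope(root)
	grandchild := newEnvelope(child)
	seen := map[string]*envelope{root.ID: root, child.ID: child, grandchild.ID: grandchild}
	fmt.Println(chain(grandchild, seen))
}
```

In a real deployment the index of seen messages would typically come from logs or a tracing backend rather than an in-memory map.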

Quickstart

To initialize a new transport client, use either transport.Connect with NATS options or transport.New with an existing *nats.Conn value.

tp, err := transport.Connect(&nats.Options{
  Url: "nats://localhost:4222",
})
if err != nil {
  log.Fatal(err)
}
defer tp.Close()

Deferring the close ensures all subscriptions are stopped and the connection is closed.

There are two ways to publish messages. The first is Publish, the standard "fire and forget" broadcast to all subscribers of the message subject.

val := pb.Value{ ... }
msg, err := tp.Publish("query.sink", &val)

The payload must implement proto.Message, meaning that it must be a Protobuf message.

The second way to publish messages is Request, which waits for a reply so it can return response data to the client.

// Request message to send.
req := pb.Request{ ... }

// The protobuf message to decode the reply into.
var rep pb.Reply

msg, err := tp.Request("query.execute", &req, &rep)
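Request also accepts optional trailing arguments: RequestTimeout overrides the default timeout (DefaultRequestTimeout, 2 seconds) and RequestCause sets the cause field. The sketch below reproduces the option types from the library source without the NATS dependency to show how the options resolve. The helper name `resolve` is hypothetical; in the library the same loop runs inside Request.

```go
package main

import (
	"fmt"
	"time"
)

// DefaultRequestTimeout mirrors the library default.
var DefaultRequestTimeout = 2 * time.Second

// RequestOptions holds the settings for a single request.
type RequestOptions struct {
	Cause   string
	Timeout time.Duration
}

// RequestOption mutates a RequestOptions value.
type RequestOption func(*RequestOptions)

// RequestTimeout sets a request timeout duration.
func RequestTimeout(t time.Duration) RequestOption {
	return func(o *RequestOptions) { o.Timeout = t }
}

// RequestCause sets the cause of the request.
func RequestCause(s string) RequestOption {
	return func(o *RequestOptions) { o.Cause = s }
}

// resolve (hypothetical name) starts from the defaults and applies each
// option in order, so later options win over earlier ones.
func resolve(opts ...RequestOption) RequestOptions {
	o := RequestOptions{Timeout: DefaultRequestTimeout}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(resolve().Timeout)
	o := resolve(RequestTimeout(5*time.Second), RequestCause("abc"))
	fmt.Println(o.Timeout, o.Cause)
}
```

With the library itself, the equivalent call would pass the options after the reply value, for example `tp.Request("query.execute", &req, &rep, transport.RequestTimeout(5*time.Second))`.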

Here's how to subscribe to a subject using Subscribe.

// Define the handler.
hdlr := func(msg *transport.Message) (proto.Message, error) {
  // Decode message payload into local request value.
  var req pb.Request
  if err := msg.Decode(&req); err != nil {
    return nil, err
  }
  // Do things..
  return &pb.Reply{ ... }, nil
}

// Subscribe the handler.
_, err := tp.Subscribe("query.execute", hdlr)
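When a request handler returns an error or panics, the subscriber replies with a message whose error field is set, and Request surfaces that as an error on the caller's side. The panic handling can be sketched without NATS as follows. The `handler` type and `safeCall` function are simplified, hypothetical stand-ins for the library's Handler type and its internal recovery logic.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a simplified stand-in for transport.Handler.
type handler func(payload string) (string, error)

// safeCall runs h and converts a panic into an ordinary error, so that a
// faulty handler produces an error reply instead of crashing the subscriber.
func safeCall(h handler, payload string) (rep string, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			rep = ""
			err = fmt.Errorf("recovered subscription handler panic: %v", rec)
		}
	}()
	return h(payload)
}

func main() {
	ok := func(p string) (string, error) { return "echo " + p, nil }
	bad := func(p string) (string, error) { return "", errors.New("invalid query") }
	boom := func(p string) (string, error) {
		var i *int
		return fmt.Sprint(*i), nil // nil dereference panics
	}
	for _, h := range []handler{ok, bad, boom} {
		rep, err := safeCall(h, "ping")
		fmt.Printf("reply=%q err=%v\n", rep, err)
	}
}
```

Handlers can also share load through a queue group by passing the SubscribeQueue option to Subscribe, in which case the queue field of the received message is populated.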
package transport
import (
"errors"
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/nats-io/go-nats"
"github.com/nats-io/nuid"
"go.uber.org/zap"
)
var (
DefaultRequestTimeout = 2 * time.Second
)
// PublishOptions are options for a publication.
type PublishOptions struct {
Cause string
}
type PublishOption func(*PublishOptions)
// PublishCause sets the cause of the publication.
func PublishCause(s string) PublishOption {
return func(o *PublishOptions) {
o.Cause = s
}
}
// RequestOptions are options for a request.
type RequestOptions struct {
Cause string
Timeout time.Duration
}
type RequestOption func(*RequestOptions)
// RequestTimeout sets a request timeout duration.
func RequestTimeout(t time.Duration) RequestOption {
return func(o *RequestOptions) {
o.Timeout = t
}
}
// RequestCause sets the cause of the request.
func RequestCause(s string) RequestOption {
return func(o *RequestOptions) {
o.Cause = s
}
}
// SubscribeOptions are options for a subscriber.
type SubscribeOptions struct {
Queue string
}
type SubscribeOption func(*SubscribeOptions)
// SubscribeQueue specifies the queue name of the subscriber.
func SubscribeQueue(q string) SubscribeOption {
return func(o *SubscribeOptions) {
o.Queue = q
}
}
// Decode decodes the message payload into a proto message.
func (m *Message) Decode(pb proto.Message) error {
return proto.Unmarshal(m.Payload, pb)
}
// Handler is the handler used by a subscriber. The return value may be nil if
// no output is yielded. If this is a request, the reply will be sent automatically
// with the reply value or an error if one occurred. If a reply is not expected
// and an error occurs, it will be logged.
type Handler func(msg *Message) (proto.Message, error)
// Transport describes the interface for publishing, requesting, and
// subscribing to Protobuf messages over NATS.
type Transport interface {
// Publish publishes a message asynchronously to the specified subject.
// The wrapped message is returned or an error. The error would only be due to
// a connection issue, but does not reflect any consumer error.
Publish(sub string, msg proto.Message, opts ...PublishOption) (*Message, error)
// Request publishes a message synchronously and waits for a response that
// is decoded into the Protobuf message supplied. The wrapped message is
// returned or an error. The error could be a connection error, timeout,
// or a consumer error.
Request(sub string, req proto.Message, rep proto.Message, opts ...RequestOption) (*Message, error)
// Subscribe creates a subscription to a subject.
Subscribe(sub string, hdl Handler, opts ...SubscribeOption) (*nats.Subscription, error)
// Conn returns the underlying NATS connection.
Conn() *nats.Conn
// Close closes the transport connection and unsubscribes all subscribers.
Close()
// Set the logger.
SetLogger(*zap.Logger)
}
// Connect is a convenience function establishing a connection with
// NATS and returning a transport.
func Connect(opts *nats.Options) (Transport, error) {
conn, err := opts.Connect()
if err != nil {
return nil, err
}
logger, _ := zap.NewProduction()
return &transport{
logger: logger,
conn: conn,
}, nil
}
// New returns a transport using an existing NATS connection.
func New(conn *nats.Conn) Transport {
logger, _ := zap.NewProduction()
return &transport{
logger: logger,
conn: conn,
}
}
type transport struct {
logger *zap.Logger
conn *nats.Conn
subs []*nats.Subscription
mux sync.Mutex
}
func (c *transport) SetLogger(l *zap.Logger) {
c.logger = l
}
func (c *transport) Conn() *nats.Conn {
return c.conn
}
func (c *transport) Close() {
// Hold the lock since Subscribe may append to subs concurrently.
c.mux.Lock()
for _, sub := range c.subs {
sub.Unsubscribe()
}
c.mux.Unlock()
c.conn.Close()
}
func (c *transport) wrap(payload proto.Message) (*Message, error) {
var (
pb []byte
err error
)
if payload != nil {
pb, err = proto.Marshal(payload)
if err != nil {
return nil, err
}
}
id := nuid.Next()
ts := time.Now().UnixNano()
msg := Message{
Id: id,
Timestamp: uint64(ts),
Payload: pb,
}
return &msg, nil
}
func (c *transport) unwrap(nmsg *nats.Msg) (*Message, error) {
var msg Message
if err := proto.Unmarshal(nmsg.Data, &msg); err != nil {
return nil, err
}
msg.Subject = nmsg.Subject
msg.Reply = nmsg.Reply
msg.Queue = nmsg.Sub.Queue
return &msg, nil
}
func (c *transport) Publish(sub string, msg proto.Message, opts ...PublishOption) (*Message, error) {
pubOpts := &PublishOptions{}
// Apply options.
for _, opt := range opts {
opt(pubOpts)
}
m, err := c.wrap(msg)
if err != nil {
return nil, err
}
m.Subject = sub
m.Cause = pubOpts.Cause
mb, err := proto.Marshal(m)
if err != nil {
return nil, err
}
if err := c.conn.Publish(sub, mb); err != nil {
return nil, err
}
return m, nil
}
func (c *transport) Request(sub string, req proto.Message, rep proto.Message, opts ...RequestOption) (*Message, error) {
reqOpts := &RequestOptions{
Timeout: DefaultRequestTimeout,
}
// Apply options.
for _, opt := range opts {
opt(reqOpts)
}
m, err := c.wrap(req)
if err != nil {
return nil, err
}
m.Subject = sub
m.Cause = reqOpts.Cause
mb, err := proto.Marshal(m)
if err != nil {
return nil, err
}
nm, err := c.conn.Request(sub, mb, reqOpts.Timeout)
if err != nil {
return nil, err
}
m, err = c.unwrap(nm)
if err != nil {
return nil, err
}
// Error occurred in the handler.
if m.Error != "" {
return nil, errors.New(m.Error)
}
if rep != nil {
if err := proto.Unmarshal(m.Payload, rep); err != nil {
return nil, err
}
}
return m, nil
}
// Subscribe creates a subscription to a subject.
func (c *transport) Subscribe(sub string, hdlr Handler, opts ...SubscribeOption) (*nats.Subscription, error) {
subOpts := &SubscribeOptions{}
// Apply options.
for _, opt := range opts {
opt(subOpts)
}
// Replies to the recipient with an error if applicable.
replyWithError := func(logger *zap.Logger, msg *Message, srcErr error) {
rmsg, err := c.wrap(nil)
// If an error occurs, this is a bug since this only relies on the local
// Message protobuf definition.
if err != nil {
logger.Error("failed to create transport message",
zap.Error(err),
)
return
}
rmsg.Cause = msg.Id
rmsg.Subject = msg.Reply
rmsg.Error = srcErr.Error()
mb, err := proto.Marshal(rmsg)
if err != nil {
logger.Error("failed to marshal transport message",
zap.Error(err),
)
return
}
if err := c.conn.Publish(msg.Reply, mb); err != nil {
logger.Error("failed to publish nats message",
zap.Error(err),
)
}
}
// NATS message handler.
natsHandler := func(nmsg *nats.Msg) {
// Copy logger for this request.
logger := c.logger.With(
zap.String("msg.subject", nmsg.Subject),
zap.String("msg.reply", nmsg.Reply),
)
msg, err := c.unwrap(nmsg)
// Failed unwrap which means the message is likely in the wrong format.
// A reply is ignored if this occurs since if the sent message was invalid
// it is unlikely the requester will be able to parse the message in the
// same format. Instead we log this case.
if err != nil {
logger.Error("failed to decode nats message", zap.Error(err))
return
}
// Add more context.
logger = logger.With(
zap.String("trace.id", msg.Id),
zap.String("msg.id", msg.Id),
zap.String("msg.cause", msg.Cause),
)
// In case the handler panics, catch and log.
defer func() {
if rec := recover(); rec != nil {
err := fmt.Errorf("recovered subscription handler panic:\n%v", rec)
if msg.Reply == "" {
logger.Error("subscription handler panic",
zap.Error(err),
)
return
}
replyWithError(logger, msg, err)
}
}()
// Pass to handler.
resp, err := hdlr(msg)
// Log error only if no reply.
if msg.Reply == "" {
if err != nil {
logger.Error("subscription handler error",
zap.Error(err),
)
}
return
}
// Error occurred while handling message. We assume this is an error in the
// business logic and will reply with the error. The handler itself should
// have logged the error if it occurred since it can provide context.
if err != nil {
replyWithError(logger, msg, err)
return
}
// This will only fail if the response itself cannot be marshaled, which
// means the handler is likely at fault. The error is logged and sent back
// to the requester as an error reply.
rmsg, err := c.wrap(resp)
if err != nil {
logger.Error("failed to marshal response message",
zap.Error(err),
)
replyWithError(logger, msg, err)
return
}
rmsg.Cause = msg.Id
rmsg.Subject = msg.Reply
// A failure here is a bug since it only relies on the local Message
// protobuf definition.
mb, err := proto.Marshal(rmsg)
if err != nil {
logger.Error("failed to marshal transport message",
zap.Error(err),
)
return
}
if err := c.conn.Publish(msg.Reply, mb); err != nil {
logger.Error("failed to publish nats message",
zap.Error(err),
)
}
}
// Queue-based subscriber.
if subOpts.Queue != "" {
s, err := c.conn.QueueSubscribe(sub, subOpts.Queue, natsHandler)
if err != nil {
return nil, err
}
c.mux.Lock()
c.subs = append(c.subs, s)
c.mux.Unlock()
return s, nil
}
// Standalone subscriber.
s, err := c.conn.Subscribe(sub, natsHandler)
if err != nil {
return nil, err
}
c.mux.Lock()
c.subs = append(c.subs, s)
c.mux.Unlock()
return s, nil
}
syntax = "proto3";
// Message is the envelope/wrapper for all messages.
message Message {
// ID is a globally unique message identifier.
string id = 1;
// Timestamp is the time in nanoseconds when the message was published.
uint64 timestamp = 2;
// Payload is the actual payload being published that will be consumed.
// This is expected to use protobuf encoding.
bytes payload = 3;
// Error will be populated if an error occurs within a subscription handler.
string error = 4;
// Cause is the ID of a message that resulted in this message being published.
string cause = 5;
// Subject is the subject the message is published to.
string subject = 6;
// Queue is the queue this message was received by.
string queue = 7;
// Reply is the inbox ID of the publisher of this message.
string reply = 8;
}
package transport
import (
"log"
"os"
"testing"
"github.com/golang/protobuf/proto"
"github.com/nats-io/go-nats"
"github.com/nats-io/nuid"
)
func newTransport(t testing.TB) Transport {
natsAddr := os.Getenv("NATS_ADDR")
if natsAddr == "" {
t.Fatal("no nats address provided")
}
tp, err := Connect(&nats.Options{
Url: natsAddr,
})
if err != nil {
t.Fatal(err)
}
return tp
}
func TestPublish(t *testing.T) {
tp := newTransport(t)
defer tp.Close()
// Publish a message.
msg, err := tp.Publish("_transport", nil, PublishCause("foobar"))
if err != nil {
t.Fatal(err)
}
if msg.Id == "" {
t.Errorf("id not set")
}
if msg.Timestamp == 0 {
t.Errorf("timestamp not set")
}
if msg.Subject != "_transport" {
t.Errorf("wrong subject: %s", msg.Subject)
}
if len(msg.Payload) != 0 {
t.Errorf("expected empty payload, got %d bytes", len(msg.Payload))
}
if msg.Cause != "foobar" {
t.Errorf("expected foobar, got %s", msg.Cause)
}
if msg.Error != "" {
t.Errorf("error set")
}
if msg.Queue != "" {
t.Errorf("queue set")
}
if msg.Reply != "" {
t.Errorf("reply set")
}
}
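func TestSubscribe(t *testing.T) {
tp := newTransport(t)
defer tp.Close()
// The handler forwards the received ID so the comparison happens only
// after Publish has returned; reading msg inside the handler would race
// with its assignment.
ids := make(chan string, 1)
hdlr := func(cmsg *Message) (proto.Message, error) {
ids <- cmsg.Id
return nil, nil
}
// Subscribe.
_, err := tp.Subscribe("_transport", hdlr, SubscribeQueue("_queue"))
if err != nil {
t.Fatal(err)
}
msg, err := tp.Publish("_transport", nil)
if err != nil {
t.Fatal(err)
}
// Block until the handler runs; the go test timeout bounds the wait.
if id := <-ids; id != msg.Id {
t.Errorf("wrong id")
}
}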
func TestRequest(t *testing.T) {
tp := newTransport(t)
defer tp.Close()
exp := &Message{
Id: nuid.Next(),
}
// No-op reply.
hdlr := func(cmsg *Message) (proto.Message, error) {
if cmsg.Cause != "foobar" {
t.Errorf("expected foobar, got %s", cmsg.Cause)
}
return exp, nil
}
// Subscribe.
_, err := tp.Subscribe("_transport", hdlr, SubscribeQueue("_queue"))
if err != nil {
t.Fatal(err)
}
// Send request.
var rep Message
_, err = tp.Request("_transport", nil, &rep, RequestCause("foobar"))
if err != nil {
t.Fatal(err)
}
if rep.Id != exp.Id {
t.Error("reply ids differ")
}
}
func TestHandlerPanic(t *testing.T) {
tp := newTransport(t)
defer tp.Close()
hdlr := func(cmsg *Message) (proto.Message, error) {
var i *int
log.Println(*i)
return nil, nil
}
_, err := tp.Subscribe("_transport", hdlr)
if err != nil {
t.Fatal(err)
}
var rep Message
_, err = tp.Request("_transport", nil, &rep)
if err == nil {
t.Errorf("expected error")
}
}