Skip to content

Instantly share code, notes, and snippets.

@ripienaar
Last active July 24, 2020 18:16
Show Gist options
  • Save ripienaar/c32db467923d7942cf5205226872500a to your computer and use it in GitHub Desktop.
Save ripienaar/c32db467923d7942cf5205226872500a to your computer and use it in GitHub Desktop.
$ nats -s demo.nats.io req 'registry.detect_type' '{
"schema": "io.nats.jetstream.advisory.v1.api_audit",
"id": "uafvZ1UEDIW5FZV6kvLgWA",
"timestamp": "2020-04-23T16:51:18.516363Z",
"server": "NDJWE4SOUJOJT2TY5Y2YQEOAHGAK5VIGXTGKWJSFHVCII4ITI3LBHBUV",
"client": {
"host": "::1",
"port": 57924,
"cid": 17,
"account": "$G",
"name": "NATS CLI",
"lang": "go",
"version": "1.9.2"
},
"subject": "$JS.STREAM.LIST",
"response": "[\n \"ORDERS\"\n]"
}'
16:22:38 Sending request on [registry.detect_type]
16:22:39 Received on [_INBOX.fbXUVBt2RTBF4WWYxH1ms2]: '{
"address": "https://raw.githubusercontent.com/nats-io/jetstream/master/schemas/jetstream/advisory/v1/api_audit.json",
"type": "io.nats.jetstream.advisory.v1.api_audit"
}'
(code for this not shown)
% nats -s demo.nats.io req 'registry.generate.ruby.io.nats.server.advisory.v1.client_disconnect' '' --raw|jq -r .code
# This code may look unusually verbose for Ruby (and it is), but
# it performs some subtle and complex validation of JSON data.
#
# To parse this JSON, add 'dry-struct' and 'dry-types' gems, then do:
#
# the560028504 = The560028504.from_json! "{…}"
# puts the560028504.server.cluster
#
# If from_json! succeeds, the value returned matches the schema.
require 'json'
require 'dry-types'
require 'dry-struct'
module Types
include Dry::Types.module
Int = Strict::Int
Bool = Strict::Bool
Hash = Strict::Hash
String = Strict::String
Type = Strict::String.enum("io.nats.server.advisory.v1.client_disconnect")
end
# Details about the client that connected to the server
class Client < Dry::Struct
# The remote host the client is connected from
attribute :host, Types::String.optional
# The programming language library in use by the client
attribute :lang, Types::String.optional
# The name presented by the client during connection
attribute :client_name, Types::String.optional
# The last known latency between the NATS Server and the Client
attribute :rtt, Types::String.optional
# Timestamp when the client connected
attribute :start, Types::String.optional
# Timestamp when the client disconnected
attribute :stop, Types::String.optional
# The clients username
attribute :user, Types::String.optional
# The version of the client library in use
attribute :ver, Types::String.optional
attribute :id, Types::Any
attribute :acc, Types::Any
def self.from_dynamic!(d)
d = Types::Hash[d]
new(
.....
$ nats -s demo.nats.io req 'registry.schema.io.nats.server.advisory.v1.client_disconnect' ''
16:12:51 Sending request on [registry.schema.io.nats.server.advisory.v1.client_disconnect]
16:12:51 Received on [_INBOX.HbMId2R0Q75ZnKMzwlKVuq]: '{
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://nats.io/schemas/server/advisory/v1/client_disconnect.json",
"description": "Advisory published a client disconnects to the NATS Server",
"title": "io.nats.server.advisory.v1.client_disconnect",
"type": "object",
.....
$ nats -s demo.nats.io req 'registry.url.io.nats.server.advisory.v1.client_disconnect' ''
16:13:42 Sending request on [registry.url.io.nats.server.advisory.v1.client_disconnect]
16:13:42 Received on [_INBOX.Cw62OYxvWuppWcLb9a0Cu1]: '{
"url": "https://raw.githubusercontent.com/nats-io/jetstream/master/schemas/server/advisory/v1/client_disconnect.json"
}'
$ nats -s demo.nats.io req 'registry.validate' '{
"type": "io.nats.jetstream.advisory.v1.api_audit"
}'
16:21:05 Sending request on [registry.validate]
16:21:06 Received on [_INBOX.wTm0XBkT3HhwBHUVubeZQK]: '{
"errors": [
"timestamp: Does not match format 'date-time'",
"server: String length must be greater than or equal to 1",
"subject: String length must be greater than or equal to 1"
]
}'
$ nats -s demo.nats.io req 'registry.validate' '{
"type": "io.nats.jetstream.advisory.v1.api_audit",
"id": "uafvZ1UEDIW5FZV6kvLgWA",
"timestamp": "2020-04-23T16:51:18.516363Z",
"server": "NDJWE4SOUJOJT2TY5Y2YQEOAHGAK5VIGXTGKWJSFHVCII4ITI3LBHBUV",
"client": {
"host": "::1",
"port": 57924,
"cid": 17,
"account": "$G",
"name": "NATS CLI",
"lang": "go",
"version": "1.9.2"
},
"subject": "$JS.STREAM.LIST",
"response": "[\n \"ORDERS\"\n]"
}'
16:20:21 Sending request on [registry.validate]
16:20:21 Received on [_INBOX.Ysl7f03tdqSyswRFteAE16]: '{
"valid": true
}'
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go"
"gopkg.in/alecthomas/kingpin.v2"
)
var (
servers string
credentials string
ns string
nc *nats.Conn
)
func validateHandler(m *nats.Msg) {
kind, event, err := api.ParseEvent(m.Data)
if err != nil {
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not parse event: %s", err)})
return
}
ok, errs := api.ValidateStruct(event, kind)
if ok {
sendResponse(m, map[string]bool{"valid": true})
return
}
sendResponse(m, map[string][]string{"errors": errs})
}
func urlHandler(m *nats.Msg) {
st := strings.TrimPrefix(m.Subject, ns+".url.")
if !api.IsNatsEventType(st) {
sendResponse(m, map[string]string{"error": fmt.Sprintf("%q is not a valid NATS schema type", st)})
return
}
address, _, err := api.SchemaURLForType(st)
if err != nil {
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not retrieve schema url for %q: %s", st, err)})
return
}
sendResponse(m, map[string]string{"url": address})
}
func typeDetectHandler(m *nats.Msg) {
kind, err := api.SchemaTypeForEvent(m.Data)
if err != nil {
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not detect event type: %s", err)})
}
address, _, err := api.SchemaURLForType(kind)
if err != nil {
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not detect event type: %s", err)})
}
sendResponse(m, map[string]string{"type": kind, "address": address})
}
func schemaHandler(m *nats.Msg) {
st := strings.TrimPrefix(m.Subject, ns+".schema.")
if !api.IsNatsEventType(st) {
sendResponse(m, map[string]string{"error": fmt.Sprintf("%q is not a valid NATS schema type", st)})
return
}
schema, err := api.Schema(st)
if err != nil {
sendResponse(m, map[string]string{"error": fmt.Sprintf("could not retrieve schema for %q: %s", st, err)})
return
}
sendResponse(m, map[string]json.RawMessage{"schema": schema})
}
func main() {
app := kingpin.New("registry", "NATS Service Registry")
app.Flag("server", "NATS Servers").Default("localhost").StringVar(&servers)
app.Flag("creds", "NATS Credentials").StringVar(&credentials)
app.Flag("namespace", "Service namespace").Default("registry").StringVar(&ns)
kingpin.MustParse(app.Parse(os.Args[1:]))
var err error
nc, err = connection()
if err != nil {
panic(err.Error())
}
nc.Subscribe(ns+".url.io.nats.>", urlHandler)
nc.Subscribe(ns+".schema.io.nats.>", schemaHandler)
nc.Subscribe(ns+".detect_type", typeDetectHandler)
nc.Subscribe(ns+".validate", validateHandler)
<-context.Background().Done()
}
func sendResponse(m *nats.Msg, r interface{}) error {
if m.Reply == "" {
return fmt.Errorf("no reply in message")
}
j, err := json.MarshalIndent(r, "", " ")
if err != nil {
return fmt.Errorf("could not encode %#v: %s", r, err)
}
return m.Respond(j)
}
func connection() (nc *nats.Conn, err error) {
if servers == "" {
return nil, fmt.Errorf("specify a server to connect to using NATS_URL")
}
opts := []nats.Option{
nats.MaxReconnects(-1),
nats.ErrorHandler(errorHandler),
nats.ReconnectHandler(reconnectHandler),
nats.DisconnectErrHandler(disconnectHandler),
}
if credentials != "" {
opts = append(opts, nats.UserCredentials(credentials))
}
for {
nc, err := nats.Connect(servers, opts...)
if err == nil {
return nc, nil
}
log.Printf("could not connect to NATS: %s\n", err)
time.Sleep(500 * time.Millisecond)
}
}
func errorHandler(nc *nats.Conn, s *nats.Subscription, err error) {
if s != nil {
log.Fatalf("Error in NATS connection: %s: subscription: %s: %s", nc.ConnectedUrl(), s.Subject, err)
}
log.Fatalf("Error in NATS connection: %s: %s", nc.ConnectedUrl(), err)
}
func reconnectHandler(nc *nats.Conn) {
log.Printf("Reconnected to %s", nc.ConnectedUrl())
}
func disconnectHandler(nc *nats.Conn, err error) {
if err != nil {
log.Printf("Disconnected from NATS due to error: %v", err)
} else {
log.Printf("Disconnected from NATS")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment