Skip to content

Instantly share code, notes, and snippets.

@bboozzoo
Created September 19, 2022 13:14
Show Gist options
  • Save bboozzoo/ddf9de322c6d9de0ae0b52d7d6e584e0 to your computer and use it in GitHub Desktop.
Save bboozzoo/ddf9de322c6d9de0ae0b52d7d6e584e0 to your computer and use it in GitHub Desktop.
diff --git a/client/client.go b/client/client.go
index dc8c47a..e698ba4 100644
--- a/client/client.go
+++ b/client/client.go
@@ -2,7 +2,6 @@
Package client provides a WAMP client implementation that is interoperable with
any standard WAMP router and is capable of using all of the advanced profile
features supported by the nexus WAMP router.
-
*/
package client
@@ -56,6 +55,8 @@ type Client struct {
routerGoodbye *wamp.Goodbye
idGen *wamp.SyncIDGen
+
+ passthroughSerializer serialize.DirectSerializer
}
// InvokeResult represents the result of invoking a procedure.
@@ -111,6 +112,8 @@ func NewClient(p wamp.Peer, cfg Config) (*Client, error) {
debug: cfg.Debug,
cancelMode: wamp.CancelModeKillNoWait,
idGen: new(wamp.SyncIDGen),
+
+ passthroughSerializer: cfg.PassthroughSerializer,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
go c.run() // start the core goroutine
@@ -152,10 +155,11 @@ type EventHandler func(event *wamp.Event)
// match, or it can specify a URI pattern to match multiple events for the same
// handler by specifying the pattern type in options.
//
-// Subscribe Options
+// # Subscribe Options
//
// To request a pattern-based subscription set:
-// options["match"] = "prefix" or "wildcard"
+//
+// options["match"] = "prefix" or "wildcard"
//
// NOTE: Use consts defined in wamp/options.go instead of raw strings.
func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) error {
@@ -262,26 +266,33 @@ func (c *Client) Unsubscribe(topic string) error {
// Publish publishes an EVENT to all subscribed clients.
//
-// Publish Options
+// # Publish Options
//
// To receive a PUBLISHED response set:
-// options["acknowledge"] = true
+//
+// options["acknowledge"] = true
//
// To request subscriber blacklisting by subscriber, authid, or authrole, set:
-// options["exclude"] = [subscriberID, ...]
-// options["exclude_authid"] = ["authid", ..]
-// options["exclude_authrole"] = ["authrole", ..]
+//
+// options["exclude"] = [subscriberID, ...]
+// options["exclude_authid"] = ["authid", ..]
+// options["exclude_authrole"] = ["authrole", ..]
//
// To request subscriber whitelisting by subscriber, authid, or authrole, set:
-// options["eligible"] = [subscriberID, ...]
-// options["eligible_authid"] = ["authid", ..]
-// options["eligible_authrole"] = ["authrole", ..]
+//
+// options["eligible"] = [subscriberID, ...]
+// options["eligible_authid"] = ["authid", ..]
+// options["eligible_authrole"] = ["authrole", ..]
//
// When connecting to a nexus router, blacklisting and whitelisting can be used
// with any attribute assigned to the subscriber session, by setting:
-// options["exclude_xxx"] = [val1, val2, ..]
+//
+// options["exclude_xxx"] = [val1, val2, ..]
+//
// and
-// options["eligible_xxx"] = [val1, val2, ..]
+//
+// options["eligible_xxx"] = [val1, val2, ..]
+//
// where xxx is the name of any session attribute, typically supplied with the
// HELLO message.
//
@@ -290,7 +301,8 @@ func (c *Client) Unsubscribe(topic string) error {
// whitelist.
//
// To request that this publisher's identity is disclosed to subscribers, set:
-// options["disclose_me"] = true
+//
+// options["disclose_me"] = true
//
// NOTE: Use consts defined in wamp/options.go instead of raw strings.
func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs wamp.Dict) error {
@@ -311,6 +323,17 @@ func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs
}
}
+ if c.passthroughSerializer != nil {
+ // Pack args/kwargs into one serialized blob that becomes the sole argument.
+ data, err := c.passthroughSerializer.SerializeData(wamp.PassthroughPayload{Args: args, KWArgs: kwargs})
+ if err != nil {
+ return err
+ }
+ args, kwargs = wamp.List{data}, nil
+ // NOTE(review): placeholder option; assumes options is non-nil here — confirm.
+ options["ppt_foo"] = "bar"
+ }
+
c.sess.Send(&wamp.Publish{
Request: id,
Options: options,
@@ -357,16 +380,19 @@ type InvocationHandler func(context.Context, *wamp.Invocation) InvokeResult
// If the registration handler wants to cancel the call without returning a
// result, then is should return InvocationCanceled.
//
-// Register Options
+// # Register Options
//
// To request a pattern-based registration set:
-// options["match"] = "prefix" or "wildcard"
+//
+// options["match"] = "prefix" or "wildcard"
//
// To request a shared registration pattern set:
-// options["invoke"] = "single", "roundrobin", "random", "first", "last"
+//
+// options["invoke"] = "single", "roundrobin", "random", "first", "last"
//
// To request that caller identification is disclosed to this callee, set:
-// options["disclose_caller"] = true
+//
+// options["disclose_caller"] = true
//
// NOTE: Use consts defined in wamp/options.go instead of raw strings.
func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.Dict) error {
@@ -474,7 +500,7 @@ type ProgressHandler func(*wamp.Result)
// message. This may be necessary for the client application to process error
// data from the RPC invocation.
//
-// Call Canceling
+// # Call Canceling
//
// The provided Context allows the caller to cancel a call, or to set a
// deadline that cancels the call when the deadline expires. There is no
@@ -488,7 +514,7 @@ type ProgressHandler func(*wamp.Result)
// initiated by the client (by canceling context), and cancellation initialed
// elsewhere.
//
-// Call Timeout
+// # Call Timeout
//
// If a timeout is provided in the options, and the callee supports call
// timeout, then the timeout value is passed to the callee so that the
@@ -503,17 +529,18 @@ type ProgressHandler func(*wamp.Result)
// To request automatic call timeout, by the router and callee, specify a
// timeout in milliseconds: options["timeout"] = 30000
//
-// Caller Identification
+// # Caller Identification
//
// A caller may request the disclosure of its identity (its WAMP session ID) to
// callees, if allowed by the dealer.
//
// To request that this caller's identity disclosed to callees, set:
-// options["disclose_me"] = true
+//
+// options["disclose_me"] = true
//
// NOTE: Use consts defined in wamp/options.go instead of raw strings.
//
-// Progressive Call Results
+// # Progressive Call Results
//
// A caller indicates its willingness to receive progressive results by
// supplying a ProgressHandler function to handle progressive results that are
@@ -592,7 +619,7 @@ func (c *Client) Call(ctx context.Context, procedure string, options wamp.Dict,
// default value: "killnowait". The cancel mode is an option that is sent in a
// CANCEL message when a CALL is canceled.
//
-// Cancel Mode Behavior
+// # Cancel Mode Behavior
//
// "skip": The pending call is canceled and ERROR is sent immediately back to
// the caller. No INTERRUPT is sent to the callee and the result is discarded
@@ -1108,6 +1135,7 @@ func (c *Client) runHandleEvent(msg *wamp.Event) {
msg.Subscription)
return
}
+ // TODO unpack passthrough
handler(msg)
}
@@ -1222,6 +1250,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {
})
return
}
+ // TODO pack to the passthrough serializer
c.sess.SendCtx(c.ctx, &wamp.Yield{
Request: reqID,
Options: wamp.Dict{},
diff --git a/client/config.go b/client/config.go
index 4f4f7f7..da25096 100644
--- a/client/config.go
+++ b/client/config.go
@@ -69,4 +69,6 @@ type Config struct {
// Websocket transport configuration.
WsCfg transport.WebsocketConfig
+
+ PassthroughSerializer serialize.DirectSerializer
}
diff --git a/transport/serialize/cborserializer.go b/transport/serialize/cborserializer.go
index 7bb303e..d0c90fb 100644
--- a/transport/serialize/cborserializer.go
+++ b/transport/serialize/cborserializer.go
@@ -44,3 +44,11 @@ func (s *CBORSerializer) Deserialize(data []byte) (wamp.Message, error) {
}
return listToMsg(wamp.MessageType(typ), v)
}
+
+func (s *CBORSerializer) SerializeData(from interface{}) ([]byte, error) {
+ ...
+}
+
+func (s *CBORSerializer) DeserializeData(from []byte, to interface{}) error {
+ ...
+}
diff --git a/transport/serialize/serializer.go b/transport/serialize/serializer.go
index f96709d..f295c81 100644
--- a/transport/serialize/serializer.go
+++ b/transport/serialize/serializer.go
@@ -1,7 +1,6 @@
/*
Package serialize provides a Serializer interface with implementations that
encode and decode message data in various ways.
-
*/
package serialize
@@ -34,6 +33,12 @@ type Serializer interface {
Deserialize([]byte) (wamp.Message, error)
}
+// DirectSerializer encodes and decodes arbitrary values rather than WAMP messages.
+type DirectSerializer interface {
+ SerializeData(from interface{}) ([]byte, error) // encode from into bytes
+ DeserializeData(from []byte, to interface{}) error // decode from into to
+}
+
// listToMessage takes a list of values from a WAMP message and populates the
// fields of a message type.
func listToMsg(msgType wamp.MessageType, vlist []interface{}) (wamp.Message, error) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment