Skip to content

Instantly share code, notes, and snippets.

@pauldub
Created April 15, 2016 07:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pauldub/8182c50f163148ecd5dd08fc0a8a68e3 to your computer and use it in GitHub Desktop.
Save pauldub/8182c50f163148ecd5dd08fc0a8a68e3 to your computer and use it in GitHub Desktop.
micro-particle-broker
package main
import (
"log"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/cmd"
"broker/particle"
)
// main is an example program for the particle broker: it initialises the
// go-micro command layer, configures the broker with an access token,
// connects, subscribes to the "event-prefix" topic with the "device-id"
// device option, logs the headers and body of every publication it receives,
// and then blocks forever.
func main() {
	cmd.Init()

	if err := broker.Init(
		particle.AccessToken("your-token"),
	); err != nil {
		log.Fatalf("broker init failed: %v", err)
	}

	if err := broker.Connect(); err != nil {
		// Name the step that actually failed; this message used to be a copy
		// of the Init failure message above.
		log.Fatalf("broker connect failed: %v", err)
	}

	_, err := broker.Subscribe("event-prefix", func(p broker.Publication) error {
		log.Printf("[sub] received headers: %+v", p.Message().Header)
		log.Printf("[sub] received message: %v", string(p.Message().Body))
		return nil
	}, particle.Device("device-id"))
	if err != nil {
		log.Fatal(err)
	}

	// Block forever so the process keeps receiving publications.
	select {}
}
package particle
import (
	"bufio"
	"bytes"
	"fmt"
	"net/http"
	"net/url"

	"github.com/micro/go-micro/broker"
	"github.com/micro/go-micro/cmd"
	"golang.org/x/net/context"
	"golang.org/x/net/context/ctxhttp"
)
// particleApi is the base URL every Particle cloud request in this package is
// built from.
const particleApi = "https://api.particle.io"
// optKey is the private type of the keys this package stores in option
// contexts; using a dedicated type keeps them from colliding with keys
// defined by other packages.
type optKey int

// Context keys for the values carried by the AccessToken and Device options.
const (
	accessTokenKey optKey = 0
	deviceKey      optKey = 1
)
// AccessToken returns a broker option that stores the Particle access token t
// in the broker options' context, where Publish and Subscribe look it up.
func AccessToken(t string) broker.Option {
	return func(opts *broker.Options) {
		// A zero-value broker.Options carries a nil context, and a nil parent
		// must never be handed to context.WithValue.
		parent := opts.Context
		if parent == nil {
			parent = context.Background()
		}
		opts.Context = context.WithValue(parent, accessTokenKey, t)
	}
}
// Device returns a subscribe option that stores the device identifier d in
// the subscribe options' context; Subscribe uses it to restrict the event
// stream to that device.
func Device(d string) broker.SubscribeOption {
	return func(opts *broker.SubscribeOptions) {
		// Subscribe options may be built without a context, and a nil parent
		// must never be handed to context.WithValue.
		parent := opts.Context
		if parent == nil {
			parent = context.Background()
		}
		opts.Context = context.WithValue(parent, deviceKey, d)
	}
}
// pbroker is the broker implementation that talks to the Particle cloud API.
type pbroker struct {
// opts holds the broker options applied through Init.
opts broker.Options
// httpClient is assigned by Connect and passed to the HTTP calls made by
// Publish and Subscribe.
httpClient *http.Client
// subs is the list of subscribers that Disconnect signals to stop.
subs []*subscriber
}
// subscriber is the handle Subscribe returns for one event stream.
type subscriber struct {
// t is the topic the subscription was created for.
t string
// opts are the subscribe options in effect for this subscription.
opts broker.SubscribeOptions
// closed is set to true by a goroutine started in Subscribe once it has
// received a stop signal on the close channel.
closed bool
// close is the send-only end of the stop channel; Unsubscribe and
// Disconnect send on it to end the subscription.
close chan<- bool
}
// publication is the value handed to a subscription handler for each event.
type publication struct {
// t is the topic returned by Topic.
t string
// m is the message returned by Message.
m *broker.Message
}
// init registers NewBroker in cmd.DefaultBrokers under the name "particle".
func init() {
	const name = "particle"
	cmd.DefaultBrokers[name] = NewBroker
}
// Topic reports the topic stored on the publication.
func (p *publication) Topic() string {
	topic := p.t
	return topic
}
// Message hands back the message carried by the publication.
func (p *publication) Message() *broker.Message {
	msg := p.m
	return msg
}
// Ack does nothing and always reports success.
func (p *publication) Ack() error {
	var err error
	return err
}
// Options hands back a copy of the subscribe options held by the subscriber.
func (s *subscriber) Options() broker.SubscribeOptions {
	current := s.opts
	return current
}
// Topic reports the topic the subscriber was created with.
func (s *subscriber) Topic() string {
	topic := s.t
	return topic
}
// Unsubscribe signals the subscription to stop by sending on its stop
// channel. It always returns nil.
//
// Calling it on a subscription already marked closed is a no-op, matching the
// check Disconnect performs. Without this guard a second call would send on a
// channel the subscription has already closed, which panics.
//
// NOTE(review): closed is read here without synchronisation with the
// goroutine that sets it, so two calls racing each other are still not safe.
func (s *subscriber) Unsubscribe() error {
	if s.closed {
		return nil
	}
	s.close <- true
	return nil
}
// Options hands back a copy of the options currently held by the broker.
func (b *pbroker) Options() broker.Options {
	current := b.opts
	return current
}
// Address reports the Particle API base URL as the broker address.
func (b *pbroker) Address() string {
	addr := particleApi
	return addr
}
// Connect installs a fresh zero-value HTTP client on the broker. No network
// traffic happens here, and the returned error is always nil.
func (b *pbroker) Connect() error {
	client := new(http.Client)
	b.httpClient = client
	return nil
}
// Disconnect walks the broker's subscribers and sends a stop signal to each
// one that is not already marked closed. It always returns nil.
func (b *pbroker) Disconnect() error {
	subs := b.subs
	for i := 0; i < len(subs); i++ {
		if sub := subs[i]; !sub.closed {
			sub.close <- true
		}
	}
	return nil
}
// Init applies opts, in order, to the broker's options. It always returns
// nil.
func (b *pbroker) Init(opts ...broker.Option) error {
	// Options such as AccessToken wrap the existing context with
	// context.WithValue, which must not be given a nil parent, so make sure
	// there is a context before any option runs.
	if b.opts.Context == nil {
		b.opts.Context = context.Background()
	}
	for _, o := range opts {
		o(&b.opts)
	}
	return nil
}
// Publish posts msg as a Particle event named t.
//
// The request is a form POST to /v1/devices/events carrying the configured
// access token, the event name, and the message body as "data". Two message
// headers tune the request: "private" (sent as "true" when absent or empty)
// and "ttl" (sent only when non-empty).
//
// It returns an error when msg is nil, when no access token has been
// configured with AccessToken, when the HTTP request fails, or when the
// server answers with a non-2xx status. opts is currently unused.
func (b *pbroker) Publish(t string, msg *broker.Message, opts ...broker.PublishOption) error {
	if msg == nil {
		return fmt.Errorf("particle: publish %q: nil message", t)
	}

	// The context may be nil and the token may never have been set; look it
	// up defensively instead of panicking on a failed type assertion.
	var token string
	if b.opts.Context != nil {
		token, _ = b.opts.Context.Value(accessTokenKey).(string)
	}
	if len(token) == 0 {
		return fmt.Errorf("particle: publish %q: no access token configured", t)
	}

	// url.Values is a map type: it has to be initialised before Set is
	// called, because writing to a nil map panics.
	params := url.Values{}
	params.Set("access_token", token)

	private := msg.Header["private"]
	if len(private) == 0 {
		private = "true"
	}
	params.Set("private", private)

	if ttl := msg.Header["ttl"]; len(ttl) > 0 {
		params.Set("ttl", ttl)
	}

	params.Set("name", t)
	params.Set("data", string(msg.Body))

	res, err := ctxhttp.PostForm(context.Background(), b.httpClient, particleApi+"/v1/devices/events", params)
	if err != nil {
		return err
	}
	// Close the body so the underlying connection is released.
	defer res.Body.Close()

	if res.StatusCode < http.StatusOK || res.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("particle: publish %q: unexpected status %s", t, res.Status)
	}
	return nil
}
// Subscribe opens a streaming GET request against the Particle events API and
// calls handler for every event whose data payload starts with "{".
//
// The stream URL is built from:
//   - the Device subscribe option, which selects /v1/devices/<device>/events
//     instead of /v1/devices/events when set and non-empty;
//   - topic, appended as a further path segment when non-empty;
//   - the access token configured on the broker, appended as the
//     access_token query parameter when present.
//
// The returned subscriber is also recorded on the broker so that Disconnect
// can stop it. The returned error is always nil: failures of the HTTP request
// happen on a background goroutine and are not reported to the caller.
//
// NOTE(review): the subscriber list and the subscriber's closed flag are
// accessed without synchronisation; calling Subscribe, Unsubscribe, and
// Disconnect concurrently is not safe.
func (b *pbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	opt := broker.SubscribeOptions{
		AutoAck: true,
		// Seed the context so that options can wrap it and the device lookup
		// below never calls a method on a nil context.
		Context: context.Background(),
	}
	for _, o := range opts {
		o(&opt)
	}

	endpoint := particleApi + "/v1/devices/events"
	if device, ok := opt.Context.Value(deviceKey).(string); ok && len(device) > 0 {
		endpoint = particleApi + "/v1/devices/" + device + "/events"
	}
	if len(topic) > 0 {
		endpoint += "/" + topic
	}

	// The broker context may be nil and the token may be unset; treat both as
	// "no token" rather than panicking on the lookup or the type assertion.
	var token string
	if b.opts.Context != nil {
		token, _ = b.opts.Context.Value(accessTokenKey).(string)
	}
	if len(token) > 0 {
		endpoint += "?access_token=" + url.QueryEscape(token)
	}

	closeC := make(chan bool)
	s := &subscriber{t: topic, opts: opt, closed: false, close: closeC}
	// Record the subscriber; otherwise Disconnect has nothing to iterate.
	b.subs = append(b.subs, s)

	// The request context is cancelled when a stop signal arrives, which
	// aborts a read blocked on an idle stream.
	ctx, cancel := context.WithCancel(context.Background())

	// Wait for the stop signal sent by Unsubscribe or Disconnect. This runs
	// separately from the reader so the sender is released immediately, even
	// while the reader is blocked or after the stream has already ended.
	go func() {
		<-closeC
		s.closed = true
		close(s.close)
		cancel()
	}()

	// Start subscribing to events.
	go func() {
		res, err := ctxhttp.Get(ctx, b.httpClient, endpoint)
		if err != nil {
			// TODO(paul): Handle error
			return
		}
		// Release the connection on every exit path, including non-200
		// responses and read errors.
		defer res.Body.Close()

		if res.StatusCode != http.StatusOK {
			return
		}
		consumeEvents(bufio.NewReader(res.Body), topic, handler)
	}()

	return s, nil
}

// consumeEvents reads the event stream from reader line by line until a read
// fails, and calls handler once per event whose accumulated data starts with
// "{". "id:" and "event:" lines become the message headers "id" and "event";
// "data:" lines are accumulated, line terminators included, into the message
// body; a blank line ends the current event. Any other line is ignored.
func consumeEvents(reader *bufio.Reader, topic string, handler broker.Handler) {
	var data bytes.Buffer
	header := make(map[string]string)

	for {
		line, err := reader.ReadBytes('\n')
		if err != nil {
			return
		}

		switch {
		case bytes.HasPrefix(line, []byte("id:")):
			header["id"] = string(bytes.TrimSpace(sseValue(line, "id:")))
		case bytes.HasPrefix(line, []byte("event:")):
			header["event"] = string(bytes.TrimSpace(sseValue(line, "event:")))
		case bytes.HasPrefix(line, []byte("data:")):
			data.Write(sseValue(line, "data:"))
		case len(bytes.TrimRight(line, "\r\n")) == 0:
			if payload := data.Bytes(); bytes.HasPrefix(payload, []byte("{")) {
				// Give the handler its own message and its own copy of the
				// body: the buffer is reused for the next event.
				msg := &broker.Message{
					Header: header,
					Body:   append([]byte(nil), payload...),
				}
				// The handler's error is dropped: there is no channel here
				// through which it could be reported.
				_ = handler(&publication{m: msg, t: topic})
			}
			// Start the next event from a clean slate even when this one was
			// not dispatched, so stale data never leaks into it.
			data.Reset()
			header = make(map[string]string)
		}
	}
}

// sseValue returns what follows the field prefix (for example "data:") on
// line, without the single optional space after the colon.
func sseValue(line []byte, field string) []byte {
	value := bytes.TrimPrefix(line, []byte(field))
	return bytes.TrimPrefix(value, []byte(" "))
}
// String reports the name of this broker implementation, "particle".
func (b *pbroker) String() string {
	const name = "particle"
	return name
}
// NewBroker builds a Particle broker configured with opts.
func NewBroker(opts ...broker.Option) broker.Broker {
	// Start from a non-nil context so that options which wrap it, and the
	// later lookups on it, never operate on a nil context.
	opt := broker.Options{Context: context.Background()}
	for _, o := range opts {
		o(&opt)
	}
	// Keep the collected options on the broker; they used to be built and
	// then thrown away.
	return &pbroker{opts: opt}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment