Skip to content

Instantly share code, notes, and snippets.

@morgangallant
Created January 11, 2023 16:41
Show Gist options
  • Save morgangallant/83bc32690c291a175cace6bfa9dddf3f to your computer and use it in GitHub Desktop.
Save morgangallant/83bc32690c291a175cace6bfa9dddf3f to your computer and use it in GitHub Desktop.
// June is a client library for the June analytics API.
package june
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"sync"
"time"
)
// Logf is a function that logs a message.
type Logf func(format string, args ...interface{})
func defaultLogger() Logf {
logger := log.New(os.Stdout, "june", log.LstdFlags|log.Lshortfile)
return func(format string, args ...interface{}) {
logger.Printf(format, args...)
}
}
// Client is a client for the June analytics API.
// It is safe for concurrent use by multiple goroutines, and events
// will be queued to be uploaded via a pool of background workers.
type Client struct {
writeKey string
httpClient *http.Client
logf Logf
bufferSize int
numWorkers int
messageQueue chan Message
waitGroup sync.WaitGroup
shutdownFunc context.CancelFunc
}
// ClientOpt is a function that configures a Client.
type ClientOpt func(*Client)
// WithHTTPClient configures the HTTP client used by the client.
func WithHTTPClient(client *http.Client) ClientOpt {
return func(c *Client) {
c.httpClient = client
}
}
// WithBufferSize configures the size of the message queue.
// By default, we use a queue size of 64 which should be large enough
// for most use cases. If you are sending a large number of events,
// you may want to increase this.
func WithBufferSize(size int) ClientOpt {
return func(c *Client) {
c.bufferSize = size
}
}
// WithNumWorkers configures the number of workers that will upload
// events to the API. By default, we use 3 workers which should be
// sufficient for most use cases. If you are sending a large number
// of events, you may want to increase this.
func WithNumWorkers(num int) ClientOpt {
return func(c *Client) {
c.numWorkers = num
}
}
// WithLogf configures the function used to log messages. This is used
// to log any errors that occur while uploading events to the API.
// By default, we write messages to stdout using the `log` package.
func WithLogf(logf Logf) ClientOpt {
return func(c *Client) {
c.logf = logf
}
}
// NewClient creates a new client for the June analytics API.
func NewClient(writeKey string, opts ...ClientOpt) *Client {
c := &Client{
writeKey: writeKey,
httpClient: http.DefaultClient,
logf: defaultLogger(),
bufferSize: 64,
numWorkers: 3,
}
for _, opt := range opts {
opt(c)
}
ctx, cancel := context.WithCancel(context.Background())
c.messageQueue = make(chan Message, c.bufferSize)
c.shutdownFunc = cancel
for i := 0; i < c.numWorkers; i++ {
c.waitGroup.Add(1)
go func() {
defer c.waitGroup.Done()
c.worker(ctx)
}()
}
return c
}
// Enqueue adds a message to the queue to be uploaded to the API.
// If the queue is full, this function will block until there is
// space in the queue. The client should (ideally) be configured
// with a big enough message buffer to avoid this, and enough workers
// to upload events quickly enough to avoid blocking.
func (c *Client) Enqueue(msg Message) {
c.messageQueue <- msg
}
// Close shuts down the client, and waits for all pending events to be uploaded.
// It's important to call this function before terminating your program, otherwise,
// some events may not be uploaded correctly.
func (c *Client) Close() {
c.shutdownFunc()
c.waitGroup.Wait()
for msg := range c.messageQueue {
if err := c.upload(context.Background(), msg); err != nil {
c.logf("error uploading %q analytics message: %v", msg.Kind(), err)
}
}
close(c.messageQueue)
}
func (c *Client) worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case msg := <-c.messageQueue:
if err := c.upload(context.Background(), msg); err != nil {
c.logf("error uploading %q analytics message: %v", msg.Kind(), err)
}
}
}
}
func (c *Client) upload(ctx context.Context, message Message) error {
body, err := message.MarshalJSON()
if err != nil {
return err
}
url := "https://api.june.so/api/" + message.Kind()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Basic "+c.writeKey)
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("june: unexpected status code: %d (%s)", resp.StatusCode, resp.Status)
}
return nil
}
// Message is an interface implemented by all message types.
type Message interface {
json.Marshaler
Kind() string // The event needs to be sent to https://api.june.so/api/$kind.
}
var (
_ Message = (*Identify)(nil)
_ Message = (*Group)(nil)
_ Message = (*Track)(nil)
)
// Identify is a message that identifies a user.
type Identify struct {
UserID string `json:"userId"`
Traits map[string]any `json:"traits"`
Timestamp time.Time `json:"timestamp"`
}
func (i *Identify) Kind() string {
return "identify"
}
func (i *Identify) MarshalJSON() ([]byte, error) {
if i.Timestamp.IsZero() {
i.Timestamp = time.Now()
}
return json.Marshal(i)
}
// Group is a message that identifies a group, and associates it with a user.
type Group struct {
GroupID string `json:"groupId"`
UserID string `json:"userId"`
Traits map[string]any `json:"traits"`
Timestamp time.Time `json:"timestamp"`
}
func (g *Group) Kind() string {
return "group"
}
func (g *Group) MarshalJSON() ([]byte, error) {
if g.Timestamp.IsZero() {
g.Timestamp = time.Now()
}
return json.Marshal(g)
}
// Track is a message that tracks an event.
type Track struct {
UserID string `json:"userId"`
Event string `json:"event"`
Properties map[string]any `json:"properties"`
Context map[string]any `json:"context"`
Timestamp time.Time `json:"timestamp"`
}
func (t *Track) Kind() string {
return "track"
}
func (t *Track) MarshalJSON() ([]byte, error) {
if t.Timestamp.IsZero() {
t.Timestamp = time.Now()
}
return json.Marshal(t)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment