Skip to content

Instantly share code, notes, and snippets.

@clarkmcc
Last active March 19, 2020 04:52
Show Gist options
  • Save clarkmcc/848c49410c6617c002ab92dd48d0f75e to your computer and use it in GitHub Desktop.
Save clarkmcc/848c49410c6617c002ab92dd48d0f75e to your computer and use it in GitHub Desktop.
Used for tracking metadata about a request and dumping it into a data warehouse such as Google BigQuery
package counter
import (
"cloud.google.com/go/civil"
"context"
"fmt"
"gitlab.com/ptmesh/api/pkg/repositories"
"go.mongodb.org/mongo-driver/bson/primitive"
"net/http"
"sync"
"time"
)
// RequestBytes is a counter injected into the router engine. This counter is
// used to save metadata about incoming requests to the api. This data is can
// be saved or dumped via the sink interface
type RequestBytes struct {
interval time.Duration
store *repositories.Store
sink Sink
ks map[string]*RequestEntry
m *sync.Mutex
// Kill channel used for killing a request byte counter go routine
Kc chan struct{}
// Error channel used for reporting errors up the line for logging
Ec chan error
}
// RequestByteOpts is used when creating a new instance of RequestBytes to specify
// the store and sink to use
type RequestByteOpts struct {
Interval time.Duration
Store *repositories.Store
Sink Sink
}
// NewRequestByteCounter creates a new instance of RequestBytes
func NewRequestByteCounter(opts *RequestByteOpts) *RequestBytes {
return &RequestBytes{
interval: opts.Interval,
store: opts.Store,
sink: opts.Sink,
ks: map[string]*RequestEntry{},
m: &sync.Mutex{},
Kc: make(chan struct{}),
Ec: make(chan error),
}
}
// NewEntry creates a new entry of metadata from an individual request. This method
// is utilized by the custom http.ResponseWriter implementation called
// MeasuredResponseWriter
func (b *RequestBytes) NewEntry(key string, r *http.Request, resBytes int) {
b.m.Lock()
defer b.m.Unlock()
b.ks[primitive.NewObjectID().Hex()] = &RequestEntry{
ApiKey: key,
Timestamp: civil.DateTimeOf(time.Now()),
Ip: r.RemoteAddr,
Uri: r.URL.String(),
ResponseBytes: resBytes,
RequestBytes: int(r.ContentLength),
}
}
// Sync iterates over all the entries stored in the RequestBytes struct. When they're
// synced, they are removed from the ks map
func (b *RequestBytes) Sync() {
b.m.Lock()
defer b.m.Unlock()
for k, v := range b.ks {
err := b.updateApiKeyUsage(v)
if err != nil {
b.Ec <- fmt.Errorf("updating api key usage entry %s: %v", k, err)
continue
}
delete(b.ks, k)
}
}
// StartSync kicks off a goroutine that runs the Sync method based on provided
// intervals. The goroutine can be killed using the b.Kc kill channel
func (b *RequestBytes) StartSync() {
go func() {
for {
select {
case <-b.Kc:
break
default:
time.Sleep(b.interval)
b.Sync()
}
}
}()
}
// Get returns the current map of request entries
func (b *RequestBytes) Get() map[string]*RequestEntry {
return b.ks
}
// updateApiKeyUsage uploads a single request entry to the sink
func (b *RequestBytes) updateApiKeyUsage(e *RequestEntry) error {
ctx, _ := context.WithTimeout(context.TODO(), 15*time.Second)
err := b.sink.DumpRequestEntries(ctx, []*RequestEntry{e})
if err != nil {
return err
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment