Last active
March 19, 2020 04:52
-
-
Save clarkmcc/848c49410c6617c002ab92dd48d0f75e to your computer and use it in GitHub Desktop.
Used for tracking metadata about a request and dumping it into a data warehouse such as Google BigQuery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package counter | |
import ( | |
"cloud.google.com/go/civil" | |
"context" | |
"fmt" | |
"gitlab.com/ptmesh/api/pkg/repositories" | |
"go.mongodb.org/mongo-driver/bson/primitive" | |
"net/http" | |
"sync" | |
"time" | |
) | |
// RequestBytes is a counter injected into the router engine. This counter is | |
// used to save metadata about incoming requests to the api. This data is can | |
// be saved or dumped via the sink interface | |
type RequestBytes struct { | |
interval time.Duration | |
store *repositories.Store | |
sink Sink | |
ks map[string]*RequestEntry | |
m *sync.Mutex | |
// Kill channel used for killing a request byte counter go routine | |
Kc chan struct{} | |
// Error channel used for reporting errors up the line for logging | |
Ec chan error | |
} | |
// RequestByteOpts is used when creating a new instance of RequestBytes to specify | |
// the store and sink to use | |
type RequestByteOpts struct { | |
Interval time.Duration | |
Store *repositories.Store | |
Sink Sink | |
} | |
// NewRequestByteCounter creates a new instance of RequestBytes | |
func NewRequestByteCounter(opts *RequestByteOpts) *RequestBytes { | |
return &RequestBytes{ | |
interval: opts.Interval, | |
store: opts.Store, | |
sink: opts.Sink, | |
ks: map[string]*RequestEntry{}, | |
m: &sync.Mutex{}, | |
Kc: make(chan struct{}), | |
Ec: make(chan error), | |
} | |
} | |
// NewEntry creates a new entry of metadata from an individual request. This method | |
// is utilized by the custom http.ResponseWriter implementation called | |
// MeasuredResponseWriter | |
func (b *RequestBytes) NewEntry(key string, r *http.Request, resBytes int) { | |
b.m.Lock() | |
defer b.m.Unlock() | |
b.ks[primitive.NewObjectID().Hex()] = &RequestEntry{ | |
ApiKey: key, | |
Timestamp: civil.DateTimeOf(time.Now()), | |
Ip: r.RemoteAddr, | |
Uri: r.URL.String(), | |
ResponseBytes: resBytes, | |
RequestBytes: int(r.ContentLength), | |
} | |
} | |
// Sync iterates over all the entries stored in the RequestBytes struct. When they're | |
// synced, they are removed from the ks map | |
func (b *RequestBytes) Sync() { | |
b.m.Lock() | |
defer b.m.Unlock() | |
for k, v := range b.ks { | |
err := b.updateApiKeyUsage(v) | |
if err != nil { | |
b.Ec <- fmt.Errorf("updating api key usage entry %s: %v", k, err) | |
continue | |
} | |
delete(b.ks, k) | |
} | |
} | |
// StartSync kicks off a goroutine that runs the Sync method based on provided | |
// intervals. The goroutine can be killed using the b.Kc kill channel | |
func (b *RequestBytes) StartSync() { | |
go func() { | |
for { | |
select { | |
case <-b.Kc: | |
break | |
default: | |
time.Sleep(b.interval) | |
b.Sync() | |
} | |
} | |
}() | |
} | |
// Get returns the current map of request entries | |
func (b *RequestBytes) Get() map[string]*RequestEntry { | |
return b.ks | |
} | |
// updateApiKeyUsage uploads a single request entry to the sink | |
func (b *RequestBytes) updateApiKeyUsage(e *RequestEntry) error { | |
ctx, _ := context.WithTimeout(context.TODO(), 15*time.Second) | |
err := b.sink.DumpRequestEntries(ctx, []*RequestEntry{e}) | |
if err != nil { | |
return err | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment