Skip to content

Instantly share code, notes, and snippets.

@lawrencejones
Last active October 28, 2022 09:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lawrencejones/25fda1c6c4d945bef0570eed1e80c9a2 to your computer and use it in GitHub Desktop.
Save lawrencejones/25fda1c6c4d945bef0570eed1e80c9a2 to your computer and use it in GitHub Desktop.
How we compress Pub/Sub messages and more, saving a load of money
// Prometheus metrics instrumenting the gzip compression applied to Pub/Sub
// message payloads before they are published.
var (
// exportPubsubWriteCompressionRatio records compressed size divided by
// original size for each compressed payload, so lower values mean better
// compression.
exportPubsubWriteCompressionRatio = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "elastic_toolbox_export_pubsub_write_compression_ratio",
Help: "Distribution of compression ratio",
Buckets: prometheus.LinearBuckets(0.1, 0.1, 10), // 0.1 -> 1.0 in steps of 0.1; ratios above 1.0 land in the implicit +Inf bucket
},
)
// exportPubsubWriteCompressDurationSeconds records the wall-clock time, in
// seconds, spent in each compress call.
exportPubsubWriteCompressDurationSeconds = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "elastic_toolbox_export_pubsub_write_compress_duration_seconds",
Help: "Distribution of time taken to compress hits",
Buckets: prometheus.ExponentialBuckets(0.0625, 2, 8), // 0.0625s -> 8s, doubling each bucket (8 buckets)
},
)
)
// compress applies gzip compression to the incoming data, and instruments
// compression efficiency.
//
// The time spent in this call is always recorded in
// exportPubsubWriteCompressDurationSeconds, including when an error is
// returned. On success the ratio of compressed to original size is recorded in
// exportPubsubWriteCompressionRatio, unless data is empty.
//
// It returns the gzip-encoded bytes, or a nil slice and a wrapped error if the
// gzip writer fails.
func (f *pubsubExportTarget) compress(data []byte) ([]byte, error) {
	// A Histogram already satisfies prometheus.Observer, so it can be handed
	// to the timer directly.
	defer prometheus.NewTimer(exportPubsubWriteCompressDurationSeconds).ObserveDuration()

	var buffer bytes.Buffer
	zw := gzip.NewWriter(&buffer)
	if _, err := zw.Write(data); err != nil {
		return nil, fmt.Errorf("writing gzip stream: %w", err)
	}
	// Close flushes any pending compressed data and writes the gzip footer;
	// the buffer is not a complete gzip stream until it succeeds.
	if err := zw.Close(); err != nil {
		return nil, fmt.Errorf("closing gzip stream: %w", err)
	}

	compressed := buffer.Bytes()
	// Skip the ratio for empty input: dividing by a zero length yields +Inf,
	// which would make the histogram's running sum infinite.
	if len(data) > 0 {
		exportPubsubWriteCompressionRatio.Observe(
			float64(len(compressed)) / float64(len(data)))
	}
	return compressed, nil
}
// Publish takes a message and publishes it to the Pub/Sub topic. If
// compression is enabled, the message payload is compressed, and the
// message is marked with a compress=true attribute; otherwise the attribute
// is compress=false and the payload is the raw JSON encoding of msg.
//
// It returns an error if msg cannot be marshalled to JSON or if compression
// fails. In both cases nothing is enqueued, so a consumer never receives an
// empty payload that claims to be compressed.
func (f *pubsubExportTarget) Publish(ctx context.Context, msg Message) error {
	data, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("marshalling message: %w", err)
	}

	if f.opt.Compress {
		data, err = f.compress(data)
		if err != nil {
			return fmt.Errorf("compressing message: %w", err)
		}
	}

	// enqueue marks a message as available to be sent, passing it
	// to the Pub/Sub client
	f.enqueue(ctx, &pubsub.Message{
		Data: data,
		Attributes: map[string]string{
			"compress": fmt.Sprintf("%v", f.opt.Compress),
		},
	})

	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment