Skip to content

Instantly share code, notes, and snippets.

@liuyangc3
Forked from robskillington/prometheus.proto
Last active August 11, 2022 10:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save liuyangc3/fded54c973df51deb5842f21b07a9fd9 to your computer and use it in GitHub Desktop.
Save liuyangc3/fded54c973df51deb5842f21b07a9fd9 to your computer and use it in GitHub Desktop.
Example Python Prometheus remote write client

Install awscurl

pip install awscurl

Query metrics from an EC2 instance (using the temporary credentials of the instance role)

TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
CREDENTIAL=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/iam/security-credentials/amp-instance-role)

export AWS_ACCESS_KEY_ID=$(echo ${CREDENTIAL}| jq -r .AccessKeyId)
export AWS_SECRET_ACCESS_KEY=$(echo ${CREDENTIAL}| jq -r .SecretAccessKey)
export AWS_SECURITY_TOKEN=$(echo ${CREDENTIAL}| jq -r .Token)

awscurl --service aps --region ap-northeast-1 "https://aps-workspaces.ap-northeast-1.amazonaws.com/workspaces/ws-??????????/api/v1/query?query=your_metric"

https://github.com/prometheus/prometheus/blob/main/prompb/remote.proto

https://grpc.io/docs/languages/python/quickstart/

Compile the Prometheus protobuf definitions to Python

git clone https://github.com/gogo/protobuf.git

wget https://raw.githubusercontent.com/prometheus/prometheus/main/prompb/remote.proto
wget https://raw.githubusercontent.com/prometheus/prometheus/main/prompb/types.proto

python -m pip install grpcio
python -m pip install grpcio-tools

python3 -m grpc_tools.protoc \
--proto_path=. --proto_path=./protobuf \
--python_out=. remote.proto types.proto gogoproto/gogo.proto
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: remote.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
import types_pb2 as types__pb2
from gogoproto import gogo_pb2 as gogoproto_dot_gogo__pb2
# Register the serialized file descriptor for remote.proto with the default
# descriptor pool. The byte string is protoc output; do not edit it by hand.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cremote.proto\x12\nprometheus\x1a\x0btypes.proto\x1a\x14gogoproto/gogo.proto\"z\n\x0cWriteRequest\x12\x30\n\ntimeseries\x18\x01 \x03(\x0b\x32\x16.prometheus.TimeSeriesB\x04\xc8\xde\x1f\x00\x12\x32\n\x08metadata\x18\x03 \x03(\x0b\x32\x1a.prometheus.MetricMetadataB\x04\xc8\xde\x1f\x00J\x04\x08\x02\x10\x03\"\xae\x01\n\x0bReadRequest\x12\"\n\x07queries\x18\x01 \x03(\x0b\x32\x11.prometheus.Query\x12\x45\n\x17\x61\x63\x63\x65pted_response_types\x18\x02 \x03(\x0e\x32$.prometheus.ReadRequest.ResponseType\"4\n\x0cResponseType\x12\x0b\n\x07SAMPLES\x10\x00\x12\x17\n\x13STREAMED_XOR_CHUNKS\x10\x01\"8\n\x0cReadResponse\x12(\n\x07results\x18\x01 \x03(\x0b\x32\x17.prometheus.QueryResult\"\x8f\x01\n\x05Query\x12\x1a\n\x12start_timestamp_ms\x18\x01 \x01(\x03\x12\x18\n\x10\x65nd_timestamp_ms\x18\x02 \x01(\x03\x12*\n\x08matchers\x18\x03 \x03(\x0b\x32\x18.prometheus.LabelMatcher\x12$\n\x05hints\x18\x04 \x01(\x0b\x32\x15.prometheus.ReadHints\"9\n\x0bQueryResult\x12*\n\ntimeseries\x18\x01 \x03(\x0b\x32\x16.prometheus.TimeSeries\"]\n\x13\x43hunkedReadResponse\x12\x31\n\x0e\x63hunked_series\x18\x01 \x03(\x0b\x32\x19.prometheus.ChunkedSeries\x12\x13\n\x0bquery_index\x18\x02 \x01(\x03\x42\x08Z\x06prompbb\x06proto3')
# Look up the descriptor of each top-level message declared in remote.proto
# by name on the file descriptor.
_WRITEREQUEST = DESCRIPTOR.message_types_by_name['WriteRequest']
_READREQUEST = DESCRIPTOR.message_types_by_name['ReadRequest']
_READRESPONSE = DESCRIPTOR.message_types_by_name['ReadResponse']
_QUERY = DESCRIPTOR.message_types_by_name['Query']
_QUERYRESULT = DESCRIPTOR.message_types_by_name['QueryResult']
_CHUNKEDREADRESPONSE = DESCRIPTOR.message_types_by_name['ChunkedReadResponse']
# ResponseType is an enum nested inside the ReadRequest message.
_READREQUEST_RESPONSETYPE = _READREQUEST.enum_types_by_name['ResponseType']
# Build one concrete message class per descriptor via the reflection
# metaclass and register each class with the symbol database. Every class
# reports 'remote_pb2' as its module.
WriteRequest = _reflection.GeneratedProtocolMessageType('WriteRequest', (_message.Message,), {
'DESCRIPTOR' : _WRITEREQUEST,
'__module__' : 'remote_pb2'
# @@protoc_insertion_point(class_scope:prometheus.WriteRequest)
})
_sym_db.RegisterMessage(WriteRequest)
ReadRequest = _reflection.GeneratedProtocolMessageType('ReadRequest', (_message.Message,), {
'DESCRIPTOR' : _READREQUEST,
'__module__' : 'remote_pb2'
# @@protoc_insertion_point(class_scope:prometheus.ReadRequest)
})
_sym_db.RegisterMessage(ReadRequest)
ReadResponse = _reflection.GeneratedProtocolMessageType('ReadResponse', (_message.Message,), {
'DESCRIPTOR' : _READRESPONSE,
'__module__' : 'remote_pb2'
# @@protoc_insertion_point(class_scope:prometheus.ReadResponse)
})
_sym_db.RegisterMessage(ReadResponse)
Query = _reflection.GeneratedProtocolMessageType('Query', (_message.Message,), {
'DESCRIPTOR' : _QUERY,
'__module__' : 'remote_pb2'
# @@protoc_insertion_point(class_scope:prometheus.Query)
})
_sym_db.RegisterMessage(Query)
QueryResult = _reflection.GeneratedProtocolMessageType('QueryResult', (_message.Message,), {
'DESCRIPTOR' : _QUERYRESULT,
'__module__' : 'remote_pb2'
# @@protoc_insertion_point(class_scope:prometheus.QueryResult)
})
_sym_db.RegisterMessage(QueryResult)
ChunkedReadResponse = _reflection.GeneratedProtocolMessageType('ChunkedReadResponse', (_message.Message,), {
'DESCRIPTOR' : _CHUNKEDREADRESPONSE,
'__module__' : 'remote_pb2'
# @@protoc_insertion_point(class_scope:prometheus.ChunkedReadResponse)
})
_sym_db.RegisterMessage(ChunkedReadResponse)
# When the C descriptor implementation is not in use, attach the serialized
# options and the byte offsets of each message/enum inside the serialized
# file descriptor to the Python descriptor objects.
# The whole body belongs to the `if`; the indentation had been stripped,
# which made the module a syntax error.
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  DESCRIPTOR._serialized_options = b'Z\006prompb'
  _WRITEREQUEST.fields_by_name['timeseries']._options = None
  _WRITEREQUEST.fields_by_name['timeseries']._serialized_options = b'\310\336\037\000'
  _WRITEREQUEST.fields_by_name['metadata']._options = None
  _WRITEREQUEST.fields_by_name['metadata']._serialized_options = b'\310\336\037\000'
  _WRITEREQUEST._serialized_start=63
  _WRITEREQUEST._serialized_end=185
  _READREQUEST._serialized_start=188
  _READREQUEST._serialized_end=362
  _READREQUEST_RESPONSETYPE._serialized_start=310
  _READREQUEST_RESPONSETYPE._serialized_end=362
  _READRESPONSE._serialized_start=364
  _READRESPONSE._serialized_end=420
  _QUERY._serialized_start=423
  _QUERY._serialized_end=566
  _QUERYRESULT._serialized_start=568
  _QUERYRESULT._serialized_end=625
  _CHUNKEDREADRESPONSE._serialized_start=627
  _CHUNKEDREADRESPONSE._serialized_end=720
# @@protoc_insertion_point(module_scope)
package main
import (
	"context"
	"fmt"
	"net/url"
	"sort"
	"time"

	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	promconfig "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/sigv4"
	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)
// toWriteRequestBytes builds a Prometheus remote-write request that carries a
// single sample for metric, timestamped with the current time, and returns
// it as a snappy-compressed protobuf payload.
//
// metric is sent as the "__name__" label and every entry of labels is added
// as a further label. Labels are sorted by name: the remote-write protocol
// expects lexicographically ordered labels, and Go map iteration order is
// random.
//
// An error is returned only when the protobuf marshaling fails.
func toWriteRequestBytes(metric string, value float64, labels map[string]string) ([]byte, error) {
	// Length 0 with capacity len+1: appending to a slice created with a
	// non-zero length would leave that many empty labels at the front.
	promLabels := make([]prompb.Label, 0, len(labels)+1)

	// The metric name travels as the reserved "__name__" label.
	promLabels = append(promLabels, prompb.Label{
		Name:  "__name__",
		Value: metric,
	})
	for name, lb := range labels {
		promLabels = append(promLabels, prompb.Label{
			Name:  name,
			Value: lb,
		})
	}
	sort.Slice(promLabels, func(i, j int) bool {
		return promLabels[i].Name < promLabels[j].Name
	})

	timeSeries := []prompb.TimeSeries{
		{
			Labels: promLabels,
			Samples: []prompb.Sample{{
				// Timestamp for remote write should be in milliseconds.
				Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
				Value:     value,
			}},
		},
	}

	// Marshal proto and compress.
	pbBytes, err := proto.Marshal(&prompb.WriteRequest{
		Timeseries: timeSeries,
	})
	if err != nil {
		return nil, fmt.Errorf("promwrite: marshaling remote write request proto: %w", err)
	}

	// A nil destination lets snappy allocate a buffer of the size it needs.
	return snappy.Encode(nil, pbBytes), nil
}
// newClient returns a remote-write client named "amp" that targets endpoint.
//
// The client is configured with a 5 second timeout and a SigV4 config whose
// region is taken from the default AWS configuration. An error is returned
// when endpoint cannot be parsed, the AWS configuration cannot be loaded, or
// the client cannot be constructed.
func newClient(endpoint string) (remote.WriteClient, error) {
	target, err := url.Parse(endpoint)
	if err != nil {
		return nil, err
	}

	awsCfg, err := awsconfig.LoadDefaultConfig(context.TODO())
	if err != nil {
		return nil, err
	}

	clientCfg := remote.ClientConfig{
		URL:     &promconfig.URL{URL: target},
		Timeout: model.Duration(5 * time.Second),
		SigV4Config: &sigv4.SigV4Config{
			Region: awsCfg.Region,
		},
	}

	writer, err := remote.NewWriteClient("amp", &clientCfg)
	if err != nil {
		return nil, err
	}
	return writer, nil
}
// main sends one sample of "go_test_metric" to the hard-coded remote-write
// endpoint and panics on the first error.
func main() {
	const endpoint = "https://aps-workspaces.ap-northeast-1.amazonaws.com/workspaces/ws-2d4ad01c-eada-4a4c-b417-289f093c4066/api/v1/remote_write"

	writer, err := newClient(endpoint)
	if err != nil {
		panic(err)
	}

	sampleLabels := map[string]string{
		"instance": "go-test",
		"severity": "critical",
	}
	payload, err := toWriteRequestBytes("go_test_metric", 3.0, sampleLabels)
	if err != nil {
		panic(err)
	}

	if err := writer.Store(context.TODO(), payload); err != nil {
		panic(err)
	}
}
from datetime import datetime
from prometheus_pb2 import (
TimeSeries,
Label,
Labels,
Sample,
WriteRequest
)
import calendar
import logging
import requests
import snappy
def dt2ts(dt: datetime) -> int:
    """Convert a datetime to a Unix timestamp in whole seconds.

    Aware datetimes are converted to UTC first; naive datetimes are
    treated as already being in UTC. Sub-second precision is discarded.
    """
    return calendar.timegm(dt.utctimetuple())
def write():
    """Send one hard-coded sample to a Prometheus remote-write endpoint.

    Builds a WriteRequest with a single time series ("metric_name" plus one
    extra label), stamps it with the current UTC time in milliseconds,
    snappy-compresses the serialized protobuf and POSTs it to the local
    endpoint. Delivery is best-effort: connection problems, timeouts and
    non-2xx responses are printed rather than raised.
    """
    write_request = WriteRequest()

    series = write_request.timeseries.add()

    # name label always required
    label = series.labels.add()
    label.name = "__name__"
    label.value = "metric_name"

    # as many labels you like
    label = series.labels.add()
    label.name = "ssl_cipher"
    label.value = "some_value"

    sample = series.samples.add()
    sample.value = 42  # your count?
    # dt2ts returns whole seconds; remote write expects milliseconds.
    sample.timestamp = dt2ts(datetime.utcnow()) * 1000

    uncompressed = write_request.SerializeToString()
    compressed = snappy.compress(uncompressed)

    url = "http://localhost:7201/api/v1/prom/remote/write"
    headers = {
        "Content-Encoding": "snappy",
        "Content-Type": "application/x-protobuf",
        "X-Prometheus-Remote-Write-Version": "0.1.0",
        "User-Agent": "metrics-worker",
    }
    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.post(url, headers=headers, data=compressed, timeout=10)
        # Turn a rejected write (4xx/5xx) into an exception so it is reported.
        response.raise_for_status()
    except requests.RequestException as e:
        print(e)
# Send the sample as soon as this module is run (this also fires on import).
write()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment