@exekias
Created November 11, 2019 16:24
from prometheus_client import start_http_server, Histogram
from datetime import datetime
from collections import defaultdict
from elasticsearch import Elasticsearch
import random
import time
import json
import threading
import requests
# Create a metric to track time spent and requests made.
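# The Histogram uses prometheus_client's default buckets (5 ms up to 10 s, plus a +Inf bucket).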
REQUEST_TIME = Histogram('request_time_seconds', 'Time spent processing request', ['url'])
# Constantly retrieve these URLs to test request times
URLS = ['http://localhost:9200', 'http://www.google.com', 'https://www.python.org/', 'http://elastic.co']

# Keep last samples here to calculate deltas from counters
SAMPLES_CACHE = defaultdict(int)


def setup(es):
    """
    Set up the test index and mapping
    """
    settings = {
        'mappings': {
            'properties': {
                'index_time_seconds': {
                    'type': 'histogram'
                },
                '@timestamp': {
                    'type': 'date',
                },
                'url': {
                    'type': 'keyword',
                }
            }
        }
    }
    es.indices.create(index='histograms', body=settings, ignore=400)


def send(es):
    """
    Send all samples to Elasticsearch
    """
    now = datetime.utcnow()
    metrics = REQUEST_TIME.collect()
    docs = defaultdict(dict)

    # calculate deltas for bucket cumulative counters
    for metric in metrics:
        for sample in metric.samples:
            if not sample.name.endswith('_bucket'):  # ignore _count and _sum
                continue
            if sample.labels['le'] == '+Inf':  # skip the +Inf bucket, it has no finite midpoint to index
                continue
            key = sample.name + sample.labels['url'] + sample.labels['le']
            docs[sample.labels['url']][float(sample.labels['le'])] = sample.value - SAMPLES_CACHE[key]
            SAMPLES_CACHE[key] = sample.value

    for url, buckets in docs.items():
        # calculate bucket deltas
        counts = list(y - x for x, y in zip([0] + list(buckets.values())[:-1], list(buckets.values())))
        # calculate bucket mid points (prometheus uses top of bucket)
        midpoints = list(x + (y - x) / 2 for x, y in zip([0] + list(buckets.keys())[:-1], list(buckets.keys())))
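        # Example: for upper bounds le = [0.1, 0.5, 1.0] with cumulative
        # counts [3, 5, 6], this yields counts = [3, 2, 1] and
        # midpoints = [0.05, 0.3, 0.75], the per-bucket form the
        # Elasticsearch histogram field expects in 'values'/'counts'.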
        doc = {
            '@timestamp': now,
            'url': url,
            'index_time_seconds': {
                'values': midpoints,
                'counts': counts,
            }
        }
        es.index(index="histograms", body=doc)


def do_requests(url):
    """
    Constantly request the given URL
    """
    while True:
        with REQUEST_TIME.labels(url=url).time():
            requests.get(url)


if __name__ == '__main__':
    start_http_server(8000)
    es = Elasticsearch()

    print('Creating histograms index')
    setup(es)

    print('Start downloading pages')
    for url in URLS:
        t = threading.Thread(target=do_requests, args=(url,))
        t.daemon = True
        t.start()

    print('Scraping every 10s...')
    while True:
        # convert and send to elasticsearch
        send(es)
        time.sleep(10)
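
Once the documents are indexed, the pre-aggregated histograms can be queried with Elasticsearch's percentiles aggregation, which works directly on histogram fields. A minimal sketch, assuming the index and field names used above (the aggregation name request_time_pctl is just illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch()
# Ask Elasticsearch to compute percentiles from the stored histogram field
res = es.search(index='histograms', body={
    'size': 0,
    'aggs': {
        'request_time_pctl': {
            'percentiles': {
                'field': 'index_time_seconds',
                'percents': [50, 95, 99],
            }
        }
    }
})
print(res['aggregations']['request_time_pctl']['values'])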