Created
November 11, 2019 16:24
-
-
Save exekias/3860f182169b4fa8f3ea02c82770be55 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prometheus_client import start_http_server, Histogram
from datetime import datetime
from collections import defaultdict
from elasticsearch import Elasticsearch
import random
import time
import json
import threading
import requests

# Histogram of request latencies, labelled by the target URL.
REQUEST_TIME = Histogram('request_time_seconds', 'Time spent processing request', ['url'])

# Worker threads fetch these URLs in a loop to generate timing samples.
URLS = ['http://localhost:9200', 'http://www.google.com', 'https://www.python.org/', 'http://elastic.co']

# Previous cumulative bucket-counter values, keyed per (metric, url, le);
# used to turn Prometheus' ever-growing counters into per-interval deltas.
SAMPLES_CACHE = defaultdict(int)
def setup(es):
    """
    Create the 'histograms' index with its mapping.

    Each document stores a native Elasticsearch ``histogram`` field
    alongside its timestamp and source URL. HTTP 400 is ignored so the
    call is a no-op when the index already exists.

    :param es: Elasticsearch client used to create the index.
    """
    mapping = {
        'mappings': {
            'properties': {
                'index_time_seconds': {'type': 'histogram'},
                '@timestamp': {'type': 'date'},
                'url': {'type': 'keyword'},
            }
        }
    }
    es.indices.create(index='histograms', body=mapping, ignore=400)
def send(es):
    """
    Convert accumulated Prometheus histogram samples into per-interval
    Elasticsearch ``histogram`` documents and index them.

    Prometheus exposes cumulative bucket counters; this function first
    computes the per-scrape delta for each bucket (using SAMPLES_CACHE to
    remember the previous cumulative value), then converts the cumulative
    bucket layout into per-bucket counts and midpoints for indexing.

    :param es: Elasticsearch client used to index the documents.
    """
    now = datetime.utcnow()  # naive UTC timestamp for the 'date'-mapped field
    docs = defaultdict(dict)
    # Calculate deltas for the cumulative bucket counters since last scrape.
    for metric in REQUEST_TIME.collect():
        for sample in metric.samples:
            if not sample.name.endswith('_bucket'):  # ignore _count and _sum
                continue
            if sample.labels['le'] == '+Inf':  # open-ended bucket has no midpoint
                continue
            # Tuple key instead of string concatenation: name + url + le
            # could collide for different label combinations; the cache is
            # internal to this function, so the key format is free to change.
            key = (sample.name, sample.labels['url'], sample.labels['le'])
            docs[sample.labels['url']][float(sample.labels['le'])] = sample.value - SAMPLES_CACHE[key]
            SAMPLES_CACHE[key] = sample.value
    for url, buckets in docs.items():
        bounds = list(buckets.keys())        # ascending 'le' upper bounds
        cumulative = list(buckets.values())  # delta counters, still cumulative over buckets
        # Per-bucket counts: difference between consecutive cumulative values.
        counts = [cur - prev for prev, cur in zip([0] + cumulative[:-1], cumulative)]
        # Bucket midpoints (Prometheus labels buckets by their upper bound).
        midpoints = [lo + (hi - lo) / 2 for lo, hi in zip([0] + bounds[:-1], bounds)]
        doc = {
            '@timestamp': now,
            'url': url,
            'index_time_seconds': {
                'values': midpoints,
                'counts': counts,
            },
        }
        es.index(index="histograms", body=doc)
def do_requests(url):
    """
    Endlessly fetch *url*, recording each request's wall time in the
    REQUEST_TIME histogram under that URL's label.

    Runs forever; intended as the target of a daemon thread.

    :param url: URL to request in a loop.
    """
    # Hoist the labelled child metric out of the loop: the labels() lookup
    # is invariant for the lifetime of this thread.
    timer = REQUEST_TIME.labels(url=url)
    while True:
        with timer.time():
            # NOTE(review): no timeout is set — a hung server would block
            # this thread indefinitely; consider requests.get(url, timeout=...).
            requests.get(url)
if __name__ == '__main__':
    # Expose the Prometheus metrics endpoint on :8000.
    start_http_server(8000)

    es = Elasticsearch()
    print('Creating histograms index')
    setup(es)

    # One daemon thread per URL, each requesting its target forever.
    print('Start downloading pages')
    for target in URLS:
        worker = threading.Thread(target=do_requests, args=(target,))
        worker.daemon = True
        worker.start()

    # Main thread: periodically convert histogram state into ES documents.
    print('Scraping every 10s...')
    while True:
        send(es)
        time.sleep(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment