Load missing datapoints onto remote Graphite instances from the WHISPER_DIR of a local Graphite instance. Uses threads so that multiple whisper files can be processed at the same time to improve throughput.
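The script walks whisper files starting from the current working directory, so it is meant to be run from inside WHISPER_DIR itself (commonly /opt/graphite/storage/whisper, though the exact path depends on your installation).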
#!/usr/bin/env python
import logging
import os
import socket
import threading
import time

import requests
import whisper

LOGGER = logging.getLogger(__name__)
FORMAT = '%(levelname) -10s %(asctime)s %(threadName) -15s: %(message)s'

MIN_TIME = 999999999999999
MAX_THREADS = 10
URL = 'http://graphite01.scs.mtmeprod.net/render?from=-1w&until=-&target=%s&format=json'
START_TIME = 1338523200
RELAYS = [('graphite02.scs.mtmeprod.net', 2003),
          ('graphite03.scs.mtmeprod.net', 2003),
          ('graphite04.scs.mtmeprod.net', 2003)]

def fetch_data(stat):
    """Fetch the remote datapoints for a stat from the render API."""
    url = URL % stat
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    return None

def send_stats(relay, file_path, min_time, stat):
    """Replay local whisper datapoints older than min_time to a relay."""
    LOGGER.info(stat)
    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection.connect(relay)
    # whisper.fetch returns ((start, end, step), values)
    (value_time, end_time,
     time_step), values = whisper.fetch(file_path, START_TIME, min_time)
    for value in values:
        # Only send datapoints from before the remote side's earliest one
        if value is not None and value_time < min_time:
            # Carbon plaintext protocol: "<metric> <value> <timestamp>\n"
            value_to_send = '%s %s %s\n' % (stat, value, value_time)
            connection.sendall(value_to_send.encode('utf-8'))
        value_time += time_step
    connection.close()

def wait_on_open_thread(threads):
    """Block until a worker slot is free, pruning finished threads."""
    # If the thread count is at the max, sleep
    while len([thread for thread in threads if thread.is_alive()]) >= MAX_THREADS:
        time.sleep(1)
    # Remove any finished threads, iterating over a copy so removal is safe
    for thread in list(threads):
        if not thread.is_alive():
            threads.remove(thread)

def main():
    threads = list()
    relay_offset = 0
    # Iterate from the current directory down; assumes it is WHISPER_DIR
    for dirpath, dirnames, filenames in os.walk('.', topdown=False):
        # Ignore carbon's own stats
        if 'carbon' in dirpath:
            continue
        # We only care about the bottom of a tree
        if not dirnames:
            # Iterate through each whisper file
            for filename in filenames:
                # Build the path and the stat name
                file_path = '%s/%s' % (dirpath, filename)
                stat = file_path[2:].replace('/', '.').replace('.wsp', '')
                # Build the remote URL and get the data
                data = fetch_data(stat)
                # No data means an empty or bad response
                if not data:
                    continue
                # Find the earliest datapoint present on the remote side
                min_time = MIN_TIME
                for value, timestamp in data[0]['datapoints']:
                    # null (None) means the remote is missing that point
                    if value is not None and timestamp < min_time:
                        min_time = timestamp
                # Block while MAX_THREADS are still processing
                wait_on_open_thread(threads)
                # Create the new processing thread
                thread = threading.Thread(target=send_stats,
                                          args=(RELAYS[relay_offset],
                                                file_path,
                                                min_time,
                                                stat))
                thread.start()
                threads.append(thread)
                # Round-robin to the next relay
                relay_offset += 1
                if relay_offset >= len(RELAYS):
                    relay_offset = 0

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format=FORMAT)
    # Quiet the per-request logging from requests' vendored urllib3
    logging.getLogger('requests.packages.urllib3').setLevel(logging.ERROR)
    main()
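A minimal spot-check sketch for after a run, assuming a hypothetical metric name and the same render endpoint the script queries: re-fetch one stat and confirm the earliest non-null datapoint now reaches further back toward START_TIME.

# Spot check after backfilling; the metric name below is a hypothetical example.
import requests

stat = 'servers.app01.cpu.user'  # hypothetical metric path
url = ('http://graphite01.scs.mtmeprod.net/render'
       '?from=-1w&target=%s&format=json' % stat)
data = requests.get(url).json()
# Graphite render JSON datapoints are [value, timestamp]; null means missing
timestamps = [ts for value, ts in data[0]['datapoints'] if value is not None]
if timestamps:
    print('Earliest datapoint now at %s' % min(timestamps))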