Skip to content

Instantly share code, notes, and snippets.

@gmr
Last active January 12, 2017 13:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gmr/5120854 to your computer and use it in GitHub Desktop.
Save gmr/5120854 to your computer and use it in GitHub Desktop.
Backfill datapoints that are missing on a remote graphite instance, reading them from the local WHISPER_DIR. Uses threads so multiple whisper files can be processed concurrently to improve throughput.
#!/usr/bin/env python
import logging
import os
import random
import requests
import socket
import sys
import threading
import time
import whisper
LOGGER = logging.getLogger(__name__)
FORMAT = '%(levelname) -10s %(asctime)s %(threadName) -15s: %(message)s'
MIN_TIME = 999999999999999
MAX_THREADS = 10
URL = 'http://graphite01.scs.mtmeprod.net/render?from=-1w&until=-&target=%s&format=json'
START_TIME = 1338523200
RELAYS = [('graphite02.scs.mtmeprod.net', 2003),
('graphite03.scs.mtmeprod.net', 2003),
('graphite04.scs.mtmeprod.net', 2003)]
def fetch_data(stat):
url = URL % stat
response = requests.get(url)
if response.status_code == 200:
return response.json()
return None
def send_stats(relay, file_path, min_time, stat):
LOGGER.info(stat)
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection.connect(relay)
(value_time, end_time, time_step), values = whisper.fetch(file_path, START_TIME, min_time)
for value in values:
if value is not None and time_step < min_time:
value_to_send = '%s %s %s\n' % (stat, value, value_time)
connection.send(value_to_send)
value_time += time_step
connection.close()
def wait_on_open_thread(threads):
# If the thread count is at the max, sleep
while len([thread for thread in threads if thread.is_alive()]) == MAX_THREADS:
time.sleep(1)
# Remove any finished threads
for offset, thread in enumerate(threads):
if not threads[offset].is_alive():
threads.remove(thread)
def main():
threads = list()
relay_offset = 0
# Iterate from the current directory down, assumes is WHISPER_DIR
for dirpath, dirnames, filenames in os.walk('.', topdown=False):
# Ignore carbon stats
if 'carbon' in dirpath:
continue
# We only care about the bottom of a tree
if not dirnames:
# Iterate through each whisper file
for filename in filenames:
# Build the path and the stat name
file_path = '%s/%s' % (dirpath, filename)
stat = file_path[2:].replace('/', '.').replace('.wsp', '')
# Build the remote URL and get the data
data = fetch_data(stat)
# No data means empty or bad response
if not data:
continue
# Find the first instance of a data point on remote side
min_time = MIN_TIME
for [count, timestamp] in data[0]['datapoints']:
if count and timestamp < min_time:
min_time = timestamp
# Block on MAX_THREADS processing
wait_on_open_thread(threads)
# Create the new processing thread
thread = threading.Thread(target=send_stats,
args=(RELAYS[relay_offset],
file_path,
min_time,
stat))
thread.start()
threads.append(thread)
relay_offset += 1
if relay_offset >= len(RELAYS):
relay_offset = 0
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
tmp = logging.getLogger('requests.packages.urllib3',
format=FORMAT)
tmp.setLevel(logging.ERROR)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment