@krokodilerian
Created April 3, 2018 13:50
influx cq scheduler
#!/usr/bin/python
from glob import glob
from os import rename, unlink, getpid
from os.path import basename
from hashlib import md5
from time import sleep, time
from multiprocessing import Pool
import json
import re
import requests
import random
from sys import argv
global influxurl
global influxroot
global influxrootpass
global NOTCARE
global INTVL
global BACKING
tons = 1000000000  # nanoseconds per second, used to convert seconds to ns timestamps
# if the m1 data lags s1 by less than this, skip the measurement
NOTCARE = 60*tons # 1 minute
# do not parse more than this much data in one query
INTVL = 3600*tons # parse data in increments of 1 hour
# go back this much time when parsing data, to overlap with what's already downsampled
BACKING = 60*tons
# how many sites (databases) to process at once
CHUNKNUM = 100
influxroot = 'root'
influxrootpass = 'xxxxxxxxxxxxxx'
influxurl = 'http://localhost:8086/'
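# Assumed layout (inferred from the queries in processdb() below): every
# database has an "s1" retention policy holding the raw data and an "m1"
# retention policy holding the 1-minute downsamples this script produces.
# A hypothetical setup for a database "mydb" might look like:
#   CREATE RETENTION POLICY "s1" ON "mydb" DURATION 2d REPLICATION 1 DEFAULT
#   CREATE RETENTION POLICY "m1" ON "mydb" DURATION 90d REPLICATION 1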
def fetchsinglelist(res):
    # Flatten a single-column result (e.g. SHOW DATABASES / SHOW MEASUREMENTS)
    # into a plain list, skipping the '_internal' database.
    ret = []
    try:
        for entry in res['results'][0]['series'][0]['values']:
            if entry[0] == '_internal':
                continue
            ret.append(entry[0])
    except (KeyError, IndexError, TypeError):
        # empty result, or influxquery() returned None
        return ret
    return ret
def fetchfirsttime(res):
    # Return the timestamp (first column) of the first row of a result, or None.
    try:
        return res['results'][0]['series'][0]['values'][0][0]
    except (KeyError, IndexError, TypeError):
        return None
def influxquery(query, db=None, debug=False):
    # Run one query against the InfluxDB HTTP API, retrying forever on
    # connection problems; returns the decoded JSON on HTTP 200, None otherwise.
    if debug:
        print "asking: %s" % query
    auth = (influxroot, influxrootpass)
    headers = {"Connection": "close"}
    if db is None:
        payload = {"q": query}
    else:
        payload = {"q": query, "db": db, "epoch": "ns"}
    retry = True
    while retry:
        retry = False
        try:
            r = requests.get(influxurl + "query", headers=headers, auth=auth, params=payload, timeout=3000)
        except (requests.ConnectionError, requests.Timeout, requests.HTTPError, requests.exceptions.ChunkedEncodingError):
            retry = True
            r = None
    if debug:
        print "%r" % r
    if r.status_code == 200:
        return r.json()
def processdb_gapjmp(dbname):
    # Wrapper so Pool.map can call processdb with gapjmp=True.
    return processdb(dbname, True)
def processdb(dbname, gapjmp=False):
    # Generate the queries needed to bring the m1 downsamples of dbname up to
    # date with s1. With gapjmp=True, jump straight to the newest s1 point and
    # also delete any data from the future.
    retq = []
    # The intervals here should only need to go up to '2d', but with InfluxDB a
    # retention policy of 48h can still hold roughly an extra 24h of data (it
    # only drops expired shards about once a day), so the newest s1 point plus
    # 1h can still be older than now()-2d and the code would wrongly conclude
    # that m1 has no data for this measurement.
    # There should be a better way to do this.
    intervals = ['10m', '1h', '6h', '1d', '2d', '4d']
    res = influxquery("show measurements", dbname)
    msmts = fetchsinglelist(res)
    for msm in msmts:
        # find the newest point in m1, probing progressively wider windows
        for iv in intervals:
            res = influxquery("select * from m1." + msm + " where time>now()-" + iv + " order by time desc limit 1", dbname)
            last_m1 = fetchfirsttime(res)
            if last_m1 is not None:
                break
        # same for s1
        for iv in intervals:
            res = influxquery("select * from s1." + msm + " where time>now()-" + iv + " order by time desc limit 1", dbname)
            last_s1 = fetchfirsttime(res)
            if last_s1 is not None:
                break
        # no data, nothing to do.
        if last_s1 is None:
            continue
        # what if we have NO information in m1?
        # see the comment above for the intervals
        if last_m1 is None:
            res = influxquery("select * from s1." + msm + " order by time asc limit 1", dbname)
            last_m1 = fetchfirsttime(res)
        intvl = min(last_s1 - last_m1, INTVL)
        if gapjmp:
            end_time = last_s1
            # also delete data from the future.
            retq.append("delete from " + msm + " where time>now()")
        else:
            end_time = min(last_s1, last_m1 + intvl)
        #print "db %s msm %s s1 %d m1 %d end %d diff %d intvl %d" % (dbname.ljust(25), msm.ljust(10), last_s1/tons, last_m1/tons, end_time/tons, (last_s1-last_m1)/tons, intvl/tons)
        if last_s1 - last_m1 < NOTCARE and last_s1 - last_m1 > 0:
            #print "db %s msm %s up to date." % (dbname, msm)
            continue
        query_mean = 'SELECT mean(*) INTO "' + dbname + '".m1."' + msm + '" FROM "' + dbname + '".s1."' + msm + '" WHERE time >= ' + str(last_m1 - BACKING) + ' AND time < ' + str(end_time) + ' GROUP BY time(1m), * fill(linear)'
        retq.append(query_mean)
        query_max = 'SELECT max(*) INTO "' + dbname + '".m1."' + msm + '" FROM "' + dbname + '".s1."' + msm + '" WHERE time >= ' + str(last_m1 - BACKING) + ' AND time < ' + str(end_time) + ' GROUP BY time(1m), * fill(linear)'
        retq.append(query_max)
    return retq
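# Each generated query copies one measurement from s1 into m1, e.g. (with a
# hypothetical database "mydb" and measurement "cpu"; the bounds are the
# nanosecond timestamps computed above):
#   SELECT mean(*) INTO "mydb".m1."cpu" FROM "mydb".s1."cpu"
#       WHERE time >= <last_m1 - BACKING> AND time < <end_time>
#       GROUP BY time(1m), * fill(linear)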
def listdbs():
    # List all databases on the server ('_internal' is filtered out by fetchsinglelist).
    dbs = influxquery("SHOW DATABASES")
    return fetchsinglelist(dbs)
def chunks(l, n):
    '''
    Yield successive n-sized chunks from l
    '''
    for i in range(0, len(l), n):
        yield l[i:i + n]
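# Example: list(chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]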
if len(argv) > 1:
    # single database, single process
    processdb(argv[1])
    exit(0)
p = Pool(processes=6, maxtasksperchild=2000)
passnum = 0
while True:
    alldbs = listdbs()
    random.shuffle(alldbs)
    cnt = 0
    qgentime_all = 0
    parsetime_all = 0
    print "Processing {n} sites ({c} at a time)".format(c=CHUNKNUM, n=len(alldbs))
    for dbs in chunks(alldbs, CHUNKNUM):
        starttime = round(time(), 0)
        queries = []
        if passnum % 10 == 0:
            r = p.map(processdb_gapjmp, dbs, 1)
        else:
            r = p.map(processdb, dbs, 1)
        for lst in r:
            for q in lst:
                queries.append(q)
        # print "%r" % queries
        endtime = round(time(), 0)
        qgentime = endtime - starttime
        qgentime_all += qgentime
        starttime = round(time(), 0)
        # p.map(processdb, dbs, 1)
        random.shuffle(queries)
        p.map(influxquery, queries, 1)
        endtime = round(time(), 0)
        parsetime = endtime - starttime
        parsetime_all += parsetime
        cnt += len(dbs)
        print "[stat] %d/%d done, qgen/process/total %s/%s/%s seconds" % (cnt, len(alldbs), str(qgentime), str(parsetime), str(qgentime + parsetime))
    passnum += 1
    print "[stat] Full run for %d DBs qgen/process/total %s/%s/%s seconds" % (len(alldbs), str(qgentime_all), str(parsetime_all), str(qgentime_all + parsetime_all))