Skip to content

Instantly share code, notes, and snippets.

@recklessop
Last active February 21, 2019 18:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save recklessop/f388def7bd70381e2ad1c51ebf25127e to your computer and use it in GitHub Desktop.
Save recklessop/f388def7bd70381e2ad1c51ebf25127e to your computer and use it in GitHub Desktop.
Multi-threaded generator for the `dailystats` table: for each datestamp in `stats`, sums the write IOPS and write KB/s of every monitored VM.
#!/usr/bin/python3
import pymysql.cursors
import threading
import time
import queue
import multiprocessing
# Shared shutdown flag: worker threads keep looping while this is falsy; the
# main thread sets it to 1 once the work queue has been drained.
exitFlag = 0
# Connection settings for the MySQL database read and written by this script.
# NOTE(review): credentials are hard-coded in source — consider loading them
# from the environment or a config file instead.
mysqlHost = 'localhost'
mysqlUser = 'root'
mysqlPass = 'Zertodata1!'
mysqlDb = 'zerto'
class myThread (threading.Thread):
    """Worker thread that processes queued datestamps via ``mysql_worker``.

    Each instance only stores its arguments; the actual work happens in
    ``run()``, which hands everything to ``mysql_worker``.
    """

    def __init__(self, id, threadName, workQueue, vmList, mysqlHost, mysqlUser, mysqlPass, mysqlDb):
        """Store the worker's configuration.

        Args:
            id: Numeric identifier assigned by the creator of the thread.
            threadName: Label used in all log output of this worker.
            workQueue: Queue from which datestamps are taken.
            vmList: Names of the VMs to aggregate for each datestamp.
            mysqlHost: MySQL server host.
            mysqlUser: MySQL user name.
            mysqlPass: MySQL password.
            mysqlDb: MySQL database name.
        """
        # Pass threadName as the Thread's own name so that the
        # "Starting"/"Exiting" messages in run() show the same label the
        # worker prints while working, instead of the default "Thread-N".
        threading.Thread.__init__(self, name=threadName)
        self.id = id
        self.threadName = threadName
        self.workQueue = workQueue
        self.vmlist = vmList
        self.mysqlHost = mysqlHost
        self.mysqlUser = mysqlUser
        self.mysqlPass = mysqlPass
        self.mysqlDb = mysqlDb

    def run(self):
        """Run ``mysql_worker`` with the stored settings, logging start and exit."""
        print("Starting " + self.name)
        mysql_worker(self.id, self.threadName, self.workQueue, self.vmlist, self.mysqlHost, self.mysqlUser, self.mysqlPass, self.mysqlDb)
        print("Exiting " + self.name)
def mysql_worker(id, threadName, workQueue, vmlist, servername, user, password, db):
    """Aggregate daily write statistics for every datestamp taken from the queue.

    Loops until the module-level ``exitFlag`` becomes truthy. On each pass it
    takes one datestamp from ``workQueue`` (guarded by the module-level
    ``queueLock``) and, for every VM in ``vmlist``, sums ``IOPSWriteAvg`` and
    ``KBWriteAvg`` from ``stats`` and inserts the totals into ``dailystats``.
    When the queue is empty it sleeps for one second before checking again.

    Args:
        id: Worker identifier; accepted for interface compatibility, not used.
        threadName: Label prefixed to this worker's log output.
        workQueue: Queue of datestamp values to process.
        vmlist: Names of the VMs to aggregate for each datestamp.
        servername: MySQL server host.
        user: MySQL user name.
        password: MySQL password.
        db: MySQL database name.
    """
    # Placeholders let the driver quote the values, so VM names containing
    # quotes cannot break (or alter) the statements.
    select_sql = (
        "SELECT (SUM(`IOPSWriteAvg`)) as iops, (SUM(`KBWriteAvg`)) as KBWrite "
        "FROM `stats` WHERE datestamp = %s AND vm = %s;"
    )
    insert_sql = (
        "INSERT INTO dailystats (datestamp, vm, WriteIOps, WriteKBps) "
        "VALUES (%s, %s, %s, %s);"
    )

    while not exitFlag:
        # Check-then-get must be atomic with respect to the other workers and
        # to the main thread, which holds the same lock while filling the queue.
        with queueLock:
            has_work = not workQueue.empty()
            if has_work:
                print("{} working...".format(threadName))
                date = workQueue.get()

        if not has_work:
            print("{} sleeping".format(threadName))
            time.sleep(1)
            continue

        datestr = str(date)
        # Connect with the settings handed to this worker rather than the
        # module-level globals, so the parameters are actually honoured.
        connection = pymysql.connect(host=servername, user=user, password=password, db=db, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
        try:
            with connection.cursor() as cursor:
                for vm in vmlist:
                    cursor.execute(select_sql, (datestr, vm))
                    stats = cursor.fetchone()
                    # SUM() yields NULL when no rows match; record that as 0.
                    io = stats['iops']
                    KB = stats['KBWrite']
                    io = 0 if io is None else int(io)
                    KB = 0 if KB is None else int(KB)
                    print('{} - {} - {} - {} iops - {} KBps'.format(threadName, datestr, vm, io, KB))
                    cursor.execute(insert_sql, (datestr, vm, io, KB))
            # One commit per datestamp: either all VMs for the date are
            # written or none are.
            connection.commit()
        finally:
            connection.close()
# main thread begins here
# Overall flow: read the monitored VM names and the distinct datestamps,
# start one worker thread per CPU core, queue every datestamp, wait for the
# queue to drain, then signal the workers to stop and wait for them.
numCores = multiprocessing.cpu_count()
print("{} Cores".format(numCores))
dateList = []
vmList = []
queueLock = threading.Lock()
workQueue = queue.Queue()
threads = []
threadID = 1
# Build the list of thread names, one per CPU core.
threadList = ["thread{}".format(x) for x in range(numCores)]

connection = pymysql.connect(host=mysqlHost,user=mysqlUser,password=mysqlPass,db=mysqlDb,charset='utf8mb4',cursorclass=pymysql.cursors.DictCursor)
try:
    with connection.cursor() as cursor:
        # Get the list of monitored VMs
        sql = "SELECT `name` FROM `vms` WHERE `monitor` = 'Y';"
        cursor.execute(sql)
        MonitoredVms = cursor.fetchall()
        print("VM list retrieved")
        vmList.extend(row['name'] for row in MonitoredVms)
    with connection.cursor() as cursor:
        # Get the list of datestamps
        sql = "SELECT DISTINCT `datestamp` FROM `stats`;"
        cursor.execute(sql)
        datestamps = cursor.fetchall()
        print("Datestamps retrieved")
        dateList.extend(row['datestamp'] for row in datestamps)
finally:
    # This connection is only needed to read the two lists; the workers open
    # their own connections, so release it before the long-running part.
    connection.close()

try:
    for tName in threadList:
        # Create new threads
        thread = myThread(threadID, tName, workQueue, vmList, mysqlHost, mysqlUser, mysqlPass, mysqlDb)
        thread.start()
        threads.append(thread)
        threadID += 1

    # Fill the queue while holding the lock the workers use for their
    # check-then-get, so they only see the queue once it is fully loaded.
    queueCount = 0
    with queueLock:
        while dateList:
            workQueue.put(dateList.pop())
            queueCount += 1
    print("{} Items in queue".format(queueCount))

    # Wait for queue to empty
    while not workQueue.empty():
        time.sleep(1)
finally:
    # Notify threads it's time to exit. Doing this in ``finally`` ensures the
    # workers are stopped even if an error or Ctrl-C interrupts the steps
    # above; otherwise they would poll the queue forever.
    exitFlag = 1
    for t in threads:
        t.join()
print ("Exiting Main Thread")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment