Created
June 26, 2017 22:20
-
-
Save russch/066c790aa5b16412ba18f0d893d948aa to your computer and use it in GitHub Desktop.
Monitor and Save Backgrounder Queue Length
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import sys
import time
from contextlib import closing

import psycopg2
# Source database & query which monitors the job queue.
# NOTE(review): port 8060 / dbname "workgroup" is presumably the Tableau
# Server repository -- confirm. Fill in the <...> placeholders before running.
connStringSource = "host=<hostname> port=8060 dbname=workgroup user=<tableau or read-only> password=<password>"

# Counts background jobs per (job_name, status) and stamps every row with
# now(). The status is derived from the job's progress / finish_code:
#   progress = -1    -> 'Pending'
#   progress < 100   -> 'In Process'
#   finish_code = 0  -> 'Success'
#   anything else    -> 'Error'
# Written as a triple-quoted literal so the line breaks themselves separate
# the SQL tokens. Backslash continuations inside a single string literal glue
# adjacent lines together (e.g. "GROUP BY" + "job_name" -> "GROUP BYjob_name")
# unless every continuation line happens to start with whitespace.
sqlSource = """
SELECT job_name,
    Case
        WHEN progress = -1 then 'Pending'
        WHEN progress < 100 then 'In Process'
        WHEN finish_code = 0 then 'Success'
        ELSE 'Error'
    END as Status,
    COUNT(job_name),
    now()
FROM background_jobs
WHERE job_name IN ('Refresh Extracts', 'Increment Extracts', 'Subscription Notifications')
GROUP BY
    job_name, Case
        WHEN progress = -1 then 'Pending'
        WHEN progress < 100 then 'In Process'
        WHEN finish_code = 0 then 'Success'
        ELSE 'Error'
    END, now()
"""

# Target database to host results.
connStringTarget = "host=foo.bar.rds.amazonaws.com port=5432 dbname=<some_database> user=<user> password=<pw>"

# One INSERT per result row; the four placeholders line up with the four
# columns selected by sqlSource (job name, status, count, timestamp).
sqlTarget = "INSERT INTO backgrounder_activity (job_name, status, status_count, measurement_time) VALUES (%s,%s, %s, %s)"
def _fetch_queue_counts():
    """Run sqlSource against the source database and return all result rows.

    The cursor and the connection are closed even when connecting, executing
    or fetching raises; the exception itself still propagates to the caller.
    """
    # closing() is used because psycopg2's own `with connection:` only ends
    # the transaction -- it does not close the connection.
    with closing(psycopg2.connect(connStringSource)) as connection:
        with closing(connection.cursor()) as cursor:
            cursor.execute(sqlSource)
            return cursor.fetchall()


def _store_queue_counts(rows):
    """Insert each row's first four columns into the target table via sqlTarget.

    All rows of one snapshot are committed together, so a failure part-way
    through never leaves a partial measurement in the table. The cursor and
    the connection are always closed.
    """
    with closing(psycopg2.connect(connStringTarget)) as connection:
        with closing(connection.cursor()) as cursor:
            for row in rows:
                # sqlTarget has exactly four placeholders.
                cursor.execute(sqlTarget, (row[0], row[1], row[2], row[3]))
        connection.commit()


# Poll forever: snapshot the job queue, persist it, then wait one minute.
# Database errors are not caught here; they terminate the script.
while True:
    _store_queue_counts(_fetch_queue_counts())
    print('Data Recorded')
    time.sleep(60)
#-- CREATE TABLE STATEMENTS (run once against the target database):
#-- NOTE: the id default below calls nextval('backgrounder_activity_id_seq'),
#-- so that sequence must already exist; it is not created by these statements.
# CREATE TABLE public.backgrounder_activity
#(
#    id integer NOT NULL DEFAULT nextval('backgrounder_activity_id_seq'::regclass),
#    job_name character varying(50) COLLATE pg_catalog."default",
#    status character varying(50) COLLATE pg_catalog."default",
#    status_count integer,
#    measurement_time timestamp without time zone,
#    CONSTRAINT pk_id PRIMARY KEY (id)
#)
#WITH (
#    OIDS = FALSE
#)
#TABLESPACE pg_default;
#
#ALTER TABLE public.backgrounder_activity
#    OWNER to postgres;
#
#-- Index: idx_id
#
#-- DROP INDEX public.idx_id;
#
#CREATE UNIQUE INDEX idx_id
#    ON public.backgrounder_activity USING btree
#    (id)
#    TABLESPACE pg_default;
#
#-- Index: idx_job_status
#
#-- DROP INDEX public.idx_job_status;
#
#CREATE INDEX idx_job_status
#    ON public.backgrounder_activity USING btree
#    (job_name COLLATE pg_catalog."default", status COLLATE pg_catalog."default")
#    TABLESPACE pg_default;
#
#ALTER TABLE public.backgrounder_activity
#    CLUSTER ON idx_job_status;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment