Skip to content

Instantly share code, notes, and snippets.

@russch
Created June 26, 2017 22:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save russch/066c790aa5b16412ba18f0d893d948aa to your computer and use it in GitHub Desktop.
Save russch/066c790aa5b16412ba18f0d893d948aa to your computer and use it in GitHub Desktop.
Monitor and Save Backgrounder Queue Length
import json
import psycopg2
import sys
import time
# Source database & query which monitors job queue
connStringSource = "host=<hostname> port=8060 dbname=workgroup user=<tableau or read-only> password=<password>"
sqlSource= "SELECT job_name, \
Case \
WHEN progress = -1 then 'Pending'\
WHEN progress < 100 then 'In Process'\
WHEN finish_code = 0 then 'Success'\
ELSE 'Error'\
END as Status,\
COUNT(job_name),\
now() \
FROM background_jobs \
WHERE job_name IN ('Refresh Extracts', 'Increment Extracts', 'Subscription Notifications') \
GROUP BY\
job_name, Case \
WHEN progress = -1 then 'Pending'\
WHEN progress < 100 then 'In Process'\
WHEN finish_code = 0 then 'Success'\
ELSE 'Error'\
END, now()"
# Target database to host results
connStringTarget = "host=foo.bar.rds.amazonaws.com port=5432 dbname=<some_database> user=<user> password=<pw>"
sqlTarget = "INSERT INTO backgrounder_activity (job_name, status, status_count, measurement_time) VALUES (%s,%s, %s, %s)"
while 1:
sourceConnection = psycopg2.connect(connStringSource)
sourceCursor = sourceConnection.cursor()
sourceCursor.execute(sqlSource)
# Get job queue
rows = sourceCursor.fetchall()
sourceCursor.close()
sourceConnection.close()
targetConnection = psycopg2.connect(connStringTarget)
targetCursor = targetConnection.cursor()
for row in rows:
targetCursor.execute(sqlTarget, (row[0], row[1], row[2],row[3] ))
targetConnection.commit()
#print (cursor.mogrify(sql, (1477, 'someMachine', timeslice['from'], metric['name'], timeslice['values']['average_value'] )))
targetCursor.close()
targetConnection.close()
print('Data Recorded')
time.sleep(60)
#-- CREATE TABLE STATEMENTS:
# CREATE TABLE public.backgrounder_activity
#(
# id integer NOT NULL DEFAULT nextval('backgrounder_activity_id_seq'::regclass),
# job_name character varying(50) COLLATE pg_catalog."default",
# status character varying(50) COLLATE pg_catalog."default",
# status_count integer,
# measurement_time timestamp without time zone,
# CONSTRAINT pk_id PRIMARY KEY (id)
#)
#WITH (
# OIDS = FALSE
#)
#TABLESPACE pg_default;
#
#ALTER TABLE public.backgrounder_activity
# OWNER to postgres;
#
#-- Index: idx_id
#
#-- DROP INDEX public.idx_id;
#
#CREATE UNIQUE INDEX idx_id
# ON public.backgrounder_activity USING btree
# (id)
# TABLESPACE pg_default;
#
#-- Index: idx_job_status
#
#-- DROP INDEX public.idx_job_status;
#
#CREATE INDEX idx_job_status
# ON public.backgrounder_activity USING btree
# (job_name COLLATE pg_catalog."default", status COLLATE pg_catalog."default")
# TABLESPACE pg_default;
#
#ALTER TABLE public.backgrounder_activity
# CLUSTER ON idx_job_status;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment