Skip to content

Instantly share code, notes, and snippets.

@pilgrim2go
Forked from robertsosinski/postgresql.py
Created March 6, 2019 08:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pilgrim2go/7ee3bc07bb474267cc65d395ab7500fd to your computer and use it in GitHub Desktop.
Save pilgrim2go/7ee3bc07bb474267cc65d395ab7500fd to your computer and use it in GitHub Desktop.
DataDog script for collecting PostgreSQL stats
# Create the datadog user with select only permissions:
# CREATE USER datadog WITH PASSWORD '<complex_password>';
#
# Grant select permissions on a table or view that you want to monitor:
# GRANT SELECT ON <schema>.<table> TO datadog;
#
# Grant permissions for a specific column on a table or view that you want to monitor:
# GRANT SELECT (id, name) ON <schema>.<table> TO datadog;
#
# Let non-superusers look at pg_stat_activity in a read-only fashion.
# Create this function as a superuser for the database:
#
# CREATE OR REPLACE FUNCTION public.pg_stat_activity() RETURNS SETOF pg_catalog.pg_stat_activity AS $BODY$
# DECLARE
# rec RECORD;
# BEGIN
# FOR rec IN SELECT * FROM pg_stat_activity
# LOOP
# RETURN NEXT rec;
# END LOOP;
# RETURN;
# END;
# $BODY$ LANGUAGE plpgsql SECURITY DEFINER;
#
# Enable stats collection for functions:
# set track_functions to 'pl';
from checks import AgentCheck
import psycopg2 as pg
class PostgresqlCheck(AgentCheck):
    """Datadog AgentCheck that collects PostgreSQL statistics.

    Emits per-table, per-index, per-function, per-database, connection,
    replication-delay and buffer-cache metrics under the configurable
    '<name>.' prefix (default 'postgresql.').  Requires a read-only
    `datadog` role and the SECURITY DEFINER public.pg_stat_activity()
    wrapper described in the header comments.
    """

    def __init__(self, name, init_config, agentConfig):
        AgentCheck.__init__(self, name, init_config, agentConfig)
        # Metric namespace prefix, e.g. 'postgresql.table.relsize'.
        self.name = init_config.get('name', 'postgresql')

    def check(self, instance):
        """Run one collection pass against the database configured in *instance*."""
        self.log.info('Starting PostgreSQL Check')
        tags = instance.get('tags', [])
        host = instance.get('host', 'localhost')
        port = instance.get('port', '5432')
        username = instance.get('username')
        password = instance.get('password')
        database = instance.get('database')
        tags = tags + ['database:%s' % (database)]
        self.log.info('Connecting to PostgreSQL')
        db = pg.connect(host=host, port=port, user=username, password=password, database=database)
        cu = db.cursor()
        try:
            self._collect_table_stats(cu, tags)
            self._collect_index_stats(cu, tags)
            self._collect_function_stats(cu, tags)
            self._collect_database_stats(cu, tags, database)
            self._collect_connection_stats(cu, tags, database)
            self._collect_replication_delay(cu, tags)
            self._collect_memory_stats(cu, tags)
        finally:
            # BUG FIX: the original leaked the connection when any query
            # raised; always release it.
            self.log.info('Closing PostgreSQL Connection')
            cu.close()
            db.close()

    def _collect_table_stats(self, cu, tags):
        # Per-table size, scan, tuple-churn and cache-hit-ratio metrics.
        self.log.info('Collecting Table Stats')
        cu.execute(self.query_table_stats())
        for stat in cu.fetchall():
            # BUG FIX: query_table_stats() returns 21 columns (tblsize,
            # idxsize and relsize are three distinct sizes) but the original
            # unpacked only 20 names, so every run raised ValueError.
            # Here relsize = pg_table_size, total_relsize = pg_total_relation_size.
            (
                schemaname,
                relname,
                relsize,
                idxsize,
                total_relsize,
                reltuples,
                relpages,
                avg_tuplesize,
                seq_scan,
                idx_scan,
                per_idx_scan,
                per_rel_hit,
                per_idx_hit,
                n_tup_ins,
                n_tup_upd,
                n_tup_hot_upd,
                per_hot_upd,
                n_tup_del,
                n_live_tup,
                n_dead_tup,
                per_deadfill
            ) = stat
            ttags = tags + ['schema:%s' % (schemaname), 'table:%s' % (relname)]
            for metric, value in (
                ('relsize', relsize),
                ('idxsize', idxsize),  # new, backward-compatible metric: total index size
                ('total_relsize', total_relsize),
                ('reltuples', reltuples),
                ('relpages', relpages),
                ('avg_tuplesize', avg_tuplesize),
                ('per_idx_scan', per_idx_scan),
                ('per_rel_hit', per_rel_hit),
                ('per_idx_hit', per_idx_hit),
                ('per_hot_upd', per_hot_upd),
                ('n_live_tup', n_live_tup),
                ('n_dead_tup', n_dead_tup),
                ('per_deadfill', per_deadfill),
            ):
                self.gauge('%s.table.%s' % (self.name, metric), value, tags=ttags)
            for metric, value in (
                ('seq_scan', seq_scan),
                ('idx_scan', idx_scan),
                ('n_tup_ins', n_tup_ins),
                ('n_tup_upd', n_tup_upd),
                ('n_tup_hot_upd', n_tup_hot_upd),
                ('n_tup_del', n_tup_del),
            ):
                self.rate('%s.table.%s' % (self.name, metric), value, tags=ttags)

    def _collect_index_stats(self, cu, tags):
        # Per-index scan/read/fetch counters.
        self.log.info('Collecting Index Stats')
        cu.execute(self.query_index_stats())
        for stat in cu.fetchall():
            # BUG FIX: the query returns 11 columns; the original unpacked
            # only 6, raising ValueError.  The size/coverage columns are
            # not emitted as metrics; they are kept for manual inspection.
            (
                schemaname,
                relname,
                idxname,
                _idxsize_pretty,
                _idxsize,
                _idxtuples,
                _reltuples,
                _per_idx_covered,
                idx_scan,
                idx_tup_read,
                idx_tup_fetch
            ) = stat
            itags = tags + ['schema:%s' % (schemaname), 'table:%s' % (relname), 'index:%s' % (idxname)]
            self.rate('%s.index.idx_scan' % (self.name), idx_scan, tags=itags)
            self.rate('%s.index.idx_tup_read' % (self.name), idx_tup_read, tags=itags)
            self.rate('%s.index.idx_tup_fetch' % (self.name), idx_tup_fetch, tags=itags)

    def _collect_function_stats(self, cu, tags):
        # Per-function call counts and timings (requires track_functions = 'pl').
        self.log.info('Collecting Function Stats')
        cu.execute(self.query_function_stats())
        for stat in cu.fetchall():
            (schemaname, funcname, calls, self_time, total_time) = stat
            ftags = tags + ['schema:%s' % (schemaname), 'function:%s' % (funcname)]
            self.rate('%s.function.calls' % (self.name), calls, tags=ftags)
            self.rate('%s.function.self_time' % (self.name), self_time, tags=ftags)
            self.rate('%s.function.total_time' % (self.name), total_time, tags=ftags)

    def _collect_database_stats(self, cu, tags, database):
        # Database-wide commit/rollback and block I/O counters.
        self.log.info('Collecting Database Stats')
        cu.execute(self.query_database_stats(database))
        for stat in cu.fetchall():
            # BUG FIX: the query returns 6 columns (db_size was missing from
            # the original 5-name unpack, raising ValueError).  db_size is
            # now also emitted as a gauge.
            (datname, db_size, xact_commit, xact_rollback, blks_read, blks_hit) = stat
            self.gauge('%s.database.size' % (self.name), db_size, tags=tags)
            self.rate('%s.database.xact_commit' % (self.name), xact_commit, tags=tags)
            self.rate('%s.database.xact_rollback' % (self.name), xact_rollback, tags=tags)
            self.rate('%s.database.blks_read' % (self.name), blks_read, tags=tags)
            self.rate('%s.database.blks_hit' % (self.name), blks_hit, tags=tags)

    def _collect_connection_stats(self, cu, tags, database):
        # Total and waiting backend counts for this database.
        self.log.info('Collecting Database Connections')
        cu.execute(self.query_connections(database))
        # BUG FIX: the original '(connections) = stat' bound the whole row
        # tuple (parentheses without a comma are not a tuple pattern), so the
        # gauge received a tuple.  The trailing comma unpacks the scalar.
        (connections,) = cu.fetchone()
        self.gauge('%s.database.connections' % (self.name), connections, tags=tags)
        self.log.info('Collecting Database Waiting Connections')
        cu.execute(self.query_waiting_connections(database))
        (waiting_connections,) = cu.fetchone()
        self.gauge('%s.database.waiting_connections' % (self.name), waiting_connections, tags=tags)

    def _collect_replication_delay(self, cu, tags):
        # Milliseconds behind the primary; presumably NULL/None on a primary
        # (pg_last_xact_replay_timestamp() is NULL there) -- verify downstream
        # handling of None.
        self.log.info('Collecting Database Replication Delay')
        cu.execute(self.query_replication_delay())
        (replication_delay,) = cu.fetchone()
        self.gauge('%s.database.replication_delay' % (self.name), replication_delay, tags=tags)

    def _collect_memory_stats(self, cu, tags):
        # Aggregate heap and index buffer-cache read/hit counters and ratios.
        self.log.info('Collecting Heap Memory Stats')
        cu.execute(self.query_heap_memory_stats())
        (heap_read, heap_hit, per_heap_ratio) = cu.fetchone()
        self.rate('%s.database.heap_read' % (self.name), heap_read, tags=tags)
        self.rate('%s.database.heap_hit' % (self.name), heap_hit, tags=tags)
        self.gauge('%s.database.per_heap_ratio' % (self.name), per_heap_ratio, tags=tags)
        self.log.info('Collecting Index Memory Stats')
        cu.execute(self.query_index_memory_stats())
        (idx_read, idx_hit, per_idx_ratio) = cu.fetchone()
        self.rate('%s.database.idx_read' % (self.name), idx_read, tags=tags)
        self.rate('%s.database.idx_hit' % (self.name), idx_hit, tags=tags)
        self.gauge('%s.database.per_idx_ratio' % (self.name), per_idx_ratio, tags=tags)

    def query_table_stats(self):
        """SQL for per-table stats: 21 columns, order must match _collect_table_stats."""
        return """
            select
                psut.schemaname,
                pc.relname,
                pg_table_size(pc.relname::varchar) tblsize,
                pg_indexes_size(pc.relname::varchar) idxsize,
                pg_total_relation_size(pc.relname::varchar) relsize,
                pc.reltuples::bigint,
                pc.relpages,
                coalesce(round((8192 / (nullif(pc.reltuples, 0) / nullif(pc.relpages, 0)))), 0) avg_tuplesize,
                psut.seq_scan,
                psut.idx_scan,
                coalesce(100 * psut.idx_scan / nullif((psut.idx_scan + psut.seq_scan), 0), 0)::int per_idx_scan,
                coalesce(100 * psiout.heap_blks_hit / nullif((psiout.heap_blks_hit + psiout.heap_blks_read), 0), 0)::int per_rel_hit,
                coalesce(100 * psiout.idx_blks_hit / nullif((psiout.idx_blks_hit + psiout.idx_blks_read), 0), 0)::int per_idx_hit,
                psut.n_tup_ins,
                psut.n_tup_upd,
                psut.n_tup_hot_upd,
                coalesce(100 * psut.n_tup_hot_upd / nullif(psut.n_tup_upd, 0), 0)::int per_hot_upd,
                psut.n_tup_del,
                psut.n_live_tup,
                psut.n_dead_tup,
                coalesce(100 * psut.n_dead_tup / nullif(psut.n_live_tup, 0), 0)::int per_deadfill
            from pg_stat_user_tables psut
            inner join pg_statio_user_tables psiout on psiout.relname = psut.relname
            inner join pg_class pc on pc.relname = psut.relname
            order by pc.relname asc
        """

    def query_index_stats(self):
        """SQL for per-index stats: 11 columns, order must match _collect_index_stats."""
        return """
            select
                pi.schemaname,
                pcr.relname as relname,
                pci.relname as idxname,
                pg_size_pretty(pg_total_relation_size(pci.relname::varchar)) idxsize_pret,
                pg_total_relation_size(pci.relname::varchar) idxsize,
                pci.reltuples::bigint idxtuples,
                pcr.reltuples::bigint reltuples,
                coalesce(100 * pci.reltuples / nullif(pcr.reltuples, 0), 0)::int per_idx_covered,
                pi.idx_scan,
                pi.idx_tup_read,
                pi.idx_tup_fetch
            from pg_stat_user_indexes pi
            inner join pg_class pci on pci.oid = pi.indexrelid
            inner join pg_class pcr on pcr.oid = pi.relid
            order by schemaname, relname, idxname
        """

    def query_function_stats(self):
        """SQL for per-function stats: 5 columns, order must match _collect_function_stats."""
        return """
            select
                schemaname,
                funcname,
                calls,
                self_time,
                total_time
            from pg_stat_user_functions
            where schemaname <> 'pg_catalog'
        """

    def query_database_stats(self, database):
        """SQL for database-wide stats: 6 columns, order must match _collect_database_stats.

        NOTE(review): the database name is spliced in with %-interpolation.
        It comes from the agent's own YAML config (trusted), but a quote in
        the name would break the query; psycopg2 parameter binding would be
        the safer construction.
        """
        return """
            select
                datname,
                pg_database_size('%s') db_size,
                xact_commit,
                xact_rollback,
                blks_read,
                blks_hit
            from pg_stat_database where datname = '%s'
        """ % (database, database)

    def query_connections(self, database):
        """SQL counting backends connected to *database* (via the SECURITY DEFINER wrapper)."""
        return """
            select count(1) connections from pg_stat_activity() where datname = '%s'
        """ % (database)

    def query_waiting_connections(self, database):
        """SQL counting waiting backends.

        NOTE(review): relies on the pg_stat_activity 'waiting' column, which
        was removed in PostgreSQL 9.6 (replaced by wait_event) -- confirm the
        target server version.
        """
        return """
            select count(1) waiting_connections from pg_stat_activity() where waiting is true and datname = '%s'
        """ % (database)

    def query_replication_delay(self):
        """SQL for replay lag in milliseconds (NULL when no WAL has been replayed)."""
        return """
            select extract(epoch from (now() - pg_last_xact_replay_timestamp())) * 1000 as replication_delay
        """

    def query_heap_memory_stats(self):
        """SQL for aggregate heap block reads/hits and hit ratio (3 columns)."""
        return """
            select
                cast(sum(heap_blks_read) as bigint) heap_read,
                cast(sum(heap_blks_hit) as bigint) heap_hit,
                coalesce(cast(sum(heap_blks_hit) / nullif((sum(heap_blks_hit) + sum(heap_blks_read)), 0) * 100 as bigint), 0)::int per_heap_ratio
            from pg_statio_user_tables
        """

    def query_index_memory_stats(self):
        """SQL for aggregate index block reads/hits and hit ratio (3 columns)."""
        return """
            select
                cast(sum(idx_blks_read) as bigint) idx_read,
                cast(sum(idx_blks_hit) as bigint) idx_hit,
                coalesce(cast(sum(idx_blks_hit) / nullif((sum(idx_blks_hit) + sum(idx_blks_read)), 0) * 100 as bigint), 0)::int per_idx_ratio
            from pg_statio_user_indexes
        """
init_config:
  name: 'postgresql'

instances:
  - host: 'localhost'
    port: '5432'
    username: 'datadog'
    password: '<complex_password>'
    database: '<database_name>'
    tags:
      - 'env:<env>'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment