Skip to content

Instantly share code, notes, and snippets.

@bartek
Created November 26, 2012 20:18
Show Gist options
  • Save bartek/4150358 to your computer and use it in GitHub Desktop.
Save bartek/4150358 to your computer and use it in GitHub Desktop.
postgres replication status
# I hate urllib and subprocess, so use tools by Kenneth Reitz.
import envoy
import requests
from socket import socket, AF_INET, SOCK_DGRAM
# Simple script to check the replication delay between the master and its
# slave(s). The server running this needs to be allowed access via pg_hba to
# every host it connects to.
hosts = (
    # List hosts.
)

# Every run of this script reports to statsd so we get nice historical data.
STATSD = {'HOST': 'mysite.com', 'PORT': 8125}

# The threshold is mostly an arbitrary number until you figure out your
# environment. Run the script from cron over time, see where the delay
# generally lands, and adjust as necessary.
critical_delay = 1000
def send_error(error):
    """Post *error* as a notice to the Grove.io endpoint.

    We inform our IRC channel because email sucks.

    Args:
        error: Human-readable message to publish.

    Returns:
        None. Callers use ``return send_error(...)`` as a shorthand for
        "report and bail out".

    Raises:
        requests.exceptions.RequestException: if the notice cannot be
            delivered (including when the timeout below expires).
    """
    payload = {
        'service': 'PostgresBot',
        'message': error,
        'icon_url': 'https://dl.dropbox.com/u/103581/postgresql-icon-32.png',
    }
    # Without a timeout, requests waits forever on an unresponsive endpoint,
    # which would leave the monitoring run hanging instead of failing loudly.
    requests.post(
        'https://grove.io/api/notice/YOUR_GROVE_KEY/',
        data=payload,
        timeout=10,
    )
def _parse_psql_output(text):
    """Parse psql's '|'-separated table output into ``{column: [values]}``."""
    columns = None
    table = {}
    for line in text.split('\n'):
        line = line.strip()
        # Skip blank lines, header dividers (made only of '-' and '+') and the
        # "(N rows)" footer. Testing for "only divider characters" rather than
        # "starts with '-'" keeps rows whose first value is negative.
        if not line or not line.strip('-+') or line.startswith('('):
            continue
        cells = [cell.strip() for cell in line.split('|')]
        if columns is None:
            # The first real line holds the column names.
            columns = cells
            for name in columns:
                table[name] = []
            continue
        # Pair cells with the header by position. The header list is used
        # instead of table.keys() because dict key order is not guaranteed to
        # match column order (and keys() is not indexable on Python 3).
        for name, cell in zip(columns, cells):
            table[name].append(cell)
    return table


def run_command(host, command):
    """Run a SQL *command* on *host* through ``psql`` and parse the result.

    Args:
        host: Host name passed to ``psql -h``.
        command: SQL statement without the trailing semicolon.

    Returns:
        A dict keyed by column name. A column with exactly one row maps to
        that value as a string; otherwise it maps to a list of strings.
        If psql wrote anything to stderr, the error is reported through
        ``send_error`` and that call's result is returned instead.
    """
    output = envoy.run('psql -h{0}'.format(host), data='{0};'.format(command))
    if output.std_err:
        return send_error('Command {0} could not be completed. {1}'.format(command, output.std_err))

    table = _parse_psql_output(output.std_out)

    # Single-row results are unwrapped so callers can write info['column'].
    result = {}
    for name, values in table.items():
        result[name] = values[0] if len(values) == 1 else values
    return result
# Define which machines are master/slave since this can change during a failover
def standby_check():
    """Measure replication lag of every slave, report it and alert if high.

    Steps:
      1. Ask each entry in ``hosts`` whether it is in recovery to classify it
         as master ('f') or slave ('t').
      2. Read the master's current xlog offset and each slave's receive and
         replay offsets.
      3. Send both deltas per slave to statsd as gauges.
      4. Call ``send_error`` for every delta above ``critical_delay``.

    Returns:
        None. (When no master or no slave is found, the result of
        ``send_error`` is returned.)
    """
    # Local import: the module only imports socket/AF_INET/SOCK_DGRAM at the top.
    from socket import error as socket_error

    master = None
    slaves = []
    for host in hosts:
        status = run_command(host, 'select pg_is_in_recovery() AS recovery')
        if not status:
            # run_command already reported the failure; an unreachable host
            # must not crash the check for the remaining hosts.
            continue
        recovery = status.get('recovery')
        if recovery == 'f':
            master = host
        elif recovery == 't':
            slaves.append(host)

    if not slaves or not master:
        return send_error("Either master or a slave could not be found.")

    # Get xlog positions.
    # on master
    info = run_command(master, "SELECT pg_xlog_location_diff(pg_current_xlog_location(), '0/0') AS offset")
    if not info:
        # Failure was already reported by run_command; nothing to compare to.
        return
    master_offset = int(info['offset'])

    slave_offsets = {}
    for slave in slaves:
        info = run_command(
            slave,
            "SELECT pg_xlog_location_diff(pg_last_xlog_receive_location(), '0/0') AS receive, "
            "pg_xlog_location_diff(pg_last_xlog_replay_location(), '0/0') AS replay")
        if not info:
            continue
        try:
            slave_offsets[slave] = {
                'receive_offset': int(info['receive']),
                'replay_offset': int(info['replay']),
            }
        except (KeyError, TypeError, ValueError):
            # An empty or missing value cannot be converted to an offset;
            # report it and keep checking the other slaves.
            send_error('Could not read xlog offsets from {0}: {1!r}'.format(slave, info))

    # Send to statsd as a gauge, then alert on anything past the threshold.
    statsd_addr = (STATSD['HOST'], STATSD['PORT'])
    udp_sock = socket(AF_INET, SOCK_DGRAM)
    try:
        for slave, offsets in slave_offsets.items():
            short_host = slave.split('.')[0]
            receive_delta = master_offset - offsets['receive_offset']
            replay_delta = master_offset - offsets['replay_offset']

            for metric, value in (('receive_delta', receive_delta),
                                  ('replay_delta', replay_delta)):
                payload = 'postgres.replication.{0}.{1}:{2}|g'.format(metric, short_host, value)
                try:
                    # sendto() needs bytes on Python 3.
                    udp_sock.sendto(payload.encode('utf-8'), statsd_addr)
                except socket_error:
                    # Metrics are best effort: a statsd outage must not
                    # suppress the critical-delay alerts below.
                    pass

            if receive_delta > critical_delay:
                send_error('Receive location is offset by {0} on {1}'.format(receive_delta, short_host))
            if replay_delta > critical_delay:
                send_error('Replay location is offset by {0} on {1}'.format(replay_delta, short_host))
    finally:
        udp_sock.close()
if __name__ == '__main__':
    # Executed as a script (e.g. from cron): perform one replication check.
    standby_check()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment