Created
November 26, 2012 20:18
-
-
Save bartek/4150358 to your computer and use it in GitHub Desktop.
postgres replication status
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# I hate urllib and subprocess, so use tools by Kenneth Reitz.
import envoy
import requests
from socket import socket, AF_INET, SOCK_DGRAM

# Simple script to check the delay in replication between master and slave(s).
# The server running this will need to be allowed access via pg_hba to all the
# hosts it connects to.
hosts = (
    # list hosts.
)

# We like to send each ping of this script to statsd for nice historical data
STATSD = {
    'HOST': 'mysite.com',
    'PORT': 8125,
}

# The delay is mostly an arbitrary number until you figure out your environment.
# Run the script in a cron over time and see where your delay generally lands,
# and adjust as necessary.
critical_delay = 1000
def send_error(error):
    """Report *error* to our IRC channel via the Grove.io notice API.

    We inform our IRC channel because email sucks.  Best-effort: the return
    value of the POST is ignored; this is fire-and-forget notification.
    """
    data = {
        'service': 'PostgresBot',
        'message': error,
        'icon_url': 'https://dl.dropbox.com/u/103581/postgresql-icon-32.png',
    }
    # Explicit timeout so a hung notification endpoint cannot stall the
    # whole cron run (requests has no default timeout).
    requests.post('https://grove.io/api/notice/YOUR_GROVE_KEY/', data=data,
                  timeout=10)
def run_command(host, command):
    """Run *command* through psql on *host* and parse the tabular output.

    Returns a dict mapping each column name to its value: a list when the
    column has multiple rows, collapsed to a bare string when it has exactly
    one.  On psql error, reports via send_error() and returns None.
    """
    output = envoy.run('psql -h{0}'.format(host), data='{0};'.format(command))
    if output.std_err:
        return send_error('Command {0} could not be completed. {1}'.format(command, output.std_err))

    # For each line of output, store the data into a dictionary that makes it
    # easier to access.  This is very rustic but works for the output we
    # expect from these commands.
    output_data = {}
    keys = None  # ordered column names taken from the header row
    for line in output.std_out.split('\n'):
        line = line.strip()
        # Skip blank lines, divider rows ("----+----") and the row-count
        # footer ("(1 row)").
        if not line or line.startswith(('-', '(')):
            continue
        elif keys is None:
            # The first real line contains the column names.
            keys = [key.strip() for key in line.split('|')]
            for key in keys:
                output_data[key] = []
        else:
            line_data = [data.strip() for data in line.split('|')]
            # Pair each value with its column BY POSITION using the ordered
            # `keys` list.  (The previous code indexed dict.keys(), whose
            # order is arbitrary, so values could land under the wrong key.)
            for index, ld in enumerate(line_data):
                output_data[keys[index]].append(ld)

    # For every column, if there is only a single value, turn it into a
    # single item, not a list.  (items() behaves identically on Py2/Py3.)
    for key, value in output_data.items():
        if len(value) == 1:
            output_data[key] = value[0]
    return output_data
# Define which machines are master/slave since this can change during a failover | |
# Define which machines are master/slave since this can change during a failover
def standby_check():
    """Check replication delay between the current master and its slaves.

    Discovers which host is master at runtime (roles can swap during a
    failover), computes xlog receive/replay byte offsets, ships the deltas
    to statsd as gauges, and alerts via send_error() when a delta exceeds
    critical_delay.
    """
    master = None
    slaves = []
    for host in hosts:
        status = run_command(host, 'select pg_is_in_recovery() AS recovery')
        # run_command returns None when psql failed; skip that host instead
        # of crashing on the subscript below.
        if not status:
            continue
        if status['recovery'] == 'f':
            master = host
        elif status['recovery'] == 't':
            slaves.append(host)
    if not slaves or not master:
        return send_error("Either master or a slave could not be found.")

    # Get xlog positions (byte offsets relative to '0/0').
    # on master
    info = run_command(master, "SELECT pg_xlog_location_diff(pg_current_xlog_location(), '0/0') AS offset")
    if not info:
        return  # query failed; error already reported by run_command
    master_offset = int(info['offset'])

    # on each slave: both what it has received and what it has replayed
    slave_offsets = {}
    for slave in slaves:
        info = run_command(slave, "SELECT pg_xlog_location_diff(pg_last_xlog_receive_location(), '0/0') AS receive, \
            pg_xlog_location_diff(pg_last_xlog_replay_location(), '0/0') AS replay")
        if not info:
            continue  # skip unreachable slave; error already reported
        slave_offsets[slave] = {
            'receive_offset': int(info['receive']),
            'replay_offset': int(info['replay']),
        }

    # Compute deltas based on the offsets we found.
    deltas = []
    for slave, data in slave_offsets.items():
        deltas.append({
            'host': slave,
            'receive_delta': master_offset - data['receive_offset'],
            'replay_delta': master_offset - data['replay_offset'],
        })

    # Send to statsd as a gauge.
    statsd_addr = (STATSD['HOST'], STATSD['PORT'])
    udp_sock = socket(AF_INET, SOCK_DGRAM)
    for delta in deltas:
        host = delta['host'].split('.')[0]
        data = 'postgres.replication.receive_delta.{0}:{1}|g'.format(host, delta['receive_delta'])
        try:
            # encode() keeps sendto working on both Py2 (str) and Py3 (bytes)
            udp_sock.sendto(data.encode('utf-8'), statsd_addr)
        except (OSError, IOError):
            # socket.error is an IOError subclass on Py2, OSError on Py3;
            # a bare except here used to swallow even KeyboardInterrupt.
            return
        data = 'postgres.replication.replay_delta.{0}:{1}|g'.format(host, delta['replay_delta'])
        try:
            udp_sock.sendto(data.encode('utf-8'), statsd_addr)
        except (OSError, IOError):
            return
        if delta['receive_delta'] > critical_delay:
            send_error('Receive location is offset by {0} on {1}'.format(delta['receive_delta'], host))
        if delta['replay_delta'] > critical_delay:
            send_error('Replay location is offset by {0} on {1}'.format(delta['replay_delta'], host))
if __name__ == "__main__": | |
standby_check() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment