Skip to content

Instantly share code, notes, and snippets.

@woodrow
Created January 23, 2012 05:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save woodrow/1661012 to your computer and use it in GitHub Desktop.
Save woodrow/1661012 to your computer and use it in GitHub Desktop.
Reproducing Unhandled Errors with txpostgres
# based on the first example at http://wulczer.github.com/txpostgres/usage.html
import socket
import sys
from txpostgres import txpostgres
from twisted.internet import reactor, stdio
from twisted.python import log, util
from twisted.protocols import basic
import psycopg2
# Selects the sslmode used in the postgres connection string: 'require' when
# True, 'disable' when False. The script entry point switches it on when the
# single command-line argument 'ssl' is given.
SSL_ENABLED = False
class Echo(basic.LineReceiver):
    """
    Line-driven walkthrough that reproduces an Unhandled Error in txpostgres.

    Every line received (normally an ENTER keypress) moves the demonstration
    one step forward; ``self.state`` records which step comes next:

    0 -- print the welcome text
    1 -- open the txpostgres connection
    2 -- run a query that is expected to succeed
    3 -- wait for 'ready', then run a query that is expected to fail
    4 -- print the clean-up instructions
    5+ -- stop the reactor
    """
    # Use the platform line separator as the line delimiter for input.
    from os import linesep as delimiter

    def __init__(self):
        # Have every txpostgres connection created from now on go through the
        # psycopg2 wrapper that applies the fail-fast TCP settings.
        txpostgres.Connection.connectionFactory = staticmethod(tcp_failfast())
        self.state = 0
        self.conn = None

    def connectionMade(self):
        """
        Start the walkthrough; in the initial state this prints the welcome
        text.
        """
        self.update(None)

    def lineReceived(self, line):
        """
        Advance the walkthrough with the line that was just received.
        """
        self.update(line)

    def next_state(self, *args):
        """
        Move on to the next step. Extra arguments are accepted and ignored so
        that the method can also be used as a deferred callback.
        """
        self.state += 1

    def update(self, line):
        """
        Perform the step selected by ``self.state``.

        ``line`` is the text that triggered the step (None for the initial
        call from connectionMade); only step 3 looks at it.
        """
        if self.state == 0:
            self.transport.write(
                "Welcome. Make sure there are no iptables rules affecting "
                "access to postgres (5432/tcp) on localhost.\n"
                "Hit ENTER to start.\n")
            self.next_state()
        elif self.state == 1:
            self.transport.write("Establishing connection...\n")
            self.conn = txpostgres.Connection()
            if SSL_ENABLED:
                dsn = 'host=127.0.0.1 dbname=postgres sslmode=require'
            else:
                dsn = 'host=127.0.0.1 dbname=postgres sslmode=disable'
            d = self.conn.connect(dsn)
            d.addCallback(lambda _: self.transport.write(
                "Connected! Hit ENTER to try a query.\n"))
            d.addErrback(self.general_errback)
            # The state advances right away rather than when the deferred
            # fires, so the next ENTER always reaches the query step.
            self.next_state()
        elif self.state == 2:
            d = self.simple_query()
            d.addCallback(lambda _: self.transport.write(
                "Success! Now let's block postgres. In another terminal, "
                "enter the following to block postgres:\n"
                " sudo iptables -t filter "
                "-A INPUT -p tcp --sport 5432 -j DROP\n"
                "\n"
                "Once this is done, type 'ready' and hit ENTER.\n"))
            # As above: advance immediately, not on query completion.
            self.next_state()
        elif self.state == 3:
            if line.strip().lower() != 'ready':
                # Stay in this step until the user confirms the block.
                self.transport.write(
                    "Block port 5432/tcp, type 'ready' and hit ENTER.\n")
                return
            # The deferred is deliberately not kept: simple_query() already
            # attached its callback and errback.
            self.simple_query()
            self.transport.write(
                "This query should fail with an Unhandled Error, even "
                "though there is a non-trapping errback on the deferred.\n"
                "Once the error occurs, hit ENTER to continue.\n")
            self.next_state()
        elif self.state == 4:
            self.transport.write(
                "Okay, this concludes the demonstration. In another "
                "terminal, enter the following to restore postgres:\n"
                " sudo iptables -t filter "
                "-D INPUT -p tcp --sport 5432 -j DROP\n"
                "\n"
                "Once this is done, hit ENTER to quit.\n")
            self.next_state()
        else:
            reactor.stop()

    def simple_query(self):
        """
        This does a very simple query, adds a callback to write the results and
        an errback to print the error. Returns the resulting deferred.
        """
        self.transport.write(
            "Attempting 'select tablename from pg_tables'...\n")
        d = self.conn.runQuery('select tablename from pg_tables')
        d.addCallback(lambda result: self.transport.write(
            "========================================\n" +
            "All tables:" + " ".join(map(str, result)) + "\n" +
            "========================================\n"))
        d.addErrback(self.general_errback)
        return d

    def general_errback(self, e):
        """
        Just print "ERRBACK: (exception object type, exception message)".

        ``e`` is the failure handed over by the deferred; it must provide a
        ``type`` attribute and a ``getErrorMessage()`` method.
        """
        self.transport.write("ERRBACK: (%s, %s)\n" %
                             (str(e.type), e.getErrorMessage().strip()))

    def stop(self):
        """
        Print a farewell and close the database connection if one was created.
        """
        print("All done. Terminating.\n")
        if self.conn:
            self.conn.close()
def main():
    """
    Attach an Echo protocol to standard I/O, arrange for its stop() method to
    run before the reactor shuts down, and run the reactor.
    """
    protocol = Echo()
    stdio.StandardIO(protocol)
    reactor.addSystemEventTrigger('before', 'shutdown', protocol.stop)
    reactor.run()
def tcp_failfast():
    """
    Build a replacement for psycopg2.connect whose connections give up on a
    blocked TCP link much sooner than the normal Linux defaults would allow.

    The returned function forwards all of its arguments to psycopg2.connect
    and then tunes the new connection's socket:

    - keepalive: 10s idle, then 2 probes 10s apart, so a connection failure
      is reported after 30s of no ACKs in keepalive mode;
    - TCP_USER_TIMEOUT: a connection failure is reported after 5s of no ACKs
      during data transmission.
    """
    def connect(*args, **kwargs):
        connection = psycopg2.connect(*args, **kwargs)

        # 10s idle + 2 probes x 10s interval = 30s in 'keepalive' mode
        keepalive_settings = {
            'tcp_keepidle': 10,
            'tcp_keepcnt': 2,
            'tcp_keepintvl': 10,
        }
        set_tcp_keepalive(connection.fileno(), **keepalive_settings)

        # 5s limit on unacknowledged data while transmitting
        set_tcp_user_timeout(connection.fileno(), 5)
        return connection

    return connect
# lifted from
# https://github.com/markokr/skytools/blob/master/python/skytools/sockutil.py
# the project https://github.com/markokr/skytools has an MITish license
##############################################################################
# SkyTools - tool collection for PostgreSQL
#
# Copyright (c) 2007 Marko Kreen, Skype Technologies OU
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#############################################################################
def set_tcp_keepalive(fd, keepalive = True,
                      tcp_keepidle = 4 * 60,
                      tcp_keepcnt = 4,
                      tcp_keepintvl = 15):
    """Turn on TCP keepalive. The fd can be either numeric or socket
    object with 'fileno' method.

    With keepalive=False, SO_KEEPALIVE is switched off instead and the
    tcp_* values are ignored. Nothing is done when the platform lacks
    SO_KEEPALIVE or socket.fromfd, or when fd is a unix socket.

    OS defaults for SO_KEEPALIVE=1:
    - Linux: (7200, 9, 75) - can configure all.
    - MacOS: (7200, 8, 75) - can configure only tcp_keepidle.
    - Win32: (7200, 5|10, 1) - can configure tcp_keepidle and tcp_keepintvl.
      Python needs SIO_KEEPALIVE_VALS support in socket.ioctl to enable it.

    Our defaults: (240, 4, 15).

    >>> import socket
    >>> s = socket.socket()
    >>> set_tcp_keepalive(s)
    """
    # usable on this OS?
    if not hasattr(socket, 'SO_KEEPALIVE') or not hasattr(socket, 'fromfd'):
        return

    # get numeric fd and cast to socket
    if hasattr(fd, 'fileno'):
        fd = fd.fileno()
    s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

    # fromfd() works on a duplicate of the descriptor: options set through it
    # apply to the caller's connection, but the duplicate itself has to be
    # closed again or it lingers until garbage collection.
    try:
        # skip if unix socket
        if not isinstance(s.getsockname(), tuple):
            return

        if not keepalive:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 0)
            return

        # turn on keepalive on the connection
        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # Only take the fully configurable path when all three constants are
        # present; a platform exposing just some of them falls through to
        # the idle-time-only branches below.
        tunables = ('TCP_KEEPIDLE', 'TCP_KEEPCNT', 'TCP_KEEPINTVL')
        if all(hasattr(socket, name) for name in tunables):
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPIDLE, tcp_keepidle)
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPCNT, tcp_keepcnt)
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPINTVL, tcp_keepintvl)
        elif hasattr(socket, 'TCP_KEEPALIVE'):
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPALIVE, tcp_keepidle)
        elif sys.platform == 'darwin':
            # constant not exported by this Python's socket module
            TCP_KEEPALIVE = 0x10
            s.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, tcp_keepidle)
        # win32: tuning would need SIO_KEEPALIVE_VALS through socket.ioctl,
        # so the OS defaults stay in effect there.
    finally:
        s.close()
def set_tcp_user_timeout(fd, tcp_user_timeout = 30):
    """
    Set the TCP_USER_TIMEOUT option on a connection.

    Inspired by set_tcp_keepalive(). The fd can be either numeric or an
    object with a 'fileno' method. tcp_user_timeout is given in seconds and
    is passed to the kernel in milliseconds. Unix sockets are left alone. If
    setting the option fails with a socket error, a message is printed and
    the error is not propagated.
    """
    # get numeric fd and cast to socket
    if hasattr(fd, 'fileno'):
        fd = fd.fileno()
    s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

    # fromfd() works on a duplicate of the descriptor: the option set through
    # it applies to the caller's connection, but the duplicate itself has to
    # be closed again or it lingers until garbage collection.
    try:
        # skip if unix socket
        if not isinstance(s.getsockname(), tuple):
            return

        # Use the socket module's constant when it has one; otherwise fall
        # back to the value from /usr/include/linux/tcp.h
        # (#define TCP_USER_TIMEOUT 18). The stdlib module is not modified.
        optname = getattr(socket, 'TCP_USER_TIMEOUT', 18)
        try:
            s.setsockopt(socket.IPPROTO_TCP, optname, tcp_user_timeout*1000)
        except socket.error:
            print("Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT.")
    finally:
        s.close()
if __name__ == '__main__':
    # A single command-line argument 'ssl' (any letter case) makes the demo
    # connect with sslmode=require instead of sslmode=disable.
    cli_args = sys.argv[1:]
    if len(cli_args) == 1 and cli_args[0].lower() == 'ssl':
        SSL_ENABLED = True
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment