Created
January 23, 2012 05:52
-
-
Save woodrow/1661012 to your computer and use it in GitHub Desktop.
Reproducing Unhandled Errors with txpostgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# based on the first example at http://wulczer.github.com/txpostgres/usage.html
import socket | |
import sys | |
from txpostgres import txpostgres | |
from twisted.internet import reactor, stdio | |
from twisted.python import log, util | |
from twisted.protocols import basic | |
import psycopg2 | |
SSL_ENABLED = False | |
class Echo(basic.LineReceiver):
    """
    Line-oriented walkthrough that reproduces an Unhandled Error in txpostgres.

    The protocol is a small state machine driven by lines received on its
    transport.  Each received line (and the initial connectionMade) calls
    update(), which runs the handler for the current state:

        0 - print the welcome text
        1 - open the txpostgres connection
        2 - run a query that is expected to succeed
        3 - wait for 'ready', then run a query while postgres is blocked
        4 - print the cleanup instructions
        5+ - stop the reactor
    """
    from os import linesep as delimiter

    def __init__(self):
        # NOTE: this replaces the connection factory on the
        # txpostgres.Connection class itself, not just for this instance.
        txpostgres.Connection.connectionFactory = staticmethod(tcp_failfast())
        self.state = 0
        self.conn = None

    def connectionMade(self):
        """
        Kick off the walkthrough as soon as the transport is attached.
        """
        self.update(None)

    def lineReceived(self, line):
        """
        Feed each received line to the state machine.
        """
        self.update(line)

    def next_state(self, *args):
        """
        Advance to the next state.  Extra arguments are ignored so that this
        can also be used directly as a deferred callback.
        """
        self.state += 1

    def update(self, line):
        """
        Run the handler for the current state, or stop the reactor once all
        states have been visited.

        line is the text just received, or None for the initial call made
        from connectionMade().
        """
        handlers = (
            self._show_welcome,
            self._open_connection,
            self._run_first_query,
            self._run_blocked_query,
            self._show_cleanup,
        )
        if 0 <= self.state < len(handlers):
            handlers[self.state](line)
        else:
            reactor.stop()

    def _show_welcome(self, line):
        """
        State 0: print the introductory instructions.
        """
        self.transport.write(
            "Welcome. Make sure there are no iptables rules affecting "
            "access to postgres (5432/tcp) on localhost.\n"
            "Hit ENTER to start.\n")
        self.next_state()

    def _open_connection(self, line):
        """
        State 1: create the txpostgres connection and start connecting.
        """
        self.transport.write("Establishing connection...\n")
        self.conn = txpostgres.Connection()
        # The two modes differ only in the sslmode value of the DSN.
        sslmode = 'require' if SSL_ENABLED else 'disable'
        d = self.conn.connect(
            'host=127.0.0.1 dbname=postgres sslmode=' + sslmode)
        d.addCallback(lambda _: self.transport.write(
            "Connected! Hit ENTER to try a query.\n"))
        d.addErrback(self.general_errback)
        # The state advances right away rather than when the deferred fires,
        # so the next ENTER moves on regardless of the connection outcome.
        self.next_state()

    def _run_first_query(self, line):
        """
        State 2: run the query while postgres is reachable, then print the
        instructions for blocking it.
        """
        d = self.simple_query()
        d.addCallback(lambda _: self.transport.write(
            "Success! Now let's block postgres. In another terminal, "
            "enter the following to block postgres:\n"
            "    sudo iptables -t filter "
            "-A INPUT -p tcp --sport 5432 -j DROP\n"
            "\n"
            "Once this is done, type 'ready' and hit ENTER.\n"))
        # As in state 1, the state advances without waiting for the result.
        self.next_state()

    def _run_blocked_query(self, line):
        """
        State 3: once the user confirms with 'ready', run the query again.
        Any other input repeats the prompt and stays in this state.
        """
        if line.strip().lower() != 'ready':
            self.transport.write(
                "Block port 5432/tcp, type 'ready' and hit ENTER.\n")
            return
        # The returned deferred is deliberately dropped: simple_query() has
        # already attached general_errback to it, which is the point of the
        # demonstration.
        self.simple_query()
        self.transport.write(
            "This query should fail with an Unhandled Error, even "
            "though there is a non-trapping errback on the deferred.\n"
            "Once the error occurs, hit ENTER to continue.\n")
        self.next_state()

    def _show_cleanup(self, line):
        """
        State 4: print the instructions for removing the iptables rule.
        """
        self.transport.write(
            "Okay, this concludes the demonstration. In another "
            "terminal, enter the following to restore postgres:\n"
            "    sudo iptables -t filter "
            "-D INPUT -p tcp --sport 5432 -j DROP\n"
            "\n"
            "Once this is done, hit ENTER to quit.\n")
        self.next_state()

    def simple_query(self):
        """
        This does a very simple query, adds a callback to write the results and
        an errback to print the error.  Returns the resulting deferred.
        """
        self.transport.write(
            "Attempting 'select tablename from pg_tables'...\n")
        d = self.conn.runQuery('select tablename from pg_tables')
        d.addCallback(lambda result: self.transport.write(
            "========================================\n" +
            "All tables:" + " ".join(map(str, result)) + "\n" +
            "========================================\n"))
        d.addErrback(self.general_errback)
        return d

    def general_errback(self, e):
        """
        Just print "ERRBACK: (exception object type, exception message)".

        Nothing is returned, so the failure is not passed further down the
        errback chain.
        """
        self.transport.write("ERRBACK: (%s, %s)\n" %
                             (str(e.type), e.getErrorMessage().strip()))

    def stop(self):
        """
        Shutdown hook: announce termination and close the connection if one
        was created.
        """
        print("All done. Terminating.\n")
        if self.conn:
            self.conn.close()
def main():
    """
    Attach an Echo protocol to standard I/O and run the reactor.

    Echo.stop is registered to run before reactor shutdown so the database
    connection gets closed on the way out.
    """
    protocol = Echo()
    stdio.StandardIO(protocol)
    reactor.addSystemEventTrigger('before', 'shutdown', protocol.stop)
    reactor.run()
def tcp_failfast():
    """
    Build a psycopg2 connection factory whose sockets give up quickly.

    The returned callable forwards its arguments to psycopg2.connect() and
    then tightens two TCP settings on the connection's socket so that a
    blocked connection fails much sooner than the normal Linux defaults
    would allow:

      - keepalive: 10s idle, then 2 probes 10s apart, i.e. a failure is
        reported after 30s of no ACKs in keepalive mode;
      - TCP_USER_TIMEOUT: a failure is reported after 5s of no ACKs during
        data transmission.
    """
    def connect(*args, **kwargs):
        pg_conn = psycopg2.connect(*args, **kwargs)
        sock_fd = pg_conn.fileno()

        # 30s timeout in 'keepalive' mode
        set_tcp_keepalive(sock_fd,
                          tcp_keepidle=10,
                          tcp_keepcnt=2,
                          tcp_keepintvl=10)

        # 5s timeout in 'on' mode
        set_tcp_user_timeout(sock_fd, 5)
        return pg_conn

    return connect
# lifted from | |
# https://github.com/markokr/skytools/blob/master/python/skytools/sockutil.py | |
# the project https://github.com/markokr/skytools has an MITish license | |
############################################################################## | |
# SkyTools - tool collection for PostgreSQL | |
# | |
# Copyright (c) 2007 Marko Kreen, Skype Technologies OU | |
# | |
# Permission to use, copy, modify, and/or distribute this software for any | |
# purpose with or without fee is hereby granted, provided that the above | |
# copyright notice and this permission notice appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | |
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | |
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
############################################################################# | |
def set_tcp_keepalive(fd, keepalive=True,
                      tcp_keepidle=4 * 60,
                      tcp_keepcnt=4,
                      tcp_keepintvl=15):
    """Turn on TCP keepalive.  The fd can be either numeric or socket
    object with 'fileno' method.

    With keepalive true, SO_KEEPALIVE is enabled and the idle time, probe
    count and probe interval are set where the platform exposes them.  With
    keepalive false, SO_KEEPALIVE is disabled.  Nothing is done when the
    platform lacks SO_KEEPALIVE or socket.fromfd, or when the descriptor is
    not an address-tuple (TCP) socket.

    OS defaults for SO_KEEPALIVE=1:
     - Linux: (7200, 9, 75) - can configure all.
     - MacOS: (7200, 8, 75) - can configure only tcp_keepidle.
     - Win32: (7200, 5|10, 1) - can configure tcp_keepidle and tcp_keepintvl.
       Python needs SIO_KEEPALIVE_VALS support in socket.ioctl to enable it.

    Our defaults: (240, 4, 15).

    >>> import socket
    >>> s = socket.socket()
    >>> set_tcp_keepalive(s)
    """
    # usable on this OS?
    if not hasattr(socket, 'SO_KEEPALIVE') or not hasattr(socket, 'fromfd'):
        return

    # get numeric fd and cast to socket
    if hasattr(fd, 'fileno'):
        fd = fd.fileno()
    # fromfd() duplicates the descriptor; options set through the duplicate
    # apply to the same underlying socket, and the duplicate is closed below
    # without affecting the caller's descriptor.
    s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
    try:
        # skip if unix socket
        if not isinstance(s.getsockname(), tuple):
            return

        if not keepalive:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 0)
            return

        # turn on keepalive on the connection
        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if hasattr(socket, 'TCP_KEEPCNT'):
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPIDLE, tcp_keepidle)
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPCNT, tcp_keepcnt)
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPINTVL, tcp_keepintvl)
        elif hasattr(socket, 'TCP_KEEPALIVE'):
            s.setsockopt(socket.IPPROTO_TCP,
                         socket.TCP_KEEPALIVE, tcp_keepidle)
        elif sys.platform == 'darwin':
            # constant not exported by this Python build; use the raw value
            TCP_KEEPALIVE = 0x10
            s.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, tcp_keepidle)
        elif sys.platform == 'win32':
            # not implemented; would need
            # s.ioctl(SIO_KEEPALIVE_VALS,
            #         (1, tcp_keepidle*1000, tcp_keepintvl*1000))
            pass
    finally:
        s.close()
def set_tcp_user_timeout(fd, tcp_user_timeout=30):
    """
    Set the TCP_USER_TIMEOUT option on a TCP socket.

    Inspired by set_tcp_keepalive().  The fd can be either numeric or an
    object with a 'fileno' method.  tcp_user_timeout is given in seconds and
    is passed to the kernel in milliseconds.

    Nothing is done when socket.fromfd is unavailable or when the descriptor
    is not an address-tuple (TCP) socket.  If the kernel rejects the option a
    message is printed and the socket is left unchanged.
    """
    # usable on this OS?
    if not hasattr(socket, 'fromfd'):
        return

    # get numeric fd and cast to socket
    if hasattr(fd, 'fileno'):
        fd = fd.fileno()
    # fromfd() duplicates the descriptor; the duplicate is closed below
    # without affecting the caller's descriptor.
    s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
    try:
        # skip if unix socket
        if not isinstance(s.getsockname(), tuple):
            return

        # Use the constant when this Python exports it; otherwise fall back
        # to the value from /usr/include/linux/tcp.h:
        #   #define TCP_USER_TIMEOUT 18
        # The socket module itself is left untouched.
        optname = getattr(socket, 'TCP_USER_TIMEOUT', 18)
        try:
            s.setsockopt(socket.IPPROTO_TCP, optname, tcp_user_timeout * 1000)
        except socket.error:
            print("Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT.")
    finally:
        s.close()
if __name__ == '__main__':
    # A single command-line argument of 'ssl' (any case) switches the
    # connection to sslmode=require; anything else leaves SSL disabled.
    cli_args = sys.argv[1:]
    if len(cli_args) == 1 and cli_args[0].lower() == 'ssl':
        SSL_ENABLED = True
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment