Skip to content

Instantly share code, notes, and snippets.

@zed
Created November 13, 2012 23:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zed/4069063 to your computer and use it in GitHub Desktop.
Save zed/4069063 to your computer and use it in GitHub Desktop.
test whether socket ignores signals
/.tox/
/*.egg-info/
/tox-*.egg/

Run python setup.py test to test whether socket ignores signals on various Python versions. If it does then the test hangs.

Or just:

$ pythonX.Y test_socket_ignore_signal.py

to test Python X.Y version.

#!/usr/bin/env python
"""Server program that doesn't respond.
From http://docs.python.org/release/3.3.0/library/socket.html
"""
import socket
import sys
def sprint(*args):
print((("server",) + args)) # print tuple to support Python 2.4+
HOST = None # Symbolic name meaning all available interfaces
if len(sys.argv) > 1:
HOST, _, PORT = sys.argv[1].partition(':')
PORT = int(PORT)
else:
PORT = 31950 # arbitrary non-privileged port
s = None
for res in socket.getaddrinfo(HOST, PORT,
socket.AF_UNSPEC,
socket.SOCK_STREAM,
0,
socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
sprint(sa)
try:
s = socket.socket(af, socktype, proto)
except socket.error: # use `socket.error` to support Python 2.4+
s = None
continue
try:
s.bind(sa)
s.listen(1)
except socket.error:
s.close()
s = None
continue
break
if s is None:
sprint('could not open socket')
sys.exit(1)
conn, addr = s.accept()
sprint('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
sprint(repr(data))
# do not respond
conn.close()
import sys
from setuptools import setup
from setuptools.command.test import test as TestCommand
# `python setup.py test` installs and runs `tox`
class Tox(TestCommand):
def finalize_options(self):
TestCommand.finalize_options(self)
self.test_args = []
self.test_suite = True
def run_tests(self):
#import here, cause outside the eggs aren't loaded
import tox
errno = tox.cmdline(self.test_args)
sys.exit(errno)
setup(
name='test-socket-ignore-signal',
tests_require=['tox'],
cmdclass={'test': Tox},
url='https://gist.github.com/4069063',
)
#!/usr/bin/env python
"""Test whether socket ignores signals."""
import os
import signal
import socket
import sys
import time
from subprocess import Popen
class Alarm(Exception):
pass
def sigalrm_handler(signum, frame):
raise Alarm
def sprint(*args):
print((("client",) + args)) # print tuple to support Python 2.4+
def readline(host, port):
"""Read a single line from host:port.
From http://docs.python.org/release/3.3.0/library/socket.html
"""
s = None
for _ in range(10):
for res in socket.getaddrinfo(host, port,
socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sprint(sa)
try:
s = socket.socket(af, socktype, proto)
except socket.error:
s = None
continue
try:
s.connect(sa)
except socket.error:
s.close()
s = None
continue
break
else:
# no break; server might not be ready yet
if server_process.poll() is not None:
break
time.sleep(1) # allow server to start
continue
break
if s is None:
sprint('could not open socket')
sys.exit(1)
s.sendall('Hello, world'.encode('ascii'))
f = s.makefile('rb')
try:
data = f.readline() # should block
finally:
f.close()
s.close()
sprint('Received', repr(data))
return data
port = 12345
if len(sys.argv) > 1:
host = sys.argv[1]
else:
# get public network interface (according to docs)
host = socket.gethostbyname(socket.gethostname())
# start server that doesn't respond
scriptdir = os.path.dirname(os.path.abspath(__file__))
server_script = os.path.join(scriptdir, 'server.py')
server_process = Popen([sys.executable, server_script, "%s:%s" % (host, port)])
# setup signal handler & alarm
signal.signal(signal.SIGALRM, sigalrm_handler)
delay = 3 # seconds
signal.alarm(delay)
sprint("wait %d seconds" % (delay,))
try:
readline(host, port)
except Alarm: # got alarm
sprint('ALARM WORKS')
else:
assert 0, "shouldn't get here, readline() must block"
[tox]
envlist = py25,py26,py27,pypy,py31,py32,py33
[testenv]
commands=
python -V
python test_socket_ignore_signal.py []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment