Created

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

GracefulInterruptHandler

View graceful_int_handler.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
import signal
 
class GracefulInterruptHandler(object):
def __init__(self, sig=signal.SIGINT):
self.sig = sig
def __enter__(self):
self.interrupted = False
self.released = False
self.original_handler = signal.getsignal(self.sig)
def handler(signum, frame):
self.release()
self.interrupted = True
signal.signal(self.sig, handler)
return self
def __exit__(self, type, value, tb):
self.release()
def release(self):
if self.released:
return False
 
signal.signal(self.sig, self.original_handler)
self.released = True
return True
 
if __name__ == '__main__':
 
import unittest
import time
class GracefulInterruptHandlerTestCase(unittest.TestCase):
def test_simple(self):
with GracefulInterruptHandler() as h:
while True:
print "..."
time.sleep(1)
if h.interrupted:
print "interrupted!"
time.sleep(5)
break
def test_nested(self):
with GracefulInterruptHandler() as h1:
while True:
print "(1)..."
time.sleep(1)
with GracefulInterruptHandler() as h2:
while True:
print "\t(2)..."
time.sleep(1)
if h2.interrupted:
print "\t(2) interrupted!"
time.sleep(2)
break
if h1.interrupted:
print "(1) interrupted!"
time.sleep(2)
break
unittest.main()
nalbat commented

Hello!

Why should you call release() from within signal handler?

I'm going to use this module to catch SGHUP signal and reload configs and all the other daemon program resources.

        with GracefulInterruptHandler(sig=signal.SIGHUP) as hup_hdl:
            while True:
                self.main(hup_hdl)
                if hup_hdl.interrupted:
                    hup_hdl.interrupted = False
                else:
                    break

Function self.main() at first reload configs and resources, and then proceed to an almost infinite loop.

        for line in ct:
            # do stuff
            if self.hup_hdl.interrupted:
                break

Thus, it's okay to catch SIGHUP several times during this daemon process runtime. But calling release() inside handler brakes things.

What do you think abuot it?

Sorry for my bad english.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.