Skip to content

Instantly share code, notes, and snippets.

@plockaby
Last active June 10, 2019 17:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save plockaby/10b75dfa72dc952c5bdb7000328c9b72 to your computer and use it in GitHub Desktop.
Save plockaby/10b75dfa72dc952c5bdb7000328c9b72 to your computer and use it in GitHub Desktop.
A Supervisord eventlistener that does nothing.
import os
import io
import sys
import logging
import select
from threading import Event
import signal
import argparse
class GracefulSignalKiller(object):
    """Remember that SIGINT or SIGTERM was delivered to this process.

    Creating an instance installs ``kill`` as the handler for both signals.
    The handler only sets an ``Event``; callers poll it through ``killed``.
    """

    def __init__(self):
        self.event = Event()
        signal.signal(signal.SIGINT, self.kill)
        signal.signal(signal.SIGTERM, self.kill)

    def kill(self, signal_number, signal_stack):
        """Signal handler: set the event. Both arguments are ignored."""
        self.event.set()

    def killed(self, timeout=0):
        """Return True if the event is set, waiting up to *timeout* seconds.

        With the default timeout of 0 this is a non-blocking check.
        """
        return self.event.wait(timeout=timeout)
class EventListener(object):
    """A supervisord event listener that reads events and acknowledges them.

    The listener writes ``READY`` to stdout, reads one event from stdin,
    passes it to ``_handle_event`` and then writes an ``OK`` result. It keeps
    doing so until a handler returns a false value, stdin reaches EOF, or
    SIGINT/SIGTERM is received.
    """

    def __init__(self, **kwargs):
        """Set up logging and signal tracking.

        Raises:
            RuntimeError: if ``SUPERVISOR_SERVER_URL`` is not set in the
                environment.
        """
        self.logger = logging.getLogger(__name__)

        # this keeps track of whether we've been told to die or not
        self.killer = GracefulSignalKiller()

        # make sure we are an event listener
        self.supervisor_server_url = os.environ.get("SUPERVISOR_SERVER_URL", None)
        if self.supervisor_server_url is None:
            raise RuntimeError("cannot run from outside supervisor eventlistener")
        self.logger.info("connecting to supervisor over %s", self.supervisor_server_url)

    def run(self, *args, **kwargs):
        """Process events until told to stop; always returns 1.

        Positional and keyword arguments are accepted but unused.
        """
        # THIS NEXT LINE WORKS
        # this converts everything to utf8 and lets newlines pass as is. it
        # also makes .readline read until it gets to a "\n" and only a "\n".
        #stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8", newline="\n")

        # TEST #1: THIS NEXT LINE DOES NOT WORK
        # this breaks when our data contains things that look like newlines but
        # are not "\n" because it tries to read for data that isn't there.
        #stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")

        # TEST #2: THIS NEXT LINE DOES NOT WORK
        # this breaks when our data contains poorly encoded data. it won't read
        # enough data and so the next readline call gets garbage.
        #stdin = sys.stdin.buffer

        # THIS NEXT LINE WORKS
        # this appears to give a file handle with universal newlines disabled
        # and returning characters and just kind of works. the python
        # documentation says nothing either way about whether sys.stdin has
        # universal newlines enabled by default and it's not clear either.
        stdin = sys.stdin

        # what stdin module are we using?
        self.logger.info("using %s", stdin)

        # if we need to exit then this will be changed to True
        finished = False

        while not finished and not self.killer.killed():
            # transition from ACKNOWLEDGED to READY
            print("READY\n", flush=True, end="")

            # this flag will be set if we were processing something when we
            # decided to exit. if we were in the middle of processing something
            # then we need to acknowledge it before exiting.
            needs_acknowledgement = False

            while not self.killer.killed():
                try:
                    # returns None when nothing arrived within the timeout,
                    # in which case we go around and check the killer again
                    parts = self._read_message(stdin)
                    if parts is not None:
                        # if the event handling routine returns "true" then we
                        # should keep processing events. false means that we
                        # are finished processing events.
                        finished = not self._handle_event(*parts)
                        self.logger.info("finished processing event")
                        needs_acknowledgement = True
                        break
                except EOFError:
                    # if we received an eof then we have lost our pipe to
                    # supervisor. now we must exit the loop and the program.
                    finished = True
                    break
            else:
                # the inner loop ended without a break: we were signalled
                finished = True

            # if we decided that we're done then we are going to send a
            # message to supervisor that we're done but only if it is waiting
            # for it.
            if needs_acknowledgement:
                # transition from READY to ACKNOWLEDGED
                # this prints to stdout where we communicate with supervisor
                print("RESULT 2\nOK", flush=True, end="")

        return 1

    @staticmethod
    def _parse_tokens(text):
        """Turn whitespace-separated ``key:value`` tokens into a dict.

        Only the first colon in a token separates key from value, so values
        may themselves contain colons. A token with no colon at all raises
        ValueError.
        """
        return dict(token.split(":", 1) for token in text.split())

    def _read_message(self, handle):
        """Read one event from *handle*, waiting at most one second for it.

        Args:
            handle: a readable object accepted by ``select.select``. It may
                yield ``str`` or ``bytes``; bytes are decoded as UTF-8 with
                ``backslashreplace``.

        Returns:
            None if nothing became readable within one second, otherwise a
            tuple ``(header, event, data)``. ``header`` is the parsed header
            line. ``event`` is the parsed first line of the payload and
            ``data`` is whatever follows its first newline. ``event`` and
            ``data`` are both None when the header's ``len`` is 0 or absent,
            and ``data`` is None when the payload has no newline.

        Raises:
            EOFError: if the header line or the payload reads back empty.
            ValueError: if ``len`` is not an integer or a token has no colon.
        """
        # wait one second for a message before exiting with nothing
        readable = select.select([handle], [], [], 1)[0]
        if handle not in readable:
            return None

        # if the next line returns a false-like value then we received an
        # eof. if we have received an eof then we're going to get out
        # because something went horribly wrong.
        line = handle.readline()
        # binary handles have no "newlines" attribute, so don't assume it
        self.logger.info("newlines is set to %s", getattr(handle, "newlines", None))
        if not line:
            raise EOFError("received eof from supervisord trying to read message header")

        # convert to utf8 string from bytes
        # only used when we are reading from a binary handle
        if isinstance(line, bytes):
            line = line.decode("utf-8", "backslashreplace")

        # let's assume that we got a real message and it is a string
        header = self._parse_tokens(line)
        self.logger.info("header: %s", header)
        data_length = int(header.get("len", 0))  # this will raise a value error on bad data
        if data_length == 0:
            return (header, None, None)

        # read in only as much data as we were told to read in
        data = handle.read(data_length)
        self.logger.info("newlines is set to %s", getattr(handle, "newlines", None))
        if not data:
            raise EOFError("received eof from supervisord trying to read message payload")

        # convert to utf8 string from bytes
        # only used when we are reading from a binary handle
        if isinstance(data, bytes):
            data = data.decode("utf-8", "backslashreplace")

        # the first payload line is the event header; anything after the
        # first newline is additional data belonging to the event
        event_line, newline, remainder = data.partition("\n")
        event = self._parse_tokens(event_line)
        self.logger.info("event header: %s", event)
        return (header, event, remainder if newline else None)

    def _handle_event(self, header, event, data):
        """Handle one event; this implementation ignores it.

        Returns:
            True to keep processing events, False to make ``run`` exit.
        """
        # returning True means process more events. False means we exit
        return True
if __name__ == "__main__":
    # no options are defined, but parsing still provides --help and rejects
    # unexpected arguments
    parser = argparse.ArgumentParser(prog="eventlistener")
    parser.parse_args()

    # configure logging: everything at INFO and above goes to stderr, since
    # the listener prints its READY/RESULT messages to stdout
    logging.captureWarnings(True)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    log_handler = logging.StreamHandler(stream=sys.stderr)
    log_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)-8s - %(message)s"))
    logger.addHandler(log_handler)

    # run the listener and use its return value as the exit status
    runnable = EventListener()
    sys.exit(runnable.run())
#!/usr/bin/env perl
use strict;
use warnings;
use IO::Handle;

# make both standard streams unbuffered and utf-8 encoded
for my $stream (\*STDOUT, \*STDERR) {
    $stream->autoflush(1);
    binmode($stream, "encoding(UTF-8)");
}

# first an ordinary line, then a line with a CRLF embedded in the middle
print "this is a newline\n";
print "this line\r\n contains a carriage return\n";
#!/usr/bin/env perl
use strict;
use warnings;
use IO::Handle;

# make both standard streams unbuffered and utf-8 encoded
for my $stream (\*STDOUT, \*STDERR) {
    $stream->autoflush(1);
    binmode($stream, "encoding(UTF-8)");
}

# run through this file:
# https://www.cl.cam.ac.uk/~mgk25/ucs/examples/UTF-8-test.txt
open(my $fh, "<", "UTF-8-test.txt") or die $!;

# copy the file to stdout one line at a time
print while <$fh>;

close($fh);
[supervisord]
user = root
logfile = /data/logs/supervisord.log
logfile_maxbytes = 10MB
logfile_backups = 4
loglevel = info
pidfile = /var/run/supervisor/supervisord.pid
[unix_http_server]
file = /var/run/supervisor/supervisord.sock
chmod = 0770
chown = toolop:dart-tools
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl = unix:///var/run/supervisor/supervisord.sock
[eventlistener:listener]
command = /usr/local/bin/event-listener
user = nobody
events = PROCESS_LOG,PROCESS_STATE,TICK_60,TICK_3600
autostart = true
autorestart = true
startretries = 10
stopwaitsecs = 60
stopsignal = TERM
# this makes sure we don't miss anything on a restart
buffer_size = 65535
stdout_logfile = /data/logs/supervisor/dart-agent.log
stdout_logfile_maxbytes = 1MB
stdout_logfile_backups = 1
stderr_logfile = /data/logs/supervisor/dart-agent.err
stderr_logfile_maxbytes = 10MB
stderr_logfile_backups = 4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment