Skip to content

Instantly share code, notes, and snippets.

@vsajip
Last active April 30, 2024 03:40
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save vsajip/4b227eeec43817465ca835ca66f75e2b to your computer and use it in GitHub Desktop.
Save vsajip/4b227eeec43817465ca835ca66f75e2b to your computer and use it in GitHub Desktop.
Run a logging socket receiver in a production setting with logging from an example webapp
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import urllib.request
with open('webapp.json', encoding='utf-8') as f:
config = json.loads(f.read())
URLS = [
'http://localhost:%d/?ident=%d' % (config['port'], ident)
for ident in range(1, 1001)
]
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 1): url for url in URLS}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print('Fetched %s' % url)
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
#!/usr/bin/env bash
PROJECT_DIR=$(dirname $0)
SUCTL=venv/bin/supervisorctl
SUD=venv/bin/supervisord
SUCONF=supervisor.conf
cd "$PROJECT_DIR"
"$SUCTL" -c $SUCONF status > /dev/null
status="$?"
case "$status" in
3)
"$SUCTL" -c "$SUCONF" restart all ;;
4)
"$SUD" -c "$SUCONF" ;;
esac
{
"port": 9020,
"logging": {
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"default": {
"format": "%(asctime)s %(levelname)-8s %(process)s %(name)s %(message)s"
}
},
"handlers": {
"file": {
"class": "logging.handlers.RotatingFileHandler",
"filename": "run/app.log",
"maxBytes": 1024,
"backupCount": 5,
"formatter": "default"
}
},
"root": {
"level": "DEBUG",
"handlers": ["file"]
}
}
}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Red Dove Consultants Limited. BSD-3-Clause licensed.
#
import argparse
import json
import logging
import logging.config
import os
import pickle
import socketserver
import struct
import sys
PRINT_EXC_TYPE = False
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""
Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self,
host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()], [], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
adhf = argparse.ArgumentDefaultsHelpFormatter
parser = argparse.ArgumentParser(formatter_class=adhf)
aa = parser.add_argument
aa('config', metavar='CONFIG', help='Configuration file to use (JSON)')
options = parser.parse_args()
fn = options.config
if not os.path.exists(fn):
print(f'Configuration file not found: {fn}')
return 1
with open(fn, encoding='utf-8') as f:
config = json.loads(f.read())
port = config['port']
logging.config.dictConfig(config['logging'])
logging.getLogger('log_listener').info('Log listener started.')
tcpserver = LogRecordSocketReceiver(port=port)
print(f'About to start TCP server on port {port} ...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
try:
rc = main()
except KeyboardInterrupt:
rc = 2
except Exception as e:
if PRINT_EXC_TYPE:
s = ' %s:' % type(e).__name__
else:
s = ''
sys.stderr.write('Failed:%s %s\n' % (s, e))
if 'PY_DEBUG' in os.environ:
import traceback
traceback.print_exc()
rc = 1
sys.exit(rc)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 Red Dove Consultants Limited
#
import argparse
import datetime
import json
import logging
import logging.config
import os
import sys
from bottle import route, run, request
DEBUGGING = 'PY_DEBUG' in os.environ
logger = logging.getLogger(__name__)
@route('/')
def index():
s = datetime.datetime.now().isoformat(' ', 'seconds')
ident = request.query.get('ident')
logger.debug('Request with ident %s handled', ident)
style = '<style>body { font-family: sans-serif; border: 1px solid silver; padding: 1em; }</style>'
return f"{style}<h4>Hello, world! It's {s}</h4>"
def process(options):
host = options.config['host']
port = options.config['port']
run(host=host, port=port, server='gunicorn', workers=4)
def main():
adhf = argparse.ArgumentDefaultsHelpFormatter
ap = argparse.ArgumentParser(formatter_class=adhf, prog='main')
aa = ap.add_argument
aa('-c',
'--config',
default='webapp.json',
help='Logging configuration (JSON)')
options = ap.parse_args()
with open(options.config, encoding='utf-8') as f:
config = json.loads(f.read())
options.config = config
logging.config.dictConfig(config['logging'])
process(options)
if __name__ == '__main__':
try:
rc = main()
except KeyboardInterrupt:
rc = 2
except Exception as e:
if DEBUGGING:
s = ' %s:' % type(e).__name__
else:
s = ''
sys.stderr.write('Failed:%s %s\n' % (s, e))
if DEBUGGING:
import traceback
traceback.print_exc()
rc = 1
sys.exit(rc)
#!/usr/bin/env bash
mkdir -p run
python3 -m venv venv
venv/bin/pip install bottle gunicorn supervisor
;
; Note: shell expansion ("~" or "$HOME") is not supported. Environment
; variables can be expanded using this syntax: "%(ENV_HOME)s".
[unix_http_server]
file=%(here)s/run/supervisor.sock
[supervisord]
logfile=%(here)s/run/supervisord.log
pidfile=%(here)s/run/supervisord.pid
nodaemon=false
nocleanup=true
childlogdir=%(here)s/run
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix://%(here)s/run/supervisor.sock
[program:listener]
command = venv/bin/python log_listener.py listener.json
priority = 500
autostart = true
autorestart = true
directory = %(here)s
stdout_logfile=%(here)s/run/%(program_name)s_stdout.log
stderr_logfile=%(here)s/run/%(program_name)s_stderr.log
[program:app]
command = venv/bin/python main.py
autostart = true
autorestart = true
directory = %(here)s
stdout_logfile=%(here)s/run/%(program_name)s_stdout.log
stderr_logfile=%(here)s/run/%(program_name)s_stderr.log
{
"host": "0.0.0.0",
"port": 32323,
"logging": {
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"default": {
"format": "%(asctime)s %(levelname)-8s %(name)s %(message)s"
}
},
"handlers": {
"socket": {
"class": "logging.handlers.SocketHandler",
"host": "localhost",
"port": 9020
}
},
"root": {
"level": "DEBUG",
"handlers": ["socket"]
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment