Logger server using edge-triggered epoll
# -*- coding: utf-8 -*-
import os
import errno
import itertools
import socket
import select
import logging
import logging.config
import pickle
import signal
import struct

import settings

logging.config.dictConfig(settings.LOGGING)

counter = itertools.count()


def unpickle(data):
    return pickle.loads(data)


def server(host, port, backlog):
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(backlog)
    srv.setblocking(0)
    srv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    print('Logger server started on {}:{}'.format(host, port))
    return srv


def handle(buffer):
    # each record arrives as a 4-byte big-endian length prefix followed by a
    # pickled LogRecord dict -- the wire format used by
    # logging.handlers.SocketHandler
    while True:
        if len(buffer) < 4:
            return buffer
        slen = struct.unpack('>L', buffer[:4])[0]
        if len(buffer) < slen + 4:
            return buffer
        obj = unpickle(buffer[4:slen + 4])
        record = logging.makeLogRecord(obj)
        logger = logging.getLogger(record.name)
        logger.handle(record)
        next(counter)
        buffer = buffer[slen + 4:]


def ioloop(srv, bufsize=4096):
    epoll = select.epoll()
    epoll.register(srv.fileno(), select.EPOLLIN | select.EPOLLET)
    clients = {}
    buffers = {}

    def unregister(fileno):
        epoll.unregister(fileno)
        buffers.pop(fileno)
        conn = clients.pop(fileno)
        conn.close()

    def shutdown(fileno):
        # stop watching for reads; the subsequent EPOLLHUP unregisters the client
        epoll.modify(fileno, select.EPOLLET)
        clients[fileno].shutdown(socket.SHUT_RDWR)

    try:
        while True:
            try:
                events = epoll.poll(0.5)
            except (KeyboardInterrupt, IOError):
                break
            stop_server = False
            for fileno, event in events:
                if fileno == srv.fileno() and event & select.EPOLLHUP:
                    stop_server = True
                    break
                if fileno == srv.fileno():
                    # edge-triggered: accept until EWOULDBLOCK/EAGAIN
                    while True:
                        try:
                            conn, addr = srv.accept()
                        except socket.error as e:
                            if e.args[0] in (errno.EWOULDBLOCK,
                                             errno.EAGAIN):
                                # means all new connections were accepted
                                break
                            raise
                        else:
                            conn.setblocking(0)
                            epoll.register(conn.fileno(),
                                           select.EPOLLIN | select.EPOLLET)
                            clients[conn.fileno()] = conn
                            buffers[conn.fileno()] = b''
                elif event & select.EPOLLIN:
                    conn = clients[fileno]
                    buf = b''
                    chunk_len = None
                    # edge-triggered: read until EWOULDBLOCK/EAGAIN
                    while True:
                        try:
                            chunk = conn.recv(bufsize)
                        except socket.error as e:
                            if e.args[0] in (errno.EWOULDBLOCK,
                                             errno.EAGAIN):
                                # means all data from socket were received
                                break
                            raise
                        chunk_len = len(chunk)
                        if chunk_len > 0:
                            buf += chunk
                        else:
                            break
                    buffers[fileno] += buf
                    if chunk_len == 0:
                        # means client disconnected
                        shutdown(fileno)
                    buffers[fileno] = handle(buffers[fileno])
                elif event & select.EPOLLHUP or event & select.EPOLLERR:
                    unregister(fileno)
            if stop_server:
                break
    finally:
        epoll.unregister(srv.fileno())
        for fileno in list(clients.keys()):
            unregister(fileno)
        epoll.close()
        srv.close()
        num_records = next(counter)
        print(
            'Logger server stopped, {} records processed'.format(num_records))


def main():
    host = os.environ.get('HOST', '127.0.0.1')
    port = int(os.environ.get('PORT', 5000))
    backlog = int(os.environ.get('BACKLOG', 1000))
    srv = server(host, port, backlog)

    def shutdown(signum, frame):
        srv.shutdown(socket.SHUT_RDWR)

    # signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    ioloop(srv)


if __name__ == "__main__":
    main()
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'log_level') THEN
        CREATE TYPE log_level AS ENUM ('debug', 'info', 'warning', 'error', 'critical');
    END IF;
END $$;

CREATE TABLE IF NOT EXISTS log(
    "id" serial PRIMARY KEY,
    "created_at" timestamp with time zone NOT NULL,
    "level" log_level NOT NULL,
    "message" text NOT NULL,
    "logger" varchar(64) NOT NULL,
    "funcname" varchar(64) NOT NULL,
    "filename" varchar(64) NOT NULL,
    "pathname" varchar(255) NOT NULL,
    "lineno" int NOT NULL,
    "exc_info" text,
    "extra" jsonb NOT NULL
);

CREATE INDEX IF NOT EXISTS "log_extra_idx" ON "log" USING GIN("extra" jsonb_path_ops);
CREATE INDEX IF NOT EXISTS "log_created_at_idx" ON "log" USING BTREE("created_at");
CREATE INDEX IF NOT EXISTS "log_logger_idx" ON "log" USING BTREE("logger");
CREATE INDEX IF NOT EXISTS "log_level_idx" ON "log" USING BTREE("level");
# -*- coding: utf-8 -*-
import datetime
import urllib
import urlparse
import logging
import traceback
import sys

import psycopg2
import psycopg2.extensions
import psycopg2.extras
import ujson


class PgJson(psycopg2.extras.Json):

    def dumps(self, obj):
        return ujson.dumps(obj, ensure_ascii=False)


psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
psycopg2.extensions.register_adapter(dict, PgJson)
psycopg2.extras.register_default_json(globally=True, loads=ujson.loads)
psycopg2.extras.register_default_jsonb(globally=True, loads=ujson.loads)
psycopg2.extras.register_uuid()


def pg_uri_to_kwargs(uri):
    parsed = urlparse.urlparse(uri)
    if parsed.scheme != 'postgresql':
        raise ValueError('uri must start with "postgresql://"')
    mapped = (
        ('hostname', 'host', lambda x: x),
        ('username', 'user', lambda x: x),
        ('password', 'password', lambda x: x and urllib.unquote(x)),
        ('port', 'port', lambda x: x and int(x) or 5432),
        ('path', 'dbname', lambda x: x and _dbname(x[1:])),
    )
    return dict((k, cast(getattr(parsed, pk))) for (pk, k, cast) in mapped)


def _dbname(name):
    return name.partition('.')[0]


class PGHandler(logging.Handler):

    query = """
        PREPARE save_log AS
        INSERT INTO log
            (created_at, level, message, logger,
             funcname, filename, pathname, lineno,
             exc_info, extra)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    """

    def __init__(self, pg_uri):
        self._pg_uri = pg_uri
        self._conn = None
        super(PGHandler, self).__init__()

    @property
    def conn(self):
        if self._conn is None:
            kwargs = pg_uri_to_kwargs(self._pg_uri)
            self._conn = psycopg2.connect(**kwargs)
            self._conn.set_session(
                psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
                autocommit=True)
            self._conn.set_client_encoding('UTF8')
            cur = self._conn.cursor()
            cur.execute('SET TIME ZONE "UTC";')
            cur.execute(self.query)
            cur.close()
            print(' -> postgresql connection ready')
        return self._conn

    def emit(self, record):
        level = record.levelname.lower()
        created = datetime.datetime.utcfromtimestamp(record.created)
        try:
            with self.conn.cursor() as cursor:
                # custom record attributes
                # to be stored in `extra` field as `jsonb`
                # edit as needed
                keys = ('custom_key_1', 'custom_key_2')
                extra = {
                    k: getattr(record, k) for k in keys if hasattr(record, k)}
                args = (
                    created.isoformat(),    # 'created_at'
                    level,                  # 'level'
                    record.getMessage(),    # 'message'
                    record.name,            # 'logger'
                    record.funcName,        # 'funcname'
                    record.filename,        # 'filename'
                    record.pathname,        # 'pathname'
                    record.lineno,          # 'lineno'
                    record.exc_text,        # 'exc_info'
                    extra,                  # 'extra'
                )
                cursor.execute(
                    'EXECUTE '
                    'save_log (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', args)
        except Exception:
            sys.stderr.write(traceback.format_exc())
            self.handleError(record)

    def close(self):
        self.acquire()
        try:
            if self._conn:
                self._conn.close()
                self._conn = None
                print(' -> postgresql connection closed')
        finally:
            self.release()
        super(PGHandler, self).close()
# -*- coding: utf-8 -*-
import os
import time
import logging

DEBUG = os.environ.get('DEBUG', 'n').lower() in ('y', 'yes', 't', 'true')

# project root (parent of the directory containing this settings module)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DIR, 'logs')

if not os.path.isdir(LOG_DIR):
    os.mkdir(LOG_DIR)


class UTCFormatter(logging.Formatter):
    converter = time.gmtime


LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            '()': UTCFormatter,
            'format': (u'[%(levelname)1.1s '
                       u'%(asctime)s %(name)s:'
                       u'%(funcName)s:%(lineno)3d]'
                       u' %(message)s'),
            'datefmt': '%Y-%m-%d %H:%M:%S',
        }
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'default',
        },
        'logfile': {
            'level': 'DEBUG',
            'formatter': 'default',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': os.path.join(LOG_DIR, 'service.log'),
            'maxBytes': 100 * 1024 * 1024,
            'encoding': 'utf8',
            'backupCount': 60,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'] if DEBUG else ['logfile'],
            'level': 'DEBUG',
            'propagate': False,
        },
    }
}
Author's notes (@ysegorov):
The server works under both Python 2 and Python 3 and is compatible with logging.handlers.SocketHandler.
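
As a quick illustration of that compatibility, here is a minimal client sketch (not part of the gist); the 127.0.0.1:5000 address mirrors the defaults in main(), and the logger name and extra key are arbitrary:

# -*- coding: utf-8 -*-
# Minimal SocketHandler client sketch; assumes the logger server above is
# running with its default HOST/PORT (127.0.0.1:5000).
import logging
import logging.handlers

handler = logging.handlers.SocketHandler('127.0.0.1', 5000)

logger = logging.getLogger('demo')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

# SocketHandler pickles each LogRecord's attribute dict and prefixes it with
# a 4-byte big-endian length -- exactly the framing handle() unpacks above.
logger.info('hello from a SocketHandler client', extra={'custom_key_1': 42})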

The epoll-related code is heavily based on an article by Scot Doyle.
