Skip to content

Instantly share code, notes, and snippets.

@maxbane
Created October 15, 2022 00:45
Show Gist options
  • Save maxbane/595bf38e894c49f58e20fb905d24bf30 to your computer and use it in GitHub Desktop.
Save maxbane/595bf38e894c49f58e20fb905d24bf30 to your computer and use it in GitHub Desktop.
Proof of concept of forwarding the user's logging calls in submitted tasks to client session in Dask
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-10-15 00:31:55 ERROR [worker tcp://127.0.0.1:33781] user.module Hello error\n",
"2022-10-15 00:31:55 INFO [worker tcp://127.0.0.1:38977] user.module Hello info the second time\n"
]
}
],
"source": [
"import distributed\n",
"import logging\n",
"\n",
"TOPIC = \"forwarded-log-record\"\n",
"\n",
"# Handler class that gets installed inside workers. Each worker adds an instance\n",
"# of this handler to one or more loggers (possibly the root logger). Tasks\n",
"# running on that worker may then use the affected logger as normal, with the\n",
"# side effect that any LogRecords handled by the logger (or by a logger below it\n",
"# in the hierarchy) will be published to the dask client as a\n",
"# \"forwarded-log-record\" event. \n",
"class ForwardingLogHandler(logging.Handler):\n",
" def prepare_record_attributes(self, record):\n",
" # Adapted from the CPython standard library's logging.handlers.SocketHandler.makePickle; see\n",
" # its source at: https://github.com/python/cpython/blob/main/Lib/logging/handlers.py\n",
" ei = record.exc_info\n",
" if ei:\n",
" # just to get traceback text into record.exc_text ...\n",
" dummy = self.format(record)\n",
" # If msg or args are objects, they may not be available on the receiving\n",
" # end. So we convert the msg % args to a string, save it as msg and zap\n",
" # the args.\n",
" d = dict(record.__dict__)\n",
" d['msg'] = record.getMessage()\n",
" d['args'] = None\n",
" d['exc_info'] = None\n",
" # delete 'message' if present: redundant with 'msg'\n",
" d.pop('message', None)\n",
" return d\n",
"\n",
" def emit(self, record):\n",
" try:\n",
" worker = distributed.get_worker()\n",
" except ValueError:\n",
" return\n",
" attributes = self.prepare_record_attributes(record)\n",
" worker.log_event(TOPIC, attributes)\n",
"\n",
"# The client-side handler function for \"forwarded-log-record\" events. Sends the\n",
"# forwarded LogRecord to the client-side logger with the same name as that which\n",
"# originally handled the record on the worker-side.\n",
"def client_handle_forwarded_log_record(event):\n",
" stamp, record_attrs = event\n",
" # print(record_attrs, flush=True)\n",
" record = logging.makeLogRecord(record_attrs)\n",
" dest_logger = logging.getLogger(record.name)\n",
" dest_logger.handle(record)\n",
"\n",
"# User-facing interface. After creating a dask Client, call this on it to begin\n",
"# forwarding the given logger (by default the root) and all loggers under it\n",
"# from workers to the client process.\n",
"def forward_logging(client, logger_name=None, level=logging.NOTSET):\n",
" client.subscribe_topic(TOPIC, client_handle_forwarded_log_record)\n",
"\n",
" def worker_start_log_forwarding():\n",
" root = logging.getLogger(logger_name)\n",
" root.addHandler(ForwardingLogHandler(level=level))\n",
"\n",
" client.register_worker_callbacks(worker_start_log_forwarding)\n",
"\n",
"################\n",
"## Example usage\n",
"\n",
"client = distributed.Client()\n",
"forward_logging(client) # forward the root logger at any handled level\n",
"\n",
"# Now let's configure client side logging as a user might\n",
"TYPICAL_LOGGING_CONFIG = \"\"\"\n",
"version: 1\n",
"handlers:\n",
" console:\n",
" class : logging.StreamHandler\n",
" formatter: default\n",
" level : INFO\n",
"formatters:\n",
" default:\n",
" format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s'\n",
" datefmt: '%Y-%m-%d %H:%M:%S'\n",
"root:\n",
" handlers:\n",
" - console\n",
"\"\"\"\n",
"import io, yaml\n",
"config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG))\n",
"logging.config.dictConfig(config)\n",
"\n",
"# Submit a task that does some error logging. We should see output from the\n",
"# client-side StreamHandler in the output cell below.\n",
"def do_error():\n",
" logging.getLogger(\"user.module\").error(\"Hello error\")\n",
"\n",
"client.submit(do_error).result()\n",
"\n",
"# Here's a nuance worth highlighting... even though our client-side console\n",
"# handler is configured with a level of INFO, the worker-side root loggers still\n",
"# have their default/initial level of WARNING (we haven't done any explicit\n",
"# logging configuration on the workers other than installing the\n",
"# ForwardingLogHandler).\n",
"# Therefore worker-side INFO logs will NOT be forwarded because they never even\n",
"# get handled on the worker-side, never giving the ForwardingLogHandler a chance\n",
"# to forward them.\n",
"def do_info_1():\n",
" # no output on the client side\n",
" logging.getLogger(\"user.module\").info(\"Hello info the first time\")\n",
"\n",
"client.submit(do_info_1).result()\n",
"\n",
"# It's necessary to set the worker-side logger's level to INFO before the info\n",
"# message will be handled and forwarded to the client. In other words, the\n",
"# effective level of the client-side forwarded logging is the maximum of each\n",
"# logger's client-side and worker-side levels. (...and the optional ``level``\n",
"# kwarg of forward_logging, which sets a level on the ForwardingLogHandler\n",
"# itself!)\n",
"def do_info_2():\n",
" logger = logging.getLogger(\"user.module\")\n",
" logger.setLevel(logging.INFO)\n",
" # now produces output on the client side\n",
" logger.info(\"Hello info the second time\")\n",
"\n",
"client.submit(do_info_2).result()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"client.shutdown()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.10.6 ('dask-distributed')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "62301c75e0f964bff35fbc7e0787627824dcb5724374fafbf69e9c41cf4c20af"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment