Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example of how to ingest sensord logs (from journalctl) to EventModel
from event_model import compose_run
import datetime
import dateutil.parser
import itertools
from io import StringIO
import subprocess
import numpy as np
import functools
def extract_event(ts, batch):
    """
    Build the data and timestamp dictionaries for one batch of sensor lines.

    Lines that do not start with a space are treated as section headers
    (the text after the first ``": "`` becomes the current section name).
    Lines that start with a space are readings; the first space-delimited
    token after ``": "`` is converted to a float.

    Parameters
    ----------
    ts : float
        unix timestamp as a float, used for every reading in the batch
    batch : List[str]
        Assumed to be of the form ::

            Adapter: group
              a: 12 V
              b: 43 C (min = 127.0 C, max = 127.0 C)
            Adapter: group2
              c: 15 RPM

    Returns
    -------
    data, timestamps : Dict[str, float]
        The data and timestamp dictionaries as you would find in an Event.
        Keys are ``"<section>_<reading name>"``.
    """
    readings = {}
    section = ""
    for line in batch:
        label, _, remainder = line.partition(": ")
        if label.startswith(" "):
            # indented line: a reading that belongs to the current section;
            # only the leading number matters, units and limits are dropped
            value_text = remainder.strip().partition(" ")[0]
            readings[f"{section}_{label.strip()}"] = float(value_text)
        else:
            # un-indented line: starts a new section
            section = remainder.strip()
    # every reading in the batch shares the same timestamp
    return readings, dict.fromkeys(readings, ts)
def extract_datakeys(machine, system, batch):
    """
    Build the data keys describing the readings in one batch of sensor lines.

    The keys are generated exactly as in ``extract_event`` (section name and
    reading name joined by ``"_"``) so the two outputs line up.

    Parameters
    ----------
    machine : str
        The name of the machine, used as the first half of ``source``
    system : str
        The name of the logging system, used as the second half of ``source``
    batch : List[str]
        Assumed to be of the form ::

            Adapter: group
              a: 12 V
              b: 43 C (min = 127.0 C, max = 127.0 C)
            Adapter: group2
              c: 15 RPM

    Returns
    -------
    data_keys : Dict[str, Dict[str, Any]]
        The data keys as you would find in a Descriptor.  The units are the
        second space-delimited token of the reading (``"V"``, ``"C"``, ...).
    """
    out = {}
    # start with an empty section so a reading that appears before any
    # section header gets the same key that extract_event gives it
    current_section = ""
    for entry in batch:
        head, _, tail = entry.partition(": ")
        if not head.startswith(" "):
            # un-indented line: starts a new section
            current_section = tail.strip()
            continue
        key = (current_section, head.strip())
        out["_".join(key)] = {
            "source": f"{machine}_{system}",
            "dtype": "number",
            "shape": [],
            "units": tail.strip().split(" ")[1],
        }
    return out
def _process_batch(machine, system, dt, batch, *, descs, run_bundle):
    """
    Turn one batch of log lines into an event (and maybe a descriptor).

    Parameters
    ----------
    machine : str
        The name of the machine
    system : str
        The name of the logging system; also used as the descriptor name
    dt : datetime.datetime
        Time of the batch; ``dt.timestamp()`` is stamped on every reading
    batch : List[str]
        Assumed to be of the form ::

            Adapter: group
              a: 12 V
              b: 43 C (min = 127.0 C, max = 127.0 C)
            Adapter: group2
              c: 15 RPM

    descs : Dict[FrozenSet[str], ComposeDescriptorBundle]
        Cache of known descriptor bundles, keyed on the set of data keys

        .. warning ::

           This input will be mutated

    run_bundle : ComposeRunBundle
        The run we are making events for

        .. warning ::

           This input will be mutated (implicitly by its use)

    Yields
    ------
    name : str
    doc : Dict[str, Any]
        Optionally a ``"descriptor"`` pair, then always one ``"event"`` pair
    """
    stamp = dt.timestamp()
    data, timestamps = extract_event(stamp, batch)
    fields = frozenset(data)

    if fields not in descs:
        # Sometimes the logs are missing an entry.  If a cached descriptor
        # covers a strict superset of what we saw, reuse it.
        superset = next((known for known in descs if known > fields), None)
        if superset is None:
            # genuinely new set of readings: build, emit and cache a descriptor
            new_bundle = run_bundle.compose_descriptor(
                name=system,
                data_keys=extract_datakeys(machine, system, batch),
            )
            yield "descriptor", new_bundle.descriptor_doc
            descs[fields] = new_bundle
        else:
            # pad the readings that did not show up in this batch
            for absent in superset - fields:
                data[absent] = np.nan
                timestamps[absent] = stamp
            fields = superset

    # mint the event against the (possibly just created) descriptor and emit it
    event_doc = descs[fields].compose_event(data=data, timestamps=timestamps)
    yield "event", event_doc
def parse_sensord_log(fin, *, md=None):
    """
    Parse sensord log data to event model

    Assumes the input logs look like::

        ...
        May 10 19:35:19 jupiter sensord[610]: Chip: coretemp-isa-0000
        May 10 19:35:19 jupiter sensord[610]: Adapter: ISA adapter
        May 10 19:35:19 jupiter sensord[610]:   Package id 0: 57.0 C
        May 10 19:35:19 jupiter sensord[610]:   Core 0: 57.0 C
        May 10 19:35:19 jupiter sensord[610]:   Core 1: 48.0 C
        May 10 19:35:19 jupiter sensord[610]: Adapter: ACPI interface
        May 10 19:35:19 jupiter sensord[610]:   temp1: 27.8 C
        May 10 19:35:19 jupiter sensord[610]: Adapter: ISA adapter
        May 10 19:35:19 jupiter sensord[610]:   in0: +1.19 V (min = +0.00 V, max = +3.06 V)
        May 10 19:35:19 jupiter sensord[610]:   fan1: 866 RPM (min = 0 RPM) (beep)
        May 10 19:35:19 jupiter sensord[610]:   temp1: 39.0 C (min = 127.0 C, max = 127.0 C) (beep)
        May 10 19:35:19 jupiter sensord[610]: Adapter: PCI adapter
        May 10 19:35:19 jupiter sensord[610]:   Composite: 38.9 C (min = -0.1 C, max = 76.8 C)
        ...

    Lines starting with ``--`` (journalctl markers) and blank lines are
    skipped.  If the input holds no log entries at all, nothing is yielded.

    Parameters
    ----------
    fin : file-like of the log
        Expects `fin.readline` and `iter(fin)` to give one line at a time.
    md : Dict[str, Any]
        Any extra metadata to inject into the start document.  The input is
        copied, not mutated.  A ``"machine"`` entry is added from the first
        log line unless one is already present.

    Yields
    ------
    name : str
    doc : Dict[str, Any]
        The name / doc pairs of the data
    """
    # work on a copy so the setdefault below does not leak into the caller
    md = dict(md) if md else {}
    # get the first real line to get a start time
    ln = None
    while not ln:
        raw = fin.readline()
        if not raw:
            # readline() returns "" only at EOF: the log has no entries, so
            # there is no run to emit (and looping again would never end)
            return
        ln = raw.strip()
        if ln.startswith("--"):
            # skip the comment
            ln = None
    month, day, time, rest = ln.split(" ", 3)
    machine, _, rest = rest.partition(" ")
    md.setdefault("machine", machine)
    dt = dateutil.parser.parse(" ".join([month, day, time]))
    # local running state
    run_bundle = compose_run(time=dt.timestamp(), metadata=md)
    descs = {}
    current_stamp = dt
    batch_data = []
    # this closes over descs cache and the run bundle
    process_batch = functools.partial(
        _process_batch, descs=descs, run_bundle=run_bundle
    )
    yield "start", run_bundle.start_doc
    # stick the first line back in, and process remaining text
    for ln in itertools.chain([ln], fin):
        # skip blank lines
        if not ln.strip():
            continue
        # skip journalctl marker lines (e.g. "-- Reboot --"); they carry no
        # time stamp and would break the parsing below
        if ln.lstrip().startswith("--"):
            continue
        # get the timestamp on this line
        month, day, time, rest = ln.split(" ", 3)
        # TODO will have issues on year :/
        dt = dateutil.parser.parse(" ".join([month, day, time]))
        # strip off the machine / system
        machine, _, rest = rest.partition(" ")
        system, _, rest = rest.partition(": ")
        # drop the pid
        system, *_ = system.partition("[")
        # check if we are still in "current batch"
        # see if we are within 5 seconds of the start of the batch.  These
        # time stamps are when the line was emitted to the log (not when
        # sensord ran) so can have a second spread in the timestamps of one
        # batch, use 5 just to be safe
        if (dt - current_stamp).total_seconds() > 5:
            # if not and we have accumulated lines, generate the event
            # (and maybe descriptor)
            if batch_data:
                yield from process_batch(machine, system, current_stamp, batch_data)
            # reset local state
            batch_data = []
            current_stamp = dt
        # add the current line to the running set
        batch_data.append(rest)
    # make sure we get the last batch
    if batch_data:
        yield from process_batch(machine, system, current_stamp, batch_data)
    # emit the stop document
    yield "stop", run_bundle.compose_stop(time=current_stamp.timestamp())
def fill_history(boot_index, callback):
    """
    Extract per-boot "runs" of sensord from journalctl.

    Runs ::

        journalctl -b -{boot_index} -u sensord

    and then pipes the results through parse_sensord_log to the callback.

    Parameters
    ----------
    boot_index : int
        How many boots back to extract the data from
    callback : Callable[str, Dict]
        Function to receive the documents; called as ``callback(name, doc)``
        once per document, in order

    Raises
    ------
    subprocess.CalledProcessError
        If journalctl exits with a non-zero status

    Examples
    --------
    >> import databroker
    >> db = databroker.Broker.named('temp')
    # get the data from the last boot and store it
    >> fill_history(1, db.insert)
    """
    # argument list (no shell) equivalent to the command shown above
    ret = subprocess.check_output(
        ["journalctl", "-b", f"-{boot_index}", "-u", "sensord"]
    ).decode()
    for name, doc in parse_sensord_log(StringIO(ret)):
        # hand each document to the caller-supplied consumer
        callback(name, doc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment