Skip to content

Instantly share code, notes, and snippets.

@antdking
Created April 28, 2023 12:49
Show Gist options
  • Save antdking/0eacd8d9df8bda120dcaceea6eaab1f7 to your computer and use it in GitHub Desktop.
Save antdking/0eacd8d9df8bda120dcaceea6eaab1f7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from contextlib import contextmanager
import inspect
from tempfile import NamedTemporaryFile
import typing as t
import typing_extensions as te
import os
import pystemd
import pystemd.run
import systemd.journal
import itertools
# They tried to make executable modules work,
# but it actually breaks standard usage :laughing:
if t.TYPE_CHECKING:
systemd_run = pystemd.run.run
else:
systemd_run = pystemd.run
import logging
logging.basicConfig()
log = logging.getLogger("systemd-launcher")
log.setLevel(logging.DEBUG)
Unit: te.TypeAlias = pystemd.systemd1.Unit
USER_MODE = os.geteuid() != 0
def start(cmd: t.List[str]) -> Unit:
# start in exec mode, so that we return after the actual execution starts
return systemd_run(cmd, user_mode=USER_MODE, service_type="exec", env={"PYTHONUNBUFFERED": "1"})
@contextmanager
def log_stream(unit: Unit) -> t.Generator[t.Iterable[str], None, None]:
# TODO: work out how to get this from the Unit object
unit_filter_key = "_SYSTEMD_USER_UNIT" if USER_MODE else "_SYSTEMD_UNIT"
log.debug("Starting log stream: %s", unit.external_id.decode())
journal_reader = systemd.journal.Reader()
# see: https://man7.org/linux/man-pages/man7/systemd.journal-fields.7.html
journal_reader.add_match(**{unit_filter_key: unit.external_id.decode()})
journal_reader.seek_realtime(unit.Service.ExecMainStartTimestamp)
log.debug("Set clock + unit filter")
# Journal.Reader is a repeatable read iterator: It'll raise StopIteration whenever
# the current batch is consumed, and then iterate it again for the next batch.
#
#
def inner():
# Keep reading while journal is reporting anything other than No-ops
# Note: just because we receive an Append, doesn't mean there's data to read
while (status := journal_reader.wait(0.1)) != systemd.journal.NOP:
log.debug("Got status: %i", status)
for entry in journal_reader:
yield entry["MESSAGE"]
log.debug("Finished batch: %i", status)
def outer() -> t.Generator[str, None, None]:
log.debug("Starting main loop, current PID: %i", unit.Service.MainPID)
# MainPID gets set to 0 when the service finishes
while unit.Service.MainPID != 0:
log.debug("Sending batch")
yield from inner()
log.debug("Service finished, sending final batch")
yield from inner()
yield outer()
def main():
f = None
try:
# quick script to test a long running process with stdout output
with NamedTemporaryFile("w", delete=False) as f:
f.write(inspect.cleandoc("""
#!/usr/bin/env python
import time, sys
try:
count_to = int(sys.argv[1])
except IndexError:
count_to = 100
for i in range(count_to):
print(f" Hello: {i}")
time.sleep(1)
"""))
f.flush()
os.chmod(f.name, 0o755)
unit = start([f.name, "20"])
with log_stream(unit) as stream:
for line in stream:
log.info(line)
finally:
if f is not None:
os.unlink(f.name)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment