Skip to content

Instantly share code, notes, and snippets.

@rominf
Last active March 27, 2020 14:03
Show Gist options
  • Save rominf/d513e22993e98ca23b8646761417d097 to your computer and use it in GitHub Desktop.
Save rominf/d513e22993e98ca23b8646761417d097 to your computer and use it in GitHub Desktop.
Start sshd service and wait for signal using DBus, pydbus, asyncio
# On Fedora run:
# $ sudo dnf install python3-pydbus
# Then run this script as root
from gi.repository import GLib
from asyncio import Event
from pydbus import SystemBus
import asyncio
import pydbus
# Connection to the system-wide D-Bus message bus (hence the need to run as root).
bus = SystemBus()
# For accessing the DBus SystemBus inside the container:
# bus = pydbus.connect("unix:path=/tmp/container_dbus/system_bus_socket")
# Proxy for the ".systemd1" bus name; used below to call ListUnits() and StartUnit().
systemd = bus.get(".systemd1")
def service_start(name: str) -> Event:
    """Return an event that is set once the systemd unit *name* is running.

    The unit's D-Bus object path is looked up in ``systemd.ListUnits()``, a
    ``PropertiesChanged`` subscription is installed on that object, and an
    ``asyncio.Event`` is returned.  The event is set as soon as the unit
    reports ``ActiveState == "active"`` and ``SubState == "running"``.

    Call this *before* asking systemd to start the unit so that the state
    transition cannot be missed.  If the unit is already running when the
    subscription is installed, the event is set right away instead of waiting
    for a signal that may never arrive.

    Args:
        name: Exact unit name as listed by systemd, e.g. ``"sshd.service"``.

    Returns:
        An ``asyncio.Event`` that becomes set when the unit is running.

    Raises:
        LookupError: If no unit called *name* is reported by ``ListUnits()``.
    """
    unit_path_prefix = "/org/freedesktop/systemd1/unit/"
    # Each ListUnits() entry is a tuple of mixed types; the unit's object path
    # is the string field that starts with the systemd unit prefix.
    unit_path = next(
        (
            field
            for listed in systemd.ListUnits()
            if name in listed
            for field in listed
            if isinstance(field, str) and field.startswith(unit_path_prefix)
        ),
        None,
    )
    if unit_path is None:
        # A bare StopIteration would surface as an opaque RuntimeError when
        # this function is called from inside a coroutine.
        raise LookupError(f"systemd unit {name!r} not found in ListUnits()")

    unit = bus.get(".systemd1", unit_path)
    properties = unit["org.freedesktop.DBus.Properties"]
    result = Event()
    connection = None

    def is_running() -> bool:
        return unit.ActiveState == "active" and unit.SubState == "running"

    def callback(interface_name, changed_properties, invalidated_properties):
        # Invoked from the GLib main loop, which runs in a different thread
        # than the asyncio loop, hence call_soon_threadsafe for result.set.
        if is_running():
            if connection is not None:
                connection.disconnect()
            loop.call_soon_threadsafe(result.set)
            print("Received DBus signal")

    connection = properties.PropertiesChanged.connect(callback)

    # The unit may already be running (or have become so before the
    # subscription was active); no further signal is guaranteed in that case.
    if is_running():
        connection.disconnect()
        loop.call_soon_threadsafe(result.set)
    return result
async def run_sshd():
    """Ask systemd to start ``sshd.service`` and wait until it is running."""
    sshd_unit = "sshd.service"
    # Subscribe first so the state change triggered by StartUnit is observed.
    started = service_start(sshd_unit)
    # Calls systemd via DBus and continues without waiting
    systemd.StartUnit(sshd_unit, "fail")
    print("Before sshd start")
    await started.wait()
    print("After sshd start")
# The GLib main loop is run in the default executor (a worker thread) so it
# can run alongside the asyncio loop that drives run_sshd().
glib_loop = GLib.MainLoop()
loop = asyncio.get_event_loop()
loop.run_in_executor(None, glib_loop.run)
try:
    loop.run_until_complete(run_sshd())
finally:
    # Always stop the GLib loop: otherwise its worker thread keeps running
    # and the process cannot exit when run_sshd() raises.
    glib_loop.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment