Skip to content

Instantly share code, notes, and snippets.

@Alessandro-Barbieri
Last active April 1, 2022 17:23
Show Gist options
  • Save Alessandro-Barbieri/b310800db6e7e8dffcb8e319cc0c5092 to your computer and use it in GitHub Desktop.
Save Alessandro-Barbieri/b310800db6e7e8dffcb8e319cc0c5092 to your computer and use it in GitHub Desktop.
import os.path
from typing import (
List,
Optional,
)
from .. import errors
from ..interfaces import (
ExecutorInterface,
ServiceManagerInterface,
)
class OpenRCGentooDriver(ServiceManagerInterface):
    """
    Service manager driver which controls services through two external
    executables: one for start/stop/status and one for add/delete/listing.

    The ``instance`` argument accepted by the service methods is never used
    by this driver; it is only present to keep the method signatures uniform.
    """

    def __init__(
        self,
        executor: ExecutorInterface,
        rc_service_bin: str,
        rc_config_bin: str,
    ):
        """
        executor -- external commands used by this class are executed using
            this object
        rc_service_bin -- path to an executable used for starting and stopping
            services and to check if a service is running
        rc_config_bin -- path to an executable used for enabling, disabling and
            listing available service and to check if service is enabled
        """
        self._executor = executor
        self._rc_config_bin = rc_config_bin
        self._rc_service_bin = rc_service_bin
        # Cache of the parsed service listing; an empty list means "not
        # loaded yet" and triggers a new query on the next lookup.
        self._available_services: List[str] = []

    def start(self, service: str, instance: Optional[str] = None) -> None:
        """
        Start the service; raise StartServiceError on a non-zero exit code.
        """
        result = self._executor.run([self._rc_service_bin, service, "start"])
        if result.retval != 0:
            raise errors.StartServiceError(service, result.joined_output)

    def stop(self, service: str, instance: Optional[str] = None) -> None:
        """
        Stop the service; raise StopServiceError on a non-zero exit code.
        """
        result = self._executor.run([self._rc_service_bin, service, "stop"])
        if result.retval != 0:
            raise errors.StopServiceError(service, result.joined_output)

    def enable(self, service: str, instance: Optional[str] = None) -> None:
        """
        Add the service to the "default" runlevel; raise EnableServiceError
        on a non-zero exit code.
        """
        result = self._executor.run(
            [self._rc_config_bin, "add", service, "default"]
        )
        if result.retval != 0:
            raise errors.EnableServiceError(service, result.joined_output)
        # is_enabled() answers from the cached listing, so the cache must be
        # dropped once the set of enabled services has changed.
        self._available_services = []

    def disable(self, service: str, instance: Optional[str] = None) -> None:
        """
        Remove the service from the "default" runlevel; raise
        DisableServiceError on a non-zero exit code.

        Nothing is executed when the service is not present in the service
        listing.
        """
        if not self.is_installed(service):
            return
        result = self._executor.run(
            [self._rc_config_bin, "delete", service, "default"]
        )
        if result.retval != 0:
            raise errors.DisableServiceError(service, result.joined_output)
        # Same reason as in enable(): the cached listing is stale now.
        self._available_services = []

    def is_enabled(self, service: str, instance: Optional[str] = None) -> bool:
        """
        Return True if the service is present in the (cached) service listing.
        """
        return service in self.get_available_services()

    def is_running(self, service: str, instance: Optional[str] = None) -> bool:
        """
        Return True if the status command reports the service as started.
        """
        result = self._executor.run([self._rc_service_bin, service, "status"])
        # Surrounding whitespace is stripped so that a trailing newline in
        # the command output does not defeat the comparison.
        return result.stdout.strip() == "* status: started"

    def is_installed(self, service: str) -> bool:
        """
        Return True if the service is present in the (cached) service listing.
        """
        return service in self.get_available_services()

    def get_available_services(self) -> List[str]:
        """
        Return the service listing, querying the system only when the cache
        is empty.
        """
        if not self._available_services:
            self._available_services = self._get_available_services()
        return self._available_services

    def _get_available_services(self) -> List[str]:
        """
        Run the listing executable and parse its output into service names.

        Returns an empty list when the command exits with a non-zero code.
        """
        # NOTE(review): the executable is called without any sub-command and
        # the parser expects one header line followed by one service per
        # line -- confirm this matches the real rc-config output.
        result = self._executor.run([self._rc_config_bin])
        if result.retval != 0:
            return []
        # Skip the first line, which says
        # 'Init scripts to be started by runlevel default'.
        stripped_lines = (
            line.strip() for line in result.stdout.splitlines()[1:]
        )
        return [name for name in stripped_lines if name]

    def is_current_system_supported(self) -> bool:
        """
        Return True if both configured executables exist as regular files.
        """
        return all(
            os.path.isfile(binary)
            for binary in (self._rc_service_bin, self._rc_config_bin)
        )
from unittest import mock, TestCase
from pcs.common.services import errors
from pcs.common.services.drivers import OpenRCGentooDriver
from pcs.common.services.interfaces import ExecutorInterface
from pcs.common.services.types import ExecutorResult
class Base(TestCase):
    """
    Common fixture: a driver wired to a mocked executor and fake binary paths.
    """

    def setUp(self):
        # Names handed to the driver methods under test.
        self.service = "service_name"
        self.instance = "instance_name"
        # Placeholder paths; the tests only compare them against the
        # commands passed to the mocked executor.
        self.service_bin = "service_bin"
        self.chkconfig_bin = "chkconfig_bin"
        # spec_set makes the mock reject attributes the interface lacks.
        self.mock_executor = mock.MagicMock(spec_set=ExecutorInterface)
        self.driver = OpenRCGentooDriver(
            self.mock_executor,
            self.service_bin,
            self.chkconfig_bin,
        )
class BaseTestMixin:
    """
    Shared success/failure tests for driver methods that run a single command.

    Concrete test classes set ``subcmd``, ``exception``, ``executable`` and
    ``driver_callback``. Classes whose command is not laid out as
    ``[executable, service, subcmd]`` override ``_expected_cmd``.
    """

    subcmd = None
    exception = None
    executable = None
    driver_callback = staticmethod(lambda: None)

    def _expected_cmd(self):
        """Return the command the driver is expected to hand to the executor."""
        return [self.executable, self.service, self.subcmd]

    def test_success(self):
        self.mock_executor.run.return_value = ExecutorResult(0, "", "")
        self.driver_callback(self.service)
        self.mock_executor.run.assert_called_once_with(self._expected_cmd())

    def test_instance_success(self):
        # The instance argument must not influence the executed command.
        self.mock_executor.run.return_value = ExecutorResult(0, "", "")
        self.driver_callback(self.service, self.instance)
        self.mock_executor.run.assert_called_once_with(self._expected_cmd())

    def test_failure(self):
        result = ExecutorResult(1, "stdout", "stderr")
        self.mock_executor.run.return_value = result
        with self.assertRaises(self.exception) as cm:
            self.driver_callback(self.service)
        self.assertEqual(cm.exception.service, self.service)
        self.assertEqual(cm.exception.message, result.joined_output)
        self.assertIsNone(cm.exception.instance)
        self.mock_executor.run.assert_called_once_with(self._expected_cmd())

    def test_instance_failure(self):
        # Even with an instance passed in, the raised error carries none.
        result = ExecutorResult(1, "stdout", "stderr")
        self.mock_executor.run.return_value = result
        with self.assertRaises(self.exception) as cm:
            self.driver_callback(self.service, self.instance)
        self.assertEqual(cm.exception.service, self.service)
        self.assertEqual(cm.exception.message, result.joined_output)
        self.assertIsNone(cm.exception.instance)
        self.mock_executor.run.assert_called_once_with(self._expected_cmd())


class StartTest(Base, BaseTestMixin):
    """Tests of OpenRCGentooDriver.start."""

    subcmd = "start"
    exception = errors.StartServiceError

    def setUp(self):
        super().setUp()
        self.driver_callback = self.driver.start
        self.executable = self.service_bin


class StopTest(Base, BaseTestMixin):
    """Tests of OpenRCGentooDriver.stop."""

    subcmd = "stop"
    exception = errors.StopServiceError

    def setUp(self):
        super().setUp()
        self.driver_callback = self.driver.stop
        self.executable = self.service_bin


class EnableTest(Base, BaseTestMixin):
    """Tests of OpenRCGentooDriver.enable."""

    subcmd = "add"
    exception = errors.EnableServiceError

    def setUp(self):
        super().setUp()
        self.driver_callback = self.driver.enable
        self.executable = self.chkconfig_bin

    def _expected_cmd(self):
        # enable() puts the sub-command first and appends the runlevel.
        return [self.executable, self.subcmd, self.service, "default"]


class DisableTest(Base, BaseTestMixin):
    """Tests of OpenRCGentooDriver.disable."""

    subcmd = "delete"
    exception = errors.DisableServiceError

    def setUp(self):
        super().setUp()
        # Pre-fill the listing cache so that disable() sees the service as
        # installed without running the listing command.
        # pylint: disable=protected-access
        self.driver._available_services = [self.service]
        self.driver_callback = self.driver.disable
        self.executable = self.chkconfig_bin

    def _expected_cmd(self):
        # disable() puts the sub-command first and appends the runlevel.
        return [self.executable, self.subcmd, self.service, "default"]

    def test_not_installed(self):
        # A service missing from the listing is skipped without any command.
        # pylint: disable=protected-access
        self.driver._available_services = [f"not_{self.service}"]
        self.driver_callback(self.service)
        self.mock_executor.run.assert_not_called()
class IsEnabledTest(Base):
    """
    Tests of OpenRCGentooDriver.is_enabled.

    The driver answers from the parsed service listing, so every test feeds
    listing output to the mocked executor and expects the bare listing
    command rather than a per-service query.
    """

    def test_enabled(self):
        output = (
            "Init scripts to be started by runlevel default\n"
            " service1\n"
            f" {self.service}\n"
        )
        self.mock_executor.run.return_value = ExecutorResult(0, output, "")
        self.assertTrue(self.driver.is_enabled(self.service))
        self.mock_executor.run.assert_called_once_with([self.chkconfig_bin])

    def test_instance_enabled(self):
        # The instance argument must not influence the result or the command.
        output = (
            "Init scripts to be started by runlevel default\n"
            " service1\n"
            f" {self.service}\n"
        )
        self.mock_executor.run.return_value = ExecutorResult(0, output, "")
        self.assertTrue(self.driver.is_enabled(self.service, self.instance))
        self.mock_executor.run.assert_called_once_with([self.chkconfig_bin])

    def test_disabled(self):
        # The listing succeeds but does not contain the service.
        output = (
            "Init scripts to be started by runlevel default\n"
            " service1\n"
            " abc\n"
        )
        self.mock_executor.run.return_value = ExecutorResult(0, output, "")
        self.assertFalse(self.driver.is_enabled(self.service))
        self.mock_executor.run.assert_called_once_with([self.chkconfig_bin])

    def test_failure(self):
        # A failing listing command yields an empty listing.
        self.mock_executor.run.return_value = ExecutorResult(1, "", "error")
        self.assertFalse(self.driver.is_enabled(self.service))
        self.mock_executor.run.assert_called_once_with([self.chkconfig_bin])
class IsRunningTest(Base):
    """Tests of OpenRCGentooDriver.is_running."""

    def _check(self, executor_result, expect_running, *instance):
        """
        Feed executor_result to the mocked executor, query is_running and
        verify both the answer and the executed status command.
        """
        self.mock_executor.run.return_value = executor_result
        answer = self.driver.is_running(self.service, *instance)
        if expect_running:
            self.assertTrue(answer)
        else:
            self.assertFalse(answer)
        status_cmd = [self.service_bin, self.service, "status"]
        self.mock_executor.run.assert_called_once_with(status_cmd)

    def test_running(self):
        self._check(ExecutorResult(0, " * status: started", ""), True)

    def test_instance_running(self):
        self._check(
            ExecutorResult(0, " * status: started", ""), True, self.instance
        )

    def test_not_running(self):
        self._check(ExecutorResult(0, " * status: stopped", ""), False)

    def test_failure(self):
        self._check(ExecutorResult(1, "", "error"), False)
class IsInstalledTest(Base):
    """Tests of OpenRCGentooDriver.is_installed."""

    def test_installed(self):
        listing = (
            "Init scripts to be started by runlevel default\n"
            " service1\n"
            " abc\n"
            " xyz\n"
            f" {self.service}\n"
        )
        self.mock_executor.run.return_value = ExecutorResult(0, listing, "")
        # Ask twice on purpose: the second answer has to come from the
        # cached listing, so the executor is expected to run only once.
        for _ in range(2):
            self.assertTrue(self.driver.is_installed(self.service))
        expected_cmd = [self.chkconfig_bin]
        self.mock_executor.run.assert_called_once_with(expected_cmd)

    def test_not_installed(self):
        listing = (
            "Init scripts to be started by runlevel default\n"
            " service1\n"
            " abc\n"
            " xyz\n"
        )
        self.mock_executor.run.return_value = ExecutorResult(0, listing, "")
        # Ask twice on purpose: the second answer has to come from the
        # cached listing, so the executor is expected to run only once.
        for _ in range(2):
            self.assertFalse(self.driver.is_installed(self.service))
        expected_cmd = [self.chkconfig_bin]
        self.mock_executor.run.assert_called_once_with(expected_cmd)
class GetAvailableServicesTest(Base):
    """Tests of OpenRCGentooDriver.get_available_services."""

    def test_success(self):
        listing = (
            "Init scripts to be started by runlevel default\n"
            " service1\n"
            " abc\n"
            " xyz\n"
        )
        self.mock_executor.run.return_value = ExecutorResult(0, listing, "")
        # The header line is dropped and each name is stripped.
        services = self.driver.get_available_services()
        self.assertEqual(services, ["service1", "abc", "xyz"])
        expected_cmd = [self.chkconfig_bin]
        self.mock_executor.run.assert_called_once_with(expected_cmd)

    def test_failure(self):
        # A non-zero exit code of the listing command gives an empty list.
        self.mock_executor.run.return_value = ExecutorResult(1, "", "error")
        services = self.driver.get_available_services()
        self.assertEqual(services, [])
        expected_cmd = [self.chkconfig_bin]
        self.mock_executor.run.assert_called_once_with(expected_cmd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment