Skip to content

Instantly share code, notes, and snippets.

@cottsay
Last active January 27, 2020 22:36
Show Gist options
  • Save cottsay/afb34d329e93f8f18e645b46634614cc to your computer and use it in GitHub Desktop.
Save cottsay/afb34d329e93f8f18e645b46634614cc to your computer and use it in GitHub Desktop.
A ROS 2 launch test demonstrating passing the PID of one process to another via an environment variable
import unittest
from launch import LaunchDescription
from launch.actions import (
ExecuteProcess, OpaqueFunction,
RegisterEventHandler, SetEnvironmentVariable)
from launch.event_handlers import OnProcessStart
from launch.substitutions import EnvironmentVariable, FindExecutable
import launch_testing
def on_process_start(process_started, launch_context, observation_process):
    """Export the started process's PID, then run the observation process.

    :param process_started: event object exposing the ``pid`` of the process
        that has just started.
    :param launch_context: launch context both actions are executed against.
    :param observation_process: action executed once the PID is exported.
    """
    pid_text = str(process_started.pid)
    export_pid = SetEnvironmentVariable('PROCESS_UNDER_TEST_ID', pid_text)
    # The variable has to be set before the observer is executed, because the
    # observer's command line is built from PROCESS_UNDER_TEST_ID.
    export_pid.execute(launch_context)
    observation_process.execute(launch_context)
def generate_test_description(ready_fn):
    """Build the launch description and the objects handed to the tests.

    The process under test is a bash one-liner that echoes its own PID and
    then sleeps for 5 seconds.  The observation process is deliberately not
    listed in the launch description: it is executed by ``on_process_start``
    from the ``OnProcessStart`` handler registered for the process under
    test, and its command line reads ``PROCESS_UNDER_TEST_ID``.

    :param ready_fn: zero-argument callable invoked from the OpaqueFunction
        that is the last entry of the launch description.
    :return: a ``(LaunchDescription, dict)`` tuple; the dict maps the names
        ``ready_fn``, ``process_under_test`` and ``observation_process`` to
        the corresponding objects.
    """
    shell = FindExecutable(name='bash')
    # Nested list: the pieces are joined into the single script string that
    # follows "-c"; "$$" is left for bash to expand.
    put_script = [
        FindExecutable(name='echo'),
        ' @@@@@ I, the PUT, have PID $$ && ',
        FindExecutable(name='sleep'),
        ' 5',
    ]
    process_under_test = ExecuteProcess(
        cmd=[shell, '-c', put_script],
        output='screen',
    )

    observer_message = [
        '@@@@@ I, the observer, will observe PID ',
        EnvironmentVariable('PROCESS_UNDER_TEST_ID'),
    ]
    observation_process = ExecuteProcess(
        cmd=[FindExecutable(name='echo'), observer_message],
        output='screen',
    )

    def _start_observer(event, context):
        # Bind the observer so the handler keeps the two-argument signature.
        return on_process_start(event, context, observation_process)

    def _signal_ready(context):
        return ready_fn()

    description = LaunchDescription([
        RegisterEventHandler(OnProcessStart(
            target_action=process_under_test,
            on_start=_start_observer,
        )),
        process_under_test,
        launch_testing.util.KeepAliveProc(),
        OpaqueFunction(function=_signal_ready),
    ])

    # Objects made available to the test methods, keyed by argument name.
    test_args = {
        'ready_fn': ready_fn,
        'process_under_test': process_under_test,
        'observation_process': observation_process,
    }
    return description, test_args
class PerformanceTestTermination(unittest.TestCase):
    """Assert that both launched processes shut down within the timeout."""

    # Value passed as ``timeout`` to every assertWaitForShutdown call below.
    _SHUTDOWN_TIMEOUT = 15

    def test_termination_put(self, process_under_test, proc_info):
        """Wait for the process under test to shut down."""
        wait_for_shutdown = proc_info.assertWaitForShutdown
        wait_for_shutdown(
            process=process_under_test,
            timeout=self._SHUTDOWN_TIMEOUT,
        )

    def test_termination_op(self, observation_process, proc_info):
        """Wait for the observation process to shut down."""
        wait_for_shutdown = proc_info.assertWaitForShutdown
        wait_for_shutdown(
            process=observation_process,
            timeout=self._SHUTDOWN_TIMEOUT,
        )
@launch_testing.post_shutdown_test()
class PerformanceTestResults(unittest.TestCase):
    """Check the exit codes of both processes."""

    def test_results(self, observation_process, process_under_test, proc_info):
        """Assert that the observer, then the process under test, exited OK."""
        check_exit_codes = launch_testing.asserts.assertExitCodes
        # Checked in this order; a failure on the observer stops the test
        # before the process under test is examined.
        for process in (observation_process, process_under_test):
            check_exit_codes(
                proc_info,
                [launch_testing.asserts.EXIT_OK],
                process,
            )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment