Last active
January 27, 2020 22:36
-
-
Save cottsay/afb34d329e93f8f18e645b46634614cc to your computer and use it in GitHub Desktop.
A ROS 2 launch test demonstrating passing a the PID of one process to another via an environment var
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from launch import LaunchDescription | |
from launch.actions import ( | |
ExecuteProcess, OpaqueFunction, | |
RegisterEventHandler, SetEnvironmentVariable) | |
from launch.event_handlers import OnProcessStart | |
from launch.substitutions import EnvironmentVariable, FindExecutable | |
import launch_testing | |
def on_process_start(process_started, launch_context, observation_process): | |
SetEnvironmentVariable( | |
'PROCESS_UNDER_TEST_ID', str(process_started.pid)).execute(launch_context) | |
observation_process.execute(launch_context) | |
def generate_test_description(ready_fn): | |
process_under_test = ExecuteProcess( | |
cmd=[ | |
FindExecutable(name='bash'), | |
'-c', | |
[ | |
FindExecutable(name='echo'), | |
' @@@@@ I, the PUT, have PID $$ && ', | |
FindExecutable(name='sleep'), | |
' 5', | |
] | |
], | |
output='screen', | |
) | |
observation_process = ExecuteProcess( | |
cmd=[ | |
FindExecutable(name='echo'), | |
[ | |
'@@@@@ I, the observer, will observe PID ', | |
EnvironmentVariable('PROCESS_UNDER_TEST_ID'), | |
], | |
], | |
output='screen', | |
) | |
return LaunchDescription([ | |
RegisterEventHandler(OnProcessStart( | |
target_action=process_under_test, | |
on_start=lambda p, c: on_process_start(p, c, observation_process) | |
)), | |
process_under_test, | |
launch_testing.util.KeepAliveProc(), | |
OpaqueFunction(function=lambda context: ready_fn()), | |
]), locals() | |
class PerformanceTestTermination(unittest.TestCase): | |
def test_termination_put(self, process_under_test, proc_info): | |
proc_info.assertWaitForShutdown(process=process_under_test, timeout=15) | |
def test_termination_op(self, observation_process, proc_info): | |
proc_info.assertWaitForShutdown(process=observation_process, timeout=15) | |
@launch_testing.post_shutdown_test() | |
class PerformanceTestResults(unittest.TestCase): | |
def test_results(self, observation_process, process_under_test, proc_info): | |
launch_testing.asserts.assertExitCodes( | |
proc_info, | |
[launch_testing.asserts.EXIT_OK], | |
observation_process, | |
) | |
launch_testing.asserts.assertExitCodes( | |
proc_info, | |
[launch_testing.asserts.EXIT_OK], | |
process_under_test, | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment