ROS2 Test Environment
This module provides the environment by which tests can interact with nodes.
- class ros2_easy_test.env.ROS2TestEnvironment(*, watch_topics: Mapping[str, Type] | None = None, qos_profiles: Mapping[str, QoSProfile] | None = None, **kwargs)
Bases:
Node
This class provides a way to interact with nodes and assert that events happen or do not.
Publishing messages works by creating publishers on the fly. However, one must ensure that the type of the messages published on a topic stays the same throughout each test.
For subscribers, the topics and their types must be declared at instantiation via the watch_topics argument. This cannot be avoided due to how rclpy works. A mailbox is then created for each topic, and incoming messages are collected in it without a limit. This happens in the background, meaning that, for example, assert_message_published() may return a message that was collected before the method was called. To prevent this, one might call clear_messages() first (although the concurrency is often the desired behaviour). Also, mind the timeout/time_span arguments, as they determine how long to wait while checking an assertion (the defaults are listed in each method's signature). Asserting that a message is sent, or failing to assert that no message is sent, will return early.
Services are handled similarly to publishers, being created on the fly when used for the first time.
Note
The ROS2TestEnvironment is an rclpy.node.Node too, which allows implementing custom functionality.
Note
Timings and timeouts in this class are only approximate.
- Parameters:
watch_topics – The topics (and their types) to watch for incoming messages. You may pass None to not watch any topic.
qos_profiles – The topics and their QoS profiles to subscribe to or publish on. You may pass None to use the default profile. All topics without an explicit profile use QoSProfile(history=QoSHistoryPolicy.KEEP_ALL).
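The clear-then-publish pattern described in the class overview can be sketched as a small helper. This is only an illustration under stated assumptions: the function publish_and_expect_reply and its parameter names are hypothetical, and env is assumed to be a ROS2TestEnvironment whose watch_topics includes reply_topic. Only the documented methods clear_messages(), publish() and assert_message_published() are called.

```python
from typing import Any


def publish_and_expect_reply(
    env: Any,
    request_topic: str,
    reply_topic: str,
    message: Any,
    timeout: float = 2.0,
) -> Any:
    """Hypothetical helper: publish a message and return the first fresh reply."""
    # Drop anything the background subscriber collected earlier, so that a
    # stale message cannot satisfy the assertion below.
    env.clear_messages(topic=reply_topic)
    env.publish(request_topic, message)
    # Waits roughly `timeout` seconds at most; returns early once a message arrives.
    return env.assert_message_published(reply_topic, timeout=timeout)
```

Skipping the clear_messages() call is equally valid when messages that arrived before the publish should count as well.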
- assert_message_published(topic: str, timeout: float | None = 2) Any
Asserts that a message is published on the given topic within at most the given time.
This method might return early if a message is received before the timeout has occurred.
- Parameters:
topic – The topic to listen on
timeout – The approximate amount of time to listen before failing. None means waiting forever.
- Returns:
The message that was received
- assert_messages_published(topic: str, number: int, individual_timeout: float | None = 2, max_total_timeout: float | None = 30.0) List[Any]
Asserts that the given number of messages is published on the given topic within at most the given time.
This method might return early if ALL messages are received before the timeout has occurred.
- Parameters:
topic – The topic to listen on
number – The expected number of messages to arrive
individual_timeout – The approximate amount of time per message to listen before failing. None means waiting forever (if max_total_timeout is also set to None). The timeout that is used is calculated as number * individual_timeout but runs for all messages together, and NOT for each one individually. In other words, when waiting for number = 10 messages with individual_timeout = 2.0 seconds (a total timeout of 10 * 2 = 20 seconds), it is accepted that all messages arrive only after 19.9 seconds. If they arrive after 20.5 seconds, this method will have already raised an AssertionError.
max_total_timeout – If not set to None, this caps the total timeout computed from number * individual_timeout. This prevents excessively long test times before a test fails when dealing with many messages. Most, if not all, tests should not take more time than the default of this anyway.
- Returns:
All messages that were received. It is guaranteed that len(result) == number.
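The timeout arithmetic described for individual_timeout and max_total_timeout can be written out as a small pure function. This is a sketch of the documented rule, not the library's implementation: the name effective_total_timeout is hypothetical, and returning the cap when individual_timeout is None but max_total_timeout is set is an assumption read from the parameter descriptions.

```python
from typing import Optional


def effective_total_timeout(
    number: int,
    individual_timeout: Optional[float] = 2,
    max_total_timeout: Optional[float] = 30.0,
) -> Optional[float]:
    """Return the overall deadline in seconds, or None for "wait forever"."""
    if individual_timeout is None:
        # Assumption: without a per-message budget, only the cap (if any) applies.
        return max_total_timeout
    # One shared budget for all messages, not a separate one per message.
    total = number * individual_timeout
    if max_total_timeout is None:
        return total
    return min(total, max_total_timeout)
```

With the defaults, waiting for 10 messages gives a shared budget of 20 seconds, while waiting for 100 messages is capped at 30 seconds.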
- assert_no_message_published(topic: str, time_span: float = 0.5) None
Asserts that no message is published on the given topic within the given time.
- Parameters:
topic – The topic to listen on
time_span – The approximate amount of time to listen before returning. This value is set rather low (compared to the other timeouts) because a successful test can never return early, so the full time span is always spent waiting.
- await_future(future: Future, timeout: float | None = 10) Any
Waits for the given future to complete.
- Parameters:
future – The ROS Future to wait for
timeout – The timeout to wait for the future
- Raises:
TimeoutError – If the future did not complete within the timeout
Exception – If the future completed with an exception
- Returns:
The result of the future
- call_service(name: str, request: SrvTypeRequest, timeout_availability: float | None = 1, timeout_call: float | None = 10) SrvTypeResponse
Calls the given service with the given request once available and returns the response.
The service type is inferred automatically from the request type.
- Parameters:
name – The name of the service
request – The request to send to the service
timeout_availability – The timeout to wait for the service to be available
timeout_call – The timeout to wait for the service to respond
- Returns:
The response of the service
- Raises:
TimeoutError – If the service did not respond within the timeout
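A typical use of call_service() in a test might look like the following sketch. The helper call_and_require_success is hypothetical, env is assumed to be a ROS2TestEnvironment, and the response is assumed to carry a boolean success field (as the responses of std_srvs Trigger and SetBool do); other service types need a different check.

```python
from typing import Any


def call_and_require_success(
    env: Any,
    name: str,
    request: Any,
    timeout_call: float = 10.0,
) -> Any:
    """Hypothetical helper: call a service and fail the test on a negative reply."""
    # call_service() waits for the service to become available, then for the
    # response; it raises TimeoutError if the service does not respond in time.
    response = env.call_service(name, request, timeout_call=timeout_call)
    # Assumption: the response type has a boolean `success` field.
    assert response.success, f"Service {name!r} reported failure: {response!r}"
    return response
```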
- clear_messages(topic: str | None = None) None
Clear all messages in the mailbox of the given topic or in all mailboxes.
- Parameters:
topic – The topic to clear the mailbox of, or all topics if None
- destroy_node()
- listen_for_messages(topic: str, time_span: float | None = 2) List[Any]
Collects all messages which arrive in the given time on the topic.
- Parameters:
topic – The topic to listen on
time_span – The approximate amount of time to listen before returning. If None, this method will immediately return all messages that are currently in the queue.
- Returns:
A (possibly empty) list of messages that were received
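As one possible use of listen_for_messages(), the sketch below estimates how often a node publishes on a topic. The helper estimate_publish_rate is hypothetical and env is assumed to be a ROS2TestEnvironment that watches the topic. Because timings in this class are only approximate, the result is a rough figure and should be compared with a tolerance.

```python
from typing import Any


def estimate_publish_rate(env: Any, topic: str, time_span: float = 2.0) -> float:
    """Hypothetical helper: approximate messages per second on a watched topic."""
    if time_span <= 0:
        raise ValueError("time_span must be positive")
    # Discard messages collected earlier so that only the window below counts.
    env.clear_messages(topic=topic)
    messages = env.listen_for_messages(topic, time_span=time_span)
    return len(messages) / time_span
```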
- publish(topic: str, message: Any) None
Publish the message on the topic.
This method creates the publisher internally if required and caches it. Nothing needs to be done manually.
- Parameters:
topic – The topic to publish on
message – The message to be sent. Its type must match that of all other messages sent on this topic before and after.
- send_action_goal(name: str, goal: Any, collect_feedback: bool = True, timeout_availability: float | None = 1, timeout_receive_goal: float | None = 1) Any]]
Sends the goal to the given action but does not handle the result.
This does not check if the goal was accepted and does not attempt to retrieve the result.
- Parameters:
name – The name of the action
goal – Goal message to send to the action server.
collect_feedback – Whether to collect feedbacks asynchronously. Defaults to True. This is provided for performance reasons.
timeout_availability – The timeout to wait for the action to be available. Defaults to 1.
timeout_receive_goal – The timeout to wait for the action server to accept the goal. Defaults to 1.
- Raises:
TimeoutError – If the action server does not respond within the specified timeouts.
- Returns:
Return the goal handle and feedback list (that might still grow asynchronously).
- send_action_goal_and_wait_for_result(name: str, goal: Any, collect_feedback: bool = True, timeout_availability: float | None = 1, timeout_accept_goal: float | None = 1, timeout_get_result: float | None = 10) Tuple[List[Any], Any]
Sends the goal to the given action and returns the collected feedback messages and the result.
- Parameters:
name – The name of the action
goal – Goal message to send to the action server.
collect_feedback – Whether to collect feedbacks asynchronously. Defaults to True. This is provided for performance reasons.
timeout_availability – The timeout to wait for the action to be available. Defaults to 1.
timeout_accept_goal – The timeout to wait for the action server to accept the goal. Defaults to 1.
timeout_get_result – The timeout to wait for the action server to return the result. Defaults to 10.
- Raises:
TimeoutError – If the action server does not respond within the specified timeouts.
AssertionError – If the goal was not accepted or the goal did not succeed.
- Returns:
Return all the feedback responses and the final result.
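The following sketch shows how the return value of send_action_goal_and_wait_for_result() might be unpacked. It relies on the two-element Tuple[List[Any], Any] given in the signature; the helper run_action_and_count_feedback is hypothetical and env is assumed to be a ROS2TestEnvironment.

```python
from typing import Any, Tuple


def run_action_and_count_feedback(
    env: Any,
    name: str,
    goal: Any,
    timeout_get_result: float = 10.0,
) -> Tuple[int, Any]:
    """Hypothetical helper: run an action and report how much feedback arrived."""
    # Raises TimeoutError if the server is too slow and AssertionError if the
    # goal is rejected or does not succeed, so no extra checks are needed here.
    feedbacks, result = env.send_action_goal_and_wait_for_result(
        name,
        goal,
        collect_feedback=True,
        timeout_get_result=timeout_get_result,
    )
    return len(feedbacks), result
```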
See also