ROS2 Test Environment

This module provides the environment by which tests can interact with nodes.

class ros2_easy_test.env.ROS2TestEnvironment(*, watch_topics: ~typing.Mapping[str, ~typing.Type] | None = None, qos_profiles: ~typing.Mapping[str, <Mock name='mock.QoSProfile' id='140257370085024'>] | None = None, **kwargs)

Bases: Node

This class provides a way to interact with nodes and assert that events happen or do not.

Publishing messages works by creating a publishers on the fly. However, one must ensure that the type of the message that is published stays the same across each test.

For subscribers, the topics and their types must be declared at the instantiation via the watch_topics argument. This cannot be avoided due to how rclpy works. Then, a mailbox is created for each topic and incoming messages are collected each (without a limit). This is performed in the background, meaning that for example assert_message_published() may return a message that was collected before the method was called. To prevent this from happening, one might use clear_messages() (although the concurrency is often the desired behaviour). Also, mind the timeout / time_span arguments as they determine how long to wait while checking the assertion (default: 1 second). Asserting that a message is sent or failing to assert that no message is sent will return early.

Services are handled similarly to publishers, being created on the fly when used for the first time.

Note

The ROS2TestEnvironment is a rclpy.node.Node too, which allows to implement custom functionality too.

Note

Timings and timeouts in this class are only approximate.

Parameters:
  • watch_topics – The topics (and their type) to watch for incoming messages. You may pass None to not watch any topic.

  • qos_profile – The topics and their QoS profile to subscribe to or publish on. You may pass None to use the default profile. All topics without an explicit profile use QoSProfile(history=QoSHistoryPolicy.KEEP_ALL).

assert_message_published(topic: str, timeout: float | None = 2) Any

Asserts a message is published on the given topic within after at most the given time.

This method might return early if a message is received before the timeout as occurred.

Parameters:
  • topic – The topic to listen on

  • timeout – The approximate amount of time to listen before failing. None means waiting forever.

Returns:

The message that was received

assert_messages_published(topic: str, number: int, individual_timeout: float | None = 2, max_total_timeout: float | None = 30.0) List[Any]

Asserts that some messages are published on the given topic within after at most the given time.

This method might return early if ALL messages are received before the timeout as occurred.

Parameters:
  • topic – The topic to listen on

  • number – The expected number of messages to arrive

  • individual_timeout – The approximate amount of time per message to listen before failing. None means waiting forever (if max_total_timeout is also set to None). The timeout that is used is calculated by number * timeout but runs for all messages together, and NOT for each one individually. In other words, it is accepted that when waiting for number = 10 messages for timeout = 2.0 seconds (that means the total timeout is 10 * 2 = 20 seconds) and then all messages arrive after 19.9 seconds. If they arrive after 20.5 seconds, this method will have already raised an AssertionError.

  • max_total_timeout – This caps the total timeout that is computed from number * timeout if not set to None. This prevents excessively long tests times until a test fails when dealing with many messages. Most – if not all – tests should in any way not take more time than the defualt of this.

Returns:

All messages that were received. It is guaranteed that len(result) == number.

assert_no_message_published(topic: str, time_span: float = 0.5) None

Asserts that no message is published on the given topic within the given time.

Parameters:
  • topic – The topic to listen on

  • time_span – The approximate amount of time to listen before returning. This value is set to be rather low (when compared to the other timeouts) since in a successful test, it may never return early and causes a lot of waiting time.

await_future(future: <Mock name='mock.Future' id='140257370085360'>, timeout: float | None = 10) Any

Waits for the given future to complete.

Parameters:
  • future – The ROS Future to wait for

  • timeout – The timeout to wait for the future

Raises:
  • TimeoutError – If the future did not complete within the timeout

  • Exception – If the future completed with an exception

Returns:

The result of the future

call_service(name: str, request: <Mock name='mock.SrvTypeRequest' id='140257370083680'>, timeout_availability: float | None = 1, timeout_call: float | None = 10) <Mock name='mock.SrvTypeResponse' id='140257370084016'>

Calls the given service with the given request once available and returns the response.

The service type if inferred automatically from the request type.

Parameters:
  • name – The name of the service

  • request – The request to send to the service

  • timeout_availability – The timeout to wait for the service to be available

  • timeout_call – The timeout to wait for the service to respond

Returns:

The response of the service

Raises:

TimeoutError – If the service did not respond within the timeout

clear_messages(topic: str | None = None) None

Clear all messages in the mailbox of the given topic or in all mailboxes.

Parameters:

topic – The topic to clear the mailbox of, or all topics if None

destroy_node()
listen_for_messages(topic: str, time_span: float | None = 2) List[Any]

Collects all messages which arrive in the given time on the topic.

Parameters:
  • topic – The topic to listen on

  • time_span – The approximate amount of time to listen before returning. If None, this method will immediately return all messages that are currently in the queue.

Returns:

A (possibly empty) list of messages that were received

publish(topic: str, message: Any) None

Publish the message on the topic.

This method creates the publisher internally if required and caches it. Nothing needs to be done manually.

Parameters:
  • topic – The topic to publish on

  • message – The message to be sent. It’s type must match the one of all other messages sent on this topic before and after.

send_action_goal(name: str, goal: Any, collect_feedback: bool = True, timeout_availability: float | None = 1, timeout_receive_goal: float | None = 1) Any]]

Sends the goal to the given action but does not handle the result.

This does not check if the goal was accepted and does not attempt to retrieve the result.

Parameters:
  • name – The name of the action

  • goal – Goal message to send to the action server.

  • collect_feedback – Whether to collect feedbacks asynchronously. Defaults to True. This is provided for performance reasons.

  • timeout_availability – The timeout to wait for the action to be available. Defaults to 1.

  • timeout_receive_goal – The timeout to wait for the action server to accept the goal. Defaults to 1.

Raises:

TimeoutError – If the action server does not respond within the specified timeouts.

Returns:

Return the goal handle and feedback list (that might still grow asynchronously).

send_action_goal_and_wait_for_result(name: str, goal: Any, collect_feedback: bool = True, timeout_availability: float | None = 1, timeout_accept_goal: float | None = 1, timeout_get_result: float | None = 10) Tuple[List[Any], Any]

Sends the goal to the given action and returns the response handle, feedbacks and result.

Parameters:
  • name – The name of the action

  • goal – Goal message to send to the action server.

  • collect_feedback – Whether to collect feedbacks asynchronously. Defaults to True. This is provided for performance reasons.

  • timeout_availability – The timeout to wait for the action to be available. Defaults to 1.

  • timeout_accept_goal – The timeout to wait for the action server to accept the goal. Defaults to 1.

  • timeout_get_result – The timeout to wait for the action server to return the result. Defaults to 10.

Raises:
  • TimeoutError – If the action server does not respond within the specified timeouts.

  • AssertionError – If the goal was not accepted or the goal did not succeed.

Returns:

Return the goal handle, all the feedback responses and the final result.