FKIE Message Filters
Improved filters for processing ROS messages
|
Store and forward data. More...
#include <fkie_message_filters/buffer.hpp>
Public Types | |
using | Input = IO< Inputs... > |
Grouped input types. | |
using | Output = IO< Outputs... > |
Grouped output types. | |
Public Member Functions | |
Buffer (BufferPolicy policy=BufferPolicy::Discard, std::size_t max_queue_size=1) noexcept | |
Constructor. | |
Buffer (const rclcpp::Node::SharedPtr &node, std::size_t max_queue_size) noexcept | |
Constructor. | |
Connection | connect_to_sink (Sink< Outputs... > &dst) noexcept |
Connect this source to a sink. | |
Connection | connect_to_source (Source< Inputs... > &src) noexcept |
Connect this sink to a source. | |
virtual void | disconnect () noexcept override |
Disconnect from all connected sources and sinks. | |
void | disconnect_from_all_sinks () noexcept |
Disconnect from all connected sinks. | |
void | disconnect_from_all_sources () noexcept |
Disconnect from all connected sources. | |
bool | has_some () const noexcept |
Check if the buffer has pending data. | |
bool | process_one () |
Wait for and process one data item. | |
template<class Rep , class Period > | |
bool | process_one (const std::chrono::duration< Rep, Period > &timeout) |
Wait for and process one data item. | |
void | reset () noexcept override |
Reset filter. | |
void | set_node (const rclcpp::Node::SharedPtr &node) noexcept |
Process data in a dedicated ROS callback group. | |
void | set_policy (BufferPolicy policy, std::size_t max_queue_size=0) |
Modify the buffer policy. | |
void | spin () |
Process all data indefinitely. | |
void | spin_once () |
Process pending data. | |
bool | wait () noexcept |
Wait for pending data. | |
template<class Rep , class Period > | |
bool | wait_for (const std::chrono::duration< Rep, Period > &timeout) noexcept |
Wait for pending data until timeout expires. | |
Protected Member Functions | |
virtual void | receive (helpers::argument_t< Inputs >... in) override |
Process incoming data. | |
void | send (helpers::argument_t< Outputs >... out) |
Pass data to all connected sinks. | |
Store and forward data.
The buffer acts as a decoupler in the filter pipeline. Data can be stored and processed at a later time. The pipeline is effectively split into independent upstream and downstream parts, and it becomes possible to run the downstream data processing asynchronously. For instance, you can run computationally expensive algorithms on ROS messages in a different thread without blocking the ROS subscriber callback queue.
The buffer can be used for at least three different use cases:
Run your own thread(s) for data processing
This is the most flexible option for advanced users. You can set up your worker threads as you desire and then call spin(), spin_once() or process_one() as you see fit.
|
inherited |
Grouped input types.
This type can be used to define sources with matching types.
|
inherited |
Grouped output types.
This type can be used to define sinks with matching types.
|
noexcept |
Constructor.
Constructs a buffer with BufferPolicy::Queue policy and data processing via ROS callbacks.
node
ROS node instance which will host a dedicated callback group for data processing max_queue_size
the maximum number of queued data items
|
noexcept |
Constructor.
policy
the buffer policy max_queue_size
for the BufferPolicy::Queue policy, the maximum number of queued data items.
|
noexceptinherited |
Connect this source to a sink.
Can be called multiple times to connect multiple sinks; in that case, the sinks receive data in the same order as they have been connected. This function does basically the same thing as Sink::connect_to_source(), only from the opposite point of view.
dst
the sink that is to be connected
|
noexceptinherited |
Connect this sink to a source.
Can be called multiple times to connect multiple sources; in that case, the sink receives data from all connected sources. This function does basically the same thing as Source::connect_to_sink(), only from the opposite point of view.
src
the source that is to be connected
|
overridevirtualnoexceptinherited |
Disconnect from all connected sources and sinks.
Convenience function that calls disconnect_from_all_sources() and disonnect_from_all_sinks().
Reimplemented from fkie_message_filters::Sink< Inputs >.
|
noexceptinherited |
Disconnect from all connected sinks.
Severs the connection to all sinks, turning the send() method into a no-op.
|
noexceptinherited |
Disconnect from all connected sources.
Severs the connection to all sources. The receive() method will not be called any more.
|
noexcept |
Check if the buffer has pending data.
true | if the current policy is BufferPolicy::Queue and a subsequent call to process_one() or spin_once() will process data. |
false | otherwise |
bool fkie_message_filters::Buffer< Inputs >::process_one | ( | ) |
Wait for and process one data item.
true | data has been processed successfully |
false | either the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or the ROS node has been shut down. |
bool fkie_message_filters::Buffer< Inputs >::process_one | ( | const std::chrono::duration< Rep, Period > & | timeout | ) |
Wait for and process one data item.
timeout
maximum duration to wait for new data if none is availabletrue | data has been processed successfully |
false | either the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or time timeout expired, or the ROS node has been shut down. |
|
overrideprotectedvirtual |
Process incoming data.
Derived classes need to override this method to handle all data that is to be consumed by the sink.
Implements fkie_message_filters::Sink< Inputs >.
|
overridevirtualnoexcept |
Reset filter.
If the buffer policy is BufferPolicy::Queue, this will clear the internal queue and discard all pending data. Otherwise, this function has no effect.
Reimplemented from fkie_message_filters::FilterBase.
|
protectedinherited |
Pass data to all connected sinks.
out
data
|
noexcept |
Process data in a dedicated ROS callback group.
Instead of running your own processing threads, you can use the ROS callback system to schedule data processing whenever new data arrives.
node
the ROS node or nullptr
to disable ROS callbacks.void fkie_message_filters::Buffer< Inputs >::set_policy | ( | BufferPolicy | policy, |
std::size_t | max_queue_size = 0 |
||
) |
Modify the buffer policy.
If the new buffer policy is not BufferPolicy::Queue, any pending call to wait(), process_one(), or spin() will return. If the buffer policy is changed to BufferPolicy::Passthru, all pending data is processed immediately before the function returns. If the buffer policy is changed to BufferPolicy::Discard, all pending data is discarded immediately.
policy
the buffer policy max_queue_size
for the BufferPolicy::Queue policy, the maximum number of queued data items. If zero, the previously set queue size remains unchanged.void fkie_message_filters::Buffer< Inputs >::spin | ( | ) |
Process all data indefinitely.
Blocks and processes all incoming data until the buffer policy is changed to something other than BufferPolicy::Queue or the ROS node is shut down.
You can call the function from multiple threads at once, and the workload will be shared among all participating threads.
void fkie_message_filters::Buffer< Inputs >::spin_once | ( | ) |
Process pending data.
Does nothing if the buffer policy is not BufferPolicy::Queue. The method is guaranteed to return as it will only process data which is pending at invocation time. This also means that there may be new data pending already when this method returns.
|
noexcept |
Wait for pending data.
true | there is data available for consumption by spin_once() |
false | either the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or the ROS node has been shut down. |
|
noexcept |
Wait for pending data until timeout expires.
timeout
maximum duration to wait for new data if none is availabletrue | there is data available for consumption by spin_once() |
false | either the current buffer policy is not BufferPolicy::Queue, or the policy has been changed to something other than BufferPolicy::Queue while the function was waiting, or the timeout expired, or the ROS node has been shut down. |