File single_consumer_processor.hpp
File List > common > single_consumer_processor.hpp
Go to the documentation of this file
// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <rcpputils/thread_safety_annotations.hpp>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
namespace nebula::ros
{
template <typename T>
class SingleConsumerProcessor
{
public:
using callback_t = std::function<void(T &&)>;
explicit SingleConsumerProcessor(callback_t callback, size_t max_queue_size)
: callback_(std::move(callback)), max_queue_size_(max_queue_size)
{
if (max_queue_size == 0) {
throw std::invalid_argument("Max queue size must be greater than 0");
}
if (!callback_) {
throw std::invalid_argument("Callback function must not be null");
}
consumer_thread_ = std::thread(&SingleConsumerProcessor::consumer_loop, this);
}
~SingleConsumerProcessor() { stop(); }
// Delete copy and move constructors/operators to prevent accidental copying
SingleConsumerProcessor(const SingleConsumerProcessor &) = delete;
SingleConsumerProcessor & operator=(const SingleConsumerProcessor &) = delete;
SingleConsumerProcessor(SingleConsumerProcessor &&) = default;
SingleConsumerProcessor & operator=(SingleConsumerProcessor &&) = default;
void push(T && item)
{
std::unique_lock<std::mutex> lock(queue_mutex_);
cv_can_push_.wait(lock, [this]() { return should_stop_ || (queue_.size() < max_queue_size_); });
if (should_stop_) {
return;
}
queue_.push(std::move(item));
lock.unlock();
cv_can_pop_.notify_one();
}
bool try_push(T && item)
{
std::unique_lock lock(queue_mutex_);
if (should_stop_) {
return false;
}
if (queue_.size() >= max_queue_size_) {
return false;
}
queue_.push(std::move(item));
lock.unlock();
cv_can_pop_.notify_one();
return true;
}
void stop()
{
{
std::lock_guard lock(queue_mutex_);
should_stop_ = true;
}
cv_can_pop_.notify_all();
cv_can_push_.notify_all();
if (consumer_thread_.joinable()) {
consumer_thread_.join();
}
}
bool is_stopping() const
{
std::lock_guard lock(queue_mutex_);
return should_stop_;
}
private:
void consumer_loop()
{
std::unique_lock lock(queue_mutex_, std::defer_lock);
while (true) {
lock.lock();
// Wait for an item to be available or for stop signal
cv_can_pop_.wait(lock, [this] { return !queue_.empty() || should_stop_; });
if (should_stop_) {
break;
}
T item = std::move(queue_.front());
queue_.pop();
lock.unlock();
cv_can_push_.notify_one();
callback_(std::move(item));
}
assert(lock.owns_lock());
// Process the remaining items when stopping.
// No need to unlock as the consumer
while (!queue_.empty()) {
T item = std::move(queue_.front());
queue_.pop();
callback_(std::move(item));
}
}
callback_t callback_;
std::thread consumer_thread_;
mutable std::mutex queue_mutex_;
std::condition_variable cv_can_pop_;
std::condition_variable cv_can_push_;
size_t max_queue_size_;
std::queue<T> queue_ {}
RCPPUTILS_TSA_GUARDED_BY(queue_mutex_);
bool should_stop_{false} RCPPUTILS_TSA_GUARDED_BY(queue_mutex_);
};
} // namespace nebula::ros