Skip to content

File single_consumer_processor.hpp

File List > common > single_consumer_processor.hpp

Go to the documentation of this file

// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <rcpputils/thread_safety_annotations.hpp>

#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

namespace nebula::ros
{

template <typename T>
class SingleConsumerProcessor
{
public:
  using callback_t = std::function<void(T &&)>;

  explicit SingleConsumerProcessor(callback_t callback, size_t max_queue_size)
  : callback_(std::move(callback)), max_queue_size_(max_queue_size)
  {
    if (max_queue_size == 0) {
      throw std::invalid_argument("Max queue size must be greater than 0");
    }

    if (!callback_) {
      throw std::invalid_argument("Callback function must not be null");
    }

    consumer_thread_ = std::thread(&SingleConsumerProcessor::consumer_loop, this);
  }

  ~SingleConsumerProcessor() { stop(); }

  // Delete copy and move constructors/operators to prevent accidental copying
  SingleConsumerProcessor(const SingleConsumerProcessor &) = delete;
  SingleConsumerProcessor & operator=(const SingleConsumerProcessor &) = delete;
  SingleConsumerProcessor(SingleConsumerProcessor &&) = default;
  SingleConsumerProcessor & operator=(SingleConsumerProcessor &&) = default;

  void push(T && item)
  {
    std::unique_lock<std::mutex> lock(queue_mutex_);

    cv_can_push_.wait(lock, [this]() { return should_stop_ || (queue_.size() < max_queue_size_); });

    if (should_stop_) {
      return;
    }

    queue_.push(std::move(item));
    lock.unlock();
    cv_can_pop_.notify_one();
  }

  bool try_push(T && item)
  {
    std::unique_lock lock(queue_mutex_);

    if (should_stop_) {
      return false;
    }

    if (queue_.size() >= max_queue_size_) {
      return false;
    }

    queue_.push(std::move(item));
    lock.unlock();
    cv_can_pop_.notify_one();
    return true;
  }

  void stop()
  {
    {
      std::lock_guard lock(queue_mutex_);
      should_stop_ = true;
    }

    cv_can_pop_.notify_all();
    cv_can_push_.notify_all();

    if (consumer_thread_.joinable()) {
      consumer_thread_.join();
    }
  }

  bool is_stopping() const
  {
    std::lock_guard lock(queue_mutex_);
    return should_stop_;
  }

private:
  void consumer_loop()
  {
    std::unique_lock lock(queue_mutex_, std::defer_lock);

    while (true) {
      lock.lock();
      // Wait for an item to be available or for stop signal
      cv_can_pop_.wait(lock, [this] { return !queue_.empty() || should_stop_; });

      if (should_stop_) {
        break;
      }

      T item = std::move(queue_.front());
      queue_.pop();
      lock.unlock();
      cv_can_push_.notify_one();
      callback_(std::move(item));
    }

    assert(lock.owns_lock());

    // Process the remaining items when stopping.
    // No need to unlock as the consumer
    while (!queue_.empty()) {
      T item = std::move(queue_.front());
      queue_.pop();
      callback_(std::move(item));
    }
  }

  callback_t callback_;
  std::thread consumer_thread_;

  mutable std::mutex queue_mutex_;
  std::condition_variable cv_can_pop_;
  std::condition_variable cv_can_push_;

  size_t max_queue_size_;
  std::queue<T> queue_ {}
  RCPPUTILS_TSA_GUARDED_BY(queue_mutex_);
  bool should_stop_{false} RCPPUTILS_TSA_GUARDED_BY(queue_mutex_);
};

}  // namespace nebula::ros