Skip to content

File rate_bound_status.hpp

File List > common > diagnostics > rate_bound_status.hpp

Go to the documentation of this file

// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copied from https://github.com/tier4/ros2_v4l2_camera/pull/29
// Patched with https://github.com/tier4/ros2_v4l2_camera/pull/30

#ifndef RATE_BOUND_STATUS_HPP_
#define RATE_BOUND_STATUS_HPP_

#include <diagnostic_updater/diagnostic_updater.hpp>

#include <diagnostic_msgs/msg/diagnostic_status.hpp>

#include <rcl/time.h>

#include <iomanip>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <variant>

namespace custom_diagnostic_tasks
{
struct RateBoundStatusParam
{
  RateBoundStatusParam(const double min_freq, const double max_freq)
  : min_frequency(min_freq), max_frequency(max_freq)
  {
  }

  double min_frequency;
  double max_frequency;
};

class RateBoundStatus : public diagnostic_updater::DiagnosticTask
{
private:
  // Helper struct to express state machine nodes
  struct StateBase
  {
    StateBase(const unsigned char lv, const std::string m) : level(lv), num_observations(1), msg(m)
    {
    }

    unsigned char level;
    size_t num_observations;
    std::string msg;
  };

  struct Stale : public StateBase
  {
    Stale()
    : StateBase(diagnostic_msgs::msg::DiagnosticStatus::STALE, "Topic has not been received yet")
    {
    }
  };

  struct Ok : public StateBase
  {
    Ok() : StateBase(diagnostic_msgs::msg::DiagnosticStatus::OK, "Rate is reasonable") {}
  };

  struct Warn : public StateBase
  {
    Warn() : StateBase(diagnostic_msgs::msg::DiagnosticStatus::WARN, "Rate is within warning range")
    {
    }
  };

  struct Error : public StateBase
  {
    Error() : StateBase(diagnostic_msgs::msg::DiagnosticStatus::ERROR, "Rate is out of valid range")
    {
    }
  };

  using StateHolder = std::variant<Stale, Ok, Warn, Error>;

public:
  RateBoundStatus(
    const rclcpp::Node * parent_node, const RateBoundStatusParam & ok_params,
    const RateBoundStatusParam & warn_params, const size_t num_frame_transition = 1,
    const bool immediate_error_report = true, const std::string & name = "rate bound check")
  : DiagnosticTask(name),
    ok_params_(ok_params),
    warn_params_(warn_params),
    num_frame_transition_(num_frame_transition),
    immediate_error_report_(immediate_error_report),
    zero_seen_(false),
    candidate_state_(Stale{}),
    current_state_(Stale{})
  {
    if (num_frame_transition < 1) {
      num_frame_transition_ = 1;
    }

    // Confirm `warn_params` surely has wider range than `ok_params`
    if (
      warn_params_.min_frequency >= ok_params_.min_frequency ||
      ok_params_.max_frequency >= warn_params_.max_frequency) {
      throw std::runtime_error(
        "Invalid range parameters were detected. warn_params should specify a range "
        "that includes a range of ok_params.");
    }

    // select clock according to the use_sim_time paramter set to the parent
    bool use_sim_time = false;
    if (parent_node->has_parameter("use_sim_time")) {
      use_sim_time = parent_node->get_parameter("use_sim_time").as_bool();
    }
    if (use_sim_time) {
      clock_ = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
    } else {
      clock_ = std::make_shared<rclcpp::Clock>(RCL_STEADY_TIME);
    }
  }

  void tick()
  {
    std::unique_lock<std::mutex> lock(lock_);
    double stamp = get_now();

    if (!previous_frame_timestamp_) {
      zero_seen_ = true;
    } else {
      zero_seen_ = false;
      double delta = stamp - previous_frame_timestamp_.value();
      frequency_ = (delta < 10 * std::numeric_limits<double>::epsilon())
                     ? std::numeric_limits<double>::infinity()
                     : 1. / delta;
    }
    previous_frame_timestamp_ = stamp;
  }

  void run(diagnostic_updater::DiagnosticStatusWrapper & stat) override
  {
    std::unique_lock<std::mutex> lock(lock_);

    // classify the current observation
    StateHolder frame_result;
    if (!frequency_ || zero_seen_) {
      frame_result.emplace<Stale>();
    } else {
      if (ok_params_.min_frequency < frequency_ && frequency_ < ok_params_.max_frequency) {
        frame_result.emplace<Ok>();
      } else if (
        (warn_params_.min_frequency <= frequency_ && frequency_ <= ok_params_.min_frequency) ||
        (ok_params_.max_frequency <= frequency_ && frequency_ <= warn_params_.max_frequency)) {
        frame_result.emplace<Warn>();
      } else {
        frame_result.emplace<Error>();
      }
    }

    // check the latest update is valid one
    size_t num_frame_skipped = 0;
    bool is_valid_observation = true;
    if (previous_frame_timestamp_) {
      double stamp = get_now();
      double delta = stamp - previous_frame_timestamp_.value();
      double freq_from_prev_tick = 1. / delta;
      // If the latest update too older than warn_params_ criteria, frame_result fires error
      if (freq_from_prev_tick < warn_params_.min_frequency) {
        frame_result.emplace<Error>();
        is_valid_observation = false;
        frequency_ = freq_from_prev_tick;
        auto max_frame_period_s = 1. / warn_params_.min_frequency;
        // Minimum frames to assume skipped if 'tick' calls occur at 'warn_params_.min_frequency'.
        num_frame_skipped = static_cast<size_t>(delta / max_frame_period_s);
      }
    }

    // If the classify result is same as previous one, count the number of observation
    // Otherwise, update candidate
    if (candidate_state_.index() == frame_result.index()) {  // if result has the same status as
                                                             // candidate
      std::visit([](auto & s) { s.num_observations += 1; }, candidate_state_);
    } else {
      candidate_state_ = frame_result;
    }

    // Update the current state if
    // - immediate error report is required and the observed state is error
    // - Or the same state is observed multiple times
    if (
      (immediate_error_report_ && std::holds_alternative<Error>(candidate_state_)) ||
      (is_valid_observation && get_num_observations(candidate_state_) >= num_frame_transition_) ||
      (!is_valid_observation && num_frame_skipped >= num_frame_transition_)) {
      current_state_ = candidate_state_;
      std::visit([](auto & s) { s.num_observations = 1; }, candidate_state_);
    }

    stat.summary(get_level(current_state_), get_msg(current_state_));

    std::stringstream ss;
    ss << std::fixed << std::setprecision(2) << frequency_.value_or(0.0);
    stat.add("Publish rate", ss.str());

    ss.str("");  // reset contents
    ss << get_level_string(get_level(frame_result));
    stat.add("Rate status", ss.str());

    ss.str("");  // reset contents
    ss << std::fixed << std::setprecision(2) << ok_params_.min_frequency;
    stat.add("Minimum OK rate threshold", ss.str());

    ss.str("");  // reset contents
    ss << std::fixed << std::setprecision(2) << ok_params_.max_frequency;
    stat.add("Maximum OK rate threshold", ss.str());

    ss.str("");  // reset contents
    ss << std::fixed << std::setprecision(2) << warn_params_.min_frequency;
    stat.add("Minimum WARN rate threshold", ss.str());

    ss.str("");  // reset contents
    ss << std::fixed << std::setprecision(2) << warn_params_.max_frequency;
    stat.add("Maximum WARN rate threshold", ss.str());

    ss.str("");  // reset contents
    ss << get_num_observations(candidate_state_);
    stat.add("Observed frames", ss.str());

    ss.str("");  // reset contents
    ss << num_frame_skipped;
    stat.add("Assumed skipped frames", ss.str());

    ss.str("");  // reset contents
    ss << num_frame_transition_;
    stat.add("Observed frames transition threshold", ss.str());
  }

protected:
  RateBoundStatusParam ok_params_;
  RateBoundStatusParam warn_params_;
  size_t num_frame_transition_;
  bool immediate_error_report_;
  bool zero_seen_;
  std::optional<double> frequency_;
  std::optional<double> previous_frame_timestamp_;
  std::mutex lock_;

  StateHolder candidate_state_;
  StateHolder current_state_;

  std::shared_ptr<rclcpp::Clock> clock_;

  double get_now() { return clock_->now().seconds(); }

  static unsigned char get_level(const StateHolder & state)
  {
    return std::visit([](const auto & s) { return s.level; }, state);
  }

  static size_t get_num_observations(const StateHolder & state)
  {
    return std::visit([](const auto & s) { return s.num_observations; }, state);
  }

  static std::string get_msg(const StateHolder & state)
  {
    return std::visit([](const auto & s) { return s.msg; }, state);
  }

  static std::string get_level_string(unsigned char level)
  {
    switch (level) {
      case diagnostic_msgs::msg::DiagnosticStatus::OK:
        return "OK";
      case diagnostic_msgs::msg::DiagnosticStatus::WARN:
        return "WARN";
      case diagnostic_msgs::msg::DiagnosticStatus::ERROR:
        return "ERROR";
      case diagnostic_msgs::msg::DiagnosticStatus::STALE:
        return "STALE";
      default:
        return "UNDEFINED";
    }
  }
};  // class RateBoundStatus

}  // namespace custom_diagnostic_tasks

#endif  // RATE_BOUND_STATUS_HPP_