File rate_bound_status.hpp
File List > common > diagnostics > rate_bound_status.hpp
Go to the documentation of this file
// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copied from https://github.com/tier4/ros2_v4l2_camera/pull/29
// Patched with https://github.com/tier4/ros2_v4l2_camera/pull/30
#ifndef RATE_BOUND_STATUS_HPP_
#define RATE_BOUND_STATUS_HPP_
#include <diagnostic_updater/diagnostic_updater.hpp>
#include <diagnostic_msgs/msg/diagnostic_status.hpp>
#include <rcl/time.h>
#include <iomanip>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <variant>
namespace custom_diagnostic_tasks
{
struct RateBoundStatusParam
{
RateBoundStatusParam(const double min_freq, const double max_freq)
: min_frequency(min_freq), max_frequency(max_freq)
{
}
double min_frequency;
double max_frequency;
};
class RateBoundStatus : public diagnostic_updater::DiagnosticTask
{
private:
// Helper struct to express state machine nodes
struct StateBase
{
StateBase(const unsigned char lv, const std::string m) : level(lv), num_observations(1), msg(m)
{
}
unsigned char level;
size_t num_observations;
std::string msg;
};
struct Stale : public StateBase
{
Stale()
: StateBase(diagnostic_msgs::msg::DiagnosticStatus::STALE, "Topic has not been received yet")
{
}
};
struct Ok : public StateBase
{
Ok() : StateBase(diagnostic_msgs::msg::DiagnosticStatus::OK, "Rate is reasonable") {}
};
struct Warn : public StateBase
{
Warn() : StateBase(diagnostic_msgs::msg::DiagnosticStatus::WARN, "Rate is within warning range")
{
}
};
struct Error : public StateBase
{
Error() : StateBase(diagnostic_msgs::msg::DiagnosticStatus::ERROR, "Rate is out of valid range")
{
}
};
using StateHolder = std::variant<Stale, Ok, Warn, Error>;
public:
RateBoundStatus(
const rclcpp::Node * parent_node, const RateBoundStatusParam & ok_params,
const RateBoundStatusParam & warn_params, const size_t num_frame_transition = 1,
const bool immediate_error_report = true, const std::string & name = "rate bound check")
: DiagnosticTask(name),
ok_params_(ok_params),
warn_params_(warn_params),
num_frame_transition_(num_frame_transition),
immediate_error_report_(immediate_error_report),
zero_seen_(false),
candidate_state_(Stale{}),
current_state_(Stale{})
{
if (num_frame_transition < 1) {
num_frame_transition_ = 1;
}
// Confirm `warn_params` surely has wider range than `ok_params`
if (
warn_params_.min_frequency >= ok_params_.min_frequency ||
ok_params_.max_frequency >= warn_params_.max_frequency) {
throw std::runtime_error(
"Invalid range parameters were detected. warn_params should specify a range "
"that includes a range of ok_params.");
}
// select clock according to the use_sim_time paramter set to the parent
bool use_sim_time = false;
if (parent_node->has_parameter("use_sim_time")) {
use_sim_time = parent_node->get_parameter("use_sim_time").as_bool();
}
if (use_sim_time) {
clock_ = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
} else {
clock_ = std::make_shared<rclcpp::Clock>(RCL_STEADY_TIME);
}
}
void tick()
{
std::unique_lock<std::mutex> lock(lock_);
double stamp = get_now();
if (!previous_frame_timestamp_) {
zero_seen_ = true;
} else {
zero_seen_ = false;
double delta = stamp - previous_frame_timestamp_.value();
frequency_ = (delta < 10 * std::numeric_limits<double>::epsilon())
? std::numeric_limits<double>::infinity()
: 1. / delta;
}
previous_frame_timestamp_ = stamp;
}
void run(diagnostic_updater::DiagnosticStatusWrapper & stat) override
{
std::unique_lock<std::mutex> lock(lock_);
// classify the current observation
StateHolder frame_result;
if (!frequency_ || zero_seen_) {
frame_result.emplace<Stale>();
} else {
if (ok_params_.min_frequency < frequency_ && frequency_ < ok_params_.max_frequency) {
frame_result.emplace<Ok>();
} else if (
(warn_params_.min_frequency <= frequency_ && frequency_ <= ok_params_.min_frequency) ||
(ok_params_.max_frequency <= frequency_ && frequency_ <= warn_params_.max_frequency)) {
frame_result.emplace<Warn>();
} else {
frame_result.emplace<Error>();
}
}
// check the latest update is valid one
size_t num_frame_skipped = 0;
bool is_valid_observation = true;
if (previous_frame_timestamp_) {
double stamp = get_now();
double delta = stamp - previous_frame_timestamp_.value();
double freq_from_prev_tick = 1. / delta;
// If the latest update too older than warn_params_ criteria, frame_result fires error
if (freq_from_prev_tick < warn_params_.min_frequency) {
frame_result.emplace<Error>();
is_valid_observation = false;
frequency_ = freq_from_prev_tick;
auto max_frame_period_s = 1. / warn_params_.min_frequency;
// Minimum frames to assume skipped if 'tick' calls occur at 'warn_params_.min_frequency'.
num_frame_skipped = static_cast<size_t>(delta / max_frame_period_s);
}
}
// If the classify result is same as previous one, count the number of observation
// Otherwise, update candidate
if (candidate_state_.index() == frame_result.index()) { // if result has the same status as
// candidate
std::visit([](auto & s) { s.num_observations += 1; }, candidate_state_);
} else {
candidate_state_ = frame_result;
}
// Update the current state if
// - immediate error report is required and the observed state is error
// - Or the same state is observed multiple times
if (
(immediate_error_report_ && std::holds_alternative<Error>(candidate_state_)) ||
(is_valid_observation && get_num_observations(candidate_state_) >= num_frame_transition_) ||
(!is_valid_observation && num_frame_skipped >= num_frame_transition_)) {
current_state_ = candidate_state_;
std::visit([](auto & s) { s.num_observations = 1; }, candidate_state_);
}
stat.summary(get_level(current_state_), get_msg(current_state_));
std::stringstream ss;
ss << std::fixed << std::setprecision(2) << frequency_.value_or(0.0);
stat.add("Publish rate", ss.str());
ss.str(""); // reset contents
ss << get_level_string(get_level(frame_result));
stat.add("Rate status", ss.str());
ss.str(""); // reset contents
ss << std::fixed << std::setprecision(2) << ok_params_.min_frequency;
stat.add("Minimum OK rate threshold", ss.str());
ss.str(""); // reset contents
ss << std::fixed << std::setprecision(2) << ok_params_.max_frequency;
stat.add("Maximum OK rate threshold", ss.str());
ss.str(""); // reset contents
ss << std::fixed << std::setprecision(2) << warn_params_.min_frequency;
stat.add("Minimum WARN rate threshold", ss.str());
ss.str(""); // reset contents
ss << std::fixed << std::setprecision(2) << warn_params_.max_frequency;
stat.add("Maximum WARN rate threshold", ss.str());
ss.str(""); // reset contents
ss << get_num_observations(candidate_state_);
stat.add("Observed frames", ss.str());
ss.str(""); // reset contents
ss << num_frame_skipped;
stat.add("Assumed skipped frames", ss.str());
ss.str(""); // reset contents
ss << num_frame_transition_;
stat.add("Observed frames transition threshold", ss.str());
}
protected:
RateBoundStatusParam ok_params_;
RateBoundStatusParam warn_params_;
size_t num_frame_transition_;
bool immediate_error_report_;
bool zero_seen_;
std::optional<double> frequency_;
std::optional<double> previous_frame_timestamp_;
std::mutex lock_;
StateHolder candidate_state_;
StateHolder current_state_;
std::shared_ptr<rclcpp::Clock> clock_;
double get_now() { return clock_->now().seconds(); }
static unsigned char get_level(const StateHolder & state)
{
return std::visit([](const auto & s) { return s.level; }, state);
}
static size_t get_num_observations(const StateHolder & state)
{
return std::visit([](const auto & s) { return s.num_observations; }, state);
}
static std::string get_msg(const StateHolder & state)
{
return std::visit([](const auto & s) { return s.msg; }, state);
}
static std::string get_level_string(unsigned char level)
{
switch (level) {
case diagnostic_msgs::msg::DiagnosticStatus::OK:
return "OK";
case diagnostic_msgs::msg::DiagnosticStatus::WARN:
return "WARN";
case diagnostic_msgs::msg::DiagnosticStatus::ERROR:
return "ERROR";
case diagnostic_msgs::msg::DiagnosticStatus::STALE:
return "STALE";
default:
return "UNDEFINED";
}
}
}; // class RateBoundStatus
} // namespace custom_diagnostic_tasks
#endif // RATE_BOUND_STATUS_HPP_