safe_drive: Formally Specified Rust Bindings for ROS2

safe_drive is a Rust bindings for ROS2. This library provides formal specifications and tested the specifications by using a model checker. Therefore, you can clearly understand how the scheduler work and the safeness of it.

Specifications

Some algorithms we adopted are formally specified and tested the safeness by using TLA+. Original ROS2's executor (rclcpp) is suffering from starvation. In contrast, the starvation freedom of our executor has been validated by not only dynamic analysis but also formal verification.

See specifications.

We specified and tested as follows.

  • Single Threaded Callback Execution
    • Deadlock freedom
    • Starvation freedom
  • Scheduling Core Algorithm
    • Validate the insertion algorithm
    • Termination
  • Initialize Once
    • Deadlock freedom
    • Termination
    • Initialization is performed just once

Tutorial

This chapter explain how to use safe_drive and interoperability of existing ROS2's project written in C++ by using colcon.

Source code.

Setting-up

Install Dependencies

$ sudo apt install curl gnupg2 lsb-release python3-pip git

Install Rust

$ sudo curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
$ source $HOME/.cargo/env

Choose Default when installing Rust.

Install ROS2

$ curl -s https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo apt-key add -
$ sudo sh -c 'echo "deb [arch=$(dpkg --print-architecture)] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" > /etc/apt/sources.list.d/ros2-latest.list'
$ sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
$ sudo apt update
$ sudo apt install ros-iron-desktop python3-colcon-common-extensions
$ . /opt/ros/iron/setup.bash

Install Colcon-cargo

$ pip install git+https://github.com/tier4/colcon-cargo.git
$ pip install git+https://github.com/colcon/colcon-ros-cargo.git

Install Cargo-ament

$ git clone https://github.com/tier4/cargo-ament-build.git
$ cd cargo-ament-build
$ cargo install --path .

Publish and Subscribe

Source code.

This chapter introduces a simple example of publish and subscribe communication. This communication is so-called topic by ROS2. There are N senders and M receivers in a topic. This tutorial implements 1 sender (in Rust) and 2 receivers (in Rust and C++).

The following figure is a example topology.

graph TD;
    Publisher1-->TopicA;
    Publisher2-->TopicA;
    TopicA-->Subscriber1;
    TopicA-->Subscriber2;
    TopicA-->Subscriber3;
    Publisher3-->TopicB;
    Publisher4-->TopicB;
    Publisher5-->TopicB;
    TopicB-->Subscriber4;
    TopicB-->Subscriber5;

There are 2 topics, TopicA and TopicB in this figure. If Publisher2 of TopicA send a message, Subscriber1-3 subscribing TopicA receive the message. Subscriber4 and Subscriber5 can receive messages sent from Publisher3-5.

Directories

First of all, create working directories as follows.

$ mkdir pubsub
$ mkdir pubsub/src

Then create projects of Rust by using cargo as follows.

$ cd pubsub/src
$ cargo new my_talker
$ cargo new my_listener

Following directories are important directories we will use throughout this tutorial.

DirectoriesWhat?
pubsub/src/my_talkerpublisher in Rust
pubsub/src/my_listenersubscriber in Rust
pubsub/installcreated by colcon

Workspace of Rust

To handle multiple projects of cargo, we recommend to prepare Cargo.toml for the workspace as follows.

pubsub/src/Cargo.toml

[workspace]
members = ["my_talker", "my_listener"]

The workspace enables rust-analyzer and reduces compilation time.


Talker in Rust

Let's start implementing a publisher. We created and will edit files of the talker as follows. These files will automatically created when you do cargo new as described before.

FilesWhat?
my_talker/Cargo.tomlfor Cargo
my_talker/package.xmlfor ROS2
my_talker/src/main.rssource code

Edit my_talker/Cargo.toml

Cargo.toml is used to describe a project of Rust. Add safe_drive to the dependencies of Cargo.toml to use it. Currently, safe_drive's repository is private, and we need to specify the path as follows.

pubsub/src/my_talker/Cargo.toml

[dependencies]
safe_drive = "0.3"
std_msgs = { path = "/tmp/safe_drive_tutorial/pubsub/std_msgs" }

[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/pubsub"
safe_drive_version = "0.3"

Edit my_talker/src/main.rs

my_talker is very simple. This does as follows.

  1. Create a context.
  2. Create a node from the context.
  3. Create a publisher from the node.
  4. Send a message periodically.

A context is a resource manager of ROS2, and it must be created first of all. A node contains publishers, subscribers, servers, and clients. A node is a functional unit of ROS2. Publishers and subscribers can be created by a node.

The following figure is an example hierarchy.

graph TD;
    Context-->NodeA
    NodeA-->SubscriberA
    NodeA-->SubscriberB
    NodeA-->PublisherA
    NodeA-->PublisherB
    NodeA-->PublisherC

In this figure, NodeA is created by a context, and subscribers and publishers are created by the node.

Now, we have understood the basics of ROS2. The following is a code of my_talker. You should also understand what this code is doing.

pubsub/src/my_talker/src/main.rs

use safe_drive::{
    context::Context, error::DynError, logger::Logger, pr_info
};
use std::time::Duration;

fn main() -> Result<(), DynError> {
    // Create a context.
    let ctx = Context::new()?;

    // Create a node.
    let node = ctx.create_node("my_talker", None, Default::default())?;

    // Create a publisher.
    let publisher = node.create_publisher::<std_msgs::msg::String>("my_topic", None)?;

    // Create a logger.
    let logger = Logger::new("my_talker");

    let mut cnt = 0; // Counter.
    let mut msg = std_msgs::msg::String::new().unwrap();
    loop {
        // Create a message to be sent.
        let data = format!("Hello, World!: cnt = {cnt}");
        msg.data.assign(&data);

        pr_info!(logger, "send: {}", msg.data); // Print log.

        // Send a message.
        publisher.send(&msg)?;

        // Sleep 1s.
        cnt += 1;
        std::thread::sleep(Duration::from_secs(1));
    }
}

my_talker in Detail

create_node creates a node.

#![allow(unused)]
fn main() {
// Create a node.
let node = ctx.create_node("my_talker", None, Default::default())?;
}

The arguments indicate as follows.

  • "my_talker" : the name of the node.
  • Node : namespace.
    • We can pass a namespace like Some("namespace").
    • If None is passed, the node has no namespace.
  • Default::default() : option of node.

create_publisher creates a publisher.

#![allow(unused)]
fn main() {
// Create a publisher.
let publisher = node.create_publisher::<std_msgs::msg::String>("my_topic", None)?;
}
  • <std_msgs::msg::String> : the publisher can send values of std_msgs::msg::String.
  • "my_topic" : the topic name to which the publisher send messages.
  • None : QoS of the publisher. QoS will be described in a later chapter.

A message can be sent by send method as follows.

#![allow(unused)]
fn main() {
// Create a message to be sent.
let data = format!("Hello, World!: cnt = {cnt}");
msg.data.assign(&data);

// Send a message.
publisher.send(&msg)?;
}

Logger is used to print some messages. It can be created as follows.

#![allow(unused)]
fn main() {
// Create a logger.
let logger = Logger::new("my_talker");
}

The argument of "my_talker" is the name of the logger. To print a message, use pr_* macros as follows.

#![allow(unused)]
fn main() {
pr_info!(logger, "send: {}", msg.data); // Print log.
}

There are macros for logging as follows.

  • pr_debug! : debug
  • pr_info! : information
  • pr_warn! : warning
  • pr_err! : error
  • pr_fatal! : fatal

Create my_talker/package.xml

package.xml is used by colcon, which is a build tool used by ROS2. It contains the package name, maintainer, description, etc, as follows.

pubsub/src/my_talker/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>my_talker</name>
  <version>0.0.0</version>
  <description>My Talker in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <depend>std_msgs</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Copy and paste this, then edit it if you want.

Execute the Talker

Before compiling, ensure that you load setting of ROS2 as follows. If you have already done so, you do not need this.

$ . /opt/ros/iron/setup.bash

Then compile by using colcon as follows. Before compiling, change the current directory to pubsub, which is the top directory of our project.

$ cd pubsub
$ colcon build --cargo-args --release

--cargo-args --release is a option to pass the --release argument to cargo. cargo optimizes the code when --release is specified.

To launch my_talker, load the setting and use ros2 command as follows.

$ . ./install/setup.bash
$ ros2 run my_talker my_talker
[INFO] [1656048392.368568500] [my_talker]: send: Hello, World!: cnt = 0
[INFO] [1656048393.368787500] [my_talker]: send: Hello, World!: cnt = 1
[INFO] [1656048394.368994200] [my_talker]: send: Hello, World!: cnt = 2

Listener in Rust

Let's then implement a listener. We created and will edit files of the listener as follows.

FilesWhat?
my_listener/Cargo.tomlfor Cargo
my_listener/package.xmlfor ROS2
my_listener/src/main.rssource code

Edit my_listener/Cargo.toml

my_listener also requires safe_drive. Add safe_drive to the dependencies as follows.

# pubsub/src/my_listener/Cargo.toml
[dependencies]
safe_drive = "0.3"
std_msgs = { path = "/tmp/safe_drive_tutorial/pubsub/std_msgs" }

[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/pubsub"
safe_drive_version = "0.3"

Edit my_listener/src/main.rs

To implement subscriber, we have to prepare a callback function of the subscriber. This is the main difference from my_talker.

pubsub/src/my_listener/src/main.rs

use safe_drive::{
    context::Context, error::DynError, logger::Logger, pr_info,
};

fn main() -> Result<(), DynError> {
    // Create a context.
    let ctx = Context::new()?;

    // Create a node.
    let node = ctx.create_node("my_listener", None, Default::default())?;

    // Create a subscriber.
    let subscriber = node.create_subscriber::<std_msgs::msg::String>("my_topic", None)?;

    // Create a logger.
    let logger = Logger::new("my_listener");

    // Create a selector.
    let mut selector = ctx.create_selector()?;

    // Add a callback function.
    selector.add_subscriber(
        subscriber,
        Box::new(move |msg| {
            pr_info!(logger, "receive: {}", msg.data);
        }),
    );

    // Spin.
    loop {
        selector.wait()?;
    }
}

my_listener in Detail

Similar to the publisher, create_subscriber creates a subscriber.

#![allow(unused)]
fn main() {
// Create a subscriber.
let subscriber = node.create_subscriber::<std_msgs::msg::String>("my_topic", None)?;
}

The arguments are as follows.

  • "my_topic" : the name of the topic to which the subscriber is subscribing.
  • None : QoS of the subscriber. Discussed in a later chapter.

To add and invoke a callback function. We need to create a selector, which can wait some events and invoke callback functions of the events. A selector is similar to select or epoll function, and equivalent to a executer of ROS2.

#![allow(unused)]
fn main() {
// Create a selector.
let mut selector = ctx.create_selector()?;

// Add a callback function.
selector.add_subscriber(
    subscriber,
    Box::new(move |msg| {
        pr_info!(logger, "receive: {}", msg.data);
    }),
);

// Spin.
loop {
    selector.wait()?;
}
}

The arguments of add_subscriber method are as follows.

  • subscriber : the subscriber.
  • Box::new(move |msg| { pr_info!(logger, "receive: {}", msg.data); }) : the callback function.
  • false : the callback function is called only once or not. If false is passed, the callback function is invoked every time and forever.

selector.wait() wait events. To receive events forever, use infinite loop.

Create my_listener/package.xml

package.xml is almost same as above. The only difference is the name of the package, which is my_listener.

pubsub/src/my_listener/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>my_listener</name>
  <version>0.0.0</version>
  <description>My Listener in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <depend>std_msgs</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Execute the Listener

Before compiling and executing, execute my_talker in other terminal, and don't forget to load the setting of ROS2 as mentioned above. The compilation can be done as follows.

$ cd pubsub
$ colcon build --cargo-args --release

To execute my_listener, load the setting of our projects and execute it by using ros2 command as follows.

$ . ./install/setup.bash
$ ros2 run my_talker my_talker
[INFO] [1656050459.231579900] [my_listener]: receive: Hello, World!: cnt = 4
[INFO] [1656050460.231831200] [my_listener]: receive: Hello, World!: cnt = 5
[INFO] [1656050461.232120000] [my_listener]: receive: Hello, World!: cnt = 6

Nicely done! We are receiving messages sent from my_talker.

Timer

Source code.

This tutorial does not use colcon to build. We use only cargo, which is a Rust's standard build system.

Don't forget loading ROS2's environment as follows. If you already done so, you do not need this.

$ . /opt/ros/iron/setup.bash

Wall-timer

A wall-timer is a timer which periodically invoked. This section describes how to use a wall-timer.

Let's create a package by using a cargo as follows.

$ cargo new wall_timer
$ cd wall_timer

Then, add safe_drive to the dependencies of Cargo.toml.

[dependencies]
safe_drive = "0.3"

The following code is an example using a wall-timer. The important method is Selector::add_wall_timer() which takes a name, a duration, and a callback function.

use safe_drive::{
    context::Context, error::DynError, logger::Logger, msg::common_interfaces::std_msgs, pr_info,
};
use std::{rc::Rc, time::Duration};

fn main() -> Result<(), DynError> {
    // Create a context, a node, a subscriber, a publisher, and a selector.
    let ctx = Context::new()?;
    let node = ctx.create_node("my_node", None, Default::default())?;
    let subscriber = node.create_subscriber::<std_msgs::msg::UInt64>("my_topic", None)?;
    let publisher = node.create_publisher::<std_msgs::msg::UInt64>("my_topic", None)?;
    let mut selector = ctx.create_selector()?;

    // Create a logger.
    // To share this by multiple callback functions, use Rc.
    let logger = Rc::new(Logger::new("wall timer example"));

    // Add a wall timer to publish periodically.
    let mut cnt = Box::new(0);
    let mut msg = std_msgs::msg::UInt64::new().unwrap();
    let logger1 = logger.clone();

    selector.add_wall_timer(
        "publisher", // the name of timer
        Duration::from_secs(1),
        Box::new(move || {
            msg.data = *cnt;
            *cnt += 1;
            publisher.send(&msg).unwrap();
            pr_info!(logger1, "send: msg.data = {}", msg.data);
        }),
    );

    // Add a subscriber.
    selector.add_subscriber(
        subscriber,
        Box::new(move |msg| {
            pr_info!(logger, "received: msg.data = {}", msg.data);
        }),
    );

    // Spin.
    loop {
        selector.wait()?;
    }
}

Timers can be set by a method of selector as follows, and the timers will be invoked when calling the Selector::wait() methods.

#![allow(unused)]
fn main() {
selector.add_wall_timer(
    "publisher", // the name of timer
    Duration::from_secs(1),
    Box::new(move || {
        msg.data = *cnt;
        *cnt += 1;
        publisher.send(&msg).unwrap();
        pr_info!(logger1, "send: msg.data = {}", msg.data);
    }),
);
}
  • "publisher" is the name of this timer. The name is used for statistics. You can use any name.
  • Duration::from_secs(1) is the duration for periodic invoking. This argument means the callback function is invoked every 1 second.
  • Box::new(move || ...) is the callback function.

There is a publisher invoked by a timer, and a subscriber in this code. When executing this, transmission and reception will be confirmed as follows.

$ cargo run
[INFO] [1656557242.842509800] [wall timer example]: send: msg.data = 0
[INFO] [1656557242.842953300] [wall timer example]: received: msg.data = 0
[INFO] [1656557243.843103800] [wall timer example]: send: msg.data = 1
[INFO] [1656557243.843272900] [wall timer example]: received: msg.data = 1
[INFO] [1656557244.843574600] [wall timer example]: send: msg.data = 2
[INFO] [1656557244.844021200] [wall timer example]: received: msg.data = 2
[INFO] [1656557245.844349800] [wall timer example]: send: msg.data = 3
[INFO] [1656557245.844702900] [wall timer example]: received: msg.data = 3

One-shot Timer

A wall-timer is invoked periodically, but one-shot timer is invoked only once. A one-shot can be set by the Selector::add_timer() method as follows.

use safe_drive::{context::Context, error::DynError, logger::Logger, pr_info};
use std::{cell::RefCell, collections::VecDeque, rc::Rc, time::Duration};

pub fn main() -> Result<(), DynError> {
    // Create a context, a publisher, and a logger.
    let ctx = Context::new()?;
    let mut selector = ctx.create_selector()?;
    let logger = Rc::new(Logger::new("one-shot timer example"));

    let queue = Rc::new(RefCell::new(VecDeque::new()));

    // Add a one-shot timer.
    let queue1 = queue.clone();
    selector.add_timer(
        Duration::from_secs(2),
        Box::new(move || {
            pr_info!(logger, "fired!");

            // Insert a timer to the queue.
            let mut q = queue1.borrow_mut();
            let logger1 = logger.clone();
            q.push_back((
                Duration::from_secs(2),
                (Box::new(move || pr_info!(logger1, "fired! again!"))),
            ));
        }),
    );

    // Spin.
    loop {
        {
            // Set timers.
            let mut q = queue.borrow_mut();
            while let Some((dur, f)) = q.pop_front() {
                selector.add_timer(dur, f);
            }
        }

        selector.wait()?;
    }
}

Selector::add_timer() does not take the name, but other arguments are same as Selector::add_wall_timer().

#![allow(unused)]
fn main() {
selector.add_timer(
    Duration::from_secs(2),
    Box::new(move || ...),
);
}
  • Duration::from_secs(2) is a duration indicating when the timer will be invoked.
  • Box::new(move || ...) is the callback function.

This code reenables a timer in the callback function. To reenable, the callback takes a queue and timers in the queue is reenabled in the spin as follows.

#![allow(unused)]
fn main() {
// Spin.
loop {
    {
        // Set timers.
        let mut q = queue.borrow_mut();
        while let Some((dur, f)) = q.pop_front() {
            selector.add_timer(dur, f);
        }
    }

    selector.wait()?;
}
}

The important thing is that the borrowed resources must be released. To release definitely, the code fraction borrowing the queue is surrounded by braces.

The following is a execution result of this code.

$ cargo run
[INFO] [1657070943.324438900] [one-shot timer example]: fired!
[INFO] [1657070945.324675600] [one-shot timer example]: fired! again!

Multi-threaded Publish and Subscribe

Source code.

Previous chapters use a selector to wait messages. A selector can be used by only a single thread. This means previous implementation is single-threaded.

To execute tasks concurrently, we can use the async/await feature of Rust. By using this, we can implement multi-threaded applications. In this chapter, we will describe how to use safe_drive with async/await. We use async_std as a asynchronous library, but you can use Tokio instead.

Creating Projects

Before implementing, we need to prepare projects as follows.

$ mkdir -p mt_pubsub/src
$ cd mt_pubsub/src
$ cargo new publishers
$ cargo new subscribers

The mt_pubsub is the root directory of this project, and we created publishers and subscribers of Rust's projects. At the moment, the following directories is present.

DirectoriesWhat?
mt_pubsub/src/publisherspublishers in Rust
mt_pubsub/src/subscriberssubscribers in Rust

Common Configuration

Then, we have to create or edit the following files for basic configurations.

FilesWhat?
mt_pubsub/src/publishers/Cargo.tomlfor Cargo
mt_pubsub/src/publishers/package.xmlfor ROS2
mt_pubsub/src/publishers/build.rsfor rustc
mt_pubsub/src/subscribers/Cargo.tomlfor Cargo
mt_pubsub/src/subscribers/package.xmlfor ROS2
mt_pubsub/src/subscribers/build.rsfor rustc
mt_pubsub/src/Cargo.tomlfor Cargo's workspace

To enable rust-analyzer in the mt_pubsub directory and reduce the compilation time, prepare workspace's Cargo.toml as follows.

mt_pubsub/src/Cargo.toml

[workspace]
members = ["publishers", "subscribers"]

package.xmls are almost same as Publish and Subscribe. If you cannot understand what these files do, please go back to the previous chapter.

publishers/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>publishers</name>
  <version>0.0.0</version>
  <description>MT-Publishers</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <depend>std_msgs</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

subscribers/package.xml

  <name>subscribers</name>
  <description>MT-Subscribers</description>

To use async_std, we have to update Cargo.toml as follows.

Cargo.toml

[dependencies]
async-std = { version = "1", features = ["attributes"] }
safe_drive = "0.3"
std_msgs = { path = "/tmp/safe_drive_tutorial/mt_pubsub/std_msgs" }

[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/mt_pubsub"
safe_drive_version = "0.3"

Publishers

The publishers publishes messages to 2 topics periodically. This create 2 tasks. One is send a message every 500ms, and another is send a message every 250ms. The code of the publishers is as follows.

mt_pubsub/src/publishers/src/main.rs

use safe_drive::{context::Context, error::DynError};
use std::time::Duration;

#[async_std::main]
async fn main() -> Result<(), DynError> {
    // Create a context and a node.
    let ctx = Context::new()?;
    let node = ctx.create_node("publishers", None, Default::default())?;

    // Create publishers.
    let publisher1 = node.create_publisher::<std_msgs::msg::String>("topic1", None)?;
    let publisher2 = node.create_publisher::<std_msgs::msg::String>("topic2", None)?;

    // Create a task which sends "Hello, World!".
    let task1 = async_std::task::spawn(async move {
        let mut msg = std_msgs::msg::String::new().unwrap();
        msg.data.assign("Hello, World!");
        for _ in 0..50 {
            publisher1.send(&msg).unwrap(); // Send a message.
            async_std::task::sleep(Duration::from_millis(500)).await; // Sleep 500ms.
        }
    });

    // Create a task which sends "Hello, Universe!".
    let task2 = async_std::task::spawn(async move {
        let mut msg = std_msgs::msg::String::new().unwrap();
        msg.data.assign("Hello, Universe!");
        for _ in 0..100 {
            publisher2.send(&msg).unwrap(); // Send a message.
            async_std::task::sleep(Duration::from_millis(250)).await; // Sleep 250ms.
        }
    });

    task1.await;
    task2.await;

    Ok(())
}

The async_std::task::spawn creates a asynchronous task, which is similar to a thread. The following is a excerpt creating a task which sends "Hello, World!" to "topic1" every 500ms.

#![allow(unused)]
fn main() {
// Create a task which sends "Hello, World!".
let task1 = async_std::task::spawn(async move {
    let mut msg = std_msgs::msg::String::new().unwrap();
    msg.data.assign("Hello, World!");
    for _ in 0..50 {
        publisher1.send(&msg).unwrap(); // Send a message.
        async_std::task::sleep(Duration::from_millis(500)).await; // Sleep 500ms.
    }
});
}

This excerpt sends a message by Publisher::send and sleep by async_std::task::sleep. Note that this uses async_std::task::sleep instead of std::thread::sleep, because the former is non-blocking but the the latter is blocking. Calling blocking functions should be avoided in asynchronous tasks.

Subscribers

Then, let's implement the subscribers. The main function is almost same as previous one.

mt_pubsub/src/subscribers/src/main

use safe_drive::{
    context::Context, error::DynError, logger::Logger, pr_info, topic::subscriber::Subscriber,
};

#[async_std::main]
async fn main() -> Result<(), DynError> {
    // Create a context and a node.
    let ctx = Context::new()?;
    let node = ctx.create_node("subscribers", None, Default::default())?;

    // Create subscribers.
    let subscriber1 = node.create_subscriber::<std_msgs::msg::String>("topic1", None)?;
    let subscriber2 = node.create_subscriber::<std_msgs::msg::String>("topic2", None)?;

    // Receive messages.
    let task1 = async_std::task::spawn(receiver(subscriber1));
    let task2 = async_std::task::spawn(receiver(subscriber2));

    task1.await?;
    task2.await?;

    Ok(())
}

async fn receiver(mut subscriber: Subscriber<std_msgs::msg::String>) -> Result<(), DynError> {
    let logger = Logger::new(subscriber.get_topic_name());

    loop {
        // Receive a message
        let msg = subscriber.recv().await?;
        pr_info!(logger, "{}", msg.data);
    }
}

Subscriber::recv is an asynchronous function to receive a message. To receive asynchronously, use the .await keyword. If there is a message to be received, recv().await returns the message immediately. Otherwise, it yields the execution to another task and sleeps until arriving a message.

Modification for Timeout

By using a timeout mechanism provided by asynchronous libraries, we can implement receiving with timeout. Timeout can be implemented as follows.

#![allow(unused)]
fn main() {
use async_std::future::timeout;
use safe_drive::pr_warn;
use std::time::Duration;
async fn receiver(mut subscriber: Subscriber<std_msgs::msg::String>) -> Result<(), DynError> {
    let logger = Logger::new(subscriber.get_topic_name());

    loop {
        // Receive a message with 3s timeout.
        match timeout(Duration::from_secs(3), subscriber.recv()).await {
            Ok(result) => pr_info!(logger, "received: {}", result?.data),
            Err(_) => {
                // Timed out. Finish receiving.
                pr_warn!(logger, "timeout");
                break;
            }
        }
    }

    Ok(())
}
}

async_std::timeout is a function to represent timeout. timeout(Duration::from_secs(3), subscriber.recv()).await calls subscriber.recv() asynchronously, but it will be timed out after 3s later.

Execution

First, execute publishers in a terminal application window as follows.

$ cd mt_pubsub
$ colcon build --cargo-args --release
$ . ./install/setup.bash
$ ros2 run publishers publishers

Then, execute subscribers in another terminal application window as follows. This uses timeout for receiving.

$ cd mt_pubsub.
$ . ./install/setup.bash
$ ros2 run subscribers subscribers
[INFO] [1657076046.969809600] [topic2]: received: Hello, Universe!
[INFO] [1657076047.220104100] [topic2]: received: Hello, Universe!
[INFO] [1657076047.447426100] [topic1]: received: Hello, World!
[INFO] [1657076047.470348600] [topic2]: received: Hello, Universe!
[INFO] [1657076047.721980900] [topic2]: received: Hello, Universe!
[WARN] [1657076050.448393200] [topic1]: timeout
[WARN] [1657076050.722433800] [topic2]: timeout

Nicely done! We are sending and receiving messages asynchronously. In addition to that, the timeouts work correctly.

User Defined Data Structure

Source code.

Until previous tutorial, we used pre-defined message types. In this tutorial, we will describe how to define user defined types.

Create Project Directory

Then create a project directory as follows.

$ mkdir -p msgtest/src

Throughout this tutorial, we will create 4 packages as follows.

packagesdescription
msgtest/src/my_interfacesdefining types of ROS2
msgtest/src/talkera publisher
msgtest/src/listenera subscriber

Workspace's Cargo.toml

The workspace's Cargo.toml should be created as follows.

msgtest/src/Cargo.toml

[workspace]
members = ["talker", "listener"]

Then, create projects as follows.

$ cd msgtest/src
$ cargo new talker
$ cargo new listener

Define User Defined Type

To define message types, we have to create a ROS2's package, and create a '.msg' files. The package can be created in the ordinary way of ROS2 as follows.

$ cd msgtest/src
$ ros2 pkg create --build-type ament_cmake my_interfaces
$ cd my_interfaces
$ mkdir msg
$ cd msg

Primitive Type: my_interfaces/msg/MyMsg.msg

Then create a file, msgtest/src/my_interfaces/msg, as follows.

int32 integer_value
int32[] unbounded_integer_array
int32[5] five_integers_array
int32[<=5] up_to_five_integers_array

There are 4 values in this type.

  • integer_value : a value of the int32 type
  • unbounded_integer_array : an unbounded array of the int32 type
  • five_integers_array : an array which size is 5 of the int32 type
  • up_to_five_integers_array : an array whose size is up to 5 of the int32 type

Using User Defiend Type: my_interfaces/msg/MyMsgs.msg

We can use the MyMsg previously defined in another message type, msgtest/src/my_interfaces/msg/MyMsgs.msg, as follows.

MyMsg msg1
MyMsg msg2

String Type: my_interfaces/msg/MyMsgStr.msg

A size of an array can be specified as described above, a length of a string can be also specified. For example, string<=10 is a type of string, but its length is up to 10.

We prepare msgtest/src/my_interfaces/msg/MyMsgStr.msg as follows.

string message
string[2] static_array_str
string[] dynamic_array_str
string[<=3] bounded_array_str

string<=10 bounded_str
string<=10[2] static_array_bounded_str
string<=10[] dynamic_array_bounded_str
string<=10[<=3] bounded_array_bounded_str

Edit my_interfaces/CMakeLists.txt

To generate C or C++ files and libraries for used defined types, we have to edit CMakeLists.txt as follows.

msgtest/src/my_interfaces/CMakeLists.txt

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/MyMsg.msg"
  "msg/MyMsgs.msg"
  "msg/MyMsgStr.msg"
)

We have to specify messages files in CMakeLists.txt.

Edit my_interfaces/package.xml

We also have to edit package.xml as follows.

msgtest/src/my_interfaces/package.xml

<build_depend>rosidl_default_generators</build_depend>

<exec_depend>rosidl_default_runtime</exec_depend>

<member_of_group>rosidl_interface_packages</member_of_group>

User Defined Type in Rust

Primitive types are translated into Rust's types as follows.

ROS2Rust
boolbool
byteu8
chari8
int8i8
uint8u8
int16i16
uint16u16
int32i32
uint32u32
int64i64
uint64u64
float32f32
float64f64
stringsafe_drive::msg::RosString

Generated Types

Array types are generated as follows.

ROS2Rust
int32[5][i32; 5]
int32[]safe_drive::msg::I32Seq<0>
int32[<=5]safe_drive::msg::I32Seq<5>

0 of I32Seq<0> indicates unbounded, and 5 of I32Seq<5> indicates less than or equal to 5. So, MyMsg and MyMsgs are generated as follows.

Talker

Let's implement a talker which publishes MyMsgs periodically.

Edit talker/Cargo.toml

To use the generated types in Rust, we have to edit Cargo.toml as follows. The most important thing is to add my_interfaces, which defines message types we use.

# msgtest/src/talker/Cargo.toml
[dependencies]
safe_drive = "0.3"
my_interfaces = { path = "/tmp/safe_drive_tutorial/msgtest/my_interfaces" }

[package.metadata.ros]
msg = ["my_interfaces"]
msg_dir = "/tmp/safe_drive_tutorial/msgtest"
safe_drive_version = "0.3"

Create talker/package.xml

Then create package.xml as follows.

msgtest/src/talker/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>talker</name>
  <version>0.0.0</version>
  <description>Talker in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <depend>my_interfaces</depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Don't forget <depend>my_interfaces</depend>.

Generated Files

When you run colcon, it generate my_interfaces in Rust.

$ cd msgtest
$ colcon build --cargo-args --release

Then, you can find Rust's files as follows.

/tmp/safe_drive_tutorial/msgtest/my_interfaces/src/msg/my_msg.rs

#![allow(unused)]
fn main() {
#[repr(C)]
#[derive(Debug)]
pub struct MyMsg {
    pub integer_value: i32,
    pub unbounded_integer_array: safe_drive::msg::I32Seq<0>,
    pub five_integers_array: [i32; 5],
    pub up_to_five_integers_array: safe_drive::msg::I32Seq<5>,
}
}

/tmp/safe_drive_tutorial/msgtest/my_interfaces/src/msg/my_msgs.rs

#![allow(unused)]
fn main() {
#[repr(C)]
#[derive(Debug)]
pub struct MyMsgs {
    pub msg1: crate::msg::my_msg::MyMsg,
    pub msg2: crate::msg::my_msg::MyMsg,
}
}

Edit talker/src/main.rs

If you want to know how to implement a subscriber or a publisher, please see a tutorial of Pub/Sub. This section describes how to handle a messages generated by ros2msg_to_rs.

msgtest/src/talker/src/main.rs

use safe_drive::{
    context::Context,
    error::DynError,
    logger::Logger,
    msg::{I32Seq, RosStringSeq},
    pr_info,
};
use std::time::Duration;

fn main() -> Result<(), DynError> {
    // Create a context.
    let ctx = Context::new()?;

    // Create a node.
    let node = ctx.create_node("talker", None, Default::default())?;

    // Create a publisher.
    let publisher = node.create_publisher::<my_interfaces::msg::MyMsgs>("my_topic", None)?;

    // Create a logger.
    let logger = Logger::new("talker");

    // Create a message
    let my_msg1 = create_message()?;
    let my_msg2 = create_message()?;

    let mut my_msgs = my_interfaces::msg::MyMsgs::new().ok_or("failed to create MyMsgs")?;
    my_msgs.msg1 = my_msg1;
    my_msgs.msg2 = my_msg2;

    loop {
        pr_info!(logger, "send: {:?}", my_msgs); // Print log.

        // Send a message.
        publisher.send(&my_msgs)?;

        std::thread::sleep(Duration::from_secs(1));
    }
}

fn create_message() -> Result<my_interfaces::msg::MyMsg, DynError> {
    let mut my_msg = my_interfaces::msg::MyMsg::new().unwrap();

    my_msg.integer_value = 10;

    // int32[5] five_integers_array
    my_msg.five_integers_array[0] = 11;
    my_msg.five_integers_array[1] = 13;
    my_msg.five_integers_array[2] = 49;
    my_msg.five_integers_array[3] = 55;
    my_msg.five_integers_array[4] = 19;

    // int32[] unbounded_integer_array
    let mut msgs = I32Seq::new(3).unwrap();
    let ref_msgs = msgs.as_slice_mut();
    ref_msgs[0] = 6;
    ref_msgs[1] = 7;
    ref_msgs[2] = 8;
    my_msg.unbounded_integer_array = msgs;

    // int32[<=5] up_to_five_integers_array
    let mut msgs = I32Seq::new(2).unwrap();
    let ref_msgs = msgs.as_slice_mut();
    ref_msgs[0] = 2;
    ref_msgs[1] = 3;
    my_msg.up_to_five_integers_array = msgs;

    Ok(my_msg)
}

Primitive types and arrays can be handles in the ordinary way of Rust as follows.

#![allow(unused)]
fn main() {
my_msg.integer_value = 10;

// int32[5] five_integers_array
my_msg.five_integers_array[0] = 11;
my_msg.five_integers_array[1] = 13;
my_msg.five_integers_array[2] = 49;
my_msg.five_integers_array[3] = 55;
my_msg.five_integers_array[4] = 19;
}

To access elements of unbounded or bounded arrays, we can use as_slice_mut() or as_slice() methods as follows.

#![allow(unused)]
fn main() {
// unbounded or unbounded array
let mut msgs = I32Seq::new(3).unwrap();
let ref_msgs = msgs.as_slice_mut();
}

as_slice_mut() returns a mutable slice, you can thus update the elements of the array via the slice.

Listener

Let's then implement a listener which receive messages published by the talker.

Edit listener/Cargo.toml

The listener also requires my_interfaces, and edit Cargo.toml as follows.

# msgtest/src/listener/Cargo.toml
[dependencies]
safe_drive = "0.3"
my_interfaces = { path = "/tmp/safe_drive_tutorial/msgtest/my_interfaces" }

[package.metadata.ros]
msg = ["my_interfaces"]
msg_dir = "/tmp/safe_drive_tutorial/msgtest"
safe_drive_version = "0.3"

Create listener/package.xml

Then create package.xml as follows.

msgtest/src/listener/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>listener</name>
  <version>0.0.0</version>
  <description>Listener in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <depend>my_interfaces</depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Don't forget <depend>my_interfaces</depend>.

Edit listener/src/main.rs

The listener can also be implemented straightforwardly as follows.

msgtest/src/listener/src/main.rs

use my_interfaces_rs::my_interfaces;
use safe_drive::{context::Context, error::DynError, logger::Logger, pr_info};

fn main() -> Result<(), DynError> {
    // Create a context.
    let ctx = Context::new()?;

    // Create a node.
    let node = ctx.create_node("listener", None, Default::default())?;

    // Create a subscriber.
    let subscriber = node.create_subscriber::<my_interfaces::msg::MyMsgs>("my_topic", None)?;

    // Create a logger.
    let logger = Logger::new("listener");

    // Create a selector.
    let mut selector = ctx.create_selector()?;

    pr_info!(logger, "listening");

    // Add a callback function.
    selector.add_subscriber(
        subscriber,
        Box::new(move |msg| {
            let msg = &msg.msg1;

            pr_info!(logger, "message: {}", msg.integer_value);

            for msg in msg.five_integers_array.iter() {
                pr_info!(logger, "five_integers_array: {}", msg);
            }

            for msg in msg.unbounded_integer_array.iter() {
                pr_info!(logger, "unbounded_integer_array: {}", msg);
            }

            for msg in msg.up_to_five_integers_array.iter() {
                pr_info!(logger, "up_to_five_integers_array: {}", msg);
            }
        }),
        false,
    );

    // Spin.
    loop {
        selector.wait()?;
    }
}

Compilation and Execution

Now, we can compile and execute the talker and listener. Let's do it!

Compile

The compilation can be performed by using colcon as follows.

$ cd msgtest
$ colcon build --cargo-args --release
$ . ./install/setup.bash

Execute Listener

The listener can be executed by using ros2 as follows. After executing the talker, it receives messages as follows.

$ ros2 run listener listener
[INFO] [1658305910.013449534] [listener]: listening
[INFO] [1658305914.359791460] [listener]: message: 10
[INFO] [1658305914.359839382] [listener]: five_integers_array: 11
[INFO] [1658305914.359867532] [listener]: five_integers_array: 13
[INFO] [1658305914.359880763] [listener]: five_integers_array: 49
[INFO] [1658305914.359889731] [listener]: five_integers_array: 55
[INFO] [1658305914.359900913] [listener]: five_integers_array: 19
[INFO] [1658305914.359912534] [listener]: unbounded_integer_array: 6
[INFO] [1658305914.359924084] [listener]: unbounded_integer_array: 7
[INFO] [1658305914.359936971] [listener]: unbounded_integer_array: 8
[INFO] [1658305914.359946479] [listener]: up_to_five_integers_array: 2
[INFO] [1658305914.359959422] [listener]: up_to_five_integers_array: 3

Execute Talker

To execute the talker, open a new terminal window and execute it as follows.

$ cd msgtest
$ . ./install/setup.bash
$ ros2 run talker talker
[INFO] [1658305913.359250753] [talker]: send: MyMsgs { msg1: MyMsg { integer_value: 10, unbounded_integer_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653f7aa0, size: 3, capacity: 3 }), five_integers_array: [11, 13, 49, 55, 19], up_to_five_integers_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653efaa0, size: 2, capacity: 2 }) }, msg2: MyMsg { integer_value: 10, unbounded_integer_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653f7e30, size: 3, capacity: 3 }), five_integers_array: [11, 13, 49, 55, 19], up_to_five_integers_array: I32Seq(rosidl_runtime_c__int32__Sequence { data: 0x55a0653f7e50, size: 2, capacity: 2 }) }

Nicely done! Now, we can define new types and handle the types in Rust.

String Type

Arrays of string are bit different from arrays of primitive types. string is unbounded string, and string<=5 is bounded string whose length is up to 5. So, there are arrays for unbounded and bounded strings as follows; msg is safe_drive::msg.

ROSRust
stringmsg::RosString<0>
string[]msg::StringSeq<0, 0>
string[<=5]msg::StringSeq<0, 5>
string[10][msg::RosString<0>; 10]
string<=5msg::RosString<5>
string<=5[<=10]msg::StringSeq<5, 10>
string<=5[10][msg::RosString<5>; 10]

RosString<0> is a type of unbounded string, and RosString<5> is a type of bounded string whose length is less than or equal to 5. 5 of StringSeq<5, 10> indicates the length of a string is less than or equal to 5, and 10 of it indicates the length of the array is less than or equal to 10.

For example, the following ROS2 message type can be translated into the MyMsgStr as follows.

string message
string[2] static_array_str
string[] dynamic_array_str
string[<=3] bounded_array_str

string<=10 bounded_str
string<=10[2] static_array_bounded_str
string<=10[] dynamic_array_bounded_str
string<=10[<=3] bounded_array_bounded_str

/tmp/safe_drive_tutorial/msgtest/my_interfaces/src/msg/my_msg_str.rs

#![allow(unused)]
fn main() {
#[repr(C)]
#[derive(Debug)]
pub struct MyMsgStr {
    pub message: safe_drive::msg::RosString<0>,
    pub static_array_str: [safe_drive::msg::RosString<0>; 2],
    pub dynamic_array_str: safe_drive::msg::RosStringSeq<0, 0>,
    pub bounded_array_str: safe_drive::msg::RosStringSeq<0, 3>,
    pub bounded_str: safe_drive::msg::RosString<10>,
    pub static_array_bounded_str: [safe_drive::msg::RosString<10>; 2],
    pub dynamic_array_bounded_str: safe_drive::msg::RosStringSeq<10, 0>,
    pub bounded_array_bounded_str: safe_drive::msg::RosStringSeq<10, 3>,
}
}

To access to elements of string arrays, we can use as_slice_mut() or as_slice_mut() as follows.

#![allow(unused)]
fn main() {
fn _create_message_str() -> Result<my_interfaces::msg::MyMsgStr, DynError> {
    let mut my_msg = my_interfaces::msg::MyMsgStr::new().unwrap();

    // string message
    my_msg.message.assign("Hello, World!");

    // string[2] static_array_str
    my_msg.static_array_str[0].assign("static array 0");
    my_msg.static_array_str[1].assign("static array 1");

    // string[] dynamic_array_str
    let mut msgs = RosStringSeq::new(3).ok_or("failed to create string")?;
    let ref_msgs = msgs.as_slice_mut();
    ref_msgs[0].assign("dynamic array 0");
    ref_msgs[1].assign("dynamic array 1");
    ref_msgs[2].assign("dynamic array 2");
    my_msg.dynamic_array_str = msgs;

    // string[<=3] bounded_array_str
    let mut msgs = RosStringSeq::new(2).ok_or("failed to create string")?;
    let ref_msgs = msgs.as_slice_mut();
    ref_msgs[0].assign("bounded array 0");
    ref_msgs[1].assign("bounded array 1");
    my_msg.bounded_array_str = msgs;

    // string<=10 bounded_str
    my_msg.bounded_str.assign("Hello!");

    // string<=10[2] static_array_bounded_str
    my_msg.static_array_bounded_str[0].assign("msg1");
    my_msg.static_array_bounded_str[1].assign("msg2");

    // string<=10[] dynamic_array_bounded_str
    let mut msgs = RosStringSeq::new(3).ok_or("failed to create string")?;
    let ref_msgs = msgs.as_slice_mut();
    ref_msgs[0].assign("msg3");
    ref_msgs[1].assign("msg4");
    ref_msgs[2].assign("msg5");
    my_msg.dynamic_array_bounded_str = msgs;

    // string<=10[<=3] bounded_array_bounded_str
    let mut msgs = RosStringSeq::new(2).ok_or("failed to create string")?;
    let ref_msgs = msgs.as_slice_mut();
    ref_msgs[0].assign("msg3");
    ref_msgs[1].assign("msg5");
    my_msg.bounded_array_bounded_str = msgs;

    Ok(my_msg)
}
}

Service

Source code.

A service is a communication consisting requests and responses. There are a server and clients for a service as follows.

graph TD;
    Server--response-->Client1;
    Server--response-->Client2;
    Server--response-->Client3;
    Client1--request-->Server;
    Client2--request-->Server;
    Client3--request-->Server;

A client sends a request and the server replies the request. In this tutorial, we will describe how to implement a client and a server in Rust by using safe_drive.

Create Base Files

In this tutorial, we prepare 3 Rust's projects and 1 C's project. The following table shows the directories we use.

DirectoriesDescription
srvtest/src/serverserver in Rust
srvtest/src/clientclient in Rust
srvtest/src/srvmsgmessage type

srvmsg is a ROS2's project to define a user defined type for a service we implement.

$ mkdir -p srvtest/src
$ cd srvtest/src
$ cargo new server
$ cargo new client
$ ros2 pkg create --build-type ament_cmake srvmsg

In addition to that, create the workspace's Cargo.toml as follows.

srvtest/src/Cargo.toml

[workspace]
members = [ "client", "server"]

Define Protocol

First of all, let's define a protocol for a service. ROS2 provides a special format to define a protocol, and it should be described in a .srv file.

Usually, .srv files are placed in a srv directory. So, we create srv directory as follows.

$ cd srvtest/src/srvmsg
$ mkdir srv

Create srvmsg/srv/AddTwoInts.srv

Then, create AddTwoInts.srv file in which a protocol is specified as follows.

uint32 x
uint32 y
---
uint32 result

uint32 x and uint32 y above -- are types which must be included in a request, and uint32 result is a type which must be included in a response. We will implement a server which takes 2 integer values and returns the summation of the values.

Edit srvmsg/CMakeLists.txt

To generate shared libraries from AddTwoInts.srv, CMakeLists.txt must be updated as follows.

# srvtest/src/srvmsg/CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "srv/AddTwoInts.srv"
)

Edit srvmsg/package.xml

In addition to that, the following lines must be added to package.xml.

srvtest/src/srvmsg/package.xml

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

Server

A server can be implemented in a straightforward way.

Edit server/Cargo.toml

To generate Rust types from .srv files, we have to edit Cargo.toml as follows.

[dependencies]
safe_drive = "0.3"
srvmsg = { path = "/tmp/safe_drive_tutorial/srvtest/srvmsg" }
tokio = { version = "1", features = ["full"] }

[package.metadata.ros]
msg = ["srvmsg"]
msg_dir = "/tmp/safe_drive_tutorial/srvtest"
safe_drive_version = "0.3"

Create server/package.xml

package.xml is also required as follows.

srvtest/src/server/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>server</name>
  <version>0.0.0</version>
  <description>Server in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <depend>srvmsg</depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Don't forget <depend>srvmsg</depend>.

Generate Types

AddTwoInts.srv will be translated to struct AddTwoInts as follows.

#![allow(unused)]
fn main() {
#[derive(Debug)]
pub struct AddTwoInts;

impl ServiceMsg for AddTwoInts {
    type Request = AddTwoInts_Request;
    type Response = AddTwoInts_Response;
    fn type_support() -> *const rcl::rosidl_service_type_support_t {
        unsafe {
            rosidl_typesupport_c__get_service_type_support_handle__srvmsg__srv__AddTwoInts()
        }
    }
}
}

struct AddTwoInts implements ServiceMsg trait. ServiceMsg::Request and ServiceMsg::Response are types of request and response, respectively. AddTwoInts_Request and AddTowInts_Response are as follows.

#![allow(unused)]
fn main() {
#[repr(C)]
#[derive(Debug)]
pub struct AddTwoInts_Request {
    pub x: u32,
    pub y: u32,
}

#[repr(C)]
#[derive(Debug)]
pub struct AddTwoInts_Response {
    pub result: u32,
}
}

Edit server/src/main.rs

You have to create a server by create_server() method and register a callback function to a selector as follows.

use safe_drive::{context::Context, error::DynError, logger::Logger, pr_error, qos::Profile};
use srvmsg_rs::srvmsg::srv::{AddTwoInts, AddTwoInts_Response};

fn main() -> Result<(), DynError> {
    // Create a context.
    let ctx = Context::new()?;

    // Create a node.
    let node = ctx.create_node("server_node", None, Default::default())?;

    // Create a server.
    let server = node.create_server::<AddTwoInts>("my_service", Some(Profile::default()))?;

    // Create a selector.
    let mut selector = ctx.create_selector()?;

    // Create a logger.
    let logger = Logger::new("server");

    selector.add_server(
        server,
        Box::new(move |msg, _header| {
            let mut response = AddTwoInts_Response::new().unwrap();
            pr_error!(logger, "recv: {:?}", msg);
            response.result = msg.x + msg.y;
            response
        }),
    );

    loop {
        selector.wait()?; // Spin.
    }
}

Let's dig into detail.

#![allow(unused)]
fn main() {
// Create a server.
let server = node.create_server::<AddTwoInts>("my_service", Some(Profile::default()))?;
}

AddTwoInts of create_server::<AddTwoInts> indicates the protocol of the server. The arguments of "my_service" and Some(Profile::default()) are the service name and QoS. If you pass None instead of Some(Profile::default()), Profile::default() is used.

The callback was passed as follows.

#![allow(unused)]
fn main() {
selector.add_server(
    server,
    Box::new(move |msg, _header| {
        let mut response = AddTwoInts_Response::new().unwrap();
        pr_error!(logger, "recv: {:?}", msg);
        response.result = msg.x + msg.y;
        response
    }),
);
}

The callback function must take a message sent by a client and a header including sequence number, time, etc. msg's types is AddTwoInts_Request and a value of AddTwoInts_Response, which is a response, must be returned.

Client

Edit client/Cargo.toml

safe_drive, and tokio must be added to Cargo.toml as follows.

srvtest/src/client/Cargo.toml

[dependencies]
safe_drive = "0.3"
srvmsg = { path = "/tmp/safe_drive_tutorial/srvtest/srvmsg" }
tokio = { version = "1", features = ["full"] }

[package.metadata.ros]
msg = ["srvmsg"]
msg_dir = "/tmp/safe_drive_tutorial/srvtest"
safe_drive_version = "0.3"

Create client/package.xml

package.xml is also required as follows.

srvtest/src/client/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>client</name>
  <version>0.0.0</version>
  <description>Client in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <depend>srvmsg</depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Don't forget <depend>srvmsg</depend>.

Edit client/src/main.rs

We recommend to use async/await to implement a client, because a client wait a response, but when the response is replied is unpredictable. In this tutorial, we use Tokio, which is the most popular asynchronous library of Rust.

use safe_drive::{
    context::Context, error::DynError, logger::Logger, pr_error, pr_info, pr_warn, qos::Profile,
};
use srvmsg_rs::srvmsg::srv::{AddTwoInts, AddTwoInts_Request};
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() -> Result<(), DynError> {
    // Create a context and a node.
    let ctx = Context::new()?;
    let node = ctx.create_node("client", None, Default::default())?;

    let logger = Logger::new("client");

    // Create a client.
    let mut client = node.create_client::<AddTwoInts>("my_service", Some(Profile::default()))?;

    let mut n = 0;
    loop {
        let mut request = AddTwoInts_Request::new().unwrap();
        request.x = n;
        request.y = n + 1;
        n += 1;

        // Send a request.
        let client_rcv = client.send(&request)?;

        // Receive a request.
        let mut receiver = client_rcv.recv();

        match timeout(Duration::from_secs(1), &mut receiver).await {
            Ok(Ok((c, response, _header))) => {
                pr_info!(logger, "received {:?}", response);
                client = c;
            }
            Ok(Err(e)) => {
                pr_error!(logger, "error: {e}");
                return Err(e);
            }
            Err(elapsed) => {
                pr_warn!(logger, "timeout: {elapsed}");
                client = receiver.give_up();
            }
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

Because a response will be never returned, we use tokio::time::timeout to receive a response. timeout(Duration::from_secs(1), &mut receiver).await takes a duration and a awaitable future. The awaitable future can be taken by client_rcv.recv().

Note that it uses tokio::time::sleep instead of std::thread::sleep because to avoid calling blocking functions.

client.send() consumes client and it returns client_recv to receive a response. client_rcv.recv() consumes client_recv and client_rcv.recv().await returns new client, a response, and a header. The new client must be used to send next request.

This means that you cannot ignore receiving a response. If you forget to receive a response, you cannot send any request again, because you need to get a new client by receiving a response.. Otherwise, you encountered compilation errors.

Execution

First of all, you have to compile it and execute a server as follows.

$ cd srvtest
$ colcon build --cargo-args --release
$ . ./install/setup.bash
$ ros2 run server server

Then, in another terminal, execute a client as follows.

$ cd srvtest
$ . ./install/setup.bash
$ ros2 run client client
[WARN] [1659604527.720730018] [client]: timeout: deadline has elapsed
[INFO] [1659604528.722220697] [client]: received AddTwoInts_Response { result: 3 }
[INFO] [1659604529.723525686] [client]: received AddTwoInts_Response { result: 5 }
[INFO] [1659604530.724820326] [client]: received AddTwoInts_Response { result: 7 }

Nicely done! We used async/await for the client, and it can be used for the server as follows.

#![allow(unused)]
fn main() {
async { server.recv().await };
}

Parameter

Source code.

This chapter introduces how to use parameters. A node can have parameters as follows.

graph TD;
    NodeA --> ParamA1:i64;
    NodeA --> ParamA2:bool;
    NodeA --> ParamA3:String;
    NodeB --> ParamB1:f64;
    NodeB --> ParamB2:f64;

In this figure, NodeA has 3 parameters whose types are i64, bool, and String, respectively, and NodeB has 2 parameters whose types are f64. Parameters can be read/write from external nodes.

To receive a notification of updating a parameter, we can use a callback function or async/await. In this chapter, we will explain how to handle it.

Packages

We will prepare 2 packages as follows.

$ mkdir params
$ cd params
$ cargo new param_server
$ cargo new async_param_server

param_server explains a callback based parameter handling, and async_param_server explains an async/await based.

To manage 2 packages, let's prepare Cargo.toml file for a workspace as follows.

params/Cargo.toml

[workspace]
members = ["param_server", "async_param_server"]

The following table shows files we use in this chapter.

FileWhat?
params/param_server/callback based parameter server
params/async_param_server/async/await based parameter server
params/Cargo.tomlworkspace configuration

Type of Parameter

Before explaining handlers, let's see types of parameters.

The type of parameter value is defined by safe_drive::parameter::Value as follows.

#![allow(unused)]
fn main() {
pub enum Value {
    NotSet,
    Bool(bool),
    I64(i64),
    F64(f64),
    String(String),
    VecBool(Vec<bool>),
    VecI64(Vec<i64>),
    VecU8(Vec<u8>),
    VecF64(Vec<f64>),
    VecString(Vec<String>),
}
}

This means that i64 is valid, but i8, i32, and other user defined types are invalid.

A parameter is associated with a descriptor of safe_drive::parameter::Descriptor as follows.

#![allow(unused)]
fn main() {
pub struct Descriptor {
    pub description: String,
    pub additional_constraints: String,
    pub read_only: bool,
    pub dynamic_typing: bool,
    pub floating_point_range: Option<FloatingPointRange>,
    pub integer_range: Option<IntegerRange>,
}
}

So, a parameter and parameters can be represented by safe_drive::parameter::{Parameter, Parameters} as follows.

#![allow(unused)]
fn main() {
pub struct Parameter {
    pub descriptor: Descriptor,
    pub value: Value,
}

pub struct Parameters { /* omitted private fields */ }
}

and a parameter server can be represented by safe_drive::parameter::ParameterServer as follows.

#![allow(unused)]
fn main() {
pub struct ParameterServer {
    pub params: Arc<RwLock<Parameters>>
    // omitted private fields
}
}

In summary, a parameter server can be represented as follows.

graph TD;
    ParameterServer --> Parameters;
    Parameters --> Parameter;
    Parameter --> Descriptor;
    Parameter --> Value;

Parameter Setting and Waiting Update by Callback

We will explain how to set parameters, and how to wait updating by using a callback function.

Edit param_server/Cargo.toml

Add safe_drive to the dependencies section of Cargo.toml as follows.

[dependencies]
safe_drive = "0.3"

Edit param_server/src/main.rs

param_server can be implemented as follows. This sets 2 parameters up and waits updating.

use safe_drive::{context::Context, error::DynError, logger::Logger, parameter::Value, pr_info};

fn main() -> Result<(), DynError> {
    // Create a context and a node.
    let ctx = Context::new()?;
    let node = ctx.create_node("param_server", None, Default::default())?;

    // Create a parameter server.
    let param_server = node.create_parameter_server()?;
    {
        // Set parameters.
        let mut params = param_server.params.write(); // Write lock

        // Statically typed parameter.
        params.set_parameter(
            "my_flag".to_string(),                     // parameter name
            Value::Bool(false),                        // value
            false,                                     // read only?
            Some("my flag's description".to_string()), // description
        )?;

        // Dynamically typed parameter.
        params.set_dynamically_typed_parameter(
            "my_dynamic_type_flag".to_string(), // parameter name
            Value::Bool(false),                 // value
            false,                              // read only?
            Some("my dynamic type flag's description".to_string()), // description
        )?;

        // Add Directly from Parameter struct
        let parameter_to_set = Parameter {
            Descriptor {
                Some("my parameter description".to_string()),                 // parameter description
                Some("my parameter addutional_constraints".to_string()),      // parameter additional constraints
                false,                                                                  // read only ?
                false,                                                                  // static or Dynamic
                None,                                                                   // floating point range 
                None,                                                                   // integer point range 
            },
            Value::Bool(false),                                                         // value
        }
        params.add_parameter(
            ("my parameter").to_string(),                                     // name
            parameter_to_set,                                                           // parameter
        )?;
    }

    // Create a logger and a selector.
    let logger = Logger::new("param_server");
    let mut selector = ctx.create_selector()?;

    // Add a callback function to the parameter server.
    selector.add_parameter_server(
        param_server,
        Box::new(move |params, updated| {
            // Print updated parameters.
            let mut keys = String::new();
            for key in updated.iter() {
                let value = &params.get_parameter(key).unwrap().value;
                keys = format!("{keys}{key} = {}, ", value);
            }
            pr_info!(logger, "updated parameters: {keys}");
        }),
    );

    // Spin.
    loop {
        selector.wait()?;
    }
}

param_server in Detail

First of all, we have to create a parameter server by create_parameter_server() method as follows.

#![allow(unused)]
fn main() {
// Create a parameter server.
let param_server = node.create_parameter_server()?;
}

Then set parameters as follows. To update parameters, we have to acquire a write lock for mutual exclusion by write() method.

#![allow(unused)]
fn main() {
{
    // Set parameters.
    let mut params = param_server.params.write(); // Write lock

    // Statically typed parameter.
    params.set_parameter(
        "my_flag".to_string(),                     // parameter name
        Value::Bool(false),                        // value
        false,                                     // read only?
        Some("my flag's description".to_string()), // description
    )?;

    // Dynamically typed parameter.
    params.set_dynamically_typed_parameter(
        "my_dynamic_type_flag".to_string(), // parameter name
        Value::Bool(false),                 // value
        false,                              // read only?
        Some("my dynamic type flag's description".to_string()), // description
    )?;

    // Add Directly from Parameter struct
        let parameter_to_set = Parameter {
            Descriptor {
                Some("my parameter description".to_string()),                 // parameter description
                Some("my parameter addutional_constraints".to_string()),      // parameter additional constraints
                false,                                                                  // read only ?
                false,                                                                  // static or Dynamic
                None,                                                                   // floating point range 
                None,                                                                   // integer point range 
            },
            Value::Bool(false),                                                         // value
        }
        params.add_parameter(
            ("my parameter").to_string(),                                     // name
            parameter_to_set,                                                           // parameter
        )?;
}
}

To set a statically typed parameters, use set_parameter(). A statically typed parameter cannot be updated by a value whose type is different from original type.

To set a dynamically typed parameters, use set_dynamically_typed_parameter(). A dynamically typed parameter can be updated by an arbitrary type.

To set directly from the Parameter struct, use add_parameter(). A Parameter struct can contain additional_constraints.

Finally, register a callback function to wait updating parameters as follows.

#![allow(unused)]
fn main() {
// Add a callback function to the parameter server.
selector.add_parameter_server(
    param_server,
    Box::new(move |params, updated| {
        // Print updated parameters.
        let mut keys = String::new();
        for key in updated.iter() {
            let value = &params.get_parameter(key).unwrap().value;
            keys = format!("{keys}{key} = {}, ", value);
        }
        pr_info!(logger, "updated parameters: {keys}");
    }),
);

// Spin.
loop {
    selector.wait()?;
}
}

1st argument of the closure, params, is a value of parameters, and 2nd argument, updated is a value containing updated parameters.

Execute param_server

Then, let's execute param_server and get/set a parameter. First, execute param_server in a terminal application window as follows.

$ cargo run --bin param_server
    Finished dev [unoptimized + debuginfo] target(s) in 0.01s
     Running `target/debug/param_server`
[INFO] [1669873997.622330908] [param_server]: updated parameters: my_flag = true,

Then, get and set the parameter by using ros2 command in another terminal application window as follows.

$ ros2 param get param_server my_flag
Boolean value is: False
$ ros2 param set param_server my_flag True
Set parameter successful
$ ros2 param get param_server my_flag
Boolean value is: True
$ ros2 param set param_server my_flag 10
Setting parameter failed: failed type checking: dst = Bool, src = I64

We can get and set boolean values, but integer value cannot be set because my_flag is boolean type and statically typed.


Asynchronous Wait

Then, we explain async/await based parameter server.

Edit async_param_server/Cargo.toml

Add safe_drive and tokio to the dependencies section of Cargo.toml as follows.

[dependencies]
safe_drive = "0.3"
tokio = { version = "1", features = ["full"] }

Edit async_param_server/src/main.rs

async_param_server can be implemented as follows. This sets 2 parameters up and waits updating.

use safe_drive::{context::Context, error::DynError, logger::Logger, parameter::Value, pr_info};

#[tokio::main]
async fn main() -> Result<(), DynError> {
    // Create a context and a node.
    let ctx = Context::new()?;
    let node = ctx.create_node("async_param_server", None, Default::default())?;

    // Create a parameter server.
    let mut param_server = node.create_parameter_server()?;
    {
        // Set parameters.
        let mut params = param_server.params.write(); // Write lock

        // Statically typed parameter.
        params.set_parameter(
            "my_flag".to_string(),                     // parameter name
            Value::Bool(false),                        // value
            false,                                     // read only?
            Some("my flag's description".to_string()), // description
        )?;

        // Dynamically typed parameter.
        params.set_dynamically_typed_parameter(
            "my_dynamic_type_flag".to_string(), // parameter name
            Value::Bool(false),                 // value
            false,                              // read only?
            Some("my dynamic type flag's description".to_string()), // description
        )?;

        // Add Directly from Parameter struct
        let parameter_to_set = Parameter {
            Descriptor {
                Some("my parameter description".to_string()),                 // parameter description
                Some("my parameter addutional_constraints".to_string()),      // parameter additional constraints
                false,                                                                  // read only ?
                false,                                                                  // static or Dynamic
                None,                                                                   // floating point range 
                None,                                                                   // integer point range 
            },
            Value::Bool(false),                                                         // value
        }
        params.add_parameter(
            ("my parameter").to_string(),                                     // name
            parameter_to_set,                                                           // parameter
        )?;
    }

    // Create a logger.
    let logger = Logger::new("async_param_server");

    loop {
        // Wait update asynchronously.
        let updated = param_server.wait().await?;

        let params = param_server.params.read(); // Read lock

        // Print updated parameters.
        let mut keys = String::new();
        for key in updated.iter() {
            let value = &params.get_parameter(key).unwrap().value;
            keys = format!("{keys}{key} = {}, ", value);
        }
        pr_info!(logger, "updated parameters: {keys}");
    }
}

Important things are only following 2 lines.

#![allow(unused)]
fn main() {
// Wait update asynchronously.
let updated = param_server.wait().await?;

let params = param_server.params.read(); // Read lock
}

To wait updating, use wait().await and acquire a read lock by read() method to read parameters. wait().await returns updated parameters.

Zero Copy Publish and Subscribe

Source code.

In this chapter, we will introduce how to use zero copy communications by CycloneDDS. To use CycloneDDS, please install it as Eclipse Cyclone DDS.

Create Project

First of all, create a project as follows.

$ cargo new zerocpy

The files we use are as follows. cyclonedds.xml will be created and used later.

FilesWhat?
zerocopy/Cargo.tomlCargo.toml
zerocopy/src/main.rsmain.rs
zerocopy/cyclonedds.xmlconfiguration of CycloneDDS

Add safe_drive to dependencies section of Cargo.toml as follows.

Cargo.toml

[dependencies]
safe_drive = "0.3"

main.rs

main.rs can be implemented as follows, but almost every lines are same as shown before.

main.rs

use safe_drive::{context::Context, msg::common_interfaces::std_msgs};
use std::{error::Error, time::Duration};

const TOPIC_NAME: &str = "pubsub_loaned";

fn main() -> Result<(), Box<dyn Error + Sync + Send + 'static>> {
    // create a context
    let ctx = Context::new()?;

    // create a subscribe node
    let node_sub = ctx.create_node("loaned_sub_node", None, Default::default())?;

    // create a publish node
    let node_pub = ctx.create_node("loaned_pub_node", None, Default::default())?;

    std::thread::sleep(Duration::from_millis(500));

    // create a publisher and a subscriber
    let subscriber = node_sub.create_subscriber::<std_msgs::msg::Bool>(TOPIC_NAME, None)?;
    let publisher = node_pub.create_publisher::<std_msgs::msg::Bool>(TOPIC_NAME, None)?;

    let mut loaned = publisher.borrow_loaned_message()?;
    *loaned = std_msgs::msg::Bool::new().ok_or("failed to new Bool")?;
    loaned.data = false;
    publisher.send_loaned(loaned)?; // send message

    std::thread::sleep(Duration::from_millis(500));

    // wait messages
    let mut selector = ctx.create_selector()?;
    selector.add_subscriber(
        subscriber,
        Box::new(move |msg| {
            assert!(!msg.data);
        }),
    );
    selector.wait()?;

    Ok(())
}

The important thing is using borrow_loaned_message() and send_loaned() methods as follows.

#![allow(unused)]
fn main() {
let mut loaned = publisher.borrow_loaned_message()?;
*loaned = std_msgs::msg::Bool::new().ok_or("failed to new Bool")?;
loaned.data = false;
publisher.send_loaned(loaned)?; // send message
}

borrow_loaned_message() borrows a memory region from CycloneDDS and send_loaned() sends a message in the borrowed region. safe_drive automatically check whether zero copy is available or not, and it uses conventional copied APIs if zero copy is not available.

Setting-up Zero Copy

To enable zero copy, please prepare cyclonedds.xml, which is a configuration file of CycloneDDS, as follows. You can use arbitrary name for it.

cyclonedds.xml

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/iceoryx/etc/cyclonedds.xsd">
    <Domain id="any">
        <SharedMemory>
            <Enable>true</Enable>
            <LogLevel>info</LogLevel>
        </SharedMemory>
    </Domain>
</CycloneDDS>

<SharedMemory> tag indicates the configuration of zero copy.

To use zero copy enabled CycloneDDS, please set environment arguments as follows.

$ export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
$ export CYCLONEDDS_URI=file://path/to/cyclonedds.xml

In addition to that, CycloneDDS requires iox-roudi daemon to use zero copy. It can be launched as follows.

$ iox-roudi

Execute

After that, zero copy communications can be performed!

$ cargo run

Custom Memory Allocator

Under construction.

$ mkdir custom_alloc
$ cargo new forwarder
$ cargo new requester
$ cat Cargo.toml
[workspace]
members = ["requester", "forwarder"]

Request to Server in Callback

Source code.

In this chapter, we will introduce how to access a server in a callback function. The code introduced here can be implemented by using async/await more beautifully.

Subscriber and Client

The concept here is very simple. We use 2 selectors for a callback function and accessing a server, and recv_timeout() to receive a response from the server.

We only show Rust code of a publisher as follows. Other files can be found in Source code.

use safe_drive::{
    context::Context,
    error::DynError,
    logger::Logger,
    msg::common_interfaces::{std_msgs, std_srvs},
    pr_fatal, pr_info,
    selector::Selector,
    service::client::Client,
    topic::subscriber::Subscriber,
    RecvResult,
};
use std::time::Duration;

fn main() -> Result<(), DynError> {
    // Create a context and a node.
    let ctx = Context::new()?;
    let node = ctx.create_node("listen_client", None, Default::default())?;

    // Create 2 selectors.
    let selector = ctx.create_selector()?;
    let selector_client = ctx.create_selector()?;

    // Create a subscriber, a client, and a logger.
    let subscriber = node.create_subscriber::<std_msgs::msg::Empty>("pubsubsrv_topic", None)?;
    let client = node.create_client::<std_srvs::srv::Empty>("pubsubsrv_service", None)?;

    worker(selector, selector_client, subscriber, client)?;

    Ok(())
}

fn worker(
    mut selector: Selector,
    mut selector_client: Selector,
    subscriber: Subscriber<std_msgs::msg::Empty>,
    client: Client<std_srvs::srv::Empty>,
) -> Result<(), DynError> {
    let mut client = Some(client);
    let logger = Logger::new("listen_client");

    selector.add_subscriber(
        subscriber,
        Box::new(move |_msg| {
            pr_info!(logger, "receive a message");

            // Take the client.
            let c = client.take().unwrap();

            let request = std_srvs::srv::EmptyRequest::new().unwrap();

            // Send a request.
            let receiver = c.send(&request).unwrap();

            // Receive a response.
            match receiver.recv_timeout(Duration::from_millis(20), &mut selector_client) {
                RecvResult::Ok((c, _response, _header)) => {
                    pr_info!(logger, "receive a response");
                    client = Some(c)
                }
                RecvResult::RetryLater(r) => client = Some(r.give_up()),
                RecvResult::Err(e) => {
                    pr_fatal!(logger, "{e}");
                    panic!()
                }
            }
        }),
    );

    loop {
        selector.wait()?;
    }
}

in Detail

The important pieces are as follows. This code create 2 selectors first. selector_client is used for a service.

#![allow(unused)]
fn main() {
// Create 2 selectors.
let selector = ctx.create_selector()?;
let selector_client = ctx.create_selector()?;
}

In the callback function, recv_timeout() method, which takes selector_client, is used as follows.

#![allow(unused)]
fn main() {
// Send a request.
let receiver = c.send(&request).unwrap();

// Receive a response.
match receiver.recv_timeout(Duration::from_millis(20), &mut selector_client) {
    RecvResult::Ok((c, _response, _header)) => {
        pr_info!(logger, "receive a response");
        client = Some(c)
    }
    RecvResult::RetryLater(r) => client = Some(r.give_up()),
    RecvResult::Err(e) => {
        pr_fatal!(logger, "{e}");
        panic!()
    }
}
}

In this callback function, it sends a request to the server and receives a response by recv_timeout() method.

State Change

A service's state can be represented as follows.

graph LR;
    Start-->Send
    Send-->Receive
    Receive-->Send

To represent the state, safe_drive uses the type system of Rust.

First, a client is created as follows. At that time, it must be Send state.

#![allow(unused)]
fn main() {
let mut client = Some(client); // Send state.
}

send() method consumes the client and returns receiver. It means the state is changed from Send to Receive.

#![allow(unused)]
fn main() {
let c = client.take().unwrap();
let receiver = c.send(&request).unwrap(); // Change state from Send to Receive.
}

recv_timeout() also consumes receiver and returns a new client to send a new request if it successfully receives a response.

#![allow(unused)]
fn main() {
// Receive a response.
match receiver.recv_timeout(Duration::from_millis(20), &mut selector_client) {
    RecvResult::Ok((c, _response, _header)) => {
        pr_info!(logger, "receive a response");
        client = Some(c)
    }
    RecvResult::RetryLater(r) => client = Some(r.give_up()),
    RecvResult::Err(e) => {
        pr_fatal!(logger, "{e}");
        panic!()
    }
}
}

When it has timed out, you can choose retrying or giving up. This code chooses giving up by give_up() method. give_up() methods consumes a receiver and return a new client to send a new request.

This code will work, but we think using async/await is better than this because it is designed for asynchronous programming. I recommend to rewrite this code by using async/await.

Contribution Guide

This chapter explains safe_drive's internal briefly to contribute to it.

Setting-up for Contribution

Basic Setting-up

See Setting-up for basic setting-up.

Setting-up Development Environment

To build safe_drive, you need install bindgen-cli and ros2msg_to_rs as follows.

$ cargo install bindgen-cli
$ cargo install --git https://github.com/tier4/ros2msg_to_rs.git

bindgen_cli is a transpiler from C to Rust, and ros2msg_to_rs is also a transpiler from .msg and .srv to Rust.

If you want to contribute to documents you are reading now, please install mdbook and mdbook-mermaind as follows.

$ cargo install mdbook
$ cargo install mdbook-mermaid

Finally, download safe_drive as follows.

$ git clone https://github.com/tier4/safe_drive
$ cd safe_drive

Following chapters introduce how to hack safe_drive in this directory.

Use Docker

We provide Docker files in docker. You can use this to hack safe_drive as follows, alternatively.

$ git clone https://github.com/tier4/safe_drive.git
$ cd safe_drive/docker
$ docker compose build
$ docker compose up -d
$ docker exec -it docker-safe_drive-1 /bin/zsh

Build and Test

Build and test can be done by using make as follows.

Build:

$ make

Test:

$ make test

make test executes test codes in tests, and codes in documentation comment.

cargo test may not work because it requires some environment variables and some code generation. Please see Makefile.

Cleaning-up can be done as follows.

$ make clean

Editor Setting-up

safe_drive has features like iron, humble or galactic, which indicates ROS2 version. We recommend pass one of a feature to enable editor's support.

If you use ROS2 humble and Visual Studio Code, please add iron in rust-analyzer -> cargo -> features as follows.

{
    "rust-analyzer.cargo.features": [
        "iron"
    ]
}

Client applications not necessary to specify a feature of ROS2 version, because build.rs detects ROS2 version from ROS_DISTRO environment variable.

Generating Messages Included by safe_drive

For client applications, safe_drive provides a way to transpile from ROS2's .idl files to Rust crates by using safe_drive_msg. safe_drive itself includes ROS2's some basic messages to handle parameter servers.

The basic messages can be generated by ros2msg_to_rs, and located in src/msg.

safe_drive contains common_interfaces, unique_identifier_msgs, and rcl_interfaces. These messages can be transpiled as follows. {safe_drive} of the following means a directory path to safe_drive.

$ git clone https://github.com/ros2/common_interfaces.git -b humble
$ ros2msg_to_rs --disable-common-interfaces -s crate -i common_interfaces -o {safe_drive}/src/msg/humble/common_interfaces
$ mkdir ros2msg && cd ros2msg
$ git clone https://github.com/ros2/unique_identifier_msgs.git -b humble
$ ros2msg_to_rs --disable-common-interfaces -s crate -i . -o {safe_drive}/src/msg/humble/ros2msg
$ git clone https://github.com/ros2/rcl_interfaces.git -b humble
$ rm -rf rcl_interfaces/test_msgs rcl_interfaces/builtin_interfaces
$ ros2msg_to_rs --disable-common-interfaces -s crate -i rcl_interfaces -o {safe_drive}/src/msg/humble/interfaces

-s crate means crate is the name of top module. -i and -o means input and output directories. Because safe_drive defines builtin_interfaces internally, it must be removed before transpilation.

To support newer version of ROS2, these files should be updated.

C APIs

C APIs of ROS2 are defined in src/msg/humble/runtime_c.rs and src/rcl. These files are generated by Makefile in supplements/bindgen.

MT Unsafe APIs

MT unsafe APIs requires mutex to prevent data races. So, these MT unsafe APIs must be redefined in MTUnsafeFn in src/rcl.rs. The redefined functions can be called via rcl::MT_UNSAFE_FN, which is a global variable, after locking. For example, rcl::rcl_guard_condition_init() are defined as follows.

#![allow(unused)]
fn main() {
pub(crate) static MT_UNSAFE_FN: Lazy<Mutex<MTUnsafeFn>> =
    Lazy::new(|| Mutex::new(MTUnsafeFn::new()));

pub(crate) struct MTUnsafeFn;

impl MTUnsafeFn {
    pub fn rcl_guard_condition_init(
        &self,
        guard_condition: *mut rcl_guard_condition_t,
        context: *mut rcl_context_t,
        options: rcl_guard_condition_options_t,
    ) -> RCLResult<()> {
        ret_val_to_err(unsafe { self::rcl_guard_condition_init(guard_condition, context, options) })
    }
}
}

It can be called as follows.

#![allow(unused)]
fn main() {
let guard = rcl::MT_UNSAFE_FN.lock();
guard.rcl_guard_condition_init(
    &mut guard_condition,
    unsafe { context.as_ptr_mut() },
    rcl::rcl_guard_condition_options_t { allocator },
)?;
}

MT Safe APIs

MT safe APIs are redefined in MTSafeFn in src/rcl.rs. For example, rcl::rcl_shutdown() is defined as follows.

#![allow(unused)]
fn main() {
pub(crate) struct MTSafeFn;

impl MTSafeFn {
    pub fn rcl_shutdown(context: *mut rcl_context_t) -> RCLResult<()> {
        ret_val_to_err(unsafe { self::rcl_shutdown(context) })
    }
}
}

It can be called without lock as follows.

#![allow(unused)]
fn main() {
rcl::MTSafeFn::rcl_shutdown(&mut self.context).unwrap();
}

Documentation

You can contribute to documents you are reading now. Documents are written in Markdown and located in mdbook. These Markdown files are compiled to HTML files and located in docs, and published at https://tier4.github.io/safe_drive/.

You can use (Mermaid)[https://mermaid.js.org/#/] notation in Markdown as follows.

graph TD;
    A-->B;
    A-->C;
    B-->D;
    C-->D;

The compilation can be done by make as follows.

$ make doc

We are not native English speaker. So, we need your help to improve documents!