Publish and Subscribe

Source code.

This chapter introduces a simple example of publish and subscribe communication. This communication is so-called topic by ROS2. There are N senders and M receivers in a topic. This tutorial implements 1 sender (in Rust) and 2 receivers (in Rust and C++).

The following figure is a example topology.

graph TD;
    Publisher1-->TopicA;
    Publisher2-->TopicA;
    TopicA-->Subscriber1;
    TopicA-->Subscriber2;
    TopicA-->Subscriber3;
    Publisher3-->TopicB;
    Publisher4-->TopicB;
    Publisher5-->TopicB;
    TopicB-->Subscriber4;
    TopicB-->Subscriber5;

There are 2 topics, TopicA and TopicB in this figure. If Publisher2 of TopicA send a message, Subscriber1-3 subscribing TopicA receive the message. Subscriber4 and Subscriber5 can receive messages sent from Publisher3-5.

Directories

First of all, create working directories as follows.

$ mkdir pubsub
$ mkdir pubsub/src

Then create projects of Rust by using cargo as follows.

$ cd pubsub/src
$ cargo new my_talker
$ cargo new my_listener

Following directories are important directories we will use throughout this tutorial.

DirectoriesWhat?
pubsub/src/my_talkerpublisher in Rust
pubsub/src/my_listenersubscriber in Rust
pubsub/installcreated by colcon

Workspace of Rust

To handle multiple projects of cargo, we recommend to prepare Cargo.toml for the workspace as follows.

pubsub/src/Cargo.toml

[workspace]
members = ["my_talker", "my_listener"]

The workspace enables rust-analyzer and reduces compilation time.


Talker in Rust

Let's start implementing a publisher. We created and will edit files of the talker as follows. These files will automatically created when you do cargo new as described before.

FilesWhat?
my_talker/Cargo.tomlfor Cargo
my_talker/package.xmlfor ROS2
my_talker/src/main.rssource code

Edit my_talker/Cargo.toml

Cargo.toml is used to describe a project of Rust. Add safe_drive to the dependencies of Cargo.toml to use it. Currently, safe_drive's repository is private, and we need to specify the path as follows.

pubsub/src/my_talker/Cargo.toml

[dependencies]
safe_drive = "0.4"
std_msgs = { path = "/tmp/safe_drive_tutorial/pubsub/std_msgs" }

[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/pubsub"
safe_drive_version = "0.3"

Edit my_talker/src/main.rs

my_talker is very simple. This does as follows.

  1. Create a context.
  2. Create a node from the context.
  3. Create a publisher from the node.
  4. Send a message periodically.

A context is a resource manager of ROS2, and it must be created first of all. A node contains publishers, subscribers, servers, and clients. A node is a functional unit of ROS2. Publishers and subscribers can be created by a node.

The following figure is an example hierarchy.

graph TD;
    Context-->NodeA
    NodeA-->SubscriberA
    NodeA-->SubscriberB
    NodeA-->PublisherA
    NodeA-->PublisherB
    NodeA-->PublisherC

In this figure, NodeA is created by a context, and subscribers and publishers are created by the node.

Now, we have understood the basics of ROS2. The following is a code of my_talker. You should also understand what this code is doing.

pubsub/src/my_talker/src/main.rs

use safe_drive::{
    context::Context, error::DynError, logger::Logger, pr_info
};
use std::time::Duration;

fn main() -> Result<(), DynError> {
    // Create a context.
    let ctx = Context::new()?;

    // Create a node.
    let node = ctx.create_node("my_talker", None, Default::default())?;

    // Create a publisher.
    let publisher = node.create_publisher::<std_msgs::msg::String>("my_topic", None)?;

    // Create a logger.
    let logger = Logger::new("my_talker");

    let mut cnt = 0; // Counter.
    let mut msg = std_msgs::msg::String::new().unwrap();
    loop {
        // Create a message to be sent.
        let data = format!("Hello, World!: cnt = {cnt}");
        msg.data.assign(&data);

        pr_info!(logger, "send: {}", msg.data); // Print log.

        // Send a message.
        publisher.send(&msg)?;

        // Sleep 1s.
        cnt += 1;
        std::thread::sleep(Duration::from_secs(1));
    }
}

my_talker in Detail

create_node creates a node.

#![allow(unused)]
fn main() {
// Create a node.
let node = ctx.create_node("my_talker", None, Default::default())?;
}

The arguments indicate as follows.

  • "my_talker" : the name of the node.
  • Node : namespace.
    • We can pass a namespace like Some("namespace").
    • If None is passed, the node has no namespace.
  • Default::default() : option of node.

create_publisher creates a publisher.

#![allow(unused)]
fn main() {
// Create a publisher.
let publisher = node.create_publisher::<std_msgs::msg::String>("my_topic", None)?;
}
  • <std_msgs::msg::String> : the publisher can send values of std_msgs::msg::String.
  • "my_topic" : the topic name to which the publisher send messages.
  • None : QoS of the publisher. QoS will be described in a later chapter.

A message can be sent by send method as follows.

#![allow(unused)]
fn main() {
// Create a message to be sent.
let data = format!("Hello, World!: cnt = {cnt}");
msg.data.assign(&data);

// Send a message.
publisher.send(&msg)?;
}

Logger is used to print some messages. It can be created as follows.

#![allow(unused)]
fn main() {
// Create a logger.
let logger = Logger::new("my_talker");
}

The argument of "my_talker" is the name of the logger. To print a message, use pr_* macros as follows.

#![allow(unused)]
fn main() {
pr_info!(logger, "send: {}", msg.data); // Print log.
}

There are macros for logging as follows.

  • pr_debug! : debug
  • pr_info! : information
  • pr_warn! : warning
  • pr_err! : error
  • pr_fatal! : fatal

Create my_talker/package.xml

package.xml is used by colcon, which is a build tool used by ROS2. It contains the package name, maintainer, description, etc, as follows.

pubsub/src/my_talker/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>my_talker</name>
  <version>0.0.0</version>
  <description>My Talker in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <depend>std_msgs</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Copy and paste this, then edit it if you want.

Execute the Talker

Before compiling, ensure that you load setting of ROS2 as follows. If you have already done so, you do not need this.

$ . /opt/ros/iron/setup.bash

Then compile by using colcon as follows. Before compiling, change the current directory to pubsub, which is the top directory of our project.

$ cd pubsub
$ colcon build --cargo-args --release

--cargo-args --release is a option to pass the --release argument to cargo. cargo optimizes the code when --release is specified.

To launch my_talker, load the setting and use ros2 command as follows.

$ . ./install/setup.bash
$ ros2 run my_talker my_talker
[INFO] [1656048392.368568500] [my_talker]: send: Hello, World!: cnt = 0
[INFO] [1656048393.368787500] [my_talker]: send: Hello, World!: cnt = 1
[INFO] [1656048394.368994200] [my_talker]: send: Hello, World!: cnt = 2

Listener in Rust

Let's then implement a listener. We created and will edit files of the listener as follows.

FilesWhat?
my_listener/Cargo.tomlfor Cargo
my_listener/package.xmlfor ROS2
my_listener/src/main.rssource code

Edit my_listener/Cargo.toml

my_listener also requires safe_drive. Add safe_drive to the dependencies as follows.

# pubsub/src/my_listener/Cargo.toml
[dependencies]
safe_drive = "0.4"
std_msgs = { path = "/tmp/safe_drive_tutorial/pubsub/std_msgs" }

[package.metadata.ros]
msg = ["std_msgs"]
msg_dir = "/tmp/safe_drive_tutorial/pubsub"
safe_drive_version = "0.3"

Edit my_listener/src/main.rs

To implement subscriber, we have to prepare a callback function of the subscriber. This is the main difference from my_talker.

pubsub/src/my_listener/src/main.rs

use safe_drive::{
    context::Context, error::DynError, logger::Logger, pr_info,
};

fn main() -> Result<(), DynError> {
    // Create a context.
    let ctx = Context::new()?;

    // Create a node.
    let node = ctx.create_node("my_listener", None, Default::default())?;

    // Create a subscriber.
    let subscriber = node.create_subscriber::<std_msgs::msg::String>("my_topic", None)?;

    // Create a logger.
    let logger = Logger::new("my_listener");

    // Create a selector.
    let mut selector = ctx.create_selector()?;

    // Add a callback function.
    selector.add_subscriber(
        subscriber,
        Box::new(move |msg| {
            pr_info!(logger, "receive: {}", msg.data);
        }),
    );

    // Spin.
    loop {
        selector.wait()?;
    }
}

my_listener in Detail

Similar to the publisher, create_subscriber creates a subscriber.

#![allow(unused)]
fn main() {
// Create a subscriber.
let subscriber = node.create_subscriber::<std_msgs::msg::String>("my_topic", None)?;
}

The arguments are as follows.

  • "my_topic" : the name of the topic to which the subscriber is subscribing.
  • None : QoS of the subscriber. Discussed in a later chapter.

To add and invoke a callback function. We need to create a selector, which can wait some events and invoke callback functions of the events. A selector is similar to select or epoll function, and equivalent to a executer of ROS2.

#![allow(unused)]
fn main() {
// Create a selector.
let mut selector = ctx.create_selector()?;

// Add a callback function.
selector.add_subscriber(
    subscriber,
    Box::new(move |msg| {
        pr_info!(logger, "receive: {}", msg.data);
    }),
);

// Spin.
loop {
    selector.wait()?;
}
}

The arguments of add_subscriber method are as follows.

  • subscriber : the subscriber.
  • Box::new(move |msg| { pr_info!(logger, "receive: {}", msg.data); }) : the callback function.
  • false : the callback function is called only once or not. If false is passed, the callback function is invoked every time and forever.

selector.wait() wait events. To receive events forever, use infinite loop.

Create my_listener/package.xml

package.xml is almost same as above. The only difference is the name of the package, which is my_listener.

pubsub/src/my_listener/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>my_listener</name>
  <version>0.0.0</version>
  <description>My Listener in Rust</description>
  <maintainer email="yuuki.takano@tier4.jp">Yuuki Takano</maintainer>
  <license>Apache License 2.0</license>

  <depend>std_msgs</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cargo</build_type>
  </export>
</package>

Execute the Listener

Before compiling and executing, execute my_talker in other terminal, and don't forget to load the setting of ROS2 as mentioned above. The compilation can be done as follows.

$ cd pubsub
$ colcon build --cargo-args --release

To execute my_listener, load the setting of our projects and execute it by using ros2 command as follows.

$ . ./install/setup.bash
$ ros2 run my_listener my_listener
[INFO] [1656050459.231579900] [my_listener]: receive: Hello, World!: cnt = 4
[INFO] [1656050460.231831200] [my_listener]: receive: Hello, World!: cnt = 5
[INFO] [1656050461.232120000] [my_listener]: receive: Hello, World!: cnt = 6

Nicely done! We are receiving messages sent from my_talker.