6.1. 使用“zero-copy”

功能背景

通信是机器人开发引擎的基础功能,原生ROS进行大数据量通信时存在时延较大、系统负载较高等问题。TogetheROS.Bot基于地平线系统软件库hbmem实现了“zero-copy”功能,数据跨进程传输零拷贝,可大大减少大块数据传输延时和系统资源占用。本节介绍如何使用tros.b hbmem接口创建publisher和subscriber node进行大块数据传输,并计算传输延时。

前置条件

已按照安装成功安装tros.b,并已掌握ROS2 node,topic,qos等基础知识,以及如何创建package和使用自定义消息,具体教程可见ROS2官方文档

已安装ROS2软件包构建系统ament_cmake。安装命令:apt update; apt-get install python3-catkin-pkg; pip3 install empy

已安装ROS2编译工具colcon。安装命令:pip3 install -U colcon-common-extensions

任务内容

1 创建package

打开一个新的终端,source tros.b setup脚本,确保ros2命令可以运行。

source /opt/tros/local_setup.bash

使用以下命令创建一个workspace,详细介绍可见ROS2 官方教程Creating a workspace

mkdir -p ~/dev_ws/src
cd ~/dev_ws/src

运行以下命令创建一个package

ros2 pkg create --build-type ament_cmake hbmem_pubsub

2 创建自定义消息

2.1 新建消息文件

运行以下命令,创建msg目录用来存放自定义消息文件

cd ~/dev_ws/src/hbmem_pubsub
mkdir msg

msg目录下新建SampleMessage.msg文件,具体内容如下:

int32 index
uint64 time_stamp
uint8[4194304] data

uint32 MAX_SIZE=4194304

2.2 编译依赖

返回到~/dev_ws/src/hbmem_pubsub目录,修改package.xml,在<buildtool_depend>ament_cmake</buildtool_depend>下面添加以下内容:

  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

2.3 编译脚本

修改CMakeLists.txt,在# find_package(<dependency> REQUIRED)下面添加以下内容,进行msg编译:

find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SampleMessage.msg"
)

3 创建消息发布节点

3.1 新建消息发布节点文件

~/dev_ws/src/hbmem_pubsub/src目录下新建 publisher_hbmem.cpp文件,用来创建publisher node,具体代码和解释如下:

#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"

using namespace std::chrono_literals;


class MinimalHbmemPublisher  : public rclcpp::Node {
 public:
  MinimalHbmemPublisher () : Node("minimal_hbmem_publisher"), count_(0) {
    // 创建publisher_hbmem,topic为"topic",QOS为KEEPLAST(10),以及默认的可靠传输
    publisher_ = this->create_publisher_hbmem<hbmem_pubsub::msg::SampleMessage>(
        "topic", 10);

    // 定时器,每隔40毫秒调用一次timer_callback进行消息发送
    timer_ = this->create_wall_timer(
        40ms, std::bind(&MinimalHbmemPublisher ::timer_callback, this));
  }

 private:
  // 定时器回调函数
  void timer_callback() {
    // 获取要发送的消息
    auto loanedMsg = publisher_->borrow_loaned_message();
    // 判断消息是否可用,可能出现获取消息失败导致消息不可用的情况
    if (loanedMsg.is_valid()) {
      // 引用方式获取实际的消息
      auto& msg = loanedMsg.get();
      
      // 获取当前时间,单位为us
      auto time_now =
          std::chrono::duration_cast<std::chrono::microseconds>(
              std::chrono::steady_clock::now().time_since_epoch()).count();
      
      // 对消息的index和time_stamp进行赋值
      msg.index = count_;
      msg.time_stamp = time_now;
      
      // 打印发送消息
      RCLCPP_INFO(this->get_logger(), "message: %d", msg.index);
      publisher_->publish(std::move(loanedMsg));
      // 注意,发送后,loanedMsg已不可用
      // 计数器加一
      count_++;
    } else {
      // 获取消息失败,丢弃该消息
      RCLCPP_INFO(this->get_logger(), "Failed to get LoanMessage!");
    }
  }
  
  // 定时器
  rclcpp::TimerBase::SharedPtr timer_;

  // hbmem publisher
  rclcpp::PublisherHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr publisher_;
  
  // 计数器
  size_t count_;
};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<MinimalHbmemPublisher>());
  rclcpp::shutdown();
  return 0;
}

3.2 编译依赖

返回到~/dev_ws/src/hbmem_pubsub目录,修改package.xml,在<member_of_group>rosidl_interface_packages</member_of_group> 下面增加rclcpp依赖:

  <depend>rclcpp</depend>

3.3 编译脚本

修改CMakeLists.txt,在rosidl_generate_interfaces语句下面添加以下内容,完成publisher编译:

find_package(rclcpp REQUIRED)

add_executable(talker src/publisher_hbmem.cpp)
ament_target_dependencies(talker rclcpp)
rosidl_target_interfaces(talker
  ${PROJECT_NAME} "rosidl_typesupport_cpp")

install(TARGETS
  talker
  DESTINATION lib/${PROJECT_NAME})

4 创建消息接收节点

4.1 新建消息接收节点文件

~/dev_ws/src/hbmem_pubsub/src目录下新建  subscriber_hbmem.cpp文件,用来创建subscriber node,具体代码和解释如下:

#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"


class MinimalHbmemSubscriber  : public rclcpp::Node {
 public:
  MinimalHbmemSubscriber () : Node("minimal_hbmem_subscriber") {
    // 创建subscription_hbmem,topic为"sample",QOS为KEEPLAST(10),以及默认的可靠传输
    // 消息回调函数为topic_callback
    subscription_ =
        this->create_subscription_hbmem<hbmem_pubsub::msg::SampleMessage>(
            "topic", 10,
            std::bind(&MinimalHbmemSubscriber ::topic_callback, this,
                      std::placeholders::_1));
  }

 private:
  // 消息回调函数
  void topic_callback(
      const hbmem_pubsub::msg::SampleMessage::SharedPtr msg) const {
    // 注意,msg只能在回调函数中使用,回调函数返回后,该消息就会被释放
    // 获取当前时间
    auto time_now =
        std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now().time_since_epoch())
            .count();
    // 计算延时并打印出来
    RCLCPP_INFO(this->get_logger(), "msg %d, time cost %dus", msg->index,
                time_now - msg->time_stamp);
  }
  
  // hbmem subscription
  rclcpp::SubscriptionHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr
      subscription_;
};


int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<MinimalHbmemSubscriber>());
  rclcpp::shutdown();
  return 0;
}

4.2 编译脚本

返回到~/dev_ws/src/hbmem_pubsub目录,之前已经在package.xml中增加rclcpp依赖,故不需要需改package.xml

修改CMakeLists.txt,在install语句下面添加以下内容,完成subscriber编译:

add_executable(listener src/subscriber_hbmem.cpp)
ament_target_dependencies(listener rclcpp)
rosidl_target_interfaces(listener
  ${PROJECT_NAME} "rosidl_typesupport_cpp")

install(TARGETS
  listener
  DESTINATION lib/${PROJECT_NAME})

5 编译和运行

整个workspace目录结构如下:

dev_ws/
└── src
    └── hbmem_pubsub
        ├── CMakeLists.txt
        ├── include
        │   └── hbmem_pubsub
        ├── msg
        │   └── SampleMessage.msg
        ├── package.xml
        └── src
            ├── publisher_hbmem.cpp
            └── subscriber_hbmem.cpp

完整的package.xml内容如下:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>hbmem_pubsub</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="root@todo.todo">root</maintainer>
  <license>TODO: License declaration</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

  <depend>rclcpp</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

完整的CMakeLists.txt内容如下:

cmake_minimum_required(VERSION 3.5)
project(hbmem_pubsub)

# Default to C99
if(NOT CMAKE_C_STANDARD)
  set(CMAKE_C_STANDARD 99)
endif()

# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
  set(CMAKE_CXX_STANDARD 14)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SampleMessage.msg"
)

find_package(rclcpp REQUIRED)

add_executable(talker src/publisher_hbmem.cpp)
ament_target_dependencies(talker rclcpp)
rosidl_target_interfaces(talker
  ${PROJECT_NAME} "rosidl_typesupport_cpp")

install(TARGETS
  talker
  DESTINATION lib/${PROJECT_NAME})

add_executable(listener src/subscriber_hbmem.cpp)
ament_target_dependencies(listener rclcpp)
rosidl_target_interfaces(listener
  ${PROJECT_NAME} "rosidl_typesupport_cpp")

install(TARGETS
  listener
  DESTINATION lib/${PROJECT_NAME})

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # uncomment the line when a copyright and license is not present in all source files
  #set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # uncomment the line when this package is not in a git repo
  #set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()

在workspace根目录~/dev_ws,编译package:

colcon build --packages-select hbmem_pubsub

若提示colcon命令未安装,使用以下命令安装即可:

pip3 install -U colcon-common-extensions

打开一个新的终端,cddev_ws目录,source tros.b和当前workspace setup文件:

source /opt/tros/local_setup.bash
cd ~/dev_ws
. install/setup.bash

运行talker node:

ros2 run hbmem_pubsub talker

终端上会出现如下打印:

[INFO] [1649227473.431381673] [minimal_hbmem_publisher]: message: 0
[INFO] [1649227473.470746697] [minimal_hbmem_publisher]: message: 1
[INFO] [1649227473.510923361] [minimal_hbmem_publisher]: message: 2
[INFO] [1649227473.550886783] [minimal_hbmem_publisher]: message: 3
[INFO] [1649227473.590664377] [minimal_hbmem_publisher]: message: 4
[INFO] [1649227473.630857041] [minimal_hbmem_publisher]: message: 5

再打开一个新的终端,同样cddev_ws目录,然后souce setup文件,之后运行listener node:

source /opt/tros/local_setup.bash
cd ~/dev_ws
. install/setup.bash

ros2 run hbmem_pubsub listener

终端上会有如下打印,表明subscriber已成功接收到publisher发送的消息:

[INFO] [1649227450.387089523] [minimal_hbmem_subscriber]: msg 10, time cost 1663us
[INFO] [1649227450.427071280] [minimal_hbmem_subscriber]: msg 11, time cost 1713us
[INFO] [1649227450.466993413] [minimal_hbmem_subscriber]: msg 12, time cost 1622us
[INFO] [1649227450.507029960] [minimal_hbmem_subscriber]: msg 13, time cost 1666us
[INFO] [1649227450.546146910] [minimal_hbmem_subscriber]: msg 14, time cost 998us
[INFO] [1649227450.587002681] [minimal_hbmem_subscriber]: msg 15, time cost 1768us

使用Ctrl+C可结束每个Node的运行。

本节总结

如果你已经掌握ROS2的publisher和subscriber使用方式,那么很容易切换到hbmem的publisher和subscriber,使用时只需要做以下改动:

  • rclcpp::Publisher 改为 rclcpp::PublisherHbmem

  • create_publisher 改为 create_publisher_hbmem

  • rclcpp::Subscription 改为 rclcpp::SubscriptionHbmem

  • create_subscription 改为 create_subscription_hbmem

  • publisher发送消息前要先调用borrow_loaned_message获取消息,然后确认消息是否可用,若可用,再进行赋值,发送

  • subscription在回调函数中处理接收到的消息,且接收到的消息只能在回调函数中使用,回调函数执行完,该消息就会释放

注意:

  • 使用基于hbmem的零拷贝会占用ion内存,若创建多个较大消息的publisher,可能出现ion内存不够用,导致创建失败问题。

  • 创建publisher时会一次性申请KEEPLAST的三倍个消息大小的ion内存(最大为256MB),用于消息的传输,之后不会再动态申请。若subscriber端消息处理出错或者未及时处理,则会出现消息buffer都被占用,publisher一直获取不到可用消息的情况。

使用限制

和ROS2的publisher/subscriber数据传输方式相比,使用基于hbmem的零拷贝传输存在以下限制:

  • QOS History只支持KEEPLAST,不支持KEEPALL,且KEEPLAST不能设置太大,有内存限制,目前设置为最大占用256M内存

  • 传输的消息大小是固定的,即消息的sizeof值是不变的,不能包含可变长度类型数据,例如:string,动态数组

  • 只能用于同一设备进程间通信,不可跨设备传输

  • publisher消息要先获取再赋值发送,且要判断是否获取成功

  • subscriber收到的消息有效期仅限回调函数中,不能在回调函数之外使用