cartographer中传感器数据的分发处理
传感器数据走向分析
Node类的HandleLaserScanMessage函数
void Node::HandleLaserScanMessage(const int trajectory_id,
const std::string& sensor_id,
const sensor_msgs::LaserScan::ConstPtr& msg) {
map_builder_bridge_.sensor_bridge(trajectory_id)
->HandleLaserScanMessage(sensor_id, msg);
}
这个函数是订阅LaserScan的topic绑定的回调函数,这个函数执行的是map_builder_bridge里的sensor_bridge里的HandleLaserScanMessage这个函数。
sensor_bridge里的HandleLaserScanMessage函数
// 处理LaserScan数据, 先转成点云,再传入trajectory_builder_
void SensorBridge::HandleLaserScanMessage(
const std::string& sensor_id, const sensor_msgs::LaserScan::ConstPtr& msg) {
carto::sensor::PointCloudWithIntensities point_cloud;
carto::common::Time time;
std::tie(point_cloud, time) = ToPointCloudWithIntensities(*msg);
HandleLaserScan(sensor_id, time, msg->header.frame_id, point_cloud);
}
这个函数是将激光雷达的点云数据加上时间和强度信息,并将处理后的点云数据以及sensor_id,frame_id,time一起传入HandleLaserScan这个函数。
HandleLaserScan这个函数最后是调用了HandleRangefinder进行处理。
HandleRangefinder函数
void SensorBridge::HandleRangefinder(
const std::string& sensor_id, const carto::common::Time time,
const std::string& frame_id, const carto::sensor::TimedPointCloud& ranges) {
...
if (sensor_to_tracking != nullptr) {
trajectory_builder_->AddSensorData(
sensor_id, carto::sensor::TimedPointCloudData{
time,
sensor_to_tracking->translation().cast<float>(),
// 将点云从雷达坐标系下转到tracking_frame坐标系系下
carto::sensor::TransformTimedPointCloud(
ranges, sensor_to_tracking->cast<float>())} ); // 强度始终为空
}
}
这个函数也没有对数据进行实际的处理,而是调用trajectory_builder_->AddSensorData这个函数,对点云的坐标进行坐标变换。
SensorBridge的*trajectory_builder_实际也是指向CollatedTrajectoryBuilder的指针
这个类不区分2d与3d,整理传感器数据,将传感器数据按照时间顺序进行排列,然后发送到GlobalTrajectoryBuilder。
CollatedTrajectoryBuilder在MapBuilder::AddTrajectoryBuilder中初始化
int MapBuilder::AddTrajectoryBuilder(
const std::set<SensorId>& expected_sensor_ids,
const proto::TrajectoryBuilderOptions& trajectory_options,
LocalSlamResultCallback local_slam_result_callback) {
...
trajectory_builders_.push_back(absl::make_unique<CollatedTrajectoryBuilder>(
trajectory_options, sensor_collator_.get(), trajectory_id,
expected_sensor_ids,
// 将2D前端与2D位姿图打包在一起, 传入CollatedTrajectoryBuilder
CreateGlobalTrajectoryBuilder2D(
std::move(local_trajectory_builder), trajectory_id,
static_cast<PoseGraph2D*>(pose_graph_.get()),
local_slam_result_callback, pose_graph_odometry_motion_filter)));
}
CollatedTrajectoryBuilder类的AddSensorData函数
void AddSensorData(
const std::string& sensor_id,
const sensor::TimedPointCloudData& timed_point_cloud_data) override {
AddData(sensor::MakeDispatchable(sensor_id, timed_point_cloud_data));
}
这个函数调用AddData函数添加数据
CollatedTrajectoryBuilder类的AddData函数
void CollatedTrajectoryBuilder::AddData(std::unique_ptr<sensor::Data> data) {
sensor_collator_->AddSensorData(trajectory_id_, std::move(data));
}
将数据传入sensor_collator_的AddSensorData进行排序
sensor_collator_的初始化
在MapBuilder的构造函数中对sensor_collator_进行初始化
MapBuilder::MapBuilder(const proto::MapBuilderOptions& options)
: options_(options), thread_pool_(options.num_background_threads()) {
...
// 在 cartographer/configuration_files/map_builder.lua 中设置
// param: MAP_BUILDER.collate_by_trajectory 默认为false
if (options.collate_by_trajectory()) {
sensor_collator_ = absl::make_unique<sensor::TrajectoryCollator>();
} else {
// sensor_collator_初始化, 实际使用这个
sensor_collator_ = absl::make_unique<sensor::Collator>();
}
}
然后再MapBuilder
类的AddTrajectoryBuilder函数中将sensor_collator_的指针传给了CollatedTrajectoryBuilder这个类。
CollatedTrajectoryBuilder的sensor_collator_是指向Collator的指针,collator类继承的是CollatorInterface这个类。
Collator类的AddSensorData函数
void Collator::AddSensorData(const int trajectory_id,
std::unique_ptr<Data> data) {
QueueKey queue_key{trajectory_id, data->GetSensorId()};
queue_.Add(std::move(queue_key), std::move(data));
}
这个函数完成的是向数据队列中添加传感器数据,将原始的data以及封装好的queue_key传入到queue.Add
上述的Add函数在 OrderedMultiQueue类中
OrderedMultiQueue类的Add函数
向数据队列中添加数据
void OrderedMultiQueue::Add(const QueueKey& queue_key,
std::unique_ptr<Data> data) {
auto it = queues_.find(queue_key);
// 如果queue_key不在queues_中, 就忽略data
if (it == queues_.end()) {
LOG_EVERY_N(WARNING, 1000)
<< "Ignored data for queue: '" << queue_key << "'";
return;
}
// 向数据队列中添加数据
it->second.queue.Push(std::move(data));
// 传感器数据的分发处理
Dispatch();
}
传感器数据进入Dispatch函数进行分发处理
OrderedMultiQueue类的Dispatch函数
文档最后对Dispatch函数做详解,这里只说明数据的传输流程。
void OrderedMultiQueue::Dispatch() {
while (true) {
const Data* next_data = nullptr;
Queue* next_queue = nullptr;
QueueKey next_queue_key;
...
if (next_data->GetTime() >= common_start_time) {
// Happy case, we are beyond the 'common_start_time' already.
// 更新分发数据的时间
last_dispatched_time_ = next_data->GetTime();
// 将数据传入 callback() 函数进行处理,并将这个数据从数据队列中删除
next_queue->callback(next_queue->queue.Pop());
callback函数是在Collator::AddTrajectory()中使用labmda表达式传参
void Collator::AddTrajectory(
const int trajectory_id,
const absl::flat_hash_set<std::string>& expected_sensor_ids,
const Callback& callback) {
for (const auto& sensor_id : expected_sensor_ids) {
const auto queue_key = QueueKey{trajectory_id, sensor_id};
queue_.AddQueue(queue_key,
// void(std::unique_ptr<Data> data) 带了个默认参数sensor_id
[callback, sensor_id](std::unique_ptr<Data> data) {
callback(sensor_id, std::move(data));
});
queue_keys_[trajectory_id].push_back(queue_key);
}
}
callback是CollatedTrajectoryBuilder::HandleCollatedSensorData这个函数
void CollatedTrajectoryBuilder::HandleCollatedSensorData(
const std::string& sensor_id, std::unique_ptr<sensor::Data> data) {
...
// 将排序好的数据送入 GlobalTrajectoryBuilder中的AddSensorData()函数中进行使用
data->AddToTrajectoryBuilder(wrapped_trajectory_builder_.get());
// 调用传入的trajectory_builder的AddSensorData()
void AddToTrajectoryBuilder(
mapping::TrajectoryBuilderInterface *const trajectory_builder) override {
trajectory_builder->AddSensorData(sensor_id_, data_);
}
trajectory_builder实际上就是GlobalTrajectoryBuilder的指针
从GlobalTrajectoryBuilder开始,数据就真正走到了slam的前端与后端部分。
注:数据首先都是先传入缓冲区,然后从缓冲区里面读取,缓冲区处理代码位于:cartographer/cartographer/common/internal/blocking_queue.h
CollatedTrajectoryBuilder类
代码路径:cartographer/mapping/internal/collated_trajectory_builder.cc
这个类不区分2d与3d,整理传感器数据,将传感器数据按照时间顺序进行排列,然后发送到GlobalTrajectoryBuilder。
collator类
collator::AddTrajectory
在CollatedTrajectoryBuilder的构造函数中调用
sensor_collator_->AddTrajectory(
trajectory_id, expected_sensor_id_strings,
[this](const std::string& sensor_id, std::unique_ptr<sensor::Data> data) {
HandleCollatedSensorData(sensor_id, std::move(data));
});
进入AddTrajectory函数
@brief 添加轨迹以生成排序的传感器输出, 每个topic设置一个回调函数
*
* @param[in] trajectory_id 新生成的轨迹的id
* @param[in] expected_sensor_ids 需要排序的topic名字的集合
* @param[in] callback 2个参数的回调函数, 实际是CollatedTrajectoryBuilder::HandleCollatedSensorData()函数
*/
void Collator::AddTrajectory(
const int trajectory_id,
const absl::flat_hash_set<std::string>& expected_sensor_ids,
const Callback& callback) {
for (const auto& sensor_id : expected_sensor_ids) {
const auto queue_key = QueueKey{trajectory_id, sensor_id};
queue_.AddQueue(queue_key,
// void(std::unique_ptr<Data> data) 带了个默认参数sensor_id
[callback, sensor_id](std::unique_ptr<Data> data) {
callback(sensor_id, std::move(data));
});
queue_keys_[trajectory_id].push_back(queue_key);
}
}
其中QueueKey在ordered_multi_queue.h中定义
struct QueueKey {
int trajectory_id; // 轨迹id
std::string sensor_id; // topic名字
// 重载小于运算符, map根据这个规则对QueueKey进行排序
// 以tuple规则比较2者, tuple定义了<运算符, 逐个元素进行比较
bool operator<(const QueueKey& other) const {
return std::forward_as_tuple(trajectory_id, sensor_id) <
std::forward_as_tuple(other.trajectory_id, other.sensor_id);
}
};
重载小于运算符, map根据这个规则对QueueKey进行排序,以tuple规则比较2者, tuple定义了<运算符, 逐个元素进行比较
Collator::FinishTrajectory
void Collator::FinishTrajectory(const int trajectory_id) {
for (const auto& queue_key : queue_keys_[trajectory_id]) {
queue_.MarkQueueAsFinished(queue_key);
}
}
将 trajectory_id 标记为完成
获取到对应trajectory_id的queue_key传入到queue.MarkQueueAsFinished函数中,进行finish的标记操作。
Collator::AddSensorData
这个函数是向数据队列中添加传感器数据
void Collator::AddSensorData(const int trajectory_id,
std::unique_ptr<Data> data) {
QueueKey queue_key{trajectory_id, data->GetSensorId()};
queue_.Add(std::move(queue_key), std::move(data));
}
Collator::Flush()
// 将所有数据队列标记为已完成,分派所有剩下的传感器数据
// 只能调用一次, 在 Flush 之后不能再调用 AddSensorData()
void Collator::Flush() { queue_.Flush(); }
Collator::GetBlockingTrajectoryId()
// 返回在 CollatorInterface 解锁之前需要更多数据的轨迹的 ID
// 对于不等待特定轨迹的实现, 返回 'nullopt'
absl::optional<int> Collator::GetBlockingTrajectoryId() const {
return absl::optional<int>(queue_.GetBlocker().trajectory_id);
}
数据分发
OrderedMultiQueue类的Dispatch函数
void OrderedMultiQueue::Dispatch() {
while (true) {
/*
queues_:
(0, scan): { 4, }
(0, imu): {1, 3, 5, }
(0, odom): { 2, 6,}
*/
const Data* next_data = nullptr;
Queue* next_queue = nullptr;
QueueKey next_queue_key;
// Step: 1 遍历所有的数据队列, 找到所有数据队列的第一个数据中时间最老的一个数据
for (auto it = queues_.begin(); it != queues_.end();) {
// c++11: auto*(指针类型说明符), auto&(引用类型说明符), auto &&(右值引用)
// 获取当前队列中时间最老的一个的一个数据
const auto* data = it->second.queue.Peek<Data>();
if (data == nullptr) {
// 如果队列已经处于finished状态了, 就删掉这个队列
if (it->second.finished) {
queues_.erase(it++);
continue;
}
// 退出条件1: 某个话题的数据队列为空同时又不是完成状态, 就先退出, 发布log并标记为阻塞者
CannotMakeProgress(it->first);
return;
}
// 第一次进行到这里或者data的时间比next_data的时间小(老数据)
// 就更新next_data, 并保存当前话题的数据队列以及queue_key
if (next_data == nullptr || data->GetTime() < next_data->GetTime()) {
next_data = data;
next_queue = &it->second;
next_queue_key = it->first;
}
// 数据的时间戳不是按顺序的, 就报错
CHECK_LE(last_dispatched_time_, next_data->GetTime())
<< "Non-sorted data added to queue: '" << it->first << "'";
++it;
} // end for
// 退出条件2: 只有多队列queues_为空, 才可能next_data==nullptr
if (next_data == nullptr) {
CHECK(queues_.empty());
return;
}
// If we haven't dispatched any data for this trajectory yet, fast forward
// all queues of this trajectory until a common start time has been reached.
// 如果我们还没有为这个轨迹分配任何数据, 快进这个轨迹的所有队列, 直到达到一个共同的开始时间
// Step: 2 获取对应轨迹id的所有数据队列中的最小共同时间戳, 作为轨迹开始的时间
const common::Time common_start_time =
GetCommonStartTime(next_queue_key.trajectory_id);
// Step: 3 将 next_queue 的时间最老的一个数据传入回调函数进行处理
// 大多数情况, 数据时间都会超过common_start_time的
if (next_data->GetTime() >= common_start_time) {
// Happy case, we are beyond the 'common_start_time' already.
// 更新分发数据的时间
last_dispatched_time_ = next_data->GetTime();
// 将数据传入 callback() 函数进行处理,并将这个数据从数据队列中删除
next_queue->callback(next_queue->queue.Pop());
}
// 数据时间小于common_start_time,同时数据队列数据的个数小于2,只有1个数据的情况 罕见
else if (next_queue->queue.Size() < 2) {
// 退出条件3: 数据队列数据的个数少,又不是完成状态, 不能确定现在到底是啥情况, 就先退出稍后再处理
if (!next_queue->finished) {
// We cannot decide whether to drop or dispatch this yet.
CannotMakeProgress(next_queue_key);
return;
}
// 处于完成状态了, 将数据传入 callback() 函数进行最后几个数据的处理
// 更新分发数据的时间,将数据传入 callback() 进行处理,并将这个数据从数据队列中删除
last_dispatched_time_ = next_data->GetTime();
next_queue->callback(next_queue->queue.Pop());
}
// 数据时间小于common_start_time,同时数据队列数据的个数大于等于2个
else {
// We take a peek at the time after next data. If it also is not beyond
// 'common_start_time' we drop 'next_data', otherwise we just found the
// first packet to dispatch from this queue.
// 只处理数据在common_start_time的前一个数据, 其他更早的数据会被丢弃掉
std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
if (next_queue->queue.Peek<Data>()->GetTime() > common_start_time) {
// 更新分发数据的时间,将数据传入 callback() 进行处理
last_dispatched_time_ = next_data->GetTime();
next_queue->callback(std::move(next_data_owner));
}
}
}
}
这个函数将处于数据队列中的数据,按照时间顺序依次传入回调函数(数据分发)
执行步骤:
step1. 遍历所有的数据队列, 找到所有数据队列的第一个数据中时间最老的一个数据
step2. 获取对应轨迹id的所有数据队列中的最小共同时间戳, 作为轨迹开始的时间
step3. 将 next_queue 的时间最老的一个数据传入回调函数进行处理
有三种退出情况:
-
某个话题的数据队列为空同时又不是完成状态, 就退出
有两种可能,一种是某一个话题的消息始终没有收到,另一种是对这个话题消息使用过快,是的这个话题中没有消息了
-
只有多队列queues_为空, 就退出,
queue为空的情况,是在还未执行AddQueue函数的时候,对其进行构造,而执行dispatch是,这个函数早就执行过了,所以,queues_正常不为空。
-
数据队列中数据的个数只有1个,又不是完成状态,不能确定状态, 就先退出
/*
queues_:
(0, scan): { 4, }
(0, imu): {1, 3, 5, }
(0, odom): { 2, 6,}
*/
这个注释是对queue_key 的简单的举例
前面()里的0代表的是trajectory_id是0,第0条轨迹,后面是各个传感器,{}里面是对应的时间戳,可以看出imu是最快的,数据对应的时间戳是1,3,5;scan是最慢的,时间戳是4。
const common::Time common_start_time =
GetCommonStartTime(next_queue_key.trajectory_id);
找到数据队列中所有第一帧的最大时间(共同时间),对于某个id轨迹的common_start_time只会计算一次。文章来源:https://uudwc.com/A/Eva1Y
进入GetCommonStartTime函数文章来源地址https://uudwc.com/A/Eva1Y
common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) {
// c++11: map::emplace() 返回的 pair 对象
// pair 的成员变量 first 是一个指向插入元素或阻止插入的元素的迭代器
// 成员变量 second 是个布尔值, 表示是否插入成功, 如果这个元素的索引已经存在插入会失败,返回false
auto emplace_result = common_start_time_per_trajectory_.emplace(
trajectory_id, common::Time::min());
common::Time& common_start_time = emplace_result.first->second;
// 如果插入成功了就找到时间戳最大的对common_start_time进行更新, 失败了就不更新
// 只会在轨迹开始时插入成功一次
if (emplace_result.second) {
// 找到这个轨迹下,所有数据队列中数据的时间戳最大 的时间戳
// 执行到这里时, 所有的数据队列都有值了, 因为没值的情况在Dispatch()中提前返回了
for (auto& entry : queues_) {
if (entry.first.trajectory_id == trajectory_id) {
common_start_time = std::max(
common_start_time, entry.second.queue.Peek<Data>()->GetTime());
}
}
LOG(INFO) << "All sensor data for trajectory " << trajectory_id
<< " is available starting at '" << common_start_time << "'.";
}
return common_start_time;
}