
eprosima Fast DDS 的底层 RTPS 层实现了 RTPS 标准中定义的协议。与 DDS 层相比这一层提供了对通信协议内部机制更多的控制因此高级用户可以更精细地控制库的功能。4.1 与 DDS 层的关系该层的元素与 DDS 层的元素一一对应并有一些补充。这种对应关系如下表所示DDS LayerRTPS LayerDomainRTPSDomainDomainParticipantRTPSParticipantDataWriterRTPSWriterDataReaderRTPSReader4.2 如何使用 RTPS 层现在我们将像之前介绍 DDS 层一样介绍 RTPS 层的使用方法解释它提供的新特性。建议您在阅读本节时查看发行版中附带的一个示例该示例描述了如何使用 RTPS 层。它位于 examples/cpp/rtps 目录中。4.2.1 管理Participant创建 RTPSParticipant 是通过 RTPSDomain::createParticipant() 完成的。RTPSParticipantAttributes 结构体用于在创建时配置 RTPSParticipant。RTPSParticipantAttributes participant_attr; participant_attr.setName(participant); RTPSParticipant* participant RTPSDomain::createParticipant(0, participant_attr);4.2.2 管理Writers和Readers根据 RTPS 标准的规定RTPSWriter 和 RTPSReader 总是与一个历史记录History元素关联。在 DDS 层中它们的创建和管理是被隐藏的但在 RTPS 层中您可以完全控制其创建和配置。写入器通过 RTPSDomain::createRTPSWriter() 创建并使用 WriterAttributes 结构体进行配置。它们还需要一个 WriterHistory该历史记录使用 HistoryAttributes 结构体进行配置。HistoryAttributes history_attr; WriterHistory* history new WriterHistory(history_attr); WriterAttributes writer_attr; RTPSWriter* writer RTPSDomain::createRTPSWriter(participant, writer_attr, history);与创建写入器类似读取器通过 RTPSDomain::createRTPSReader() 创建并使用 ReaderAttributes 结构体进行配置。HistoryAttributes 结构体用于配置所需的 ReaderHistory。请注意在这种情况下您可以提供一个实现了您所需回调的 ReaderListener 类的特化版本class MyReaderListener : public ReaderListener { // Callbacks override }; MyReaderListener listener; HistoryAttributes history_attr; ReaderHistory* history new ReaderHistory(history_attr); ReaderAttributes reader_attr; RTPSReader* reader RTPSDomain::createRTPSReader(participant, reader_attr, history, listener);4.2.3 使用历史记录发送和接收数据在 RTPS 协议中读取器和写入器将关于某个主题的数据保存在它们关联的历史记录中。每一份数据都通过一个变更Change来表示eprosima Fast DDS 将其实现为 CacheChange_t。变更始终由历史记录进行管理。要发送数据您可以向写入器的历史记录中添加一个新的 CacheChange_t。步骤如下1. 使用 WriterHistory::create_change() 从写入器历史记录请求一个 CacheChange_t。为了分配足够的内存您需要提供有效负载中的最大字节数作为参数。2. 用数据填充 CacheChange_t。3. 使用 WriterHistory::add_change() 将其添加到历史记录中。写入器将负责处理所有事务以将数据传输给读取器。// Request a change from the history CacheChange_t* change history-create_change(255, ALIVE); // Write serialized data into the change change-serializedPayload.length sprintf((char*) change-serializedPayload.data, My example string %d, 2) 1; // Insert change into the history. The Writer takes care of the rest. history-add_change(change);如果您的主题数据类型有多个字段您将需要提供函数来将数据序列化到 CacheChange_t 中或从中反序列化出来。Fast DDS-Gen 可以为您完成这项工作。您可以在 ReaderListener::onNewCacheChangeAdded 回调中接收数据就像我们在 DDS 层中所做的那样1. 回调接收一个包含接收数据的 CacheChange_t 参数。2. 处理接收到的 CacheChange_t 中的数据。3. 通知读取器的历史记录不再需要该变更。class MyReaderListener : public ReaderListener { public: MyReaderListener() { } ~MyReaderListener() { } void onNewCacheChangeAdded( RTPSReader* reader, const CacheChange_t* const change) { // The incoming message is enclosed within the change in the function parameters printf(%s\n, change-serializedPayload.data); // Once done, remove the change reader-get_history()-remove_change((CacheChange_t*)change); } };4.2.4 管理内置传输DDS 使用传输层来允许 DDS 实体之间进行通信。eProsima Fast DDS 附带了五种已实现的传输方式。然而这些传输方式之间并不总是互斥的在某些情况下它们可以同时使用。您可以选择要使用的传输方式要么禁用内置传输并手动添加它们参见 TransportConfigQos要么使用默认的内置传输行为并选择下面列出的配置选项之一。每个选项都会修改将要实例化的传输类型。内置传输选项描述NONE不会实例化任何传输。因此用户必须手动添加所需的传输否则参与者创建将失败。DEFAULT将实例化 UDPv4 和 SHM 传输。SHM 传输的优先级高于 UDPv4 传输这意味着在可能的情况下将始终使用 SHM。DEFAULTv6将实例化 UDPv6 和 SHM 传输。SHM 传输的优先级高于 UDPv6 传输这意味着在可能的情况下将始终使用 SHM。SHM仅实例化 SHM 传输。UDPv4仅实例化 UDPv4 传输。UDPv6仅实例化 UDPv6 传输。LARGE_DATA将实例化 UDPv4、TCPv4 和 SHM 传输。但是UDP 将仅用于参与者发现阶段的组播通告参见《发现阶段》而参与者活性维持和应用程序数据传输则通过 TCP 或 SHM 进行。此配置在处理大数据时非常有用参见《大数据模式》和《Fast DDS over TCP》。RTPSParticipantAttributes participant_attr; participant_attr.setup_transports(eprosima::fastdds::rtps::BuiltinTransports::LARGE_DATA); RTPSParticipant* participant RTPSDomain::createParticipant(0, participant_attr);也可以通过使用 DomainParticipantQos 的 setup_transports() 包装函数、XML 配置文件参见 RTPS 元素类型或 FASTDDS_BUILTIN_TRANSPORTS 环境变量参见 FASTDDS_BUILTIN_TRANSPORTS来获得相同的结果。TCPv4 传输使用以下配置进行初始化calculate_crc、check_crc 和 apply_security 被设置为 false。enable_tcp_nodelay 被设置为 true。keep_alive_thread 和 accept_thread 使用默认配置。为了在使用大数据消息时获得更好的性能强烈建议使用内置传输配置选项根据应用程序的具体需求调整传输设置。有关如何配置的更多信息请参阅《使用配置选项处理大数据》。4.3 配置Readers和Writers使用 RTPS 层的一个好处是它在保留 DDS 层选项的同时提供了新的配置可能性。例如您可以像之前一样将写入器或读取器设置为可靠或尽力而为的端点writer_attr.endpoint.reliabilityKind BEST_EFFORT;4.3.1 设置数据持久性类型持久性参数定义了当新的读取器匹配时写入器关于已发送样本的行为。eProsima Fast DDS 提供三种持久性选项VOLATILE默认消息一经发送即被丢弃。如果新的读取器在第 n 条消息之后匹配它将从第 n1 条消息开始接收。TRANSIENT_LOCAL写入器保存其已发送的最后 k 条消息的记录。如果新的读取器在第 n 条消息之后匹配它将从第 n-k 条消息开始接收。TRANSIENT与 TRANSIENT_LOCAL 类似但消息记录将保存到持久化存储中因此即使写入器被销毁并重新创建或者应用程序崩溃该记录仍然可用。要选择您偏好的选项writer_attr.endpoint.durabilityKind TRANSIENT_LOCAL;由于在 RTPS 层您可以控制历史记录History因此在 TRANSIENT_LOCAL 和 TRANSIENT 模式下写入器会发送您尚未从历史记录中显式释放的所有变更。4.4 配置历史记录历史记录有其自身的配置结构即 HistoryAttributes。4.4.1 更改有效负载的最大大小您可以选择可以放入 CacheChange_t 中的有效负载的最大大小。请确保选择的大小能够容纳可能的最大数据块history_attr.payloadMaxSize 250; // Defaults to 500 bytes4.4.2 更改历史记录的大小您可以为历史记录指定要容纳的最大变更数量以及初始分配的变更数量history_attr.initialReservedCaches 250; // Defaults to 500 history_attr.maximumReservedCaches 500; // Defaults to 0 Unlimited Changes当预留的初始变更数量低于最大值时历史记录将根据需要分配更多变更直到达到最大尺寸。4.5 使用自定义Payload Pool有效负载Payload定义为用户希望在写入器和读取器之间传输的数据。RTPS 需要向此有效负载添加一些元数据以管理端点之间的通信。因此此有效负载被封装在 CacheChange_t 的 SerializedPayload_t 字段中而 CacheChange_t 的其他字段则提供所需的元数据。WriterHistory 和 ReaderHistory 为用户提供了与这些变更交互的接口写入器要传输的变更被添加到其 WriterHistory 中而读取器上已处理的变更可以从 ReaderHistory 中移除。从这个意义上说History 充当了尚未完全处理的变更的缓冲区。在正常执行期间新的变更会被添加到 History 中旧的变更会从其中移除。为了管理这些变更中包含的 Payload 的生命周期Reader 和 Writer 使用一个池对象该对象是 IPayloadPool 接口的实现。不同的池实现可以实现不同的优化。例如可以从不同的预分配内存块中获取不同大小的 Payload。Writer 和 Reader 可以自动选择最符合 HistoryAttributes 中给定配置的默认 Payload 池实现。然而可以将自定义的 Payload 池传递给 RTPSDomain::createRTPSWriter() 和 RTPSDomain::createRTPSReader() 函数。当请求或释放新的 CacheChange_t 时Writer 和 Reader 将使用提供的池。4.5.1 IPayloadPool 接口IPayloadPool::get_payload 重载带 size 参数将请求大小的空 Payload 绑定到一个 CacheChange_t 实例。然后可以用所需数据填充该 Payload。IPayloadPool::get_payload 重载带 SerializedPayload_t 参数将给定的 Payload 数据复制到池中的一个新 Payload。每个 SerializedPayload_t 包含一个指向分配其数据的池的指针。这允许进行某些优化例如如果原始 Payload 来自同一个池则共享该 Payload从而避免复制操作。IPayloadPool::release_payload将与 CacheChange_t 绑定的 Payload 归还到池中并解除绑定。在实现自定义的 Payload 池时请确保分配的 Payload 满足标准 RTPS 序列化的要求。具体来说Payload 必须足够大以容纳序列化的用户数据加上 RTPS 标准第 10.2 节中规定的 4 个八位字节的 SerializedPayloadHeader。例如如果我们知道序列化用户数据的上限我们可以考虑实现一个始终分配固定大小的 Payload 的池该大小足够容纳这些数据。如果序列化的用户数据最多有 N 个八位字节则分配的 Payload 必须至少有 N4 个八位字节。请注意向 IPayloadPool::get_payload 请求的大小已经考虑了这个 4 字节的头。4.5.2 默认Payload pool 实现如果未向 Writer 或 Reader 提供自定义的 Payload 池Fast DDS 将自动使用与 History 的 memoryPolicy 配置最匹配的默认实现。PREALLOCATED_MEMORY_MODE所有 Payload 将具有固定大小的数据缓冲区该大小等于 payloadMaxSize 的值无论向 IPayloadPool::get_payload 请求的大小如何。已释放的 Payload 可以被重新用于另一个 CacheChange_t。这以更高的内存使用为代价减少了内存分配操作。在 History 初始化期间会为初始分配的 CacheChange_t 预分配 initialReservedCaches 个 Payload。PREALLOCATED_WITH_REALLOC_MEMORY_MODE保证 Payload 的数据缓冲区至少为请求大小和 payloadMaxSize 两者中的最大值。已释放的 Payload 可以被重新用于另一个 CacheChange_t。如果至少有一个空闲的 Payload其缓冲区大小等于或大于请求的大小则不会进行内存分配。在 History 初始化期间会为初始分配的 CacheChange_t 预分配 initialReservedCaches 个 Payload。DYNAMIC_RESERVE_MEMORY_MODE每次请求 Payload 时都会在内存中分配一个具有适当大小的新 Payload。payloadMaxSize 被忽略。已释放的 Payload 的内存总是被释放因此池中永远不会有空闲的 Payload。这以减少内存使用为代价但会频繁进行内存分配。在 History 初始化期间不会预分配任何 Payload。DYNAMIC_REUSABLE_MEMORY_MODE保证 Payload 的数据缓冲区至少为请求的大小。payloadMaxSize 被忽略。已释放的 Payload 可以被重新用于另一个 CacheChange_t。如果至少有一个空闲的 Payload其缓冲区大小等于或大于请求的大小则不会进行内存分配。4.5.3 使用自定义Payload pool的示例// A simple payload pool that reserves and frees memory each time class CustomPayloadPool : public IPayloadPool { bool get_payload( uint32_t size, SerializedPayload_t payload) override { // Reserve new memory for the payload buffer octet* payload_buff new octet[size]; // Assign the payload buffer to the CacheChange and update sizes payload.data payload_buff; payload.length size; payload.max_size size; // Tell the CacheChange who needs to release its payload payload.payload_owner this; return true; } bool get_payload( const SerializedPayload_t data, SerializedPayload_t payload) { // Reserve new memory for the payload buffer octet* payload_buff new octet[data.length]; // Copy the data memcpy(payload_buff, data.data, data.length); // Tell the CacheChange who needs to release its payload payload.payload_owner this; // Assign the payload buffer to the CacheChange and update sizes payload.data payload_buff; payload.length data.length; payload.max_size data.length; return true; } bool release_payload( SerializedPayload_t payload) override { // Ensure precondition if (this ! payload.payload_owner) { std::cerr Trying to release a payload buffer allocated by a different PayloadPool. std::endl; return false; } // Dealloc the buffer of the payload delete[] payload.data; // Reset sizes and pointers payload.data nullptr; payload.length 0; payload.max_size 0; // Reset the owner of the payload payload.payload_owner nullptr; return true; } }; std::shared_ptrCustomPayloadPool payload_pool std::make_sharedCustomPayloadPool(); // A writer using the custom payload pool HistoryAttributes writer_history_attr; WriterHistory* writer_history new WriterHistory(writer_history_attr, payload_pool); WriterAttributes writer_attr; RTPSWriter* writer RTPSDomain::createRTPSWriter(participant, writer_attr, writer_history); // A reader using the same instance of the custom payload pool HistoryAttributes reader_history_attr; ReaderHistory* reader_history new ReaderHistory(reader_history_attr); ReaderAttributes reader_attr; RTPSReader* reader RTPSDomain::createRTPSReader(participant, reader_attr, payload_pool, reader_history); // Write and Read operations work as usual, but take the Payloads from the pool. // Requesting a change to the Writer will provide one with an empty Payload taken from the pool CacheChange_t* change writer_history-create_change(255, ALIVE); // Write serialized data into the change and add it to the history change-serializedPayload.length sprintf((char*) change-serializedPayload.data, My example string %d, 2) 1; writer_history-add_change(change);