团团虾导读:从代码出发的实战笔记。以一个完整的 minimal_example.cpp 为起点,逐行走读 target 节点(注册内存→等待远端 RDMA 操作)和 initiator 节点(打开远端 Segment→构造 TransferRequest→submit→轮询完成)的完整流程。关键洞察:initiator 也需要调用 registerLocalMemory,因为一次 RDMA 操作同时需要本端 source buffer 的 lkey 和远端 target buffer 的 rkey。
第一篇:最小 C++ 示例全解
这是一个端到端的 C++ 示例,展示了一个 target 节点和一个 initiator 节点之间的数据传输。按代码头部注释将其加入 CMakeLists.txt 后即可编译运行。源文件位于:
mooncake-transfer-engine/example/minimal_example.cpp
完整代码
// Minimal Mooncake Transfer Engine example
//
// Build: add to mooncake-transfer-engine/example/CMakeLists.txt:
// add_executable(minimal_example ${WORKSPACE}/minimal_example.cpp)
// target_link_libraries(minimal_example PUBLIC transfer_engine)
//
// Usage:
// Target node: ./minimal_example --mode=target
// Initiator node: ./minimal_example --mode=initiator --segment_id=<target_hostname>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>
#include "common.h"
#include "transfer_engine.h"
#include "transport/transport.h"
// Command-line flags shared by both roles.
// --mode selects runTarget() or runInitiator() in main().
DEFINE_string(mode, "target", "Mode: target | initiator");
// Address handed to TransferEngine::init(); the walkthrough describes this as
// the etcd metadata server. The default is an environment-specific address and
// must be overridden for other deployments.
DEFINE_string(metadata_server, "192.168.3.77:2379", "Metadata server address");
// This node's name as passed to init(); it is also split by
// parseHostNameWithPort() into the host/port arguments of init().
// NOTE(review): presumably "host[:port]" form — confirm against
// parseHostNameWithPort.
DEFINE_string(local_server_name, mooncake::getHostname(), "Local server name");
// Chooses which transport the run* functions install ("tcp" or "rdma").
DEFINE_string(protocol, "tcp", "Transport protocol: rdma | tcp");
// Only read in rdma mode, where it is bound to both "cpu:0" and "cpu:1".
DEFINE_string(device_name, "", "Device name (for RDMA)");
// Segment name passed to openSegment() on the initiator; per the usage text it
// is the target's hostname (the target logs its --local_server_name as its
// segment).
DEFINE_string(segment_id, "", "Segment ID of the target node (initiator only)");
// Size in bytes of the buffer each role allocates with numa_alloc_onnode and
// registers with the engine.
DEFINE_uint64(buffer_size, 1ULL << 30, "Buffer size (1GB default)");
using namespace mooncake;
// Bytes moved by the single WRITE the initiator issues (64 KiB).
constexpr size_t kBlockSize = 64 * 1024;
int runTarget() {
auto engine = std::make_unique<TransferEngine>(false);
auto [host, port] = parseHostNameWithPort(FLAGS_local_server_name);
engine->init(FLAGS_metadata_server, FLAGS_local_server_name,
host.c_str(), port);
// Install transport
if (FLAGS_protocol == "tcp") {
engine->installTransport("tcp", nullptr);
} else if (FLAGS_protocol == "rdma") {
std::string matrix = "{\"cpu:0\": [[\"" + FLAGS_device_name +
"\"], []], \"cpu:1\": [[\"" + FLAGS_device_name +
"\"], []]}";
void** args = (void**)malloc(2 * sizeof(void*));
args[0] = (void*)matrix.c_str();
args[1] = nullptr;
engine->installTransport("rdma", args);
free(args);
}
// Allocate and register memory
void* buffer = numa_alloc_onnode(FLAGS_buffer_size, 0);
int rc = engine->registerLocalMemory(buffer, FLAGS_buffer_size, "cpu:0");
LOG_ASSERT(!rc);
LOG(INFO) << "Target ready. Segment: " << FLAGS_local_server_name
<< ", buffer: " << buffer << ", size: " << FLAGS_buffer_size;
while (true) std::this_thread::sleep_for(std::chrono::seconds(1));
engine->unregisterLocalMemory(buffer);
numa_free(buffer, FLAGS_buffer_size);
return 0;
}
int runInitiator() {
LOG_ASSERT(!FLAGS_segment_id.empty())
<< "--segment_id is required for initiator mode";
auto engine = std::make_unique<TransferEngine>(false);
auto [host, port] = parseHostNameWithPort(FLAGS_local_server_name);
engine->init(FLAGS_metadata_server, FLAGS_local_server_name,
host.c_str(), port);
// Install transport (same as target)
if (FLAGS_protocol == "tcp") {
engine->installTransport("tcp", nullptr);
} else if (FLAGS_protocol == "rdma") {
std::string matrix = "{\"cpu:0\": [[\"" + FLAGS_device_name +
"\"], []], \"cpu:1\": [[\"" + FLAGS_device_name +
"\"], []]}";
void** args = (void**)malloc(2 * sizeof(void*));
args[0] = (void*)matrix.c_str();
args[1] = nullptr;
engine->installTransport("rdma", args);
free(args);
}
// Allocate and register local memory
void* buffer = numa_alloc_onnode(FLAGS_buffer_size, 0);
int rc = engine->registerLocalMemory(buffer, FLAGS_buffer_size, "cpu:0");
LOG_ASSERT(!rc);
// Open remote segment
SegmentID segment_id = engine->openSegment(FLAGS_segment_id);
LOG(INFO) << "Opened segment: " << FLAGS_segment_id
<< ", segment_id: " << segment_id;
// Get remote buffer address from segment metadata
auto seg_desc = engine->getMetadata()->getSegmentDescByID(segment_id);
LOG_ASSERT(seg_desc && !seg_desc->buffers.empty())
<< "No buffers found in remote segment";
uint64_t remote_addr = (uint64_t)seg_desc->buffers[0].addr;
// Allocate a batch and submit a single WRITE transfer
BatchID batch_id = engine->allocateBatchID(1);
TransferRequest req;
req.opcode = TransferRequest::WRITE;
req.source = buffer;
req.target_id = segment_id;
req.target_offset = remote_addr;
req.length = kBlockSize;
Status s = engine->submitTransfer(batch_id, {req});
LOG_ASSERT(s.ok()) << "submitTransfer failed: " << s.ToString();
// Wait for completion
TransferStatus status;
while (true) {
s = engine->getTransferStatus(batch_id, 0, status);
LOG_ASSERT(s.ok());
if (status.s == TransferStatusEnum::COMPLETED) break;
if (status.s == TransferStatusEnum::FAILED)
LOG(FATAL) << "Transfer failed";
}
LOG(INFO) << "Transfer completed: " << status.transferred_bytes << " bytes";
engine->freeBatchID(batch_id);
engine->unregisterLocalMemory(buffer);
numa_free(buffer, FLAGS_buffer_size);
return 0;
}
// Program entry: parse command-line flags, then dispatch to the role selected
// by --mode. Any value other than "target" or "initiator" is fatal.
int main(int argc, char** argv) {
    gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/false);
    const auto& mode = FLAGS_mode;
    if (mode == "initiator") return runInitiator();
    if (mode == "target") return runTarget();
    LOG(FATAL) << "Unknown mode: " << mode;
}
代码流程分析
整个代码的核心流程分两个角色:Target 和 Initiator。
Target 节点的步骤:
1. TransferEngine(false) — 创建引擎实例(false 表示不自动发现拓扑)。
2. engine->init(...) — 连接 etcd 元数据服务器,初始化本地 RPC 端点,并注册到元数据中心。
3. engine->installTransport(...) — 根据 --protocol 安装传输层。tcp 不需要额外参数。rdma 需要传入 HCA 设备与 NUMA 拓扑的绑定矩阵,示例中 cpu:0 和 cpu:1 都绑定到 --device_name 指定的同一块网卡。
4. numa_alloc_onnode(FLAGS_buffer_size, 0) 在 NUMA 节点 0 上分配内存。随后 engine->registerLocalMemory(buffer, FLAGS_buffer_size, "cpu:0") 将这块内存注册到传输层(RDMA 模式下即注册到 RNIC),并把包含 addr、rkey、lkey 的 BufferDesc 写入 etcd。
5. 进入等待循环,接受远端发起的 RDMA 操作。
Initiator 节点的步骤:
1-4 与 target 相同(初始化、安装传输、注册本地内存)
5. engine->openSegment(FLAGS_segment_id) — 从 etcd 获取远端节点的 SegmentDesc 并分配本地 segment ID
6. engine->getMetadata()->getSegmentDescByID(segment_id) — 获取远端节点的完整元数据(含 buffers[].addr, rkey 等)
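7. 构造 TransferRequest:WRITE 操作,source 指向本地 buffer,target_offset 取远端 buffer 的地址。调用 submitTransfer 提交后,轮询 getTransferStatus,直到状态变为 COMPLETED;若状态为 FAILED 则报错退出。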
关键点:initiator 也需要注册本地内存。RDMA 操作的本地 source buffer 必须携带自己的 lkey(本地内存密钥)。target 通过 registerLocalMemory 发布到元数据中的 rkey(远端内存密钥)只授权对 target buffer 的访问,无法替代本端的 lkey——两者缺一不可。