Skip to content
团子云技术 Lite 1.048596
Go back

Mooncake TE 阅读手记-17-元数据管理

团团虾导读:元数据管理是 Mooncake 集群中节点发现和内存发现的基石。TE 支持两种模式:集中式(通过 etcd/Redis/HTTP 做 shared discovery)和去中心化(P2P Handshake 模式,适合简单部署)。这篇以 Segment 注册为例,完整追踪了一个节点从 init → registerLocalMemory → publish to etcd → 远端 openSegment 的全过程。

阅读版本: Mooncake v0.3.10.post2-104-geaf724ab (commit eaf724ab, 2026-05-20)

Mooncake Transfer Engine 元数据管理:以 Segment 注册为例

一、架构概览

Mooncake Transfer Engine 的元数据管理由 TransferMetadata 类统一负责,支持两种运行模式:

模式触发方式存储后端适用场景
集中式(默认)连接字符串以 etcd:///redis:///http:// 开头etcd / Redis / HTTP K-V 服务多节点集群,需要中心化发现
去中心化连接字符串为字面量 P2PHANDSHAKE无集中存储,TCP 直连交换元数据简单部署、测试、两节点直连

整个元数据系统位于 mooncake-transfer-engine/ 中,核心文件包括:

文件职责
include/transfer_metadata.hTransferMetadata + 所有描述符结构体定义
src/transfer_metadata.cppSegment 增删查、缓存、JSON 编解码
include/transfer_metadata_plugin.hMetadataStoragePluginHandShakePlugin 抽象接口
src/transfer_metadata_plugin.cpp三个存储后端(Etcd/Redis/HTTP)+ TCP 握手的实现
mooncake-common/etcd/etcd_wrapper.goGo etcd v3 客户端,编译为 C 共享库供 C++ 调用

二、集中式模式:让 etcd 告诉我们每个节点有什么

集中式模式下,所有节点的 Segment 描述符和 RPC 地址写入同一个 K-V 存储。这是一个典型的 share-nothing metadata, shared discovery 设计 —— 每个节点只管理自己的描述符,但所有人都可以从共享存储中发现其他人。

2.1 引擎初始化时注册 RPC 地址入口

TransferEngineImpl::init() 在做传输层初始化之前,会先用 TransferMetadata 在存储集群里登记自己的位置:

引擎启动 (transfer_engine_impl.cpp:77-368):

// 1. 解析 local_server_name(格式 ip:port)
// 2. 构造 RpcMetaDesc(ip + port)
desc.ip_or_host_name = host_name;
desc.rpc_port = port;

// 3. 创建 TransferMetadata,传入 etcd 连接字符串
metadata_ = std::make_shared<TransferMetadata>(metadata_conn_string);

// 4. 发布本节点 RPC 地址到中心存储
metadata_->addRpcMetaEntry(local_server_name_, desc);

// 5. 发现本地拓扑、安装传输插件(RDMA/TCP/NVLink等)
multi_transports_ = std::make_shared<MultiTransport>(metadata_, local_server_name_);

2.2 注册本地内存,将 Segment 描述符写入 etcd

当调用方执行 registerLocalMemory(addr, length, ...) 后,各 Transport 在注册硬件 MR 的同时,会把缓冲区信息追加到本地 Segment 描述符中,并推送到存储后端。

Transport 注册内存 (transfer_engine_impl.cpp:511-533):

for (auto transport : multi_transports_->listTransports()) {
    int ret = transport->registerLocalMemory(
        addr, length, location, remote_accessible, update_metadata);
    if (ret < 0) return ret;
}

元数据层追加 BufferDesc 并更新 (transfer_metadata.cpp:1059-1071):

int TransferMetadata::addLocalMemoryBuffer(const BufferDesc &buffer_desc,
                                           bool update_metadata) {
    RWSpinlock::WriteGuard guard(segment_lock_);
    auto new_segment_desc = std::make_shared<SegmentDesc>();
    auto &segment_desc = segment_id_to_desc_map_[LOCAL_SEGMENT_ID];
    *new_segment_desc = *segment_desc;   // CoW 拷贝
    segment_desc = new_segment_desc;
    segment_desc->buffers.push_back(buffer_desc);  // 追加新 buffer
    // ...
    if (update_metadata) return updateLocalSegmentDesc();  // 序列化并写入存储
}

updateLocalSegmentDesc() 调用 updateSegmentDesc(),将整个 SegmentDesc 序列化为 JSON 后调用 storage_plugin_->set(key, json) 写入中心存储。

2.3 Segment 描述符的 Key 格式

Key 的生成逻辑在 getFullMetadataKey() (transfer_metadata.cpp:169-184):

// 默认 key 前缀为 "mooncake/<cluster_id>/"
// <cluster_id> 来自环境变量 MC_METADATA_CLUSTER_ID

// 普通 segment name -> mooncake/<cluster_id>/ram/<name>
// 包含 '/' 的 name  -> mooncake/<cluster_id>/<name>
return common_key_prefix_ + "ram/" + segment_name;

RPC 元数据的 key 格式类似:

mooncake/<cluster_id>/rpc_meta/<server_name>

2.4 发现远端 Segment:openSegment 的完整链路

openSegment("10.0.0.2:12345") 被调用时,元数据层的流程如下:

TransferEngineImpl (transfer_engine_impl.cpp:453-476):

SegmentHandle TransferEngineImpl::openSegment(const std::string& segment_name) {
    // 1. 调用元数据层获取 SegmentID
    SegmentID sid = metadata_->getSegmentID(trimmed_segment_name);
    return sid;
}

TransferMetadata::getSegmentID (transfer_metadata.cpp:1004-1024):

SegmentID TransferMetadata::getSegmentID(const std::string &segment_name) {
    // 1. 先查本地缓存 (name -> id)
    {
        RWSpinlock::ReadGuard guard(segment_lock_);
        if (segment_name_to_id_map_.count(segment_name))
            return segment_name_to_id_map_[segment_name];
    }

    // 2. 缓存未命中,从远程存储获取描述符(可能涉及网络 I/O)
    auto segment_desc = this->getSegmentDesc(segment_name);
    if (!segment_desc) return -1;

    // 3. WriteLock + double-check,分配新 ID,更新缓存
    RWSpinlock::WriteGuard guard(segment_lock_);
    if (segment_name_to_id_map_.count(segment_name))
        return segment_name_to_id_map_[segment_name];
    SegmentID id = next_segment_id_.fetch_add(1);
    segment_id_to_desc_map_[id] = segment_desc;
    segment_name_to_id_map_[segment_name] = id;
    return id;
}

getSegmentDesc 中心化路径 (transfer_metadata.cpp:865-901):

// 中心化模式:从存储插件 GET
if (!storage_plugin_->get(getFullMetadataKey(segment_name), peer_json)) {
    LOG(WARNING) << "Failed to retrieve segment descriptor";
    return nullptr;
}
return decodeSegmentDesc(peer_json, segment_name);

2.5 元数据缓存

TransferMetadata 维护了两级缓存:

  1. name-to-id 映射 segment_name_to_id_map_ (string -> uint64_t)
  2. id-to-descriptor 映射 segment_id_to_desc_map_ (uint64_t -> shared_ptr<SegmentDesc>)

两个 map 通过读写锁 segment_lock_ 保护。获取描述符时,默认启用 metacache(可通过 MC_DISABLE_METACACHE 关闭):

std::shared_ptr<SegmentDesc> getSegmentDescByName(
    const std::string &segment_name, bool force_update) {
    // metacache 开启时直接返回缓存,跳过网络 IO
    if (globalConfig().metacache && !force_update) {
        RWSpinlock::ReadGuard guard(segment_lock_);
        auto iter = segment_name_to_id_map_.find(segment_name);
        if (iter != segment_name_to_id_map_.end())
            return segment_id_to_desc_map_[iter->second];
    }
    // ...
}

syncSegmentCache() 可在运行时强制从存储后端刷新缓存,支持全量或指定 Segment 刷新。


三、Segment 描述符的完整结构

Segment 描述符是元数据管理的核心对象,包含了远端节点进行 RDMA/GPU Direct 等操作所需的全部信息。结构体定义在 transfer_metadata.h:

struct SegmentDesc {
    std::string name;                            // 节点名 (ip:port)
    std::string protocol;                        // "rdma"|"tcp"|"nvlink"|"ascend"|"cxl"|"ub"|...
    std::vector<DeviceDesc> devices;             // RDMA/UB 设备信息
    Topology topology;                           // NIC 优先级矩阵
    std::vector<BufferDesc> buffers;             // 注册的内存缓冲区列表
    std::vector<NVMeoFBufferDesc> nvmeof_buffers; // NVMe-oF 描述符
    std::string cxl_name;                        // CXL 设备名
    uint64_t cxl_base_addr;                      // CXL 基地址
    std::string timestamp;                        // 最后修改时间
    RankInfoDesc rank_info;                      // Ascend NPU 专用
    int tcp_data_port;                            // TCP 数据端口
};

每个 BufferDesc 包含 key、addr、length 和传输协议特定字段(rkey[]、lkey[]、shm_name、tseg[] 等)。描述符按协议类型使用不同的 JSON 编解码逻辑(见 encodeSegmentDesc/decodeSegmentDesc):


四、存储后端实现

4.1 EtcdStoragePlugin(默认)

Mooncake 使用 Go etcd v3 client 编译为 C 共享库 的方式集成 etcd,避免了 C++ 直接调用 etcd gRPC API 的复杂性:

Go 侧示例 —— EtcdPutWrapper:

func EtcdPutWrapper(key *C.char, value *C.char, errMsg **C.char) int {
    k := C.GoString(key)
    v := C.GoString(value)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    _, err := globalClient.Put(ctx, k, v)
    // ...
}

连接字符串格式:etcd://<host>:<port>,支持多 endpoints(用逗号或分号分隔)。

4.2 RedisStoragePlugin

基于 hiredis 库的直接封装:

构造: redisConnect(hostname, port)
GET:  redisCommand(client_, "GET %s", key.c_str())
SET:  redisCommand(client_, "SET %s %s", key.c_str(), json_file.c_str())
DEL:  redisCommand(client_, "DEL %s", key.c_str())

支持认证:环境变量 MC_REDIS_USERNAME / MC_REDIS_PASSWORD / MC_REDIS_DB_INDEX

连接字符串格式:redis://<host>:<port>

4.3 HTTPStoragePlugin

通过 libcurl 以 RESTful 方式访问 K-V 存储:

使用 thread_local curl handle 避免多线程竞争。

连接字符串格式:http://<url>https://<url>

4.4 插件选择逻辑

MetadataStoragePlugin::Create() 通过两步确定后端:

  1. parseConnectionString (transfer_metadata_plugin.cpp:523-540): 解析 :// 前缀提取协议名。:// 的字符串默认视为 etcd(如 "10.0.0.1:2379" 等价于 etcd://10.0.0.1:2379)。
  2. Create 工厂 (transfer_metadata_plugin.cpp:542-593): 按编译宏匹配已知协议名,匹配成功则创建对应后端。所有已知协议(etcd/redis/http/https)均未匹配时,调用 LOG(FATAL) 终止程序,不会回退到 etcd。

具体分发逻辑:

"etcd"  -> EtcdStoragePlugin(需 USE_ETCD 编译宏, line 546-548)
"redis" -> RedisStoragePlugin(需 USE_REDIS 编译宏, line 552-578)
"http"  -> HTTPStoragePlugin(需 USE_HTTP 编译宏, line 583-586)
"https" -> HTTPStoragePlugin(需 USE_HTTP 编译宏, line 583-586)
以上均未命中 -> LOG(FATAL) "Unable to find metadata storage plugin" (line 590)

五、去中心化 P2P 模式

当连接字符串为字面量 "P2PHANDSHAKE" 时,TransferMetadata 跳过所有中心化存储操作:

if (conn_string == P2PHANDSHAKE) {
    p2p_handshake_mode_ = true;
    return;  // 不创建 storage_plugin_
}

5.1 SocketHandShakePlugin 的设计

这是一个基于 TCP 的异步握手服务:

  1. 启动: 在随机可用端口上启动 TCP listener 线程

  2. 消息协议: 长度前缀 + 1 字节消息类型 + JSON payload

  3. 三种在线消息类型 (common.h:59-66 定义的枚举值 0-3):

    • Connection = 0 —— 握手连接
    • Metadata = 1 —— 交换 Segment 描述符
    • Notify = 2 —— 异步通知
    • Probe = 3 —— 存活探测

    外加一个解码回退路径 OldProtocol = 0xff: 这不是真正的线格式类型。当 readString 收到消息的首字节 > 3 (Probe) 时,识别为旧版本协议(无类型字节),将其标记为 OldProtocol (common.h:443-449)。在服务端处理时,Connection 和 OldProtocol 两个类型走同一个回调 (on_connection_callback_),但线路上编码不同:新协议多一个类型字节,旧协议直接从 payload 开始。

5.2 P2P 模式下的 Segment 发现

openSegment("10.0.0.2:12345")
  -> getSegmentID(name)
    -> getSegmentDesc(name)  [P2P 分支]
      -> 解析 name 为 ip + port
      -> 编码本地 LOCAL_SEGMENT_ID 描述符为 JSON
      -> handshake_plugin_->exchangeMetadata(ip, port, local_json, peer_json)
      -> 解码对端返回的 JSON 为 SegmentDesc
    -> 分配 ID,缓存,返回

P2P 模式中没有服务发现 —— 调用方必须事先知道对端地址。


六、数据流全景图

应用层:
  TransferEngine::init("etcd://10.0.0.1:2379", "10.0.0.1:12345")
  TransferEngine::registerLocalMemory(addr, len, ...)
  TransferEngine::openSegment("10.0.0.2:12345")

TransferEngineImpl:
  ├── init() → metadata_->addRpcMetaEntry()
  ├── registerLocalMemory() → transport->registerLocalMemory()
  │     └── 内部调用 metadata_->addLocalMemoryBuffer()
  │           └── updateLocalSegmentDesc() → storage_plugin_->set(...)
  └── openSegment() → metadata_->getSegmentID()
        └── getSegmentDesc() → storage_plugin_->get(...)
              └── decodeSegmentDesc() → SegmentDesc → 缓存

TransferMetadata (元数据中枢):
  ├── 本地缓存: segment_name_to_id_map_ / segment_id_to_desc_map_
  ├── 集中式: MetadataStoragePlugin (etcd/redis/http)
  └── P2P: HandShakePlugin (SocketHandShakePlugin, TCP 直连)

MultiTransport (传输层):
  └── selectTransport(segment_desc.protocol) → RdmaTransport/TcpTransport/...

七、关键设计要点

  1. 多后端可插拔: MetadataStoragePlugin 纯虚接口让 etcd/Redis/HTTP 可以任意切换,新增后端只需实现 get/set/remove 三个方法。

  2. CoW 模式更新 Segment 描述符: addLocalMemoryBuffer 使用 Copy-on-Write 方式追加 buffer —— 创建新的 shared_ptr<SegmentDesc> 拷贝,追加 buffer 后替换旧指针。这样读操作不会被写阻塞。

  3. RWSpinlock 保护缓存: 两种 map 的读写使用读写自旋锁,读多写少的场景下性能良好。网络 I/O 操作(storage_plugin_->get)在锁外执行,避免长时间持锁。

  4. 原子 ID 分配: next_segment_id_std::atomic<uint64_t>,通过 fetch_add(1) 无锁分配。本地 Segment 固定为 LOCAL_SEGMENT_ID = 0

  5. Go-C++ CGo 桥接: etcd 集成通过 Go 编译为 C 共享库实现,CGo 层的内存管理需要注意 —— EtcdGetWrapper 返回的 *C.char 需要 C++ 侧 free()

  6. Metacache 默认开启: 获取 Segment 描述符默认走本地缓存,避免每次 getSegmentDescByName 都产生网络 I/O。需要实时性时可以传 force_update=true 或调用 syncSegmentCache()

  7. 集群隔离: 通过 MC_METADATA_CLUSTER_ID 环境变量为 key 添加自定义前缀,多个 Mooncake 集群可以共享同一个 etcd 实例而不冲突。


八、代码验证

以下是验证上述分析的源码位置汇总:

引擎初始化:

元数据核心:

存储插件:

Go etcd 桥接:

Segment 注册调用链:

P2P 握手:


九、修订说明

本文档基于审查记录 ./10-review-notes.md 完成修订,共修正 1 处事实错误3 处描述不精确

  1. 事实错误修正 — 4.4 节插件选择逻辑:原文称 “其他 -> etcd (默认回退)“,与 MetadataStoragePlugin::Create() (transfer_metadata_plugin.cpp:590) 的实际行为不符。修正为两步描述:parseConnectionString 将无 :// 前缀默认视为 etcd;Create 工厂对未匹配的已知协议调用 LOG(FATAL) 终止程序。

  2. 描述精确化 — 三、SegmentDesc 协议编解码分组:补充了 ubshmem 编码分支 (transfer_metadata.cpp:423-427) 和 nvmeof 解码分支 (transfer_metadata.cpp:767-777),并标注 nvmeof 仅在解码阶段有对应路径。

  3. 描述精确化 — 5.1 节 P2P 消息类型:原文将 OldProtocolConnection 混列为一种消息类型。修正为三类在线消息类型 (Connection/Metadata/Notify/Probe, 枚举值 0-3) + 一个解码回退路径 (OldProtocol=0xff),并解释 readString (common.h:443-449) 中的触发条件和服务端处理差异。

  4. 行号修正 — 二/八节 init() 行号范围:从 77-202 更正为 77-368(函数在 transfer_engine_impl.cpp 中延伸至 368 行)。此外 RedisStoragePlugin 结束行从 205 订正为 204、EtcdStoragePlugin 结束行从 521 订正为 520、etcd_wrapper.go TE 客户端引用范围从 85-198 订正为 85-127。


Share this post on:

Previous Post
AI是威胁还是机遇?——软件股财报季分析
Next Post
Mooncake TE 阅读手记-16-路径选择与 Peer NIC Path