1. 4.5 会话粘性与跨节点转发

4.3–4.4 覆盖单进程内路由与回包优化;本节说明 跨进程 时如何通过 zbus.TopicBus(默认实现 zhenyi/znats)发布到 ActorConfig.GetTopic(),以及 HRW 粘性连接池。部署上需 发现服务NATS 已 Connect,见 znats.NewDefaultNatsNatsPool.Connect

1.1. 4.5.1 为什么需要跨节点转发?

单进程部署时,所有 Actor 都在同一进程里,消息通过 Mailbox 直接投递,不需要跨进程通信。

但生产环境通常多进程部署:

进程 1: Gate + IM Actor(分片1) + Match Actor
进程 2: Gate + IM Actor(分片2) + Chat Actor
进程 3: Gate + IM Actor(分片3)

当进程 1 的 Gate 收到一条聊天消息,但对应的 IM Actor 在进程 2 时,就需要跨进程转发。

1.2. 4.5.2 消息总线的抽象

zhenyi 用 TopicBus 接口抽象跨进程通信:

type TopicBus interface {
    Broadcast(topic string, data []byte) error
    Subscribe(topic string, handler Handler) (Subscription, error)
}

只定义了两个操作:发布和订阅。Actor 层只依赖这个接口,不关心底层用的是 NATS、Redis 还是其他 MQ。

默认实现是 NatsBus,基于 NATS。

1.2.1. 为什么选 NATS?

方面 NATS(概括) 备注
延迟 同机房通常可达亚毫秒~毫秒级 依赖网络与集群配置
运维 单二进制、轻量 与本框架默认实现匹配
模型 Subject Pub/Sub Broadcast/Subscribe 抽象一致

其它后端(Redis、Kafka、gRPC 等)亦可 实现 TopicBus 替换 zbus.DefaultBus;默认选 NATS 是工程上的折中,并非唯一正确项。

1.3. 4.5.3 Topic 命名规则

每个 Actor 启动时,Group 会为其注册到 Etcd,同时计算一个唯一的 topic:

func (a ActorConfig) GetTopic() string {
    return fmt.Sprintf("topic_%d_%d_%d", a.ActorType, a.Index, a.Id)
}

例如 ActorType=1(IM)、Index=0、Id=5 的 topic 是 topic_1_0_5

还有按类型分组的 topic:

func (a ActorConfig) GetNameTopic() string {
    return fmt.Sprintf("topic_name_%d", a.ActorType)
}

所有 IM Actor 的 GetNameTopic() 都是 topic_name_1,用于广播。

1.4. 4.5.4 跨节点转发的完整流程

以一条聊天消息为例:

进程 1 的 Gate 收到消息 "发消息给用户 B"
    ↓
1. 本地路由:查找本进程 IM Actor → 没有(用户 B 在进程 2)
    ↓
2. 远程路由:从 Etcd 获取候选列表
   → 进程 2 的 IM Actor(topic_1_0_5) 支持 ChatMsg
   → 进程 3 的 IM Actor(topic_1_1_6) 支持 ChatMsg
    ↓
3. 路由策略选择
   → RendezvousHash(用户B的SessionId) → 进程 2 的 IM Actor 分数最高
    ↓
4. 序列化消息
   → msg.MarshalPooled() → []byte
    ↓
5. 广播到目标 topic
   → zbus.DefaultBus.Broadcast("topic_1_0_5", data)
    ↓
6. 进程 2 的 IM Actor 收到
   → NATS callback → 反序列化 → Push 到 Mailbox
    ↓
7. IM Actor 处理,生成响应
    ↓
8. 响应回到 Gate → 发回客户端

1.4.1. 序列化与反序列化

跨进程传输必须序列化。zhenyi 用的是 3.6 节讲过的手写二进制序列化:

// 发送端
buf, err := msg.MarshalPooled()   // 从池获取 buffer,零分配序列化
zbus.DefaultBus.Broadcast(topic, buf.B)
buf.Release()                      // 立即归还,NATS 内部已拷贝

// 接收端
func handler(topic string, data []byte) {
    msg := zmsg.GetMessage()
    msg.Unmarshal(data)            // 反序列化到池中的对象
    actor.Push(zmodel.ActorCmd{Type: zmodel.CmdTypeClient, Msg: msg})
}

MarshalPooled() 从 buffer 池获取内存,序列化完立即归还。GetMessage() 从消息对象池获取。整条链路保持零分配。

1.5. 4.5.5 会话粘性:同一用户总到同一进程

4.3 节提到 RendezvousHash 使用 DefaultRemoteRouteKey(优先 SessionId)。在 SessionId 稳定对应同一终端用户的前提下,同一用户的请求会路由到同一候选 Actor(通常即同一进程上的分片)。

为什么需要粘性?

假设用户 B 的状态存在进程 2 的 IM Actor 中:

无粘性:
请求 1 → 进程 2(查到用户状态)✅
请求 2 → 进程 3(查不到用户状态)❌ 需要跨进程查询

有粘性:
请求 1 → 进程 2 ✅
请求 2 → 进程 2 ✅

没有粘性,每次都要跨进程查状态,延迟翻倍。

1.5.1. 扩缩容时的迁移

RendezvousHash 的一个重要特性:扩缩容时只影响少量 key。

原来 3 个 IM Actor:1、2、3
加了一个 Actor 4:
  - 大约 1/4 的用户会被迁移到 Actor 4
  - 大约 3/4 的用户不受影响

相比普通随机分配(扩缩容后全部打乱),这个特性对实时服务很友好——扩容时不会引发大规模重连或状态迁移。

1.6. 4.5.6 NATS 连接管理

1.6.1. 连接池

func NewDefaultNats(url string, poolSize int) {
    DefaultNatsClient = &NatsPool{clients: make([]*Nats, poolSize)}
    for i := 0; i < poolSize; i++ {
        DefaultNatsClient.clients[i] = NewNats(url)
    }
    zbus.DefaultBus = NewNatsBus(DefaultNatsClient)
}

NatsPool 内部维护多个连接。Broadcast 时按轮询选择连接:

func (c *NatsPool) Broadcast(topic string, data []byte) error {
    idx := atomic.AddUint64(&c.counter, 1) % uint64(len(c.clients))
    return c.clients[idx].Broadcast(topic, data)
}

为什么用连接池?单个 NATS 连接在高吞吐时可能成为瓶颈。多连接可以分散负载。

1.6.2. 自动重连

const (
    DefaultMaxRetries = 30
    DefaultRetryDelay = 200 * time.Millisecond
)

连接 NATS 失败后,最多重试 30 次,每次间隔 200ms,总等待约 6 秒。如果 NATS 还没恢复,进程启动失败。

NATS 客户端本身也支持自动重连(断线后自动重连),zhenyi 的重试逻辑只在首次启动时生效。

1.6.3. 订阅管理

订阅信息也用 Copy-On-Write 存储:

type subsMap map[string]*subItem  // topic → {sub, timeout}

func (q *Nats) upsertSub(topic string, sub *nats.Subscription, timeout time.Duration) {
    q.subsMu.Lock()
    cur := q.loadSubs()
    next := make(subsMap, len(cur)+1)
    for k, v := range cur {
        next[k] = v
    }
    next[topic] = &subItem{topic: topic, sub: sub, timeout: timeout}
    q.subs.Store(next)  // 原子替换
}

和 Group 路由表一样的模式:读路径无锁,写路径 Copy-On-Write。

1.6.4. 指标监控

zmetrics.NatsPublishTotal.Inc()       // 发布总数
zmetrics.NatsPublishErrors.Inc()      // 发布失败
zmetrics.NatsRequestTotal.Inc()       // 请求总数
zmetrics.NatsRequestErrors.Inc()      // 请求失败
zmetrics.NatsRequestLatency.Observe()  // 请求延迟

通过 Prometheus 暴露,运维可以实时监控跨进程通信的健康状况。

1.7. 4.5.7 单机模式 vs 分布式模式

TopicBus 的抽象让单机和分布式用同一套代码。目前框架只提供了 NATS 实现(NatsBus):

znats.NewDefaultNats("nats://127.0.0.1:4222", 2)
// 随后需对池 Connect,例如:znats.DefaultNatsClient.Connect(ctx)
// 成功后 zbus.DefaultBus 指向 NatsBus

单机模式下消息只走本地路由(路径 2),不会触发 TopicBus。只有多进程部署、本地路由找不到目标时,才会走 NATS(路径 3)。

如果你想扩展,比如用 Redis Pub/Sub 替代 NATS,只需实现 TopicBus 接口的 BroadcastSubscribe 两个方法,然后注入到 zbus.DefaultBus。Actor 层的代码不需要任何修改。

1.8. 4.5.8 本节要点

  1. TopicBusBroadcast/Subscribe;Actor 层依赖 zbus.DefaultBus 抽象。
  2. 默认 znatsNewDefaultNats 建池并注入;生产路径须 Connect 成功。
  3. TopicGetTopic()topic_{ActorType}_{Index}_{Id}GetNameTopic() 按类型共享前缀。
  4. 载荷MarshalPooled + buf.Release() 等模式见 3.6;跨进程总线是否拷贝取决于 NATS 客户端行为。
  5. 粘性RendezvousHash + DefaultRemoteRouteKey(通常 SessionId)。
  6. 扩缩容:HRW 相对随机分片,迁移比例约为 O(1/N) 量级(非零)。
  7. 池化 PublishNatsPool.Broadcast 轮询选连接,分散单连接带宽。

第五章可接服务发现与全链路路由(本书大纲);与本章 Group.watchActorFindAllByPrefix 衔接。

results matching ""

    No results matching ""