1. 4.5 会话粘性与跨节点转发
4.3–4.4 覆盖单进程内路由与回包优化;本节说明 跨进程 时如何通过 zbus.TopicBus(默认实现 zhenyi/znats)发布到 ActorConfig.GetTopic(),以及 HRW 粘性与 连接池。部署上需 发现服务 与 NATS 已 Connect,见 znats.NewDefaultNats 与 NatsPool.Connect。
1.1. 4.5.1 为什么需要跨节点转发?
单进程部署时,所有 Actor 都在同一进程里,消息通过 Mailbox 直接投递,不需要跨进程通信。
但生产环境通常多进程部署:
进程 1: Gate + IM Actor(分片1) + Match Actor
进程 2: Gate + IM Actor(分片2) + Chat Actor
进程 3: Gate + IM Actor(分片3)
当进程 1 的 Gate 收到一条聊天消息,但对应的 IM Actor 在进程 2 时,就需要跨进程转发。
1.2. 4.5.2 消息总线的抽象
zhenyi 用 TopicBus 接口抽象跨进程通信:
type TopicBus interface {
Broadcast(topic string, data []byte) error
Subscribe(topic string, handler Handler) (Subscription, error)
}
只定义了两个操作:发布和订阅。Actor 层只依赖这个接口,不关心底层用的是 NATS、Redis 还是其他 MQ。
默认实现是 NatsBus,基于 NATS。
1.2.1. 为什么选 NATS?
| 方面 | NATS(概括) | 备注 |
|---|---|---|
| 延迟 | 同机房通常可达亚毫秒~毫秒级 | 依赖网络与集群配置 |
| 运维 | 单二进制、轻量 | 与本框架默认实现匹配 |
| 模型 | Subject Pub/Sub | 与 Broadcast/Subscribe 抽象一致 |
其它后端(Redis、Kafka、gRPC 等)亦可 实现 TopicBus 替换 zbus.DefaultBus;默认选 NATS 是工程上的折中,并非唯一正确项。
1.3. 4.5.3 Topic 命名规则
每个 Actor 启动时,Group 会为其注册到 Etcd,同时计算一个唯一的 topic:
func (a ActorConfig) GetTopic() string {
return fmt.Sprintf("topic_%d_%d_%d", a.ActorType, a.Index, a.Id)
}
例如 ActorType=1(IM)、Index=0、Id=5 的 topic 是 topic_1_0_5。
还有按类型分组的 topic:
func (a ActorConfig) GetNameTopic() string {
return fmt.Sprintf("topic_name_%d", a.ActorType)
}
所有 IM Actor 的 GetNameTopic() 都是 topic_name_1,用于广播。
1.4. 4.5.4 跨节点转发的完整流程
以一条聊天消息为例:
进程 1 的 Gate 收到消息 "发消息给用户 B"
↓
1. 本地路由:查找本进程 IM Actor → 没有(用户 B 在进程 2)
↓
2. 远程路由:从 Etcd 获取候选列表
→ 进程 2 的 IM Actor(topic_1_0_5) 支持 ChatMsg
→ 进程 3 的 IM Actor(topic_1_1_6) 支持 ChatMsg
↓
3. 路由策略选择
→ RendezvousHash(用户B的SessionId) → 进程 2 的 IM Actor 分数最高
↓
4. 序列化消息
→ msg.MarshalPooled() → []byte
↓
5. 广播到目标 topic
→ zbus.DefaultBus.Broadcast("topic_1_0_5", data)
↓
6. 进程 2 的 IM Actor 收到
→ NATS callback → 反序列化 → Push 到 Mailbox
↓
7. IM Actor 处理,生成响应
↓
8. 响应回到 Gate → 发回客户端
1.4.1. 序列化与反序列化
跨进程传输必须序列化。zhenyi 用的是 3.6 节讲过的手写二进制序列化:
// 发送端
buf, err := msg.MarshalPooled() // 从池获取 buffer,零分配序列化
zbus.DefaultBus.Broadcast(topic, buf.B)
buf.Release() // 立即归还,NATS 内部已拷贝
// 接收端
func handler(topic string, data []byte) {
msg := zmsg.GetMessage()
msg.Unmarshal(data) // 反序列化到池中的对象
actor.Push(zmodel.ActorCmd{Type: zmodel.CmdTypeClient, Msg: msg})
}
MarshalPooled() 从 buffer 池获取内存,序列化完立即归还。GetMessage() 从消息对象池获取。整条链路保持零分配。
1.5. 4.5.5 会话粘性:同一用户总到同一进程
4.3 节提到 RendezvousHash 使用 DefaultRemoteRouteKey(优先 SessionId)。在 SessionId 稳定对应同一终端用户的前提下,同一用户的请求会路由到同一候选 Actor(通常即同一进程上的分片)。
为什么需要粘性?
假设用户 B 的状态存在进程 2 的 IM Actor 中:
无粘性:
请求 1 → 进程 2(查到用户状态)✅
请求 2 → 进程 3(查不到用户状态)❌ 需要跨进程查询
有粘性:
请求 1 → 进程 2 ✅
请求 2 → 进程 2 ✅
没有粘性,每次都要跨进程查状态,延迟翻倍。
1.5.1. 扩缩容时的迁移
RendezvousHash 的一个重要特性:扩缩容时只影响少量 key。
原来 3 个 IM Actor:1、2、3
加了一个 Actor 4:
- 大约 1/4 的用户会被迁移到 Actor 4
- 大约 3/4 的用户不受影响
相比普通随机分配(扩缩容后全部打乱),这个特性对实时服务很友好——扩容时不会引发大规模重连或状态迁移。
1.6. 4.5.6 NATS 连接管理
1.6.1. 连接池
func NewDefaultNats(url string, poolSize int) {
DefaultNatsClient = &NatsPool{clients: make([]*Nats, poolSize)}
for i := 0; i < poolSize; i++ {
DefaultNatsClient.clients[i] = NewNats(url)
}
zbus.DefaultBus = NewNatsBus(DefaultNatsClient)
}
NatsPool 内部维护多个连接。Broadcast 时按轮询选择连接:
func (c *NatsPool) Broadcast(topic string, data []byte) error {
idx := atomic.AddUint64(&c.counter, 1) % uint64(len(c.clients))
return c.clients[idx].Broadcast(topic, data)
}
为什么用连接池?单个 NATS 连接在高吞吐时可能成为瓶颈。多连接可以分散负载。
1.6.2. 自动重连
const (
DefaultMaxRetries = 30
DefaultRetryDelay = 200 * time.Millisecond
)
连接 NATS 失败后,最多重试 30 次,每次间隔 200ms,总等待约 6 秒。如果 NATS 还没恢复,进程启动失败。
NATS 客户端本身也支持自动重连(断线后自动重连),zhenyi 的重试逻辑只在首次启动时生效。
1.6.3. 订阅管理
订阅信息也用 Copy-On-Write 存储:
type subsMap map[string]*subItem // topic → {sub, timeout}
func (q *Nats) upsertSub(topic string, sub *nats.Subscription, timeout time.Duration) {
q.subsMu.Lock()
cur := q.loadSubs()
next := make(subsMap, len(cur)+1)
for k, v := range cur {
next[k] = v
}
next[topic] = &subItem{topic: topic, sub: sub, timeout: timeout}
q.subs.Store(next) // 原子替换
}
和 Group 路由表一样的模式:读路径无锁,写路径 Copy-On-Write。
1.6.4. 指标监控
zmetrics.NatsPublishTotal.Inc() // 发布总数
zmetrics.NatsPublishErrors.Inc() // 发布失败
zmetrics.NatsRequestTotal.Inc() // 请求总数
zmetrics.NatsRequestErrors.Inc() // 请求失败
zmetrics.NatsRequestLatency.Observe() // 请求延迟
通过 Prometheus 暴露,运维可以实时监控跨进程通信的健康状况。
1.7. 4.5.7 单机模式 vs 分布式模式
TopicBus 的抽象让单机和分布式用同一套代码。目前框架只提供了 NATS 实现(NatsBus):
znats.NewDefaultNats("nats://127.0.0.1:4222", 2)
// 随后需对池 Connect,例如:znats.DefaultNatsClient.Connect(ctx)
// 成功后 zbus.DefaultBus 指向 NatsBus
单机模式下消息只走本地路由(路径 2),不会触发 TopicBus。只有多进程部署、本地路由找不到目标时,才会走 NATS(路径 3)。
如果你想扩展,比如用 Redis Pub/Sub 替代 NATS,只需实现 TopicBus 接口的 Broadcast 和 Subscribe 两个方法,然后注入到 zbus.DefaultBus。Actor 层的代码不需要任何修改。
1.8. 4.5.8 本节要点
TopicBus:Broadcast/Subscribe;Actor 层依赖zbus.DefaultBus抽象。- 默认
znats:NewDefaultNats建池并注入;生产路径须Connect成功。 - Topic:
GetTopic()→topic_{ActorType}_{Index}_{Id};GetNameTopic()按类型共享前缀。 - 载荷:
MarshalPooled+buf.Release()等模式见 3.6;跨进程总线是否拷贝取决于 NATS 客户端行为。 - 粘性:
RendezvousHash+DefaultRemoteRouteKey(通常SessionId)。 - 扩缩容:HRW 相对随机分片,迁移比例约为 O(1/N) 量级(非零)。
- 池化 Publish:
NatsPool.Broadcast轮询选连接,分散单连接带宽。
第五章可接服务发现与全链路路由(本书大纲);与本章 Group.watchActor、FindAllByPrefix 衔接。