1. 5.3 跨进程通信:NATS 深入实践
4.5 从 Gate Broadcast 视角带过 NATS;本节从 zbus.TopicBus → znats 分层说明职责边界、连接池与观测。默认实现在 zhenyi/znats,接口在 zhenyi/zbus。
1.1. 5.3.1 Actor 间通信的两个层次
zhenyi 的 Actor 间通信分两层:
同一进程内:Mailbox 直接投递(零网络开销)
不同进程间:TopicBus → NATS → 目标进程
本地通信在第三章已经讲完。本章讲跨进程通信。
1.2. 5.3.2 为什么需要 NATS?
一个直接的方案是让 Actor 之间直接建立 TCP 连接——进程 1 的 Gate 直接连进程 2 的 IM Actor。
但想想这有多复杂:
| 问题 | 说明 |
|---|---|
| 连接管理 | N 个进程,每两个之间都要维护连接,O(N²) |
| 服务发现 | 进程 2 的地址变了,进程 1 怎么知道? |
| 负载均衡 | 3 个 IM Actor,怎么均匀分发? |
| 可靠性 | 连接断开时怎么重试? |
| 广播 | 给所有 IM Actor 发广播消息,要连 N 次 |
如果引入一个消息中间件,这些问题都变成它的事:
| 问题 | 用 NATS 解决 |
|---|---|
| 连接管理 | 每个进程连一次 NATS,O(N) |
| 服务发现 | 通过 Topic 名路由,不关心对端地址 |
| 负载均衡 | NATS 自动均衡,或业务层控制 |
| 可靠性 | NATS 自动重连 |
| 广播 | Publish 一次,所有订阅者都收到 |
NATS 不是用来做业务消息队列的,而是做进程间的通信管道。 两者定位不同。
1.3. 5.3.3 NATS vs 其他消息中间件
实时应用对跨进程通信的要求:
| 要求 | 优先级 | 原因 |
|---|---|---|
| 低延迟 | 最高 | 每次跨进程调用增加的延迟直接影响用户体验 |
| 高吞吐 | 高 | 聊天、位置同步等场景消息量大 |
| 简单运维 | 高 | 个人/小团队不需要复杂的消息基础设施 |
| 持久化 | 低 | Actor 状态在内存里,消息丢了重发就行 |
| 事务 | 低 | 不需要事务保证 |
| 中间件 | 概括 |
|---|---|
| NATS | 同机房通常可做到很低延迟;单二进制部署与本仓库默认适配器一致 |
| 其它 | 可实现同一 TopicBus 接口替换;对比需按你的集群与持久化需求单独测 |
避免写死「全网最优」:跨进程延迟 = 总线 + 序列化 + 对端入队,应靠压测与 P99 监控。
1.4. 5.3.4 NATS 的核心模型
NATS 只有两个概念:Subject(主题) 和 Message(消息)。
发布者:
NATS.Publish("topic_1_0_5", data)
订阅者:
NATS.Subscribe("topic_1_0_5", func(msg) { ... })
发布者不关心谁在订阅,订阅者不关心谁在发布。完全解耦。
1.4.1. Subject 不是 Queue
NATS 的 Subject 是 Pub/Sub 模式——一条消息,所有订阅该 Subject 的消费者都会收到。
如果需要"一条消息只被一个消费者处理",用 Queue Group:
NATS.QueueSubscribe("topic_1_0_5", "im_group", handler)
同一个 queue group 里的消费者,一条消息只有一个会收到。
zhenyi 目前用的是普通 Subscribe(不是 Queue Subscribe),因为每个 Actor 有唯一的 topic,只有一个订阅者。
1.5. 5.3.5 zhenyi 对 NATS 的封装
1.5.1. 三层抽象
zbus.TopicBus(接口)
↓
znats.NatsBus(适配器)
↓
NatsPool → []*Nats(连接池)
TopicBus 接口(zbus 包):
type TopicBus interface {
Broadcast(topic string, data []byte) error
Subscribe(topic string, handler Handler) (Subscription, error)
}
只有两个方法。Actor 层只依赖这个接口。
NatsBus(znats/nats_bus.go,handler 类型为 zbus.Handler):
type NatsBus struct {
pool *NatsPool
}
func (b *NatsBus) Broadcast(topic string, data []byte) error {
return b.pool.Broadcast(topic, data)
}
func (b *NatsBus) Subscribe(topic string, handler zbus.Handler) (zbus.Subscription, error) {
sub := b.pool.SubscribeCall(topic, func(msg *nats.Msg) {
handler(msg.Subject, msg.Data)
})
return &natsSubscription{sub: sub}, nil
}
纯适配,把 NATS 的 API 翻译成 TopicBus 的接口。
NatsPool(连接池):
type NatsPool struct {
clients []*Nats
counter uint64
}
func (c *NatsPool) Broadcast(topic string, data []byte) error {
idx := atomic.AddUint64(&c.counter, 1) % uint64(len(c.clients))
return c.clients[idx].Broadcast(topic, data)
}
Broadcast 轮询选连接,Subscribe 固定用第一个连接。
1.5.2. 为什么 Broadcast 用轮询而不是固定连接?
Publish 是 CPU-bound 操作(序列化 + 网络发送)。多个连接可以分散负载,避免单连接成为瓶颈。
Subscribe 不需要轮询,因为 NATS 服务端会自动均衡消息分发到各个连接。
1.6. 5.3.6 消息流转的完整链路
进程 1 的 Gate:
1. msg.MarshalPooled() → 从池获取 buffer,序列化
2. zbus.DefaultBus.Broadcast(topic, data) → NatsPool 轮询选连接
3. nats.Publish(topic, data) → 发送到 NATS 服务端
4. buf.Release() → 立即归还 buffer(NATS 已拷贝)
NATS 服务端:
5. 根据 topic 找到订阅者(进程 2)
6. 转发 data 到进程 2 的连接
进程 2 的 Actor:
7. NATS callback 收到 data
8. msg := zmsg.GetMessage() → 从对象池获取
9. msg.Unmarshal(data) → 反序列化
10. actor.Push(zmodel.ActorCmd{Type: zmodel.CmdTypeMsg, Msg: msg}) → 入邮箱(以实际订阅侧为准)
整条链路中,内存分配只有:
- 发送端:从 buffer 池获取(MarshalPooled),用完归还
- 接收端:从消息对象池获取(GetMessage),处理完 Release
热路径上尽量 池化序列化缓冲与 zmsg,但 Unmarshal、总线库内部拷贝、观测埋点等仍可能产生分配与 GC 压力——以 go test -bench / 生产 profiles 为准,不宜绝对化「零 GC」。
1.7. 5.3.7 连接可靠性
1.7.1. 自动重连
NATS 客户端内置自动重连机制:
连接正常 → 网络闪断 → 客户端自动重连 → 重连成功 → 继续收发
↘ 重连失败 → 重试(指数退避)
zhenyi 不需要自己实现重连逻辑,依赖 NATS 客户端的内置能力。
1.7.2. 首次连接重试
进程启动时 NATS 可能还没就绪,zhenyi 有自己的重试:
const (
DefaultMaxRetries = 30
DefaultRetryDelay = 200 * time.Millisecond
)
30 次重试,每次 200ms,总计约 6 秒。超过 6 秒 NATS 还没起来,进程启动失败。
为什么不用 NATS 客户端内置的重连?因为首次连接和断线重连是不同的场景。首次连接失败意味着整个分布式功能不可用,应该快速失败而不是无限等待。
1.7.3. 租约与 NATS 的配合
Etcd 共享租约(10s TTL)与 NATS 连接相互独立:
正常情况:
Etcd KeepAlive 续约 → Actor 在线
NATS 连接正常 → 消息可达
进程崩溃:
Etcd KeepAlive 停止 → 10 秒后租约过期 → 自动注销
NATS 连接断开 → 客户端自动重连(但进程已经挂了)
网络分区(进程活着但连不上 NATS):
Etcd KeepAlive 正常 → Actor 仍"在线"
NATS 连接断开 → 消息不可达
→ 其他进程通过 Etcd 认为它在线,发消息过去但 NATS 转发失败
→ Gate 的 fallback 机制尝试下一个候选
NATS 不可达不会触发 Etcd 注销。这是合理的设计——NATS 可能短暂抖动,不应该因此注销整个进程。
1.8. 5.3.8 监控指标
zhenyi 对 NATS 通信做了全面的指标埋点:
zmetrics.NatsPublishTotal.Inc() // 发布计数
zmetrics.NatsPublishErrors.Inc() // 发布失败
zmetrics.NatsRequestTotal.Inc() // 请求计数
zmetrics.NatsRequestErrors.Inc() // 请求失败
zmetrics.NatsRequestLatency.Observe() // 请求延迟(Histogram)
这些指标通过 Prometheus 暴露,运维可以:
- NatsPublishErrors 突增 → NATS 可能有问题
- NatsRequestLatency P99 上升 → 网络延迟或 NATS 负载过高
- NatsPublishTotal = 0 → 跨进程通信停止,可能是配置错误
1.9. 5.3.9 NATS 部署建议
1.9.1. 开发环境
# 单节点,零配置
nats-server
1.9.2. 生产环境
# 集群模式,3 节点
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node1:6222 --routes nats://node2:6222
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node2:6222 --routes nats://node3:6222
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node3:6222 --routes nats://node1:6222
NATS 集群模式下节点间有路由同步机制;具体拓扑与版本以 NATS 文档 为准,本书不绑定实现细节。
一个 NATS 节点挂了,客户端自动重连到其他节点。不影响消息转发。
1.9.3. 资源需求
NATS 非常轻量:
| 场景 | CPU | 内存 |
|---|---|---|
| 开发 | < 1 核 | < 50MB |
| 生产(10 万 QPS) | 1~2 核 | 200~500MB |
| 生产(百万 QPS) | 2~4 核 | 1~2GB |
资源占用与 JetStream / 持久化 等模式强相关;上表仅为数量级印象,生产容量规划以官方工具与自测为准。
1.10. 5.3.10 本节要点
- 分层:业务只依赖
zbus.TopicBus;znats为默认适配器。 - 池:
NatsPool.Broadcast轮询选连接;SubscribeCall固定池中首连接(见nats.go)。 - 载荷:发送端宜
MarshalPooled+ 及时Release;接收端GetMessage/Unmarshal/Release。 - 启动:
NewDefaultNats后须Connect(ctx);失败重试常量DefaultMaxRetries/DefaultRetryDelay见znats。 - 分区:Etcd 与 NATS 故障域不同;可能出现 注册仍在线但总线不可达,需 路由 fallback 与运维告警配合。
- 指标:
zmetrics下 NatsPublish* / NatsRequest* 等系列。
5.4 节:Group.otherRouteSnapshot 按需重建与 IGroupRemoteRouteTableView。