1. 5.3 跨进程通信:NATS 深入实践

4.5 从 Gate Broadcast 视角带过 NATS;本节从 zbus.TopicBusznats 分层说明职责边界、连接池与观测。默认实现zhenyi/znats,接口在 zhenyi/zbus

1.1. 5.3.1 Actor 间通信的两个层次

zhenyi 的 Actor 间通信分两层:

同一进程内:Mailbox 直接投递(零网络开销)
不同进程间:TopicBus → NATS → 目标进程

本地通信在第三章已经讲完。本章讲跨进程通信。

1.2. 5.3.2 为什么需要 NATS?

一个直接的方案是让 Actor 之间直接建立 TCP 连接——进程 1 的 Gate 直接连进程 2 的 IM Actor。

但想想这有多复杂:

问题 说明
连接管理 N 个进程,每两个之间都要维护连接,O(N²)
服务发现 进程 2 的地址变了,进程 1 怎么知道?
负载均衡 3 个 IM Actor,怎么均匀分发?
可靠性 连接断开时怎么重试?
广播 给所有 IM Actor 发广播消息,要连 N 次

如果引入一个消息中间件,这些问题都变成它的事:

问题 用 NATS 解决
连接管理 每个进程连一次 NATS,O(N)
服务发现 通过 Topic 名路由,不关心对端地址
负载均衡 NATS 自动均衡,或业务层控制
可靠性 NATS 自动重连
广播 Publish 一次,所有订阅者都收到

NATS 不是用来做业务消息队列的,而是做进程间的通信管道。 两者定位不同。

1.3. 5.3.3 NATS vs 其他消息中间件

实时应用对跨进程通信的要求:

要求 优先级 原因
低延迟 最高 每次跨进程调用增加的延迟直接影响用户体验
高吞吐 聊天、位置同步等场景消息量大
简单运维 个人/小团队不需要复杂的消息基础设施
持久化 Actor 状态在内存里,消息丢了重发就行
事务 不需要事务保证
中间件 概括
NATS 同机房通常可做到很低延迟;单二进制部署与本仓库默认适配器一致
其它 可实现同一 TopicBus 接口替换;对比需按你的集群与持久化需求单独测

避免写死「全网最优」:跨进程延迟 = 总线 + 序列化 + 对端入队,应靠压测与 P99 监控。

1.4. 5.3.4 NATS 的核心模型

NATS 只有两个概念:Subject(主题)Message(消息)

发布者:
  NATS.Publish("topic_1_0_5", data)

订阅者:
  NATS.Subscribe("topic_1_0_5", func(msg) { ... })

发布者不关心谁在订阅,订阅者不关心谁在发布。完全解耦。

1.4.1. Subject 不是 Queue

NATS 的 Subject 是 Pub/Sub 模式——一条消息,所有订阅该 Subject 的消费者都会收到。

如果需要"一条消息只被一个消费者处理",用 Queue Group:

NATS.QueueSubscribe("topic_1_0_5", "im_group", handler)

同一个 queue group 里的消费者,一条消息只有一个会收到。

zhenyi 目前用的是普通 Subscribe(不是 Queue Subscribe),因为每个 Actor 有唯一的 topic,只有一个订阅者。

1.5. 5.3.5 zhenyi 对 NATS 的封装

1.5.1. 三层抽象

zbus.TopicBus(接口)
    ↓
znats.NatsBus(适配器)
    ↓
NatsPool → []*Nats(连接池)

TopicBus 接口(zbus 包):

type TopicBus interface {
    Broadcast(topic string, data []byte) error
    Subscribe(topic string, handler Handler) (Subscription, error)
}

只有两个方法。Actor 层只依赖这个接口。

NatsBusznats/nats_bus.gohandler 类型为 zbus.Handler):

type NatsBus struct {
    pool *NatsPool
}

func (b *NatsBus) Broadcast(topic string, data []byte) error {
    return b.pool.Broadcast(topic, data)
}

func (b *NatsBus) Subscribe(topic string, handler zbus.Handler) (zbus.Subscription, error) {
    sub := b.pool.SubscribeCall(topic, func(msg *nats.Msg) {
        handler(msg.Subject, msg.Data)
    })
    return &natsSubscription{sub: sub}, nil
}

纯适配,把 NATS 的 API 翻译成 TopicBus 的接口。

NatsPool(连接池):

type NatsPool struct {
    clients []*Nats
    counter uint64
}

func (c *NatsPool) Broadcast(topic string, data []byte) error {
    idx := atomic.AddUint64(&c.counter, 1) % uint64(len(c.clients))
    return c.clients[idx].Broadcast(topic, data)
}

Broadcast 轮询选连接,Subscribe 固定用第一个连接。

1.5.2. 为什么 Broadcast 用轮询而不是固定连接?

Publish 是 CPU-bound 操作(序列化 + 网络发送)。多个连接可以分散负载,避免单连接成为瓶颈。

Subscribe 不需要轮询,因为 NATS 服务端会自动均衡消息分发到各个连接。

1.6. 5.3.6 消息流转的完整链路

进程 1 的 Gate:
  1. msg.MarshalPooled()        → 从池获取 buffer,序列化
  2. zbus.DefaultBus.Broadcast(topic, data)  → NatsPool 轮询选连接
  3. nats.Publish(topic, data)   → 发送到 NATS 服务端
  4. buf.Release()               → 立即归还 buffer(NATS 已拷贝)

NATS 服务端:
  5. 根据 topic 找到订阅者(进程 2)
  6. 转发 data 到进程 2 的连接

进程 2 的 Actor:
  7. NATS callback 收到 data
  8. msg := zmsg.GetMessage()   → 从对象池获取
  9. msg.Unmarshal(data)        → 反序列化
  10. actor.Push(zmodel.ActorCmd{Type: zmodel.CmdTypeMsg, Msg: msg}) → 入邮箱(以实际订阅侧为准)

整条链路中,内存分配只有:

  • 发送端:从 buffer 池获取(MarshalPooled),用完归还
  • 接收端:从消息对象池获取(GetMessage),处理完 Release

热路径上尽量 池化序列化缓冲与 zmsg,但 Unmarshal、总线库内部拷贝、观测埋点等仍可能产生分配与 GC 压力——以 go test -bench / 生产 profiles 为准,不宜绝对化「零 GC」。

1.7. 5.3.7 连接可靠性

1.7.1. 自动重连

NATS 客户端内置自动重连机制:

连接正常 → 网络闪断 → 客户端自动重连 → 重连成功 → 继续收发
                                                ↘ 重连失败 → 重试(指数退避)

zhenyi 不需要自己实现重连逻辑,依赖 NATS 客户端的内置能力。

1.7.2. 首次连接重试

进程启动时 NATS 可能还没就绪,zhenyi 有自己的重试:

const (
    DefaultMaxRetries = 30
    DefaultRetryDelay = 200 * time.Millisecond
)

30 次重试,每次 200ms,总计约 6 秒。超过 6 秒 NATS 还没起来,进程启动失败。

为什么不用 NATS 客户端内置的重连?因为首次连接和断线重连是不同的场景。首次连接失败意味着整个分布式功能不可用,应该快速失败而不是无限等待。

1.7.3. 租约与 NATS 的配合

Etcd 共享租约(10s TTL)与 NATS 连接相互独立:

正常情况:
  Etcd KeepAlive 续约 → Actor 在线
  NATS 连接正常       → 消息可达

进程崩溃:
  Etcd KeepAlive 停止 → 10 秒后租约过期 → 自动注销
  NATS 连接断开       → 客户端自动重连(但进程已经挂了)

网络分区(进程活着但连不上 NATS):
  Etcd KeepAlive 正常 → Actor 仍"在线"
  NATS 连接断开       → 消息不可达
  → 其他进程通过 Etcd 认为它在线,发消息过去但 NATS 转发失败
  → Gate 的 fallback 机制尝试下一个候选

NATS 不可达不会触发 Etcd 注销。这是合理的设计——NATS 可能短暂抖动,不应该因此注销整个进程。

1.8. 5.3.8 监控指标

zhenyi 对 NATS 通信做了全面的指标埋点:

zmetrics.NatsPublishTotal.Inc()        // 发布计数
zmetrics.NatsPublishErrors.Inc()       // 发布失败
zmetrics.NatsRequestTotal.Inc()        // 请求计数
zmetrics.NatsRequestErrors.Inc()       // 请求失败
zmetrics.NatsRequestLatency.Observe()  // 请求延迟(Histogram)

这些指标通过 Prometheus 暴露,运维可以:

  • NatsPublishErrors 突增 → NATS 可能有问题
  • NatsRequestLatency P99 上升 → 网络延迟或 NATS 负载过高
  • NatsPublishTotal = 0 → 跨进程通信停止,可能是配置错误

1.9. 5.3.9 NATS 部署建议

1.9.1. 开发环境

# 单节点,零配置
nats-server

1.9.2. 生产环境

# 集群模式,3 节点
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node1:6222 --routes nats://node2:6222
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node2:6222 --routes nats://node3:6222
nats-server --cluster nats://0.0.0.0:6222 --routes nats://node3:6222 --routes nats://node1:6222

NATS 集群模式下节点间有路由同步机制;具体拓扑与版本NATS 文档 为准,本书不绑定实现细节。

一个 NATS 节点挂了,客户端自动重连到其他节点。不影响消息转发。

1.9.3. 资源需求

NATS 非常轻量:

场景 CPU 内存
开发 < 1 核 < 50MB
生产(10 万 QPS) 1~2 核 200~500MB
生产(百万 QPS) 2~4 核 1~2GB

资源占用与 JetStream / 持久化 等模式强相关;上表仅为数量级印象,生产容量规划以官方工具与自测为准

1.10. 5.3.10 本节要点

  1. 分层:业务只依赖 zbus.TopicBusznats 为默认适配器。
  2. NatsPool.Broadcast 轮询选连接;SubscribeCall 固定池中首连接(见 nats.go)。
  3. 载荷:发送端宜 MarshalPooled + 及时 Release;接收端 GetMessage/Unmarshal/Release
  4. 启动NewDefaultNats 后须 Connect(ctx);失败重试常量 DefaultMaxRetries/DefaultRetryDelayznats
  5. 分区:Etcd 与 NATS 故障域不同;可能出现 注册仍在线但总线不可达,需 路由 fallback 与运维告警配合。
  6. 指标zmetricsNatsPublish* / NatsRequest* 等系列。

5.4 节Group.otherRouteSnapshot 按需重建与 IGroupRemoteRouteTableView

results matching ""

    No results matching ""