1. 3.3 消息收发
3.2 确定了单 Actor 内顺序处理;跨 Actor 通信则依赖 SendMsg(投递) 与 CallActor(RPC),以及 ActorMsgSender 的 Slot、AsyncRunWithMsg 与可选 Fast Path。本节概念与 zhenyi/zactor(sender.go、handlemsg.go 等)一致。
1.1. 3.3.1 两种消息模式
Actor 之间的通信有两种模式:
| 模式 | 说明 | 类比 |
|---|---|---|
| 单向发送(Send) | 投递到目标邮箱即返回 | fire-and-forget |
| RPC(CallActor) | 在调用方阻塞直到回复或超时 | request/response |
1.1.1. 单向发送:SendMsg
// Gate Actor 收到客户端消息后,转发给 IM Actor
func (g *GateActor) HandleMessage(ctx context.Context, msg *zmsg.Message) {
msg.TarActor = imActorId // 指定目标 Actor
g.SendMsg(msg) // 发送,不等待回复
}
SendMsg 把消息投递到目标 Actor 的 Mailbox,立刻返回。目标 Actor 的 Run 循环会从 Mailbox 中取出处理。
1.1.2. RPC 调用:CallActor
// IM Actor 登录后,调用 Gate Actor 绑定会话
req := &BindSessionReq{UserId: userId, SessionId: sessionId}
reply := &BindSessionResp{}
result := a.CallActor(gateActorId, req, reply, 3*time.Second)
if result.Code != ziface.ErrCode_Success {
// 处理错误(超时、序列化失败等)
}
CallActor 发送消息后阻塞等待回复,超时后返回错误。
1.2. 3.3.2 CallActor 的实现:Slot 机制
RPC 调用的核心问题是:发出去的消息怎么和回来的响应对应上?
zhenyi 用 Slot 机制解决。每个 Actor 维护一个 Slot 池(ActorMsgSender):
┌─────────────────────────────────────────────┐
│ ActorMsgSender │
│ │
│ Slot 0: [Free] │
│ Slot 1: [Waiting ← rpcId=65537] │
│ Slot 2: [Free] │
│ Slot 3: [Abandoned ← 超时] │
│ ... │
│ │
│ cursor: 100 (分配游标,CAS 抢占) │
│ indexMask: 4095 (快速取模,槽位数量是2的幂)│
└─────────────────────────────────────────────┘
1.2.1. 发送 RPC:AddSender
// 1. 分配一个空闲 Slot(CAS 抢占)
rpcId, err := a.AddSender()
// rpcId 编码方式:高16位=version,低16位=slot索引
// rpcId = (version << 16) | slotIndex
// 2. 把 rpcId 写入消息
m.RpcId = rpcId
m.TarActor = targetActorId
// 3. 发送消息
a.SendMsg(m)
// 4. 阻塞等待 Slot 收到回复
data, ok := a.GetReply(rpcId, timeout)
1.2.2. 接收回复:SetReply
目标 Actor 处理完后,回复消息中带着 RpcId。收到回复后通过 SetReply 投递到对应的 Slot:
func (m *ActorMsgSender) SetReply(data *zmsg.Message) {
idx := data.RpcId & m.indexMask // 从 rpcId 提取 slot 索引
reqVer := data.RpcId >> VersionShift // 提取 version
slot := &m.slots[idx]
// Version 校验:防止串包
if atomic.LoadUint64(&slot.version) != reqVer {
return // 版本不匹配,说明这个 Slot 已被回收复用
}
// 状态校验:必须是 Waiting
if atomic.LoadInt32(&slot.state) != SlotWaiting {
return
}
// 投递到 Slot 的 channel
slot.ch <- data.Retain()
}
1.2.3. 为什么要 Version?
考虑这个场景:
- Actor A 发出 RPC,占用 Slot 1(version=2),rpcId = (2<<16)|1 = 131073
- 超时了,A 放弃等待
- 对方回了一个迟到的回复,rpcId = 131073
- 如果没有 version,这个迟到的回复会匹配到 Slot 1
- 但 Slot 1 可能已经被新请求占用
Version 就是为了防止这种情况。 每次占用 Slot 时 version+1,回复的 version 必须匹配才能投递成功。
1.2.4. 槽位回收
Slot 有三种状态,流转如下:
Free → (AddSender) → Waiting → (GetReply 成功) → Free
→ (GetReply 超时) → Abandoned → (Watchdog 回收) → Free
→ (Watchdog 强制回收) → Free
- Free:空闲,可以被抢占
- Waiting:等待回复
- Abandoned:超时放弃,等待 Watchdog 回收(防止串包)
发送侧 Watchdog 以 sender.go 内默认 tick(如 100ms 量级) 周期扫描,回收 Abandoned 等槽位(精确常量见源码)。
1.3. 3.3.3 CallActor 必须在异步方法里调用
这是一个重要的使用约束:
CallActor是阻塞调用,它会暂停当前 Actor 的消息处理。如果在HandleMessage里直接调用CallActor,当前 Actor 的 Mailbox 就卡住了。
错误用法:
func (a *IMActor) HandleMessage(ctx context.Context, msg *zmsg.Message) {
// ❌ 直接调用 CallActor,阻塞 Mailbox
result := a.CallActor(gateActorId, req, reply, 3*time.Second)
// 这 3 秒内,这个 Actor 的所有消息都排队等着
}
正确用法:
func (a *IMActor) HandleMessage(ctx context.Context, msg *zmsg.Message) {
// ✅ 用 AsyncRun 异步执行
a.AsyncRunWithMsg(msg,
func(m *zmsg.Message) interface{} {
return a.CallActor(gateActorId, req, reply, 3*time.Second)
},
func(result interface{}) {
// 回调在 Actor 主线程执行,安全访问 Actor 状态
reply := result.(ziface.RpcReply)
// 处理结果...
},
)
}
1.4. 3.3.4 AsyncRunWithMsg:异步 + 回调
AsyncRunWithMsg 是 Actor 间通信最常用的方法。它把耗时操作放到协程池,完成后通过 Mailbox 回调到 Actor 主线程:
HandleMessage (Actor 主线程)
↓
AsyncRunWithMsg
├── pre-check validator ← 主线程执行
├── f() ← 协程池执行(不阻塞 Actor)
└── callBackFunc ← 通过 Mailbox 回到主线程
流程:
- pre-check:在主线程执行条件检查(可选)
- 异步执行:
f()在协程池中执行,不阻塞 Actor 的 Run 循环 - 回调:完成后把结果打包成消息,投递到 Actor 的 Mailbox
- 回调执行:Actor 从 Mailbox 取出,在主线程执行回调
1.4.1. 双重校验:Validator
AsyncRunWithMsg 支持 validator 参数,会在两个时间点各检查一次:
a.AsyncRunWithMsg(msg,
func(m *zmsg.Message) interface{} {
return CallShopService(itemID) // 异步调用商城
},
func(result interface{}) {
player.Gold -= 100 // 扣金币
player.AddItem(result) // 加道具
},
func() bool {
return player.Gold >= 100 // 条件:金币够
},
)
| 时间点 | 检查目的 |
|---|---|
| 异步开始前 | 避免无条件发起不必要的远程调用 |
| 回调执行前 | 异步期间状态可能被其他消息改变,再次确认条件 |
比如异步调商城期间,另一个消息扣了玩家的金币。等商城回调回来时,金币已经不够了。如果没第二次检查,就会扣成负数。
1.5. 3.3.5 快速回复路径
对于 Gate Actor 这种高频回包场景,每条响应消息都走 Mailbox 太慢了。
zhenyi 提供了 Fast Path:Gate Actor 可以声明 IToClientFastPath 接口,响应消息跳过 Mailbox,直接发送:
// 声明线程安全
type GateActor struct { ... }
func (g *GateActor) HandleToClientFastPath(msg *zmsg.Message) bool {
// 直接查 channel + Send,不走 Mailbox
ch := g.GetChannel(msg.SessionId)
if ch != nil {
ch.Send(msg)
return true
}
return false
}
为什么 Gate 可以这样做?
HandleToClientFastPath 只做两件事:查 channel 和调用 Send。查 channel 是读 map,Send 是往发送队列写。这两个操作都是线程安全的,不会破坏 Actor 的状态一致性。
非 Gate 的 Actor 不要这样做。 如果在 Fast Path 里修改了 Actor 的状态,就会打破单线程不变性,引发 data race。
1.6. 3.3.6 熔断保护
CallActor 内置了熔断器(Circuit Breaker)。熔断状态挂在当前发送方 Actor 实例上,按目标 actorId 分桶(getCircuitBreaker(actorId)),不是全进程共享一只熔断器。
func (a *Actor) CallActor(actorId uint64, ...) ziface.RpcReply {
cb := a.getCircuitBreaker(actorId)
if !cb.allow() {
return RpcReply{Code: ErrorCode_RpcErr, Msg: "circuit breaker open"}
}
// ... 正常调用 ...
if ok {
cb.recordSuccess()
} else {
cb.recordFailure()
}
}
连续失败多次后熔断器打开,后续请求直接快速失败,不再发起 RPC。一段时间后尝试半开,如果成功则关闭熔断器。
作用:防止雪崩。 当下游服务出问题时,快速失败比排队等待更好——否则大量超时请求会压垮本地的协程池。若多个发送方 Actor 并发调用同一目标,各自熔断计数不合并;需要全局限流时请在业务或网关层扩展。
1.7. 3.3.7 本节要点
- SendMsg:入目标邮箱,不等待业务完成。
- CallActor:依赖 Slot + rpcId(含 version) 匹配回复,防止复用串包;详见
sender.go。 - 约束:在 mailbox 线程上直接
CallActor会阻塞该 Actor 的投递处理,应AsyncRunWithMsg(或等价)把阻塞点移到 worker。 - AsyncRunWithMsg:可选 validator 在异步前与回调前各验一次,避免状态已变仍提交副作用。
- Fast Path:实现
IToClientFastPath且操作保持 无数据竞争 的前提下,可旁路邮箱直写连接发送路径(滥用会破坏单线程不变性)。 - 熔断:
CallActor内按 (发送方 Actor, 目标 actorId) 分桶,细节见 3.5。
3.4 节:safeUpdate / SafeFn、热调参与排空关闭。