1. 3.3 消息收发

3.2 确定了单 Actor 内顺序处理;跨 Actor 通信则依赖 SendMsg(投递)CallActor(RPC),以及 ActorMsgSender 的 SlotAsyncRunWithMsg 与可选 Fast Path。本节概念与 zhenyi/zactorsender.gohandlemsg.go 等)一致。

1.1. 3.3.1 两种消息模式

Actor 之间的通信有两种模式:

模式 说明 类比
单向发送(Send) 投递到目标邮箱即返回 fire-and-forget
RPC(CallActor) 在调用方阻塞直到回复或超时 request/response

1.1.1. 单向发送:SendMsg

// Gate Actor 收到客户端消息后,转发给 IM Actor
func (g *GateActor) HandleMessage(ctx context.Context, msg *zmsg.Message) {
    msg.TarActor = imActorId  // 指定目标 Actor
    g.SendMsg(msg)            // 发送,不等待回复
}

SendMsg 把消息投递到目标 Actor 的 Mailbox,立刻返回。目标 Actor 的 Run 循环会从 Mailbox 中取出处理。

1.1.2. RPC 调用:CallActor

// IM Actor 登录后,调用 Gate Actor 绑定会话
req := &BindSessionReq{UserId: userId, SessionId: sessionId}
reply := &BindSessionResp{}
result := a.CallActor(gateActorId, req, reply, 3*time.Second)

if result.Code != ziface.ErrCode_Success {
    // 处理错误(超时、序列化失败等)
}

CallActor 发送消息后阻塞等待回复,超时后返回错误。

1.2. 3.3.2 CallActor 的实现:Slot 机制

RPC 调用的核心问题是:发出去的消息怎么和回来的响应对应上?

zhenyi 用 Slot 机制解决。每个 Actor 维护一个 Slot 池(ActorMsgSender):

┌─────────────────────────────────────────────┐
│              ActorMsgSender                  │
│                                             │
│  Slot 0: [Free]                              │
│  Slot 1: [Waiting ← rpcId=65537]            │
│  Slot 2: [Free]                              │
│  Slot 3: [Abandoned ← 超时]                  │
│  ...                                        │
│                                             │
│  cursor: 100  (分配游标,CAS 抢占)          │
│  indexMask: 4095  (快速取模,槽位数量是2的幂)│
└─────────────────────────────────────────────┘

1.2.1. 发送 RPC:AddSender

// 1. 分配一个空闲 Slot(CAS 抢占)
rpcId, err := a.AddSender()

// rpcId 编码方式:高16位=version,低16位=slot索引
// rpcId = (version << 16) | slotIndex

// 2. 把 rpcId 写入消息
m.RpcId = rpcId
m.TarActor = targetActorId

// 3. 发送消息
a.SendMsg(m)

// 4. 阻塞等待 Slot 收到回复
data, ok := a.GetReply(rpcId, timeout)

1.2.2. 接收回复:SetReply

目标 Actor 处理完后,回复消息中带着 RpcId。收到回复后通过 SetReply 投递到对应的 Slot:

func (m *ActorMsgSender) SetReply(data *zmsg.Message) {
    idx := data.RpcId & m.indexMask           // 从 rpcId 提取 slot 索引
    reqVer := data.RpcId >> VersionShift       // 提取 version

    slot := &m.slots[idx]

    // Version 校验:防止串包
    if atomic.LoadUint64(&slot.version) != reqVer {
        return  // 版本不匹配,说明这个 Slot 已被回收复用
    }

    // 状态校验:必须是 Waiting
    if atomic.LoadInt32(&slot.state) != SlotWaiting {
        return
    }

    // 投递到 Slot 的 channel
    slot.ch <- data.Retain()
}

1.2.3. 为什么要 Version?

考虑这个场景:

  1. Actor A 发出 RPC,占用 Slot 1(version=2),rpcId = (2<<16)|1 = 131073
  2. 超时了,A 放弃等待
  3. 对方回了一个迟到的回复,rpcId = 131073
  4. 如果没有 version,这个迟到的回复会匹配到 Slot 1
  5. 但 Slot 1 可能已经被新请求占用

Version 就是为了防止这种情况。 每次占用 Slot 时 version+1,回复的 version 必须匹配才能投递成功。

1.2.4. 槽位回收

Slot 有三种状态,流转如下:

Free → (AddSender) → Waiting → (GetReply 成功) → Free
                           → (GetReply 超时) → Abandoned → (Watchdog 回收) → Free
                           → (Watchdog 强制回收) → Free
  • Free:空闲,可以被抢占
  • Waiting:等待回复
  • Abandoned:超时放弃,等待 Watchdog 回收(防止串包)

发送侧 Watchdog 以 sender.go 内默认 tick(如 100ms 量级) 周期扫描,回收 Abandoned 等槽位(精确常量见源码)。

1.3. 3.3.3 CallActor 必须在异步方法里调用

这是一个重要的使用约束

CallActor 是阻塞调用,它会暂停当前 Actor 的消息处理。如果在 HandleMessage 里直接调用 CallActor,当前 Actor 的 Mailbox 就卡住了。

错误用法:

func (a *IMActor) HandleMessage(ctx context.Context, msg *zmsg.Message) {
    // ❌ 直接调用 CallActor,阻塞 Mailbox
    result := a.CallActor(gateActorId, req, reply, 3*time.Second)
    // 这 3 秒内,这个 Actor 的所有消息都排队等着
}

正确用法:

func (a *IMActor) HandleMessage(ctx context.Context, msg *zmsg.Message) {
    // ✅ 用 AsyncRun 异步执行
    a.AsyncRunWithMsg(msg,
        func(m *zmsg.Message) interface{} {
            return a.CallActor(gateActorId, req, reply, 3*time.Second)
        },
        func(result interface{}) {
            // 回调在 Actor 主线程执行,安全访问 Actor 状态
            reply := result.(ziface.RpcReply)
            // 处理结果...
        },
    )
}

1.4. 3.3.4 AsyncRunWithMsg:异步 + 回调

AsyncRunWithMsg 是 Actor 间通信最常用的方法。它把耗时操作放到协程池,完成后通过 Mailbox 回调到 Actor 主线程:

HandleMessage (Actor 主线程)
    ↓
AsyncRunWithMsg
    ├── pre-check validator  ← 主线程执行
    ├── f()                  ← 协程池执行(不阻塞 Actor)
    └── callBackFunc         ← 通过 Mailbox 回到主线程

流程:

  1. pre-check:在主线程执行条件检查(可选)
  2. 异步执行f() 在协程池中执行,不阻塞 Actor 的 Run 循环
  3. 回调:完成后把结果打包成消息,投递到 Actor 的 Mailbox
  4. 回调执行:Actor 从 Mailbox 取出,在主线程执行回调

1.4.1. 双重校验:Validator

AsyncRunWithMsg 支持 validator 参数,会在两个时间点各检查一次:

a.AsyncRunWithMsg(msg,
    func(m *zmsg.Message) interface{} {
        return CallShopService(itemID)  // 异步调用商城
    },
    func(result interface{}) {
        player.Gold -= 100  // 扣金币
        player.AddItem(result) // 加道具
    },
    func() bool {
        return player.Gold >= 100  // 条件:金币够
    },
)
时间点 检查目的
异步开始前 避免无条件发起不必要的远程调用
回调执行前 异步期间状态可能被其他消息改变,再次确认条件

比如异步调商城期间,另一个消息扣了玩家的金币。等商城回调回来时,金币已经不够了。如果没第二次检查,就会扣成负数。

1.5. 3.3.5 快速回复路径

对于 Gate Actor 这种高频回包场景,每条响应消息都走 Mailbox 太慢了。

zhenyi 提供了 Fast Path:Gate Actor 可以声明 IToClientFastPath 接口,响应消息跳过 Mailbox,直接发送:

// 声明线程安全
type GateActor struct { ... }
func (g *GateActor) HandleToClientFastPath(msg *zmsg.Message) bool {
    // 直接查 channel + Send,不走 Mailbox
    ch := g.GetChannel(msg.SessionId)
    if ch != nil {
        ch.Send(msg)
        return true
    }
    return false
}

为什么 Gate 可以这样做?

HandleToClientFastPath 只做两件事:查 channel 和调用 Send。查 channel 是读 map,Send 是往发送队列写。这两个操作都是线程安全的,不会破坏 Actor 的状态一致性。

非 Gate 的 Actor 不要这样做。 如果在 Fast Path 里修改了 Actor 的状态,就会打破单线程不变性,引发 data race。

1.6. 3.3.6 熔断保护

CallActor 内置了熔断器(Circuit Breaker)。熔断状态挂在当前发送方 Actor 实例上,按目标 actorId 分桶(getCircuitBreaker(actorId)),不是全进程共享一只熔断器。

func (a *Actor) CallActor(actorId uint64, ...) ziface.RpcReply {
    cb := a.getCircuitBreaker(actorId)
    if !cb.allow() {
        return RpcReply{Code: ErrorCode_RpcErr, Msg: "circuit breaker open"}
    }
    // ... 正常调用 ...
    if ok {
        cb.recordSuccess()
    } else {
        cb.recordFailure()
    }
}

连续失败多次后熔断器打开,后续请求直接快速失败,不再发起 RPC。一段时间后尝试半开,如果成功则关闭熔断器。

作用:防止雪崩。 当下游服务出问题时,快速失败比排队等待更好——否则大量超时请求会压垮本地的协程池。若多个发送方 Actor 并发调用同一目标,各自熔断计数不合并;需要全局限流时请在业务或网关层扩展。

1.7. 3.3.7 本节要点

  1. SendMsg:入目标邮箱,不等待业务完成。
  2. CallActor:依赖 Slot + rpcId(含 version) 匹配回复,防止复用串包;详见 sender.go
  3. 约束:在 mailbox 线程上直接 CallActor 会阻塞该 Actor 的投递处理,应 AsyncRunWithMsg(或等价)把阻塞点移到 worker。
  4. AsyncRunWithMsg:可选 validator 在异步前与回调前各验一次,避免状态已变仍提交副作用。
  5. Fast Path:实现 IToClientFastPath 且操作保持 无数据竞争 的前提下,可旁路邮箱直写连接发送路径(滥用会破坏单线程不变性)。
  6. 熔断CallActor 内按 (发送方 Actor, 目标 actorId) 分桶,细节见 3.5

3.4 节safeUpdate / SafeFn、热调参与排空关闭。

results matching ""

    No results matching ""