1. 3.2 Actor 模型基础
在 3.1 的动机之后,本节把 邮箱(Mailbox)、主循环 Run、路由与监督落到 zhenyi 的 zhenyi/zactor 类型与调用路径上;字段与分支以 base.go、group.go、handlemsg.go 为准,文中代码块多为阅读指引。
1.1. 3.2.1 Actor 的核心组成
一个 Actor 由三部分组成:
┌─────────────────────────────────────┐
│ Mailbox │ ← 消息队列
│ [msg3] [msg2] [msg1] │
├─────────────────────────────────────┤
│ 处理逻辑 │ ← 串行处理消息
│ for msg := range mailbox { │
│ handleMessage(msg) │
│ } │
├─────────────────────────────────────┤
│ 状态 │ ← 私有状态,不需要锁
│ userCount, roomList, ... │
└─────────────────────────────────────┘
- Mailbox:消息队列,其他 Actor 通过 Mailbox 投递消息
- 处理逻辑:从 Mailbox 中取消息,串行处理
- 状态:只被这一个 Actor 访问,天然线程安全
1.2. 3.2.2 zhenyi 的 Actor 实现
zhenyi 的 Actor 结构体包含这些关键组件:
// 主干字段节选;另含 circuitBreakers、iActor、tickFns、logger、ctx/cancel 等
type Actor struct {
mailBoxQueue *zqueue.UnboundedMPSC[zmodel.ActorCmd] // 邮箱(无界 MPSC)
handle *HandleRegistry // 客户端 msgId → 处理函数
dispatcher *Dispatcher // 进程内 Dispatch 管线
workerPool *ants.PoolWithFunc // AsyncRun / AsyncRunWithMsg
batcher *zbatch.FastAdaptiveBatcher // Run 内批量 dequeue 尺寸
// ...
}
(完整定义见 zhenyi/zactor/base.go。)
1.2.1. 邮箱:UnboundedMPSC 队列
邮箱是一个无界多生产者单消费者队列:
- 多生产者:多个 Actor 都能往这个 Actor 的邮箱投递消息
- 单消费者:只有这个 Actor 自己从邮箱取消息处理
- 无界:不会因为邮箱满而丢消息(但也意味着要控制生产速度)
// 其他 Actor 投递消息
actor.Push(zmodel.ActorCmd{
Type: zmodel.CmdTypeMsg,
Msg: msgData,
})
// Actor 自己从邮箱取消息处理(Run 方法中)
n := a.mailBoxQueue.DequeueBatch(msgs[:batchSize])
1.2.2. 协程池:异步任务
Actor 的消息是串行处理的,但有些操作可以并行。比如发 RPC 调用、访问数据库,这些不需要串行。
zhenyi 给每个 Actor 配了一个协程池(基于 ants):
// 创建 Actor 时初始化协程池
poolSize := int(actorConfig.WorkSize) // 默认值由 FrameworkTuning 配置
workerPool, _ := ants.NewPoolWithFunc(poolSize, func(arg interface{}) {
// 异步处理任务
}, ants.WithPreAlloc(true))
这样 Actor 可以在处理消息时,把耗时操作丢给协程池异步执行,不阻塞主循环。
1.3. 3.2.3 消息处理:Run 方法
Run 是 Actor 的主循环。下面为结构示意:真实实现还会在首批前监听 closeCh、用 batcher.GetBatchSize(int64(lastBatchSize)) 取批量、批内按间隔刷新时间戳、记录慢批日志等,见 zhenyi/zactor/base.go 中 Run。
func (a *Actor) Run(ctx context.Context) {
for {
// 1. 检查关闭信号
select {
case <-ctx.Done():
shouldExit = true // 收到关闭信号,排空邮箱后退出
default:
}
// 2. 批量取消息
n := a.mailBoxQueue.DequeueBatch(msgs[:batchSize])
// 3. 排空检测:收到关闭信号且邮箱为空,退出
if shouldExit && n == 0 {
return // 邮箱排空,优雅退出
}
// 4. 空闲时自旋退避
if n == 0 {
idleCount++
backoff.Backoff(idleCount, 10, 30, time.Microsecond)
continue
}
// 5. 批量处理消息
for i := 0; i < n; i++ {
a.SafeHandleMessage(ctx, msgs[i], batchStartMs)
msgs[i].Release() // ActorCmd.Release
}
}
}
这里有三个值得注意的点:
1.3.1. 排空退出
收到关闭信号后,不是立即退出,而是继续处理邮箱中的消息,直到邮箱为空。这保证了消息不丢失。
正常关闭流程:
1. ctx.Done() 触发 → shouldExit = true
2. 继续处理剩余消息
3. n == 0 → 邮箱空了 → return
1.3.2. 自适应批处理
batcher.GetBatchSize() 会根据上次的处理耗时动态调整批量大小:
- 处理快 → 增大批量(减少循环次数)
- 处理慢 → 减小批量(减少单次耗时)
这样在低负载和高负载下都能有较好的表现。
1.3.3. 空闲退避
邮箱为空时不会死循环空转,而是用指数退避等待:
idleCount++
backoff.Backoff(idleCount, 10, 30, time.Microsecond)
// idle=1: 10μs, idle=2: 20μs, ..., idle=30: 300μs(封顶)
有消息来了立刻唤醒,不会影响响应速度。
1.4. 3.2.4 消息分发:HandleRegistry 与 Dispatcher
客户端上行消息(HandleClientMessage 路径)由 HandleRegistry 按 msgId 注册:
// 注册客户端消息处理函数(在 Actor 初始化阶段)
actor.GetHandleMgr().RegisterHandle(MsgLogin, onLogin)
actor.GetHandleMgr().RegisterHandle(MsgChat, onChat)
进程内 Dispatcher 负责另一类分发(Dispatch 路径,handler 返回 ziface.IMessage 等),注册接口为:
actor.GetDispatcher().Register(MsgInternal, onInternal)
// 或 Init 阶段批量:GetDispatcher().RegisterBatch(map[int32]ziface.MsgHandlerFunc{...})
两者并存:前者对应客户端协议号路由,后者对应 Actor 内部消息处理管线;不要与 RegisterHandle 混名(源码中不存在 actor.RegisterHandler)。
1.5. 3.2.5 监督:Group 管理 Actor 的生命周期
Actor 不是孤立运行的,它属于一个 Group。Group 负责管理所有 Actor 的生命周期,包括启动、监控和异常重启。
// Group 启动时,为每个 Actor 启动一个监督 goroutine(示意)
func (g *Group) StartWorkers(ctx context.Context) {
g.actors.Range(func(actorId uint64, item *ActorItem) bool {
g.wg.Add(1)
go g.superviseActor(ctx, item.IActor)
return true
})
}
1.5.1. 监督策略:简化版
zhenyi 的监督不是 Erlang 那样的完整监督树,而是简化版:
- Group 监督每个 Actor
- Actor 异常退出后自动重启
- 指数退避:1→2→4→8→16→32 秒(上限 30s)
- 重启窗口:若距离上一次异常退出已超过 30s,则重启计数清零;在窗口内累计超过
maxRestarts(默认 3)则永久停止 - 超限后永久停止,打印错误日志
监督循环的要点:Run 返回后若 ctx 未取消,则在 30s 窗口 内累计重启次数,超限则永久停止;否则按 1s 起指数退避(封顶 30s) 休眠再启。完整顺序、日志与 MaxRestarts 配置见 zhenyi/zactor/group.go 的 superviseActor。
为什么是简化版而不是完整监督树?
完整监督树(Erlang 风格)的复杂度很高,每种 Actor 可以有不同的重启策略,Actor 之间可以有父子关系。zhenyi 只有几十个 Actor,Group 统一管理就够了,没必要引入额外的复杂度。
1.6. 3.2.6 定时任务:TickFn
有些业务需要定期执行,比如:
- 每 5 秒清理超时连接
- 每 30 秒同步一次数据
zhenyi 用 TickFn 实现:
// 注册定时任务
actor.RegisterTickFn("clean_timeout", 5*time.Second, func(ctx context.Context, nowTs int64) {
// 清理超时连接
})
Group.tick 使用约 每秒 30 次 的 Ticker(time.Second/30),再按各 Actor 注册的间隔决定是否投递 Tick;未注册 TickFn 的 Actor 不参与 tick 分发逻辑。
1.7. 3.2.7 本节要点
- 三要素:Mailbox +
Run上单线程顺序处理 + 仅在该路径上无锁演进的私有状态。 - 邮箱:
UnboundedMPSC;无界意为队列本身不因「满」而拒收,仍须在业务上控制生产速率,避免内存膨胀。 Run:ctx.Done()/closeCh→ 排空后退出;batcher+ 空闲退避;批内SafeHandleMessage+ActorCmd.Release。- 路由:
HandleRegistry(客户端消息)与Dispatcher(Dispatch管线)职责分离。 - Group:
superviseActor重启策略与 3.2.5 描述一致,以group.go为准。 - TickFn:由 Group tick 驱动,按间隔经邮箱下达。
3.3 节:SendMsg、CallActor、Slot 与 AsyncRunWithMsg。