1. 3.2 Actor 模型基础

3.1 的动机之后,本节把 邮箱(Mailbox)、主循环 Run、路由与监督落到 zhenyizhenyi/zactor 类型与调用路径上;字段与分支以 base.gogroup.gohandlemsg.go 为准,文中代码块多为阅读指引

1.1. 3.2.1 Actor 的核心组成

一个 Actor 由三部分组成:

┌─────────────────────────────────────┐
│              Mailbox                │  ← 消息队列
│  [msg3] [msg2] [msg1]              │
├─────────────────────────────────────┤
│           处理逻辑                   │  ← 串行处理消息
│  for msg := range mailbox {         │
│      handleMessage(msg)             │
│  }                                  │
├─────────────────────────────────────┤
│              状态                   │  ← 私有状态,不需要锁
│  userCount, roomList, ...           │
└─────────────────────────────────────┘
  • Mailbox:消息队列,其他 Actor 通过 Mailbox 投递消息
  • 处理逻辑:从 Mailbox 中取消息,串行处理
  • 状态:只被这一个 Actor 访问,天然线程安全

1.2. 3.2.2 zhenyi 的 Actor 实现

zhenyi 的 Actor 结构体包含这些关键组件:

// 主干字段节选;另含 circuitBreakers、iActor、tickFns、logger、ctx/cancel 等
type Actor struct {
    mailBoxQueue  *zqueue.UnboundedMPSC[zmodel.ActorCmd]  // 邮箱(无界 MPSC)
    handle        *HandleRegistry                          // 客户端 msgId → 处理函数
    dispatcher    *Dispatcher                              // 进程内 Dispatch 管线
    workerPool    *ants.PoolWithFunc                       // AsyncRun / AsyncRunWithMsg
    batcher       *zbatch.FastAdaptiveBatcher              // Run 内批量 dequeue 尺寸
    // ...
}

(完整定义见 zhenyi/zactor/base.go。)

1.2.1. 邮箱:UnboundedMPSC 队列

邮箱是一个无界多生产者单消费者队列

  • 多生产者:多个 Actor 都能往这个 Actor 的邮箱投递消息
  • 单消费者:只有这个 Actor 自己从邮箱取消息处理
  • 无界:不会因为邮箱满而丢消息(但也意味着要控制生产速度)
// 其他 Actor 投递消息
actor.Push(zmodel.ActorCmd{
    Type: zmodel.CmdTypeMsg,
    Msg:  msgData,
})

// Actor 自己从邮箱取消息处理(Run 方法中)
n := a.mailBoxQueue.DequeueBatch(msgs[:batchSize])

1.2.2. 协程池:异步任务

Actor 的消息是串行处理的,但有些操作可以并行。比如发 RPC 调用、访问数据库,这些不需要串行。

zhenyi 给每个 Actor 配了一个协程池(基于 ants):

// 创建 Actor 时初始化协程池
poolSize := int(actorConfig.WorkSize)  // 默认值由 FrameworkTuning 配置
workerPool, _ := ants.NewPoolWithFunc(poolSize, func(arg interface{}) {
    // 异步处理任务
}, ants.WithPreAlloc(true))

这样 Actor 可以在处理消息时,把耗时操作丢给协程池异步执行,不阻塞主循环。

1.3. 3.2.3 消息处理:Run 方法

Run 是 Actor 的主循环。下面为结构示意:真实实现还会在首批前监听 closeCh、用 batcher.GetBatchSize(int64(lastBatchSize)) 取批量、批内按间隔刷新时间戳、记录慢批日志等,见 zhenyi/zactor/base.goRun

func (a *Actor) Run(ctx context.Context) {
    for {
        // 1. 检查关闭信号
        select {
        case <-ctx.Done():
            shouldExit = true  // 收到关闭信号,排空邮箱后退出
        default:
        }

        // 2. 批量取消息
        n := a.mailBoxQueue.DequeueBatch(msgs[:batchSize])

        // 3. 排空检测:收到关闭信号且邮箱为空,退出
        if shouldExit && n == 0 {
            return  // 邮箱排空,优雅退出
        }

        // 4. 空闲时自旋退避
        if n == 0 {
            idleCount++
            backoff.Backoff(idleCount, 10, 30, time.Microsecond)
            continue
        }

        // 5. 批量处理消息
        for i := 0; i < n; i++ {
            a.SafeHandleMessage(ctx, msgs[i], batchStartMs)
            msgs[i].Release() // ActorCmd.Release
        }
    }
}

这里有三个值得注意的点:

1.3.1. 排空退出

收到关闭信号后,不是立即退出,而是继续处理邮箱中的消息,直到邮箱为空。这保证了消息不丢失。

正常关闭流程:

1. ctx.Done() 触发 → shouldExit = true
2. 继续处理剩余消息
3. n == 0 → 邮箱空了 → return

1.3.2. 自适应批处理

batcher.GetBatchSize() 会根据上次的处理耗时动态调整批量大小:

  • 处理快 → 增大批量(减少循环次数)
  • 处理慢 → 减小批量(减少单次耗时)

这样在低负载和高负载下都能有较好的表现。

1.3.3. 空闲退避

邮箱为空时不会死循环空转,而是用指数退避等待:

idleCount++
backoff.Backoff(idleCount, 10, 30, time.Microsecond)
// idle=1: 10μs, idle=2: 20μs, ..., idle=30: 300μs(封顶)

有消息来了立刻唤醒,不会影响响应速度。

1.4. 3.2.4 消息分发:HandleRegistry 与 Dispatcher

客户端上行消息(HandleClientMessage 路径)由 HandleRegistrymsgId 注册:

// 注册客户端消息处理函数(在 Actor 初始化阶段)
actor.GetHandleMgr().RegisterHandle(MsgLogin, onLogin)
actor.GetHandleMgr().RegisterHandle(MsgChat, onChat)

进程内 Dispatcher 负责另一类分发(Dispatch 路径,handler 返回 ziface.IMessage 等),注册接口为:

actor.GetDispatcher().Register(MsgInternal, onInternal)
// 或 Init 阶段批量:GetDispatcher().RegisterBatch(map[int32]ziface.MsgHandlerFunc{...})

两者并存:前者对应客户端协议号路由,后者对应 Actor 内部消息处理管线;不要与 RegisterHandle 混名(源码中不存在 actor.RegisterHandler)。

1.5. 3.2.5 监督:Group 管理 Actor 的生命周期

Actor 不是孤立运行的,它属于一个 Group。Group 负责管理所有 Actor 的生命周期,包括启动、监控和异常重启。

// Group 启动时,为每个 Actor 启动一个监督 goroutine(示意)
func (g *Group) StartWorkers(ctx context.Context) {
    g.actors.Range(func(actorId uint64, item *ActorItem) bool {
        g.wg.Add(1)
        go g.superviseActor(ctx, item.IActor)
        return true
    })
}

1.5.1. 监督策略:简化版

zhenyi 的监督不是 Erlang 那样的完整监督树,而是简化版:

  1. Group 监督每个 Actor
  2. Actor 异常退出后自动重启
  3. 指数退避:1→2→4→8→16→32 秒(上限 30s)
  4. 重启窗口:若距离上一次异常退出已超过 30s,则重启计数清零;在窗口内累计超过 maxRestarts(默认 3)则永久停止
  5. 超限后永久停止,打印错误日志

监督循环的要点:Run 返回后ctx 未取消,则在 30s 窗口 内累计重启次数,超限则永久停止;否则按 1s 起指数退避(封顶 30s) 休眠再启。完整顺序、日志与 MaxRestarts 配置见 zhenyi/zactor/group.gosuperviseActor

为什么是简化版而不是完整监督树?

完整监督树(Erlang 风格)的复杂度很高,每种 Actor 可以有不同的重启策略,Actor 之间可以有父子关系。zhenyi 只有几十个 Actor,Group 统一管理就够了,没必要引入额外的复杂度。

1.6. 3.2.6 定时任务:TickFn

有些业务需要定期执行,比如:

  • 每 5 秒清理超时连接
  • 每 30 秒同步一次数据

zhenyi 用 TickFn 实现:

// 注册定时任务
actor.RegisterTickFn("clean_timeout", 5*time.Second, func(ctx context.Context, nowTs int64) {
    // 清理超时连接
})

Group.tick 使用约 每秒 30 次Tickertime.Second/30),再按各 Actor 注册的间隔决定是否投递 Tick;未注册 TickFn 的 Actor 不参与 tick 分发逻辑。

1.7. 3.2.7 本节要点

  1. 三要素:Mailbox + Run 上单线程顺序处理 + 仅在该路径上无锁演进的私有状态。
  2. 邮箱UnboundedMPSC无界意为队列本身不因「满」而拒收,仍须在业务上控制生产速率,避免内存膨胀。
  3. Runctx.Done() / closeCh → 排空后退出;batcher + 空闲退避;批内 SafeHandleMessage + ActorCmd.Release
  4. 路由HandleRegistry(客户端消息)与 DispatcherDispatch 管线)职责分离。
  5. GroupsuperviseActor 重启策略与 3.2.5 描述一致,以 group.go 为准。
  6. TickFn:由 Group tick 驱动,按间隔经邮箱下达。

3.3 节SendMsgCallActor、Slot 与 AsyncRunWithMsg

results matching ""

    No results matching ""