1. 5.1 服务发现集成(Etcd)
第四章已描述 Gate 如何用 otherActors 做远程候选;这些条目来自 ziface.Discoverer。默认实现为 zhenyi/zdiscovery.EtcdDiscovery:键 /servers/{actorType}/{actorId},值 zserialize.MarshalJson(ActorServerRegister),本地 atomic.Pointer + Copy-On-Write 缓存,并与 Group.watchActor 对接。
1.1. 5.1.1 为什么需要服务发现?
单机部署时,所有 Actor 都在同一进程,路由表直接存在内存里:
msgId → []Actor(本进程)
多进程部署时,进程 1 的 Gate 收到消息,但目标 Actor 在进程 2。进程 1 怎么知道进程 2 有哪些 Actor?
需要有一个中心化的注册表,所有进程把自己的信息注册上去,其他进程可以查询。
这就是服务发现。
1.2. 5.1.2 为什么选择 Etcd?
| 方案 | 一致性 | 运维复杂度 | 适用场景 |
|---|---|---|---|
| Etcd | 强一致(Raft) | 中 | 服务发现、配置中心 |
| ZooKeeper | 强一致(ZAB) | 高 | 老系统兼容 |
| Consul | 最终一致 | 中 | 服务网格 |
| Redis | 最终一致 | 低 | 缓存、简单配置 |
zhenyi 选择 Etcd 的原因:
- 强一致:服务发现需要准确知道哪些节点在线,不能读到过期数据
- Watch 机制:可以监听变化,实时感知节点上下线
- TTL 租约:节点崩溃时自动注销,避免僵尸节点
- Go 生态:etcd/clientv3 是 Go 写的,集成方便
1.3. 5.1.3 注册与发现的数据结构
1.3.1. 注册的数据
每个 Actor 启动时,向 Etcd 注册:
type ActorServerRegister struct {
Key string // /servers/{actorType}/{actorId}
ActorConfig ActorConfig // Actor 的完整配置
}
type ActorConfig struct {
Id uint64 // Actor 唯一 ID
ActorType uint32 // Actor 类型(如 IM、Match)
Index int32 // 同类型的索引(用于分片)
Process int32 // 进程 ID
SupportedMsgIDs []int32 // 该 Actor 能处理的 msgId 列表
// ... 其他配置
}
Key 格式:/servers/{actorType}/{actorId}
例如:/servers/1/5 表示 ActorType=1(IM)、Id=5 的 Actor。
1.3.2. 本地缓存
EtcdDiscovery 在内存中维护一份缓存:
type EtcdDiscovery struct {
cache atomic.Pointer[map[uint32][]ActorConfig] // actorType → []ActorConfig
// ...
}
缓存用 atomic.Pointer + Copy-On-Write,读路径无锁。
1.4. 5.1.4 注册流程
注册主路径:MarshalJson → 确保 sharedLeaseID(无则 createSharedLeaseLocked)→ Put with lease → registeredActors/cache 的 CoW 更新(见 Register 全函数)。
// 与源码同构的阅读锚点(省略错误处理与 lease 竞争细节)
key := fmt.Sprintf("/servers/%d/%d", c.ActorType, c.Id)
val, _ := zserialize.MarshalJson(&zmodel.ActorServerRegister{Key: key, ActorConfig: c})
_, _ = d.cli.Put(d.ctx, key, string(val), clientv3.WithLease(leaseID))
租约机制:
- 注册时绑定 10 秒租约
- 后台 goroutine 每几秒发送 KeepAlive 续约
- 如果进程崩溃,租约到期,Etcd 自动删除 key
这解决了"僵尸节点"问题——进程挂了,其他进程不会继续给它发消息。
1.5. 5.1.5 发现流程
1.5.1. 启动时加载
func (d *EtcdDiscovery) loadAllToCache() error {
resp, _ := d.cli.Get(ctx, "/servers/", clientv3.WithPrefix())
newCache := make(map[uint32][]zmodel.ActorConfig)
for _, kv := range resp.Kvs {
var item zmodel.ActorServerRegister
zserialize.UnmarshalJson(kv.Value, &item)
c := item.ActorConfig
newCache[c.ActorType] = append(newCache[c.ActorType], c)
}
d.cache.Store(&newCache)
}
启动时拉取所有 /servers/ 前缀的 key,构建本地缓存。
1.5.2. Watch 实时更新
Watch 循环要点(见 etcd.go):Watch 遇错或 channel 结束后 loadAllToCache + sleep 再建 Watch;退出时 close(d.ch),避免上层阻塞。handlePut/handleDelete 会维护 按 actorType 分桶的 cache 并向 Watch() 返回的 chan zmodel.ActorConfig 非阻塞投递(channel 满则丢通知,依赖后续全量一致)。
1.5.3. 查询接口
// 按类型查询(内部辅助;包外多用 FindAllByPrefix / FindMod 等)
func (d *EtcdDiscovery) findActorsByType(actorType uint32) []zmodel.ActorConfig {
p := d.cache.Load()
if p == nil {
return nil
}
return (*p)[actorType]
}
// 查询所有
func (d *EtcdDiscovery) findAllByPrefix(prefix string) []ActorServerRegister {
// ...
}
查询直接读本地缓存,零网络开销。
1.6. 5.1.6 租约保活与故障恢复
1.6.1. 保活机制
func (d *EtcdDiscovery) runLeaseKeepalive() {
for {
ch, _ := d.cli.KeepAlive(ctx, leaseID)
for ka := range ch {
if ka == nil {
// 保活 channel 关闭,需要重建租约
break
}
}
// 重建租约并重新注册所有 Actor
d.recreateLeaseAndReregister()
}
}
KeepAlive 是 Etcd 客户端自动发送的心跳。如果网络闪断导致 channel 关闭,自动重建租约并重新注册。
1.6.2. 重新注册
流程要点:持 leaseMu 建新租约 → 仅遍历当前 registeredActors 快照做 Put → Revoke 旧租约;序列化仍用 zserialize.MarshalJson。完整顺序与错误处理见 recreateLeaseAndReregister。
1.7. 5.1.7 进程退出清理
CloseAll:从 registeredActors 快照收集配置并逐个 Unregister,再 Revoke 共享租约,最后 cancel 根 context(保活/Watch 协程退出顺序以源码为准)。
1.8. 5.1.8 与 Group 的集成
Group 在启动时会注入 Discoverer:
group.SetDiscoverer(etcdDiscovery)
然后 Group 的 watchActor goroutine 监听变化:
func (g *Group) watchActor(ctx context.Context) {
// 单机模式或 discoverer 为空时直接返回(见 zhenyi/zactor/group.go)
// 启动时加载存量
items := g.discoverer.FindAllByPrefix("/servers")
for _, item := range items {
if uint64(item.ActorConfig.Process) == uint64(g.process) {
continue // 本进程实例不入 otherActors
}
g.otherActors.Store(item.ActorConfig.Id, item.ActorConfig)
}
g.otherActorsVersion.Add(1)
ch := g.discoverer.Watch()
for {
select {
case <-ctx.Done():
return
case cfg, ok := <-ch:
if !ok {
return
}
if cfg.Id <= 0 {
continue
}
if cfg.ActorType <= 0 {
g.otherActors.Delete(cfg.Id)
} else {
g.otherActors.Store(cfg.Id, cfg)
}
g.otherActorsVersion.Add(1)
}
}
}
otherActors 存储其他进程的 Actor 信息,用于远程路由。
1.9. 5.1.9 本节要点
- Discoverer 契约:
Register/Unregister/FindAllByPrefix/Watch;Etcd 实现位于zdiscovery/etcd.go。 - 键值:
/servers/%d/%d;负载为ActorServerRegister的 JSON(zserialize)。 - 共享租约:Grant(10) +
KeepAlive;异常时recreateLeaseAndReregister,且只重放仍存在于registeredActors的条目。 - 缓存:
cache与registeredActors均为atomic.Pointer+ 整表克隆写入。 - Group:
watchActor用FindAllByPrefix("/servers")预热otherActors,再消费Watch();ActorType<=0的变更表示删除语义(与handleDelete发往 channel 的占位 cfg 一致)。
5.2 节:RemoteRouteStrategy / HRW 打分与默认路由键。