1. 8.1 自定义协议与扩展开发

第八章聚焦「换实现、不动热路径分叉」:8.1 协议、路由、发现、总线、限流等扩展点;8.2 参与方式与协作习惯。第 7 章 IM 示例可当作扩展点的消费端对照;观测与线上排障回看第 6 章(指标、追踪、部署)。

前七章讲了 zhenyi 的核心用法。本节说明如何在遵守接口契约的前提下替换默认实现,而不是在框架内部复制粘贴改逻辑。

1.1. 8.1.1 zhenyi 的扩展点体系

zhenyi 不是封闭的框架,它把核心接口暴露出来,你可以替换任意一个零件:

网络层(TCP/WS/KCP)
    ↓ 替换:实现 IServer 接口
↓ IWireMessage 接口(zhenyi-base/ziface)
消息解析(固定头协议)
    ↓ 替换:自定义线协议需落入 `IWireMessage` 契约(或与 `BaseSocket` 解析链路对接)
↓ Actor 侧 ISendMsg / Push / SendMsg 等发送路径
业务发送与路由
    ↓ 一般通过实现 `ziface` 契约、替换 Gate 的 `LocalRouter` / 远程策略等扩展(而非单一 `IActorMsgSender` 接口名)
↓ zbus.TopicBus 接口
跨进程通信(NATS)
    ↓ 替换:实现 TopicBus 接口
↓ Discoverer 接口(`github.com/aiyang-zh/zhenyi/ziface`,非 base 包)
服务发现(Etcd)
    ↓ 替换:实现 Discoverer 接口
↓ RemoteRouteStrategy 接口(`zhenyi/zroute`)
远程路由策略(内置 RendezvousHash 等)
    ↓ 替换:实现 RemoteRouteStrategy,经 `zgate.Server.SetRemoteRouteStrategy` 注入
↓ ILimit 接口(`zhenyi-base/ziface`,实现多在 `zhenyi-base/zlimiter`)
连接级限流(如令牌桶)
    ↓ 替换:实现 ILimit,经 `IChannel.SetLimit` 安装

1.1.1. 依赖边界:zhenyi-base 与 zhenyi

仓库分成两层,扩展时先认接口落在哪一层,免得新代码违反模块依赖或许可边界(商业合订仓若另有划分,以该仓 AGENTS.md 为准)。

zhenyi-base(基础层) 不依赖 zhenyi 或其他业务仓库。与自定义协议、连接、热路径相关的扩展点多在这里:ziface(如 IWireMessageIChannelILimit)、znetBaseServer/ServerHandlers.OnReadNetMessage)、ztcpzserializezlimiter 等。如果你只想换「怎么收字节、怎么限流、池化 buffer」,通常只需动 base 侧或实现 base 已声明的接口。

zhenyi(框架层) 只依赖 zhenyi-base。Actor、网关、跨进程总线、发现、与业务 Actor 协作的契约多在这里:zmsg.Message 线协议信封、zgatezactor/zstreamzroute.RemoteRouteStrategyzbus.TopicBusziface.Discoverer(注意:此处 github.com/aiyang-zh/zhenyi/zifacezhenyi-base/ziface 是两个包,同名 ziface 导入时按路径区分)、zdiscoveryzstartup.App 等。换「发现后端、总线实现、Gate 远端选址」一般在 zhenyi 包路径下做实现,再在启动代码里注入。

实操核对:纯 base 扩展可以让你的模块 require 只指向 zhenyi-base;一旦引用 zgatezactorzmsg 等,就必须依赖整条 zhenyi 模块。书稿示例里 IM 与监控多在 zhenyi 层拼起来,第 7 章即典型消费方式。

每个接口在仓库里都有默认实现。你可以替换其中之一或若干,尽量避免 fork 后改热路径分叉。

1.2. 8.1.2 消息格式的扩展

1.2.1. 理解现有的消息格式

线协议信封定义在 zhenyi/zmsgFixedHeaderSize = 65,与 Message.MarshalTo 写入顺序一致(小端),无额外 padding。头部之后是 4 字节 Data 长度 + payload

固定头 65 字节:
flags(1) + msgId(4) + srcActor(8) + tarActor(8) + sessionId(8) + rpcId(8)
       + seqId(4) + traceIdHi(8) + traceIdLo(8) + spanId(8)

flags 打包 ToClient / FromClient / IsResponse 三个 bool,其余 bit 预留。

信封字段与业务负载分离:业务 JSON、protobuf 等放在 Data,不必改头 65 字节布局即可演进负载格式。

1.2.2. 扩展线协议与 IWireMessage

网络层解析产物对应 github.com/aiyang-zh/zhenyi-base/ziface.IWireMessage(如 GetMsgId/SetMsgIdGetSeqId/SetSeqIdGetMessageData/SetMessageDataReset)。常用具体类型包括 zhenyi-base/znet.NetMessage 等,与 BaseServer/BaseChannel 的读路径衔接。

若业务要换包头/编解码,通常需要改 zhenyi-base/znet 的打包/解包路径(与 BaseSocket、各协议 Server 配套),而不是在书里再定义一套与仓库不一致的「IWMessage」接口名。

示意(风格对齐 ServerHandlers):

ztcp.NewServer(addr, znet.ServerHandlers{
    OnAccept: func(ziface.IChannel) bool { return true },
    OnRead: func(ch ziface.IChannel, raw ziface.IWireMessage) {
        // raw:已解析出的线消息;再组装为 zmsg.Message 投递 Actor
    },
})

1.2.3. 在 Message 中携带业务数据

大多数场景不必更换 65 字节头:保持现有拆包链路,仅在 zmsg.Message.Data(或等价的线消息 data 区)里放 JSON、protobuf、自定义二进制即可;IM 示例里即在对 msg.Datazserialize.UnmarshalJson 再业务处理。

若在 znet 读路径自定义解析,令 OnRead 收到你实现的 IWireMessage,后续仍须进入框架已约定的 *zmsg.Message 投递(参见 zgate/zstream 对上游消息的封装),而不是在业务里另写一套与 Actor 邮箱解耦的 Push 伪代码。

推荐用 JSON 还是二进制?

格式 开发效率 体积 序列化速度
JSON
protobuf
手写二进制 最小 最快

zhenyi 的 Message 用手写二进制(65 字节头)。业务数据可以用 JSON 或 protobuf,框架不限制。

1.3. 8.1.3 路由策略的扩展

除内置策略外,可实现 github.com/aiyang-zh/zhenyi/zroute.RemoteRouteStrategy(定义见 remote_strategy.go):

type RemoteRouteStrategy interface {
    PickOne(msg *zmsg.Message, candidates []zmodel.ActorConfig) int
}

另外,Gate 远程路由默认会优先使用 ziface.IGroupRemoteRouteTableView 的只读候选视图(LookupOtherActorConfigsByMsgIDView)以避免热路径分配;如果你的 Group 有自定义实现,建议同时实现该可选接口。

1.3.1. 实现一个基于权重的策略

假设不同进程的 Actor 性能不同,需要按权重分配流量:

type WeightedStrategy struct {
    weights map[uint64]int  // actorId -> weight
}

func (s *WeightedStrategy) PickOne(msg *zmsg.Message, candidates []zmodel.ActorConfig) int {
    if len(candidates) == 0 {
        return -1
    }

    // 按权重计算分布概率
    total := 0
    for _, c := range candidates {
        total += s.weights[c.Id]
    }

    // 简单加权随机(生产实现须保证每个 candidate 权重 > 0,且 total > 0)
    r := rand.Intn(total)
    cumulative := 0
    for i, c := range candidates {
        cumulative += s.weights[c.Id]
        if r < cumulative {
            return i
        }
    }

    return 0
}

1.3.2. 实现一个基于地理位置的策略

如果你的服务按地域部署,可以用地理位置做路由:

type GeoStrategy struct{}

func (s *GeoStrategy) PickOne(msg *zmsg.Message, candidates []zmodel.ActorConfig) int {
    if len(candidates) == 0 {
        return -1
    }
    // 从消息中提取客户端 IP 或经纬度
    clientRegion := extractRegion(msg)

    // 线性扫描找最近节点,返回其下标(避免排序与分配)
    bestIdx := 0
    best := geoDistance(clientRegion, candidates[0])
    for i := 1; i < len(candidates); i++ {
        d := geoDistance(clientRegion, candidates[i])
        if d < best {
            best = d
            bestIdx = i
        }
    }
    return bestIdx
}

注入到 Gate:

gate.SetRemoteRouteStrategy(&WeightedStrategy{weights: weights})

1.4. 8.1.4 服务发现的扩展

如果你不用 Etcd,可实现 github.com/aiyang-zh/zhenyi/ziface.Discovererzhenyi/ziface/idiscovery.go):

type Discoverer interface {
    FindRandom(key string) zmodel.ActorConfig
    FindPoll(key string) zmodel.ActorConfig
    FindMod(actorType uint32, userId uint64) zmodel.ActorConfig
    Unregister(c zmodel.ActorConfig) error
    Register(c zmodel.ActorConfig) error
    Watch() chan zmodel.ActorConfig
    FindAllByPrefix(key string) []zmodel.ActorServerRegister
}

zdiscovery.EtcdDiscovery 另有 CloseAll() error 用于租约与后台任务清理;该方法不在 Discoverer 接口上,类型断言或直接使用具体实现时再调用。

1.4.1. 用 Consul 替换 Etcd

// 以下为结构示意;真实实现须补全 Discoverer 全部方法,并与 zmodel.ActorConfig 字段语义对齐。

type ConsulDiscovery struct {
    client *consul.Client
    ch     chan zmodel.ActorConfig
}

func (c *ConsulDiscovery) Register(config zmodel.ActorConfig) error {
    key := fmt.Sprintf("/services/%d/%d", config.ActorType, config.Id)
    reg := &api.AgentServiceRegistration{
        ID:   key,
        Name: "zhenyi",
        Tags: []string{strconv.Itoa(int(config.ActorType))},
        // Address、Port:从 config.Addr 解析或与 Consul 侧约定一致后填入
    }
    return c.client.Agent().ServiceRegister(reg)
}

func (c *ConsulDiscovery) Watch() chan zmodel.ActorConfig {
    return c.ch
}

// 注入(与 zstartup.App 一致时多为:app.Group.SetDiscoverer(consulDiscovery))

Actor 注册、路由查询的代码不需要改,只需要换一个 Discoverer 实现。

1.5. 8.1.5 跨进程总线的扩展

zbus.TopicBus 接口可以替换 NATS:

type TopicBus interface {
    Broadcast(topic string, data []byte) error
    Subscribe(topic string, handler zbus.Handler) (Subscription, error)
}

1.5.1. 用 Redis Pub/Sub 替换 NATS

type RedisBus struct {
    client *redis.Client
}

func (b *RedisBus) Broadcast(topic string, data []byte) error {
    return b.client.Publish(topic, data).Err()
}

func (b *RedisBus) Subscribe(topic string, handler zbus.Handler) (zbus.Subscription, error) {
    sub := b.client.Subscribe(topic)
    go func() {
        ch := sub.Channel()
        for msg := range ch {
            handler(msg.Channel, []byte(msg.Payload)) // zbus.Handler: func(topic string, data []byte)
        }
    }()
    return &redisSubscription{sub: sub}, nil
}

// 注入
zbus.DefaultBus = &RedisBus{client: redisClient}

注意:Redis Pub/Sub 没有 NATS 那样的消息持久化和可靠性,消息发送失败时需要业务层处理重试。

1.6. 8.1.6 限流器的扩展

连接级限流注入使用 ziface.ILimitAllow() bool),由 IChannel.SetLimit 安装;包路径为 zhenyi-base/zlimiter,默认实现为令牌桶(具体类型以 zlimiter 包为准)。

// 示意:自定义限流器实现 ILimit 后
channel.SetLimit(myLimit)

1.7. 8.1.7 扩展的最佳实践

1.7.1. 不要破坏原有流程

扩展点设计遵循"替换不修改"原则。不需要 fork zhenyi 代码,只需要在启动时注入你的实现:

// 替换前
group := zactor.NewGroup(process, false)

// 替换后
group := zactor.NewGroup(process, false)
group.SetDiscoverer(yourDiscovery)
zbus.DefaultBus = yourBus
gate.SetRemoteRouteStrategy(yourStrategy)

1.7.2. 接口粒度

总线、远程选址等抽象方法面较窄;Discoverer 等方法较多,实现前可先抄 zdiscovery.EtcdDiscovery 的分层结构,再替换存储后端。

1.7.3. 有问题先看现有实现

每个接口都有默认实现(Etcd、NATS、RendezvousHash)。实现新接口前先读默认实现的代码,会省很多时间。

1.8. 8.1.8 本节要点

  1. 分层:只动连接/线消息/限流等多在 zhenyi-base;发现、总线、Gate 选址、zmsg 信封等多在 zhenyi;两个 ziface 包路径不同,勿混用。
  2. 线协议信封以 zmsg.FixedHeaderSizeMessage 为准;业务负载走 Data
  3. zhenyi-base/ziface.IWireMessage + znet 读路径;跨进程候选视图可选实现 IGroupRemoteRouteTableView
  4. 远程选址:zroute.RemoteRouteStrategyzgate.Server.SetRemoteRouteStrategy 注入。
  5. 发现:zhenyi/ziface.Discoverer;总线:zbus.TopicBuszbus.DefaultBus;连接限流:ILimit + SetLimit
  6. 替换默认实现优先用启动期注入,避免 fork 热路径。

results matching ""

    No results matching ""