1. 2.5 粘包拆包

本节把 2.2–2.4 收口为一条读路径:Read 入环 → 循环解析至无完整帧 → 业务处理,并说明与 ParseData 复用、扩容与超时相关的边界行为(以 zhenyi-base/znet 与 channel 读循环源码为准)。

1.1. 2.5.1 粘包的本质

TCP 是流式协议,没有消息边界。调用一次 Write 发出的数据,对方可能一次 Read 就全部收到,也可能分好几次才收完。

客户端发送三条消息:         服务端可能收到:

[HeaderA|BodyA]             [HeaderA|BodyA|HeaderB|BodyB|HeaderC|BodyC]
[HeaderB|BodyB]    →        或者:
[HeaderC|BodyC]             [HeaderA|BodyA|HeaderB]
                            [BodyB|HeaderC|BodyC]

这就是粘包。反过来,如果一条消息很大,一次 Read 只收到了一部分,就是拆包。

粘包和拆包是同一个问题的两面:数据流的切分点和消息边界不一致。

1.2. 2.5.2 解决思路:固定头声明长度

2.2 已给出协议格式——固定头中含 dataLen(v0 头 12 字节、v1 为 13 字节,以 HeaderLen() 为准),用于声明 body 长度。

粘包拆包的解法就是:

  1. 读到完整头 → 得知 body 长度
  2. 累积数据直到够一条完整消息 → 解析
  3. 剩余数据留给下一条消息

看起来简单,但有两个关键问题要处理:

问题 说明
数据不够一条消息 一次 Read 可能只包含部分头或部分体,需留在环中等待补全
一次 Read 包含多条消息 数据可能够解析出多条消息,要循环处理

1.3. 2.5.3 zhenyi 的实现:read 循环 + RingBuffer + 状态机

zhenyi 把粘包拆包的实现拆成三层,各司其职:

conn.Read() → RingBuffer(累积数据)→ BaseSocket.ParseFromRingBuffer(状态机解析)

1.3.1. 第一步:read 循环,数据写入 RingBuffer

func (c *BaseChannel) read() bool {
    // 1. 从网络读到 RingBuffer(零拷贝,直接写入内部内存)
    nRead, err := c.readBuffer.WriteFromReader(c.conn, 0)
    if nRead > 0 {
        c.resetReadDeadline()  // 刷新心跳计时
    }
    // ... 错误处理 ...

    // 2. 循环解析所有完整消息
    for {
        c.parseData.ResetForReuse()
        parsed, parseErr := c.socketParser.ParseFromRingBuffer(c.readBuffer, &c.parseData)
        if !parsed {
            break  // 数据不够,等下次 Read
        }
        c.handleParsedMessage()  // 处理一条消息
    }

    return false
}

这里有一个关键点:不是读一次解析一次,而是循环解析到没有完整消息为止

如果一次 Read 收到了 3 条完整消息,这个 for 循环会解析 3 次。如果只收到了半个头,ParseFromRingBuffer 返回 false,循环 break,等下次 Read 补充数据。

1.3.2. 第二步:RingBuffer 累积数据

RingBuffer 在此充当未解析字节队列(写入端追加、解析成功后前移读侧):

  • 每次 Read 追加数据到 write 指针
  • 解析成功后移动 read 指针,丢弃已处理的数据
  • 未解析完的数据留在原地,等下次 Read 补充
Read 1 后:  [HeaderA|BodyA|HeaderB|..半截..]
                              ↑ write
             ↑ read

解析出消息A后:[..半截HeaderB..]
                              ↑ write
                        ↑ read  (read 指针移动,丢弃已处理数据)

Read 2 后:  [..半截HeaderB..|BodyB|HeaderC|BodyC]
                                            ↑ write
                        ↑ read

解析出消息B、C后:[](缓冲区清空)
                              ↑ write
                              ↑ read

1.3.3. 第三步:BaseSocket 状态机解析

以下为与源码同构的阅读示意,返回值、错误类型与 PeekHeader12 等辅助方法名以 socket.go 为准:

func (base *BaseSocket) ParseFromRingBuffer(rb *RingBuffer, parseData *ParseData) (bool, error) {
    // 状态1:等头(未读长度 < HeaderLen() 则返回 false)
    if rb.Len() < headerLen {
        return false, nil
    }

    // 状态2:读头
    msgId, seqId, dataLen := rb.PeekHeader12(0)

    // 状态3:校验
    if msgId > maxMsgId { return false, ErrInvalidMsgId }
    if dataLen > maxDataLen { return false, ErrInvalidDataLen }

    // 状态4:等体(不够 headerLen + dataLen 就返回 false)
    if rb.Len() < headerLen + dataLen {
        return false, nil
    }

    // 状态5:零拷贝读取 body
    first, second, _ := rb.PeekTwoSlices(headerLen, dataLen)
    // 如果跨边界,从池中取 buffer 拷贝一次
    // 如果连续,直接引用

    // 状态6:丢弃已解析的数据
    rb.Discard(headerLen + dataLen)

    return true, nil
}

核心逻辑:数据不够就返回 false,不消费任何数据;数据够了就解析并丢弃。

Peek 不移动指针,Discard 才移动指针。这样在"数据不够"的情况下,RingBuffer 的状态完全没有变化,下次 Read 追加数据后重新解析即可。

1.4. 2.5.4 复用 ParseData

每条消息都要解析一次,如果每次都创建新的 ParseData 对象,就是无谓的内存分配。

zhenyi 的做法是复用

// 创建连接时初始化,整个连接生命周期只用这一个
parseData    ParseData
parseMsg     NetMessage

// 每次解析前重置,而不是重新创建
c.parseData.ResetForReuse()
parsed, parseErr := c.socketParser.ParseFromRingBuffer(c.readBuffer, &c.parseData)

ResetForReuse 只重置状态,不分配内存。

1.5. 2.5.5 异常处理

粘包拆包的异常场景也需要处理:

1.5.1. 超大包

if dataLen > maxDataLen {  // 默认 1MB
    return false, ErrInvalidDataLen
}

单包超过 1MB 直接断开连接,防止恶意客户端发送超大包撑爆内存。

1.5.2. RingBuffer 满了

if err == ErrBufferFull {
    // 先尝试扩容
    if c.readBuffer.Grow(65536) {
        // 扩容成功,继续读
    } else {
        // 已达 MaxSize(1MB),靠解析循环消费腾出空间
        // 如果解析也消化不了,最终会触发单包超限断开
    }
}

RingBuffer 满了不会立即断开连接,而是先尝试扩容。扩容也失败就靠解析消费腾空间,给业务一个处理的机会。

1.5.3. 心跳超时

if netErr.Timeout() {
    // 读超时 = 没收到任何数据 = 客户端可能已断开
    return true  // 退出读循环,关闭连接
}

通过 SetReadDeadline 设置读超时,每次收到数据刷新。超时说明连接空闲太久,主动关闭。

1.6. 2.5.6 本节要点

  1. 粘包/拆包:TCP 无消息边界;定界依赖协议头中的长度与状态机。
  2. 路径WriteFromReader(或等价读入)→ 环上未读区间 → ParseFromRingBuffer 直至无完整帧。
  3. 循环解析:单次网络读可能对应多条业务消息,需在内层 for 中耗尽可读完整帧。
  4. 复用ParseData(及关联 NetMessage)在连接生命周期内重置复用,减少分配。
  5. 边界dataLen 上限、Grow/ErrBufferFull 分支、读超时关闭等以配置与 channel 实现为准。

第二章(网络层:协议、缓冲、多协议、定界)到此收束。第三章进入 Actor 侧邮箱与串行处理(与第一章模型衔接)。

results matching ""

    No results matching ""