1. 2.5 粘包拆包
本节把 2.2–2.4 收口为一条读路径:Read 入环 → 循环解析至无完整帧 → 业务处理,并说明与 ParseData 复用、扩容与超时相关的边界行为(以 zhenyi-base/znet 与 channel 读循环源码为准)。
1.1. 2.5.1 粘包的本质
TCP 是流式协议,没有消息边界。调用一次 Write 发出的数据,对方可能一次 Read 就全部收到,也可能分好几次才收完。
客户端发送三条消息: 服务端可能收到:
[HeaderA|BodyA] [HeaderA|BodyA|HeaderB|BodyB|HeaderC|BodyC]
[HeaderB|BodyB] → 或者:
[HeaderC|BodyC] [HeaderA|BodyA|HeaderB]
[BodyB|HeaderC|BodyC]
这就是粘包。反过来,如果一条消息很大,一次 Read 只收到了一部分,就是拆包。
粘包和拆包是同一个问题的两面:数据流的切分点和消息边界不一致。
1.2. 2.5.2 解决思路:固定头声明长度
2.2 已给出协议格式——固定头中含 dataLen(v0 头 12 字节、v1 为 13 字节,以 HeaderLen() 为准),用于声明 body 长度。
粘包拆包的解法就是:
- 读到完整头 → 得知 body 长度
- 累积数据直到够一条完整消息 → 解析
- 剩余数据留给下一条消息
看起来简单,但有两个关键问题要处理:
| 问题 | 说明 |
|---|---|
| 数据不够一条消息 | 一次 Read 可能只包含部分头或部分体,需留在环中等待补全 |
一次 Read 包含多条消息 |
数据可能够解析出多条消息,要循环处理 |
1.3. 2.5.3 zhenyi 的实现:read 循环 + RingBuffer + 状态机
zhenyi 把粘包拆包的实现拆成三层,各司其职:
conn.Read() → RingBuffer(累积数据)→ BaseSocket.ParseFromRingBuffer(状态机解析)
1.3.1. 第一步:read 循环,数据写入 RingBuffer
func (c *BaseChannel) read() bool {
// 1. 从网络读到 RingBuffer(零拷贝,直接写入内部内存)
nRead, err := c.readBuffer.WriteFromReader(c.conn, 0)
if nRead > 0 {
c.resetReadDeadline() // 刷新心跳计时
}
// ... 错误处理 ...
// 2. 循环解析所有完整消息
for {
c.parseData.ResetForReuse()
parsed, parseErr := c.socketParser.ParseFromRingBuffer(c.readBuffer, &c.parseData)
if !parsed {
break // 数据不够,等下次 Read
}
c.handleParsedMessage() // 处理一条消息
}
return false
}
这里有一个关键点:不是读一次解析一次,而是循环解析到没有完整消息为止。
如果一次 Read 收到了 3 条完整消息,这个 for 循环会解析 3 次。如果只收到了半个头,ParseFromRingBuffer 返回 false,循环 break,等下次 Read 补充数据。
1.3.2. 第二步:RingBuffer 累积数据
RingBuffer 在此充当未解析字节队列(写入端追加、解析成功后前移读侧):
- 每次
Read追加数据到 write 指针 - 解析成功后移动 read 指针,丢弃已处理的数据
- 未解析完的数据留在原地,等下次
Read补充
Read 1 后: [HeaderA|BodyA|HeaderB|..半截..]
↑ write
↑ read
解析出消息A后:[..半截HeaderB..]
↑ write
↑ read (read 指针移动,丢弃已处理数据)
Read 2 后: [..半截HeaderB..|BodyB|HeaderC|BodyC]
↑ write
↑ read
解析出消息B、C后:[](缓冲区清空)
↑ write
↑ read
1.3.3. 第三步:BaseSocket 状态机解析
以下为与源码同构的阅读示意,返回值、错误类型与 PeekHeader12 等辅助方法名以 socket.go 为准:
func (base *BaseSocket) ParseFromRingBuffer(rb *RingBuffer, parseData *ParseData) (bool, error) {
// 状态1:等头(未读长度 < HeaderLen() 则返回 false)
if rb.Len() < headerLen {
return false, nil
}
// 状态2:读头
msgId, seqId, dataLen := rb.PeekHeader12(0)
// 状态3:校验
if msgId > maxMsgId { return false, ErrInvalidMsgId }
if dataLen > maxDataLen { return false, ErrInvalidDataLen }
// 状态4:等体(不够 headerLen + dataLen 就返回 false)
if rb.Len() < headerLen + dataLen {
return false, nil
}
// 状态5:零拷贝读取 body
first, second, _ := rb.PeekTwoSlices(headerLen, dataLen)
// 如果跨边界,从池中取 buffer 拷贝一次
// 如果连续,直接引用
// 状态6:丢弃已解析的数据
rb.Discard(headerLen + dataLen)
return true, nil
}
核心逻辑:数据不够就返回 false,不消费任何数据;数据够了就解析并丢弃。
Peek 不移动指针,Discard 才移动指针。这样在"数据不够"的情况下,RingBuffer 的状态完全没有变化,下次 Read 追加数据后重新解析即可。
1.4. 2.5.4 复用 ParseData
每条消息都要解析一次,如果每次都创建新的 ParseData 对象,就是无谓的内存分配。
zhenyi 的做法是复用:
// 创建连接时初始化,整个连接生命周期只用这一个
parseData ParseData
parseMsg NetMessage
// 每次解析前重置,而不是重新创建
c.parseData.ResetForReuse()
parsed, parseErr := c.socketParser.ParseFromRingBuffer(c.readBuffer, &c.parseData)
ResetForReuse 只重置状态,不分配内存。
1.5. 2.5.5 异常处理
粘包拆包的异常场景也需要处理:
1.5.1. 超大包
if dataLen > maxDataLen { // 默认 1MB
return false, ErrInvalidDataLen
}
单包超过 1MB 直接断开连接,防止恶意客户端发送超大包撑爆内存。
1.5.2. RingBuffer 满了
if err == ErrBufferFull {
// 先尝试扩容
if c.readBuffer.Grow(65536) {
// 扩容成功,继续读
} else {
// 已达 MaxSize(1MB),靠解析循环消费腾出空间
// 如果解析也消化不了,最终会触发单包超限断开
}
}
RingBuffer 满了不会立即断开连接,而是先尝试扩容。扩容也失败就靠解析消费腾空间,给业务一个处理的机会。
1.5.3. 心跳超时
if netErr.Timeout() {
// 读超时 = 没收到任何数据 = 客户端可能已断开
return true // 退出读循环,关闭连接
}
通过 SetReadDeadline 设置读超时,每次收到数据刷新。超时说明连接空闲太久,主动关闭。
1.6. 2.5.6 本节要点
- 粘包/拆包:TCP 无消息边界;定界依赖协议头中的长度与状态机。
- 路径:
WriteFromReader(或等价读入)→ 环上未读区间 →ParseFromRingBuffer直至无完整帧。 - 循环解析:单次网络读可能对应多条业务消息,需在内层
for中耗尽可读完整帧。 - 复用:
ParseData(及关联NetMessage)在连接生命周期内重置复用,减少分配。 - 边界:
dataLen上限、Grow/ErrBufferFull分支、读超时关闭等以配置与 channel 实现为准。
第二章(网络层:协议、缓冲、多协议、定界)到此收束。第三章进入 Actor 侧邮箱与串行处理(与第一章模型衔接)。