1. 7.2 单进程 IM 实现
本章我们基于上一节的设计,实现一个完整的单进程 IM 系统。
1.1. 7.2.1 项目结构
im_single_demo/
└── main.go # 完整的单进程 IM 示例
为了简化,我们把代码都放在一个 main.go 文件中。
1.2. 7.2.2 消息定义
首先定义消息 ID(与 examples/im_single_demo/main.go 一致):
const (
MsgLoginReq int32 = 1 // 登录请求
MsgJoinReq int32 = 2 // 加入房间请求
MsgLeaveReq int32 = 3 // 离开房间请求
MsgSendReq int32 = 4 // 发送消息请求
)
示例服务端用 JSON 作为 payload 编解码:zhenyi/zcodec 中的 NewJSONMessage 返回实现 ziface.IMessage(含 MarshalVT / SizeVT 等)的类型,可直接交给 SendToClient、BatchSendToClients。反序列化请求体使用 zhenyi-base/zserialize 的 UnmarshalJson(与仓库一致),而不是手写 encoding/json unmarshaler——这样与网关已解析的 msg.Data 约定对齐。
自行实现「map 转 JSON 字节 + msgId」时,也必须完整满足 IMessage / IWireMessage 热路径契约;因此本章以仓库为准优先调用 zcodec.NewJSONMessage。
1.3. 7.2.3 房间状态管理
定义房间状态结构体:
type chatState struct {
sessionNick map[uint64]string // sessionId → nickname
sessionRoom map[uint64]string // sessionId → room
roomSessions map[string]map[uint64]struct{} // room → session 集合
}
func newChatState() *chatState {
return &chatState{
sessionNick: make(map[uint64]string),
sessionRoom: make(map[uint64]string),
roomSessions: make(map[string]map[uint64]struct{}),
}
}
房间状态管理的核心逻辑:
// 设置用户昵称
func (s *chatState) setNick(sessionID uint64, nick string) {
s.sessionNick[sessionID] = nick
}
// 加入房间
func (s *chatState) joinRoom(sessionID uint64, room string) {
// 先离开之前的房间
prev := s.sessionRoom[sessionID]
if prev != "" && prev != room {
if set := s.roomSessions[prev]; set != nil {
delete(set, sessionID)
}
}
// 加入新房间
s.sessionRoom[sessionID] = room
set := s.roomSessions[room]
if set == nil {
set = make(map[uint64]struct{})
s.roomSessions[room] = set
}
set[sessionID] = struct{}{}
}
// 离开房间
func (s *chatState) leaveRoom(sessionID uint64, room string) {
if room == "" {
room = s.sessionRoom[sessionID]
}
if room != "" {
if set := s.roomSessions[room]; set != nil {
delete(set, sessionID)
}
}
delete(s.sessionRoom, sessionID)
}
// 获取昵称
func (s *chatState) getNick(sessionID uint64) string {
nick := s.sessionNick[sessionID]
if nick == "" {
return fmt.Sprintf("guest_%d", sessionID)
}
return nick
}
1.4. 7.2.4 IM Actor 定义与批量下行
type ImServer struct {
*zstream.Server
state *chatState
}
// batchToRoomExcept:向房间内除 excludeSession 外的连接批量下行(payload 只序列化一次,经 zactor.BatchSendToClients)。
func (s *ImServer) batchToRoomExcept(origin *zmsg.Message, room string, excludeSession uint64, clientMsg ziface.IMessage) {
gate := origin.SrcActor
if gate == 0 {
s.GetLogger().Error("batchToRoomExcept: origin.SrcActor is zero")
return
}
var ids []int64
for sid := range s.state.roomSessions[room] {
if sid == excludeSession {
continue
}
ids = append(ids, int64(sid))
}
if len(ids) == 0 {
return
}
s.BatchSendToClients(origin, map[uint64][]int64{gate: ids}, clientMsg)
}
// batchToRoomAll:向房间内全部连接批量下行(含发送者)。
func (s *ImServer) batchToRoomAll(origin *zmsg.Message, room string, clientMsg ziface.IMessage) {
gate := origin.SrcActor
if gate == 0 {
s.GetLogger().Error("batchToRoomAll: origin.SrcActor is zero")
return
}
var ids []int64
for sid := range s.state.roomSessions[room] {
ids = append(ids, int64(sid))
}
if len(ids) == 0 {
return
}
s.BatchSendToClients(origin, map[uint64][]int64{gate: ids}, clientMsg)
}
zstream.Server 嵌入 zactor.Actor,上述 BatchSendToClients 由框架提供,把一批 sessionId 绑定到来源 Gate Actor 后发往对端连接。
1.5. 7.2.5 启动入口
func main() {
// 初始化日志
logConfig := zlog.NewDefaultLoggerConfig()
logConfig.WithOptions(zlog.WithConsole(true))
zlog.NewDefaultLoggerWithConfig(logConfig)
// 创建 App
app := zstartup.NewApp(context.Background(), zstartup.AppConfig{
Process: 0, // 进程编号
IsSingle: true, // 单机模式,不使用服务发现
ConnType: znet.TCP,
Actors: []zmodel.ActorConfig{
{
Id: 1,
ActorType: ActorTypeGate,
Name: "gate",
Index: 1,
Addr: "127.0.0.1:8001", // 监听地址
Process: 1,
},
{
Id: 2,
ActorType: ActorTypeIM,
Name: "im",
Index: 1,
Process: 1,
},
},
})
// 注册 Actor 工厂
// ...
}
zstartup.App 是 zhenyi 提供的统一启动入口,它负责:
- 创建 Actor 实例
- 初始化 Actor
- 启动 Actor 生命周期管理
使用 znet.TCP 时须添加:import "github.com/aiyang-zh/zhenyi-base/znet"。Gate 工厂里若打日志用到 zap,另需 go.uber.org/zap。
1.6. 7.2.6 注册 Gateway Actor
err := app.RegisterActorFactory(ActorTypeGate, func(a *zstartup.App, c zmodel.ActorConfig) ziface.IServerActor {
s := zgate.NewServer(c, a.ConnType)
s.GetHandleMgr().RegisterHandle(MsgLoginReq, func(ctx context.Context, msg *zmsg.Message) {
var req struct {
UserID int64 `json:"userId"`
}
_ = zserialize.UnmarshalJson(msg.Data, &req)
data, err := zcodec.NewJSONMessage(MsgLoginReq, map[string]any{
"ok": true,
"type": "login_ack",
"sessionId": msg.SessionId,
"userId": req.UserID,
})
if err != nil {
panic(err)
}
s.SendToClient(msg, data)
s.GetLogger().Info("login success", zap.Int64("userId", req.UserID))
})
return s
})
Gateway 在本示例中只对 MsgLoginReq 做了 RegisterHandle;其余 msgId 由框架放进 IM Actor 的 HandleClientMessage 路径处理。
1.7. 7.2.7 注册 IM Actor
下面与 examples/im_single_demo/main.go 一致:join / leave 在对调用方回 ack 之外,还通过 batchToRoomExcept 向房内其他会话推送 room_notify;send 使用 batchToRoomAll 广播 chat_broadcast,并对不含 sign 的 JSON 负载做 SM3 摘要写入 sign(im_single_client 内可对 chat_broadcast 校验)。
err = app.RegisterActorFactory(ActorTypeIM, func(a *zstartup.App, c zmodel.ActorConfig) ziface.IServerActor {
s := &ImServer{
Server: zstream.NewServer(c),
state: newChatState(),
}
s.GetHandleMgr().RegisterHandle(MsgJoinReq, func(ctx context.Context, msg *zmsg.Message) {
var req struct {
Room string `json:"room"`
Nickname string `json:"nickname"`
}
_ = zserialize.UnmarshalJson(msg.Data, &req)
if req.Room == "" {
req.Room = "lobby"
}
if req.Nickname != "" {
s.state.setNick(msg.SessionId, req.Nickname)
}
s.state.joinRoom(msg.SessionId, req.Room)
data, err := zcodec.NewJSONMessage(MsgJoinReq, map[string]any{
"ok": true,
"type": "join_ack",
"sessionId": msg.SessionId,
"room": req.Room,
"nickname": s.state.getNick(msg.SessionId),
})
if err != nil {
panic(err)
}
s.SendToClient(msg, data)
notifyJoin, err := zcodec.NewJSONMessage(MsgJoinReq, map[string]any{
"type": "room_notify",
"event": "join",
"room": req.Room,
"sessionId": msg.SessionId,
"nickname": s.state.getNick(msg.SessionId),
})
if err != nil {
panic(err)
}
s.batchToRoomExcept(msg, req.Room, msg.SessionId, notifyJoin)
})
s.GetHandleMgr().RegisterHandle(MsgLeaveReq, func(ctx context.Context, msg *zmsg.Message) {
var req struct {
Room string `json:"room"`
}
_ = zserialize.UnmarshalJson(msg.Data, &req)
room := req.Room
if room == "" {
room = s.state.sessionRoom[msg.SessionId]
}
if room != "" {
notifyLeave, err := zcodec.NewJSONMessage(MsgLeaveReq, map[string]any{
"type": "room_notify",
"event": "leave",
"room": room,
"sessionId": msg.SessionId,
"nickname": s.state.getNick(msg.SessionId),
})
if err != nil {
panic(err)
}
s.batchToRoomExcept(msg, room, msg.SessionId, notifyLeave)
}
s.state.leaveRoom(msg.SessionId, req.Room)
data, err := zcodec.NewJSONMessage(MsgLeaveReq, map[string]any{
"ok": true,
"type": "leave_ack",
"sessionId": msg.SessionId,
"room": req.Room,
})
if err != nil {
panic(err)
}
s.SendToClient(msg, data)
})
s.GetHandleMgr().RegisterHandle(MsgSendReq, func(ctx context.Context, msg *zmsg.Message) {
var req struct {
Room string `json:"room"`
Text string `json:"text"`
}
_ = zserialize.UnmarshalJson(msg.Data, &req)
if req.Room == "" {
req.Room = "lobby"
}
s.state.joinRoom(msg.SessionId, req.Room)
payload := map[string]any{
"type": "chat_broadcast",
"room": req.Room,
"fromSessionId": msg.SessionId,
"nickname": s.state.getNick(msg.SessionId),
"text": req.Text,
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
panic(err)
}
payload["sign"] = hex.EncodeToString(zencrypt.SM3Bytes(payloadBytes))
data, err := zcodec.NewJSONMessage(MsgSendReq, payload)
if err != nil {
panic(err)
}
s.batchToRoomAll(msg, req.Room, data)
})
return s
})
1.8. 7.2.8 启动应用
err = app.Run()
if err != nil {
panic(err)
}
调用 app.Run() 后,App 会:
- 启动 Gateway,开始监听客户端连接
- 启动 IM Actor,进入消息处理循环
- 保持运行,直到收到关闭信号
1.9. 7.2.9 Source of Truth
为避免书稿与示例分叉,以仓库 examples/im_single_demo/main.go 为唯一整文件权威版本(含 RegisterActorFactory、flag、-conn、国密 GM-TLS、线协议 SM4-GCM 载荷加密等)。本节 7.2.4–7.2.8 已按该文件的主线拆开讲解;若书内片段与源码不一致,以源码为准。
1.10. 7.2.10 运行测试
启动服务器:
go run ./examples/im_single_demo
服务器会监听 127.0.0.1:8001,等待客户端连接。
1.11. 7.2.11 本节要点
- Gateway:示例仅在 Gate 注册
MsgLoginReq;其余 msgId 进 IM。 - IM:房间状态 +
BatchSendToClients批量下行(chat_broadcast、通知等)。 - 编解码:
zcodec+zserialize与仓库一致;ConnType使用znet.TCP(或 WebSocket 等由AppConfig指定)。
客户端见 7.3;多进程见 7.4。