1. 7.2 单进程 IM 实现

本章我们基于上一节的设计,实现一个完整的单进程 IM 系统。

1.1. 7.2.1 项目结构

im_single_demo/
└── main.go      # 完整的单进程 IM 示例

为了简化,我们把代码都放在一个 main.go 文件中。

1.2. 7.2.2 消息定义

首先定义消息 ID(与 examples/im_single_demo/main.go 一致):

const (
    MsgLoginReq  int32 = 1  // 登录请求
    MsgJoinReq   int32 = 2  // 加入房间请求
    MsgLeaveReq  int32 = 3  // 离开房间请求
    MsgSendReq   int32 = 4  // 发送消息请求
)

示例服务端用 JSON 作为 payload 编解码:zhenyi/zcodec 中的 NewJSONMessage 返回实现 ziface.IMessage(含 MarshalVT / SizeVT 等)的类型,可直接交给 SendToClientBatchSendToClients。反序列化请求体使用 zhenyi-base/zserializeUnmarshalJson(与仓库一致),而不是手写 encoding/json unmarshaler——这样与网关已解析的 msg.Data 约定对齐。

自行实现「map 转 JSON 字节 + msgId」时,也必须完整满足 IMessage / IWireMessage 热路径契约;因此本章以仓库为准优先调用 zcodec.NewJSONMessage

1.3. 7.2.3 房间状态管理

定义房间状态结构体:

type chatState struct {
    sessionNick  map[uint64]string              // sessionId → nickname
    sessionRoom  map[uint64]string              // sessionId → room
    roomSessions map[string]map[uint64]struct{} // room → session 集合
}

func newChatState() *chatState {
    return &chatState{
        sessionNick:  make(map[uint64]string),
        sessionRoom:  make(map[uint64]string),
        roomSessions: make(map[string]map[uint64]struct{}),
    }
}

房间状态管理的核心逻辑:

// 设置用户昵称
func (s *chatState) setNick(sessionID uint64, nick string) {
    s.sessionNick[sessionID] = nick
}

// 加入房间
func (s *chatState) joinRoom(sessionID uint64, room string) {
    // 先离开之前的房间
    prev := s.sessionRoom[sessionID]
    if prev != "" && prev != room {
        if set := s.roomSessions[prev]; set != nil {
            delete(set, sessionID)
        }
    }

    // 加入新房间
    s.sessionRoom[sessionID] = room
    set := s.roomSessions[room]
    if set == nil {
        set = make(map[uint64]struct{})
        s.roomSessions[room] = set
    }
    set[sessionID] = struct{}{}
}

// 离开房间
func (s *chatState) leaveRoom(sessionID uint64, room string) {
    if room == "" {
        room = s.sessionRoom[sessionID]
    }
    if room != "" {
        if set := s.roomSessions[room]; set != nil {
            delete(set, sessionID)
        }
    }
    delete(s.sessionRoom, sessionID)
}

// 获取昵称
func (s *chatState) getNick(sessionID uint64) string {
    nick := s.sessionNick[sessionID]
    if nick == "" {
        return fmt.Sprintf("guest_%d", sessionID)
    }
    return nick
}

1.4. 7.2.4 IM Actor 定义与批量下行

type ImServer struct {
    *zstream.Server
    state *chatState
}

// batchToRoomExcept:向房间内除 excludeSession 外的连接批量下行(payload 只序列化一次,经 zactor.BatchSendToClients)。
func (s *ImServer) batchToRoomExcept(origin *zmsg.Message, room string, excludeSession uint64, clientMsg ziface.IMessage) {
    gate := origin.SrcActor
    if gate == 0 {
        s.GetLogger().Error("batchToRoomExcept: origin.SrcActor is zero")
        return
    }
    var ids []int64
    for sid := range s.state.roomSessions[room] {
        if sid == excludeSession {
            continue
        }
        ids = append(ids, int64(sid))
    }
    if len(ids) == 0 {
        return
    }
    s.BatchSendToClients(origin, map[uint64][]int64{gate: ids}, clientMsg)
}

// batchToRoomAll:向房间内全部连接批量下行(含发送者)。
func (s *ImServer) batchToRoomAll(origin *zmsg.Message, room string, clientMsg ziface.IMessage) {
    gate := origin.SrcActor
    if gate == 0 {
        s.GetLogger().Error("batchToRoomAll: origin.SrcActor is zero")
        return
    }
    var ids []int64
    for sid := range s.state.roomSessions[room] {
        ids = append(ids, int64(sid))
    }
    if len(ids) == 0 {
        return
    }
    s.BatchSendToClients(origin, map[uint64][]int64{gate: ids}, clientMsg)
}

zstream.Server 嵌入 zactor.Actor,上述 BatchSendToClients 由框架提供,把一批 sessionId 绑定到来源 Gate Actor 后发往对端连接。

1.5. 7.2.5 启动入口

func main() {
    // 初始化日志
    logConfig := zlog.NewDefaultLoggerConfig()
    logConfig.WithOptions(zlog.WithConsole(true))
    zlog.NewDefaultLoggerWithConfig(logConfig)

    // 创建 App
    app := zstartup.NewApp(context.Background(), zstartup.AppConfig{
        Process:  0,       // 进程编号
        IsSingle: true,    // 单机模式,不使用服务发现
        ConnType: znet.TCP,
        Actors: []zmodel.ActorConfig{
            {
                Id:        1,
                ActorType: ActorTypeGate,
                Name:      "gate",
                Index:     1,
                Addr:      "127.0.0.1:8001",  // 监听地址
                Process:   1,
            },
            {
                Id:        2,
                ActorType: ActorTypeIM,
                Name:      "im",
                Index:     1,
                Process:   1,
            },
        },
    })

    // 注册 Actor 工厂
    // ...
}

zstartup.App 是 zhenyi 提供的统一启动入口,它负责:

  1. 创建 Actor 实例
  2. 初始化 Actor
  3. 启动 Actor 生命周期管理

使用 znet.TCP 时须添加:import "github.com/aiyang-zh/zhenyi-base/znet"。Gate 工厂里若打日志用到 zap,另需 go.uber.org/zap

1.6. 7.2.6 注册 Gateway Actor

err := app.RegisterActorFactory(ActorTypeGate, func(a *zstartup.App, c zmodel.ActorConfig) ziface.IServerActor {
    s := zgate.NewServer(c, a.ConnType)

    s.GetHandleMgr().RegisterHandle(MsgLoginReq, func(ctx context.Context, msg *zmsg.Message) {
        var req struct {
            UserID int64 `json:"userId"`
        }
        _ = zserialize.UnmarshalJson(msg.Data, &req)

        data, err := zcodec.NewJSONMessage(MsgLoginReq, map[string]any{
            "ok":        true,
            "type":      "login_ack",
            "sessionId": msg.SessionId,
            "userId":    req.UserID,
        })
        if err != nil {
            panic(err)
        }
        s.SendToClient(msg, data)
        s.GetLogger().Info("login success", zap.Int64("userId", req.UserID))
    })

    return s
})

Gateway 在本示例中只对 MsgLoginReq 做了 RegisterHandle;其余 msgId 由框架放进 IM Actor 的 HandleClientMessage 路径处理。

1.7. 7.2.7 注册 IM Actor

下面与 examples/im_single_demo/main.go 一致:join / leave 在对调用方回 ack 之外,还通过 batchToRoomExcept 向房内其他会话推送 room_notifysend 使用 batchToRoomAll 广播 chat_broadcast,并对不含 sign 的 JSON 负载做 SM3 摘要写入 signim_single_client 内可对 chat_broadcast 校验)。

err = app.RegisterActorFactory(ActorTypeIM, func(a *zstartup.App, c zmodel.ActorConfig) ziface.IServerActor {
    s := &ImServer{
        Server: zstream.NewServer(c),
        state:  newChatState(),
    }

    s.GetHandleMgr().RegisterHandle(MsgJoinReq, func(ctx context.Context, msg *zmsg.Message) {
        var req struct {
            Room     string `json:"room"`
            Nickname string `json:"nickname"`
        }
        _ = zserialize.UnmarshalJson(msg.Data, &req)
        if req.Room == "" {
            req.Room = "lobby"
        }
        if req.Nickname != "" {
            s.state.setNick(msg.SessionId, req.Nickname)
        }
        s.state.joinRoom(msg.SessionId, req.Room)

        data, err := zcodec.NewJSONMessage(MsgJoinReq, map[string]any{
            "ok":        true,
            "type":      "join_ack",
            "sessionId": msg.SessionId,
            "room":      req.Room,
            "nickname":  s.state.getNick(msg.SessionId),
        })
        if err != nil {
            panic(err)
        }
        s.SendToClient(msg, data)

        notifyJoin, err := zcodec.NewJSONMessage(MsgJoinReq, map[string]any{
            "type":      "room_notify",
            "event":     "join",
            "room":      req.Room,
            "sessionId": msg.SessionId,
            "nickname":  s.state.getNick(msg.SessionId),
        })
        if err != nil {
            panic(err)
        }
        s.batchToRoomExcept(msg, req.Room, msg.SessionId, notifyJoin)
    })

    s.GetHandleMgr().RegisterHandle(MsgLeaveReq, func(ctx context.Context, msg *zmsg.Message) {
        var req struct {
            Room string `json:"room"`
        }
        _ = zserialize.UnmarshalJson(msg.Data, &req)
        room := req.Room
        if room == "" {
            room = s.state.sessionRoom[msg.SessionId]
        }
        if room != "" {
            notifyLeave, err := zcodec.NewJSONMessage(MsgLeaveReq, map[string]any{
                "type":      "room_notify",
                "event":     "leave",
                "room":      room,
                "sessionId": msg.SessionId,
                "nickname":  s.state.getNick(msg.SessionId),
            })
            if err != nil {
                panic(err)
            }
            s.batchToRoomExcept(msg, room, msg.SessionId, notifyLeave)
        }
        s.state.leaveRoom(msg.SessionId, req.Room)
        data, err := zcodec.NewJSONMessage(MsgLeaveReq, map[string]any{
            "ok":        true,
            "type":      "leave_ack",
            "sessionId": msg.SessionId,
            "room":      req.Room,
        })
        if err != nil {
            panic(err)
        }
        s.SendToClient(msg, data)
    })

    s.GetHandleMgr().RegisterHandle(MsgSendReq, func(ctx context.Context, msg *zmsg.Message) {
        var req struct {
            Room string `json:"room"`
            Text string `json:"text"`
        }
        _ = zserialize.UnmarshalJson(msg.Data, &req)
        if req.Room == "" {
            req.Room = "lobby"
        }
        s.state.joinRoom(msg.SessionId, req.Room)

        payload := map[string]any{
            "type":          "chat_broadcast",
            "room":          req.Room,
            "fromSessionId": msg.SessionId,
            "nickname":      s.state.getNick(msg.SessionId),
            "text":          req.Text,
        }
        payloadBytes, err := json.Marshal(payload)
        if err != nil {
            panic(err)
        }
        payload["sign"] = hex.EncodeToString(zencrypt.SM3Bytes(payloadBytes))

        data, err := zcodec.NewJSONMessage(MsgSendReq, payload)
        if err != nil {
            panic(err)
        }
        s.batchToRoomAll(msg, req.Room, data)
    })

    return s
})

1.8. 7.2.8 启动应用

err = app.Run()
if err != nil {
    panic(err)
}

调用 app.Run() 后,App 会:

  1. 启动 Gateway,开始监听客户端连接
  2. 启动 IM Actor,进入消息处理循环
  3. 保持运行,直到收到关闭信号

1.9. 7.2.9 Source of Truth

为避免书稿与示例分叉,以仓库 examples/im_single_demo/main.go 为唯一整文件权威版本(含 RegisterActorFactoryflag-conn、国密 GM-TLS、线协议 SM4-GCM 载荷加密等)。本节 7.2.4–7.2.8 已按该文件的主线拆开讲解;若书内片段与源码不一致,以源码为准。

1.10. 7.2.10 运行测试

启动服务器:

go run ./examples/im_single_demo

服务器会监听 127.0.0.1:8001,等待客户端连接。

1.11. 7.2.11 本节要点

  1. Gateway:示例仅在 Gate 注册 MsgLoginReq;其余 msgId 进 IM。
  2. IM:房间状态 + BatchSendToClients 批量下行(chat_broadcast、通知等)。
  3. 编解码zcodec + zserialize 与仓库一致;ConnType 使用 znet.TCP(或 WebSocket 等由 AppConfig 指定)。

客户端见 7.3;多进程见 7.4。

results matching ""

    No results matching ""