欢迎,来自IP地址为:216.73.216.133 的朋友
如果 你曾经好奇过 QQ、微信 或 WhatsApp 等聊天应用背后的工作原理,本教程将为你揭晓答案。我们将使用 Go 语言从零开始构建一个实时聊天服务器,并学习现代通信系统的基本概念。
完成本教程后,我们将构建一个功能完善的聊天室,支持无限并发用户实时聊天、服务器崩溃后消息仍能持久保存、会话管理功能使用户在网络中断后能够重新连接、用户间互发私信,并能优雅地处理速度慢或断开连接的客户端。
更重要的是,这可以让我们理解分布式系统背后的基本概念。本教程将学习使用 goroutine 和 channel 进行并发编程、使用 TCP 套接字进行网络通信、使用预写式日志 (WAL) 实现数据持久性、使用互斥锁进行状态管理,以及如何设计在故障发生时能够优雅降级的系统。这些概念贯穿于从数据库到消息队列再到 Web 服务器的方方面面。
分布式聊天室概念设计
总体来说,聊天室服务器允许多个用户同时连接并实时交换消息。如果想要达到”生产级”要求,则指它具备实际应用中应有的功能:数据持久化,服务器重启后消息不会丢失;能够优雅地处理网络故障;并且支持大量并发用户而不会降低速度。
“分布式”特性指的是系统如何管理来自不同地点的多个客户端,这些客户端同时尝试发送和接收消息。这带来了一些有趣的挑战:如何确保每个人看到的消息顺序一致?如何处理网络连接速度慢的客户端?如果有人意外断开连接会发生什么?
这些并非纯粹的理论问题。所有联网应用都涉及并发、状态管理和故障处理。无论开发的是聊天应用、多人游戏、协作编辑器还是交易平台,都会面临类似的挑战。我们在这里学到的模式广泛适用于各种分布式系统。
聊天应用是绝佳的学习项目,因为它们将多个具有挑战性的问题集中在一起。这需要安全地管理并发连接,在不阻塞的情况下向多个客户端广播消息,处理不可靠的网络,持久化数据,并确保系统能够优雅地从崩溃中恢复。这些主题中的每一个都可以单独成为一篇教程,但在这里,我们将看到它们如何在实际应用中协同运作。
教程涉及内容
本教程演示了构建分布式系统所需的几个重要概念,将学到以下内容:
1. Go 语言中的 TCP 套接字编程
我们将学习如何接受传入的 TCP 连接、通过网络套接字读写数据,以及如何优雅地处理连接失败。这些技能对于任何联网应用程序都至关重要,从 Web 服务器到数据库客户端皆是如此。
2. 使用 Goroutine 和 Channel 进行并发编程
Go 语言的并发模型是其最强大的特性之一。我们将学习如何使用 goroutine 同时处理多个客户端而不会阻塞。还将使用 channel 安全地协调 goroutine 之间的操作,避免共享内存并发中常见的陷阱,例如竞态条件和死锁。
3. 分布式系统的状态管理
管理并发操作间的共享状态非常棘手。我们将学习何时使用互斥锁而不是通道,如何设计锁粒度以避免瓶颈,以及如何在多个 goroutine 访问相同数据时确保数据一致性。
4. 持久化预写式日志(WAL)
数据库使用 WAL(预写入日志)来确保数据在崩溃期间不会丢失。本教程将实现相同的模式,学习如何在持久性和性能之间取得平衡。我们将了解 fsync 的重要性,理解不同持久化策略的优缺点,并学习如何在意外关机后恢复状态。
5. 会话管理和重连接
总体来说,网络是不稳定的。用户会断线,WiFi 会掉线,移动网络连接也会频繁切换基站。这就需要构建一个基于令牌的会话系统,让用户能够无缝重新连接,保留聊天记录和身份信息,而无需密码或复杂的身份验证。
6. 优雅降级和容错
绝对的可靠性是不可能的,所以就需要设计出能够应对部分故障的方案。我们将学习如何防止慢速客户端影响快速客户端,如何在持久化失败时继续运行,以及如何在出现问题时正确清理资源。
教程要求
为了更好地学习本教程,您应该具备一些基础知识。虽然无需成为专家,但应该掌握基本概念:
- Go 语言基础(goroutine、通道、接口)
- TCP/IP 网络基础
- 基本并发概念
- 文件 I/O 操作
教程概述
本教程将逐步构建一个可用于实际生产应用的聊天室。
首先,我们将探索整体架构,了解各个组件如何协同工作。然后,学习并发模型和持久化策略等核心概念。
接下来,将设置项目结构并定义代表客户端、消息和聊天室的核心数据类型。之后,就将实现服务器初始化和事件循环,所有协调工作都在此进行。
随后,我们将构建网络层来处理客户端连接,实现消息广播以确保消息能够送达所有用户,并使用预写式日志和快照实现持久化。
接下来,将实现会话管理以实现重新连接,构建用户操作的命令系统,并创建一个简单的客户端应用程序来测试我们的服务器。
最后,将学习如何测试和部署聊天室,并回顾构建分布式系统的关键经验。
完成本教程后,我们将拥有一个完整且可运行的聊天室,并了解分布式系统如何处理并发、持久化和故障恢复。
架构概述
该系统采用客户端-服务器架构(C/S),其内部组件协同工作,提供强大的聊天体验。

分布式聊天室系统架构图
组件分解
1. 网络层
- TCP Listener:服务器接受 9000 端口的传入连接
- Connection Handler:使用专用的 goroutine 管理各个客户端的连接
- Protocol:简单换行符文本协议
2. 客户端管理
每个客户端都会产生两个 goroutine:
- 读 goroutine:获取从客户端发来的消息
- 写 goroutine:向客户端发送消息(使用非阻塞的带缓冲通道)
3. 聊天室核心
以下代码是系统的核心–一个运行事件循环的单个 goroutine:
for {
select {
case client := <-cr.join:
// Handle new client
case client := <-cr.leave:
// Handle disconnection
case message := <-cr.broadcast:
// Broadcast to all clients
case client := <-cr.listUsers:
// Send user list
case dm := <-cr.directMessage:
// Handle private message
}
}
4. 状态管理
我们将使用三种同步数据结构来进行状态管理:
- clients map[*Client]bool:活动连接(互斥锁保护)
- sessions map[string]*SessionInfo:用户重连接会话
- messages []Message:内存历史消息记录
5. 持久层
两阶段方法:
- Write-Ahead Log (WAL):持久化目的的写前日志
- Snapshots:定期进行完整状态转储以加快恢复速度
6. 会话管理
这支持基于令牌的身份验证重新连接:
- 为每个用户生成唯一令牌
- 会话超时时间为 1 小时
- 为重连接用户保留聊天记录
消息流
以下是系统消息的传递流程:
用户输入 → 客户端读 → 服务器接收 → 广播通道 → 聊天室主循环 → WAL 持久化 → 分发给所有用户 → 客户端写协程 → TCP 发送 → 用户客户端显示
这里的广播通道充当同步点,确保消息的完全有序。
系统核心设计概念
系统并发模式
本聊天室使用 Go 的 CSP(通信顺序进程)模型。这与其他语言中习惯的并发方式截然不同。
在传统的并发编程中,会使用锁(互斥锁)来保护共享内存。多个线程访问同一个数据结构,这就需要使用锁来确保每次只有一个线程可以修改它。这种方法虽然可行,但容易出错。忘记锁,就会出现竞态条件;持有锁的时间过长,则会导致死锁。
Go 提倡一种不同的方法:它不是通过共享内存来通信,而是通过通道(channel)来共享内存。通过通道在 goroutine 之间传递数据。每次只有一个 goroutine 拥有数据,这种设计从根本上消除了许多并发错误。
通道提供了诸多优势,它们从设计上消除了大多数竞态条件,因为每次只有一个 goroutine 拥有数据,所以不存在访问竞争。通道提供了自然的流量控制,因为通道会在满时阻塞(背压),也会在空时阻塞(等待数据)。它们让消息流的逻辑更加清晰,因为我们可以通过追踪通道来了解数据在系统中的流动方式。此外,它们还提供了更好的组合性,因为可以将通道与 select 语句结合使用,从而协调多个操作。
尽管如此,我们在这个项目中仍然会使用互斥锁。通道并非总是最佳选择。当多个 goroutine 需要快速、频繁地访问共享数据结构(例如 map)时,我们会使用互斥锁。而当我们想要协调行为或转移数据所有权时,我们会使用通道。
以下是聊天室如何使用通道来协调所有操作的代码段:
type ChatRoom struct {
join chan *Client // New connections
leave chan *Client // Disconnections
broadcast chan string // Messages to all
listUsers chan *Client // User list requests
directMessage chan DirectMessage // Private messages
// Shared state (mutex-protected)
clients map[*Client]bool
mu sync.Mutex
// Message history (separate mutex)
messages []Message
messageMu sync.Mutex
}
注意,我们为不同类型的事件设置了五个通道。主事件循环使用 select 语句从所有这些通道接收数据。这意味着所有状态更改都在同一位置按顺序进行,使系统更易于理解。
我们可以使用一个接受不同消息类型的通道,但使用不同的通道可以使代码更清晰。当我们向 chatRoom.join 发送消息时,其含义一目了然。同样向 chatRoom.broadcast 发送消息时,也是如此。
互斥锁保护了许多 goroutine 频繁读取的数据。每次广播消息时都需要访问客户端映射。使用互斥锁进行快速读取比通过通道传递整个映射更高效。
理解持久化策略
当服务器崩溃时(这种情况迟早会发生),我们需要恢复聊天记录。用户希望服务器重启后他们的消息仍然存在。但持久化成本很高:写入磁盘比写入内存慢数千倍。因此,就需要一种能够平衡持久性和性能的策略。
我们将采用类似于真实数据库的两层架构:预写式日志(WAL)和快照。
WAL 是主要的持久化机制。它的工作原理如下:每条消息都会立即追加到名为 messages.wal 的文件中。该文件是只追加的,这意味着我们只写入到文件末尾。只追加写入速度很快,因为磁盘无需寻址到不同的位置。
每条消息都以单行 JSON 格式写入。写入每条消息后,我们会调用 fsync 函数。这会告诉操作系统立即将数据写入物理磁盘,而不仅仅是将其缓冲在内存中。如果没有 fsync 函数,如果在写入完成之前断电,操作系统可能会丢失部分数据。
WAL(预写式日志)是只追加式的,不会修改。这使得它非常可靠。如果服务器在写入过程中崩溃,最坏的情况是末尾出现一行损坏,我们可以在恢复期间检测到并跳过它。
预写式日志的问题在于它会无限增长。如果有一百万条消息,每次重启服务器都需要重放一百万条日志条目,显然这很慢。
快照解决了这个问题。每 5 分钟,如果新消息超过 100 条,我们会将所有消息历史记录写入一个名为 snapshot.json 的单独文件。这记录了聊天记录在该时刻的完整状态。
创建快照后,我们会清空 WAL。新消息会继续追加到 WAL 中,但现在我们只需要重放自上次快照以来的消息。
服务器启动时,首先会加载快照文件(如果存在)。这会让我们获得上次快照时的状态,其中可能包含 10 万条消息,加载它会消耗一些时间。然后,它会重放 WAL 日志中的所有条目。这样我们就能获取自上次快照以来写入的消息,可能只有 50 条。重放这些消息只需几毫秒。最后,系统恢复正常运行。
这种双层系统兼具两者的优势:正常运行期间,仅追加的 WAL 日志可实现快速写入;崩溃后,通过快照和少量 WAL 日志重放实现快速恢复;每条消息后都执行 fsync 操作,确保数据持久性;并且由于 WAL 日志永远不会过大,因此恢复时间也可以得到控制。
这种方案的缺点是,由于同时存在快照和 WAL 日志,快照会暂时占用更多磁盘空间。但磁盘空间成本低廉,而正确性却成本高昂。
现在您已经了解了聊天室设计背后的关键概念,是时候开始构建了。首先,我们需要设置项目结构并创建必要的目录和文件。
聊天室项目文件结构
首先,创建项目目录结构。我们将在教程过程中逐步创建所需要的文件:
# mkdir -p chatroom/cmd/server # mkdir -p chatroom/cmd/client # mkdir -p chatroom/internal/chatroom # mkdir -p chatroom/pkg/token # mkdir -p chatroom/chatdata # cd chatroom
系统文件清单如下图所示:

聊天室程序文件清单
然后初始化 Go 模块。
请注意,程序需要安装 Go 1.23.2 或更高版本。早期版本可能也能运行,但代码示例假设使用了 Go 1.23 及更高版本中的功能。此版本对标准库进行了改进,提高了并发编程的效率。
# go mod init chatroom
于是,项目的 go.mod 文件内容如下所示:
module chatroom go 1.25.0
项目结构搭建完毕后,就可以开始编写代码了。第一步是定义用于表示聊天室核心组件的数据类型:消息、客户端和聊天室本身。
定义核心数据类型
定义核心数据结构的文件为”internal/chatroom/types.go”,这些类型构成了聊天室的基础,因此了解每个类型代表什么以及为什么这样设计至关重要。
package chatroom
import (
"net"
"os"
"sync"
"time"
)
// Message represents a single chat message with metadata
type Message struct {
ID int `json:"id"`
From string `json:"from"`
Content string `json:"content"`
Timestamp time.Time `json:"timestamp"`
Channel string `json:"channel"` // "global" or "private:username"
}
// Client represents a connected user
type Client struct {
conn net.Conn // TCP connection
username string // Display name
outgoing chan string // Buffered channel for writes
lastActive time.Time // For idle detection
messagesSent int // Statistics
messagesRecv int
isSlowClient bool // Testing flag
reconnectToken string
mu sync.Mutex // Protects stats fields
}
// SessionInfo tracks reconnection data
type SessionInfo struct {
Username string
ReconnectToken string
LastSeen time.Time
CreatedAt time.Time
}
// ChatRoom is the central coordinator
type ChatRoom struct {
// Communication channels
join chan *Client
leave chan *Client
broadcast chan string
listUsers chan *Client
directMessage chan DirectMessage
// State
clients map[*Client]bool
mu sync.Mutex
totalMessages int
startTime time.Time
// Message history
messages []Message
messageMu sync.Mutex
nextMessageID int
// Persistence
walFile *os.File
walMu sync.Mutex
dataDir string
// Sessions
sessions map[string]*SessionInfo
sessionsMu sync.Mutex
}
// DirectMessage represents a private message
type DirectMessage struct {
toClient *Client
message string
}
Message 类型释义
Message 结构体存储了关于聊天消息的所有必要信息。ID 字段唯一标识每条消息,确保消息按顺序排列。Timestamp 字段显示消息的发送时间,这对于聊天记录至关重要。
Channel 字段用于表示聊天频道,即对于用户的可见性。这个字段很有意思,目前,我们仅使用”global”字段来存储公开消息,但这种设计允许我们以后添加私有频道或聊天室,而无需更改数据结构。优秀的数据结构能够预见未来的需求。
Client 类型释义
每个已连接的用户都由一个 Client 结构体表示。conn 字段是他们的 TCP 连接——我们通过它来发送和接收数据。
outgoing 通道对性能至关重要。请注意,它是一个 chan 字符串,这意味着它是一个字符串通道。我们之后会将它设置为一个缓冲通道(大小为 10)。这个缓冲区意味着我们可以为该客户端排队 10 条消息而不会阻塞。如果某个客户端读取速度较慢,我们可以继续向其他客户端发送消息。
如果没有这个缓冲区,一个速度较慢的客户端就会阻塞整个广播。有了缓冲区,速度较慢的客户端如果跟不上,只会错过一些消息,这比拖慢所有人的速度要好得多。
lastActive 时间戳帮助我们检测空闲用户。如果某个用户 5 分钟内没有发送消息,我们可以断开他们的连接以释放资源。
mu 互斥锁保护统计字段。多个 goroutine 会更新 messagesSent 和 messagesRecv,因此我们需要一个互斥锁来防止竞争条件。
Chatroom 类型释义
这是系统的核心。请注意,我们有两种类型的字段:通道和受保护状态。
五个通道(join、leave、broadcast、listUsers、directMessage)用于系统的不同部分与主事件循环通信。当有新客户端连接时,我们会将其发送到”join”通道。当有人发送消息时,消息会发送到”broadcast”通道。
这些通道是无缓冲的(容量为 0),因为我们需要同步。向无缓冲通道发送消息时,会阻塞直到有人接收为止。这确保事件循环按顺序处理事件。
受保护状态(映射和切片)需要互斥锁,因为多个 goroutine 会访问它。请注意,我们为不同的数据使用不同的互斥锁。”mu”互斥锁保护”clients”映射。”messageMu”互斥锁保护”messages”切片。”sessionsMu”互斥锁保护”sessions”映射。
为什么要使用不同的互斥锁?为了性能。如果我们对所有内容都使用同一个互斥锁,广播消息会锁定所有数据,从而阻止新客户端加入。使用不同的互斥锁意味着不同的操作可以并发执行。
WAL 文件 (walFile) 也有自己的互斥锁 (walMu),因为写入磁盘速度很慢。我们不希望在等待磁盘 I/O 时占用主互斥锁。
定义好数据类型后,下一步是创建一个用于初始化服务器的函数。该函数将设置所有数据结构,恢复之前运行中持久化的状态,并启动后台工作进程。
初始化服务器
服务器初始化至关重要,因为我们需要按正确的顺序设置所有数据结构。如果在打开 WAL 日志后恢复状态,可能会导致消息重复发送。如果在加载历史记录之前开始接受连接,用户将看不到历史消息。
名为”internal/chatroom/run.go”的文件来完成引导服务器的工作:
package chatroom
import (
"fmt"
"net"
"time"
)
func NewChatRoom(dataDir string) (*ChatRoom, error) {
cr := &ChatRoom{
clients: make(map[*Client]bool),
join: make(chan *Client),
leave: make(chan *Client),
broadcast: make(chan string),
listUsers: make(chan *Client),
directMessage: make(chan DirectMessage),
sessions: make(map[string]*SessionInfo),
messages: make([]Message, 0),
startTime: time.Now(),
dataDir: dataDir,
}
if err := cr.loadSnapshot(); err != nil {
fmt.Printf("Failed to load snapsjot: %v\n", err)
}
if err := cr.initializePersistence(); err != nil {
return nil, err
}
go cr.periodicSnapshots()
return cr, nil
}
func (cr *ChatRoom) periodicSnapshots() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
cr.messageMu.Lock()
messageCount := len(cr.messages)
cr.messageMu.Unlock()
if messageCount > 100 {
if err := cr.createSnapshot(); err != nil {
fmt.Printf("Snapshot failed: %v\n", err)
}
}
}
}
func (cr *ChatRoom) Run() {
fmt.Println("ChatRoom heart beating...")
go cr.cleanupInactiveClients()
for {
select {
case client := <-cr.join:
cr.handleJoin(client)
case client := <-cr.leave:
cr.handleLeave(client)
case message := <-cr.broadcast:
cr.handleBroadcast(message)
case client := <-cr.listUsers:
cr.sendUserList(client)
case dm := <-cr.directMessage:
cr.handleDirectMessage(dm)
}
}
}
func runServer() {
chatRoom, err := NewChatRoom("./chatdata")
if err != nil {
fmt.Printf("Failed to initialize: %v\n", err)
}
defer chatRoom.shutdown()
go chatRoom.Run()
listener, err := net.Listen("tcp", ":9000")
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer listener.Close()
fmt.Println("Server started on :9000")
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println(" Error accepting connection:", err)
continue
}
fmt.Println("New connection from:", conn.RemoteAddr())
go handleClient(conn, chatRoom)
}
}
func (cr *ChatRoom) shutdown() {
fmt.Println("\n Shutting down...")
if err := cr.createSnapshot(); err != nil {
fmt.Printf(" Final snapshot failed: %v\n", err)
}
if cr.walFile != nil {
cr.walFile.Close()
}
fmt.Println("Shutdown complete")
}
让我们来详细分析一下初始化过程中发生的情况:
1. 创建数据结构
我们首先创建所有映射和通道。”make”函数会正确初始化它们。对于映射,它会创建一个可直接使用的空映射。对于通道,它会创建一个无缓冲通道(容量为 0)。
注意,我们创建的 messages 切片初始容量为 0,但预留了增长空间:”make([]Message, 0)”。这比从 nil 开始更高效,因为切片无需分配内存即可立即追加数据。
2. 加载快照
在接受任何连接之前,我们会尝试从磁盘加载快照。这会恢复服务器上次运行时的聊天记录。如果快照不存在(首次运行)或加载失败(文件损坏),则我们会使用空的聊天记录继续运行。
此步骤必须在初始化 WAL 日志之前执行。如果先打开 WAL 日志,则可能会重复发送快照中已存在的消息,从而导致重复记录。
3. 初始化 WAL 日志
initializePersistence() 方法函数以追加模式打开 WAL 文件。它还会重放 WAL 中在上次快照之后发生的任何条目。这确保我们不会丢失任何已写入 WAL 但尚未包含在快照中的消息。
如果此步骤失败,我们将返回错误并拒绝启动。为什么?因为如果我们无法写入 WAL,就无法保证持久性。与其接受我们无法持久化的消息而欺骗用户,不如拒绝启动。
4. 启动后台工作进程
“periodicSnapshots()”函数运行在一个独立的 goroutine 中。它每 5 分钟唤醒一次,检查是否需要创建快照。请注意”defer ticker.Stop()”–这一点很重要。如果我们忘记停止 ticker,就会造成 goroutine 泄漏并浪费资源。
goroutine 获取”messageMu”锁只是为了读取消息计数,然后立即释放它。我们在创建快照期间不持有该锁,因为这样做速度慢,而且会阻塞消息广播。
这里有一些可调参数。5 分钟表示恢复过程中最多只需要重放 5 分钟的消息。100 条消息表示在流量低谷期不会频繁创建快照。
在实际生产系统中,可以将这些参数设置为可配置项。高流量聊天可能需要更短的间隔,而低流量聊天可能需要更长的间隔以减少磁盘 I/O。
此时,聊天室服务器已经初始化了所有必要的数据结构和后台工作进程,接下来需要构建核心协调机制。事件循环是聊天室中所有状态变更发生的地方,它就像心跳一样,确保一切保持同步。
5. 构建事件循环
事件循环是聊天室的核心。所有客户端连接、消息和断开连接都通过这个单一节点进行处理。这看似会成为瓶颈,但实际上正是它让系统简单安全。
“Run()”方法是服务器的”心跳”。所有神奇的功能都在这里实现。系统中的每个事件都会经过这个循环。
函数中使用了 select 语句,select 语句是 Go 语言最强大的并发特性之一。它类似于通道的 switch 语句。select 语句会等待,直到某个 case 可以执行,然后才会执行该 case。
具体来说:循环会阻塞在 select 语句处,等待五个通道中的任何一个有数据到达。当任何一个通道有数据到达时,相应的 case 就会执行。case 执行完毕后,循环会回到等待状态。
例如,当一个新客户端连接时,程序中其他部分的代码会将该客户端发送到 cr.join。select 语句接收到该客户端连接后,会执行 cr.handleJoin(client)。一旦 cr.handleJoin(client) 执行完毕,循环就会回到等待状态。
这里只使用了一个 goroutine 来处理所有事件,这看起来像是一个瓶颈,为什么不并且处理所有事件呢?当然是为了保持一致性,以下是按顺序处理的优势:
- 无条件状态竞争
只有一个 goroutine 会修改 clients 映射、messages 切片和 sessions 映射。这样就无需担心两个操作会相互干扰。在 handleJoin 中添加客户端时,我们可以确信没有其他代码同时移除客户端或广播消息。这非常强大。并发系统中的大多数 bug 都源于操作的意外交错执行。通过顺序处理事件,可以消除一整类 bug。
- 事件排序
消息会按照到达顺序广播。这看似显而易见,但却至关重要。如果 Alice 发送”Hello”,然后 Bob 发送”Hi”,我们需要确保每个人都能按此顺序看到它们。而使用并行处理时,则需要额外的同步机制来保持顺序。
- 简化状态转换
我们可以将系统状态视为一系列转换进行推理。”发生 join 事件后,客户端将加入映射。发生 leave 事件后,客户端将被移除”,我们无需担心并发状态更改会使推理失效。
- 易于调试
当出现问题时,我们还可以向事件循环添加日志记录,从而准确查看导致问题的事件序列。在并行处理中,事件顺序取决于线程调度,这使得错误难以重现。
您可能会担心顺序处理会限制性能。但实际上,对于这种工作负载来说,顺序处理完全没问题。原因如下:
- 处理程序运行速度很快。它们执行一些简单的操作,例如向映射表中添加元素、从映射表中移除元素或将消息转发到通道。这些操作只需几微秒即可完成。事件循环每秒可以处理数千个事件。
- 耗时较长的操作(例如写入磁盘、向客户端连接发送消息)发生在其他 goroutine 中。事件循环不会等待这些操作完成。它只是将数据发送到通道或将任务添加到队列,然后立即跳转到下一个事件。
- 如果需要更高的吞吐量,可以将聊天室分片成多个房间,每个房间都有自己的事件循环。但对于单个聊天室来说,顺序处理既简单又足够快。
注意事件循环前的”go cr.cleanupInactiveClients()”这行代码。它会启动一个后台 goroutine,定期检查空闲客户端。
为什么不把它放在事件循环里呢?因为它是基于时间的,而不是基于事件消息的。清理工作进程每 30 秒唤醒一次,并发送空闲客户端的断开连接事件。这些事件会通过正常的事件循环,从而保持单线程的状态变更特性。
6. 启动/停止服务器
“runServer()”函数将所有功能连接起来:
- 使用”NewChatRoom()”创建聊天室
- 延迟执行关闭函数,使其在函数退出时运行
- 使用”go chatRoom.Run()”在单独的 goroutine 中启动事件循环
- 监听 9000 端口上的 TCP 连接,对于每个连接,使用”go handleClient()”创建一个 goroutine
defer 语句非常重要。无论函数如何退出(正常返回、panic 或错误),shutdown 函数都会执行。这确保我们能够创建最终快照并干净地关闭 WAL 文件。
信号处理 goroutine 监听 SIGINT(Ctrl+C)或 SIGTERM(系统关闭)信号。当收到信号时,它会调用 shutdown() 并优雅地退出。这意味着当我们按下 Ctrl+C 时,服务器会在停止之前保存其状态。
事件循环运行并监听连接后,下一步是处理客户端实际连接时发生的情况。这包括读取客户端用户名、创建会话以及设置通信通道。
处理客户端连接
当客户端连接到服务器时,需要执行以下几项操作:建立 TCP 连接、提示输入用户名、创建 Client 对象来表示该客户端、启动 goroutine 来读写消息,以及处理正常的断开连接和意外故障。
名为”internal/chatroom/io.go”的文件来管理客户端连接。当客户端连接时,”handleClient()”函数会管理整个生命周期:
package chatroom
import (
"bufio"
"fmt"
"math/rand"
"net"
"strings"
"time"
)
func handleClient(conn net.Conn, chatRoom *ChatRoom) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic in handleClient: %v\n", r)
}
conn.Close()
}()
// Set initial timeout for username entry
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
reader := bufio.NewReader(conn)
// Prompt for username or reconnection
conn.Write([]byte("Enter username (or 'reconnect::'): \n"))
input, err := reader.ReadString('\n')
if err != nil {
fmt.Println("Failed to read username:", err)
return
}
input = strings.TrimSpace(input)
var username string
var reconnectToken string
var isReconnecting bool
// Parse reconnection attempt
if strings.HasPrefix(input, "reconnect:") {
parts := strings.Split(input, ":")
if len(parts) == 3 {
username = parts[1]
reconnectToken = parts[2]
isReconnecting = true
} else {
conn.Write([]byte("Invalid format. Use: reconnect::\n"))
return
}
} else {
username = input
}
// Generate guest name if empty
if username == "" {
username = fmt.Sprintf("Guest%d", rand.Intn(1000))
}
// Validate reconnection or check for duplicate
if isReconnecting {
if chatRoom.validateReconnectToken(username, reconnectToken) {
fmt.Printf("%s reconnected successfully\n", username)
conn.Write([]byte(fmt.Sprintf("Welcome back, %s!\n", username)))
} else {
conn.Write([]byte("Invalid token or session expired.\n"))
return
}
} else {
// Prevent duplicate logins
if chatRoom.isUsernameConnected(username) {
conn.Write([]byte("Username already connected. Use reconnect if you lost connection.\n"))
return
}
// Create or retrieve session
chatRoom.sessionsMu.Lock()
existingSession := chatRoom.sessions[username]
chatRoom.sessionsMu.Unlock()
if existingSession != nil {
token := existingSession.ReconnectToken
msg := fmt.Sprintf("Tip: Save this token: %s\n", token)
msg += fmt.Sprintf("To reconnect: reconnect:%s:%s\n", username, token)
conn.Write([]byte(msg))
} else {
session := chatRoom.createSession(username)
token := session.ReconnectToken
msg := fmt.Sprintf("Your token: %s\n", token)
msg += fmt.Sprintf("To reconnect: reconnect:%s:%s\n", username, token)
conn.Write([]byte(msg))
}
}
// Create client object
client := &Client{
conn: conn,
username: username,
outgoing: make(chan string, 10), // Buffered
lastActive: time.Now(),
reconnectToken: reconnectToken,
isSlowClient: rand.Float64() < 0.1, // 10% chance for testing
}
// Clear timeout for normal operation
conn.SetReadDeadline(time.Time{})
// Notify chatroom
chatRoom.join <- client
// Send welcome message
welcomeMsg := buildWelcomeMessage(username)
conn.Write([]byte(welcomeMsg))
// Start read/write loops
go readMessages(client, chatRoom)
writeMessages(client) // Blocks until disconnect
// Update session on disconnect
chatRoom.updateSessionActivity(username)
chatRoom.leave <- client
}
func buildWelcomeMessage(username string) string {
msg := fmt.Sprintf("Welcome, %s!\n", username)
msg += "Commands:\n"
msg += " /users - List all users\n"
msg += " /history [N] - Show last N messages\n"
msg += " /msg - Private message\n"
msg += " /token - Show your reconnect token\n"
msg += " /stats - Show your stats\n"
msg += " /quit - Leave\n"
return msg
}
初始的 30 秒超时机制可防止连接耗尽,断开那些未能快速输入用户名的客户端的连接。缓冲式出站通道可防止慢速客户端阻塞广播器。基于令牌的重连机制允许用户无需复杂的身份验证即可恢复会话。双 goroutine 设计意味着读写操作独立进行,因此慢速写入不会阻塞传入消息。
添加从客户端读取消息功能
代码中添加 readMessages() goroutine 来处理所有传入的数据:
func readMessages(client *Client, chatRoom *ChatRoom) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic in readMessages for %s: %v\n", client.username, r)
}
}()
reader := bufio.NewReader(client.conn)
for {
// Set 5-minute idle timeout
client.conn.SetReadDeadline(time.Now().Add(5 * time.Minute))
message, err := reader.ReadString('\n')
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("%s timed out\n", client.username)
} else {
fmt.Printf("%s disconnected: %v\n", client.username, err)
}
return
}
client.markActive() // Update activity timestamp
message = strings.TrimSpace(message)
if message == "" {
continue
}
client.mu.Lock()
client.messagesRecv++
client.mu.Unlock()
// Process commands vs. regular messages
if strings.HasPrefix(message, "/") {
handleCommand(client, chatRoom, message)
continue
}
// Regular message - format and broadcast
formatted := fmt.Sprintf("[%s]: %s\n", client.username, message)
chatRoom.broadcast <- formatted
}
}
空闲 5 分钟后将触发自动断开连接。这可以防止僵尸连接消耗资源。
添加向客户端发送消息功能
添加 writeMessages() 函数以实现向客户端发送消息功能:
func writeMessages(client *Client) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic in writeMessages for %s: %v\n", client.username, r)
}
}()
writer := bufio.NewWriter(client.conn)
for message := range client.outgoing {
// Simulate slow client (testing mode)
if client.isSlowClient {
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
_, err := writer.WriteString(message)
if err != nil {
fmt.Printf("Write error for %s: %v\n", client.username, err)
return
}
err = writer.Flush()
if err != nil {
fmt.Printf("Flush error for %s: %v\n", client.username, err)
return
}
}
}
在实际应用中,客户端的网络速度差异很大。网络连接速度慢的客户端不应阻碍其他用户接收消息。对于任何需要向多个接收者广播消息的系统来说,这都是一个根本性的挑战。
为了解决这个问题,我们采用了两种技术。首先,我们将出站通道缓冲区大小设置为 10。这意味着系统可以为单个客户端排队 10 条消息,而不会阻塞其他用户。如果某个客户端暂时变慢(例如,他们正在另一个标签页加载大型网页),缓冲区可以吸收这种速度下降的影响。
其次,在广播消息时(之后 的代码中会看到看到),我们使用非阻塞发送。如果某个客户端的缓冲区已满(因为他们的速度持续过慢),我们会跳过向该客户端发送消息,而不是阻塞所有其他用户。速度较慢的客户端会错过一些消息,但其他用户仍然可以正常接收消息。这被称为优雅降级:即使系统的某些功能出现故障,系统也能继续运行。
处理完客户端连接后,下一步是实现任何聊天系统的核心功能:向所有已连接的用户广播消息。广播是指高效、安全地将消息发送给多个接收者。
实现消息广播
广播是聊天应用的核心。当一个用户发送消息时,消息需要立即送达所有其他用户。但这比听起来要复杂得多,因为我们需要持久化消息以保证其有效性,以不同的速度将其发送给不同的客户端而不造成阻塞,并且还要在所有客户端上保持消息顺序一致。
文件”internal/chatroom/handlers.go”实现了事件处理功能。
“handleBroadcast()”方法函数用来将消息发送给所有客户端:
package chatroom
import (
"fmt"
"strings"
"time"
)
func (cr *ChatRoom) handleBroadcast(message string) {
// Parse message metadata
parts := strings.SplitN(message, ": ", 2)
from := "system"
actualContent := message
if len(parts) == 2 {
from = strings.Trim(parts[0], "[]")
actualContent = parts[1]
}
// Create persistent message record
cr.messageMu.Lock()
msg := Message{
ID: cr.nextMessageID,
From: from,
Content: actualContent,
Timestamp: time.Now(),
Channel: "global",
}
cr.nextMessageID++
cr.messages = append(cr.messages, msg)
cr.messageMu.Unlock()
// Persist to WAL
if err := cr.persistMessage(msg); err != nil {
fmt.Printf("Failed to persist: %v\n", err)
// Continue anyway - availability over consistency
}
// Collect current clients
cr.mu.Lock()
clients := make([]*Client, 0, len(cr.clients))
for client := range cr.clients {
clients = append(clients, client)
}
cr.totalMessages++
cr.mu.Unlock()
fmt.Printf("Broadcasting to %d clients: %s", len(clients), message)
// Fan-out to all clients
for _, client := range clients {
select {
case client.outgoing <- message:
client.mu.Lock()
client.messagesSent++
client.mu.Unlock()
default:
fmt.Printf("Skipped %s (channel full)\n", client.username)
}
}
}
从代码中可以看到,如果 WAL 写入失败,我们仍然会广播消息。为什么?因为对于聊天应用程序来说,可用性比绝对一致性更重要。用户可以立即收到消息,并且我们可以在需要时手动处理 WAL 修复。
处理客户端 join 和 leave 事件
将如下代码加放文件,以处理客户端的加入了离开事件:
func (cr *ChatRoom) handleJoin(client *Client) {
cr.mu.Lock()
cr.clients[client] = true
cr.mu.Unlock()
client.markActive()
fmt.Printf("%s joined (total: %d)\n", client.username, len(cr.clients))
cr.sendHistory(client, 10)
announcement := fmt.Sprintf("*** %s joined the chat ***\n", client.username)
cr.handleBroadcast(announcement)
}
func (cr *ChatRoom) handleLeave(client *Client) {
cr.mu.Lock()
if !cr.clients[client] {
cr.mu.Unlock()
return
}
delete(cr.clients, client)
cr.mu.Unlock()
fmt.Printf("%s left (total: %d)\n", client.username, len(cr.clients))
// Close channel safely
select {
case <-client.outgoing:
// Already closed
default:
close(client.outgoing)
}
announcement := fmt.Sprintf("*** %s left the chat ***\n", client.username)
cr.handleBroadcast(announcement)
}
“handleJoin()”函数会将客户端添加到活跃客户端列表中,将其标记为活跃状态以便进行空闲状态跟踪,并向其发送最近的 10 条消息,以便查看最近的对话,同时广播一条通知,告知所有人该客户端已加入。
“handleLeave()”函数会将客户端从列表中移除,安全地关闭其出站通道(select 函数会检查通道是否已关闭以避免出现 panic),并广播一条离开通知。
发送用户列表和历史消息
为了实现各项功能,需要将如下辅助函数添加到代码文件中:
func (cr *ChatRoom) sendHistory(client *Client, count int) {
cr.messageMu.Lock()
defer cr.messageMu.Unlock()
start := len(cr.messages) - count
if start < 0 {
start = 0
}
historyMsg := "Recent messages:\n"
for i := start; i < len(cr.messages); i++ {
msg := cr.messages[i]
historyMsg += fmt.Sprintf(" [%s]: %s\n", msg.From, msg.Content)
}
select {
case client.outgoing <- historyMsg:
default:
}
}
func (cr *ChatRoom) sendUserList(client *Client) {
cr.mu.Lock()
defer cr.mu.Unlock()
list := "Users online:\n"
for c := range cr.clients {
status := ""
if c.isInactive(1 * time.Minute) {
status = " (idle)"
}
list += fmt.Sprintf(" - %s%s\n", c.username, status)
}
list += fmt.Sprintf("\nTotal messages: %d\n", cr.totalMessages)
list += fmt.Sprintf("Uptime: %s\n", time.Since(cr.startTime).Round(time.Second))
select {
case client.outgoing <- list:
default:
}
}
func (cr *ChatRoom) handleDirectMessage(dm DirectMessage) {
select {
case dm.toClient.outgoing <- dm.message: dm.toClient.mu.Lock() dm.toClient.messagesSent++ dm.toClient.mu.Unlock() default: fmt.Printf("Couldn't deliver DM to %s\n", dm.toClient.username) } } func (cr *ChatRoom) findClientByUsername(username string) *Client { cr.mu.Lock() defer cr.mu.Unlock() for client := range cr.clients { if client.username == username { return client } } return nil } func (c *Client) markActive() { c.mu.Lock() defer c.mu.Unlock() c.lastActive = time.Now() } func (c *Client) isInactive(timeout time.Duration) bool { c.mu.Lock() defer c.mu.Unlock() return time.Since(c.lastActive) > timeout
}
如果所有代码正常工作,那么现在我们就已经拥有一个可以正常运行的聊天系统,客户端可以连接并交换消息。
但是存在一个关键问题:如果服务器崩溃或重启,所有消息都会丢失。下一步是添加持久化功能,以便消息在故障后仍然保留。
持久化 WAL 日志和消息快照
持久化机制可以确保聊天记录在服务器崩溃和重启后仍然存在。如果没有持久化,每次服务器宕机用户都会丢失所有对话。
我们使用两种互补的机制来实现持久化:预写式日志 (WAL) 用于即时持久化,快照用于快速恢复。
文件”internal/chatroom/persistence.go”来处理数据持久化。
首先是实现 WAL 日志来保证系统崩溃后消息依然存在:
package chatroom
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
)
func (cr *ChatRoom) initializePersistence() error {
if err := os.MkdirAll(cr.dataDir, 0755); err != nil {
return fmt.Errorf("create data dir: %w", err)
}
walPath := filepath.Join(cr.dataDir, "messages.wal")
if err := cr.recoverFromWAL(walPath); err != nil {
fmt.Printf("Recovery failed: %v\n", err)
}
file, err := os.OpenFile(walPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open wal: %w", err)
}
cr.walFile = file
fmt.Printf("WAL initialized: %s\n", walPath)
return nil
}
func (cr *ChatRoom) recoverFromWAL(walPath string) error {
file, err := os.Open(walPath)
if err != nil {
if os.IsNotExist(err) {
fmt.Println("No WAL found (fresh start)")
return nil
}
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
recovered := 0
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
var msg Message
if err := json.Unmarshal([]byte(line), &msg); err != nil {
fmt.Printf("Skipping corrupt line: %s\n", line)
continue
}
cr.messages = append(cr.messages, msg)
if msg.ID >= cr.nextMessageID {
cr.nextMessageID = msg.ID + 1
}
recovered++
}
fmt.Printf("Recovered %d messages\n", recovered)
return nil
}
func (cr *ChatRoom) persistMessage(msg Message) error {
cr.walMu.Lock()
defer cr.walMu.Unlock()
data, err := json.Marshal(msg)
if err != nil {
return err
}
_, err = cr.walFile.Write(append(data, '\n'))
if err != nil {
return err
}
return cr.walFile.Sync()
}
WAL 日志的每一行,都是采用 JSON 编码的消息串,示例如下:
{"id":1,"from":"Alice","content":"Hello world","timestamp":"2026-03-03T10:00:00Z","channel":"global"}
{"id":2,"from":"Bob","content":"Hi Alice!","timestamp":"2026-03-03T10:00:05Z","channel":"global"}
“Sync()”调用对于数据持久性至关重要。如果没有它,操作系统可能会将写入操作缓冲在内存中,导致崩溃时数据丢失。但缺点是 Sync() 的开销较大(每次调用大约需要 1-10 毫秒)。生产系统可能会将多个消息批量处理以提高吞吐量。
创建和加载消息快照
将以下处理快照的代码加放文件中以实现快照相关功能:
func (cr *ChatRoom) createSnapshot() error {
snapshotPath := filepath.Join(cr.dataDir, "snapshot.json")
tempPath := snapshotPath + ".tmp"
file, err := os.Create(tempPath)
if err != nil {
return err
}
defer file.Close()
cr.messageMu.Lock()
data, err := json.MarshalIndent(cr.messages, "", " ")
cr.messageMu.Unlock()
if err != nil {
return err
}
if _, err := file.Write(data); err != nil {
return err
}
if err := file.Sync(); err != nil {
return err
}
file.Close()
if err := os.Rename(tempPath, snapshotPath); err != nil {
return err
}
fmt.Printf("Snapshot created (%d messages)\n", len(cr.messages))
return cr.truncateWAL()
}
func (cr *ChatRoom) truncateWAL() error {
cr.walMu.Lock()
defer cr.walMu.Unlock()
if cr.walFile != nil {
cr.walFile.Close()
}
walPath := filepath.Join(cr.dataDir, "messages.wal")
file, err := os.OpenFile(walPath, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
cr.walFile = file
fmt.Println("WAL truncated")
return nil
}
func (cr *ChatRoom) loadSnapshot() error {
snapshotPath := filepath.Join(cr.dataDir, "snapshot.json")
file, err := os.Open(snapshotPath)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer file.Close()
data, err := io.ReadAll(file)
if err != nil {
return err
}
cr.messageMu.Lock()
err = json.Unmarshal(data, &cr.messages)
cr.messageMu.Unlock()
if err != nil {
return err
}
for _, msg := range cr.messages {
if msg.ID >= cr.nextMessageID {
cr.nextMessageID = msg.ID + 1
}
}
fmt.Printf("Loaded %d messages from snapshot\n", len(cr.messages))
return nil
}
程序中采用先定稿到”.tmp”文件文件,然后再重命名方式保存消息快照。这样可以确保不会出现未写入完成的快照。即使在写入过程中断电,旧的快照仍然有效。
恢复流程
服务器启动时,首先会加载快照(如果存在),该快照可能包含很多条消息,所以要消耗一些时间。然后,它会重放自快照以来写入的 WAL 日志条目,这些条目可能仅包含最近的消息,这个过程相对较快。
有了持久化机制,聊天的消息是安全的。但网络连接并不稳定。用户可能会因为 WiFi 断线、手机切换基站或笔记本电脑进入睡眠模式而断开连接。下一步是实现会话管理,以便用户在重新连接时不会丢失身份或聊天记录。
实现会话管理
会话管理功能允许用户在网络中断后重新连接到服务器时,无需创建新帐户或重新输入凭据。我们将使用跨连接持久的加密安全令牌来实现此功能。
文件”internal/chatroom/session.go”来管理会话处理重新连接。
import (
"chatroom/pkg/token"
"fmt"
"time"
)
func (cr *ChatRoom) createSession(username string) *SessionInfo {
cr.sessionsMu.Lock()
defer cr.sessionsMu.Unlock()
tok := token.GenerateToken()
session := &SessionInfo{
Username: username,
ReconnectToken: tok,
LastSeen: time.Now(),
CreatedAt: time.Now(),
}
cr.sessions[username] = session
fmt.Printf("Created session for %s (token: %s...)\n", username, session.ReconnectToken[:8])
return session
}
func (cr *ChatRoom) validateReconnectToken(username, token string) bool {
cr.sessionsMu.Lock()
defer cr.sessionsMu.Unlock()
session, exists := cr.sessions[username]
if !exists {
return false
}
if session.ReconnectToken != token {
return false
}
if time.Since(session.LastSeen) > 1*time.Hour {
delete(cr.sessions, username)
return false
}
session.LastSeen = time.Now()
return true
}
func (cr *ChatRoom) updateSessionActivity(username string) {
cr.sessionsMu.Lock()
defer cr.sessionsMu.Unlock()
if session, exists := cr.sessions[username]; exists {
session.LastSeen = time.Now()
}
}
func (cr *ChatRoom) isUsernameConnected(username string) bool {
cr.mu.Lock()
defer cr.mu.Unlock()
for client := range cr.clients {
if client.username == username {
return true // Already connected
}
}
return false
}
// cleanupInactiveClients periodically removes sessions that haven't been seen
// for a long time.
func (cr *ChatRoom) cleanupInactiveClients() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
cr.mu.Lock()
var toRemove []*Client
for client := range cr.clients {
if client.isInactive(5 * time.Minute) {
fmt.Printf(" Removing inactive client: %s\n", client.username)
toRemove = append(toRemove, client)
}
}
cr.mu.Unlock()
// Remove inactive clients
for _, client := range toRemove {
cr.leave <- client
}
}
}
程序中调用了 token 中的函数,”pkg/token/token.go”相关代码如下:
package token
import (
"crypto/rand"
"encoding/hex"
)
// GenerateToken returns a secure random 16-byte hex token.
func GenerateToken() string {
b := make([]byte, 16)
_, _ = rand.Read(b)
return hex.EncodeToString(b)
}
此处的令牌以明文形式通过 TCP 传输。在生产环境中,我们应该使用 TLS 加密来保护传输中的令牌,在存储令牌之前对其进行哈希处理,以防止泄露,并对重连尝试实施速率限制,以防止暴力破解攻击。
聊天室目前已经支持基本的消息传递和重连功能。但用户除了发送消息之外,还需要其他方式与系统交互。命令系统提供了诸如列出用户、查看历史记录和发送私信等功能。
构建命令系统
命令是以斜杠(/)开头的消息,用于执行特殊操作,而不是向所有人广播。许多聊天应用程序都使用这种模式。我们将实现几个能够提升用户体验的实用命令。
在”io.go”文件中添加如下代码:
func handleCommand(client *Client, chatRoom *ChatRoom, command string) {
parts := strings.Fields(command)
if len(parts) == 0 {
return
}
switch parts[0] {
case "/users":
chatRoom.listUsers <- client
case "/stats":
client.mu.Lock()
stats := fmt.Sprintf("Your Stats:\n")
stats += fmt.Sprintf(" Messages sent: %d\n", client.messagesSent)
stats += fmt.Sprintf(" Messages received: %d\n", client.messagesRecv)
stats += fmt.Sprintf(" Last active: %s ago\n",
time.Since(client.lastActive).Round(time.Second))
client.mu.Unlock()
select {
case client.outgoing <- stats:
default:
}
case "/msg":
if len(parts) < 3 {
select {
case client.outgoing <- "Usage: /msg \n":
default:
}
return
}
targetUsername := parts[1]
messageText := strings.Join(parts[2:], " ")
targetClient := chatRoom.findClientByUsername(targetUsername)
if targetClient == nil {
select {
case client.outgoing <- fmt.Sprintf("User '%s' not found\n", targetUsername):
default:
}
return
}
privateMsg := fmt.Sprintf("[From %s]: %s\n", client.username, messageText)
select {
case targetClient.outgoing <- privateMsg:
default:
select {
case client.outgoing <- fmt.Sprintf("%s's inbox is full\n", targetUsername):
default:
}
return
}
select {
case client.outgoing <- fmt.Sprintf("Message sent to %s\n", targetUsername): default: } case "/history": count := 20 if len(parts) > 1 {
fmt.Sscanf(parts[1], "%d", &count)
}
if count > 100 {
count = 100
}
cr.sendHistory(client, count)
case "/token":
chatRoom.sessionsMu.Lock()
session := chatRoom.sessions[client.username]
chatRoom.sessionsMu.Unlock()
if session != nil {
msg := fmt.Sprintf("Your reconnect token:\n")
msg += fmt.Sprintf(" reconnect:%s:%s\n", client.username, session.ReconnectToken)
select {
case client.outgoing <- msg:
default:
}
}
case "/quit":
announcement := fmt.Sprintf("%s left the chat\n", client.username)
chatRoom.broadcast <- announcement
select {
case client.outgoing <- "Goodbye!\n":
default:
}
time.Sleep(100 * time.Millisecond)
client.conn.Close()
default:
select {
case client.outgoing <- fmt.Sprintf("Unknown: %s\n", parts[0]):
default:
}
}
}
我们的服务器现在已具备所有核心功能:连接处理、消息广播、数据持久化、会话管理和命令。但要实际使用这个聊天室,我们还需要一个客户端应用程序。客户端比服务器简单得多,因为它只需要连接并转发消息。
创建客户端程序
客户端应用程序为聊天室提供用户界面。它连接到服务器,显示收到的消息,并发送用户输入的消息。虽然服务器端包含许多并发组件,较为复杂,但客户端却非常简单。
文件”internal/chatroom/client.go”实现了客户端功能。
package chatroom
import (
"bufio"
"fmt"
"net"
"os"
"strings"
)
// StartClient connects to the local chat server and relays stdin/stdout.
func StartClient() {
conn, err := net.Dial("tcp", ":9000")
if err != nil {
fmt.Println("Error connecting to server:", err)
return
}
defer conn.Close()
fmt.Println("Connected to chat server")
// Start reading from server (in background)
go func() {
reader := bufio.NewReader(conn)
for {
message, err := reader.ReadString('\n') // Waits for Enter key
if err != nil {
fmt.Println("Disconnected from server.")
os.Exit(0)
}
fmt.Print("\r" + message)
fmt.Print(">> ")
}
}()
// Read input from user and send to server
inputReader := bufio.NewReader(os.Stdin)
fmt.Println("Welcome to the chat server!")
for {
fmt.Print(">> ")
message, _ := inputReader.ReadString('\n') // Waits for Enter key
message = strings.TrimSpace(message)
if message == "" {
continue
}
// Write/Send message to the server
conn.Write([]byte(message + "\n"))
}
}
客户端使用两个 goroutine 同时处理通信。主 goroutine 从标准输入(键盘)读取数据并向服务器发送消息。当输入消息并按下回车键时,消息会立即通过 TCP 连接发送出去。
后台 goroutine 持续从服务器读取数据。每当收到消息时,它都会将其打印到屏幕上。”\r”(回车符)会在打印消息之前清除当前的”>>”提示符,因此新消息不会出现在与输入内容相同的行上。打印完消息后,它会重新打印提示符,以便用户可以继续输入。
这种双 goroutine 设计意味着可以在输入的同时接收消息。如果有人在用户输入消息的过程中发送了一条消息,他们的消息会立即显示,而提示符会重新出现在消息下方。
“defer conn.Close()”确保函数退出时连接被正确关闭。如果服务器断开连接,读取 goroutine 会收到错误并调用”os.Exit(0)”来优雅地终止整个客户端程序。
创建程序入口
程序入口代码位于”cmd”目录中,分别是”cmd/server/main.go”和”cmd/client/main.go”,用以生成最终的可执行文件。服务端的入口程序代码如下:
package main
import (
"chatroom/internal/chatroom"
"fmt"
"os"
)
func main() {
fmt.Println("Starting server from cmd/server...")
chatroom.StartServer()
os.Exit(0)
}
而客户端的程序代码为:
package main
import (
"chatroom/internal/chatroom"
"fmt"
)
func main() {
fmt.Println("Starting client from cmd/client...")
chatroom.StartClient()
}
最后,为”internal/chatroom/server.go”文件中添加一个包装函数用以启动服务器:
package chatroom
func StartServer() {
runServer()
}
所有入口点创建完毕后,我们的聊天室程序就完成了,可以进行测试了。下一步是学习如何测试该应用,以确保一切运行正常。
测试聊天室应用
测试像聊天室这样的并发系统需要采用与测试典型顺序代码不同的方法。我们需要验证 goroutine 是否正确协调,消息是否按正确的顺序到达,以及系统是否能处理诸如断开连接之类的极端情况。
编写单元测试文件
单元测试用于独立验证各个组件。对于聊天室而言,最重要的测试是验证消息是否能正确地广播给所有已连接的客户端。
创建”internal/chatroom/chatroom_test.go”文件:
package chatroom
import (
"testing"
"strings"
"time"
)
func TestBroadcast(t *testing.T) {
cr, _ := NewChatRoom("./testdata")
defer cr.shutdown()
go cr.Run()
// Create mock clients
client1 := &Client{
username: "Alice",
outgoing: make(chan string, 10),
}
client2 := &Client{
username: "Bob",
outgoing: make(chan string, 10),
}
// Join clients
cr.join <- client1
cr.join <- client2
time.Sleep(100 * time.Millisecond)
// Broadcast message
cr.broadcast <- "[Alice]: Hello!"
// Verify both receive it
select {
case msg := <-client1.outgoing:
if !strings.Contains(msg, "Hello!") {
t.Fatal("Client1 didn't receive correct message")
}
case <-time.After(1 * time.Second):
t.Fatal("Client1 didn't receive message")
}
select {
case msg := <-client2.outgoing:
if !strings.Contains(msg, "Hello!") {
t.Fatal("Client2 didn't receive correct message")
}
case <-time.After(1 * time.Second):
t.Fatal("Client2 didn't receive message")
}
}
此测试会创建一个聊天室实例,并使用”go cr.Run()”启动其事件循环。然后,它会创建两个模拟客户端。请注意,这些并非真正的 TCP 连接–它们只是带有出站通道的 Client 结构体。这样,我们就可以在无需实际网络连接的情况下测试广播逻辑。
测试会将两个客户端发送到加入通道,等待 100 毫秒以处理它们,然后广播一条消息。带有超时的 select 语句至关重要。它们会尝试从每个客户端的出站通道接收消息,但如果在 1 秒内没有收到任何消息,则测试失败。这可以防止测试在出现问题时无限期地挂起。
“time.Sleep(100 * time.Millisecond)”让事件循环有时间处理加入事件,然后再进行广播。在实际系统中,一般会使用通道进行同步,但对于测试而言,短暂的休眠是可以接受的。
使用如下命令进行功能测试:
# go test ./internal/chatroom -v
“-v”标志会显示详细输出,打印每个测试的运行结果。我们可以查看广播测试是否通过以及耗时。以下输出显示测试已通过:

使用测试文件进行功能测试
应用程序联调测试
联调测试会判断整个系统是否存在问题,包括实际服务器、实际客户端以及真实的网络连接,不同于之前的单元测试,联调测试会涉及所有方面。
首先使用如下命令启动服务器及三个客户端:
# Terminal 1: Start server go run cmd/server/main.go # Terminal 2: Client 1 go run cmd/client/main.go # Enter username: Alice # Terminal 3: Client 2 go run cmd/client/main.go # Enter username: Bob # Terminal 4: Client 3 go run cmd/client/main.go # Enter username: John
测试目的:
- 基本消息功能:从 Alice 发送一条消息,并验证 Bob 和 John 是否都能收到。应该在所有客户端窗口中看到该消息,消息内容以方括号括起来显示发送者的用户名。尝试从每个客户端发送消息,以验证广播功能是否在所有方向上都有效。
- 加入和离开通知:当新客户端连接时,所有现有客户端都应该看到”已加入聊天”的通知。当有人断开连接(使用 /quit 命令或关闭终端)时,所有人都应该看到”已离开聊天”的消息。这可以确认加入和离开处理程序工作正常。
- 私信功能:使用 /msg Bob,这是来自 Alice 客户端的私信。该消息应该只出现在 Bob 的窗口中,而不会出现在 John 或 Alice 的窗口中。尝试在不同的用户对之间发送私信,以验证路由功能是否正常。发送者应该收到消息已发送的确认信息。
- 用户列表:从任何客户端运行 /users 命令。应该看到所有已连接用户的列表。如果用户闲置超过一分钟,其状态应显示为”(idle)”。该命令还应显示消息总数和服务器运行时间。
- 聊天记录:新客户端加入时应自动收到最近 10 条消息。也可以使用”/history 20″命令请求最近 20 条消息。这可以验证消息持久化功能是否正常工作。
- 会话重连:在一台客户端上,使用”/token”命令获取您的重连令牌。令牌格式类似于”reconnect:Alice:338f04ca….”。复制此令牌,使用 Ctrl+C 断开客户端连接,启动一台新客户端,并在提示时粘贴重连字符串。就应该会使用之前的身份重新加入聊天,其他用户也不会看到重复的加入通知。
- 统计信息:使用”/stats”命令查看已发送和接收的消息数量以及上次活跃的时间。这可以验证客户端统计信息跟踪功能是否正常工作。
- 错误处理:尝试使用已被使用的用户名进行连接–这应该会被拒绝。尝试向一个不存在的用户发送私信–应该会收到错误提示。尝试使用无效的重连令牌–应该会被拒绝。这些测试验证了程序的验证逻辑是否有效。
查看服务器终端以了解服务器的视角。我们将看到连接日志、广播确认信息以及任何错误。当客户端断开连接时,应该会看到他们的会话正在更新。当服务器创建快照时,也会看到这些快照被记录下来。
联调测试可以发现单元测试遗漏的问题,例如网络超时、跨多个客户端的消息排序问题,或者 WAL 文件的创建和锁定方式存在问题。
服务器典型输出如下图所示:

聊天室服务器典型输出
发布应用程序
所有功能测评正常后,就可以发布应用程序成为可执行文件实际使用了。使用如下命令就可以将服务器和客户端编译可执行文件:
go build -o server cmd/server/main.go go build -o client cmd/client/main.go
至此,我们已经从零开始构建了一个可用于生产环境的分布式聊天室。本教程演示了分布式系统的重要概念,包括并发模式、网络编程、状态管理、持久化和容错。
