你是否好奇过类似微信这样的 IM 是如何近乎实时收发消息的?除了基于 TCP、QUIC、Websocket 等双向通信技术实现通信外,能否通过普通的 HTTP 协议来实现双向通信吗?本文主要是想介绍 HTTP 协议中的 Server-Sent Event 机制,以及如何基于它实现话题订阅机制。
类似微信,客户端可以通过 POST 请求发送个人或群消息到服务器,并由服务器进行消息存储和转发。客户端接收个人消息,可通过向服务器订阅个人消息实现,服务器收到个人消息后可实时推送给客户端。而客户端接收群消息,可通过向服务器订阅群消息实现,服务器收到某个群的消息后可实时推送给客户端。
SSE 是一种基于 HTTP 连接的服务器推送技术,客户端与服务器初始化好连接后,服务器可以随时向客户端发送内容更新。SSE 是对 HTTP 协议很好的补充,可以轻松实现客户端与服务器双向通信。
HTTP 2.0 中,连接是双向多路复用的(HTTP/1 不是的),SSE 可以与请求重用同一个 h2 连接。换句话说,就是可以通过一个 h2 连接向服务器发送 GET/POST 等请求,同时从服务器接收 Server-Sent Events,数据大小、格式都没有限制,更是解除了 SSE 的并发连接数限制。
与之对应,WebSocket 从设计上并没有充分利用 HTTP/2,需要建立额外的连接,而且协议更复杂,不如基于普通 HTTP 连接的 SSE 简单。
还有一种容易混淆的 HTTP/2 Server Push,这是一个 HTTP/2 中新的可选的特性,允许服务器在客户端(浏览器)请求资源(CSS/JS等)前,先把资源推送给客户端。
话题订阅机制在互联网开发领域是很常见的设计开发模式,如消息队列、Redis Pub/Sub 等中都有使用。
'hi,...' (huoyijie)
+----Publish(SSE)---->+------+
| | |
+---+--+ 'chatgpt' |Chrome|
| |<----Subscribe---<| |
| HTTP | | +------+
| | (Get)
|Server| | +------+
| |<----Subscribe---<| |
+---+--+ 'chatgpt|robot' |Chrome|
| | |
+----Publish(SSE)---->+------+
'hi,...' (jack)
文中所有代码已放到 Github subscribe-topic-with-sse 目录下。
前置条件
- 已安装 Go 1.20+
- 已安装 IDE (如 vscode)
创建 subscribe-topic-with-sse 项目
$ mkdir subscribe-topic-with-sse && cd subscribe-topic-with-sse
$ go mod init subscribe-topic-with-sse
新增 mydb.go 文件,添加测试用户和话题数据:
// mydb.go
package main
// 所有注册用户
var myUsers = map[string]bool{
"huoyijie": true,
"jack": true,
}
// 所有注册话题
var myTopics = map[string]bool{
"chatgpt": true,
"robot": true,
}
新增 types.go 文件,添加如下几个类型:
// types.go
package main
// 在某个话题上发布消息时会创建消息对象
type Message struct {
Topic string `json:"topic" binding:"required"`
Data string `json:"data" binding:"required"`
}
// 向客户端发布消息的 Channel
type ClientChan chan Message
// 客户端,封装用户、订阅话题列表及客户端 Channel
type Client struct {
User string
Topics []string
C ClientChan
}
新增 event.go,添加最核心的 Event 类型:
// event.go
package main
import "log"
type Event struct {
Message chan Message
// New client connections
NewClients chan Client
// Closed client connections
ClosedClients chan Client
// All client connections
Clients map[string]ClientChan
// 所有用户的订阅话题
// 格式: map["huoyijie"]map["chatgpt"]true
Subscribs map[string]map[string]bool
}
// 创建 Event 对象
func NewEvent() (event *Event) {
event = &Event{
Message: make(chan Message),
NewClients: make(chan Client),
ClosedClients: make(chan Client),
Clients: make(map[string]ClientChan),
Subscribs: make(map[string]map[string]bool),
}
// 启动单独一个协程内更新 Clients/Subscribs,避免并发读写 map
go event.listen()
return
}
// 启动单独一个协程内更新 Clients/Subscribs,避免并发读写 map
func (event *Event) listen() {
for {
select {
// Add new available client
case client := <-event.NewClients:
event.Clients[client.User] = client.C
// 订阅话题列表
topics := map[string]bool{}
for _, topic := range client.Topics {
if _, ok := myTopics[topic]; ok {
topics[topic] = true
}
}
event.Subscribs[client.User] = topics
log.Printf("Client added. %d registered clients", len(event.Clients))
// Remove closed client
case client := <-event.ClosedClients:
delete(event.Clients, client.User)
close(client.C)
log.Printf("Removed client. %d registered clients", len(event.Clients))
// Forward message to client
case eventMsg := <-event.Message:
for user, topics := range event.Subscribs {
if _, ok := topics[eventMsg.Topic]; ok {
event.Clients[user] <- eventMsg
}
}
}
}
}
新增 subscribe.go 文件,添加客户端订阅话题 Handler:
// subscribe.go
package main
import (
"errors"
"io"
"net/http"
"strings"
"github.com/gin-gonic/gin"
)
func subscribe(event *Event) gin.HandlerFunc {
return func(c *gin.Context) {
user := c.Query("user")
topics := c.Query("topics")
// 判断用户是否存在,订阅话题是否为空
if _, ok := myUsers[user]; !ok || len(topics) == 0 {
c.AbortWithError(http.StatusBadRequest, errors.New("bad request"))
return
}
// 创建新客户端
client := Client{
User: user,
Topics: strings.Split(topics, "|"),
C: make(ClientChan),
}
event.NewClients <- client
// 如果连接端开,删除该客户端
defer func() {
event.ClosedClients <- client
}()
// Stream message to client
c.Stream(func(w io.Writer) bool {
// 发送该 client Channel 的消息通过 SSE Stream 发给浏览器
if message, ok := <-client.C; ok {
c.SSEvent("message", message)
return true
}
return false
})
}
}
新增 publish.go 文件,添加发布话题消息 Handler:
// publish.go
package main
import "github.com/gin-gonic/gin"
func publish(event *Event) gin.HandlerFunc {
return func(c *gin.Context) {
msg := Message{}
if err := c.BindJSON(&msg); err != nil {
return
}
// 发布话题
event.Message <- msg
}
}
新增 home.go,添加 web 页面:
// home.go
package main
import (
"errors"
"fmt"
"net/http"
"github.com/gin-gonic/gin"
)
func home() gin.HandlerFunc {
return func(c *gin.Context) {
user := c.Query("user")
topics := c.Query("topics")
// 判断用户是否存在,订阅话题是否为空
if _, ok := myUsers[user]; !ok || len(topics) == 0 {
c.AbortWithError(http.StatusBadRequest, errors.New("bad request"))
return
}
c.Writer.WriteString(fmt.Sprintf(`
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Server Sent Event</title>
</head>
<body>
<div>Topic Messages:</div>
<div class="event-data"></div>
</body>
<script src="https://code.jquery.com/jquery-1.11.1.js"></script>
<script>
// EventSource object of javascript listens the streaming events from our go server and prints the message.
var stream = new EventSource("/subscribe?user=%s&topics=%s");
stream.addEventListener("message", function(e){
$('.event-data').append(e.data + "</br>")
});
</script>
</html>`, user, topics))
}
}
请求 http://host:port/?user=huoyijie&topics=chatgpt|robot
时会返回上述页面,注意上面 js 代码部分:
var stream = new EventSource("/subscribe?user=%s&topics=%s");
stream.addEventListener("message", function(e){
$('.event-data').append(e.data + "</br>")
});
页面打开后,会马上请求 /subscribe?user=huoyijie&topics=chatgpt|robot
实现用户 huoyijie
对话题列表 chatgpt|robot
的订阅。然后当某个话题有新消息时,就会自动通过 SSE Stream 发送消息到 web 页面。
最后添加 main.go 文件,添加 Handler 注册代码:
// main.go
package main
import (
"github.com/gin-gonic/gin"
)
func main() {
event := NewEvent()
r := gin.Default()
r.GET("/", home())
// 订阅话题消息
r.GET("subscribe", subscribe(event))
// 发布话题消息
r.POST("publish", publish(event))
r.Run(":8000")
}
接下来安装依赖和启动服务器:
# 下载安装依赖
$ go mod tidy
# 运行应用
$ go run .
打开一个新浏览器 Tab 页面,访问 URL: http://localhost:8000/?user=huoyijie&topics=chatgpt
打开另一个新 Tab 页面,访问 URL: http://localhost:8000/?user=jack&topics=chatgpt|robot
现在有 2 个用户分别订阅了话题,现在我们来测试下发布话题消息:
# 向 robot 话题发布消息
$ curl -d '{"topic": "robot", "data": "hi robot"}' http://localhost:8000/publish
只有用户 jack
收到了 hi robot
消息,因为只有 jack
订阅了 robot
。
# 向 chatgpt 话题发布消息
curl -d '{"topic": "chatgpt", "data": "hi chatgpt"}' http://localhost:8000/publish
2个用户都收到了 hi chatgpt
消息,因为他们都订阅了 chatgpt
。
如果打开浏览器 DevTool,可以看到服务器通过 SSE Stream 发送到浏览器的 EventStream
数据:
本文主要介绍了 HTTP 协议中的 Server-Sent Event 机制,以及如何基于它实现话题订阅机制。如果想实现类似微信中的实时收发消息功能,客户端可以通过 POST 请求发送个人或群消息到服务器,并由服务器进行消息存储和转发。客户端接收个人消息,可通过向服务器订阅个人消息实现,服务器收到个人消息后可实时推送给客户端。而客户端接收群消息,可通过向服务器订阅群消息实现,服务器收到某个群的消息后可实时推送给客户端。