WebSocket协议解读

avatar
websocket和http协议的关联:

  • 都是应用层协议,都基于tcp传输协议。
  • 跟http有良好的兼容性,ws和http的默认端口都是80,wss和https的默认端口都是443。
  • websocket在握手阶段采用http发送数据。

websocket和http协议的差异:

  • http是半双工,而websocket通过多路复用实现了全双工。
  • http只能由client主动发起数据请求,而websocket还可以由server主动向client推送数据。在需要及时刷新的场景中,http只能靠client高频地轮询,浪费严重。
  • http是短连接(也可以实现长连接, HTTP1.1 的连接默认使用长连接),每次数据请求都得经过三次握手重新建立连接,而websocket是长连接。
  • http长连接中每次请求都要带上header,而websocket在传输数据阶段不需要带header。

  WebSocket是HTML5下的产物,能更好的节省服务器资源和带宽,websocket应用场景举例:

  • html5多人游戏
  • 聊天室
  • 协同编辑
  • 基于实时位置的应用
  • 股票实时报价
  • 弹幕
  • 视频会议

websocket握手协议:
Request Header

1
2
3
4
Sec-Websocket-Version:13
Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Key:duR0pUQxNgBJsRQKj2Jxsw==

Response Header

1
2
3
Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Accept:a1y2oy1zvgHsVyHMx+hZ1AYrEHI=
  • Upgrade:websocket和Connection:Upgrade指明使用WebSocket协议。
  • Sec-WebSocket-Version 指定Websocket协议版本。
  • Sec-WebSocket-Key是一个Base64 encode的值,是浏览器随机生成的。
  • 服务端收到Sec-WebSocket-Key后拼接上一个固定的GUID,进行一次SHA-1摘要,再转成Base64编码,得到Sec-WebSocket-Accept返回给客户端。客户端对本地的Sec-WebSocket-Key执行同样的操作跟服务端返回的结果进行对比,如果不一致会返回错误关闭连接。如此操作是为了把websocket header跟http header区分开。

WebSocket CS架构实现

  首先需要安装gorilla的websocket包。

1
go get github.com/gorilla/websocket
  1. 将http升级到WebSocket协议。
1
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*websocket.Conn, error)
  1. 客户端发起握手,请求建立连接。
1
func (*websocket.Dialer) Dial(urlStr string, requestHeader http.Header) (*websocket.Conn, *http.Response, error)
  1. 基于connection进行read和write。

ws_server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main

import (
"fmt"
"log"
"net"
"net/http"
"strconv"
"time"

"github.com/gorilla/websocket"
)

type (
Request struct {
A int
B int
}
Response struct {
Sum int
}
)

type WsServer struct {
listener net.Listener
addr string
upgrade *websocket.Upgrader
}

// 构造函数
func NewWsServer(port int) *WsServer {
ws := new(WsServer)
ws.addr = "0.0.0.0:" + strconv.Itoa(port)
ws.upgrade = &websocket.Upgrader{
HandshakeTimeout: 5 * time.Second, //握手超时时间
ReadBufferSize: 2048, //读缓冲大小
WriteBufferSize: 1024, //写缓冲大小
//请求检查函数,用于统一的链接检查,以防止跨站点请求伪造。如果Origin请求头存在且原始主机不等于请求主机头,则返回false
CheckOrigin: func(r *http.Request) bool {
log.Printf("request url %s\n", r.URL)
log.Println("handshake request header")
for key, values := range r.Header {
log.Printf("%s:%s\n", key, values[0])
}
return true
},
//http错误响应函数
Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {},
}
return ws
}

// httpHandler必须实现ServeHTTP接口
func (ws *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/add" {
fmt.Println("path error")
http.Error(w, "请求的路径不存在", 222) //把出错的话术写到ResponseWriter里
return
}
conn, err := ws.upgrade.Upgrade(w, r, nil) //将http协议升级到websocket协议
if err != nil {
log.Printf("upgrade http to websocket error: %v\n", err)
return
}
fmt.Printf("establish conection to client %s\n", conn.RemoteAddr().String())
go ws.handleConnection(conn)
}

// 处理连接里发来的请求数据
func (ws *WsServer) handleConnection(conn *websocket.Conn) {
defer func() {
conn.Close()
}()
for { //20秒关闭连接
conn.SetReadDeadline(time.Now().Add(20 * time.Second))
var request Request
if err := conn.ReadJSON(&request); err != nil {
//判断是不是超时
if netError, ok := err.(net.Error); ok { //如果ok==true,说明类型断言成功
if netError.Timeout() {
fmt.Printf("read message timeout, remote %s\n", conn.RemoteAddr().String())
return
}
}
//忽略websocket.CloseGoingAway/websocket.CloseNormalClosure这2种closeErr,如果是其他closeErr就打一条错误日志
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
fmt.Printf("client %s %v\n", conn.RemoteAddr().String(), err)
}
return //只要ReadMessage发生错误,就关闭这条连接
} else {
response := Response{Sum: request.A + request.B}
if err = conn.WriteJSON(&response); err != nil {
log.Printf("write response failed: %v", err)
} else {
log.Printf("write response %d\n", response.Sum)
}
}
}
}

func (ws *WsServer) Start() (err error) {
ws.listener, err = net.Listen("tcp", ws.addr) //http和websocket都是建立在tcp之上的
if err != nil {
log.Printf("listen error:%s\n", err)
return
}
err = http.Serve(ws.listener, ws) //开始对外提供http服务。可以接收很多连接请求,其他一个连接处理出错了,也不会影响其他连接
if err != nil {
log.Printf("http server error: %v\n", err)
return
}

// if err:=http.ListenAndServe(ws.addr, ws);err!=nil{ //Listen和Serve两步合成一步
// fmt.Printf("http server error: %v\n", err)
// return
// }
return nil
}

func main() {
ws := NewWsServer(5657)
ws.Start()
}

ws_client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"

"github.com/gorilla/websocket"
)

type (
Request struct {
A int
B int
}
Response struct {
Sum int
}
)

func main() {
dialer := &websocket.Dialer{}
header := http.Header{
"Cookie": []string{"name=zcy"},
}
conn, resp, err := dialer.Dial("ws://localhost:5657/add", header) //Dial:握手阶段,会发送一条http请求。请求一个不存在的路径试试看
defer resp.Body.Close()
if err != nil {
fmt.Printf("dial server error:%v\n", err)
fmt.Println(resp.StatusCode)
msg, _ := ioutil.ReadAll(resp.Body)
fmt.Println(string(msg))
return
}
fmt.Println("handshake response header")
for key, values := range resp.Header {
fmt.Printf("%s:%s\n", key, values[0])
}
// time.Sleep(5 * time.Second)
defer conn.Close()
for i := 0; i < 10; i++ {
request := Request{A: 7, B: 4}
requestBytes, _ := json.Marshal(request)
err = conn.WriteJSON(request) //websocket.Conn直接提供发json序列化和反序列化方法
if err != nil {
log.Println(err)
break
}
fmt.Printf("write request %s\n", string(requestBytes))
var response Response
err = conn.ReadJSON(&response)
if err != nil {
log.Println(err)
break
}
log.Printf("receive response: %d\n", response.Sum)
time.Sleep(1 * time.Second)
}
}

  websocket发送的消息类型有5种:TextMessag,BinaryMessage, CloseMessag,PingMessage,PongMessage。TextMessag和BinaryMessage分别表示发送文本消息和二进制消息。CloseMessage关闭帧,接收方收到这个消息就关闭连接
PingMessage和PongMessage是保持心跳的帧,发送方接收方是PingMessage,接收方发送方是PongMessage,目前浏览器没有相关api发送ping给服务器,只能由服务器发ping给浏览器,浏览器返回pong消息。

聊于室实现

  gorilla的websocket项目中有一个聊天室的demo,此处讲一下它的设计思路。
avatar
Hub

  • Hub持有每一个Client的指针,broadcast管道里有数据时把它写入每一个Client的send管道中。
  • 注销Client时关闭Client的send管道。

Client

  • 前端(browser)请求建立websocket连接时,为这条websocket连接专门启一个协程,创建一个client。
  • client把前端发过来的数据写入hub的broadcast管道。
  • client把自身send管道里的数据写给前端。
  • client跟前端的连接断开时请求从hub那儿注销自己。

Front

  • 当打开浏览器页面时,前端会请求建立websocket连接。
  • 关闭浏览器页面时会主动关闭websocket连接。

存活监测

  • 当hub发现client的send管道写不进数据时,把client注销掉。
  • client给websocket连接设置一个读超时,并周期性地给前端发ping消息,如果没有收到pong消息则下一次的conn.read()会报出超时错误,此时client关闭websocket连接。

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package main

import (
"bytes"
"flag"
"log"
"net/http"

"github.com/gorilla/websocket"
)

// 跨域问题
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func serveHome(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "C:/Users/ren/Desktop/新建文件夹/chat/home.html")
}

// 创建用户端类型
type Client struct {
hub *Hub //总线类型
conn *websocket.Conn //websocket连接
username []byte //用户名
send chan []byte //每个用户收发消息管道

}

// 从websocket连接读出数据,发给hub
func (client *Client) read() {
//收尾工作
defer func() {
//从hub那注销client
client.hub.unregister <- client
log.Printf("%s offline\n", client.username)
log.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
//关闭websocket管道
client.conn.Close()
}()
for {
_, message, err := client.conn.ReadMessage()
if err != nil {
// 如果前端主动断开连接,会触发报错,退出循环读取
break
}
// 换行符用空格替代,并且把首位连续的空格去掉
message = bytes.TrimSpace(bytes.Replace(message, []byte{'\n'}, []byte{' '}, -1))

if len(client.username) == 0 {
// 约定:第一次发言设置为用户名,该信息不进行广播
client.username = message
log.Printf("%s online\n", client.username)
} else {
// 要广播的内容前面加上用户名
client.hub.broadcast <- bytes.Join([][]byte{client.username, message}, []byte(": "))
}
}
}

// 从hub的broadcast读取数据,写到websocket连接里
func (client *Client) write() {
defer func() {
// 给前端写数据失败,就可以关闭连接了
log.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
client.conn.Close()
}()
for {
msg, ok := <-client.send
// 正常情况是hub发来了数据,如果前端断开了连接,read()会触发client.send管道关闭,从而执行!ok
if !ok {
client.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 通过NextWriter创建一个新的writer,主要是为了确保上一个writer已经被关闭,即它想写的内容已经flush到conn里去了
writer, err := client.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
writer.Write(msg)
writer.Write([]byte{'\n'})

//如果client.send里还有消息,则一次都写给前端
for i := 0; i < len(client.send); i++ {
writer.Write(<-client.send)
writer.Write([]byte{'\n'})
}
// 必须调用close,否则下次调用client.conn.NextWriter时本条消息才会发送到前端
err = writer.Close()
if err != nil {
return
}
}
}

// 创建总线类型
type Hub struct {
clients []*Client //维护所有Client
register chan *Client //Client注册请求通过管道来接收
unregister chan *Client //Client注销请求通过管道来接收
broadcast chan []byte //需要广播的消息
}

// 总线启动方法
func (hub *Hub) Run() {
for {
select {
case client := <-hub.register:
hub.clients = append(hub.clients, client)
case client := <-hub.unregister:
for i, v := range hub.clients {
if client == v {
// 注销client
hub.clients = append(hub.clients[:i], hub.clients[i+1:]...)
// hub从此不再向该client广播消息
close(client.send)
}
}
case msg := <-hub.broadcast:
// 一人发消息,广播到所有用户管道
for _, client := range hub.clients {
client.send <- msg
}
}
}
}

func SeveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
//http升级到websocket协议
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatalf("upgrade error: %v\n", err.Error())
}
log.Printf("connect to client %s\n", conn.RemoteAddr().String())
//每来一个前端请求,就会创建一个client
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
//向hub注册client
client.hub.register <- client

//websocket是全双工模式,可以同时read和write
go client.read()
go client.write()
}

// 总线构造函数
func NewHub() *Hub {
return &Hub{
// 维护所有用户
clients: make([]*Client, 0),
register: make(chan *Client),
unregister: make(chan *Client),
// 同步管道,确保hub里消息不会堆积,如果同时有多个client想给hub发数据就阻塞
broadcast: make(chan []byte),
}
}

func main() {
// 总线阻塞在子协程
hub := NewHub()
go hub.Run()
// http监听,启动home主页
port := flag.String("port", "5656", "http service port")
flag.Parse()
http.HandleFunc("/", serveHome)
// websocket路由
http.HandleFunc("/chat", func(w http.ResponseWriter, r *http.Request) {
SeveWs(hub, w, r)
})
log.Printf("http serve on port %s\n", *port)
err := http.ListenAndServe(":"+*port, nil)
if err != nil {
log.Fatal(err)
}
}

客户端

go客户端

writer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main

import (
"bufio"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"sync"

"github.com/gorilla/websocket"
)

var wg sync.WaitGroup

// 消息管道
var ch = make(chan []byte)

func SendMessage(conn websocket.Conn) {
defer wg.Done()
defer func() {
log.Printf("close connection to %s\n", conn.RemoteAddr().String())
conn.Close()
}()
for {
msg, ok := <-ch
if !ok {
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 通过NextWriter创建一个新的writer,主要是为了确保上一个writer已经被关闭,即它想写的内容已经flush到conn里去了
writer, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
writer.Write(msg)
writer.Write([]byte{'\n'})

//如果ch里还有消息,则一次都写给前端
for i := 0; i < len(ch); i++ {
writer.Write(<-ch)
writer.Write([]byte{'\n'})
}
// 必须调用close,否则下次调用client.conn.NextWriter时本条消息才会发送到前端
err = writer.Close()
if err != nil {
return
}
}
}

func main() {
dialer := &websocket.Dialer{}
header := http.Header{
"Cookie": []string{"name=zcy"},
}
//Dial:握手阶段,会发送一条http请求
conn, resp, err := dialer.Dial("ws://localhost:5656/chat", header)
defer resp.Body.Close()
if err != nil {
log.Printf("dial server error:%v\n", err)
log.Println(resp.StatusCode)
msg, _ := ioutil.ReadAll(resp.Body)
log.Println(string(msg))
return
}

defer conn.Close()

// 启动一个子协程阻塞监听管道消息,发送给服务端
wg.Add(1)
go SendMessage(*conn)

// 开始发言
fmt.Println("please input your name and enter exit to exit")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
s := scanner.Text()
if s == "exit" {
break
}
// 协定第一条消息为名字
ch <- []byte(s)
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}

close(ch)
wg.Wait()
}

reader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"fmt"
"io/ioutil"
"log"
"net/http"

"github.com/gorilla/websocket"
)

func ReadMessage(conn websocket.Conn) {
// defer wg.Done()
//收尾工作
defer func() {
log.Printf("close connection to %s\n", conn.RemoteAddr().String())
//关闭websocket管道
conn.Close()
}()
for {
_, message, err := conn.ReadMessage()
if err != nil {
// 如果前端主动断开连接,会触发报错,退出循环读取
break
}
// 换行符用空格替代,并且把首位连续的空格去掉
// message = bytes.TrimSpace(bytes.Replace(message, []byte{'\n'}, []byte{' '}, -1))
fmt.Printf("%s", message)
}
}

func main() {
dialer := &websocket.Dialer{}
header := http.Header{
"Cookie": []string{"name=zcy"},
}
//Dial:握手阶段,会发送一条http请求
conn, resp, err := dialer.Dial("ws://localhost:5656/chat", header)
defer resp.Body.Close()
if err != nil {
log.Printf("dial server error:%v\n", err)
log.Println(resp.StatusCode)
msg, _ := ioutil.ReadAll(resp.Body)
log.Println(string(msg))
return
}

defer conn.Close()

ReadMessage(*conn)
}