概念

RPC是远程过程调用的简称,是分布式系统中不同节点间流行的通信方式。在互联网时代,RPC已经和IPC一样成为一个不可或缺的基础构件。
avatar

  • RPC传输协议
  • 消息序列化与反序列化

下面是一个基于HTTP的 JSON的 RPC:

avatar

Go语言RPC

Go语言的标准库也提供了一个简单的RPC实现, 包的路径为net/rpc,也就是放在了net包目录下面。因此我们可以猜测该RPC包是建立在net包基础之上的

由上图可知一个rpc服务由2个部分组成:

  • server
  • client

基础的RPC服务

RPC Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"log"
"net"
"net/rpc"
)

type HelloService struct {
}

// Hello的逻辑 就是 将对方发送的消息前面添加一个Hello 然后返还给对方
// 由于我们是一个rpc服务, 因此参数上面还是有约束:
//
// 第一个参数是请求
// 第二个参数是响应
//
// 可以类比Http handler
func (s *HelloService) Hello(request string, reply *string) error {
*reply = fmt.Sprintf("hello, %s", request)
return nil
}

func main() {
// 把我们的对象注册成一个rpc的 receiver
// 其中rpc.Register函数调用会将对象类型中所有满足RPC规则的对象方法注册为RPC函数,
// 所有注册的方法会放在“HelloService”服务空间之下
rpc.RegisterName("HelloService", &HelloService{})

// 然后我们建立一个唯一的TCP链接
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}

// 通过rpc.ServeConn函数在该TCP链接上为对方提供RPC服务。
// 每Accept一个请求,就创建一个goroutie进行处理
for {
conn, err := listener.Accept()
if err != nil {
panic(err)
}
go rpc.ServeConn(conn)
}
}

RPC Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"log"
"net/rpc"
)

func main() {
// 首先是通过rpc.Dial拨号RPC服务, 建立连接
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal(err)
}

var reply string

// 然后通过client.Call调用具体的RPC方法
// 在调用client.Call时:
// 第一个参数是用点号链接的RPC服务名字和方法名字,
// 第二个参数是 请求参数
// 第三个是请求响应, 必须是一个指针, 由底层rpc服务帮你赋值
err = client.Call("HelloService.Hello", "ZHANGSAN", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Printf("reply: %v\n", reply)
}

rpc服务最多的优点就是 我们可以像使用本地函数一样使用 远程服务上的函数, 因此有几个关键点:

  • 远程连接: 类似于我们的pkg
  • 函数名称: 要表用的函数名称
  • 函数参数: 这个需要符合RPC服务的调用签名, 及第一个参数是请求,第二个参数是响应
  • 函数返回: rpc函数的返回是 连接异常信息, 真正的业务Response不能作为返回值

基于接口的RPC

interface

1
2
3
4
5
6
7
8
package service

const HelloServiceName = "HelloService"

type HelloService interface {
Hello(request string, reply *string) error
}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"log"
"net"
"net/rpc"
"test-grpc/service"
)

var _ service.HelloService = new(HelloService)

type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
*reply = fmt.Sprintf("hello, %s", request)
return nil
}

func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}

rpc.RegisterName("HelloService", new(HelloService))
for {
conn, err := listener.Accept()
if err != nil {
panic(err)
}
go rpc.ServeConn(conn)
}
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"fmt"
"log"
"net/rpc"
"test-grpc/service"
)

var _ service.HelloService = new(HelloServiceClient)

type HelloServiceClient struct {
client *rpc.Client
}

func (c *HelloServiceClient) Hello(request string, reply *string) error {
err := c.client.Call(fmt.Sprintf("%s.Hello", service.HelloServiceName), "alice", &reply)
if err != nil {
return err
}
return nil
}

func NewHelloServiceClient(net, address string) (*HelloServiceClient, error) {
client, err := rpc.Dial(net, address)
if err != nil {
return nil, err
}
return &HelloServiceClient{
client: client,
}, nil
}

func main() {
client, err := NewHelloServiceClient("tcp", "localhost:1234")
if err != nil {
log.Fatal(err)
}

var reply string
err = client.Hello("alice", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}

gob编码

标准库的RPC默认采用Go语言特有的gob编码, 标准库gob是golang提供的“私有”的编解码方式,它的效率会比json,xml等更高,特别适合在Go语言程序间传递数据

gob的使用很简单, 和之前使用base64编码理念一样, 有 Encoder和Decoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func GobEncode(val interface{}) ([]byte, error) {
buf := bytes.NewBuffer([]byte{})
encoder := gob.NewEncoder(buf)
if err := encoder.Encode(val); err != nil {
return []byte{}, err
}
return buf.Bytes(), nil
}

func GobDecode(data []byte, value interface{}) error {
reader := bytes.NewReader(data)
decoder := gob.NewDecoder(reader)
return decoder.Decode(value)
}

写个测试用例测测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func TestGobEncode(t *testing.T) {
should := assert.New(t)
t1 := &TestStruct{Name: "张三", Value: 10}
b, err := service.GobEncode(t1)
if should.NoError(err) {
s := strings.Replace(strings.Trim(fmt.Sprint(b), "[]"), " ", ",", -1)
fmt.Printf("%v\n", s)
}
}

func TestGobDecode(t *testing.T) {
should := assert.New(t)
t2 := &TestStruct{}
data := []byte{43, 255, 129, 3, 1, 1, 10, 84, 101, 115, 116, 83, 116, 114, 117, 99, 116, 1, 255, 130, 0, 1, 2, 1, 4, 78, 97, 109, 101, 1, 12, 0, 1, 5, 86, 97, 108, 117, 101, 1, 4, 0, 0, 0, 13, 255, 130, 1, 6, 229, 188, 160, 228, 184, 137, 1, 20, 0}
err := service.GobDecode(data, t2)
if should.NoError(err) {
fmt.Printf("t2: %+v\n", t2)
}
}

Json ON TCP

gob是golang提供的“私有”的编解码方式,因此从其它语言调用Go语言实现的RPC服务将比较困难

因此我们可以选用所有语言都支持的比较好的一些编码:

  • MessagePack: 高效的二进制序列化格式。它允许你在多种语言(如JSON)之间交换数据。但它更快更小
  • JSON: 文本编码
  • XML:文本编码
  • Protobuf 二进制编码

Go语言的RPC框架有两个比较有特色的设计:

  • RPC数据打包时可以通过插件实现自定义的编码和解码;
  • RPC建立在抽象的io.ReadWriteCloser接口之上的,我们可以将RPC架设在不同的通讯协议之上。

这里我们将尝试通过官方自带的net/rpc/jsonrpc扩展实现一个跨语言的RPC。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"test-grpc/jsonrpc_tcp/service"
)

var _ service.HelloService = new(HelloService)

type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
*reply = fmt.Sprintf("hello, %s", request)
return nil
}

func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}

rpc.RegisterName("HelloService", new(HelloService))
for {
conn, err := listener.Accept()
if err != nil {
panic(err)
}
// 使用jsonrpc进行编码
go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"test-grpc/jsonrpc_tcp/service"
)

var _ service.HelloService = new(HelloServiceClient)

type HelloServiceClient struct {
client *rpc.Client
}

func (c *HelloServiceClient) Hello(request string, reply *string) error {
err := c.client.Call(fmt.Sprintf("%s.Hello", service.HelloServiceName), "alice", &reply)
if err != nil {
return err
}
return nil
}

func NewHelloServiceClient(network, address string) (*HelloServiceClient, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
// 使用jsonrpc进行解码
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
return &HelloServiceClient{
client: client,
}, nil
}

func main() {
client, err := NewHelloServiceClient("tcp", "localhost:1234")
if err != nil {
log.Fatal(err)
}

var reply string
err = client.Hello("alice", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}

使用nc模拟访问

1
2
[root@control ~]# echo -e '{"method":"HelloService.Hello","params":["zhangsan"],"id":1}' | nc 192.168.10.1 1234
{"id":1,"result":"hello, zhangsan","error":null}

Json ON HTTP

Go语言内在的RPC框架已经支持在Http协议上提供RPC服务, 为了支持跨语言,编码我们依然使用Json

新的RPC服务其实是一个类似REST规范的接口,接收请求并采用相应处理流程

首先我们依然要解决JSON编解码的问题, 我们需要将HTTP接口的Handler参数传递给jsonrpc, 因此需要满足jsonrpc接口, 因此我们需要提前构建也给conn io.ReadWriteCloser, writer现成的 reader就是request的body, 直接内嵌就可以

1
2
3
4
5
6
7
8
func NewRPCReadWriteCloserFromHTTP(w http.ResponseWriter, r *http.Request) *RPCReadWriteCloser {
return &RPCReadWriteCloser{w, r.Body}
}

type RPCReadWriteCloser struct {
io.Writer
io.ReadCloser
}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"io"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"test-grpc/jsonrpc_http/service"
)

var _ service.HelloService = new(HelloService)

type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
*reply = fmt.Sprintf("hello, %s", request)
return nil
}

func (s *HelloService) Calc(req *service.CalcRequest, reply *int) error {
*reply = req.A + req.B
return nil
}

type RPCReadWriteCloser struct {
io.Writer
io.ReadCloser
}

func NewRPCReadWriteCloserFromHTTP(w http.ResponseWriter, r *http.Request) *RPCReadWriteCloser {
return &RPCReadWriteCloser{w, r.Body}
}

func main() {
rpc.RegisterName("HelloService", new(HelloService))

http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
// 使用jsonrpc进行编码
rpc.ServeCodec(jsonrpc.NewServerCodec(NewRPCReadWriteCloserFromHTTP(w, r)))
})

err := http.ListenAndServe(":1234", nil)
if err != nil {
panic(err)
}
}

验证

1
2
3
4
5
[root@control ~]# curl -X POST -d '{"method":"HelloService.Hello","params":["hello"],"id":1}' 192.168.10.1:1234/jsonrpc
{"id":1,"result":"hello, hello","error":null}
[root@control ~]# curl -X POST -d '{"method":"HelloService.Calc","params":[{"a":1,"b":2}],"id":2}' 192.168.10.1:1234/jsonrpc
{"id":2,"result":3,"error":null}