流式RPCDemo

日之朝矣

流式RPC适合用于需要处理大量数据或需要实时交互的场景,例如音视频流传输,实时聊天等

服务端流式RPC

客户端发送单个请求到服务器,服务器以流式方式返回多个响应,直到完成处理或者达到某个条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// hello.proto
syntax = "proto3"; // 版本声明

// 项目中import导入生成的Go代码的名称,在这个例子中服务端为hello_server/pb,客户端为hello_client/pb
option go_package = "hello_server/pb";

package pb; // proto文件模块


// 定义服务
service Greeter {
// 定义方法
rpc LotsOfReplies(HelloRequest)returns(stream HelloResponse){} // 这里的返回值前加了stream
}

// 定义消息
message HelloRequest{
string name = 1; // 1是字段序号
}

之后生成一下代码

1
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/hello.proto

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"hello_server/pb"
"net"
)

type server struct {
pb.UnimplementedGreeterServer // 若是没有实现对应方法的话,这段代码会使用默认一段
}

func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error {
words := []string{
"你好",
"hello",
"こんにちは",
"안녕하세요",
}

for _, word := range words {
data := &pb.HelloResponse{
Reply: word + in.GetName(),
}
// 使用Send方法返回多个数据
if err := stream.Send(data); err != nil {
return err
}
}
return nil
}

// grpc server
func main() {
// 启动服务
listen, err := net.Listen("tcp", ":8972")
if err != nil {
fmt.Printf("failed to listen, err: %v", err)
return
}
s := grpc.NewServer() // 创建rpc服务
// 注册服务
pb.RegisterGreeterServer(s, &server{})
// 启动服务
err = s.Serve(listen)
if err != nil {
fmt.Printf("s.Serve err: %v\n", err)
return
}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"context"
"flag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"hello_client/proto"
"io"
"log"
"time"
)

// grpc 客户端
// 调用Server端的SayHello方法

var name = flag.String("name", "Rzyy", "通过-name说出名称")

func main() {
flag.Parse()
// 链接server
conn, err := grpc.Dial("127.0.0.1:8972", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
// 创建客户端
client := proto.NewGreeterClient(conn) // 使用生成的代码

runLotsOfReplies(client)
}

func runLotsOfReplies(c proto.GreeterClient) {
// server 端流式RPC
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := c.LotsOfReplies(ctx, &proto.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("c.LotsOfReplies failed, err: %v", err)
}
for {
// 接收服务端返回的流式数据,当收到io.EOF或错误时退出
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("c.LotsOfReplies failed, err: %v", err)
}
log.Printf("got reply: %q\n", res.GetReply())
}
}

执行结果:

1
2
3
4
2023/05/15 20:49:43 got reply: "你好Rzyy"
2023/05/15 20:49:43 got reply: "helloRzyy"
2023/05/15 20:49:43 got reply: "こんにちはRzyy"
2023/05/15 20:49:43 got reply: "안녕하세요Rzyy"

客户端流式RPC

客户端以流式方式发送多个请求到服务器,并且服务器返回单个响应。这意味着客户端可以连续地发送多个请求给服务器,而服务器在接收到所有请求后,处理请求并返回一个响应。

在 proto文件中,添加定义一个rpc方法

1
2
3
4
5
6
service Greeter {
// 定义方法
rpc LotsOfReplies(HelloRequest)returns(stream HelloResponse){}
// 这是新添加的方法
rpc LotsOfGreeting(stream HelloRequest)returns(HelloResponse){}
}

server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 客户端流式RPC
func (s *server) LotsOfGreeting(stream pb.Greeter_LotsOfGreetingServer) error {
reply := "你好:"

for {
res, err := stream.Recv()
if err == io.EOF {
// 最终统一回复
return stream.SendAndClose(&pb.HelloResponse{Reply: reply})
}
if err != nil {
return err
}
reply += res.GetName()
}
}

client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 客户端 流式rpc
func runLotsOfGreetings(c proto.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := c.LotsOfGreeting(ctx)
if err != nil {
log.Fatalf("c.LotsOfReplies failed, err: %v", err)
}
names := []string{"RZZY", "日之朝矣", "KatoMegumi"}
for _, name := range names {
// 发送流式数据
err := stream.Send(&proto.HelloRequest{Name: name})
if err != nil {
log.Fatalf("stream.Send(%v) falied err: %v", name, err)
}
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("c.LotsOfGreetings failed: %v", err)
}
log.Printf("got reply: %v", res.GetReply())

}

执行结果:

1
2023/05/15 21:46:04 got reply: 你好:RZZY日之朝矣KatoMegumi

双向流式RPC

在双向流式RPC中,客户端和服务器都可以以流式方式同时发送和接收多个请求和响应。这意味着客户端和服务器可以建立一个双向的流通道,通过该通道交换多个请求和响应。

在proto文件中新增内容:

1
2
3
4
5
6
7
8
service Greeter {
// 定义方法
rpc LotsOfReplies(HelloRequest)returns(stream HelloResponse){}
rpc LotsOfGreeting(stream HelloRequest)returns(HelloResponse){}
// 这个是新增的
rpc BindHello(stream HelloRequest)returns(HelloResponse){}
}

生成一下代码

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s *server) LotsOfGreetingAndReplies(stream pb.Greeter_LotsOfGreetingAndRepliesServer) error {
for {
// 接收流式请求
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
reply := magic(in.GetName()) // 对收到的数据做些处理
// 返回流式响应
if err := stream.Send(&pb.HelloResponse{Reply: reply}); err != nil {
return err
}
}
}

// magic
func magic(s string) string {
s = strings.ReplaceAll(s, "吗", "")
s = strings.ReplaceAll(s, "吧", "")
s = strings.ReplaceAll(s, "你", "我")
s = strings.ReplaceAll(s, "?", "!")
s = strings.ReplaceAll(s, "?", "!")
return s
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 双向流式rpc
func runLotsOfGreetingAndReplies(c proto.GreeterClient) {
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
stream, err := c.LotsOfGreetingAndReplies(ctx)
if err != nil {
log.Fatalf("c.LotsOfGreetingAndReplies err: %v", err.Error())
}
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
close(waitc)
log.Println("断掉了")
return
}
if err != nil {
log.Fatalf("stream.CloseAndRecv() err:%v", err.Error())
}
log.Printf("Kelee: %s\n", in.GetReply())
}
}()
reader := bufio.NewReader(os.Stdin)
for {
cmd, _ := reader.ReadString('\n')
cmd = strings.TrimSpace(cmd)
if len(cmd) == 0 {
continue
}
if strings.ToUpper(cmd) == "QUIT" || strings.ToUpper(cmd) == "EXIT" {
break
}
err := stream.Send(&proto.HelloRequest{Name: cmd})
if err != nil {
log.Fatalf("stream.Send err: %v", err.Error())
}
}
stream.CloseSend()
<-waitc
}

由客户端发送消息,然后服务端回应,像聊天一样,不过例子是一问一答

  • 标题: 流式RPCDemo
  • 作者: 日之朝矣
  • 创建于 : 2023-06-09 15:31:19
  • 更新于 : 2023-10-10 08:35:51
  • 链接: https://rzzy.fun/2023/06/09/Stream-gRPC-Demo/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
 评论