Golang
主页 > 脚本 > Golang >

Golang中四种gRPC模式举例介绍

2024-03-30 | 佚名 | 点击:

1. Unary RPC

proto文件如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

syntax = "proto3";

option go_package=".;service";

 

message HelloRequest {

  // Name of the person to greet

  string name = 1;

}

 

message HelloResponse {

  // Greeting message

  string greeting = 1;

}

 

service HelloService {

  // RPC method to say hello

  rpc SayHello (HelloRequest) returns (HelloResponse){}

}

使用命令(注意命令路径和自己的对应):

1

protoc -I . --go-grpc_out=require_unimplemented_servers=false:. --go_out=.  *.proto

对应目录上有xx.pb.go和xx_grpc.pb.go然后对应目录实现服务端接口

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

package main

 

import (

    "context"

    "fmt"

    "google.golang.org/grpc"

    "net"

    "test_grpc/service"

)

 

type HelloService struct {

}

 

func (hs *HelloService) SayHello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) {

    resp := &service.HelloResponse{

        Greeting: fmt.Sprintf("hello %s --from Golang Server", req.Name),

    }

    return resp, nil

}

 

func main() {

    // listen on 127.0.0.1:50051

    listen, err := net.Listen("tcp", "127.0.0.1:50051")

    if err != nil {

        fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)

        return

    }

 

    // grpc server

    s := grpc.NewServer()

 

    // register HelloService in grpc server

    service.RegisterHelloServiceServer(s, &HelloService{})

 

    // start rpc server

    fmt.Println("Golang rpc server is waiting messages......")

    if err = s.Serve(listen); err != nil {

        fmt.Println("Error happened when start rpc server:", err)

        return

    }

}

客户端接口如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

package main

 

import (

    "context"

    "fmt"

    "google.golang.org/grpc"

    "google.golang.org/grpc/credentials/insecure"

    "test_grpc/service"

    "time"

)

 

func main() {

    // connect to server

    conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))

    if err != nil {

        fmt.Println("Connect to rpc server err:", err)

        return

    }

    defer conn.Close()

 

    // init service client

    c := service.NewHelloServiceClient(conn)

 

    // init context with timeout

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)

    defer cancel()

 

    // send message

    req := &service.HelloRequest{Name: "Golang"}

    r, err := c.SayHello(ctx, req)

    if err != nil {

        fmt.Println("Send message err:", err)

        return

    }

    fmt.Println("Client:", r.Greeting)

}

实际上为了更好得感受gRPC这种跨语言调用的感觉,可以尝试使用python编写client端代码,直接复制proto文件,在python中使用以下命令生成对应的proto文件(注意命令和自己的对应):

1

python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. *.proto

使用python实现的客户端代码如下:

1

2

3

4

5

6

7

8

9

10

# client template

# grpc server address

channel = grpc.insecure_channel("127.0.0.1:50051")

stub = hello_pb2_grpc.HelloServiceStub(channel)

 

# send request

response = stub.SayHello(hello_pb2.HelloRequest(name="Python"))

 

print(response.greeting)

# hello Python --from Golang Server

2. Server-side streaming RPC

重新给出这个的proto文件,服务端将以流式数据的形式发送给客户端数据

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

syntax = "proto3";

option go_package=".;service";

 

message HelloRequest {

  // Name of the person to greet

  string name = 1;

}

 

message HelloResponse {

  // Greeting message

  string greeting = 1;

}

 

service HelloService {

  // RPC method to say hello

  rpc SayHello (HelloRequest) returns (stream HelloResponse){}

}

同理,生成对应的proto文件后,在对应的文件先生成server端的代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

package main

 

import (

    "fmt"

    "google.golang.org/grpc"

    "net"

    "test_grpc/service"

)

 

type HelloService struct {

}

 

func (hs *HelloService) SayHello(req *service.HelloRequest, stream service.HelloService_SayHelloServer) error {

    resp := &service.HelloResponse{

        Greeting: fmt.Sprintf("hello %s --from Golang Server", req.Name),

    }

    // 连续发送5次

    for i := 0; i < 5; i++ {

        if err := stream.Send(resp); err != nil {

            return err

        }

    }

    return nil

}

 

func main() {

    // listen on 127.0.0.1:50051

    listen, err := net.Listen("tcp", "127.0.0.1:50051")

    if err != nil {

        fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)

        return

    }

 

    // grpc server

    s := grpc.NewServer()

 

    // register HelloService in grpc server

    service.RegisterHelloServiceServer(s, &HelloService{})

 

    // start rpc server

    fmt.Println("Golang rpc server is waiting messages......")

    if err = s.Serve(listen); err != nil {

        fmt.Println("Error happened when start rpc server:", err)

        return

    }

}

同理给出客户端的代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

package main

 

import (

    "context"

    "fmt"

    "google.golang.org/grpc"

    "google.golang.org/grpc/credentials/insecure"

    "io"

    "log"

    "test_grpc/service"

    "time"

)

 

func main() {

    // connect to server

    conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))

    if err != nil {

        fmt.Println("Connect to rpc server err:", err)

        return

    }

    defer conn.Close()

 

    // init service client

    c := service.NewHelloServiceClient(conn)

 

    // init context with timeout

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)

    defer cancel()

 

    // send message

    req := &service.HelloRequest{Name: "Golang"}

    stream, err := c.SayHello(ctx, req)

    if err != nil {

        fmt.Println("Send message err:", err)

        return

    }

 

    // 加载消息

    for {

        resp, err := stream.Recv()

        // 读到结束标志

        if err == io.EOF {

            log.Fatalf("end.....")

            break

        }

 

        if err != nil {

            log.Fatalf("failed to receive response: %v", err)

        }

 

        log.Printf("Greeting: %s", resp.Greeting)

    }

}

3. Client-side streaming RPC

对应的proto文件如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

syntax = "proto3";

option go_package=".;service";

 

message HelloRequest {

  // Name of the person to greet

  string name = 1;

}

 

message HelloResponse {

  // Greeting message

  string greeting = 1;

}

 

service HelloService {

  // RPC method to say hello

  rpc SayHello (stream HelloRequest) returns (HelloResponse){}

}

同理使用protoc命令生成对应的proto文件,后先编写client端的代码,如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

package main

 

import (

    "context"

    "fmt"

    "google.golang.org/grpc"

    "google.golang.org/grpc/credentials/insecure"

    "log"

    "test_grpc/service"

    "time"

)

 

func main() {

    // connect to server

    conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))

    if err != nil {

        fmt.Println("Connect to rpc server err:", err)

        return

    }

    defer conn.Close()

 

    // init service client

    c := service.NewHelloServiceClient(conn)

 

    // init context with timeout

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)

    defer cancel()

 

    // create stream

 

    stream, err := c.SayHello(ctx)

    if err != nil {

        log.Fatalf("could not greet: %v", err)

    }

 

    names := []string{"World", "Gophers", "Anthropic"}

 

    for _, name := range names {

        // request body

        req := &service.HelloRequest{Name: name}

        if err := stream.Send(req); err != nil {

            log.Fatalf("faild to send request: %v", err)

        }

    }

 

    resp, err := stream.CloseAndRecv()

    if err != nil {

        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)

    }

    log.Printf("Greeting: %s", resp.Greeting)

}

对应得完成服务端的代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

package main

 

import (

    "fmt"

    "google.golang.org/grpc"

    "io"

    "net"

    "strings"

    "test_grpc/service"

)

 

type HelloService struct {

}

 

func (hs *HelloService) SayHello(stream service.HelloService_SayHelloServer) error {

    var strs []string

    for {

        msg, err := stream.Recv()

        if err == io.EOF {

            break

        }

        if err != nil {

            return err

        }

 

        strs = append(strs, msg.Name)

    }

 

    resp := &service.HelloResponse{Greeting: strings.Join(strs, " ")}

 

    err := stream.SendAndClose(resp)

    if err != nil {

        return err

    }

    return nil

}

 

func main() {

    // listen on 127.0.0.1:50051

    listen, err := net.Listen("tcp", "127.0.0.1:50051")

    if err != nil {

        fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)

        return

    }

 

    // grpc server

    s := grpc.NewServer()

 

    // register HelloService in grpc server

    service.RegisterHelloServiceServer(s, &HelloService{})

 

    // start rpc server

    fmt.Println("Golang rpc server is waiting messages......")

    if err = s.Serve(listen); err != nil {

        fmt.Println("Error happened when start rpc server:", err)

        return

    }

}

4. Bidirectional streaming RPC

新的proto文件被如下给出:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

syntax = "proto3";

option go_package=".;service";

 

message HelloRequest {

  // Name of the person to greet

  string name = 1;

}

 

message HelloResponse {

  // Greeting message

  string greeting = 1;

}

 

service HelloService {

  // RPC method to say hello

  rpc SayHello (stream HelloRequest) returns (stream HelloResponse){}

}

和上文中的操作一致,同时给出server端的代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

package main

 

import (

    "fmt"

    "google.golang.org/grpc"

    "io"

    "log"

    "net"

    "test_grpc/service"

)

 

type HelloService struct {

}

 

func (hs *HelloService) SayHello(stream service.HelloService_SayHelloServer) error {

    for {

        msg, err := stream.Recv()

        if err == io.EOF {

            break

        }

        if err != nil {

            return err

        }

 

        name := msg.Name

        resp := &service.HelloResponse{Greeting: name}

 

        if err = stream.Send(resp); err != nil {

            log.Fatalf("Failed to send a resp:%s", err)

        }

    }

 

    return nil

}

 

func main() {

    // listen on 127.0.0.1:50051

    listen, err := net.Listen("tcp", "127.0.0.1:50051")

    if err != nil {

        fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)

        return

    }

 

    // grpc server

    s := grpc.NewServer()

 

    // register HelloService in grpc server

    service.RegisterHelloServiceServer(s, &HelloService{})

 

    // start rpc server

    fmt.Println("Golang rpc server is waiting messages......")

    if err = s.Serve(listen); err != nil {

        fmt.Println("Error happened when start rpc server:", err)

        return

    }

}

同时给出下面的client端的代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

package main

 

import (

    "context"

    "fmt"

    "google.golang.org/grpc"

    "google.golang.org/grpc/credentials/insecure"

    "io"

    "log"

    "test_grpc/service"

    "time"

)

 

func main() {

    // connect to server

    conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))

    if err != nil {

        fmt.Println("Connect to rpc server err:", err)

        return

    }

    defer conn.Close()

 

    // init service client

    c := service.NewHelloServiceClient(conn)

 

    // init context with timeout

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)

    defer cancel()

 

    // create stream

    stream, err := c.SayHello(ctx)

    if err != nil {

        log.Fatalf("could not greet: %v", err)

    }

 

    names := []string{"World", "Gophers", "Anthropic"}

 

    waitc := make(chan struct{})

    go func() {

        for {

            resp, err := stream.Recv()

            if err == io.EOF {

                close(waitc)

                return

            }

            if err != nil {

                log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)

            }

            log.Printf("Greeting: %s", resp.Greeting)

        }

    }()

 

    go func() {

        for _, name := range names {

            // request body

            req := &service.HelloRequest{Name: name}

            if err := stream.Send(req); err != nil {

                log.Fatalf("faild to send request: %v", err)

            }

 

            // send delay

            time.Sleep(1)

        }

        // 发送结束的消息

        if err := stream.CloseSend(); err != nil {

            log.Fatalf("failed to close stream: %v", err)

        }

    }()

 

    <-waitc

}

一定要注意关闭发送或者避免针对一个已经关闭stream进行发送消息,读取消息是被允许的,这里有一点类似chan

4. ALTS

4.1 ALTS的介绍

应用层传输安全(ALTS)是谷歌开发的一种相互验证和传输加密系统。它用于确保谷歌基础设施内 RPC 通信的安全。ALTS 类似于相互 TLS,但经过设计和优化,可满足 Google 生产环境的需要。ALTS在gRPC中有以下的特征:

值得注意的是ALTS被全部发挥作用如果应用程序运行在CE或者GKE中

4.2 gRPC客户端使用ALTS传输安全协议

gRPC客户端使用ALTS认证去连接服务端,正如下面代码中所描述的:

1

2

3

4

5

6

7

8

import (

  "google.golang.org/grpc"

  "google.golang.org/grpc/credentials/alts"

)

 

altsTC := alts.NewClientCreds(alts.DefaultClientOptions())

// connect to server

conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(altsTC))

gRPC服务端能够使用ALTS认证来运行客户端连接到它,正如下面的描述:

1

2

3

4

5

6

7

import (

  "google.golang.org/grpc"

  "google.golang.org/grpc/credentials/alts"

)

 

altsTC := alts.NewServerCreds(alts.DefaultServerOptions())

server := grpc.NewServer(grpc.Creds(altsTC))

4.3 Server Authorization

gRPC 内置了使用 ALTS 的服务器授权支持。使用 ALTS 的 gRPC 客户端可以在建立连接前设置预期的服务器服务账户。然后,在握手结束时,服务器授权会保证服务器身份与客户端指定的服务账户之一相匹配。否则,连接将失败。

1

2

3

4

5

6

7

8

9

import (

  "google.golang.org/grpc"

  "google.golang.org/grpc/credentials/alts"

)

 

clientOpts := alts.DefaultClientOptions()

clientOpts.TargetServiceAccounts = []string{expectedServerSA}

altsTC := alts.NewClientCreds(clientOpts)

conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(altsTC))

原文链接:
相关文章
最新更新