iT邦幫忙

2019 iT 邦幫忙鐵人賽

DAY 25
0

實作 Day23 的簡單的 gRPC 應用,來嘗試雙向串流

透過官方的範例,實作 golang 的 gRPC 中的雙向串流。

撰寫協定
在協定用 stream 來代表串流物件,設定 RouteChat 的 rpc 方法中,雙向都傳送串流物件

syntax = "proto3";

service Portal {
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

message RouteNote {
  Point location = 1;
  string message = 2;
}

服務端

package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"sync"

	pb "github.com/cyan92128505/cyangrpc/pb"

	"google.golang.org/grpc"
)

type PortalService struct {
	mu         sync.Mutex
	routeNotes map[string][]*pb.RouteNote
}

func (s *PortalService) RouteChat(stream pb.Portal_RouteChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		key := serialize(in.Location)

		s.mu.Lock()
		s.routeNotes[key] = append(s.routeNotes[key], in)
		rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
		copy(rn, s.routeNotes[key])
		s.mu.Unlock()

		for _, note := range rn {
			if err := stream.Send(note); err != nil {
				return err
			}
		}
	}
}

func serialize(point *pb.Point) string {
	return fmt.Sprintf("%d %d", point.Latitude, point.Longitude)
}

func main() {
	listenPort, err := net.Listen("tcp", ":19003")
	if err != nil {
		log.Fatalln(err)
	}
	server := grpc.NewServer()
	portalService := &PortalService{routeNotes: make(map[string][]*pb.RouteNote)}
	pb.RegisterPortalServer(server, portalService)
	log.Println("server on")
	server.Serve(listenPort)
}

用戶端

package main

import (
	"context"
	"io"
	"log"
	"time"

	pb "github.com/cyan92128505/cyangrpcclient/pb"

	"google.golang.org/grpc"
)

func runRouteChat(client pb.PortalClient) {
	notes := []*pb.RouteNote{
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.RouteChat(ctx)
	if err != nil {
		log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
	}
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("Failed to receive a note : %v", err)
			}
			log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
		}
	}()
	for _, note := range notes {
		if err := stream.Send(note); err != nil {
			log.Fatalf("Failed to send a note: %v", err)
		}
	}
	stream.CloseSend()
	<-waitc
}

func main() {
	conn, err := grpc.Dial("127.0.0.1:19003", grpc.WithInsecure())
	if err != nil {
		log.Fatal("client connection error:", err)
	}
	defer conn.Close()
	client := pb.NewPortalClient(conn)
	runRouteChat(client)
}

服務端和用戶端,都是使用Recv方法和Send方法來做接收和傳送,並等待io.EOF作為串流的結束,
對微服務架構來說,雙向串流是方便的傳輸方式。

console


上一篇
Day 24 : JWT - 在 Golang 中使用 JSON Web Token
下一篇
Day 26 : Jest - 前端測試
系列文
遺留系統重構 - 從 MEAN Stack 轉移到 go-vue-postgresql30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言