gRPC streamingをつかうとマイクロサービスの責務が整理できるし省コネクションでメリットあるよね、という話


今回はgRPCをマイクロサービス間通信に導入することってメリットあるよね、というエントリです。 定期的に処理を実行してくれるバッチはよくあるものですがバッチの駆動をgRPCを使って次のような構成で動かしました。

上記の構成を踏まえ次からはメリットをまとめます。

gRPCをマイクロサービスに導入するとメリットあるよね

キューのRead権限をバッチサーバから剥がせる

キュー駆動でバッチを動かしている場合、例えばAmazon SQSを導入しているとRead権限が必要です。上記の構成であればキューを監視するのはバッチサーバではなくコントロールサーバになります。そのためキューを監視する権限をコントロールサーバに集約できるメリットがあります。

ログ集約サーバへの送信責務もバッチサーバから剥がせる

図のとおりgRPCのBidirectional streamingを使えば複数のレスポンスを送信することができます。バッチ処理結果や各種ログはコントロールサーバへ送り、ログ集約サーバへの送信はコントロールサーバが行います。gRPCで各サービスをつないでおいてログを送り、受けとったクライアントにログの集約を任せる、といった構成は導入メリットの1つな気がします。(ログの送信漏れ考慮は必要ですが)

そもそものgRPCのメリット

そもそものgRPCのメリットがあります。異なる言語のマイクロサービス間の通信でもProtocol Buffersを定義することで容易に通信を確立できますし、streamingの方式を用途に合わせて選択することで省コネクションでマイクロサービス間のやり取りが行えます。

GoとJavaでBidirectional gRPC streamingをつかったデモ

上記の図の構成をもとにgRPCのクライアントをGoサーバをJavaで通信方式はBidirectional streamingを採用してデモを作ってみました。

どのようなバッチサービス?

Bidirectional streamingを採用しているので、リクエストが複数あってレスポンスも複数、または1つのようなサービスを考えました。

結果、数値を受け取り割り算をして商と余りを返すサービスを実装しました。

Redisからキューを送信してクライアントがリクエストとレスポンスを受け取ったイメージです。

1
2
3
# Redis
$ redis-cli
127.0.0.1:6379> PUBLISH my_queue '{"serviceName" : "division", "numbers" : [10, 3]}'
1
2
3
4
5
# Client
12:27:50.452 Request : {serviceName:'division', message:'10', time:'time string'}
12:27:50.452 Request : {serviceName:'division', message:'3', time:'time string'}
12:27:50.455 Response: {serviceName:'division', message:'quotient:3', time:'time string'}
12:27:50.456 Response: {serviceName:'division', message:'remainder:1', time:'time string'}

クライアントは103のリクエストを2つ送り、商が3と余りが1の結果を受け取ります。(余りが0であればレスポンスは1つになる)

protoファイル

protoファイルは次のようになりました。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
syntax = "proto3";

option go_package = "protobuf";
package proto;

service MicroService {
  rpc MicroService (stream Request) returns (stream Response) {}
}

message Request {
  string name = 1;
  string message = 2;
  string time = 3;
}

message Response {
  string name = 1;
  string message = 2;
  string time = 3;
}

クライアントのコード(Go)

リクエストを送信してレスポンスを受け取っている通信周りのコードの抜粋です。

※コード全体はgithubにあります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
waitc := make(chan struct{})
go func() {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            close(waitc)
            return
        }
        if err != nil {
            log.Error("Failed to receive a message : %v", err)
            return
        }
        responseLog.Info("{serviceName:'%s', message:'%s', time:'%s'}", in.Name, in.Message, in.Time)
    }
}()

for {
    message, err := pubSub.ReceiveMessage()
    if err != nil {
        panic(err)
    }
    requests, err := getRequests(message)
    if err != nil {
        panic(err)
    }

    for _, request := range requests {

        requestLog.Info("{serviceName:'%s', message:'%s', time:'%s'}", request.Name, request.Message, request.Time)
        if err := stream.Send(&request); err != nil {
            log.Error("Failed to send a message: %v", err)
        }
    }
}

stream.CloseSend()
<-waitc

サーバのコード(Java)

リクエストを受け取りレスポンスを送信している通信周りのコードの抜粋です。

割り算をする数値が分けられて送られてきます。1回目のリクエストでキーを生成してリクエストを保持しながら2回目のリクエストで割った結果を送信しています。

※コード全体はgithubにあります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
return new StreamObserver<Microservice.Request>() {
    public void onNext(Microservice.Request req) {
        Long key = getTime(req);
        Observable.just(req)
                .subscribe(new Observer<Microservice.Request>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.i("Request", getRequestLog(req));
                    }

                    @Override
                    public void onNext(Microservice.Request request) {
                        if (!routeNumber.containsKey(key)) {
                            routeNumber.put(key, Arrays.asList(req));
                        } else if (routeNumber.get(key).size() == 1) {

                            Microservice.Request prevRequest = routeNumber.get(key).get(0);
                            Integer leftTerm = Integer.parseInt(prevRequest.getMessage());
                            Integer rightTerm = Integer.parseInt(req.getMessage());

                            Integer quotient = leftTerm / rightTerm;
                            Integer remainder = leftTerm % rightTerm;

                            if (remainder == 0) {
                                responses.putIfAbsent(key, Arrays.asList(
                                        getResponse(req.getName(), String.format("quotient:%d", quotient))));
                            } else {
                                responses.putIfAbsent(key, Arrays.asList(
                                        getResponse(req.getName(), String.format("quotient:%d", quotient)),
                                        getResponse(req.getName(), String.format("remainder:%d", remainder))));
                            }
                        } else {
                            Log.w(String.format("waring, unknown state. key:{%s}, value:{%s}", key, routeNumber.get(key)));
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(String.format("onError %s", e.getMessage()));
                    }

                    @Override
                    public void onComplete() {
                        if (responses.containsKey(key)) {
                            Observable.fromIterable(responses.get(key))
                                    .subscribe(res -> {
                                        responseObserver.onNext(res);
                                        Log.i("Response", getResponseLog(res));
                                    });
                            routeNumber.remove(key);
                            responses.remove(key);
                        }
                    }
                });
    }

    public void onError(Throwable t) {
        logger.log(Level.WARNING, "microService cancelled");
    }

    public void onCompleted() {
        responseObserver.onCompleted();
    }
};

デモ

まとめ

コードを公開しています

コード全体はgitbubで確認できます。