Kotlin コルーチンでasync/awaitをつかってgRPC Serverをリクエストしてみた


Kotlin1.1からの新機能であるコルーチン(Coroutines)を試していきたい。Kotlin コルーチンをつかえば非同期処理を同期的なコードで書けるし、非同期処理をブロッキングすることもシンプルなコードで書ける。
今回はgRPC Serverへのリクエスト部分をコルーチンをつかい非同期化させてみたのでコードをまとめていく。

Serverへのリクエストは非同期化するが処理結果や例外処理をキャッチしたい

メインのリクエストは非同期化するがgRPC Serverのレスポンスや例外処理を呼び出し元でどのようにすれば受け取れるだろうか。この場合にはコルーチンの結果を返すことができるasyncをつかいawaitで処理を中断してコルーチンから値を取り出す。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
suspend fun getTask(taskId: Long): TaskOutbound =
        async(CommonPool) {
            try {
                val outbound = ShutdownLoan.using(getChannel(), { channel ->
                    val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
                    TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
                })
                Result.Success<TaskOutbound, GrpcException>(outbound)
            } catch (e: Exception) {
                val status = Status.fromThrowable(e)
                logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
                Result.Failure<TaskOutbound, GrpcException>(status with status.description)
            }
        }.await().fold({ it }, { throw it })

非同期化する前のコードとの差分は次のようになっている。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-    fun getTask(taskId: Long): TaskOutbound =
-            try {
-                ShutdownLoan.using(getChannel(), { channel ->
-                    val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
-                    TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
-                })
-            } catch (e: Exception) {
-                val status = Status.fromThrowable(e)
-                logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
-                throw status with status.description
-            }
-
+    suspend fun getTask(taskId: Long): TaskOutbound =
+            async(CommonPool) {
+                try {
+                    val outbound = ShutdownLoan.using(getChannel(), { channel ->
+                        val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build()
+                        TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg)
+                    })
+                    Result.Success<TaskOutbound, GrpcException>(outbound)
+                } catch (e: Exception) {
+                    val status = Status.fromThrowable(e)
+                    logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) }
+                    Result.Failure<TaskOutbound, GrpcException>(status with status.description)
+                }
+            }.await().fold({ it }, { throw it })

コルーチンの呼び出し元でブロッキングする

getTask関数はsuspendキーワードがついているため呼び出し元でrunBlockingブロックを加えブロッキングをしている。

変更コードの差分は次のとおりである。

1
2
3
4
5
     fun fetchByTaskId(req: ServerRequest) = ok().json().body(
-            Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong()))))
+            runBlocking {
+                Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong())))
+            })

まとめ

コード

エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。