Kotlin コルーチンでasync/awaitをつかってgRPC Serverをリクエストしてみた
Kotlin1.1からの新機能であるコルーチン(Coroutines)を試していきたい。Kotlin コルーチンをつかえば非同期処理を同期的なコードで書けるし、非同期処理をブロッキングすることもシンプルなコードで書ける。
今回はgRPC Serverへのリクエスト部分をコルーチンをつかい非同期化させてみたのでコードをまとめていく。
Serverへのリクエストは非同期化するが処理結果や例外処理をキャッチしたい
メインのリクエストは非同期化するがgRPC Serverのレスポンスや例外処理を呼び出し元でどのようにすれば受け取れるだろうか。この場合にはコルーチンの結果を返すことができるasync
をつかいawait
で処理を中断してコルーチンから値を取り出す。
|
|
非同期化する前のコードとの差分は次のようになっている。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
- fun getTask(taskId: Long): TaskOutbound = - try { - ShutdownLoan.using(getChannel(), { channel -> - val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build() - TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg) - }) - } catch (e: Exception) { - val status = Status.fromThrowable(e) - logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) } - throw status with status.description - } - + suspend fun getTask(taskId: Long): TaskOutbound = + async(CommonPool) { + try { + val outbound = ShutdownLoan.using(getChannel(), { channel -> + val msg = TaskInbound.newBuilder().setTaskId(taskId.toInt()).build() + TaskServiceGrpc.newBlockingStub(channel).getTaskService(msg) + }) + Result.Success<TaskOutbound, GrpcException>(outbound) + } catch (e: Exception) { + val status = Status.fromThrowable(e) + logger.error(e) { "gRPC server error, code:{%d}, description:{%s}".format(status.code.value(), status.description) } + Result.Failure<TaskOutbound, GrpcException>(status with status.description) + } + }.await().fold({ it }, { throw it }) |
getTask
関数の先頭にsuspendキーワードがついている。この関数内でコルーチンの処理結果を受け取りるためawait()
をつけた。非同期処理の処理結果を受け取るということは処理を中断することになる。コルーチンの処理を中断することができるSuspending functions
をgetTask
関数に加えた。async
ブロックは先のとおり値を返すコルーチンにするためである。async
ブロック内にgRPC Serverのリクエスト処理がある。タスクを1件取得する処理が記述されている。修正前のコードは例外が起きた場合、throwしていたが修正後はcom.github.kittinunf.result.Result
を使ってResultオブジェクトを返却するコードにした。こうするとasync
ブロックをtry-catchで囲む必要がなくなるので可読性があがることを期待してやってみた。{ throw it }
節でコルーチンで起きた例外を受け取り必要な処理を挟むことができる。
コルーチンの呼び出し元でブロッキングする
getTask
関数はsuspendキーワードがついているため呼び出し元でrunBlocking
ブロックを加えブロッキングをしている。
変更コードの差分は次のとおりである。
1 2 3 4 5 |
fun fetchByTaskId(req: ServerRequest) = ok().json().body( - Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong())))) + runBlocking { + Mono.just(TaskModel(taskBackendClient.getTask(req.pathVariable("id").toLong()))) + }) |
まとめ
- コルーチンを試して非同期処理を導入できた。
- サンプルのコードでは
await()
を1度しか使っていないが複数の非同期処理をasyncで呼び出し用途にあわせてawaitを使って直列に処理させたり並列に処理させることができる。 - 一見、非同期処理を複雑な印象を与えコードの可読性を下げる懸念を与えるがKotlinコルーチンが素晴らしいのは同期的なシンプルなコードでこれを実現できることにある。
コード
エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。