Kotlin + gRPCでio.grpc.Contextをつかってログ出力を横断処理してみた


アスペクト指向プログラミング(AOP)をgRPC Serverではどう扱うか考えていきたい。 横断的な関心事といえば認証やログ出力である。

これらを実現するのがio.grpc.Contextである。

Context (grpc-all 1.5.0 API)

今回のエントリではio.grpc.ContextをつかいgRPC Serverのログ出力を横断的にする方法をまとめていく。

ContextHandlerをつくる

Contextからログをセットしたり取り出したりするHandlerを用意する。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
object GRpcLogContextHandler {

    private val GRPC_LOG: Context.Key<GRpcLogBuilder> = Context.key("GRPC_LOG")

    @JvmStatic
    fun setLog(ctx: Context, log: GRpcLogBuilder) = ctx.withValue(GRPC_LOG, log)

    @JvmStatic
    fun getLog() =
            try {
                GRPC_LOG.get()
            } catch (e: Exception) {
                GRpcLogBuilder({
                    name { "UnknownName" }
                })
            }
}

GRpcLogContextHandlerが提供するsetLoggetLogをgRPC Serverから操作してログオブジェクトを取り出しデータを詰め込む。

ServerInterceptorをつくる

GRpcLogContextHandlerをContextに含めたServerInterceptorを用意する。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
class GRpcLogInterceptor : ServerInterceptor {

    private val logger = KotlinLogging.logger {}

    override fun <ReqT : Any?, RespT : Any?> interceptCall(call: ServerCall<ReqT, RespT>?, headers: Metadata?, next: ServerCallHandler<ReqT, RespT>?): ServerCall.Listener<ReqT> {

        val serverName = call?.methodDescriptor?.fullMethodName!!.replace("/", ".")

        val serverCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

            override fun close(status: Status?, trailers: Metadata?) {

                val log = GRpcLogContextHandler.getLog()

                if (status!!.isOk())
                    log.success { true }

                try {
                    logger.info { log.build().toString() }
                } catch (e: Exception) {
                    logger.warn { "GRpcLogger is not set with %s".format(serverName) }
                }

                super.close(status, trailers)
            }
        }

        val log = GRpcLogBuilder({
            name { serverName }
            remoteAddr { call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString() }
        })
        val ctx = GRpcLogContextHandler.setLog(Context.current(), log)

        return Contexts.interceptCall(ctx, serverCall, headers, next)
    }
}

gRPC Serverでログを詰め込む

gRPC Serverを一部抜粋したコードが次のとおりである。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
override fun updateTaskService(request: UpdateTaskInbound?, responseObserver: StreamObserver<TaskOutbound>?) {
    try {
        val (taskId, title) = GRpcInboundValidator.validUpdateTaskInbound(request)

        val log = GRpcLogContextHandler.getLog()
        log.elem { "taskId" to taskId }
        log.elem { "title" to title }

        val task = updateTaskService(UpdateTaskCommand(taskId.toLong(), title))
        val msg = getOutbound(task)
        responseObserver?.onNext(msg)
        responseObserver?.onCompleted()
    } catch (e: WebAppException.NotFoundException) {
        logger.error { "gRPC server error, task not found." }
        responseObserver?.onError(
                Status.NOT_FOUND.withDescription("task not found.").asRuntimeException())
    } catch (e: WebAppException.BadRequestException) {
        logger.error { "gRPC server error, invalid request." }
        responseObserver?.onError(
                Status.INVALID_ARGUMENT.withDescription("invalid request.").asRuntimeException())
    }
}

上記のうち次の箇所でContextHandlerからログオブジェクトを取り出しログを詰め込んでいることが分かる。

1
2
3
 val log = GRpcLogContextHandler.getLog()
 log.elem { "taskId" to taskId }
 log.elem { "title" to title }

このgRPC Serverは前述したGRpcLogInterceptorがリクエストを前段でインターセプトしているためServerとGRpcLogInterceptorで共通のログオブジェクトを取り出し操作することができている。

まとめ

コード

エントリでは一部コードの抜粋になっているがgithubに全てのコードを参照できるので参考にしてほしい。