Spring Web Flux でServer Sent EventsのPush型APIをつくってみた


Spring Boot 2.0からWebFlux frameworkが導入された。Reactorが使えてReactive Programmingが可能になったのでPush型のAPIをつくってみた。

どんなAPIをつくるか

RedisのPub/Subを使う

RedisのPub/Subを使ってリアルタイムのコンテンツ更新を通知する。アプリはServerSide+Kotlinで実装したのでJedisのライブラリを使う。

JedisはJedisPubSubのクラスを実装すればチャンネルに通知されたメッセージをSubscribeできる。onMessageでReactorクラスのFluxにデータを流すようにした。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Service
class TaskService(private val jedisPool: JedisPool,
                  private val taskBackendClient: TaskBackendClient) {

    companion object {
        private val CHANNEL = "task"
    }

    private val flux = Flux.create<String> { emitter ->
        TaskSubscriber(emitter).let { sub ->
            jedisPool.resource.use {
                it.subscribe(sub, CHANNEL)
            }
        }
    }

    fun publishUpdateTask() {
        jedisPool.resource.use {
            it.publish(CHANNEL, "updateTask")
        }
    }

    fun subscribeTaskCount() =
            flux.map {
                taskBackendClient.getTaskCount().count
            }.run {
                share()
            }
}

class TaskSubscriber(private val fluxSink: FluxSink<String>) : JedisPubSub() {
    override fun onMessage(channel: String?, message: String?) {
        fluxSink.next(message)
    }
}

ポイントはFlux.create<T>で作られるFluxをTaskSubscriberのコンストラクタに渡してTaskSubscriberのonMessageでデータを流すようにしているところ。
コンテンツが更新されたらTaskService#publishUpdateTaskを呼び出しチャンネルに通知をPublishする。


ここまでくればTaskServiceを必要なところで使えばOK。

API HandlerでSever-Send-EventsのServerResponseを返す

bodyToServerSentEventsの拡張関数を使えばSever-Send-EventsのServerResponseを返せる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
class TaskHandler(private val taskBackendClient: TaskBackendClient,
                  private val taskService: TaskService) {

    // ---

    fun create(req: ServerRequest): Mono&lt;ServerResponse> {
        return ok().json().body(
                req.bodyToMono(CreateTaskInbound::class.java).map { p ->
                    TaskModel(taskBackendClient.createTask(p.title)).also {
                        taskService.publishUpdateTask()
                    }
                })
    }

    // ---

    fun fetchTaskCount(req: ServerRequest): Mono&lt;ServerResponse> {
        return ok().json().bodyToServerSentEvents(
                taskService.subscribeTaskCount()
                        .map {
                            TaskCount(it)
                        }
        )
    }
}

text/event-streamをacceptするroutingをRouterFunctionDslに追加する

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Configuration
class TaskRoutes(private val taskHandler: TaskHandler) {

    @Bean
    fun taskRouter() = router {
        "/api".nest {
            accept(TEXT_EVENT_STREAM).nest {
                GET("/task-count", taskHandler::fetchTaskCount)
            }
        }
    }
}

まとめ

コード

動くコードあります。