Spring Web Flux でServer Sent EventsのPush型APIをつくってみた
Spring Boot 2.0からWebFlux frameworkが導入された。Reactorが使えてReactive Programmingが可能になったのでPush型のAPIをつくってみた。
どんなAPIをつくるか
- コンテンツが更新されたら件数を教えてくるAPI
- Server Sent EventsのContent-typeをサポートする
- コンテンツの件数が変更されたらAPIをSubscribeしているクライアントに件数を通知する
RedisのPub/Subを使う
RedisのPub/Subを使ってリアルタイムのコンテンツ更新を通知する。アプリはServerSide+Kotlinで実装したのでJedisのライブラリを使う。
JedisはJedisPubSub
のクラスを実装すればチャンネルに通知されたメッセージをSubscribeできる。onMessage
でReactorクラスのFluxにデータを流すようにした。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
@Service
class TaskService(private val jedisPool: JedisPool,
private val taskBackendClient: TaskBackendClient) {
companion object {
private val CHANNEL = "task"
}
private val flux = Flux.create<String> { emitter ->
TaskSubscriber(emitter).let { sub ->
jedisPool.resource.use {
it.subscribe(sub, CHANNEL)
}
}
}
fun publishUpdateTask() {
jedisPool.resource.use {
it.publish(CHANNEL, "updateTask")
}
}
fun subscribeTaskCount() =
flux.map {
taskBackendClient.getTaskCount().count
}.run {
share()
}
}
class TaskSubscriber(private val fluxSink: FluxSink<String>) : JedisPubSub() {
override fun onMessage(channel: String?, message: String?) {
fluxSink.next(message)
}
}
|
ポイントはFlux.create<T>
で作られるFluxをTaskSubscriberのコンストラクタに渡してTaskSubscriberのonMessageでデータを流すようにしているところ。
コンテンツが更新されたらTaskService#publishUpdateTask
を呼び出しチャンネルに通知をPublishする。
ここまでくればTaskServiceを必要なところで使えばOK。
API HandlerでSever-Send-EventsのServerResponseを返す
bodyToServerSentEvents
の拡張関数を使えばSever-Send-EventsのServerResponseを返せる。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Component
class TaskHandler(private val taskBackendClient: TaskBackendClient,
private val taskService: TaskService) {
// ---
fun create(req: ServerRequest): Mono<ServerResponse> {
return ok().json().body(
req.bodyToMono(CreateTaskInbound::class.java).map { p ->
TaskModel(taskBackendClient.createTask(p.title)).also {
taskService.publishUpdateTask()
}
})
}
// ---
fun fetchTaskCount(req: ServerRequest): Mono<ServerResponse> {
return ok().json().bodyToServerSentEvents(
taskService.subscribeTaskCount()
.map {
TaskCount(it)
}
)
}
}
|
text/event-streamをacceptするroutingをRouterFunctionDslに追加する
1
2
3
4
5
6
7
8
9
10
11
12
|
@Configuration
class TaskRoutes(private val taskHandler: TaskHandler) {
@Bean
fun taskRouter() = router {
"/api".nest {
accept(TEXT_EVENT_STREAM).nest {
GET("/task-count", taskHandler::fetchTaskCount)
}
}
}
}
|
まとめ
- Spring BootでServer Sent EventsのPush型APIをつくってみた
- ReactorをサポートしているのとServer Sent EventsのAPIを作りやすい仕組みがSpring Boot 2.0から整っているのありがたい
- ReactorにProcessorが用意されているがRx javaと機能差異がどれくらいあるか気になる
コード
動くコードあります。