Dagger2 + uber/Autodispose + RxJava2でFluxアーキテクチャを導入してみた


ブログエントリで試しているAndroidアプリにFluxアーキテクチャを導入してみたのでまとめる。

Fluxアーキテクチャ

Fluxアーキテクチャに関する情報は調べるとたくさん見つかるので概要までに留めておく。

Fluxはアプリケーションデータを管理するためのデータフローのパターンで、最も重要なコンセプトはデータの流れが単一方向であること。

出典:https://github.com/facebook/flux/tree/master/examples/flux-concepts

Fluxを理解するための要素として、ActionDispatcherStoreViewがある。上記の図にもあるとおりActionからViewまでデータは一方向に流れている。ActionからDispatcherを通してStoreへデータを流す。Storeから状態変更されたデータをViewへ渡す。ViewからActionを通してDispatcherに伝達するフローはViewがonClickなどのイベントを検知して応じたActionを発火させているものである。

ViewはStoreの状態変更の検知やActionを発火させる役割を担う。ViewはStoreの状態変更があれば即座に対応したり、イベントが起きたらActionへ伝えActionからDispatcherを通してStoreの状態変更を検知するためにPub/Subを行う必要がある。ActionとStoreを仲介するDispatcherにStreamを導入することでデータのPub/Subを実現する。このエントリではRxJava2を採用した。

ここからはFluxアーキテクチャをAndroidアプリに導入する過程のコードをまとめていく。

Dispatcher

1
2
3
class ForecastsDispatcher {
    val forecastsProcessor = PublishProcessor.create<Forecasts>()
}

PublishProcessorでFlowableなPublisherを定義する。forecastsProcessorは天気情報を保持する。

Action

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Singleton
class ForecastsAction @Inject constructor(private val forecastsDispatcher: ForecastsDispatcher,
                                          private val errorDispatcher: ErrorDispatcher,
                                          private val openWeatherMapRepository: OpenWeatherMapRepository) {
    fun findByDaily() {
        openWeatherMapRepository.findForecastByDaily()
                .subscribeOn(Schedulers.io())
                .subscribe({
                    forecastsDispatcher.forecastsProcessor.onNext(it)
                }, {
                    errorDispatcher.onError(Err(it.message))
                })
    }
}

openWeatherMapRepositoryを通して天気情報を取得してforecastsDispatcher.forecastsProcessor.onNext(it)でStreamにデータを流す。

Store

1
2
3
4
@Singleton
class ForecastsStore @Inject constructor(private val forecastsDispatcher: ForecastsDispatcher) {
    fun forecasts() = forecastsDispatcher.forecastsProcessor
}

Storeはデータの状態を保持する。またSetterは定義せずデータ取得のメソッドを定義する。forecasts()はdispatcherを通して最新の情報を取得する。

View

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ForecastsFragment : AutoDisposeFragmentKotlin() {

    @Inject lateinit var forecastsAction: ForecastsAction
    @Inject lateinit var forecastsStore: ForecastsStore
    @Inject lateinit var errorStore: ErrorStore

    // ---

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        forecastsStore.forecasts()
                .observeOn(AndroidSchedulers.mainThread())
                .`as`(autoDisposable(this))
                .subscribe { forecasts ->
                    listView.adapter = ArrayAdapter<String>(activity, android.R.layout.simple_list_item_1,
                            forecasts.list.map {
                                "%s - %s %s/%s".format(
                                        DateUtils.formatDateTime(activity, it.dt * 1000L, FORMAT_NO_YEAR),
                                        it.weather.get(0).main, it.temp.min, it.temp.max)
                            })
                }

        savedInstanceState ?: forecastsAction.findByDaily()
    }

    // ---

ActionのforecastsAction.findByDaily()を実行して、forecastsStore.forecasts()で購読処理をする。

autoDisposable

ライフサイクルに合わせてStreamを廃棄させたい。 forecastsStore.forecasts()でStreamを購読しているが他のActivityやFragmentに切り替わってもStreamのインスタンスを保持し続けるとメモリリークを起こす原因となるため不要なStreamを廃棄する必要がある。

そこでuber/AutoDisposeを導入すれば、Streamの廃棄をAndroidのライフサイクルに合わせて自動で行ってくれる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
abstract class AutoDisposeFragmentKotlin : Fragment(), LifecycleScopeProvider<AutoDisposeFragmentKotlin.FragmentEvent> {

    // ---

    companion object {

        /**
         * This is a function of current event -> target disposal event. That is to say that if event A
         * returns B, then any stream subscribed to during A will autodispose on B. In Android, we make
         * symmetric boundary conditions. Create -> Destroy, Start -> Stop, etc. For anything after
         * Resume we dispose on the next immediate destruction event. Subscribing after Detach is an
         * error.
         */
        private val CORRESPONDING_EVENTS: Function<FragmentEvent, FragmentEvent> =
                Function { lifecycleEvents ->
            when (lifecycleEvents) {
                ATTACH -> DETACH
                CREATE -> DESTROY
                CREATE_VIEW -> DESTROY_VIEW
                START -> STOP
                RESUME -> PAUSE
                PAUSE -> STOP
                STOP -> DESTROY_VIEW
                DESTROY_VIEW -> DESTROY
                DESTROY -> DETACH
                else -> throw LifecycleEndedException("Cannot bind to Fragment lifecycle after detach.")
            }
        }
    }
}

Streamの購読を開始したイベントとStreamを廃棄させるイベントをマッピングしているコードが上記である。このコードはuber/AutoDisposeのrecipeを参考にした。

AutoDisposeを使えばCREATEで購読を開始したStreamをDESTROYで廃棄してくれる。

ErrorDispatcher

APIエラーが起きた場合のエラーをViewまで伝達させるためにErrorDispatcherを定義した。

1
2
3
4
5
class ErrorDispatcher {
    val errors = PublishSubject.create<Err>().toSerialized()

    fun onError(err: Err) = errors.onNext(err)
}

エラーはBackpressureは必要ないのでPublishSubjectを定義してシリアライズしている。

ErrorStoreとView

1
2
3
4
@Singleton
class ErrorStore @Inject constructor(private val errorDispatcher: ErrorDispatcher) {
    fun errors() = errorDispatcher.errors
}

ErrorStoreを定義することでViewから最新のエラーを取得することができる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class ForecastsFragment : AutoDisposeFragmentKotlin() {

    @Inject lateinit var forecastsAction: ForecastsAction
    @Inject lateinit var forecastsStore: ForecastsStore
    @Inject lateinit var errorStore: ErrorStore

    // ---

    override fun onResume() {
        super.onResume()

        errorStore.errors()
                .observeOn(AndroidSchedulers.mainThread())
                .`as`(autoDisposable(this))
                .subscribe { error ->
                    Toast.makeText(activity, error.message, Toast.LENGTH_LONG).show()
                }
    }

    // ---
}

コード

Fluxアーキテクチャの導入前と後のコード比較ができるようにPRを残しています。 エントリで紹介したコードは断片的なので参考になれば嬉しいです。

まとめ