rx-preferencesをつかってSharedPreferencesの更新をSubscribeする

ブログエントリで試しているAndroidアプリにFluxアーキテクチャを導入をしたことでデータの流れがStreamになった。SharedPreferencesも rx-preferencesをつかえば更新状態をSubscribeできるので導入過程をまとめていく。

github.com

お試し中のアプリはOpenWeatherMapのAPIを使い天気情報を取得する。ZipCodeをクエリに追加すれば地域の天気情報が取得できる。アプリ内でZipCodeを登録できるようにしたいのでSharedPreferencesで管理することにする。

rx-preferencesをつかう

rx-preferencesを追加してRepository化するまで。

# dependencies.gradle

rxPreferences = 'com.f2prateek.rx.preferences2:rx-preferences:2.0.0-RC3'
# SettingsRepository.kt

@Singleton
class SettingsRepository @Inject constructor(private val application: Application) {

    private val SETTING_NAME = "setting"
    private val ZIP_CODE_NAME = "zip_code"

    fun getZipCode() = getRxSharedPreferences(SETTING_NAME).getString(ZIP_CODE_NAME)
    fun updateZipCode(zipCode: String) = getZipCode().set(zipCode)

    private fun getRxSharedPreferences(name: String) = RxSharedPreferences.create(
            application.getSharedPreferences(BuildConfig.APPLICATION_ID + '.' + name, Context.MODE_PRIVATE))
}

Repository層の役割はデータ取得や更新など。Repositoryを使う側はその先がAPIかDBかに関心する必要させたくないのでSharedPreferencesのデータ取得や更新もRepository層で実装した。
このRepositoryをFluxアーキテクチャに乗せていく。

ActionとStoreをつくる。Dispatcherはつくらない。

# SettingsAction.kt

@Singleton
class SettingsAction @Inject constructor(private val settingsRepository: SettingsRepository) {

    fun updateZipCode(zipCode: String) = settingsRepository.updateZipCode(zipCode)
}

ActionはZipCodeを更新するメソッドを追加した。

# SettingsStore.kt

@Singleton
class SettingsStore @Inject constructor(private val settingsRepository: SettingsRepository) {
    fun zipCode() = settingsRepository.getZipCode()
            .asObservable()
            .subscribeOn(Schedulers.io())
            .toFlowable(BackpressureStrategy.LATEST)
}

StoreにはZipCodeの状態をObserveする zipCode()メソッドを追加した。

rx-preferencesは asObservable()を呼び出せばStreamオブジェクトを取得できる。BackpressureStrategy.LATESTを有効にすることで常に最新の値が流れるようにした。

Fluxアーキテクチャに乗るならばDispatcherを作るところだが、 rx-preferencesがStreamを提供してくれるのでこのエントリではDispatcherを追加していない。このアプリはまだ小さいのでDispatcherの追加はしない判断をしたが大きなアプリであればデータのアクションに応じて専用のDispatcherに乗せたほうが良い場合もあると思う。

ZipCodeの更新をSubscribeする

FragmentでZipCodeの更新をSubscribeする。

# ForecastsFragment.kt

class ForecastsFragment : AutoDisposeFragmentKotlin() {

    // ---

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {

        forecastsStore.forecasts()
                .observeOn(AndroidSchedulers.mainThread())
                .`as`(autoDisposable(this))
                .subscribe { forecasts ->
                    cityView.text = "%s/%s".format(forecasts.city.name, forecasts.city.country)
                    listView.adapter = ArrayAdapter<String>(activity, android.R.layout.simple_list_item_1,
                            forecasts.list.map {
                                "%s - %s %s/%s".format(
                                        DateUtils.formatDateTime(activity, it.dt * 1000L, FORMAT_NO_YEAR),
                                        it.weather.get(0).main, it.temp.min, it.temp.max)
                            })
                }

        savedInstanceState ?: settingsStore.zipCode()
                .observeOn(AndroidSchedulers.mainThread())
                .`as`(AutoDispose.autoDisposable(this))
                .subscribe {
                    if (it.isNotBlank()) {
                        forecastsAction.findByDaily(it)
                    } else {
                        errorAction.onError("You must to set ZipCode.")
                    }
                }
    }

    // ---
}

SettingActivityでZipCodeが更新されると settingsStore.zipCode()に最新のZipCodeが流れてくるのでSubscribeをしてAPIを呼びだしている。
forecastsAction.findByDaily(it)が呼び出されれば、 forecastsStore.forecasts()に最新のAPI結果が流れてくるのでViewが切り替わる。

コード

rx-preferencesの導入前と後のコード比較ができるようにPRを残しています。 エントリで紹介したコードは断片的なので参考になれば嬉しいです。

https://github.com/soushin/sunshine-app/pull/4github.com

まとめ

  • rx-preferencesをつかってSharedPreferencesの更新をSubsribeしてみた。1つのアクションが発火されると連鎖的に複数のStreamが流動するのを体験できた。
  • まだ2つのStream(APIとSharedPreferences)だけであるが3つ,4つといったStreamが絡み合う条件などは実装が複雑になりそうである。ただRxには実装の複雑性を回避するものもあるので今後のエントリでまとめていきたい。

Dagger2 + uber/Autodispose + RxJava2でFluxアーキテクチャを導入してみた

ブログエントリで試しているAndroidアプリにFluxアーキテクチャを導入してみたのでまとめる。

Fluxアーキテクチャ

Fluxアーキテクチャに関する情報は調べるとたくさん見つかるので概要までに留めておく。

github.com

Fluxはアプリケーションデータを管理するためのデータフローのパターンで、最も重要なコンセプトはデータの流れが単一方向であること。

f:id:n_soushi:20180228092104p:plain

出典:https://github.com/facebook/flux/tree/master/examples/flux-concepts

Fluxを理解するための要素として、ActionDispatcherStoreViewがある。上記の図にもあるとおりActionからViewまでデータは一方向に流れている。ActionからDispatcherを通してStoreへデータを流す。Storeから状態変更されたデータをViewへ渡す。ViewからActionを通してDispatcherに伝達するフローはViewがonClickなどのイベントを検知して応じたActionを発火させているものである。

ViewはStoreの状態変更の検知やActionを発火させる役割を担う。ViewはStoreの状態変更があれば即座に対応したり、イベントが起きたらActionへ伝えActionからDispatcherを通してStoreの状態変更を検知するためにPub/Subを行う必要がある。ActionとStoreを仲介するDispatcherにStreamを導入することでデータのPub/Subを実現する。このエントリではRxJava2を採用した。

ここからはFluxアーキテクチャをAndroidアプリに導入する過程のコードをまとめていく。

Dispatcher

class ForecastsDispatcher {
    val forecastsProcessor = PublishProcessor.create<Forecasts>()
}

PublishProcessorでFlowableなPublisherを定義する。forecastsProcessorは天気情報を保持する。

Action

@Singleton
class ForecastsAction @Inject constructor(private val forecastsDispatcher: ForecastsDispatcher,
                                          private val errorDispatcher: ErrorDispatcher,
                                          private val openWeatherMapRepository: OpenWeatherMapRepository) {
    fun findByDaily() {
        openWeatherMapRepository.findForecastByDaily()
                .subscribeOn(Schedulers.io())
                .subscribe({
                    forecastsDispatcher.forecastsProcessor.onNext(it)
                }, {
                    errorDispatcher.onError(Err(it.message))
                })
    }
}

openWeatherMapRepositoryを通して天気情報を取得してforecastsDispatcher.forecastsProcessor.onNext(it)でStreamにデータを流す。

Store

@Singleton
class ForecastsStore @Inject constructor(private val forecastsDispatcher: ForecastsDispatcher) {
    fun forecasts() = forecastsDispatcher.forecastsProcessor
}

Storeはデータの状態を保持する。またSetterは定義せずデータ取得のメソッドを定義する。forecasts()はdispatcherを通して最新の情報を取得する。

View

class ForecastsFragment : AutoDisposeFragmentKotlin() {

    @Inject lateinit var forecastsAction: ForecastsAction
    @Inject lateinit var forecastsStore: ForecastsStore
    @Inject lateinit var errorStore: ErrorStore

    // ---

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        forecastsStore.forecasts()
                .observeOn(AndroidSchedulers.mainThread())
                .`as`(autoDisposable(this))
                .subscribe { forecasts ->
                    listView.adapter = ArrayAdapter<String>(activity, android.R.layout.simple_list_item_1,
                            forecasts.list.map {
                                "%s - %s %s/%s".format(
                                        DateUtils.formatDateTime(activity, it.dt * 1000L, FORMAT_NO_YEAR),
                                        it.weather.get(0).main, it.temp.min, it.temp.max)
                            })
                }

        savedInstanceState ?: forecastsAction.findByDaily()
    }

    // ---

ActionのforecastsAction.findByDaily()を実行して、forecastsStore.forecasts()で購読処理をする。

autoDisposable

ライフサイクルに合わせてStreamを廃棄させたい。 forecastsStore.forecasts()でStreamを購読しているが他のActivityやFragmentに切り替わってもStreamのインスタンスを保持し続けるとメモリリークを起こす原因となるため不要なStreamを廃棄する必要がある。

そこでuber/AutoDisposeを導入すれば、Streamの廃棄をAndroidのライフサイクルに合わせて自動で行ってくれる。

github.com

abstract class AutoDisposeFragmentKotlin : Fragment(), LifecycleScopeProvider<AutoDisposeFragmentKotlin.FragmentEvent> {

    // ---

    companion object {

        /**
         * This is a function of current event -> target disposal event. That is to say that if event A
         * returns B, then any stream subscribed to during A will autodispose on B. In Android, we make
         * symmetric boundary conditions. Create -> Destroy, Start -> Stop, etc. For anything after
         * Resume we dispose on the next immediate destruction event. Subscribing after Detach is an
         * error.
         */
        private val CORRESPONDING_EVENTS: Function<FragmentEvent, FragmentEvent> =
                Function { lifecycleEvents ->
            when (lifecycleEvents) {
                ATTACH -> DETACH
                CREATE -> DESTROY
                CREATE_VIEW -> DESTROY_VIEW
                START -> STOP
                RESUME -> PAUSE
                PAUSE -> STOP
                STOP -> DESTROY_VIEW
                DESTROY_VIEW -> DESTROY
                DESTROY -> DETACH
                else -> throw LifecycleEndedException("Cannot bind to Fragment lifecycle after detach.")
            }
        }
    }
}

Streamの購読を開始したイベントとStreamを廃棄させるイベントをマッピングしているコードが上記である。このコードはuber/AutoDisposeのrecipeを参考にした。

AutoDisposeを使えばCREATEで購読を開始したStreamをDESTROYで廃棄してくれる。

ErrorDispatcher

APIエラーが起きた場合のエラーをViewまで伝達させるためにErrorDispatcherを定義した。

class ErrorDispatcher {
    val errors = PublishSubject.create<Err>().toSerialized()

    fun onError(err: Err) = errors.onNext(err)
}

エラーはBackpressureは必要ないのでPublishSubjectを定義してシリアライズしている。

ErrorStoreとView

@Singleton
class ErrorStore @Inject constructor(private val errorDispatcher: ErrorDispatcher) {
    fun errors() = errorDispatcher.errors
}

ErrorStoreを定義することでViewから最新のエラーを取得することができる。

class ForecastsFragment : AutoDisposeFragmentKotlin() {

    @Inject lateinit var forecastsAction: ForecastsAction
    @Inject lateinit var forecastsStore: ForecastsStore
    @Inject lateinit var errorStore: ErrorStore

    // ---

    override fun onResume() {
        super.onResume()

        errorStore.errors()
                .observeOn(AndroidSchedulers.mainThread())
                .`as`(autoDisposable(this))
                .subscribe { error ->
                    Toast.makeText(activity, error.message, Toast.LENGTH_LONG).show()
                }
    }

    // ---
}

コード

Fluxアーキテクチャの導入前と後のコード比較ができるようにPRを残しています。 エントリで紹介したコードは断片的なので参考になれば嬉しいです。

github.com

まとめ

  • Fluxアーキテクチャを構成する要素であるAction、Dispatcher、Store、Viewを理解してコード化してみた。
  • コード化すると各要素の責務が明確になる。明確になるということはコード運用に置いてPullRequestの注視ポイントも明確になると思う。
  • Fluxアーキテクチャのコンセプトである単一方向のデータフローはコード化するとより理解が深まった。
  • View(Fragment)のコードをシンプルに構成してくれているのはRxJavaの恩恵でありFluxアーキテクチャとStreamのマッチングは切り離せないものである。

Dagger2(android support module)をつかってFragmentにDIする

Dagger2(android support module)をつかってFragmentにDIする方法をまとめていく。

HasSupportFragmentInjectorを実装する

MainActivityにHasSupportFragmentInjectorを継承させ必要な実装をします。

class MainActivity : AppCompatActivity(), HasSupportFragmentInjector {

    @Inject
    lateinit var androidInjector: DispatchingAndroidInjector<Fragment>

    override fun supportFragmentInjector() = androidInjector

    override fun onCreate(savedInstanceState: Bundle?) {
        AndroidInjection.inject(this)
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        setContentFragment(R.id.mainLayout)
    }

    private fun setContentFragment(containerViewId: Int) {
        supportFragmentManager.let { manager ->
            manager.findFragmentById(containerViewId)?.let { return }
            ForecastsFragment.newInstance().apply {
                manager?.beginTransaction()?.add(containerViewId, this)?.commit()
            }
        }
    }
}

Fragmenのモジュールをまとめる

Fragmentで管理するモジュールを MainModuleとしてまとめる。

@Module
internal abstract class MainModule {

    @ContributesAndroidInjector
    abstract fun contributeMainFragment(): ForecastsFragment

}

MainActivityのSubcomponentにMainModuleを追加する。

@Module
internal abstract class UiModule {

    @ContributesAndroidInjector(modules = [MainModule::class])
    internal abstract fun contributeMainActivity(): MainActivity
}

今後、MainActivityに機能が追加する場合は、MainModuleに依存するモジュールを追加していく。

Fragmenのライフサイクルに合わせてInject

サンプルのコードではonAttachでInjetctをしている。

class ForecastsFragment : Fragment() {

    @Inject
    lateinit var openWeatherMapRepository: OpenWeatherMapRepository

    private var listView: ListView by Delegates.notNull()

    override fun onAttach(context: Context?) {
        AndroidSupportInjection.inject(this)
        super.onAttach(context)
    }

    override fun onCreateView(inflater: LayoutInflater?, container: ViewGroup?, savedInstanceState: Bundle?): View? {
        val view = inflater?.inflate(R.layout.forcasts_fragment, container, false) ?: return null
        listView = view.findViewById<ListView>(R.id.list_view)
        return view
    }

    override fun onActivityCreated(savedInstanceState: Bundle?) {
        super.onActivityCreated(savedInstanceState)

        openWeatherMapRepository.findForecastByDaily()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { forecasts ->
                    listView.adapter = ArrayAdapter<String>(activity, android.R.layout.simple_list_item_1,
                            forecasts.list.map {
                                "%s - %s %s/%s".format(
                                        DateUtils.formatDateTime(activity, it.dt * 1000L, FORMAT_NO_YEAR),
                                        it.weather.get(0).main, it.temp.min, it.temp.max)
                            })
                }
    }

    companion object {
        fun newInstance() = ForecastsFragment()
    }
}
  • Fragmentの理解は公式のドキュメントが分かりやすい。
  • Fragmentを部品と捉えてタブレットとハンドセットで異なるレイアウトでも部品(Fragment)を組み合わせることで柔軟に対応することができるし部品の再利用も容易にできる。

コード

今回のコードはこちらのPRにまとまっていますので参考になれば嬉しいです。

github.com

Dagger2 (android support module)とretrofit2をつかってAPIレスポンスをListViewで表示する

掲題のとおりAndroidのListViewを表示してみる。 APIリクエストは retrofitを使い天気情報を取得できるOpenWeatherMapのAPIを利用する。

github.com

DIにはDaggerを使い、2.11から有効なandroid support moduleを利用する。

github.com

APIをリクエストするServiceクラスをつくる

interface OpenWeatherMapService {

    @GET("/data/2.5/forecast/daily?q=94043&mode=json&units=metric&cnt=7&APPID=XXXXX")
    fun findForecastByDaily(): Observable<Forecasts>
}
  • レスポンスの型はObservable<Forecasts>。型パラメータのForecastsは Parcelableを実装したDTO。
  • APPID=XXXXXはopenweathermapから取得したID

このServiceクラスをRepositoryクラスから呼び出し見通しの良いコードにするためにDIを利用していく。DIについては後述する。

Parcelableを実装したDTO(data class)

data class Forecasts(var cod: Int, var list: List<Forecast>) : Parcelable {

    constructor(src: Parcel) : this(
            cod = src.readInt(),
            list = src.createTypedArrayList(Forecast.CREATOR)
    )

    // -
}

ActivityやFragmentにパラメータを渡すために Parcelableを実装したdata classを用意する。 フィールドにプリミティブ型ではないオブジェクト型を使う場合は次のようにする。

data class Forecast(var dt: Long, var temp: Temp, var weather: List<Weather>) : Parcelable {

    constructor(src: Parcel) : this(
            dt = src.readLong(),
            temp = src.readParcelable(Temp::class.java.classLoader),  //  ← data class `Temp`
            weather = src.createTypedArrayList(Weather.CREATOR) //  ← List型のパラメータに data class `Weather`
    )

    override fun describeContents(): Int {
        return 0
    }

    override fun writeToParcel(dest: Parcel?, flags: Int) {
        dest?.writeLong(dt)
        dest?.writeParcelable(temp, flags) //  ← data class `Temp`
        dest?.writeList(weather) //  ← List型のパラメータに data class `Weather`
    }

    // -
}

MainActivityでDIする

MainActivityでOpenWeatherMapServiceを提供するRepositoryクラスをInjectするまでの過程をまとてめていく。

RepositoryModuleをつくる

class OpenWeatherMapRepository(val openWeatherMapService: OpenWeatherMapService) {
    fun findForecastByDaily() = openWeatherMapService.findForecastByDaily()
}
@Module
internal object RepositoryModule {

    @Provides
    @Singleton
    @JvmStatic
    fun provideOpenWeatherMapRepository(openWeatherMapService: OpenWeatherMapService) =
            OpenWeatherMapRepository(openWeatherMapService)
}

OpenWeatherMapRepositoryを提供するRepositoryModuleをつくった。

DataModuleをつくる

RepositoryModuleをIncludeしたDataModuleをつくる。このモジュールでRetrofitクライアントをビルドする。

@Module(includes = arrayOf(RepositoryModule::class))
internal object DataModule {

    @Provides
    @Singleton
    @JvmStatic
    fun provideMoshi() = Moshi.Builder()
            .add(KotlinJsonAdapterFactory())
            .build()

    @Provides
    @Singleton
    @JvmStatic
    fun provideOkHttp(): OkHttpClient = OkHttpClient.Builder()
            .build()

    @Provides
    @Singleton
    @JvmStatic
    fun provideRetrofit(oktHttpClient: OkHttpClient, moshi: Moshi): Retrofit = Retrofit.Builder()
            .client(oktHttpClient)
            .baseUrl("http://api.openweathermap.org")
            .addConverterFactory(MoshiConverterFactory.create(moshi))
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build()

    @Provides
    @Singleton
    @JvmStatic
    fun provideOpenWeatherMapService(retrofit: Retrofit) = retrofit.create(OpenWeatherMapService::class.java)
}
  • JSONパーサにはKotlinサポートが入っているMoshiをつかう

@ContributesAndroidInjectorをつかいMainActivityへのInjectを定義する

Dagger 2.11の重要ポイントの1つ。ActivityへのInjectは @ContributesAndroidInjectorをつかいUiModuleをつくる。

@Module
internal abstract class UiModule {

    @ContributesAndroidInjector
    internal abstract fun contributeMainActivity(): MainActivity
}

Activityが増えたときには、ここにActivityへのInjectを追加する。

ApplicationComponentに AndroidInjector<KotlinApplication> を継承させる

Dagger 2.11の重要ポイントの1つ。ApplicationクラスへInjectさせるためにApplicationComponentに AndroidInjector<KotlinApplication> を継承させる。後述するApplicationの親クラスに dagger.android.support.DaggerApplicationを使うためmoduleにAndroidSupportInjectionModule を追加する。

@Singleton
@Component(modules = arrayOf(AndroidSupportInjectionModule::class,
        AppModule::class,
        DataModule::class,
        UiModule::class))
interface ApplicationComponent : AndroidInjector<KotlinApplication> {

    @Component.Builder
    interface Builder {
        @BindsInstance
        fun application(application: KotlinApplication): Builder
        fun build(): ApplicationComponent
    }

    override fun inject(application: KotlinApplication)
}

Applicationクラスに DaggerApplicationを継承させ実装する

HasActivityInjectorを継承する流れを紹介するエントリもあるが DaggerApplicationHasActivityInjectorの実装が含まれているのでこちらをつかう。

class KotlinApplication : DaggerApplication() {

    override fun applicationInjector() = DaggerApplicationComponent.builder()
            .application(this)
            .build()

    override fun onCreate() {
        super.onCreate()
    }
}

MainActivityにOpenWeatherMapRepositoryをInjectする

最後にMainActivityにOpenWeatherMapRepositoryをInjectする。Dagger 2.11の重要ポイントの1つ。InjectするためにはonCreateでAndroidInjection.inject(this)を呼び出す。

class MainActivity : AppCompatActivity() {

    @Inject lateinit var openWeatherMapRepository: OpenWeatherMapRepository

    override fun onCreate(savedInstanceState: Bundle?) {
        AndroidInjection.inject(this)

    // -
}

APIレスポンスをListViewに表示する

class MainActivity : AppCompatActivity() {

    @Inject lateinit var openWeatherMapRepository: OpenWeatherMapRepository

    override fun onCreate(savedInstanceState: Bundle?) {
        AndroidInjection.inject(this)
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        openWeatherMapRepository.findForecastByDaily()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { forecasts ->
                    findViewById<ListView>(R.id.listview).let { view ->
                        view.adapter = ArrayAdapter<String>(this, android.R.layout.simple_list_item_1,
                                forecasts.list.map {
                                    "%s - %s %s/%s".format(
                                    DateUtils.formatDateTime(this, it.dt * 1000L, FORMAT_NO_YEAR),
                                            it.weather.get(0).main, it.temp.min, it.temp.max)
                                })
                    }
                }
    }
}

ListのItemViewには simple_list_item_1をつかって日にちと最高気温と最低気温を表示している。

まとめ

  • Daggert2(android support module)のDIをまとめた。android support module以前のDI方法だとコピペコードが増える懸念があり登場した経緯を知ってなるほど、と思った。
  • retrofitはシンプルな使い方までに留まっているので引き続き触っていきながら知見をまとめていきたい。

コード

このエントリまでのコードがPull Requestにまとまっていますので参考になれば嬉しいです。(初回のコミットなので不要なlayoutコードなどが散見してます。)

github.com

参考

minikube + helmでローカル環境を構築する

plasmaの動作確認のためにローカル環境を構築する機会がありminikube + helmで構築してみようと始めたのがエントリのモチベーション。

github.com

plasmaはServer Push型のミドルウェアでFRESH!で使われている。ポーリング撲滅を掲げgRPC/SSEを用いて省コネクションでイベントのSubscribeを実現している。
plasmaの動作確認にはplasmaredisのミドルウェアとイベントをSubscribeするアプリケーションが必要(kotlin + SpringBootで書いた)。これらをコンテナ化してローカル環境で確認していきたい。

これまでのローカルのコンテナ実行環境は docker-composeでやっていたけど、Kubernetesの初学も兼ねてminikubeでやってみる。helmを使いコンテナ全体をパッケージングしていく。

minikubeでkubernetes環境を起動する

$  minikube start
$  eval $(minikube docker-env)

必要なDockerイメージをビルドまたはプルした状態がこちら。

$ docker images | grep -v "gcr.io"
REPOSITORY                                             TAG                 IMAGE ID            CREATED             SIZE
soushin/plasmacli                                      latest              49ca95df3dad        2 days ago          942MB
redis                                                  latest              861cc310cd91        4 days ago          107MB
openfresh/plasma                                       latest              7ff567596426        6 weeks ago         16.6MB
java                                                   openjdk-8           d23bdf5b1b1b        12 months ago       643MB

gcr.io/*のイメージはリストから排除しています

helmを使いコンテナ全体をパッケージングをする

$ helm create plasmacli
$ tree ./plasmacli
./plasmacli
├── Chart.yaml
├── charts
├── templates
│   ├── NOTES.txt
│   ├── _helpers.tpl
│   ├── deployment.yaml
│   ├── ingress.yaml
│   └── service.yaml
└── values.yaml

helm create {packageName}で雛形を生成する。


このエントリで修正したyamlは values.yamltemplates/deployment.yamlの2つ。
templates/deployment.yamlにコンテナとコンテナ内の環境変数、Internal/Externalのポートを定義する。値は values.yamlから参照する。

templates/deployment.yamlとvalues.yaml
# `plasmacli`コンテナ定義を抜粋

      containers:
        - name: plasmacli
          image: "{{ .Values.plasmaCli.image.repository }}:{{ .Values.plasmaCli.image.tag }}"
          imagePullPolicy: {{ .Values.plasmaCli.image.pullPolicy }}
          ports:
            - name: plasmacli
              containerPort: {{ .Values.service.internalPort }}
          env:
          - name: PLASMA_CLI_PORT
            value: {{ .Values.service.internalPort | quote }}
          - name: PLASMA_HOST
            value: {{ .Values.plasmaCli.env.plasmaHost | quote }}
          - name: PLASMA_PORT
            value: {{ .Values.plasmaCli.env.plasmaPort | quote }}

# 関連する値を `values.yaml`から抜粋

plasmaCli:
  image:
    repository: soushin/plasmacli
    tag: latest
    pullPolicy: IfNotPresent
  env:
    plasmaHost: localhost
    plasmaPort: 50051

service:
  type: NodePort
  port: 80
  internalPort: 8080

パッケージングしたchartをインストールする

$ helm package plasmacli
$ helm install --name plasmacli local/plasmacli
NAME:   plasmacli
LAST DEPLOYED: Sat Jan 27 11:30:23 2018
NAMESPACE: default
STATUS: DEPLOYED

RESOURCES:
==> v1/Service
NAME       TYPE      CLUSTER-IP    EXTERNAL-IP  PORT(S)       AGE
plasmacli  NodePort  10.98.66.149  <none>       80:31118/TCP  0s

==> v1beta2/Deployment
NAME       DESIRED  CURRENT  UP-TO-DATE  AVAILABLE  AGE
plasmacli  1        0        0           0          0s

podを確認してみると無事起動している。

$ kubectl get pods -n default
NAME                        READY     STATUS    RESTARTS   AGE
plasmacli-957c79484-d4dkh   3/3       Running   0          2d

クラスタの外からアクセスする

クラスタの外からのアクセスにはserviceをNodePort化したのでポート番号が払い出されている。
アクセスURLは次のように確認できる。

$ minikube service plasmacli --url
http://192.168.64.12:31118

このURLのバックエンドにはイベントをSubscribeするアプリケーションが設定されているので次のように /health_check が叩けるようになっている。

$ curl http://192.168.64.12:31118/health_check
true

plasmaの動作確認をしてみよう

イベントをSubscribeするアプリケーションではイベント名:my-eventをSubscribeするようにしている。

redisからchannelへPublishする。

PUBLISH plasma '{"meta": { "type": "my-event"}, "data": "HELLO"}'
PUBLISH plasma '{"meta": { "type": "my-event"}, "data": "My Plasma"}'

イベントをSubscribeするアプリケーションのログの最後にPayloadデータ(HELLO, My Plasmaの文字列)が出力できた。

$ kubectl logs -f  plasmacli-957c79484-d4dkh -c plasmacli

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::             (v2.0.0.M7)

[INFO ][2018-01-27 11:30:26.380] Starting Application.Companion on plasmacli-957c79484-d4dkh with PID 5 (/usr/local/plasma-cli/lib/plasma-cli.jar started by root in /)
[INFO ][2018-01-27 11:30:26.420] No active profile set, falling back to default profiles: default
[INFO ][2018-01-27 11:30:26.582] Refreshing org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@445b84c0: startup date [Sat Jan 27 11:30:26 GMT+09:00 2018]; root of context hierarchy
[INFO ][2018-01-27 11:30:29.531] Mapped (Accept: [application/json] && /health_check) => {
 (GET && /) -> org.springframework.web.reactive.function.server.RouterFunctionDsl$GET$1@709ba3fb
}
[INFO ][2018-01-27 11:30:29.564] Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
[INFO ][2018-01-27 11:30:29.565] Mapped URL path [/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
[INFO ][2018-01-27 11:30:29.661] Looking for @ControllerAdvice: org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@445b84c0: startup date [Sat Jan 27 11:30:26 GMT+09:00 2018]; root of context hierarchy
[INFO ][2018-01-27 11:30:31.598] Registering beans for JMX exposure on startup
[INFO ][2018-01-27 11:30:31.745] Started HttpServer on /0.0.0.0:8080
[INFO ][2018-01-27 11:30:31.761] Netty started on port(s): 8080
[INFO ][2018-01-27 11:30:31.773] Started Application.Companion in 6.292 seconds (JVM running for 7.875)

[INFO ][2018-01-27 11:46:06.378] stream observe: onNext={"HELLO"}
[INFO ][2018-01-27 11:48:48.386] stream observe: onNext={"My Plasma"}

コードはgithubにあります

github.com

コードはgithubに置いてますので合わせて参照ください。

まとめ

  • 簡易的なplasmaの動作確認環境が構築できた。
  • これからは公開するアプリケーションのコンテナパッケージはhelmで公開していこうとモチベーションがあがった。
  • kubectlの各種コマンドはDockerコマンドライクなところが多く、ここらへんはとても飲み込みやすい。
  • 今回は初学ということで用語の解説などを飛ばしまったので触っていきながら用語の理解を深めていく必要を感じた。
  • helmのvaluesで定義した値とdeployment.yamlで参照する値のKeyのマッピングにミスが多かったのでIDEのhelmライブラリがあると便利そう。
  • Serviceの ClusterIPやPod間のアクセスなど気になるところが数多あるので引き続きアウトプットしていく。

SpringBoot 2.0.0でRouterFunctionのエラーハンドリングをWebExceptionHandlerで行う

SpringBoot 2.0.0からサポートされるRouterFunctionのエラーハンドリングをまとめていきたい。

RouterFunctionは従来のアノテーションベースでAPIを作る形ではなくDSLベースでルーティングを定義していく。

@Configuration
class TaskRoutes(private val taskHandler: TaskHandler) {

    @Bean
    fun taskRouter() = router {
        (accept(APPLICATION_JSON) and "/api").nest {
            "/task".nest {
                POST("/", taskHandler::create)
                GET("/{id}", taskHandler::fetchByTaskId)
                PUT("/{id}", taskHandler::updateByTaskId)
                DELETE("/{id}", taskHandler::deleteByTaskId)
                PUT("/{id}/finish", taskHandler::finishByTaskId)
            }
            "/tasks".nest {
                GET("/", taskHandler::fetchAll)
            }
        }
    }
}

TaskHandlerクラスは各エンドポイントを処理するが処理中に発生したエラーを共通的にハンドリングする場合にはどうすればよいか? そんなときは WebExceptionHandlerを実装すれば良い。

@Component
class ApiErrorHandler(private val objectMapper: ObjectMapper) : WebExceptionHandler {

    private val logger = KotlinLogging.logger {}

    override fun handle(exchange: ServerWebExchange, ex: Throwable): Mono<Void> {
        return handle(ex)
                .flatMap {
                    it.writeTo(exchange, HandlerStrategiesResponseContext(HandlerStrategies.withDefaults()))
                }
                .flatMap {
                    Mono.empty<Void>()
                }
    }

    private fun handle(t: Throwable): Mono<ServerResponse> {
        return when (t) {
            is SystemException -> {
                "api error".let {
                    logger.error(t) { t.message ?: it }
                    createResponse(t.status, t.message ?: it)
                }
            }
            is DecodingException -> {
                "invalid request".let {
                    logger.warn(t) { t.message ?: it }
                    createResponse(HttpStatus.BAD_REQUEST, it)
                }
            }
            else -> {
                logger.error(t) { "Unknown Exception: %s".format(t.message ?: "unknown error") }
                createResponse(HttpStatus.INTERNAL_SERVER_ERROR, t.message ?: "internal server error")
            }
        }
    }

    private fun createResponse(httpStatus: HttpStatus, message: String, code: String? = null): Mono<ServerResponse> {
        return Error(objectMapper.writeValueAsString(listOf(ErrorItem(message, code, null)))).let {
            ServerResponse.status(httpStatus).syncBody(it)
        }
    }
}

private class HandlerStrategiesResponseContext(val strategies: HandlerStrategies) : ServerResponse.Context {

    override fun messageWriters(): List<HttpMessageWriter<*>> {
        return this.strategies.messageWriters()
    }

    override fun viewResolvers(): List<ViewResolver> {
        return this.strategies.viewResolvers()
    }
}

[WebExceptionHandler.handle]の引数exに例外クラスが渡ってくるので、メッセージとHTTPレスポンスを組み立てレスポンスBodyにセットしている。

404エラーの場合のエラーハンドリングが正しくできていることを確認。

curl -v -XGET http://localhost:8080/api/task/2
Note: Unnecessary use of -X or --request, GET is already inferred.
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /api/task/2 HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.56.0
> Accept: */*
>
< HTTP/1.1 404 Not Found
< transfer-encoding: chunked
< Content-Type: application/json;charset=UTF-8
<
* Connection #0 to host localhost left intact
{"message":"task not found","errorCode":"404","field":null}

まとめ

  • これまでFilterFunctionでエラーハンドリングを行っていたが開発をする過程で間違っていることに気づいた。ただしくWebExceptionHandlerを利用する方法をアウトプットした。合わせて Web on Reactive Stack を参照してほしい。

コード

コードはgithubにあります。

github.com

詳細なコードは次のリンクからどうぞ。 spring5-kotlin-application/ApiErrorHandler.kt at master · soushin/spring5-kotlin-application · GitHub

モダンな負荷テストツールのk6を試してみた

負荷テストツールのk6を試す機会があったのでアウトプットしていく。

k6.io

k6は負荷テストサービスを提供するloadimpact社が開発する負荷テストツールでツール自体はgoでテストケースはES6で書く。

レポート出力はInfluxDB+Grafanaまたはloadimpact Insightsと連携する。連携方法はテスト実行時のコマンドにDB URLやTokenを加えるだけのシンプルな設計。

最近はgRPC通信の採用も増えているが今のところk6ではサポートされていない。ただ負荷テストサービスを提供するloadimpact社なので今後のサポートに期待できる。オープンソースだしgoだし自分で作りたい欲も出てくるのがモダンなk6の良いところ。

今回のエントリは簡易的なAPIサーバに負荷テストを実行するまでの流れをコードを交えてまとめていく。

k6をdockerで動かす

サクッと試せる環境を作りたかったのでAPIサーバとk6のコンテナをdocker-composeで構成管理した。k6はイメージが提供されているので簡単に動かせる。

docker pull loadimpact/k6

テストシナリオを書く

テストシナリオはEC6で書く。馴染みがあるJavascriptでシナリオを書けるのが良い。

テストシナリオの書き方で理解したかったのが次の2点。

上記、githubのリンクにあるとおり難しくなくドキュメントやサンプルも豊富ですぐに理解できた。

また、checkgroupthresholdsはテストシナリオを書く上で重要な要素となる。勿論、その他にも理解すべき要素はある。ドキュメントが豊富なので触っていきながら理解を深めていきたい。

docs.k6.io

k6 runオプションを理解する

テストの実行はk6 runのコマンドを実行する。どれくらいのユーザをアクセスさせるか、どれくらいテストを続けるかのオプションを理解する必要がある。今回使ったオプションは次の通り。

  • vus 同時接続ユーザ数の理解で良さそう。
  • iterations テストの実行回数。
k6 run --vus 5 --iterations 5 ./scripts/localhost.js

上記のコマンドだと5ユーザがアクセスしてテスト回数が5回行われる。iterationsは5ユーザが5回テストを実行するのではなく、全体のテスト回数が5回となる。そのため5ユーザが1回テストを実行するコマンドとなる。

またdurationstageも利用頻度が高いオプションになる。durationはテスト実行期間を指定できる、stageはJmeterのramp upのようなオプションである。

docs.k6.io

テスト結果をグラフに出力してみた

loadimpact Insightsにテスト結果を出力してみた。

docs.k6.io

連携方法もシンプル。loadimpact.comのアカウントをつくりダッシュボードからCLOUDトークンを発行してrunオプションに加えるだけである。連携した結果のイメージは次のとおり。

f:id:n_soushi:20171111221320p:plain

vusの数やテストシナリオの数でアカウントのグレードが異なり有料オプションとなる。継続的に負荷テストを実行するのであれば有料オプションを選択することを検討するのもありだし、InfluxDB+Grafanaの連携方法もある。標準のstdoutからもテスト結果は確認できるので、どのように負荷テストを運用するかでレポート運用方法の選択肢はこちらに委ねられている印象。

docs.k6.io

サクッと動かせるコードあります

githubに動かせるコードを置いたので興味ある人は参照ください。

github.com