Knative EventingのSourceにAWS SQSをつかいServiceを起動する
前回のエントリではKnativeの Build
の知見をまとめたが今回は Eventing
をまとめる。試したところServerlessを体感できて、これぞKnativeの醍醐味ではという感想である。
モチベーション
KnativeのEventingを導入する際にAWS環境ではどのようなAWSサービスを使うことになるのか知りたかった。またEventingのアーキテクチャとAWSが提供するサービスが協調できるのか疑問でもあった。
調査したところSQSとMSK(Amazon Managed Streaming for Kafka)を使う選択になりそうという結果になった。ちなみにGCPであればGCP Cloud Pub/SubでEventingのアーキテクチャとバシッとハマるのでいいな〜という印象である。
このエントリではAWSサービスを使ってKnative Eventingを動かすまでをまとめていきたい。
※ 最初にお断りでローカルでサクッと確認するまでをまとめているのでMSKのインストールはせずにChannelにはIn-Memory Channelsを使っています。
Knative Eventing
Knative Eventingのアーキテクチャはdocsから確認できる。
キーワードはSource
、Channel
、Subscription
、Service
である。このうちSourceとChannelはマネージドに合わせて数種類の選択肢が用意されている。
上記はSourceにGitHubSource、ChannelにKafkaを使った例である。GitHub(Source)のイベント(マージやプルリクエスト)を検知してKafka(Channel)に流しChannelをサブスクライブしているサービスへイベントを伝搬させている。サービスには特定のイメージを指定できる。
GitHubSourceとApache Kafka Channelsはサンプルが用意さている。
docs/eventing/samples/github-source at master · knative/docs · GitHub
eventing/config/provisioners/kafka at master · knative/eventing · GitHub
AWS SQS Sourceを使う
本題である。SourceにAWS SQSを使い特定のQueue URLに流れたデータをサービスで出力してみる。工程はサンプルに沿って進んでいるが一部誤りがあるので正しい工程をまとめていく。
※ Istio, Service, Eventingのインストールが済んでいれば knative/eventing-sourcesをチェックアウト
までSkipしてください。
Istioのインストール
1
|
$ kubectl apply --filename https://raw.githubusercontent.com/knative/serving/v0.2.2/third_party/istio-1.0.2/istio.yaml |
Istio Injectorを有効にする
1
|
$ kubectl label namespace default istio-injection=enabled |
Knative Servingをインストール
1 2 3 |
$ curl -L https://github.com/knative/serving/releases/download/v0.2.2/release-lite.yaml \ | sed 's/LoadBalancer/NodePort/' \ | kubectl apply --filename - |
Eventingのインストール
1
|
kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.2.1/release.yaml |
knative/eventing-sourcesをチェックアウト
1
|
$ git clone git@github.com:knative/eventing-sources.git |
※ 以降はeventing-sources
のディレクトリで手順を実行します
Channelの作成
1
|
$ (eventing-sources) $ kubectl -n default apply -f samples/awssqs_source/channel.yaml |
チャンネルは標準の in-memory-channel
を利用している。AWSのProductionであればApache Kafkaのチャンネルを利用を検討している。
applyしたチャンネルは次のような定義である。
|
|
nameにqux-1
を設定しており後述するAwsSqsSourceとSubscriptionのリソースから参照される。
AWS Credentialsのリソース化
1 2 3 4 5 |
(eventing-sources) $ echo '[default] aws_access_key_id = XXXXXXXX aws_secret_access_key = XXXXXXXX' > ./cred.txt (eventing-sources) $ kubectl -n knative-sources create secret generic awssqs-source-credentials --from-file=credentials=./cred.txt |
ここでリソース化したAWS CredentialsをEventingのリソースがVolumeMountして利用することになる。アクセスとシークレットのキーを用意してリソース化する。
AwsSqsSourceを有効にする
1 2 |
(eventing-sources) $ export KO_DOCKER_REPO="ko.local" (eventing-sources) $ ko apply -f config/default-awssqs.yaml |
ko
のセットアップが済んでいる人はKO_DOCKER_REPOの変数を設定する必要はない。
SQSのQUEUE URLとQUEUE NAMEを書き換える
1 2 3 4 5 |
(eventing-sources) $ export QUEUE_URL="SQS QueueのURLをここに入れる" (eventing-sources) $ sed -i -e "s|QUEUE_URL|$QUEUE_URL|" samples/awssqs_source/awssqs-source.yaml (eventing-sources) $ export QUEUE_NAME="my-queue" (eventing-sources) $ sed -i -e "s|QUEUE_NAME|$QUEUE_NAME|" samples/awssqs_source/awssqs-source.yaml |
QUEUE_URLとQUEUE_NAMEを書き換えると下記のようになる。
|
|
この awsqs-source.yaml
はdefaultのネームスペースにapplyするのでAWS Credentialsを再度セットアップする。
1
|
(eventing-sources) $ kubectl -n default create secret generic aws-credentials --from-file=credentials=cred.txt |
AwsSqsSourceをデプロイする
1
|
(eventing-sources) $ ko -n default apply -f samples/awssqs_source/awssqs-source.yaml |
Subscriberをデプロイする
最後にSubscriberをデプロイする。
1
|
(eventing-sources) $ ko -n default apply -f samples/awssqs_source/subscriber.yaml |
デプロイしたsubscriberは次のような定義になっている。
|
|
Subscriptionリソースを定義してchannelにqux-1
をsubscriberにmessage-dumper
を設定している。message-dumperはKnative ServingのServiceでありQueueが送信されなければPodを終了してくれる。
動作確認
SQSを送信する
1 2 |
$ export QUEUE_URL = "SQS QueueのURL" $ aws sqs send-message --queue-url $QUEUE_URL --message-body "Hello World from sqs" |
2つのポッドが起動しているのを確認する
1 2 3 4 |
$ kubectl get pod NAME READY STATUS RESTARTS AGE awssqs-my-queue-source-gzxg5-8756589d-bmft6 2/2 Running 0 8m message-dumper-00001-deployment-7c787dfbfc-v547h 3/3 Running 0 6m |
- awssqs-my-queue-source
- ポッド内には
receive_adaper
というコンテナが動いていた。SQSをポーリングしているようにみえる。
- ポッド内には
- message-dumper
- subsriber.yamlで定義したServiceから起動したポッドである。このポッドは一定時間にキューが流れてこなれば自動で終了してくれる。
ログを確認する
1 2 3 4 |
$ stern message-dumper -c user-container ... 省略 {"Attributes":{"SentTimestamp":"UNIXTIME"},"Body":"Hello World from sqs", ... } |
message-dumperのPodに上記のようなログが流れていれば成功である。
Severlessを体感する
最後のQueueから5分以上経過するPodが自動で終了することが確認できる。下記のログはSQSにキューが流れてPodが起動して終了するまでをwatchしている様子。最後にキューがを受け取ってから5分くらいで終了している。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
$ kubectl get pod -w NAME READY STATUS RESTARTS AGE awssqs-my-queue-source-gzxg5-8756589d-bmft6 2/2 Running 0 14m message-dumper-00001-deployment-7c787dfbfc-gchnn 0/3 Pending 0 0s message-dumper-00001-deployment-7c787dfbfc-gchnn 0/3 Pending 0 1s message-dumper-00001-deployment-7c787dfbfc-gchnn 0/3 Init:0/1 0 1s message-dumper-00001-deployment-7c787dfbfc-gchnn 0/3 PodInitializing 0 2s message-dumper-00001-deployment-7c787dfbfc-gchnn 2/3 Running 0 5s message-dumper-00001-deployment-7c787dfbfc-gchnn 3/3 Running 0 6s message-dumper-00001-deployment-7c787dfbfc-gchnn 3/3 Terminating 0 5m message-dumper-00001-deployment-7c787dfbfc-gchnn 0/3 Terminating 0 5m message-dumper-00001-deployment-7c787dfbfc-gchnn 0/3 Terminating 0 5m message-dumper-00001-deployment-7c787dfbfc-gchnn 0/3 Terminating 0 5m 6s |
まとめ
- Knative EventingのSourceにAWS SQSをつかいKnative Eventingnを体験した。
- SourceにAWS SQSが使えるようになったのでKnativeの採用が加速すると思う。
- これまでSQSを利用するサービスはSQSのポーリングを独自に実装する必要があったがKnativeがその役割を担ってくれるので助かる。
- SourceにはCronJobSourceもありcronでスケジュールすることができるので稼働時間が限られるようなバッチサービスに採用したい。