Knative EventingのSourceにAWS SQSをつかいServiceを起動する


前回のエントリではKnativeの Buildの知見をまとめたが今回は Eventingをまとめる。試したところServerlessを体感できて、これぞKnativeの醍醐味ではという感想である。

モチベーション

KnativeのEventingを導入する際にAWS環境ではどのようなAWSサービスを使うことになるのか知りたかった。またEventingのアーキテクチャとAWSが提供するサービスが協調できるのか疑問でもあった。

調査したところSQSとMSK(Amazon Managed Streaming for Kafka)を使う選択になりそうという結果になった。ちなみにGCPであればGCP Cloud Pub/SubでEventingのアーキテクチャとバシッとハマるのでいいな〜という印象である。

このエントリではAWSサービスを使ってKnative Eventingを動かすまでをまとめていきたい。

※ 最初にお断りでローカルでサクッと確認するまでをまとめているのでMSKのインストールはせずにChannelにはIn-Memory Channelsを使っています。

Knative Eventing

Knative Eventingのアーキテクチャはdocsから確認できる。

キーワードはSourceChannelSubscriptionServiceである。このうちSourceとChannelはマネージドに合わせて数種類の選択肢が用意されている。

上記はSourceにGitHubSource、ChannelにKafkaを使った例である。GitHub(Source)のイベント(マージやプルリクエスト)を検知してKafka(Channel)に流しChannelをサブスクライブしているサービスへイベントを伝搬させている。サービスには特定のイメージを指定できる。

GitHubSourceとApache Kafka Channelsはサンプルが用意さている。

docs/eventing/samples/github-source at master · knative/docs · GitHub

eventing/config/provisioners/kafka at master · knative/eventing · GitHub

AWS SQS Sourceを使う

本題である。SourceにAWS SQSを使い特定のQueue URLに流れたデータをサービスで出力してみる。工程はサンプルに沿って進んでいるが一部誤りがあるので正しい工程をまとめていく。

※ Istio, Service, Eventingのインストールが済んでいれば knative/eventing-sourcesをチェックアウトまでSkipしてください。

Istioのインストール

1
$ kubectl apply --filename https://raw.githubusercontent.com/knative/serving/v0.2.2/third_party/istio-1.0.2/istio.yaml

Istio Injectorを有効にする

1
$ kubectl label namespace default istio-injection=enabled

Knative Servingをインストール

1
2
3
$ curl -L https://github.com/knative/serving/releases/download/v0.2.2/release-lite.yaml \
  | sed 's/LoadBalancer/NodePort/' \
  | kubectl apply --filename -

Eventingのインストール

1
kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.2.1/release.yaml

knative/eventing-sourcesをチェックアウト

1
$ git clone git@github.com:knative/eventing-sources.git

※ 以降はeventing-sourcesのディレクトリで手順を実行します

Channelの作成

1
$ (eventing-sources) $ kubectl -n default apply -f samples/awssqs_source/channel.yaml

チャンネルは標準の in-memory-channelを利用している。AWSのProductionであればApache Kafkaのチャンネルを利用を検討している。

applyしたチャンネルは次のような定義である。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
apiVersion: eventing.knative.dev/v1alpha1
kind: Channel
metadata:
  name: qux-1
  namespace: default
spec:
  provisioner:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: ClusterChannelProvisioner
    name: in-memory-channel

nameにqux-1を設定しており後述するAwsSqsSourceとSubscriptionのリソースから参照される。

AWS Credentialsのリソース化

1
2
3
4
5
(eventing-sources) $ echo '[default]
aws_access_key_id = XXXXXXXX
aws_secret_access_key = XXXXXXXX' > ./cred.txt

(eventing-sources) $ kubectl -n knative-sources create secret generic awssqs-source-credentials --from-file=credentials=./cred.txt

ここでリソース化したAWS CredentialsをEventingのリソースがVolumeMountして利用することになる。アクセスとシークレットのキーを用意してリソース化する。

AwsSqsSourceを有効にする

1
2
(eventing-sources) $ export KO_DOCKER_REPO="ko.local"
(eventing-sources) $ ko apply -f config/default-awssqs.yaml

koのセットアップが済んでいる人はKO_DOCKER_REPOの変数を設定する必要はない。

SQSのQUEUE URLとQUEUE NAMEを書き換える

1
2
3
4
5
(eventing-sources) $ export QUEUE_URL="SQS QueueのURLをここに入れる"
(eventing-sources) $ sed -i -e "s|QUEUE_URL|$QUEUE_URL|" samples/awssqs_source/awssqs-source.yaml

(eventing-sources) $ export QUEUE_NAME="my-queue"
(eventing-sources) $ sed -i -e "s|QUEUE_NAME|$QUEUE_NAME|" samples/awssqs_source/awssqs-source.yaml

QUEUE_URLとQUEUE_NAMEを書き換えると下記のようになる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: AwsSqsSource
metadata:
  name: my-queue-source
spec:
  awsCredsSecret:
    name: aws-credentials
    key: credentials
  queueUrl: "SQS QueueのURL"
  sink:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Channel
    name: qux-1

この awsqs-source.yamlはdefaultのネームスペースにapplyするのでAWS Credentialsを再度セットアップする。

1
(eventing-sources) $ kubectl -n default create secret generic aws-credentials --from-file=credentials=cred.txt

AwsSqsSourceをデプロイする

1
(eventing-sources) $ ko -n default apply -f samples/awssqs_source/awssqs-source.yaml

Subscriberをデプロイする

最後にSubscriberをデプロイする。

1
(eventing-sources) $ ko -n default apply -f samples/awssqs_source/subscriber.yaml

デプロイしたsubscriberは次のような定義になっている。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Subscription from the AWS SQS Source's output Channel to the Knative Service below.

apiVersion: eventing.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: awssqs-source-sample
  namespace: default
spec:
  channel:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Channel
    name: qux-1
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: message-dumper

---

# This is a very simple Knative Service that writes the input request to its log.

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: message-dumper
  namespace: default
spec:
  runLatest:
    configuration:
      revisionTemplate:
        spec:
          container:
            image: github.com/knative/eventing-sources/cmd/message_dumper

Subscriptionリソースを定義してchannelにqux-1をsubscriberにmessage-dumperを設定している。message-dumperはKnative ServingのServiceでありQueueが送信されなければPodを終了してくれる。

動作確認

SQSを送信する

1
2
$ export QUEUE_URL = "SQS QueueのURL"
$ aws sqs send-message --queue-url $QUEUE_URL --message-body "Hello World from sqs"

2つのポッドが起動しているのを確認する

1
2
3
4
$ kubectl get pod
NAME                                               READY     STATUS    RESTARTS   AGE
awssqs-my-queue-source-gzxg5-8756589d-bmft6        2/2       Running   0          8m
message-dumper-00001-deployment-7c787dfbfc-v547h   3/3       Running   0          6m

ログを確認する

1
2
3
4
$ stern message-dumper -c user-container
...
省略
 {"Attributes":{"SentTimestamp":"UNIXTIME"},"Body":"Hello World from sqs", ... }

message-dumperのPodに上記のようなログが流れていれば成功である。

Severlessを体感する

最後のQueueから5分以上経過するPodが自動で終了することが確認できる。下記のログはSQSにキューが流れてPodが起動して終了するまでをwatchしている様子。最後にキューがを受け取ってから5分くらいで終了している。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
$ kubectl get pod -w
NAME                                          READY     STATUS    RESTARTS   AGE
awssqs-my-queue-source-gzxg5-8756589d-bmft6   2/2       Running   0          14m
message-dumper-00001-deployment-7c787dfbfc-gchnn   0/3       Pending   0         0s
message-dumper-00001-deployment-7c787dfbfc-gchnn   0/3       Pending   0         1s
message-dumper-00001-deployment-7c787dfbfc-gchnn   0/3       Init:0/1   0         1s
message-dumper-00001-deployment-7c787dfbfc-gchnn   0/3       PodInitializing   0         2s
message-dumper-00001-deployment-7c787dfbfc-gchnn   2/3       Running   0         5s
message-dumper-00001-deployment-7c787dfbfc-gchnn   3/3       Running   0         6s
message-dumper-00001-deployment-7c787dfbfc-gchnn   3/3       Terminating   0         5m
message-dumper-00001-deployment-7c787dfbfc-gchnn   0/3       Terminating   0         5m
message-dumper-00001-deployment-7c787dfbfc-gchnn   0/3       Terminating   0         5m
message-dumper-00001-deployment-7c787dfbfc-gchnn   0/3       Terminating   0         5m        6s

まとめ