本ページは、AWS に関する個人の勉強および勉強会で使用することを目的に、AWS ドキュメントなどを参照し作成しておりますが、記載の誤り等が含まれる場合がございます。

最新の情報については、AWS 公式ドキュメントをご参照ください。

Amazon Kinesis Data Streams は、リアルタイムでストリーミングデータを収集、処理、分析できるフルマネージドサービスです。大量のデータを毎秒数千から数百万のレコードレベルで、複数のソースから継続的に収集し、リアルタイムでの処理を可能にします。

1.1. 公式ドキュメント

Amazon Kinesis Data Streamsを理解する公式ドキュメントは次のとおりです。

Amazon Kinesis Data Streams サービス概要

Amazon Kinesis Data Streams ドキュメント

Amazon Kinesis Data Streams よくある質問

Amazon Kinesis Data Streams の料金

1.2. 学習リソース

【AWS Black Belt Online Seminar】Amazon Kinesis(pdf)

blackbelt

「Amazon Kinesis Data Streams」をグラレコで解説|builders.flash

1.3. ワークショップ

Amazon Kinesis データストリーム ハンズオン

Real Time Streaming with Amazon Kinesis

ストリーミングデータをニアリアルタイムに取得し分析するソリューションを試す|builders.flash

ドキュメントには、以下の入門チュートリアルがあります。

1.4. 導入のメリット

Kinesis Data Streamsを導入する主なメリットは以下の5つです。

1.5. 主なユースケース

2.1. アーキテクチャ概要

Kinesis Data Streamsは以下の主要コンポーネントで構成されています。

kinesis_architecture

基本的なデータフロー:

  1. プロデューサーがデータレコードをストリームに送信
  2. データはシャードに分散して格納
  3. コンシューマーがシャードからデータを読み取り
  4. データは設定された保持期間内で利用可能

2.2. シャード

シャードは、Kinesis Data Streamの基本的なスケーリング単位です。

shard

シャードの特徴:

シャード管理:

2.3. データレコード

datarecord

データレコードは以下の要素で構成されます。

2.4. プロデューサー

producer

データをストリームに送信するアプリケーションまたはサービスです。

プロデューサーの種類は次のようなものがあります。

Pythonを使用した送信方法(put_record)の例:

import boto3

kinesis = boto3.client('kinesis')

response = kinesis.put_record(
    StreamName='my-stream',
    Data=json.dumps(data),
    PartitionKey='partition-key'
)

# response
#{
#    "ShardId": "shardId-000000000000",
#    "SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
#}

2.5. コンシューマー

consumer

ストリームからデータを読み取り、処理するアプリケーションです。

コンシューマーの種類は次のようなものがあります。

Pythonを使用したデータ取得方法(get_records)の例:

import boto3

kinesis = boto3.client('kinesis')

shard_iterator = kinesis.get_shard_iterator(
    StreamName='your-stream-name',
    ShardId=shard_id,
    ShardIteratorType='LATEST'
)

while True:
    records = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)
    for record in records['Records']:
      :
    shard_iterator = records['NextShardIterator']
    # NextShardIteratorがNoneの場合
    if shard_iterator is None:
        break

# response
#{
#    'Records': [
#        {
#            'SequenceNumber': 'string',
#            'ApproximateArrivalTimestamp': datetime(2015, 1, 1),
#            'Data': b'bytes',
#            'PartitionKey': 'string',
#            'EncryptionType': 'NONE'|'KMS'
#        },
#    ],
#    'NextShardIterator': 'string',
#    :
#}

コンシューマーのタイプは次のようなものがあります。

consumer-type

AWS CLI(register-stream-consumer)で拡張ファンアウトコンシューマーを追加する例:

aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/stream-name \    
    --consumer-name SampleDataStreamConsumer

2.6. データストリーム容量モード

データストリームの容量の管理方法と、データストリームの使用に対する課金方法を決定します。 データストリームのオンデマンドモードとプロビジョンドモードのどちらかを選択できます。

また、 AWS アカウントのデータストリームごとに、オンデマンド容量モードとプロビジョンド容量モードを 24 時間で 2 回切り替えることができます。

AWS CLIの使用例:

aws kinesis update-stream-mode \
  --stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/stream-name \
  --stream-mode-details ON_DEMAND

2.7. 料金体系

Provisioned Mode:

On-Demand Mode:

3.1. サーバーサイド暗号化

Kinesis Data Streamsは保存時暗号化をサポートしています。

暗号化オプションは次のとおりです。

aws kinesis create-stream --stream-name Foo \
    --shard-count 3
    --stream-mode-details PROVISIONED

aws kinesis start-stream-encryption \
    --encryption-type KMS \
    --key-id arn:aws:kms:us-west-2:012345678912:key/a3c4a7cd-728b-45dd-b334-4d3eb496e452 \
    --stream-name Foo

3.2. Kinesis Scaling Utility

オープンソースのスケーリングユーティリティです。「GitHub>awslabs>amazon-kinesis-scaling-utils」で公開されています。

基本機能:

3.3. Kinesis Client Library (KCL)

複数インスタンスでの分散処理を簡単にするライブラリです。

KCLの特徴:

KCL for Pythonの実装例:

from amazon_kclpy import kcl

class RecordProcessor(kcl.RecordProcessorBase):
    def process_records(self, process_records_input):
        for record in process_records_input.records:
            # レコード処理ロジック
            data = record.binary_data
            # ビジネスロジック実装
            :

        # チェックポイント作成
        if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
            self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
            self._last_checkpoint_time = time.time()

3.4. 他のAWSサービスとの統合

主な統合パターン:

Lambdaで実装した場合の例:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")

CloudFormationでのLambda統合例:

LambdaFunction:
  Type: AWS::Lambda::Function
  Properties:
    EventSourceMappings:
      - EventSourceArn: !GetAtt KinesisStream.Arn
        StartingPosition: TRIM_HORIZON | LATEST | AT_TIMESTAMP
        BatchSize: 100 # 最大レコード数

4.1. コスト管理

コスト最適化戦略:

4.2. モニタリング

重要なメトリクス:

AWS CLIでのアラート設定例:

# CloudWatchアラーム設定
aws cloudwatch put-metric-alarm \
    --alarm-name "Kinesis-ReadProvisionedThroughputExceeded-Alarm" \
    --alarm-description "Kinesisストリームで読み取りスループット制限を超過した場合のアラーム" \
    --metric-name ReadProvisionedThroughputExceeded \
    --namespace AWS/Kinesis \
    --statistic Sum \
    --period 300 \
    --threshold 1 \
    --comparison-operator GreaterThanThreshold \
    --evaluation-periods 1 \
    --alarm-actions arn:aws:sns:ap-northeast-1:123456789012:kinesis-alerts \
    --dimensions Name=StreamName,Value=your-stream-name \
    --region ap-northeast-1

4.3. セキュリティ

セキュリティベストプラクティス:

IAMポリシー例:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:region:account:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:region:account:stream/my-stream"
    }
  ]
}

4.4. パフォーマンス最適化

スループット最適化:

Amazon Kinesis Data Streams は、リアルタイムストリーミングデータ処理のための強力なサービスとして以下の価値を提供します。

リアルタイムデータ分析、IoTデータ処理、ログ集約など、ストリーミングデータを扱うシステムを構築する際は、Kinesis Data Streamsを検討しましょう。特に高スループット・低レイテンシが要求されるアプリケーションにおいて威力を発揮します。