本ページは、AWS に関する個人の勉強および勉強会で使用することを目的に、AWS ドキュメントなどを参照し作成しておりますが、記載の誤り等が含まれる場合がございます。
最新の情報については、AWS 公式ドキュメントをご参照ください。
Amazon Kinesis Data Streams は、リアルタイムでストリーミングデータを収集、処理、分析できるフルマネージドサービスです。大量のデータを毎秒数千から数百万のレコードレベルで、複数のソースから継続的に収集し、リアルタイムでの処理を可能にします。
Amazon Kinesis Data Streamsを理解する公式ドキュメントは次のとおりです。
Amazon Kinesis Data Streams サービス概要
Amazon Kinesis Data Streams ドキュメント
Amazon Kinesis Data Streams よくある質問
Amazon Kinesis Data Streams の料金
【AWS Black Belt Online Seminar】Amazon Kinesis(pdf)
「Amazon Kinesis Data Streams」をグラレコで解説|builders.flash
Real Time Streaming with Amazon Kinesis
ストリーミングデータをニアリアルタイムに取得し分析するソリューションを試す|builders.flash
ドキュメントには、以下の入門チュートリアルがあります。
Kinesis Data Streamsを導入する主なメリットは以下の5つです。
Kinesis Data Streamsは以下の主要コンポーネントで構成されています。
基本的なデータフロー:
シャードは、Kinesis Data Streamの基本的なスケーリング単位です。
シャードの特徴:
シャード管理:
データレコードは以下の要素で構成されます。
データをストリームに送信するアプリケーションまたはサービスです。
プロデューサーの種類は次のようなものがあります。
Pythonを使用した送信方法(put_record)の例:
import boto3
kinesis = boto3.client('kinesis')
response = kinesis.put_record(
StreamName='my-stream',
Data=json.dumps(data),
PartitionKey='partition-key'
)
# response
#{
# "ShardId": "shardId-000000000000",
# "SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
#}
ストリームからデータを読み取り、処理するアプリケーションです。
コンシューマーの種類は次のようなものがあります。
Pythonを使用したデータ取得方法(get_records)の例:
import boto3
kinesis = boto3.client('kinesis')
shard_iterator = kinesis.get_shard_iterator(
StreamName='your-stream-name',
ShardId=shard_id,
ShardIteratorType='LATEST'
)
while True:
records = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)
for record in records['Records']:
:
shard_iterator = records['NextShardIterator']
# NextShardIteratorがNoneの場合
if shard_iterator is None:
break
# response
#{
# 'Records': [
# {
# 'SequenceNumber': 'string',
# 'ApproximateArrivalTimestamp': datetime(2015, 1, 1),
# 'Data': b'bytes',
# 'PartitionKey': 'string',
# 'EncryptionType': 'NONE'|'KMS'
# },
# ],
# 'NextShardIterator': 'string',
# :
#}
コンシューマーのタイプは次のようなものがあります。
AWS CLI(register-stream-consumer)で拡張ファンアウトコンシューマーを追加する例:
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/stream-name \
--consumer-name SampleDataStreamConsumer
データストリームの容量の管理方法と、データストリームの使用に対する課金方法を決定します。 データストリームのオンデマンドモードとプロビジョンドモードのどちらかを選択できます。
また、 AWS アカウントのデータストリームごとに、オンデマンド容量モードとプロビジョンド容量モードを 24 時間で 2 回切り替えることができます。
AWS CLIの使用例:
aws kinesis update-stream-mode \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/stream-name \
--stream-mode-details ON_DEMAND
Provisioned Mode:
On-Demand Mode:
Kinesis Data Streamsは保存時暗号化をサポートしています。
暗号化オプションは次のとおりです。
aws kinesis create-stream --stream-name Foo \
--shard-count 3
--stream-mode-details PROVISIONED
aws kinesis start-stream-encryption \
--encryption-type KMS \
--key-id arn:aws:kms:us-west-2:012345678912:key/a3c4a7cd-728b-45dd-b334-4d3eb496e452 \
--stream-name Foo
オープンソースのスケーリングユーティリティです。「GitHub>awslabs>amazon-kinesis-scaling-utils」で公開されています。
基本機能:
複数インスタンスでの分散処理を簡単にするライブラリです。
KCLの特徴:
KCL for Pythonの実装例:
from amazon_kclpy import kcl
class RecordProcessor(kcl.RecordProcessorBase):
def process_records(self, process_records_input):
for record in process_records_input.records:
# レコード処理ロジック
data = record.binary_data
# ビジネスロジック実装
:
# チェックポイント作成
if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
self._last_checkpoint_time = time.time()
主な統合パターン:
Lambdaで実装した場合の例:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
CloudFormationでのLambda統合例:
LambdaFunction:
Type: AWS::Lambda::Function
Properties:
EventSourceMappings:
- EventSourceArn: !GetAtt KinesisStream.Arn
StartingPosition: TRIM_HORIZON | LATEST | AT_TIMESTAMP
BatchSize: 100 # 最大レコード数
コスト最適化戦略:
重要なメトリクス:
GetRecords.IteratorAge
は使用されなくなりましたAWS CLIでのアラート設定例:
# CloudWatchアラーム設定
aws cloudwatch put-metric-alarm \
--alarm-name "Kinesis-ReadProvisionedThroughputExceeded-Alarm" \
--alarm-description "Kinesisストリームで読み取りスループット制限を超過した場合のアラーム" \
--metric-name ReadProvisionedThroughputExceeded \
--namespace AWS/Kinesis \
--statistic Sum \
--period 300 \
--threshold 1 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 1 \
--alarm-actions arn:aws:sns:ap-northeast-1:123456789012:kinesis-alerts \
--dimensions Name=StreamName,Value=your-stream-name \
--region ap-northeast-1
セキュリティベストプラクティス:
IAMポリシー例:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:region:account:stream/my-stream"
},
{
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams"
],
"Resource": "arn:aws:kinesis:region:account:stream/my-stream"
}
]
}
スループット最適化:
Amazon Kinesis Data Streams は、リアルタイムストリーミングデータ処理のための強力なサービスとして以下の価値を提供します。
リアルタイムデータ分析、IoTデータ処理、ログ集約など、ストリーミングデータを扱うシステムを構築する際は、Kinesis Data Streamsを検討しましょう。特に高スループット・低レイテンシが要求されるアプリケーションにおいて威力を発揮します。