Amazon Kinesis - 실시간 스트리밍 데이터 처리
Kinesis Data Streams로 대규모 실시간 데이터를 수집하고 처리하기
Amazon Kinesis란?
Amazon Kinesis는 실시간 스트리밍 데이터를 쉽게 수집, 처리 및 분석할 수 있는 완전 관리형 서비스임.
초당 기가바이트의 데이터를 실시간으로 처리할 수 있으며, 비디오, 오디오, 애플리케이션 로그, 웹사이트 클릭스트림, IoT 텔레메트리 데이터 등을 처리함.
왜 Kinesis를 사용해야 하는가?
기존 방식의 문제점
전통적인 배치 처리 방식:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─────────────────────────────────────────┐
│ IoT 센서 (1000대) │
│ ↓ │
│ 데이터 저장 (S3/DB) │
│ ↓ │
│ 배치 작업 (1시간마다 실행) │
│ ├─ 모든 데이터 읽기 │
│ ├─ 처리 및 분석 │
│ └─ 결과 저장 │
│ ↓ │
│ 대시보드 업데이트 (1시간 지연) │
│ │
│ 문제점: │
│ - 실시간 대응 불가 │
│ - 높은 지연 시간 (1시간+) │
│ - 리소스 낭비 (피크 시간 대비) │
│ - 확장성 제한 │
└─────────────────────────────────────────┘
지연 시간: 1시간+
이상 상황 감지: 느림
리소스 사용: 비효율적
폴링 기반 실시간 처리:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
┌─────────────────────────────────────────┐
│ IoT 센서 │
│ ↓ │
│ API Gateway + Lambda │
│ ↓ │
│ SQS Queue │
│ ↓ │
│ Lambda (폴링, 1초마다) │
│ ↓ │
│ 처리 및 저장 │
│ │
│ 문제점: │
│ - 순서 보장 어려움 │
│ - 중복 처리 가능성 │
│ - 초당 수천 건 처리 시 비효율 │
│ - 여러 소비자 지원 어려움 │
└─────────────────────────────────────────┘
비용: 높음 (잦은 Lambda 호출)
순서: 보장 안됨
확장성: 제한적
Kinesis의 해결 방법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
┌─────────────────────────────────────────┐
│ IoT 센서 (1000대) │
│ ↓ │
│ Kinesis Data Streams │
│ ├─ Shard 1 (1MB/s, 1000 records/s) │
│ ├─ Shard 2 │
│ └─ Shard 3 │
│ ↓ │
│ Lambda (배치 처리, 100개씩) │
│ ├─ 실시간 분석 │
│ ├─ 이상 탐지 │
│ └─ 알림 발송 │
│ ↓ │
│ 여러 소비자 동시 처리 │
│ ├─ Consumer 1: 실시간 대시보드 │
│ ├─ Consumer 2: DynamoDB 저장 │
│ └─ Consumer 3: S3 아카이빙 │
│ │
│ 장점: │
│ - 밀리초 지연 시간 │
│ - 순서 보장 (파티션 키 기준) │
│ - 무한 확장 (샤드 추가) │
│ - 여러 소비자 독립적 처리 │
│ - 24시간 데이터 보관 (재처리 가능) │
└─────────────────────────────────────────┘
지연 시간: < 1초
이상 상황 감지: 실시간
리소스 사용: 효율적
확장성: 샤드 추가로 무제한
장점:
- 실시간 처리: 밀리초 단위 지연시간
- 순서 보장: 파티션 키 기반 순서 보장
- 확장 가능: 샤드 추가로 처리량 증가
- 여러 소비자: 동일 스트림을 여러 애플리케이션이 독립적으로 소비
- 데이터 보관: 24시간~365일 보관 후 재처리 가능
- 내구성: 3개 AZ에 자동 복제
Kinesis 서비스 비교
Kinesis Data Streams vs Firehose vs Analytics
| 특징 | Data Streams | Data Firehose | Data Analytics |
|---|---|---|---|
| 목적 | 실시간 스트리밍 | ETL + 저장 | 실시간 분석 |
| 지연 시간 | < 1초 | 60초+ | 실시간 |
| 관리 | 샤드 관리 필요 | 완전 관리형 | 완전 관리형 |
| 소비자 | 커스텀 (Lambda 등) | S3, Redshift 등 | SQL 쿼리 |
| 보관 | 24시간~365일 | 없음 (즉시 저장) | 없음 |
| 가격 | $0.015/샤드/시간 | $0.029/GB | $0.11/시간 |
| 사용 사례 | 실시간 처리 | 로그 수집→S3 | 실시간 대시보드 |
선택 기준:
- Data Streams: 실시간 처리, 순서 보장, 여러 소비자 (IoT, 로그 스트리밍)
- Firehose: 간단한 ETL, S3/Redshift 저장 (로그 아카이빙)
- Analytics: 실시간 SQL 분석 (대시보드, 알림)
이 글에서는 Kinesis Data Streams에 집중함.
Kinesis Data Streams 핵심 개념
1. Stream
데이터 레코드의 시퀀스를 보관하는 논리적 단위. 하나 이상의 샤드로 구성됨.
2. Shard
스트림의 처리 단위. 각 샤드는 고유한 용량을 가짐:
- Write: 1MB/sec, 1000 records/sec
- Read: 2MB/sec (shared), 2MB/sec per consumer (enhanced fan-out)
3. Data Record
스트림에 저장되는 데이터 단위:
- Partition Key: 레코드를 샤드에 분배하는 키 (순서 보장 기준)
- Sequence Number: 샤드 내 레코드의 고유 식별자 (자동 생성)
- Data Blob: 실제 데이터 (최대 1MB)
4. Producer
스트림에 데이터를 넣는 애플리케이션:
- AWS SDK (PutRecord, PutRecords)
- Kinesis Producer Library (KPL)
- Kinesis Agent
5. Consumer
스트림에서 데이터를 읽는 애플리케이션:
- Lambda (Event Source Mapping)
- Kinesis Client Library (KCL)
- Kinesis Data Analytics
- Kinesis Data Firehose
실전 프로젝트 사례
1. IoT 센서 데이터 수집 및 처리 (iot-sensor-lab)
아키텍처:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
IoT 센서 (온도/습도)
↓
(PutRecords)
↓
Kinesis Data Streams
├─ 샤드 1 (센서 1-500)
└─ 샤드 2 (센서 501-1000)
↓
(Event Source Mapping, 배치 100개)
↓
Lambda (process-sensor-data)
├─ DynamoDB 저장
├─ CloudWatch 메트릭 발행
└─ 임계값 초과 시 SNS 알림
Serverless Framework 설정:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# serverless.yml
provider:
name: aws
runtime: nodejs20.x
resources:
Resources:
# Kinesis Data Stream
SensorDataStream:
Type: AWS::Kinesis::Stream
Properties:
Name: iot-sensor-stream-${sls:stage}
ShardCount: 2 # 2000 records/sec 처리 가능
RetentionPeriodHours: 24
StreamModeDetails:
StreamMode: PROVISIONED # 또는 ON_DEMAND
# DynamoDB Table
SensorDataTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: sensor-data-${sls:stage}
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: deviceId
AttributeType: S
- AttributeName: timestamp
AttributeType: N
KeySchema:
- AttributeName: deviceId
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
functions:
processSensorData:
handler: functions/process-sensor-data.handler
events:
- stream:
type: kinesis
arn: !GetAtt SensorDataStream.Arn
batchSize: 100 # 한 번에 100개 레코드 처리
startingPosition: LATEST # 또는 TRIM_HORIZON
maximumRetryAttempts: 3
bisectBatchOnFunctionError: true # 에러 시 배치 분할
maximumRecordAgeInSeconds: 3600 # 1시간 넘은 레코드 버림
parallelizationFactor: 10 # 동시 처리 증가
destinations:
onFailure:
arn: !GetAtt ProcessingDLQ.Arn
type: sqs
environment:
TABLE_NAME: !Ref SensorDataTable
SNS_TOPIC_ARN: !Ref AlertTopic
iamRoleStatements:
- Effect: Allow
Action:
- dynamodb:PutItem
- dynamodb:BatchWriteItem
Resource: !GetAtt SensorDataTable.Arn
- Effect: Allow
Action:
- cloudwatch:PutMetricData
Resource: '*'
- Effect: Allow
Action:
- sns:Publish
Resource: !Ref AlertTopic
ProcessingDLQ:
Type: AWS::SQS::Queue
Properties:
QueueName: sensor-processing-dlq-${sls:stage}
MessageRetentionPeriod: 1209600 # 14일
Producer: 센서 데이터 전송
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// scripts/send-sensor-data.mjs
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
const kinesisClient = new KinesisClient({ region: 'ap-northeast-2' });
const STREAM_NAME = 'iot-sensor-stream-dev';
// 센서 데이터 생성
function generateSensorData(deviceId) {
return {
deviceId,
sensorType: 'DHT22',
temperature: 20 + Math.random() * 15, // 20-35도
humidity: 40 + Math.random() * 40, // 40-80%
location: `Seoul-Zone-${Math.floor(Math.random() * 5) + 1}`,
timestamp: Date.now(),
batteryLevel: 80 + Math.random() * 20
};
}
// 배치로 전송 (효율적)
async function sendBatch(records) {
const response = await kinesisClient.send(new PutRecordsCommand({
StreamName: STREAM_NAME,
Records: records.map(data => ({
Data: Buffer.from(JSON.stringify(data)),
PartitionKey: data.deviceId // deviceId로 샤드 분배
}))
}));
console.log('Sent:', {
success: records.length - response.FailedRecordCount,
failed: response.FailedRecordCount
});
// 실패한 레코드 재시도
if (response.FailedRecordCount > 0) {
const failedRecords = response.Records
.map((result, idx) => result.ErrorCode ? records[idx] : null)
.filter(Boolean);
console.log('Failed records:', failedRecords);
// 재시도 로직...
}
}
// 1000개 센서에서 동시 전송
async function simulateIoTDevices() {
const devices = Array.from({ length: 1000 }, (_, i) => `device-${i + 1}`);
while (true) {
const batch = [];
// 500개씩 배치 (PutRecords 최대 500개)
for (let i = 0; i < 500; i++) {
const deviceId = devices[Math.floor(Math.random() * devices.length)];
batch.push(generateSensorData(deviceId));
}
await sendBatch(batch);
// 1초 대기
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
simulateIoTDevices();
Consumer: Lambda 처리기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// functions/process-sensor-data.handler
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, BatchWriteCommand } from '@aws-sdk/lib-dynamodb';
import { CloudWatchClient, PutMetricDataCommand } from '@aws-sdk/client-cloudwatch';
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient());
const cwClient = new CloudWatchClient();
const snsClient = new SNSClient();
const TABLE_NAME = process.env.TABLE_NAME;
const SNS_TOPIC_ARN = process.env.SNS_TOPIC_ARN;
export const handler = async (event) => {
console.log('Processing Kinesis records:', event.Records.length);
const sensorDataList = [];
const metrics = [];
const alerts = [];
// Kinesis 레코드 파싱
for (const record of event.Records) {
// Base64 디코딩
const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
const sensorData = JSON.parse(payload);
console.log('Sensor data:', sensorData);
// DynamoDB 저장용 데이터
sensorDataList.push({
deviceId: sensorData.deviceId,
timestamp: sensorData.timestamp,
temperature: sensorData.temperature,
humidity: sensorData.humidity,
location: sensorData.location,
batteryLevel: sensorData.batteryLevel,
sequenceNumber: record.kinesis.sequenceNumber
});
// CloudWatch 메트릭
metrics.push({
MetricName: 'Temperature',
Value: sensorData.temperature,
Unit: 'None',
Timestamp: new Date(sensorData.timestamp),
Dimensions: [
{ Name: 'DeviceId', Value: sensorData.deviceId },
{ Name: 'Location', Value: sensorData.location }
]
});
metrics.push({
MetricName: 'Humidity',
Value: sensorData.humidity,
Unit: 'Percent',
Timestamp: new Date(sensorData.timestamp),
Dimensions: [
{ Name: 'DeviceId', Value: sensorData.deviceId },
{ Name: 'Location', Value: sensorData.location }
]
});
// 임계값 체크
if (sensorData.temperature > 30 || sensorData.humidity > 80) {
alerts.push({
deviceId: sensorData.deviceId,
location: sensorData.location,
temperature: sensorData.temperature,
humidity: sensorData.humidity,
timestamp: new Date(sensorData.timestamp).toISOString()
});
}
}
// 1. DynamoDB 배치 저장 (25개씩)
for (let i = 0; i < sensorDataList.length; i += 25) {
const batch = sensorDataList.slice(i, i + 25);
await dynamoClient.send(new BatchWriteCommand({
RequestItems: {
[TABLE_NAME]: batch.map(item => ({
PutRequest: { Item: item }
}))
}
}));
}
console.log('Saved to DynamoDB:', sensorDataList.length);
// 2. CloudWatch 메트릭 발행 (20개씩)
for (let i = 0; i < metrics.length; i += 20) {
const batch = metrics.slice(i, i + 20);
await cwClient.send(new PutMetricDataCommand({
Namespace: 'IoTSensor/Metrics',
MetricData: batch
}));
}
console.log('Published metrics:', metrics.length);
// 3. 알림 전송
if (alerts.length > 0) {
await snsClient.send(new PublishCommand({
TopicArn: SNS_TOPIC_ARN,
Subject: `센서 임계값 초과 알림 (${alerts.length}건)`,
Message: JSON.stringify(alerts, null, 2)
}));
console.log('Alerts sent:', alerts.length);
}
return { statusCode: 200, processedRecords: event.Records.length };
};
Kinesis Event 구조:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "device-123",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "eyJkZXZpY2VJZCI6ImRldmljZS0xMjMiLCJ0ZW1wZXJhdHVyZSI6MjUuNX0=", // Base64
"approximateArrivalTimestamp": 1703154000.0
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
"awsRegion": "ap-northeast-2",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-2:123456789012:stream/iot-sensor-stream-dev"
}
]
}
Kinesis 고급 기능
1. Enhanced Fan-Out (EFO)
기본 소비자는 샤드당 2MB/sec를 공유하지만, EFO는 소비자마다 전용 2MB/sec 제공.
사용 사례:
- 여러 소비자가 동일한 스트림을 독립적으로 소비
- 각 소비자가 높은 처리량 필요
- 70ms 더 낮은 지연시간 필요
설정:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
functions:
consumerA:
handler: functions/consumer-a.handler
events:
- stream:
type: kinesis
arn: !GetAtt MyStream.Arn
consumer: true # Enhanced Fan-Out 활성화
consumerName: ConsumerA
consumerB:
handler: functions/consumer-b.handler
events:
- stream:
type: kinesis
arn: !GetAtt MyStream.Arn
consumer: true
consumerName: ConsumerB
2. On-Demand Capacity Mode
샤드 수를 자동으로 관리 (Provisioned 모드 대신).
장점:
- 샤드 관리 불필요
- 자동 확장
- 트래픽 패턴 예측 불필요
단점:
- 비용이 약간 더 비쌈
설정:
1
2
3
4
5
6
SensorDataStream:
Type: AWS::Kinesis::Stream
Properties:
Name: my-stream
StreamModeDetails:
StreamMode: ON_DEMAND # 자동 확장
3. Shard Splitting & Merging
Shard Splitting (처리량 증가):
1
2
3
4
5
6
7
8
9
10
import { KinesisClient, SplitShardCommand } from '@aws-sdk/client-kinesis';
const kinesisClient = new KinesisClient();
// 샤드를 2개로 분할
await kinesisClient.send(new SplitShardCommand({
StreamName: 'my-stream',
ShardToSplit: 'shardId-000000000000',
NewStartingHashKey: '170141183460469231731687303715884105728' // 중간 해시 키
}));
Shard Merging (비용 절감):
1
2
3
4
5
6
7
8
import { MergeShardsCommand } from '@aws-sdk/client-kinesis';
// 두 샤드를 하나로 병합
await kinesisClient.send(new MergeShardsCommand({
StreamName: 'my-stream',
ShardToMerge: 'shardId-000000000001',
AdjacentShardToMerge: 'shardId-000000000002'
}));
Kinesis vs 대안 비교
Kinesis vs SQS
| 항목 | Kinesis | SQS |
|---|---|---|
| 순서 보장 | ✓ (파티션 키) | ✓ (FIFO만) |
| 처리량 | MB/s per shard | 무제한 (표준) |
| 지연 시간 | < 1초 | 밀리초 |
| 소비자 | 여러 개 (독립) | 하나 (경쟁) |
| 데이터 보관 | 24시간~365일 | 최대 14일 |
| 가격 | $0.015/샤드/시간 | $0.40/million |
| 사용 사례 | 실시간 스트리밍 | 작업 큐 |
선택 기준:
- Kinesis: 실시간 분석, 순서 중요, 여러 소비자
- SQS: 작업 큐, 순서 덜 중요, 하나의 소비자
Kinesis vs Kafka (MSK)
| 항목 | Kinesis | Kafka (MSK) |
|---|---|---|
| 관리 | 완전 관리형 | 부분 관리형 |
| 처리량 | MB/s per shard | GB/s per broker |
| 지연 시간 | < 1초 | 밀리초 |
| 가격 | $0.015/샤드/시간 | $90+/broker/월 |
| 에코시스템 | AWS 통합 | Kafka 에코시스템 |
| 복잡도 | 낮음 | 높음 |
| 사용 사례 | AWS 중심 | Multi-cloud |
선택 기준:
- Kinesis: AWS 중심, 간단한 관리, 중소 규모
- Kafka: 대규모, 복잡한 스트림 처리, 멀티 클라우드
성능 최적화
1. Partition Key 전략
BAD: 모든 레코드가 하나의 샤드로
1
2
3
4
5
await kinesisClient.send(new PutRecordCommand({
StreamName: STREAM_NAME,
Data: Buffer.from(JSON.stringify(data)),
PartitionKey: 'fixed-key' // 항상 같은 키 → 하나의 샤드만 사용!
}));
GOOD: 균등 분배
1
2
3
4
5
await kinesisClient.send(new PutRecordCommand({
StreamName: STREAM_NAME,
Data: Buffer.from(JSON.stringify(data)),
PartitionKey: data.deviceId // 디바이스별 분배
}));
BEST: Explicit Hash Key로 완전한 제어
1
2
3
4
5
6
7
8
9
10
11
12
import crypto from 'crypto';
const hash = crypto.createHash('md5')
.update(data.deviceId)
.digest('hex');
await kinesisClient.send(new PutRecordCommand({
StreamName: STREAM_NAME,
Data: Buffer.from(JSON.stringify(data)),
PartitionKey: data.deviceId,
ExplicitHashKey: parseInt(hash, 16).toString() // 정확한 샤드 지정
}));
2. 배치 전송 (PutRecords)
BAD: 개별 전송
1
2
3
for (const record of records) {
await kinesisClient.send(new PutRecordCommand({ ... })); // 느림!
}
GOOD: 배치 전송 (최대 500개)
1
2
3
4
5
6
7
await kinesisClient.send(new PutRecordsCommand({
StreamName: STREAM_NAME,
Records: records.map(data => ({
Data: Buffer.from(JSON.stringify(data)),
PartitionKey: data.deviceId
}))
}));
3. Lambda Tuning
1
2
3
4
5
6
7
8
9
10
functions:
processStream:
handler: handler.main
events:
- stream:
batchSize: 100 # 배치 크기 (기본 100)
parallelizationFactor: 10 # 동시 처리 (기본 1, 최대 10)
maximumRetryAttempts: 3 # 재시도 횟수
bisectBatchOnFunctionError: true # 에러 시 배치 분할
maximumRecordAgeInSeconds: 3600 # 오래된 레코드 버림
비용 최적화
실제 프로젝트 비용 분석
IoT 센서 시스템 (월 기준):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Kinesis Data Streams:
- 샤드 2개: 2 × $0.015 × 24시간 × 30일 = $21.60
- PUT 요청: 10억건 × $0.014 / million = $14.00
- Extended retention (7일): 2 × $0.023 × 24 × 7 = $7.73
- 총합: $43.33/월
Lambda 처리:
- 실행 횟수: 1,000만회 (배치 100개씩)
- 실행 시간: 1,000만 × 200ms × $0.0000166667 = $33.33
- 총합: $33.33/월
전체 비용: $76.66/월
SQS로 구현 시:
- SQS 요청: 10억건 × $0.40 = $400
- Lambda 실행: $33.33
- 총합: $433.33/월
Kinesis 절감: $356.67/월 (82% 절감)
비용 절감 팁
1. Retention 기간 최소화:
1
2
3
4
5
SensorDataStream:
Type: AWS::Kinesis::Stream
Properties:
RetentionPeriodHours: 24 # 기본값 (무료)
# 365시간까지 가능 (추가 비용)
2. On-Demand vs Provisioned:
1
2
3
4
5
6
7
8
9
Provisioned (예측 가능한 트래픽):
- 샤드 2개: $21.60/월
On-Demand (변동 트래픽):
- 기본 비용: $0.04/시간 × 24 × 30 = $28.80
- PUT 요청: $0.04/million (Provisioned: $0.014)
- 총합: 약 $68.80/월
→ 일정한 트래픽이면 Provisioned가 저렴!
3. 배치 크기 최적화:
1
2
# Lambda 호출 횟수 감소
batchSize: 100 # 100개씩 처리 → 10배 적은 호출
실전 경험에서 배운 것
1. ProvisionedThroughputExceededException 대응
문제: 샤드 용량 초과 시 에러 발생
해결:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { retry } from './utils';
async function putRecordWithRetry(params) {
return retry(async () => {
try {
return await kinesisClient.send(new PutRecordCommand(params));
} catch (error) {
if (error.name === 'ProvisionedThroughputExceededException') {
// Exponential backoff
await new Promise(resolve =>
setTimeout(resolve, Math.random() * 100)
);
throw error; // 재시도
}
throw error;
}
}, { maxAttempts: 3 });
}
2. Partition Key는 신중하게
교훈: 초기에 잘못된 파티션 키 선택 시 핫 샤드 발생
1
2
3
4
5
// BAD: 특정 센서에 트래픽 집중
PartitionKey: data.sensorType // 대부분 'DHT22' → 하나의 샤드만 사용
// GOOD: 균등 분배
PartitionKey: data.deviceId // 1000개 디바이스 → 샤드 전체 활용
3. Sequence Number로 중복 제거
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const processedSequenceNumbers = new Set();
export const handler = async (event) => {
for (const record of event.Records) {
const seqNum = record.kinesis.sequenceNumber;
if (processedSequenceNumbers.has(seqNum)) {
console.log('Duplicate record, skipping:', seqNum);
continue;
}
// 처리...
processedSequenceNumbers.add(seqNum);
}
};
4. Lambda Timeout은 충분히
Kinesis Event Source Mapping은 타임아웃 시 전체 배치 재시도.
1
2
3
4
functions:
processStream:
handler: handler.main
timeout: 60 # 충분한 시간 (기본 6초)
마무리
Kinesis Data Streams는 실시간 스트리밍 데이터 처리의 AWS 표준임. 순서 보장, 여러 소비자 지원, 데이터 보관으로 복잡한 실시간 분석을 쉽게 구현할 수 있음.
사용 권장 사항:
- IoT 센서 데이터 실시간 수집
- 로그 스트리밍 (CloudWatch 대신)
- 클릭스트림 분석
- 실시간 대시보드
선택 가이드:
- 실시간 + 순서 + 여러 소비자 → Kinesis
- 간단한 작업 큐 → SQS
- 대규모 + 복잡한 처리 → Kafka (MSK)
- 로그 수집 → S3 → Kinesis Firehose
최근 프로젝트에서 1000개 IoT 센서 데이터를 Kinesis로 실시간 처리하며, SQS 대비 80% 비용 절감과 밀리초 단위 지연시간으로 임계값 초과 시 즉각 대응할 수 있어 매우 만족스러웠음.