Post

Amazon Kinesis - 실시간 스트리밍 데이터 처리

Kinesis Data Streams로 대규모 실시간 데이터를 수집하고 처리하기

Amazon Kinesis란?

Amazon Kinesis는 실시간 스트리밍 데이터를 쉽게 수집, 처리 및 분석할 수 있는 완전 관리형 서비스임.

초당 기가바이트의 데이터를 실시간으로 처리할 수 있으며, 비디오, 오디오, 애플리케이션 로그, 웹사이트 클릭스트림, IoT 텔레메트리 데이터 등을 처리함.


왜 Kinesis를 사용해야 하는가?

기존 방식의 문제점

전통적인 배치 처리 방식:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─────────────────────────────────────────┐
│  IoT 센서 (1000대)                      │
│     ↓                                   │
│  데이터 저장 (S3/DB)                    │
│     ↓                                   │
│  배치 작업 (1시간마다 실행)              │
│     ├─ 모든 데이터 읽기                 │
│     ├─ 처리 및 분석                     │
│     └─ 결과 저장                        │
│     ↓                                   │
│  대시보드 업데이트 (1시간 지연)          │
│                                         │
│  문제점:                                │
│  - 실시간 대응 불가                     │
│  - 높은 지연 시간 (1시간+)              │
│  - 리소스 낭비 (피크 시간 대비)          │
│  - 확장성 제한                          │
└─────────────────────────────────────────┘

지연 시간: 1시간+
이상 상황 감지: 느림
리소스 사용: 비효율적

폴링 기반 실시간 처리:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
┌─────────────────────────────────────────┐
│  IoT 센서                               │
│     ↓                                   │
│  API Gateway + Lambda                   │
│     ↓                                   │
│  SQS Queue                              │
│     ↓                                   │
│  Lambda (폴링, 1초마다)                  │
│     ↓                                   │
│  처리 및 저장                           │
│                                         │
│  문제점:                                │
│  - 순서 보장 어려움                     │
│  - 중복 처리 가능성                     │
│  - 초당 수천 건 처리 시 비효율           │
│  - 여러 소비자 지원 어려움               │
└─────────────────────────────────────────┘

비용: 높음 (잦은 Lambda 호출)
순서: 보장 안됨
확장성: 제한적

Kinesis의 해결 방법

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
┌─────────────────────────────────────────┐
│  IoT 센서 (1000대)                      │
│     ↓                                   │
│  Kinesis Data Streams                   │
│  ├─ Shard 1 (1MB/s, 1000 records/s)    │
│  ├─ Shard 2                            │
│  └─ Shard 3                            │
│     ↓                                   │
│  Lambda (배치 처리, 100개씩)             │
│  ├─ 실시간 분석                         │
│  ├─ 이상 탐지                           │
│  └─ 알림 발송                           │
│     ↓                                   │
│  여러 소비자 동시 처리                   │
│  ├─ Consumer 1: 실시간 대시보드          │
│  ├─ Consumer 2: DynamoDB 저장           │
│  └─ Consumer 3: S3 아카이빙             │
│                                         │
│  장점:                                  │
│  - 밀리초 지연 시간                     │
│  - 순서 보장 (파티션 키 기준)            │
│  - 무한 확장 (샤드 추가)                │
│  - 여러 소비자 독립적 처리               │
│  - 24시간 데이터 보관 (재처리 가능)      │
└─────────────────────────────────────────┘

지연 시간: < 1초
이상 상황 감지: 실시간
리소스 사용: 효율적
확장성: 샤드 추가로 무제한

장점:

  • 실시간 처리: 밀리초 단위 지연시간
  • 순서 보장: 파티션 키 기반 순서 보장
  • 확장 가능: 샤드 추가로 처리량 증가
  • 여러 소비자: 동일 스트림을 여러 애플리케이션이 독립적으로 소비
  • 데이터 보관: 24시간~365일 보관 후 재처리 가능
  • 내구성: 3개 AZ에 자동 복제

Kinesis 서비스 비교

Kinesis Data Streams vs Firehose vs Analytics

특징Data StreamsData FirehoseData Analytics
목적실시간 스트리밍ETL + 저장실시간 분석
지연 시간< 1초60초+실시간
관리샤드 관리 필요완전 관리형완전 관리형
소비자커스텀 (Lambda 등)S3, Redshift 등SQL 쿼리
보관24시간~365일없음 (즉시 저장)없음
가격$0.015/샤드/시간$0.029/GB$0.11/시간
사용 사례실시간 처리로그 수집→S3실시간 대시보드

선택 기준:

  • Data Streams: 실시간 처리, 순서 보장, 여러 소비자 (IoT, 로그 스트리밍)
  • Firehose: 간단한 ETL, S3/Redshift 저장 (로그 아카이빙)
  • Analytics: 실시간 SQL 분석 (대시보드, 알림)

이 글에서는 Kinesis Data Streams에 집중함.


Kinesis Data Streams 핵심 개념

1. Stream

데이터 레코드의 시퀀스를 보관하는 논리적 단위. 하나 이상의 샤드로 구성됨.

2. Shard

스트림의 처리 단위. 각 샤드는 고유한 용량을 가짐:

  • Write: 1MB/sec, 1000 records/sec
  • Read: 2MB/sec (shared), 2MB/sec per consumer (enhanced fan-out)

3. Data Record

스트림에 저장되는 데이터 단위:

  • Partition Key: 레코드를 샤드에 분배하는 키 (순서 보장 기준)
  • Sequence Number: 샤드 내 레코드의 고유 식별자 (자동 생성)
  • Data Blob: 실제 데이터 (최대 1MB)

4. Producer

스트림에 데이터를 넣는 애플리케이션:

  • AWS SDK (PutRecord, PutRecords)
  • Kinesis Producer Library (KPL)
  • Kinesis Agent

5. Consumer

스트림에서 데이터를 읽는 애플리케이션:

  • Lambda (Event Source Mapping)
  • Kinesis Client Library (KCL)
  • Kinesis Data Analytics
  • Kinesis Data Firehose

실전 프로젝트 사례

1. IoT 센서 데이터 수집 및 처리 (iot-sensor-lab)

아키텍처:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
IoT 센서 (온도/습도)
    ↓
  (PutRecords)
    ↓
Kinesis Data Streams
  ├─ 샤드 1 (센서 1-500)
  └─ 샤드 2 (센서 501-1000)
    ↓
  (Event Source Mapping, 배치 100개)
    ↓
Lambda (process-sensor-data)
  ├─ DynamoDB 저장
  ├─ CloudWatch 메트릭 발행
  └─ 임계값 초과 시 SNS 알림

Serverless Framework 설정:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# serverless.yml
provider:
  name: aws
  runtime: nodejs20.x

resources:
  Resources:
    # Kinesis Data Stream
    SensorDataStream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: iot-sensor-stream-${sls:stage}
        ShardCount: 2  # 2000 records/sec 처리 가능
        RetentionPeriodHours: 24
        StreamModeDetails:
          StreamMode: PROVISIONED  # 또는 ON_DEMAND

    # DynamoDB Table
    SensorDataTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: sensor-data-${sls:stage}
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: deviceId
            AttributeType: S
          - AttributeName: timestamp
            AttributeType: N
        KeySchema:
          - AttributeName: deviceId
            KeyType: HASH
          - AttributeName: timestamp
            KeyType: RANGE
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES

functions:
  processSensorData:
    handler: functions/process-sensor-data.handler
    events:
      - stream:
          type: kinesis
          arn: !GetAtt SensorDataStream.Arn
          batchSize: 100  # 한 번에 100개 레코드 처리
          startingPosition: LATEST  # 또는 TRIM_HORIZON
          maximumRetryAttempts: 3
          bisectBatchOnFunctionError: true  # 에러 시 배치 분할
          maximumRecordAgeInSeconds: 3600  # 1시간 넘은 레코드 버림
          parallelizationFactor: 10  # 동시 처리 증가
          destinations:
            onFailure:
              arn: !GetAtt ProcessingDLQ.Arn
              type: sqs
    environment:
      TABLE_NAME: !Ref SensorDataTable
      SNS_TOPIC_ARN: !Ref AlertTopic
    iamRoleStatements:
      - Effect: Allow
        Action:
          - dynamodb:PutItem
          - dynamodb:BatchWriteItem
        Resource: !GetAtt SensorDataTable.Arn
      - Effect: Allow
        Action:
          - cloudwatch:PutMetricData
        Resource: '*'
      - Effect: Allow
        Action:
          - sns:Publish
        Resource: !Ref AlertTopic

    ProcessingDLQ:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: sensor-processing-dlq-${sls:stage}
        MessageRetentionPeriod: 1209600  # 14일

Producer: 센서 데이터 전송

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// scripts/send-sensor-data.mjs
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';

const kinesisClient = new KinesisClient({ region: 'ap-northeast-2' });
const STREAM_NAME = 'iot-sensor-stream-dev';

// 센서 데이터 생성
function generateSensorData(deviceId) {
  return {
    deviceId,
    sensorType: 'DHT22',
    temperature: 20 + Math.random() * 15,  // 20-35도
    humidity: 40 + Math.random() * 40,     // 40-80%
    location: `Seoul-Zone-${Math.floor(Math.random() * 5) + 1}`,
    timestamp: Date.now(),
    batteryLevel: 80 + Math.random() * 20
  };
}

// 배치로 전송 (효율적)
async function sendBatch(records) {
  const response = await kinesisClient.send(new PutRecordsCommand({
    StreamName: STREAM_NAME,
    Records: records.map(data => ({
      Data: Buffer.from(JSON.stringify(data)),
      PartitionKey: data.deviceId  // deviceId로 샤드 분배
    }))
  }));

  console.log('Sent:', {
    success: records.length - response.FailedRecordCount,
    failed: response.FailedRecordCount
  });

  // 실패한 레코드 재시도
  if (response.FailedRecordCount > 0) {
    const failedRecords = response.Records
      .map((result, idx) => result.ErrorCode ? records[idx] : null)
      .filter(Boolean);

    console.log('Failed records:', failedRecords);
    // 재시도 로직...
  }
}

// 1000개 센서에서 동시 전송
async function simulateIoTDevices() {
  const devices = Array.from({ length: 1000 }, (_, i) => `device-${i + 1}`);

  while (true) {
    const batch = [];

    // 500개씩 배치 (PutRecords 최대 500개)
    for (let i = 0; i < 500; i++) {
      const deviceId = devices[Math.floor(Math.random() * devices.length)];
      batch.push(generateSensorData(deviceId));
    }

    await sendBatch(batch);

    // 1초 대기
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
}

simulateIoTDevices();

Consumer: Lambda 처리기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// functions/process-sensor-data.handler
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, BatchWriteCommand } from '@aws-sdk/lib-dynamodb';
import { CloudWatchClient, PutMetricDataCommand } from '@aws-sdk/client-cloudwatch';
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';

const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient());
const cwClient = new CloudWatchClient();
const snsClient = new SNSClient();

const TABLE_NAME = process.env.TABLE_NAME;
const SNS_TOPIC_ARN = process.env.SNS_TOPIC_ARN;

export const handler = async (event) => {
  console.log('Processing Kinesis records:', event.Records.length);

  const sensorDataList = [];
  const metrics = [];
  const alerts = [];

  // Kinesis 레코드 파싱
  for (const record of event.Records) {
    // Base64 디코딩
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    const sensorData = JSON.parse(payload);

    console.log('Sensor data:', sensorData);

    // DynamoDB 저장용 데이터
    sensorDataList.push({
      deviceId: sensorData.deviceId,
      timestamp: sensorData.timestamp,
      temperature: sensorData.temperature,
      humidity: sensorData.humidity,
      location: sensorData.location,
      batteryLevel: sensorData.batteryLevel,
      sequenceNumber: record.kinesis.sequenceNumber
    });

    // CloudWatch 메트릭
    metrics.push({
      MetricName: 'Temperature',
      Value: sensorData.temperature,
      Unit: 'None',
      Timestamp: new Date(sensorData.timestamp),
      Dimensions: [
        { Name: 'DeviceId', Value: sensorData.deviceId },
        { Name: 'Location', Value: sensorData.location }
      ]
    });

    metrics.push({
      MetricName: 'Humidity',
      Value: sensorData.humidity,
      Unit: 'Percent',
      Timestamp: new Date(sensorData.timestamp),
      Dimensions: [
        { Name: 'DeviceId', Value: sensorData.deviceId },
        { Name: 'Location', Value: sensorData.location }
      ]
    });

    // 임계값 체크
    if (sensorData.temperature > 30 || sensorData.humidity > 80) {
      alerts.push({
        deviceId: sensorData.deviceId,
        location: sensorData.location,
        temperature: sensorData.temperature,
        humidity: sensorData.humidity,
        timestamp: new Date(sensorData.timestamp).toISOString()
      });
    }
  }

  // 1. DynamoDB 배치 저장 (25개씩)
  for (let i = 0; i < sensorDataList.length; i += 25) {
    const batch = sensorDataList.slice(i, i + 25);

    await dynamoClient.send(new BatchWriteCommand({
      RequestItems: {
        [TABLE_NAME]: batch.map(item => ({
          PutRequest: { Item: item }
        }))
      }
    }));
  }

  console.log('Saved to DynamoDB:', sensorDataList.length);

  // 2. CloudWatch 메트릭 발행 (20개씩)
  for (let i = 0; i < metrics.length; i += 20) {
    const batch = metrics.slice(i, i + 20);

    await cwClient.send(new PutMetricDataCommand({
      Namespace: 'IoTSensor/Metrics',
      MetricData: batch
    }));
  }

  console.log('Published metrics:', metrics.length);

  // 3. 알림 전송
  if (alerts.length > 0) {
    await snsClient.send(new PublishCommand({
      TopicArn: SNS_TOPIC_ARN,
      Subject: `센서 임계값 초과 알림 (${alerts.length}건)`,
      Message: JSON.stringify(alerts, null, 2)
    }));

    console.log('Alerts sent:', alerts.length);
  }

  return { statusCode: 200, processedRecords: event.Records.length };
};

Kinesis Event 구조:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "device-123",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJkZXZpY2VJZCI6ImRldmljZS0xMjMiLCJ0ZW1wZXJhdHVyZSI6MjUuNX0=",  // Base64
        "approximateArrivalTimestamp": 1703154000.0
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000000:49590338271490256608559692538361571095921575989136588898",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
      "awsRegion": "ap-northeast-2",
      "eventSourceARN": "arn:aws:kinesis:ap-northeast-2:123456789012:stream/iot-sensor-stream-dev"
    }
  ]
}

Kinesis 고급 기능

1. Enhanced Fan-Out (EFO)

기본 소비자는 샤드당 2MB/sec를 공유하지만, EFO는 소비자마다 전용 2MB/sec 제공.

사용 사례:

  • 여러 소비자가 동일한 스트림을 독립적으로 소비
  • 각 소비자가 높은 처리량 필요
  • 70ms 더 낮은 지연시간 필요

설정:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
functions:
  consumerA:
    handler: functions/consumer-a.handler
    events:
      - stream:
          type: kinesis
          arn: !GetAtt MyStream.Arn
          consumer: true  # Enhanced Fan-Out 활성화
          consumerName: ConsumerA

  consumerB:
    handler: functions/consumer-b.handler
    events:
      - stream:
          type: kinesis
          arn: !GetAtt MyStream.Arn
          consumer: true
          consumerName: ConsumerB

2. On-Demand Capacity Mode

샤드 수를 자동으로 관리 (Provisioned 모드 대신).

장점:

  • 샤드 관리 불필요
  • 자동 확장
  • 트래픽 패턴 예측 불필요

단점:

  • 비용이 약간 더 비쌈

설정:

1
2
3
4
5
6
SensorDataStream:
  Type: AWS::Kinesis::Stream
  Properties:
    Name: my-stream
    StreamModeDetails:
      StreamMode: ON_DEMAND  # 자동 확장

3. Shard Splitting & Merging

Shard Splitting (처리량 증가):

1
2
3
4
5
6
7
8
9
10
import { KinesisClient, SplitShardCommand } from '@aws-sdk/client-kinesis';

const kinesisClient = new KinesisClient();

// 샤드를 2개로 분할
await kinesisClient.send(new SplitShardCommand({
  StreamName: 'my-stream',
  ShardToSplit: 'shardId-000000000000',
  NewStartingHashKey: '170141183460469231731687303715884105728'  // 중간 해시 키
}));

Shard Merging (비용 절감):

1
2
3
4
5
6
7
8
import { MergeShardsCommand } from '@aws-sdk/client-kinesis';

// 두 샤드를 하나로 병합
await kinesisClient.send(new MergeShardsCommand({
  StreamName: 'my-stream',
  ShardToMerge: 'shardId-000000000001',
  AdjacentShardToMerge: 'shardId-000000000002'
}));

Kinesis vs 대안 비교

Kinesis vs SQS

항목KinesisSQS
순서 보장✓ (파티션 키)✓ (FIFO만)
처리량MB/s per shard무제한 (표준)
지연 시간< 1초밀리초
소비자여러 개 (독립)하나 (경쟁)
데이터 보관24시간~365일최대 14일
가격$0.015/샤드/시간$0.40/million
사용 사례실시간 스트리밍작업 큐

선택 기준:

  • Kinesis: 실시간 분석, 순서 중요, 여러 소비자
  • SQS: 작업 큐, 순서 덜 중요, 하나의 소비자

Kinesis vs Kafka (MSK)

항목KinesisKafka (MSK)
관리완전 관리형부분 관리형
처리량MB/s per shardGB/s per broker
지연 시간< 1초밀리초
가격$0.015/샤드/시간$90+/broker/월
에코시스템AWS 통합Kafka 에코시스템
복잡도낮음높음
사용 사례AWS 중심Multi-cloud

선택 기준:

  • Kinesis: AWS 중심, 간단한 관리, 중소 규모
  • Kafka: 대규모, 복잡한 스트림 처리, 멀티 클라우드

성능 최적화

1. Partition Key 전략

BAD: 모든 레코드가 하나의 샤드로

1
2
3
4
5
await kinesisClient.send(new PutRecordCommand({
  StreamName: STREAM_NAME,
  Data: Buffer.from(JSON.stringify(data)),
  PartitionKey: 'fixed-key'  // 항상 같은 키 → 하나의 샤드만 사용!
}));

GOOD: 균등 분배

1
2
3
4
5
await kinesisClient.send(new PutRecordCommand({
  StreamName: STREAM_NAME,
  Data: Buffer.from(JSON.stringify(data)),
  PartitionKey: data.deviceId  // 디바이스별 분배
}));

BEST: Explicit Hash Key로 완전한 제어

1
2
3
4
5
6
7
8
9
10
11
12
import crypto from 'crypto';

const hash = crypto.createHash('md5')
  .update(data.deviceId)
  .digest('hex');

await kinesisClient.send(new PutRecordCommand({
  StreamName: STREAM_NAME,
  Data: Buffer.from(JSON.stringify(data)),
  PartitionKey: data.deviceId,
  ExplicitHashKey: parseInt(hash, 16).toString()  // 정확한 샤드 지정
}));

2. 배치 전송 (PutRecords)

BAD: 개별 전송

1
2
3
for (const record of records) {
  await kinesisClient.send(new PutRecordCommand({ ... }));  // 느림!
}

GOOD: 배치 전송 (최대 500개)

1
2
3
4
5
6
7
await kinesisClient.send(new PutRecordsCommand({
  StreamName: STREAM_NAME,
  Records: records.map(data => ({
    Data: Buffer.from(JSON.stringify(data)),
    PartitionKey: data.deviceId
  }))
}));

3. Lambda Tuning

1
2
3
4
5
6
7
8
9
10
functions:
  processStream:
    handler: handler.main
    events:
      - stream:
          batchSize: 100                    # 배치 크기 (기본 100)
          parallelizationFactor: 10          # 동시 처리 (기본 1, 최대 10)
          maximumRetryAttempts: 3            # 재시도 횟수
          bisectBatchOnFunctionError: true   # 에러 시 배치 분할
          maximumRecordAgeInSeconds: 3600    # 오래된 레코드 버림

비용 최적화

실제 프로젝트 비용 분석

IoT 센서 시스템 (월 기준):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Kinesis Data Streams:
- 샤드 2개: 2 × $0.015 × 24시간 × 30일 = $21.60
- PUT 요청: 10억건 × $0.014 / million = $14.00
- Extended retention (7일): 2 × $0.023 × 24 × 7 = $7.73
- 총합: $43.33/월

Lambda 처리:
- 실행 횟수: 1,000만회 (배치 100개씩)
- 실행 시간: 1,000만 × 200ms × $0.0000166667 = $33.33
- 총합: $33.33/월

전체 비용: $76.66/월

SQS로 구현 시:
- SQS 요청: 10억건 × $0.40 = $400
- Lambda 실행: $33.33
- 총합: $433.33/월

Kinesis 절감: $356.67/월 (82% 절감)

비용 절감 팁

1. Retention 기간 최소화:

1
2
3
4
5
SensorDataStream:
  Type: AWS::Kinesis::Stream
  Properties:
    RetentionPeriodHours: 24  # 기본값 (무료)
    # 365시간까지 가능 (추가 비용)

2. On-Demand vs Provisioned:

1
2
3
4
5
6
7
8
9
Provisioned (예측 가능한 트래픽):
- 샤드 2개: $21.60/월

On-Demand (변동 트래픽):
- 기본 비용: $0.04/시간 × 24 × 30 = $28.80
- PUT 요청: $0.04/million (Provisioned: $0.014)
- 총합: 약 $68.80/월

→ 일정한 트래픽이면 Provisioned가 저렴!

3. 배치 크기 최적화:

1
2
# Lambda 호출 횟수 감소
batchSize: 100  # 100개씩 처리 → 10배 적은 호출

실전 경험에서 배운 것

1. ProvisionedThroughputExceededException 대응

문제: 샤드 용량 초과 시 에러 발생

해결:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { retry } from './utils';

async function putRecordWithRetry(params) {
  return retry(async () => {
    try {
      return await kinesisClient.send(new PutRecordCommand(params));
    } catch (error) {
      if (error.name === 'ProvisionedThroughputExceededException') {
        // Exponential backoff
        await new Promise(resolve =>
          setTimeout(resolve, Math.random() * 100)
        );
        throw error;  // 재시도
      }
      throw error;
    }
  }, { maxAttempts: 3 });
}

2. Partition Key는 신중하게

교훈: 초기에 잘못된 파티션 키 선택 시 핫 샤드 발생

1
2
3
4
5
// BAD: 특정 센서에 트래픽 집중
PartitionKey: data.sensorType  // 대부분 'DHT22' → 하나의 샤드만 사용

// GOOD: 균등 분배
PartitionKey: data.deviceId  // 1000개 디바이스 → 샤드 전체 활용

3. Sequence Number로 중복 제거

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const processedSequenceNumbers = new Set();

export const handler = async (event) => {
  for (const record of event.Records) {
    const seqNum = record.kinesis.sequenceNumber;

    if (processedSequenceNumbers.has(seqNum)) {
      console.log('Duplicate record, skipping:', seqNum);
      continue;
    }

    // 처리...
    processedSequenceNumbers.add(seqNum);
  }
};

4. Lambda Timeout은 충분히

Kinesis Event Source Mapping은 타임아웃 시 전체 배치 재시도.

1
2
3
4
functions:
  processStream:
    handler: handler.main
    timeout: 60  # 충분한 시간 (기본 6초)

마무리

Kinesis Data Streams는 실시간 스트리밍 데이터 처리의 AWS 표준임. 순서 보장, 여러 소비자 지원, 데이터 보관으로 복잡한 실시간 분석을 쉽게 구현할 수 있음.

사용 권장 사항:

  • IoT 센서 데이터 실시간 수집
  • 로그 스트리밍 (CloudWatch 대신)
  • 클릭스트림 분석
  • 실시간 대시보드

선택 가이드:

  • 실시간 + 순서 + 여러 소비자 → Kinesis
  • 간단한 작업 큐 → SQS
  • 대규모 + 복잡한 처리 → Kafka (MSK)
  • 로그 수집 → S3 → Kinesis Firehose

최근 프로젝트에서 1000개 IoT 센서 데이터를 Kinesis로 실시간 처리하며, SQS 대비 80% 비용 절감과 밀리초 단위 지연시간으로 임계값 초과 시 즉각 대응할 수 있어 매우 만족스러웠음.

This post is licensed under CC BY 4.0 by the author.