前言

在微服務架構中,「解耦(Decoupling)」是至關重要的設計原則。透過非同步通訊,我們可以讓服務之間不再緊密相依,提高系統的可擴展性與容錯能力。

AWS 提供了完整的應用程式整合服務,幫助我們構建事件驅動架構(Event-Driven Architecture)。本篇將深入探討:

  • SQS:可靠的訊息佇列服務
  • SNS:高吞吐量的發布/訂閱服務
  • EventBridge:無伺服器事件匯流排
  • Kinesis:即時資料串流處理
  • 實戰:如何選擇正確的整合模式

應用程式整合服務全景

graph TB
    subgraph "Point-to-Point"
        A[SQS] -->|佇列| B[消費者]
    end
    
    subgraph "Pub/Sub"
        C[SNS] -->|Fan-out| D[SQS 1]
        C -->|Fan-out| E[SQS 2]
        C -->|Fan-out| F[Lambda]
    end
    
    subgraph "Event Bus"
        G[EventBridge] -->|規則路由| H[Target A]
        G -->|規則路由| I[Target B]
    end
    
    subgraph "Streaming"
        J[Kinesis] -->|即時串流| K[Consumer]
    end
服務 模式 特性 典型用途
SQS Queue 緩衝、削峰填谷 非同步任務處理
SNS Pub/Sub 廣播、Fan-out 通知推送、系統解耦
EventBridge Event Bus 內容路由、SaaS 整合 複雜事件路由、排程
Kinesis Stream 有序、重播、大數據 Log 收集、即時分析

SQS:Simple Queue Service

SQS 核心概念

SQS 是 AWS 最古老的服務之一,提供完全託管的訊息佇列。

Standard vs FIFO Queue

特性 Standard Queue FIFO Queue
順序性 盡力而為(Best-Effort) 嚴格保證先進先出
重複訊息 至少一次(At-Least-Once) 恰好一次(Exactly-Once)
吞吐量 幾乎無限 有限(每秒 300-3000 訊息)
成本 較低 較高

SQS 重要參數

  • Visibility Timeout:消費者取出訊息後,訊息對其他消費者不可見的時間。若處理失敗,超時後訊息會重新出現。
  • Message Retention Period:訊息保留時間(預設 4 天,最長 14 天)。
  • Dead Letter Queue (DLQ):處理失敗多次的訊息會被移送到這裡,避免阻塞佇列。

SQS 與 Lambda 整合

SQS 是 Lambda 最常見的觸發來源之一。

sequenceDiagram
    participant Producer
    participant SQS
    participant Lambda Service
    participant Function
    
    Producer->>SQS: Send Message
    loop Polling
        Lambda Service->>SQS: Long Polling
        SQS-->>Lambda Service: Batch of Messages
    end
    Lambda Service->>Function: Invoke with Batch
    
    alt Success
        Function-->>Lambda Service: Success
        Lambda Service->>SQS: Delete Messages
    else Partial Failure
        Function-->>Lambda Service: Report Batch Item Failures
        Lambda Service->>SQS: Delete Successes, Retry Failures
    end

Go 語言處理 SQS Batch Item Failures

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"context"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error) {
var batchItemFailures []events.SQSBatchItemFailure

for _, record := range event.Records {
if err := processMessage(record); err != nil {
// 記錄失敗的 MessageId,AWS 會只重試這些訊息
batchItemFailures = append(batchItemFailures, events.SQSBatchItemFailure{
ItemIdentifier: record.MessageId,
})
}
}

return events.SQSEventResponse{
BatchItemFailures: batchItemFailures,
}, nil
}

func processMessage(record events.SQSMessage) error {
// 業務邏輯...
return nil
}

func main() {
lambda.Start(handler)
}

SNS:Simple Notification Service

SNS 核心概念

SNS 是高吞吐量的 Pub/Sub 服務,支援多種訂閱協定(HTTP/S, Email, SMS, SQS, Lambda)。

Fan-out 模式

這是 SNS 最強大的應用場景:一個事件觸發多個後續動作。

graph LR
    Order[訂單服務] -->|Publish| Topic[SNS Topic: OrderCreated]
    
    Topic -->|Subscribe| Q1[SQS: 發票服務]
    Topic -->|Subscribe| Q2[SQS: 物流服務]
    Topic -->|Subscribe| Q3[SQS: 通知服務]
    
    Q1 --> Invoice[發票處理]
    Q2 --> Shipping[物流安排]
    Q3 --> Email[發送 Email]

優點

  • 生產者不需要知道消費者的存在
  • 新增消費者不需要修改生產者程式碼
  • 各個消費者可以依照自己的速度處理(透過 SQS 緩衝)

訊息過濾 (Message Filtering)

SNS 可以在 Topic 層級過濾訊息,只將符合條件的訊息發送給特定訂閱者。

1
2
3
4
5
// 訂閱屬性 (Subscription Filter Policy)
{
"store": ["amazon", "ebay"],
"price": [{"numeric": [">", 100]}]
}

EventBridge:無伺服器事件匯流排

EventBridge vs SNS

雖然兩者都支援 Pub/Sub,但 EventBridge 更專注於事件路由第三方整合

特性 SNS EventBridge
主要用途 廣播通知、Fan-out 事件路由、SaaS 整合
目標數量 每個 Topic 可有大量訂閱者 每個 Rule 最多 5 個目標
過濾能力 簡單屬性過濾 複雜內容過濾 (JSON Pattern)
輸入轉換 不支援 支援 (Input Transformer)
SaaS 整合 不支援 原生支援 (Zendesk, Shopify…)
Schema Registry 不支援 支援

EventBridge 規則範例

1
2
3
4
5
6
7
8
9
// 攔截 EC2 狀態改變事件
{
"source": ["aws.ec2"],
"detail-type": ["EC2 Instance State-change Notification"],
"detail": {
"state": ["terminated"],
"instance-id": ["i-1234567890abcdef0"]
}
}

排程任務 (Scheduler)

EventBridge Scheduler 是 CloudWatch Events 的升級版,用於定時觸發任務。

  • 支援 Cron 表達式
  • 支援一次性排程
  • 支援時區設定

Kinesis:即時資料串流

Kinesis Data Streams 核心概念

Kinesis 用於處理大規模的即時資料串流(如 Log、點擊流、IoT 數據)。

graph LR
    subgraph "Kinesis Data Stream"
        P1[Producer] --> S1[Shard 1]
        P2[Producer] --> S2[Shard 2]
        P3[Producer] --> S3[Shard 3]
    end
    
    S1 --> C1[Consumer]
    S2 --> C1
    S3 --> C2[Consumer]

Shard (分片)

  • Kinesis 的擴展單位
  • 每個 Shard 提供:
    • 寫入:1 MB/sec 或 1,000 records/sec
    • 讀取:2 MB/sec
  • 資料保留:預設 24 小時,最長 365 天

Kinesis vs SQS

特性 SQS Kinesis Data Streams
消費模式 競爭消費 (Competing Consumers) 獨佔消費 (每個 Shard 一個 Worker)
順序性 僅 FIFO Queue 保證 Shard 內保證順序
重播能力 無 (取出即刪除) 有 (可重播過去 24h+ 資料)
並發處理 自動擴展 受限於 Shard 數量
適用場景 任務佇列、解耦 Log 收集、即時分析、順序敏感

實戰:選擇正確的整合模式

場景一:非同步任務處理

需求:使用者上傳圖片後,需要產生縮圖。
選擇S3 -> SQS -> Lambda

  • S3 Event Notification 發送事件到 SQS
  • SQS 緩衝請求,避免流量突波壓垮 Lambda
  • Lambda 處理失敗可透過 DLQ 重試

場景二:微服務解耦

需求:訂單建立後,需要通知庫存、發票、物流服務。
選擇SNS -> SQS (Fan-out Pattern)

  • 訂單服務發布訊息到 SNS Topic
  • 各個服務擁有自己的 SQS Queue 訂閱該 Topic
  • 實現完全解耦,新增服務不影響現有流程

場景三:複雜事件路由

需求:根據訂單金額和類別,路由到不同的處理流程。
選擇EventBridge

  • 使用 EventBridge 的 Content Filtering 功能
  • 金額 > 1000 路由到 VIP 處理流程
  • 類別 = “Digital” 路由到數位商品流程

場景四:日誌收集與分析

需求:收集所有伺服器的 Access Log,進行即時異常偵測。
選擇Kinesis Data Streams -> Kinesis Data Analytics / Lambda

  • Agent 將 Log 寫入 Kinesis
  • 保證 Log 順序性
  • 支援重播以進行故障排查

Go 語言實戰:SNS + SQS Fan-out

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

import (
"context"
"encoding/json"
"fmt"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)

type OrderEvent struct {
OrderID string `json:"orderId"`
Amount float64 `json:"amount"`
}

// 發布訊息到 SNS
func PublishOrder(ctx context.Context, topicArn string, order OrderEvent) error {
cfg, _ := config.LoadDefaultConfig(ctx)
client := sns.NewFromConfig(cfg)

msg, _ := json.Marshal(order)

_, err := client.Publish(ctx, &sns.PublishInput{
TopicArn: &topicArn,
Message: aws.String(string(msg)),
MessageAttributes: map[string]types.MessageAttributeValue{
"amount": {
DataType: aws.String("Number"),
StringValue: aws.String(fmt.Sprintf("%f", order.Amount)),
},
},
})
return err
}

// 從 SQS 接收訊息
func PollQueue(ctx context.Context, queueUrl string) {
cfg, _ := config.LoadDefaultConfig(ctx)
client := sqs.NewFromConfig(cfg)

for {
resp, _ := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: &queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 20, // Long Polling
})

for _, msg := range resp.Messages {
// 處理訊息...
// 處理成功後刪除
client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
QueueUrl: &queueUrl,
ReceiptHandle: msg.ReceiptHandle,
})
}
}
}

本章重點回顧

關鍵要點

  1. SQS

    • 用於緩衝與解耦
    • 區分 Standard 與 FIFO
    • 善用 Visibility Timeout 與 DLQ
  2. SNS

    • 用於一對多廣播 (Fan-out)
    • 支援多種訂閱協定
    • 簡單的屬性過濾
  3. EventBridge

    • 用於事件路由與 SaaS 整合
    • 強大的內容過濾與轉換能力
    • 包含 Scheduler 功能
  4. Kinesis

    • 用於即時資料串流
    • 保證順序與重播能力
    • 以 Shard 為擴展單位

下一篇預告

在系列的第六篇文章中,我們將探討 DevOps 與 CI/CD

  • CodePipeline 自動化流程
  • CloudFormation 與 CDK 基礎設施即程式碼 (IaC)
  • 如何構建現代化的部署流水線

系列文章導覽


參考資源