This is a case I ran into recently: to help a customer save money while also getting better Auto Scaling, I put together a demo and wrote down the process. A message queue is a very common buffering component between microservices, but the queue depth is often hard to control, so the consumers reading from the queue have to scale up and down with the number of queued messages; when the queue is empty, it is best to shut the consumers down entirely and save that cost! This post uses Amazon SQS (message queue) + Amazon ECS (consumer) as the example, and everything is deployed with AWS CDK; see shazi7804/cdk-samples for the full code.
In this architecture, API Gateway plays the producer that sends messages into the queue, while Amazon ECS consumers continuously poll the Amazon SQS queue. Amazon CloudWatch monitors a few Amazon SQS metrics as the basis for scaling the Amazon ECS tasks; the usual candidates are:
- ApproximateNumberOfMessagesVisible
- ApproximateNumberOfMessagesDelayed
- ApproximateNumberOfMessagesNotVisible
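To see why these metrics matter, here is a rough, illustrative heuristic mapping an SQS backlog to a desired consumer count. This is not the mechanism used below (that is CloudWatch step scaling), and `msgs_per_task` is an assumed per-task throughput figure:

```python
# Illustrative heuristic only: the real scaling in this post is driven by
# CloudWatch alarms, but this shows how the SQS metrics relate to task count.
def desired_consumers(visible, delayed, not_visible,
                      msgs_per_task=10, max_tasks=50):
    """Estimate how many consumer tasks a given SQS backlog warrants."""
    backlog = visible + delayed          # messages waiting to be received
    if backlog == 0 and not_visible == 0:
        return 0                         # queue drained: scale to zero
    # one task per msgs_per_task messages (ceiling division), at least one
    return min(max_tasks, max(1, -(-backlog // msgs_per_task)))

print(desired_consumers(0, 0, 0))     # empty queue -> 0 tasks
print(desired_consumers(25, 0, 0))    # small backlog -> 3 tasks
print(desired_consumers(1000, 0, 0))  # large backlog -> capped at 50
```

Scaling to zero is the key cost saver here: as long as nothing is visible, delayed, or in flight, no task needs to run at all.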
Create Amazon SQS
Create an Amazon SQS queue with AWS CDK. The 300-second visibilityTimeout gives a consumer five minutes to process and delete a message before it becomes visible to other consumers again.
const messageQueue = new sqs.Queue(this, 'queue', {
  visibilityTimeout: cdk.Duration.seconds(300)
});
Create Amazon ECS & Task definition
Define the IAM role the ECS task runs with: the task needs permission to write logs to CloudWatch Logs and to receive messages from Amazon SQS.
const taskRole = new iam.Role(this, 'ecs-task-role', {
  roleName: 'AmazonECSTaskRoleForSqsScaling',
  assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com')
});

taskRole.attachInlinePolicy(
  new iam.Policy(this, "sqs-full-access", {
    statements: [
      new iam.PolicyStatement({
        actions: ["sqs:*"],
        effect: iam.Effect.ALLOW,
        resources: [ messageQueue.queueArn ],
      })
    ],
  })
);

taskRole.attachInlinePolicy(
  new iam.Policy(this, "send-cw-logs", {
    statements: [
      new iam.PolicyStatement({
        actions: [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "logs:DescribeLogStreams"
        ],
        effect: iam.Effect.ALLOW,
        resources: [ "arn:aws:logs:*:*:*" ],
      })
    ],
  })
);
Create API Gateway with Amazon SQS
The REST API Gateway maps the queue/ resource path straight to Amazon SQS, so we can test sending messages with cURL in a moment.
// credentialsRole lets API Gateway call sqs:SendMessage on our queue
// (this role was missing from the original snippet)
const credentialsRole = new iam.Role(this, "api-gateway-sqs-role", {
  assumedBy: new iam.ServicePrincipal("apigateway.amazonaws.com"),
});
messageQueue.grantSendMessages(credentialsRole);

const api = new apigw.RestApi(this, "api-gateway", {
  restApiName: 'send-message-queue-with-ecs-scaling',
  deployOptions: {
    stageName: "run",
    tracingEnabled: true,
  },
});

const resourceQueue = api.root.addResource("queue");
resourceQueue.addMethod(
  "GET",
  new apigw.AwsIntegration({
    service: "sqs",
    path: `${cdk.Aws.ACCOUNT_ID}/${messageQueue.queueName}`,
    integrationHttpMethod: "POST",
    options: {
      credentialsRole,
      passthroughBehavior: apigw.PassthroughBehavior.NEVER,
      requestParameters: {
        "integration.request.header.Content-Type": `'application/x-www-form-urlencoded'`,
      },
      requestTemplates: {
        "application/json": `Action=SendMessage&MessageBody=$util.urlEncode("$method.request.querystring.message")`,
      },
      integrationResponses: [
        {
          statusCode: "200",
          responseTemplates: {
            "application/json": `{"done": true}`,
          },
        },
      ],
    },
  }),
  { methodResponses: [{ statusCode: "200" }] }
);
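For reference, the mapping template above turns the `message` query string into an SQS SendMessage form body. Roughly the equivalent of this Python sketch ($util.urlEncode produces application/x-www-form-urlencoded, so `quote_plus` is the closest stdlib match):

```python
from urllib.parse import quote_plus

# Rough Python equivalent of the VTL mapping template:
#   Action=SendMessage&MessageBody=$util.urlEncode("$method.request.querystring.message")
def sqs_send_message_body(message: str) -> str:
    return "Action=SendMessage&MessageBody=" + quote_plus(message)

print(sqs_send_message_body("hello world"))
# Action=SendMessage&MessageBody=hello+world
```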
Create Amazon ECS and task definition with AWS Fargate
I host the Amazon ECS task on AWS Fargate and point the log driver at Amazon CloudWatch Logs for easier debugging later.
const cluster = new ecs.Cluster(this, 'EcsCluster', { vpc });

const taskDefinition = new ecs.FargateTaskDefinition(this, "task-definition", {
  memoryLimitMiB: 4096,
  cpu: 2048,
  taskRole
});

const logging = new ecs.AwsLogDriver({
  streamPrefix: "task",
  logGroup: new logs.LogGroup(this, "LogGroup", {
    logGroupName: "/aws/ecs/scalingBySqs",
    retention: logs.RetentionDays.ONE_MONTH
  })
});
First, create an Amazon ECR repository:
$ aws ecr create-repository \
    --repository-name python-message-queue-consumer \
    --region ${REGION}
Then push the container image to Amazon ECR; see shazi7804/container-samples for details.
- Dockerfile
# Dockerfile
FROM python:3.6
USER root
WORKDIR /app
ADD . /app
RUN pip install --trusted-host pypi.python.org -r requirements.txt
CMD ["python", "app.py"]
- app.py
import boto3

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='EcsScalingBySqsStack-queue...')

while True:
    messages = queue.receive_messages(WaitTimeSeconds=5)
    for message in messages:
        print("Activating MessageId: {0}".format(message.message_id))
        print("Message received: {0}".format(message.body))
        message.delete()
        print("Finished for MessageId: {0}".format(message.message_id))
The QueueName is only known after the Amazon SQS queue has been created, or you can set an explicit queue name on the Amazon SQS construct instead.
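A third option is to avoid baking the generated name into the image at all and inject it through a container environment variable (settable via `environment` on addContainer); `QUEUE_NAME` is an assumed variable name for this sketch:

```python
import os

# Read the queue name from the environment instead of hardcoding it.
# QUEUE_NAME is an assumed variable name; the fallback is for local testing.
queue_name = os.environ.get("QUEUE_NAME", "local-test-queue")
```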
Build the Dockerfile into an image and push it to the Amazon ECR repository python-message-queue-consumer:
$ aws ecr get-login-password --region ${REGION} | docker login \
    --username AWS \
    --password-stdin \
    ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com
$ docker build -t python-message-queue-consumer .
$ docker tag python-message-queue-consumer:latest \
    ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/python-message-queue-consumer:latest
$ docker push ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/python-message-queue-consumer:latest
Then reference the image repository in the Amazon ECS task definition and start it with a service:
const repository = ecr.Repository.fromRepositoryName(this, 'repo', 'python-message-queue-consumer');

const container = taskDefinition.addContainer('python-message-queue-consumer', {
  image: ecs.ContainerImage.fromEcrRepository(repository, 'latest'),
  memoryLimitMiB: 256,
  cpu: 256,
  logging
});

const service = new ecs.FargateService(this, "task-service", {
  cluster,
  taskDefinition: taskDefinition,
  desiredCount: 0,
});

const serviceScaling = service.autoScaleTaskCount({
  minCapacity: 0,
  maxCapacity: 50,
});
Here minCapacity is set to 0 so that no task sits idle when the queue is empty, while maxCapacity is an upper bound you choose to fit your scenario.
Amazon ECS Scaling based on Amazon SQS items
Amazon ECS service auto scaling supports three policy types: target tracking, step scaling, and scheduled scaling. In this case I want the task count to grow in proportion to the number of queued messages, so this example creates two step scaling policies:
serviceScaling.scaleOnMetric("scaling-initial-by-queue-metric", {
  metric: messageQueue.metricApproximateNumberOfMessagesVisible(),
  adjustmentType: asg.AdjustmentType.CHANGE_IN_CAPACITY,
  evaluationPeriods: 1,
  cooldown: cdk.Duration.seconds(60),
  scalingSteps: [
    { upper: 0, change: -1 },
    { lower: 1, change: +1 },
    { lower: 5, change: +3 },
    { lower: 10, change: +5 },
  ],
});

serviceScaling.scaleOnMetric("scaling-high-by-queue-metric", {
  metric: messageQueue.metricApproximateNumberOfMessagesVisible(),
  adjustmentType: asg.AdjustmentType.PERCENT_CHANGE_IN_CAPACITY,
  cooldown: cdk.Duration.seconds(60),
  scalingSteps: [
    { upper: 10, change: -10 },
    { lower: 15, change: +67 },
  ],
});
The first policy, scaling-initial-by-queue-metric, slowly adds tasks in steps of +1/+3/+5 while the backlog is small; once the consumers can no longer keep up, the second policy switches to a percentage adjustment and adds 67% more tasks in one step, which scales out much faster under a large backlog.
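The first policy's scalingSteps read as a simple step function over ApproximateNumberOfMessagesVisible. This sketch mirrors that table (the live behavior also depends on the alarm evaluation period and the 60-second cooldown):

```python
# Mirrors scaling-initial-by-queue-metric's scalingSteps: an absolute
# (CHANGE_IN_CAPACITY) task-count change per visible-message range.
def initial_step_change(visible_messages: int) -> int:
    if visible_messages <= 0:
        return -1   # upper: 0  -> remove a task, eventually down to zero
    if visible_messages < 5:
        return +1   # lower: 1  -> add one task
    if visible_messages < 10:
        return +3   # lower: 5  -> add three tasks
    return +5       # lower: 10 -> add five tasks
```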
Deploy the finished CDK code to AWS and grab the API Gateway endpoint https://xxxxx.execute-api.${REGION}.amazonaws.com:
$ cdk deploy
Testing
Try sending messages to API Gateway in a for loop; if the response is `{"done": true}`, the message was delivered to Amazon SQS.
$ for i in {1..1000}; do curl 'https://xxxxx.execute-api.${REGION}.amazonaws.com/run/queue?message=test'; done
{"done": true}
{"done": true}
{"done": true}
...
Then check how the Amazon ECS tasks and the Amazon SQS queue change.
When Amazon SQS receives messages from API Gateway, the CloudWatch alarm tells Amazon ECS to start scaling and new tasks spin up; afterwards the task count follows the queue depth as it grows and shrinks, which is exactly the pay-as-you-go benefit of the cloud.