How to scale Amazon ECS based on the number of SQS messages with AWS CDK

2021-11-22 AWS

This is a case I ran into recently. To help a customer spend less while getting better auto scaling, I put together a demo and took notes along the way. A message queue is a very common buffering component between microservices, but the number of messages in the queue is hard to control, so the consumers reading from the queue have to grow and shrink with it; and when the queue is empty, the best move is simply to shut all the consumers down and save that cost entirely! This post uses Amazon SQS (message queue) + Amazon ECS (consumer) as the example, and everything is deployed with AWS CDK; for details see shazi7804/cdk-samples

In this architecture, API Gateway plays the producer that sends messages, while Amazon ECS is the consumer that keeps polling the Amazon SQS queue. Amazon CloudWatch continuously monitors several Amazon SQS metrics and uses them as the basis for scaling Amazon ECS tasks. The metrics commonly used as scaling signals are (a short CDK sketch of how to reference them follows the list):

  • ApproximateNumberOfMessagesVisible
  • ApproximateNumberOfMessagesDelayed
  • ApproximateNumberOfMessagesNotVisible
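
For reference, each of these metrics is exposed directly on the CDK Queue construct; a minimal sketch, assuming messageQueue is the queue created in the next step:

// Each helper returns a cloudwatch.Metric that alarms or scaling policies can consume.
const visibleMessages    = messageQueue.metricApproximateNumberOfMessagesVisible();
const delayedMessages    = messageQueue.metricApproximateNumberOfMessagesDelayed();
const notVisibleMessages = messageQueue.metricApproximateNumberOfMessagesNotVisible();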

Create Amazon SQS

Create an Amazon SQS queue with AWS CDK:

const messageQueue = new sqs.Queue(this, 'queue', {
    visibilityTimeout: cdk.Duration.seconds(300)
});

Create Amazon ECS & Task definition

Define the IAM role the ECS task runs with: the task needs permission to write logs to CloudWatch Logs and to receive messages from the Amazon SQS queue.

const taskRole = new iam.Role(this, 'ecs-task-role', {
    roleName: 'AmazonECSTaskRoleForSqsScaling',
    assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com')
});

taskRole.attachInlinePolicy(
    new iam.Policy(this, "sqs-full-access", {
        statements: [
            new iam.PolicyStatement({
                actions: ["sqs:*"],
                effect: iam.Effect.ALLOW,
                resources: [ messageQueue.queueArn ],
            })
        ],
    })
);

taskRole.attachInlinePolicy(
    new iam.Policy(this, "send-cw-logs", {
        statements: [
            new iam.PolicyStatement({
                actions: [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ],
                effect: iam.Effect.ALLOW,
                resources: [ "arn:aws:logs:*:*:*" ],
            })
        ],
    })
);

Create API Gateway with Amazon SQS

Map the queue/ resource path of the REST API Gateway to Amazon SQS, so that we can send test messages with cURL in a moment. The integration uses a credentialsRole, sketched below.
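
The integration code references a credentialsRole that does not appear in the snippet; it has to be a role that API Gateway can assume and that is allowed to send messages to the queue. A minimal sketch, assuming the messageQueue created earlier:

// Role that API Gateway assumes when it forwards requests to SQS.
const credentialsRole = new iam.Role(this, 'api-gateway-sqs-role', {
    assumedBy: new iam.ServicePrincipal('apigateway.amazonaws.com'),
});

// Grant only what the integration needs: sqs:SendMessage on this queue.
messageQueue.grantSendMessages(credentialsRole);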

const api = new apigw.RestApi(this, "api-gateway", {
    restApiName: 'send-message-queue-with-ecs-scaling',
    deployOptions: {
        stageName: "run",
        tracingEnabled: true,
    },
});

const resourceQueue = api.root.addResource("queue");

resourceQueue.addMethod(
    "GET",
    new apigw.AwsIntegration({
        service: "sqs",
        path: `${cdk.Aws.ACCOUNT_ID}/${messageQueue.queueName}`,
        integrationHttpMethod: "POST",
        options: {
            credentialsRole,
            passthroughBehavior: apigw.PassthroughBehavior.NEVER,
            requestParameters: {
                "integration.request.header.Content-Type": `'application/x-www-form-urlencoded'`,
            },
            requestTemplates: {
                "application/json": `Action=SendMessage&MessageBody=$util.urlEncode("$method.request.querystring.message")`,
            },
            integrationResponses: [
                {
                    statusCode: "200",
                    responseTemplates: {
                    "application/json": `{"done": true}`,
                    },
                },
            ],
        },
    }),
    { methodResponses: [{ statusCode: "200" }] }
);

Create Amazon ECS and task definition with AWS Fargate

For hosting the Amazon ECS tasks I chose AWS Fargate, and the log driver is pointed at Amazon CloudWatch Logs to make debugging easier later. The cluster also needs a vpc, sketched below.
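
The cluster construct expects a vpc that is not shown in the snippet; a minimal sketch, assuming the aws-ec2 module is imported as ec2 and a fresh VPC is acceptable for the demo:

// Small demo VPC; an existing VPC could be looked up with ec2.Vpc.fromLookup() instead.
const vpc = new ec2.Vpc(this, 'vpc', {
    maxAzs: 2,
});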

const cluster = new ecs.Cluster(this, 'EcsCluster', { vpc });

const taskDefinition = new ecs.FargateTaskDefinition(this, "task-definition", {
        memoryLimitMiB: 4096,
        cpu: 2048,
        taskRole
});

const logging = new ecs.AwsLogDriver({
    streamPrefix: "task",
    logGroup: new logs.LogGroup(this, "LogGroup", {
        logGroupName: "/aws/ecs/scalingBySqs",
        retention: logs.RetentionDays.ONE_MONTH
    })
});

First, create an Amazon ECR repository:

$ aws ecr create-repository \
    --repository-name python-message-queue-consumer \
    --region ${REGION}

Next, push the container image to Amazon ECR; for details see shazi7804/container-samples

  • Dockerfile
# Dockerfile

FROM python:3.6

USER root

WORKDIR /app

ADD . /app

RUN pip install --trusted-host pypi.python.org -r requirements.txt

CMD ["python", "app.py"]
  • app.py
import boto3

sqs = boto3.resource('sqs')

queue = sqs.get_queue_by_name(QueueName='EcsScalingBySqsStack-queue...')

while 1:
    messages = queue.receive_messages(WaitTimeSeconds=5)

    for message in messages:
        print("Activating MessageId: {0}".format(message.message_id))
        print("Message received: {0}".format(message.body))
        message.delete()
        print("Finished for MessageId: {0}".format(message.message_id))

The QueueName is only available after the Amazon SQS queue has been created, or you can give the Amazon SQS queue an explicit name up front.
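
One way to avoid hard-coding the generated queue name in app.py is to inject it from the CDK side as an environment variable; a hypothetical variant of the addContainer call shown further down (the QUEUE_NAME variable name is my own, not from the original code):

// Hypothetical: pass the queue name to the consumer container so app.py
// can read it from the environment instead of hard-coding it.
taskDefinition.addContainer('python-message-queue-consumer', {
    image: ecs.ContainerImage.fromEcrRepository(repository, 'latest'),
    environment: {
        QUEUE_NAME: messageQueue.queueName,
    },
    logging
});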

Build the Dockerfile into an image and push it to the Amazon ECR repository python-message-queue-consumer:

$ aws ecr get-login-password --region ${REGION} | docker login \
                         --username AWS \
                         --password-stdin \
                         ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com
$ docker build -t python-message-queue-consumer .
$ docker tag python-message-queue-consumer:latest \
             ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/python-message-queue-consumer:latest
$ docker push ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/python-message-queue-consumer:latest

Then point the Amazon ECS task definition at the image repository and start it with a service:

const repository = ecr.Repository.fromRepositoryName(this, 'repo', 'python-message-queue-consumer');

const container = taskDefinition.addContainer('python-message-queue-consumer', {
    image: ecs.ContainerImage.fromEcrRepository(repository, 'latest'),
    memoryLimitMiB: 256,
    cpu: 256,
    logging
});

const service = new ecs.FargateService(this, "task-service", {
    cluster,
    taskDefinition: taskDefinition,
    desiredCount: 0,
});

const serviceScaling = service.autoScaleTaskCount({
    minCapacity: 0,
    maxCapacity: 50,
});

minCapacity is deliberately set to 0 here so that no task sits idle when there are no messages, while the maxCapacity ceiling should be chosen according to your scenario.

Amazon ECS Scaling based on Amazon SQS items

Amazon ECS service auto scaling supports three kinds of scaling policies: target tracking, step scaling, and scheduled scaling.
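
Target tracking is the simplest of the three; for contrast, a minimal sketch of what it could look like on the same serviceScaling object, scaling on CPU instead of the queue (not used in this demo):

// Hypothetical alternative: target tracking keeps average CPU around 60%.
serviceScaling.scaleOnCpuUtilization('scaling-by-cpu', {
    targetUtilizationPercent: 60,
});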

In this case I want the task count to grow in steps with the number of messages in the queue, so this example creates two scaling policies:

serviceScaling.scaleOnMetric("scaling-initial-by-queue-metric", {
    metric: messageQueue.metricApproximateNumberOfMessagesVisible(),
    adjustmentType: asg.AdjustmentType.CHANGE_IN_CAPACITY,
    evaluationPeriods: 1,
    cooldown: cdk.Duration.seconds(60),
    scalingSteps: [
        { upper: 0, change: -1 },
        { lower: 1, change: +1 },
        { lower: 5, change: +3 },
        { lower: 10, change: +5 },
    ],
});

serviceScaling.scaleOnMetric("scaling-high-by-queue-metric", {
    metric: messageQueue.metricApproximateNumberOfMessagesVisible(),
    adjustmentType: asg.AdjustmentType.PERCENT_CHANGE_IN_CAPACITY,
    cooldown: cdk.Duration.seconds(60),
    scalingSteps: [
        { upper: 10, change: -10 },
        { lower: 15, change: +67 },
    ],
});

The first policy, scaling-initial-by-queue-metric, adds tasks gently (+1/+3/+5) while there are only a few messages; when the consumers can no longer keep up, the second policy switches to a percentage-based adjustment and adds 67% more tasks in one step, so scaling reacts much faster when a large backlog builds up.

Deploy the finished CDK code to AWS and grab the API Gateway endpoint https://xxxxx.execute-api.${REGION}.amazonaws.com

$ cdk deploy

Testing

Try sending messages to API Gateway in a for loop; getting {"done": true} back means the message was successfully delivered to Amazon SQS:

$ for i in {1..1000}; do curl "https://xxxxx.execute-api.${REGION}.amazonaws.com/run/queue?message=test"; done

{"done": true}
{"done": true}
{"done": true}
...

Now check how the Amazon ECS tasks and the Amazon SQS queue change.

When Amazon SQS receives messages from API Gateway, the CloudWatch alarm tells Amazon ECS it is time to scale and new tasks come up; from then on the task count follows the queue as it grows and shrinks, which is exactly the pay-as-you-go benefit of running in the cloud.
