Mr. 沙先生

How to scale Amazon ECS based on the number of messages in an Amazon SQS queue with AWS CDK

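This is a case I ran into recently. To help a customer spend less while getting better Auto Scaling, I built a demo and wrote down the process along the way. A message queue is a very common buffering component between microservices, but the number of messages in the queue is often hard to control, so the number of consumers has to grow and shrink with the queue depth. When the queue is empty, it is best to shut down every consumer and save that cost! This post uses Amazon SQS (message queue) + Amazon ECS (consumer) as the example, and everything is deployed with AWS CDK. For details, see shazi7804/cdk-samples.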

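In this architecture, API Gateway plays the producer that sends messages, and Amazon ECS is the consumer that keeps polling the Amazon SQS queue. Amazon CloudWatch continuously monitors Amazon SQS metrics, which drive the scaling of Amazon ECS tasks. Several SQS metrics can serve as the scaling signal; this post uses ApproximateNumberOfMessagesVisible.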

Create Amazon SQS

Create an Amazon SQS queue with AWS CDK.

const messageQueue = new sqs.Queue(this, 'queue', {
    visibilityTimeout: cdk.Duration.seconds(300)
});

Create the IAM role for the Amazon ECS task

Define the IAM role for the ECS task. The task needs permission to write logs to CloudWatch Logs and to receive messages from the Amazon SQS queue.

const taskRole = new iam.Role(this, 'ecs-task-role', {
    roleName: 'AmazonECSTaskRoleForSqsScaling',
    assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com')
});

taskRole.attachInlinePolicy(
    new iam.Policy(this, "sqs-full-access", {
        statements: [
            new iam.PolicyStatement({
                actions: ["sqs:*"],
                effect: iam.Effect.ALLOW,
                resources: [ messageQueue.queueArn ],
            })
        ],
    })
);

taskRole.attachInlinePolicy(
    new iam.Policy(this, "send-cw-logs", {
        statements: [
            new iam.PolicyStatement({
                actions: [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ],
                effect: iam.Effect.ALLOW,
                resources: [ "arn:aws:logs:*:*:*" ],
            })
        ],
    })
);

Create API Gateway with Amazon SQS

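The REST API in API Gateway maps the queue/ resource path to Amazon SQS, so that messages can be sent with cURL later. Note that credentialsRole in the code below is an IAM role that API Gateway assumes to call Amazon SQS; its definition is not shown in this post.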

const api = new apigw.RestApi(this, "api-gateway", {
    restApiName: 'send-message-queue-with-ecs-scaling',
    deployOptions: {
        stageName: "run",
        tracingEnabled: true,
    },
});

const resourceQueue = api.root.addResource("queue");

resourceQueue.addMethod(
    "GET",
    new apigw.AwsIntegration({
        service: "sqs",
        path: `${cdk.Aws.ACCOUNT_ID}/${messageQueue.queueName}`,
        integrationHttpMethod: "POST",
        options: {
            credentialsRole,
            passthroughBehavior: apigw.PassthroughBehavior.NEVER,
            requestParameters: {
                "integration.request.header.Content-Type": `'application/x-www-form-urlencoded'`,
            },
            requestTemplates: {
                "application/json": `Action=SendMessage&MessageBody=$util.urlEncode("$method.request.querystring.message")`,
            },
            integrationResponses: [
                {
                    statusCode: "200",
                    responseTemplates: {
                    "application/json": `{"done": true}`,
                    },
                },
            ],
        },
    }),
    { methodResponses: [{ statusCode: "200" }] }
);
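To make the request template above easier to read, here is a minimal sketch of the body that ends up being posted to Amazon SQS for a given `message` query string. The function name `buildSendMessageBody` is hypothetical, and `encodeURIComponent` only stands in for `$util.urlEncode`; the exact escaping of some characters may differ from what API Gateway produces.

```typescript
// Sketch only: mirrors the shape of the mapping template, not API Gateway's own implementation.
// buildSendMessageBody is a hypothetical helper name used for illustration.
function buildSendMessageBody(message: string): string {
  // encodeURIComponent is a stand-in for $util.urlEncode in the template.
  return `Action=SendMessage&MessageBody=${encodeURIComponent(message)}`;
}

console.log(buildSendMessageBody("test"));
// Action=SendMessage&MessageBody=test
console.log(buildSendMessageBody("a&b=c"));
// Action=SendMessage&MessageBody=a%26b%3Dc
```

The second call shows why the message has to be encoded: an unescaped `&` or `=` would be read as another form field instead of part of MessageBody.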

Create Amazon ECS and task definition with AWS Fargate

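For the Amazon ECS tasks I chose AWS Fargate as the hosting option, and pointed the log driver at Amazon CloudWatch Logs to make debugging easier later. The vpc passed to the cluster is assumed to be defined elsewhere in the stack.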

const cluster = new ecs.Cluster(this, 'EcsCluster', { vpc });

const taskDefinition = new ecs.FargateTaskDefinition(this, "task-definition", {
    memoryLimitMiB: 4096,
    cpu: 2048,
    taskRole
});

const logging = new ecs.AwsLogDriver({
    streamPrefix: "task",
    logGroup: new logs.LogGroup(this, "LogGroup", {
        logGroupName: "/aws/ecs/scalingBySqs",
        retention: logs.RetentionDays.ONE_MONTH
    })
});

First, create the Amazon ECR repository.

$ aws ecr create-repository \
    --repository-name python-message-queue-consumer \
    --region ${REGION}

Next, push the container image to Amazon ECR. For details, see shazi7804/container-samples.

# Dockerfile

FROM python:3.6

USER root

WORKDIR /app

ADD . /app

RUN pip install --trusted-host pypi.python.org -r requirements.txt

CMD ["python", "app.py"]

# app.py

import boto3

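# Credentials come from the ECS task role.
sqs = boto3.resource('sqs')

# The queue name is generated by CDK; the suffix is elided here.
queue = sqs.get_queue_by_name(QueueName='EcsScalingBySqsStack-queue...')

while True:
    # Long-poll for up to 5 seconds to cut down on empty responses.
    messages = queue.receive_messages(WaitTimeSeconds=5)

    for message in messages:
        print("Activating MessageId: {0}".format(message.message_id))
        print("Message received: {0}".format(message.body))
        # Delete the message so it is not redelivered after the visibility timeout.
        message.delete()
        print("Finished for MessageId: {0}".format(message.message_id))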

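The QueueName is only known after the Amazon SQS queue has been created; alternatively, give the Amazon SQS queue an explicit name.

Build the Dockerfile into an image and push it to the Amazon ECR repository python-message-queue-consumer.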

$ aws ecr get-login-password --region ${REGION} | docker login \
                         --username AWS \
                         --password-stdin \
                         ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com
$ docker build -t python-message-queue-consumer .
$ docker tag python-message-queue-consumer:latest \
             ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/python-message-queue-consumer:latest
$ docker push ${ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/python-message-queue-consumer:latest

Then reference the image repository in the Amazon ECS task definition and start it as a service.

const repository = ecr.Repository.fromRepositoryName(this, 'repo', 'python-message-queue-consumer');

const container = taskDefinition.addContainer('python-message-queue-consumer', {
    image: ecs.ContainerImage.fromEcrRepository(repository, 'latest'),
    memoryLimitMiB: 256,
    cpu: 256,
    logging
});

const service = new ecs.FargateService(this, "task-service", {
    cluster,
    taskDefinition: taskDefinition,
    desiredCount: 0,
});

const serviceScaling = service.autoScaleTaskCount({
    minCapacity: 0,
    maxCapacity: 50,
});

Here minCapacity is set to 0 so that no task has to sit idle when the queue is empty, while the maxCapacity ceiling should be chosen to fit your scenario.
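As a small illustration of how these two bounds constrain scaling, the sketch below clamps a requested change in task count to the configured range. The helper name `nextTaskCount` is hypothetical and only models the arithmetic; it is not part of AWS CDK or of Application Auto Scaling.

```typescript
// Hypothetical helper: keeps the resulting task count inside [minCapacity, maxCapacity].
function nextTaskCount(
  current: number,
  change: number,
  minCapacity: number,
  maxCapacity: number,
): number {
  return Math.min(maxCapacity, Math.max(minCapacity, current + change));
}

console.log(nextTaskCount(0, -1, 0, 50)); // 0: already at the floor, nothing to remove
console.log(nextTaskCount(0, 1, 0, 50));  // 1: first task starts when messages arrive
console.log(nextTaskCount(48, 5, 0, 50)); // 50: capped at maxCapacity
```

With a floor of 0 the service can drop to no running tasks at all, which is where the cost saving in this setup comes from.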

Amazon ECS Scaling based on Amazon SQS items

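Amazon ECS service auto scaling supports three types of scaling: target tracking, step scaling, and scheduled scaling. The scaleOnMetric calls used here create step scaling policies.

In this case I want the number of tasks to grow in proportion to the number of messages in the queue, so the example creates two scaling policies: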

serviceScaling.scaleOnMetric("scaling-initial-by-queue-metric", {
    metric: messageQueue.metricApproximateNumberOfMessagesVisible(),
    adjustmentType: asg.AdjustmentType.CHANGE_IN_CAPACITY,
    evaluationPeriods: 1,
    cooldown: cdk.Duration.seconds(60),
    scalingSteps: [
        { upper: 0, change: -1 },
        { lower: 1, change: +1 },
        { lower: 5, change: +3 },
        { lower: 10, change: +5 },
    ],
});

serviceScaling.scaleOnMetric("scaling-high-by-queue-metric", {
    metric: messageQueue.metricApproximateNumberOfMessagesVisible(),
    adjustmentType: asg.AdjustmentType.PERCENT_CHANGE_IN_CAPACITY,
    cooldown: cdk.Duration.seconds(60),
    scalingSteps: [
        { upper: 10, change: -10 },
        { lower: 15, change: +67 },
    ],
});

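The first policy, scaling-initial-by-queue-metric, adds tasks gradually in steps of +1/+3/+5 while the queue is small. When the consumers cannot keep up, the second policy switches to a percentage-based adjustment and adds 67% more tasks in a single step, so scaling is faster when the queue is large.

Deploy the finished CDK code to AWS and note the API Gateway address https://xxxxx.execute-api.${REGION}.amazonaws.com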
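To make the two policies more concrete, here is a simplified model of how a step is chosen for a given queue depth and how a percentage change turns into a task count. It is a sketch under stated assumptions, not the CDK or CloudWatch implementation: `pickChange` and `percentDelta` are hypothetical names, a step with only `upper` is assumed to match when the metric is at or below that bound, a step with `lower` is assumed to run up to the next step's lower bound, and the rounding follows my reading of the step scaling documentation (fractions are dropped, but a non-zero result changes at least one task).

```typescript
interface ScalingStep {
  lower?: number;
  upper?: number;
  change: number;
}

// Simplified model: returns the adjustment of the step whose interval contains the metric.
function pickChange(steps: ScalingStep[], metric: number): number {
  const sorted = steps
    .slice()
    .sort((a, b) => (a.lower ?? a.upper ?? 0) - (b.lower ?? b.upper ?? 0));
  for (let i = 0; i < sorted.length; i++) {
    const step = sorted[i];
    if (step.lower === undefined) {
      // Scale-in style step: only an upper bound is given.
      if (step.upper !== undefined && metric <= step.upper) return step.change;
      continue;
    }
    // Assumption: an open-ended step ends where the next step begins.
    const upper = step.upper ?? sorted[i + 1]?.lower ?? Infinity;
    if (metric >= step.lower && metric < upper) return step.change;
  }
  return 0; // metric falls in a gap between steps: no change
}

// Assumed rounding for PERCENT_CHANGE_IN_CAPACITY: truncate toward zero,
// but never round a non-zero change down to nothing.
function percentDelta(current: number, percent: number): number {
  const raw = (current * percent) / 100;
  if (raw === 0) return 0;
  if (raw > 0) return raw < 1 ? 1 : Math.floor(raw);
  return raw > -1 ? -1 : Math.ceil(raw);
}

const initialSteps: ScalingStep[] = [
  { upper: 0, change: -1 },
  { lower: 1, change: +1 },
  { lower: 5, change: +3 },
  { lower: 10, change: +5 },
];
const highSteps: ScalingStep[] = [
  { upper: 10, change: -10 },
  { lower: 15, change: +67 },
];

console.log(pickChange(initialSteps, 3));  // 1
console.log(pickChange(initialSteps, 12)); // 5
console.log(pickChange(highSteps, 20));    // 67
console.log(percentDelta(10, 67));         // 6
console.log(percentDelta(0, 67));          // 0
```

The last line is worth noting: a percentage of zero tasks is still zero, so in this model the fixed-count policy is what gets the service off the ground from minCapacity 0, and the percentage policy only starts to matter once some tasks are already running.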

$ cdk deploy

Testing

Try sending messages to API Gateway in a for loop. If you get {"done": true} back, the message was delivered to Amazon SQS.

$ for i in {1..1000}; do curl "https://xxxxx.execute-api.${REGION}.amazonaws.com/run/queue?message=test"; done

{"done": true}
{"done": true}
{"done": true}
...

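Next, check how the Amazon ECS tasks and the Amazon SQS queue change.

When Amazon SQS receives messages from API Gateway, the CloudWatch alarm triggers Amazon ECS scaling and new tasks are started. After that, the task count follows the queue as it grows and shrinks, which is exactly the pay-as-you-go benefit of using the cloud.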
