How to create an AWS Fargate service that spins tasks up and down based on SQS queue messages and CPU usage - using aws-cdk β˜οΈπŸ‘€

Until recently I had an ECS service that listened constantly to an SQS queue, waiting for messages to arrive and then processing them. This was a waste of resources and not cost-effective at all. So, I looked into alternative ways to spin up tasks only when messages were pushed to the SQS queue, meaning my service would no longer sit idle all day and all night waiting.
Enter aws-cdk. Please bear in mind this is not a beginner's tutorial but one that caused me a lot of headaches and iterations to get right. Hopefully, this will help at least one person or team and save them weeks of pain.
Prerequisites:
  • Experience with aws-cdk.
  • Experience with python.
  • Experience with aws-sdk.
    By the end of the tutorial, you will have a Fargate service that spins up tasks when messages are pushed to an SQS queue. I will also show an alternative way to get the service to spin these tasks back down when there are no messages on the queue, working around some issues in the current library implementations.
    Quickstart
    Install the kit with npm install -g aws-cdk
    To check it has been installed, run cdk --version
    To initialise a project:
    mkdir fargate-cdk
    cd fargate-cdk
    cdk init sample-app --language=python
    Imports
    To begin with, import every service under the sun that aws-cdk provides.
    from aws_cdk import (
        core as cdk,
        aws_ec2 as ec2,
        aws_ecs as ecs,
        aws_ecs_patterns as ecs_patterns,
        aws_sqs as sqs,
        aws_ecr as ecr,
        aws_iam as iam,
        aws_ssm as ssm,
        aws_kms as kms,
        aws_autoscaling as autoscaling,
        aws_applicationautoscaling as app_autoscaling,
        aws_cloudwatch as cw,
    )
    from aws_cdk.aws_cloudwatch_actions import ApplicationScalingAction
    from aws_cdk.core import Duration
    Initialise and fetch variables
    Once you have used the aws-cdk command to initialise your project, navigate to the sample-app.py file and you should see a class, created by aws-cdk, that inherits from cdk.Stack. We shall place all our code under the __init__ method for this tutorial.
    You will need some basic variables that are used throughout the code, so let's define those first.
    class SampleApp(cdk.Stack):
        def __init__(
            self,
            scope: cdk.Construct,
            construct_id: str,
            **kwargs,
        ) -> None:
            super().__init__(scope, construct_id, **kwargs)
    
            image_tag = "" # tag of the repo within ecr
    
            cluster_name = "" # cluster group name of containers
    
            security_group = "" # virtual firewall for your EC2 instances
    
            kms_key = "" # needed for decrypting
    
            queue_name = "" # name of sqs queue that will be created
    
            environment = "" # deployment environment passed to the container, e.g. "dev"
    
            vpc_id = "" # id of the VPC in which to provision the cluster.
    ...
    Using variables to fetch objects
    We will now use the defined variables to fetch our vpc, cluster, kms key and to create our sqs queue.
    ...
            # fetches vpc using the id
            vpc = ec2.Vpc.from_lookup(self, "VPC", vpc_id=vpc_id)
    
            # fetches kms_key used for decryption using the arn of the key
            kms_key = kms.Key.from_key_arn(
                self, "EncryptionKey", f"arn:aws:kms:us-west-1:123456789:key/{kms_key}" # arn of kms key
            )
    
            # fetches cluster using defined objects and variables above
            cluster = ecs.Cluster.from_cluster_attributes(
                self,
                "Cluster",
                vpc=vpc,
                cluster_name=cluster_name,
                security_groups=[
                    ec2.SecurityGroup.from_security_group_id(
                        self, "SecurityGroup", security_group_id=security_group
                    )
                ],
            )
    
            # creates a fifo queue
            queue = sqs.Queue(
                self,
                "SampleQueue",
                content_based_deduplication=True,
                fifo=True,
                queue_name=queue_name, # must contain .fifo at end of name
                receive_message_wait_time=Duration.seconds(20),
                retention_period=Duration.seconds(345600),
                visibility_timeout=Duration.seconds(43200),
            )
    ...
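    A note on content_based_deduplication: for FIFO queues, SQS derives the deduplication ID from a SHA-256 hash of the message body, so identical bodies sent within the 5-minute deduplication window are delivered only once. A rough sketch of that behaviour in plain Python (the window handling here is simplified, not the real SQS implementation):

```python
import hashlib


class FifoDedupSketch:
    """Simplified model of SQS content-based deduplication (5-minute window)."""

    WINDOW_SECONDS = 300

    def __init__(self):
        self._seen = {}  # dedup id -> time first seen

    def accept(self, body: str, now: float) -> bool:
        """Return True if the message would be enqueued, False if deduplicated."""
        dedup_id = hashlib.sha256(body.encode()).hexdigest()
        first_seen = self._seen.get(dedup_id)
        if first_seen is not None and now - first_seen < self.WINDOW_SECONDS:
            return False  # duplicate body inside the window: dropped
        self._seen[dedup_id] = now
        return True


queue_model = FifoDedupSketch()
print(queue_model.accept('{"job": 1}', now=0))    # True  — first delivery
print(queue_model.accept('{"job": 1}', now=60))   # False — duplicate inside window
print(queue_model.accept('{"job": 1}', now=400))  # True  — window has expired
```

    This is why producers pushing genuinely distinct jobs with identical bodies should either vary the body or set an explicit MessageDeduplicationId.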
    Fetching secrets from Systems Manager Parameter Store and defining environment variables
    THIS is a very key step.
    We must define any parameters that are stored within the systems manager parameter store and that will be used within your application.
    ...
    
            database_name = ssm.StringParameter.from_secure_string_parameter_attributes(
                self,
                "DATABASE_NAME",
                version=1,
                parameter_name="", # name of parameter within ssm
                simple_name=True, # if using name of parameter and not arn or any other identifier
                encryption_key=kms_key, # needed if encryption is turned on within parameter store. Normally needed when using simple_name
            )
    ...
    Then create a dictionary of these secrets. Any variables that your app fetches from the local environment must be defined too.
    ...
            secrets = {
                "DATABASE_NAME": ecs.Secret.from_ssm_parameter(database_name),
            }
    
            environments = {
                "ENV": environment,
            }
    ...
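    At runtime, ECS injects each key of the secrets and environments dictionaries into the container as plain environment variables (the secret values are resolved from Parameter Store at task launch). Inside your application you read them like any other env var — a minimal sketch (the setdefault line only simulates the injection for local runs):

```python
import os


def load_config() -> dict:
    """Read the values ECS injected into the container environment."""
    return {
        "database_name": os.environ["DATABASE_NAME"],  # from the secrets dict
        "env": os.environ.get("ENV", "dev"),           # from the environments dict
    }


# Simulate the ECS injection when running locally
os.environ.setdefault("DATABASE_NAME", "sample_db")
print(load_config())
```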
    Creating the Queue Processing Fargate Service
    This is the most important part. It creates the Fargate service and uses everything we have defined up until now.
    This service will listen to the queue created and spin up tasks when there are messages on the queue.
    ...
            fargate_service = ecs_patterns.QueueProcessingFargateService(
                self,
                "SampleAppService",
                cpu=4096, # set the amount of cpu you wish your tasks to have
                memory_limit_mib=8192, # set the amount of memory you wish your task to have
                image=ecs.ContainerImage.from_ecr_repository(
                    repository=ecr.Repository.from_repository_name(
                        self,
                        "SampleAppRepository",
                        repository_name="",
                    ), # fetch repo name from ecr
                    tag=image_tag,
                ), # fetch image from a certain ecr repo
                min_scaling_capacity=0, # min number of tasks
                max_scaling_capacity=1, # the max number of tasks a service can spin up
                cluster=cluster, # cluster for the service
                queue=queue, # queue the service shall be listening to.
                scaling_steps=[
                    {"upper": 0, "change": 0},
                    {"lower": 1, "change": +1},
                ], # this defines how the service shall autoscale
                secrets=secrets, # secret parameters from ssm
                environment=environments, # any additional env variables needed
                visibility_timeout=cdk.Duration.hours(12) # how long a received message stays hidden from other consumers
            )
    ...
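    The scaling_steps behaviour can be sketched in plain Python: application auto scaling evaluates the queue's visible-message metric against the step boundaries and applies the matching change. This is a simplified model of the step evaluation, not the actual service code:

```python
def desired_change(visible_messages: int) -> int:
    """Mirror scaling_steps: no change at 0 messages, +1 task at 1 or more."""
    steps = [
        {"upper": 0, "change": 0},   # 0 messages: leave the service alone
        {"lower": 1, "change": +1},  # 1+ messages: add a task
    ]
    for step in steps:
        if "upper" in step and visible_messages <= step["upper"]:
            return step["change"]
        if "lower" in step and visible_messages >= step["lower"]:
            return step["change"]
    return 0


print(desired_change(0))   # 0 — queue empty, do not touch the running task
print(desired_change(5))   # 1 — messages waiting, scale up
```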
    DO NOT use the desired_task_count parameter: it is deprecated and has no effect on the service at all.
    Key points:
  • Make sure you set a visibility_timeout. The default is 30 seconds, so if your processing takes longer, deleting the message from the queue afterwards will fail because the message's receipt handle times out.
  • It is important to set the upper scaling step to {"upper": 0, "change": 0}. When the task takes the last message off the queue, we do not want any change to occur to the service: if the change were -1, the service would try to tear down the task while it was still processing that last message. So it is key that when there are 0 messages on the queue we do not kill our task. We will solve scale-down with CPU alarms later. See this link for an ongoing GitHub issue.
    Adding policies to the service
    Now we have set up our service we need to add policies to it so our task application can take action on other aws services.
    ...
            kms_policy = iam.PolicyStatement(
                actions=[
                    "kms:Decrypt",
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    "", # the arn of the resource
                ],
            )
    
            fargate_service.task_definition.add_to_task_role_policy(kms_policy)
    ...
     Deployment - Stage 1
    Right! Now we have a Fargate service that scales when messages are on the SQS queue. This is stage 1 of deployment and currently, our service can only spin up, and not tear down tasks.
    To allow our tasks to tear down, we need to deploy the code we have produced above. I'll explain why in the next section.
    To deploy: cdk deploy
    Scaling down task
    Now we have deployed the first step, how do we get our service to tear down tasks when there are no messages?
    Earlier I mentioned we could not tear down based on the number of messages on the queue, because doing so would kill a task mid-processing. So to scale down we will focus on the CPU usage of the task: if a task's CPU usage stays below a threshold for a length of time, we will scale the tasks down to 0. This gets around tasks being killed while processing, as CPU usage will be high whenever the task is in use.
    So now add below the Fargate service and before the policies:
    ...
            # initialise the fargate service CPU usage metric, averaged over 3 minutes
            fargate_service_cpu_metric = fargate_service.service.metric_cpu_utilization(
                period=Duration.minutes(3),
                statistic="Average",
            )
    
            # add an alarm to our fargate service CPU usage
            scale_in_init = fargate_service_cpu_metric.create_alarm(
                self,
                "SampleApp-ZeroCPUAlarm",
                alarm_description="For when sample app is idle, scale service to 0",
                alarm_name=alarm_name, # define this alongside the other variables
                evaluation_periods=1,
                threshold=0.01, # set threshold of cpu usage
                actions_enabled=True,
                # compare our cpu usage with our threshold: alarm when less than or equal
                comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
                datapoints_to_alarm=1,
                statistic="Average",
            )
    
            # define our auto scaling target for our fargate service 
            scalable_target = app_autoscaling.ScalableTarget.from_scalable_target_id(
                self,
                'SampleApp-ScalableTarget',
                scalable_target_id=f'service/{fargate_service.cluster.cluster_name}/{fargate_service.service.service_name}|ecs:service:DesiredCount|ecs',
            )
    
            # define the action taken on our scaling target
            scaling_action = app_autoscaling.StepScalingAction(
                self,
                "scaleToZero",
                scaling_target=scalable_target,
                # use the application autoscaling enum, not the aws_autoscaling one
                adjustment_type=app_autoscaling.AdjustmentType.EXACT_CAPACITY,
            )
    
            # create the adjustment made by our action
            scaling_action.add_adjustment(adjustment=0, upper_bound=0)
    
            # finally add our alarm action to our fargate alarm
            scale_in_init.add_alarm_action(ApplicationScalingAction(scaling_action))
    ...
     Deployment - Stage 2
    Now, we have to redeploy our stack so the alarm is attached to our Fargate service.
    From now on, when our tasks are inactive and CPU usage sits at or below the 0.01 threshold, our Fargate service will spin down the tasks.
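    The alarm logic can be sketched as: average the CPU samples over each 3-minute period and fire when one datapoint is at or below the 0.01 threshold. A simplified model of the CloudWatch evaluation, not the real service:

```python
def alarm_fires(cpu_samples: list, threshold: float = 0.01) -> bool:
    """One evaluation period, one datapoint: average the samples and compare."""
    average = sum(cpu_samples) / len(cpu_samples)
    return average <= threshold


print(alarm_fires([0.0, 0.0, 0.01]))    # True  — idle task, scale to zero
print(alarm_fires([45.0, 60.0, 52.0]))  # False — task busy processing
```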
    Conclusion
    Hopefully, that will save someone a huge amount of pain. I am definitely a fan of using aws-cdk instead of CloudFormation, as it allows you to work in traditional programming languages instead of pure YAML.
    I will definitely be iterating on this code to remove the need for two-stage deployments by taking advantage of pipelines. I will update this tutorial as I learn more.
