How to create an AWS Fargate service that spins tasks up and down based on SQS queue messages and CPU usage - using aws-cdk ☁️👀
Until recently I had an ECS service that would listen constantly to an SQS queue, waiting until there were any messages and then processing them. This was a waste of resources and not cost-effective at all. So I looked into alternative ways to spin up tasks only when messages were pushed to the SQS queue. This would mean my service would not sit idle all day and all night waiting.
Enter aws-cdk. Please bear in mind this is not a beginner's tutorial, but one that caused me a lot of headaches and iterations to get right. Hopefully, this will help at least one person or team and save them weeks of pain.
Prerequisites:
- Experience with aws-cdk.
- Experience with python.
- Experience with aws-sdk.
By the end of the tutorial, you will have a Fargate service that spins up tasks when messages are pushed to an SQS queue. I will also show an alternative way to get the service to spin these tasks down when there are no messages on the queue, which is needed because of some issues with the current library implementation.
Install the toolkit with npm install -g aws-cdk
To check that it has been installed, run cdk --version
To initialise a project:
mkdir fargate-cdk
cd fargate-cdk
cdk init sample-app --language=python
To begin with, import every service under the sun that aws-cdk provides.
from aws_cdk import (
    core as cdk,
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_ecs_patterns as ecs_patterns,
    aws_sqs as sqs,
    aws_ecr as ecr,
    aws_iam as iam,
    aws_ssm as ssm,
    aws_kms as kms,
    aws_autoscaling as autoscaling,
    aws_applicationautoscaling as app_autoscaling,
    aws_cloudwatch as cw,
)
from aws_cdk.aws_cloudwatch_actions import ApplicationScalingAction
from aws_cdk.core import Duration
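Once you have used the cdk init command to initialise your project, navigate to the generated stack file (sample-app.py in this tutorial; cdk init typically names it after the project directory, e.g. fargate_cdk/fargate_cdk_stack.py) and you should see a class created by aws-cdk that inherits from cdk.Stack. We shall be placing all our code under the __init__ method for this tutorial.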
You will need some basic variables throughout the code, so let's define those first.
class SampleApp(cdk.Stack):
    def __init__(
        self,
        scope: cdk.Construct,
        construct_id: str,
        **kwargs,
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

        image_tag = ""  # tag of the image within the ECR repository
        cluster_name = ""  # name of the existing ECS cluster (the group your tasks run in)
        security_group = ""  # id of the security group (virtual firewall) for the cluster
        kms_key = ""  # id of the KMS key needed for decrypting SSM parameters
        queue_name = ""  # name of the SQS queue that will be created
        vpc_id = ""  # id of the VPC in which to provision the cluster
        ...
We will now use the defined variables to fetch our VPC, cluster and KMS key, and to create our SQS queue.
        ...
        # fetches the VPC using its id
        # (from_lookup needs the stack's env, i.e. account and region, to be set)
        vpc = ec2.Vpc.from_lookup(self, "VPC", vpc_id=vpc_id)

        # fetches the KMS key used for decryption using the ARN of the key
        kms_key = kms.Key.from_key_arn(
            self,
            "EncryptionKey",
            f"arn:aws:kms:us-west-1:123456789:key/{kms_key}",  # ARN of the KMS key
        )

        # fetches the existing cluster using the objects and variables defined above
        cluster = ecs.Cluster.from_cluster_attributes(
            self,
            "Cluster",
            vpc=vpc,
            cluster_name=cluster_name,
            security_groups=[
                ec2.SecurityGroup.from_security_group_id(
                    self, "SecurityGroup", security_group_id=security_group
                )
            ],
        )

        # creates a FIFO queue
        queue = sqs.Queue(
            self,
            "SampleQueue",
            content_based_deduplication=True,
            fifo=True,
            queue_name=queue_name,  # must end in .fifo
            receive_message_wait_time=Duration.seconds(20),  # long polling
            retention_period=Duration.seconds(345600),  # 4 days
            visibility_timeout=Duration.seconds(43200),  # 12 hours, the SQS maximum
        )
        ...
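A bad queue setting usually only surfaces when you synth or deploy, so it can help to sanity-check the values above first. Below is a minimal, stdlib-only sketch, assuming the limits documented for SQS: queue names of at most 80 characters (FIFO names ending in .fifo), a long-polling wait of at most 20 seconds, retention between 60 seconds and 14 days, and a visibility timeout of at most 12 hours. The helper check_queue_settings is hypothetical and not part of aws-cdk.

```python
import re

# Assumed SQS limits, in seconds; check the SQS documentation for current values.
MAX_WAIT_TIME = 20                 # long-polling wait
MIN_RETENTION = 60                 # 1 minute
MAX_RETENTION = 14 * 24 * 60 * 60  # 14 days = 1,209,600 s
MAX_VISIBILITY = 12 * 60 * 60      # 12 hours = 43,200 s

# Letters, digits, hyphens and underscores only (the ".fifo" suffix aside).
NAME_CHARS = re.compile(r"[A-Za-z0-9_-]+")


def check_queue_settings(name, fifo, wait_time, retention, visibility):
    """Return a list of problems with the settings; empty means they look valid."""
    problems = []
    if len(name) > 80:  # the limit includes the ".fifo" suffix
        problems.append("queue name is longer than 80 characters")
    if fifo and not name.endswith(".fifo"):
        problems.append("FIFO queue names must end with '.fifo'")
    body = name[: -len(".fifo")] if fifo and name.endswith(".fifo") else name
    if not NAME_CHARS.fullmatch(body):
        problems.append("queue name may only contain letters, digits, '-' and '_'")
    if not 0 <= wait_time <= MAX_WAIT_TIME:
        problems.append("receive wait time must be 0-20 seconds")
    if not MIN_RETENTION <= retention <= MAX_RETENTION:
        problems.append("retention period must be 60 seconds to 14 days")
    if not 0 <= visibility <= MAX_VISIBILITY:
        problems.append("visibility timeout must be 0-12 hours")
    return problems


# The values used in this tutorial: 20 s wait, 4 days retention, 12 hours visibility.
print(check_queue_settings("sample-queue.fifo", True, 20, 345_600, 43_200))  # []
```

Note that the 12-hour visibility timeout used here is already the maximum SQS allows, so there is no headroom left if processing a single message could take longer than that.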
THIS is a very key step.
We must define any parameters that are stored in the AWS Systems Manager Parameter Store and that will be used within your application.
        ...
        database_name = ssm.StringParameter.from_secure_string_parameter_attributes(
            self,
            "DATABASE_NAME",
            version=1,
            parameter_name="",  # name of the parameter within SSM
            simple_name=True,  # the parameter name is a simple name with no "/" separators
            encryption_key=kms_key,  # needed if the parameter is encrypted with your own KMS key
        )
        ...
Then create a dictionary of these secrets. Any variables that your app fetches from the local environment must be defined too.
        ...
        secrets = {
            "DATABASE_NAME": ecs.Secret.from_ssm_parameter(database_name),
        }
        environment = ""  # placeholder: the value your app expects in ENV
        environments = {
            "ENV": environment,
        }
        ...
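Inside the container, ECS exposes each entry of secrets and environment as an ordinary environment variable, so the application reads both the same way. Here is a minimal sketch of the application side, assuming your app is also written in Python; AppConfig and load_config are hypothetical names, not part of aws-cdk or the AWS SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class AppConfig:
    database_name: str
    env: str


def load_config(environ: Mapping[str, str] = os.environ) -> AppConfig:
    """Read the values ECS injected into the container's environment.

    DATABASE_NAME comes from the SSM-backed `secrets` dict and ENV from the
    plain `environment` dict; by the time the app runs, both are just
    environment variables. Missing or empty values are treated as errors.
    """
    missing = [key for key in ("DATABASE_NAME", "ENV") if not environ.get(key)]
    if missing:
        # Fail fast at startup instead of part-way through processing a message.
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return AppConfig(database_name=environ["DATABASE_NAME"], env=environ["ENV"])
```

Checking everything once at startup means a missing SSM parameter shows up as a task that fails immediately, rather than a task that takes a message off the queue and then falls over.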
This is the most important part. It creates the Fargate service and uses everything we have defined up until now.
This service will listen to the queue created and spin up tasks when there are messages on the queue.
        ...
        fargate_service = ecs_patterns.QueueProcessingFargateService(
            self,
            "SampleAppService",
            cpu=4096,  # the amount of CPU (in CPU units) you wish your tasks to have
            memory_limit_mib=8192,  # the amount of memory (in MiB) you wish your tasks to have
            image=ecs.ContainerImage.from_ecr_repository(
                repository=ecr.Repository.from_repository_name(
                    self,
                    "SampleAppRepository",
                    repository_name="",  # name of the repository in ECR
                ),
                tag=image_tag,
            ),  # fetch the image with the given tag from the ECR repository
            min_scaling_capacity=0,  # min number of tasks
            max_scaling_capacity=1,  # the max number of tasks the service can spin up
            cluster=cluster,  # cluster for the service
            queue=queue,  # queue the service shall be listening to
            scaling_steps=[
                {"upper": 0, "change": 0},
                {"lower": 1, "change": +1},
            ],  # defines how the service autoscales based on messages in the queue
            secrets=secrets,  # secret parameters from SSM
            environment=environments,  # any additional environment variables needed
            visibility_timeout=cdk.Duration.hours(12),  # how long a received message stays hidden from other consumers
        )
        ...
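To see why the steps are set up this way, here is a deliberately simplified model of step scaling on the number of visible messages: find the step whose bounds contain the current value, add its change to the task count, and clamp the result between the min and max capacity. next_task_count is a hypothetical helper; it treats both bounds as inclusive and ignores cooldowns, alarm evaluation periods and CloudWatch's exact boundary rules, so it is an illustration rather than a reimplementation of Application Auto Scaling.

```python
def next_task_count(visible_messages, current_tasks, steps, min_tasks=0, max_tasks=1):
    """Simplified model of step scaling on the queue's visible-message count.

    Picks the first step whose bounds contain the metric value (both bounds
    inclusive here), adds its "change" to the current task count and clamps
    the result to [min_tasks, max_tasks]. If no step matches, the count is
    unchanged. Cooldowns and evaluation periods are ignored.
    """
    for step in steps:
        lower = step.get("lower", float("-inf"))
        upper = step.get("upper", float("inf"))
        if lower <= visible_messages <= upper:
            return max(min_tasks, min(max_tasks, current_tasks + step["change"]))
    return current_tasks  # value fell in a gap between steps: no change


TUTORIAL_STEPS = [{"upper": 0, "change": 0}, {"lower": 1, "change": +1}]
SCALE_IN_STEPS = [{"upper": 0, "change": -1}, {"lower": 1, "change": +1}]

print(next_task_count(5, 0, TUTORIAL_STEPS))  # 1: messages arrived, start a task
print(next_task_count(5, 1, TUTORIAL_STEPS))  # 1: already at max_scaling_capacity
print(next_task_count(0, 1, TUTORIAL_STEPS))  # 1: empty queue, running task is left alone
print(next_task_count(0, 1, SCALE_IN_STEPS))  # 0: empty queue, task is stopped
```

With the tutorial's steps an empty queue leaves a running task alone; it is only the {"upper": 0, "change": -1} variant that would stop it.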
DO NOT use the desired_task_count parameter, as it is deprecated and will have no effect on the service at all.
Key points:
- Make sure you set a visibility_timeout. Otherwise, when you try to delete a message that took longer than 30 seconds to process, the delete will fail because the message's visibility timeout has already expired. The default timeout is 30 seconds.
- It is important to set the upper scaling step to {"upper": 0, "change": 0}. As soon as the task takes the last message off the queue, that message is in flight and no longer counts as visible, so the queue looks empty even though work is still in progress. At that point we do not want any change to occur to the service. If we set the change to -1, the service will try to tear down the task whilst it is processing the last message. So it is key that when there are 0 messages on the queue we do not kill our task. We will solve this issue with CPU alarms later. See this link for an ongoing GitHub issue.
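The race described in the second point can be reproduced with a toy timeline. It assumes one task, a fixed processing time per message, and a scale-in that fires the moment the visible-message count hits 0 (in reality CloudWatch evaluation periods add some delay). simulate is a hypothetical helper for illustration only.

```python
def simulate(messages, minutes_per_message, scale_in_change):
    """Toy timeline of one task draining the queue, one loop pass per minute.

    Returns (messages_processed, stopped_mid_message).
    """
    if minutes_per_message < 1:
        raise ValueError("minutes_per_message must be at least 1")
    visible = messages  # what the visible-messages metric would report
    tasks = 1
    busy_for = 0        # minutes left on the message currently in flight
    processed = 0
    while tasks > 0 and (visible > 0 or busy_for > 0):
        if busy_for == 0 and visible > 0:
            visible -= 1                   # received: in flight, no longer visible
            busy_for = minutes_per_message
        if visible == 0:
            # The empty-queue step fires even though a message is in flight.
            tasks = max(0, tasks + scale_in_change)
            if tasks == 0:
                return processed, True     # task stopped mid-message
        busy_for -= 1                      # one minute of work
        if busy_for == 0:
            processed += 1                 # finished and deleted
    return processed, False


print(simulate(3, 5, -1))  # (2, True): the last message is interrupted
print(simulate(3, 5, 0))   # (3, False): all messages finish
```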
Now that we have set up our service, we need to add policies to it so that the application running in our tasks can take action on other AWS services.
        ...
        kms_policy = iam.PolicyStatement(
            actions=[
                "kms:Decrypt",
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                "",  # the ARN of the resource
            ],
        )
        fargate_service.task_definition.add_to_task_role_policy(kms_policy)
        ...
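For reference, the statement above grants the task role the equivalent of the IAM policy document below. This is a hand-written sketch assuming the standard aws partition (ARNs starting with arn:aws:kms:), not the exact JSON that CDK synthesises; kms_decrypt_policy is a hypothetical helper whose main use is catching the empty "" placeholder before it reaches a deploy.

```python
import json


def kms_decrypt_policy(key_arn):
    """IAM policy document equivalent to the kms_policy statement (hand-written).

    Assumes the standard "aws" partition. Raises ValueError if the resource is
    still the empty placeholder or is not a KMS key ARN.
    """
    if not (key_arn.startswith("arn:aws:kms:") and ":key/" in key_arn):
        raise ValueError(f"not a KMS key ARN: {key_arn!r}")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kms:Decrypt"],
                "Resource": [key_arn],
            }
        ],
    }


# Hypothetical key ARN, for illustration only.
print(json.dumps(kms_decrypt_policy("arn:aws:kms:us-west-1:123456789012:key/example-key-id"), indent=2))
```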
Right! Now we have a Fargate service that scales when messages are on the SQS queue. This is stage 1 of deployment, and currently our service can only spin tasks up, not tear them down.
To allow our tasks to be torn down, we first need to deploy the code we have produced above. I'll explain why in the next section.
To deploy: cdk deploy
Now that we have deployed the first stage, how do we go about getting our service to tear down our task when there are no messages?
Earlier I mentioned that we could not tear down based on messages on the queue, because it would kill a task mid-processing. So to scale down we will focus on the CPU usage of the task instead. If our task's CPU usage stays below a threshold for a length of time, we will scale our tasks down to 0. This way we avoid tasks being killed whilst processing, as CPU usage will be high while a task is in use.
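The alarm we are about to add can be modelled in a few lines. This is a simplified sketch, assuming one CPUUtilization reading (a percentage) per minute and ignoring how CloudWatch treats missing data; alarm_state is a hypothetical helper. Its defaults mirror the values used in the CDK code: a 3-minute Average, a threshold of 0.01, LESS_THAN_OR_EQUAL_TO_THRESHOLD, and 1 datapoint out of 1 evaluation period.

```python
from statistics import mean


def alarm_state(cpu_samples, period=3, threshold=0.01,
                evaluation_periods=1, datapoints_to_alarm=1):
    """Simplified model of the zero-CPU alarm.

    cpu_samples: one CPUUtilization reading (percent) per minute, oldest first.
    Full `period`-minute buckets are averaged; the alarm fires when at least
    `datapoints_to_alarm` of the last `evaluation_periods` buckets are
    <= threshold (LESS_THAN_OR_EQUAL_TO_THRESHOLD).
    """
    buckets = [
        mean(cpu_samples[i:i + period])
        for i in range(0, len(cpu_samples) - period + 1, period)
    ]
    if len(buckets) < evaluation_periods:
        return "INSUFFICIENT_DATA"
    recent = buckets[-evaluation_periods:]
    breaching = sum(1 for value in recent if value <= threshold)
    return "ALARM" if breaching >= datapoints_to_alarm else "OK"


print(alarm_state([85.0, 90.0, 70.0, 0.0, 0.0, 0.0]))  # ALARM: last 3 minutes idle
print(alarm_state([0.0, 0.0, 0.0, 40.0, 0.0, 0.0]))    # OK: still busy recently
```

Before settling on a threshold, it is worth checking what an idle task actually reports in CloudWatch: if its CPU usage never gets down to 0.01%, the alarm will never fire.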
So now add the following below the Fargate service and before the policies:
        ...
        # initialise the Fargate service CPU usage metric, averaged over 3 minutes
        fargate_service_cpu_metric = fargate_service.service.metric_cpu_utilization(
            period=Duration.minutes(3),
            statistic="Average",
        )

        # add an alarm on our Fargate service's CPU usage
        scale_in_init = fargate_service_cpu_metric.create_alarm(
            self,
            "SampleApp-ZeroCPUAlarm",
            alarm_description="For when sample app is idle, scale service to 0",
            alarm_name="SampleApp-ZeroCPUAlarm",  # any name you like; omit it to let CDK generate one
            evaluation_periods=1,
            threshold=0.01,  # threshold of CPU usage, in percent
            actions_enabled=True,
            # compare our CPU usage with our threshold: we want it less than or equal to the threshold
            comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
            datapoints_to_alarm=1,
            statistic="Average",
        )

        # define our auto scaling target for our Fargate service
        scalable_target = app_autoscaling.ScalableTarget.from_scalable_target_id(
            self,
            "SampleApp-ScalableTarget",
            scalable_target_id=f"service/{fargate_service.cluster.cluster_name}/{fargate_service.service.service_name}|ecs:service:DesiredCount|ecs",
        )

        # define the action taken on our scaling target
        scaling_action = app_autoscaling.StepScalingAction(
            self,
            "scaleToZero",
            scaling_target=scalable_target,
            adjustment_type=app_autoscaling.AdjustmentType.EXACT_CAPACITY,
        )

        # create the adjustment made by our action: set the task count to exactly 0
        scaling_action.add_adjustment(adjustment=0, upper_bound=0)

        # finally add our scaling action to our Fargate alarm
        scale_in_init.add_alarm_action(ApplicationScalingAction(scaling_action))
        ...
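The scalable target ID used above packs three Application Auto Scaling values into one string: the resource ID (service/<cluster>/<service>), the scalable dimension (ecs:service:DesiredCount) and the service namespace (ecs), separated by |. A small sketch that builds and splits that string, handy for checking it by hand; both helpers are hypothetical and only do string formatting.

```python
def ecs_scalable_target_id(cluster_name, service_name):
    """Build the ID passed to ScalableTarget.from_scalable_target_id.

    Format: <resource id>|<scalable dimension>|<service namespace>
    """
    return f"service/{cluster_name}/{service_name}|ecs:service:DesiredCount|ecs"


def parse_scalable_target_id(target_id):
    """Split an ECS scalable target ID back into its parts."""
    resource_id, dimension, namespace = target_id.split("|")
    _, cluster_name, service_name = resource_id.split("/")
    return {
        "cluster": cluster_name,
        "service": service_name,
        "dimension": dimension,
        "namespace": namespace,
    }


target_id = ecs_scalable_target_id("my-cluster", "my-service")
print(target_id)  # service/my-cluster/my-service|ecs:service:DesiredCount|ecs
print(parse_scalable_target_id(target_id)["dimension"])  # ecs:service:DesiredCount
```

Because the action uses EXACT_CAPACITY with an adjustment of 0, it sets the desired count to 0 outright, whatever the current count is, rather than subtracting from it.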
Now we have to redeploy our stack so that our alarm is activated on our Fargate service.
From then on, when our tasks are inactive and their CPU usage is at or below the 0.01% threshold, our Fargate service will spin our tasks down.
Hopefully, that will save someone a huge amount of pain. I am definitely a fan of using aws-cdk instead of CloudFormation, as it allows you to work in traditional programming languages instead of pure YAML.
I will definitely be iterating on this code to remove the need for two-stage deployments by taking advantage of pipelines. I will update this tutorial as I learn more.