AWS Managed Airflow for your complex workflows
Airflow is a workflow orchestration tool that was created at Airbnb in 2014. It has gained significant traction across many organizations recently because it makes complex data pipelines easy to build. The Airflow user interface (UI) serves as an operational dashboard to schedule, monitor and control any scripts or applications.
Although Airflow has been adopted by many organizations, deploying and managing its infrastructure has always been challenging and adds operational overhead. To address this problem, several companies introduced managed Airflow infrastructure, such as Amazon Managed Workflows for Apache Airflow (MWAA) from Amazon Web Services (AWS). AWS takes responsibility for keeping your infrastructure running with almost no downtime, with up-to-date security patches in place and the latest version readily available to use.
Other key benefits of AWS MWAA are its elasticity (the ability to scale up and down based on the workload) and the ease of building and deploying secure, production-grade infrastructure that integrates seamlessly with other AWS services.
Let's dive into some key concepts of Airflow :)
In Airflow, a workflow is defined in a Python file and is referred to as a DAG (directed acyclic graph). You can think of a DAG as a single job that can contain multiple tasks. Every DAG has three common parts:
- DAG Initialization
- Tasks
- Tasks Dependencies
DAGs are written in Python and are identified by their unique dag_id. During initialization, we specify the start date, the schedule and so forth. Here is a simple DAG:
from airflow.models import DAG
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="sample_dag",
    start_date=days_ago(2),
    description="Sample DAG",
    schedule_interval='@daily')
Tasks perform different actions, from executing a shell script to triggering EMR jobs. A DAG must exist before we create any task. Every task in a DAG is defined by an operator and, similar to dag_id, the task_id must be unique within the DAG.
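from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def function_a(**kwargs):
    # op_kwargs arrive here as keyword arguments
    name = kwargs['name']
    return f'hello {name} !!'

first_task = PythonOperator(
    task_id="first_task",
    python_callable=function_a,
    op_kwargs={'name': 'Fayaz'},
    dag=dag)

# DummyOperator does no work; it only marks a step in the DAG
second_task = DummyOperator(task_id="second_task", dag=dag)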
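Since the operator essentially calls python_callable with the op_kwargs you supply, the function can be checked locally before it goes anywhere near Airflow. A minimal sketch in plain Python, with no Airflow installed (at run time Airflow may also pass its own context values into **kwargs, which this function simply ignores):

```python
def function_a(**kwargs):
    # Same callable as in the task definition above.
    name = kwargs['name']
    return f'hello {name} !!'

# Mimic what the operator does with op_kwargs: unpack them into the call.
op_kwargs = {'name': 'Fayaz'}
greeting = function_a(**op_kwargs)
print(greeting)  # hello Fayaz !!
```

Testing the callable this way catches simple mistakes, such as a missing key in op_kwargs, without waiting for a scheduled run.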
Now the last part of the DAG is to create dependencies among the tasks. In this case, we are going to trigger first_task first and then trigger second_task as soon as first_task completes. So it will look like this:
first_task >> second_task
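To build some intuition for what the >> operator expresses, here is a toy sketch that needs no Airflow at all. ToyTask is a hypothetical stand-in invented for this illustration, not Airflow's operator class, and the ordering uses Python's standard graphlib module (Python 3.9+) rather than Airflow's scheduler:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


class ToyTask:
    """Hypothetical stand-in for an Airflow task; not the real operator class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # "a >> b" records that b must wait for a, then returns b so chains work.
        other.upstream.add(self.task_id)
        return other


first_task = ToyTask("first_task")
second_task = ToyTask("second_task")
first_task >> second_task

# Map each task to the tasks it depends on and derive a valid run order.
graph = {t.task_id: t.upstream for t in (first_task, second_task)}
run_order = list(TopologicalSorter(graph).static_order())
print(run_order)  # ['first_task', 'second_task']
```

Because the graph is acyclic, a valid order always exists; that is the "A" in DAG.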
Now that we understand what Airflow is and how to create a simple DAG, let's spin up AWS MWAA to run this DAG.
You need an AWS account to perform the next few steps, which may incur some charges.
Before we create a new MWAA environment, we need to create an S3 bucket, which must have versioning enabled.
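If you prefer to script the bucket instead of using the console, the following is a rough sketch using boto3, the AWS SDK for Python. The function name, bucket name and region are placeholders chosen for this example; the S3 client is passed in so the function itself has no hard dependency on credentials:

```python
def create_versioned_bucket(s3_client, bucket_name, region):
    """Create a bucket and enable versioning on it."""
    params = {"Bucket": bucket_name}
    if region != "us-east-1":
        # Outside us-east-1, S3 expects an explicit location constraint.
        params["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3_client.create_bucket(**params)
    s3_client.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration={"Status": "Enabled"},
    )
```

With boto3 installed and credentials configured, it could be called as create_versioned_bucket(boto3.client("s3", region_name="eu-west-1"), "your-bucket-name", "eu-west-1").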
Step 1. Go to Managed Airflow Console and click Create Environment
Step 2. Enter a name and choose the Airflow version as 2.0.2 (Latest)
Step 3. Choose the S3 bucket - the one you have created
Step 4. For DAGs folder, type s3://{your-bucket-name}/dags
Step 5. Click Next
Step 6. Click on Create MWAA VPC
Step 7. It will take you to a page with VPC and subnet details. Click Create Stack, which may take a few minutes to complete
Step 8. Choose the VPC you just created and scroll down to enter environment class and other configurations
Step 9. Choose Create a new role and click Next
Step 10. Verify all the details and click Create environment
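The console steps above roughly correspond to a single create_environment call in boto3's MWAA client. The sketch below only assembles the request; every name, ARN and ID in it is a placeholder invented for illustration. Note one difference from the console flow: the API will not create the VPC or the execution role for you, so both are assumed to exist already.

```python
def build_environment_request(name, bucket_name, role_arn, subnet_ids, security_group_ids):
    """Assemble keyword arguments for the MWAA create_environment call."""
    return {
        "Name": name,
        "AirflowVersion": "2.0.2",
        "SourceBucketArn": f"arn:aws:s3:::{bucket_name}",
        "DagS3Path": "dags",  # folder inside the bucket, relative to its root
        "ExecutionRoleArn": role_arn,
        "NetworkConfiguration": {
            "SubnetIds": list(subnet_ids),
            "SecurityGroupIds": list(security_group_ids),
        },
        "EnvironmentClass": "mw1.small",
    }


# Placeholder values for illustration only; substitute your own resources.
request = build_environment_request(
    name="demo-environment",
    bucket_name="your-bucket-name",
    role_arn="arn:aws:iam::123456789012:role/your-mwaa-execution-role",
    subnet_ids=["subnet-aaaa1111", "subnet-bbbb2222"],
    security_group_ids=["sg-cccc3333"],
)
print(request["SourceBucketArn"])  # arn:aws:s3:::your-bucket-name
```

With boto3 installed and credentials configured, the request could then be sent with boto3.client("mwaa").create_environment(**request).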
Generally, it takes 10-20 minutes to spin up the Airflow infrastructure, so this is the time to get your coffee before we deploy our very first DAG.
When you refresh in a few minutes, you will see the environment status as Available, so click Open Airflow UI.
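Instead of refreshing the console, the status can also be polled from a script. This is a sketch under a few assumptions: the client is a boto3 MWAA client, the function name and polling defaults are made up for this example, and only the AVAILABLE and CREATE_FAILED statuses are handled explicitly.

```python
import time


def wait_until_available(mwaa_client, name, poll_seconds=60, max_polls=40, sleep=time.sleep):
    """Poll get_environment until the status is AVAILABLE or creation fails.

    Returns True once the environment is available, False if max_polls is
    exhausted first. The sleep function is injectable to ease testing.
    """
    for _ in range(max_polls):
        status = mwaa_client.get_environment(Name=name)["Environment"]["Status"]
        if status == "AVAILABLE":
            return True
        if status == "CREATE_FAILED":
            raise RuntimeError(f"Environment {name} failed to create")
        sleep(poll_seconds)
    return False
```

A call such as wait_until_available(boto3.client("mwaa"), "demo-environment") would block until the environment is ready or roughly 40 minutes have passed.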
Yay!! Our environment is now up and ready to go, so let's deploy our first DAG.
Step 1: To deploy the DAG, we need to copy the .py file to the dags folder of our S3 bucket. Copy the code below and save it locally as demo_dag.py.
"""
Importing necessary modules
"""
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

"""
Initializing DAGs
"""
dag = DAG(
    dag_id="grepy_sample_dag",
    start_date=days_ago(2),
    description="DAG which orchestrates a simple ML workflow",
    schedule_interval='@daily')

"""
Creating Tasks
"""
def function_a(**kwargs):
    name = kwargs['name']
    return f'hello {name} !!'

first_task = PythonOperator(
    task_id="first_task",
    python_callable=function_a,
    op_kwargs={'name': 'Fayaz'},
    dag=dag)

second_task = DummyOperator(task_id="second_task", dag=dag)

"""
Dependencies
"""
first_task >> second_task
Step 2: Upload the demo_dag.py file to the dags folder of your S3 bucket.
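The upload can be done from the S3 console, or scripted. Here is a small sketch using the upload_file method of a boto3 S3 client; the helper name and the bucket name are placeholders for this example, and the client is passed in as a parameter:

```python
from pathlib import Path


def upload_dag(s3_client, local_path, bucket_name, dags_prefix="dags"):
    """Copy a local DAG file into the bucket's DAGs folder and return its key."""
    key = f"{dags_prefix}/{Path(local_path).name}"
    s3_client.upload_file(str(local_path), bucket_name, key)
    return key
```

For example, upload_dag(boto3.client("s3"), "demo_dag.py", "your-bucket-name") would place the file at dags/demo_dag.py inside the bucket.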
Step 3: That's it!! It may take a few minutes for a DAG to show up when you deploy it for the first time, after which it appears in the DAG list of the Airflow UI.
When you toggle the DAG on for the first time, it is triggered automatically. Click on the DAG name to open the tree view, where you can see the job status and task dependencies.
One of the best parts is that you can see all the task logs from the Graph view. The logs are also available in CloudWatch, so you can ingest them into Splunk or ELK for further analysis.
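The automatic trigger follows from the start date being in the past. Assuming catchup is left at its default (enabled in stock Airflow), the scheduler creates a run for every daily interval that has already fully elapsed since start_date. The sketch below only imitates that arithmetic with the standard library; pending_daily_runs is a hypothetical helper, not part of Airflow, and the timestamp is an arbitrary example.

```python
from datetime import datetime, timedelta, timezone


def pending_daily_runs(start_date, now):
    """List the starts of daily intervals that have fully elapsed by `now`."""
    runs = []
    interval_start = start_date
    while interval_start + timedelta(days=1) <= now:
        runs.append(interval_start)
        interval_start += timedelta(days=1)
    return runs


now = datetime(2021, 6, 10, 9, 30, tzinfo=timezone.utc)  # arbitrary example moment
midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_date = midnight - timedelta(days=2)  # roughly what days_ago(2) evaluates to

runs = pending_daily_runs(start_date, now)
for run in runs:
    print(run.date())
# 2021-06-08
# 2021-06-09
```

Under these assumptions, a DAG with start_date=days_ago(2) and a daily schedule has two intervals waiting to run the moment it is switched on.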
In this post, we took a high-level look at Airflow basics and dived into AWS Managed Airflow with a sample DAG deployment that runs a simple Python function. Similarly, you can orchestrate any type of task using the various operators that are available on any Airflow infrastructure.
Thank you for your time and happy learning!!