25
Using Google Cloud Platform Operators in Apache Airflow
Different Airflow operators create more possibilities while designing a scheduled workflow. Being aware of those enhances our way of dealing with real-world problems. *
There are many Airflow operators that keep impressing me during my daily job. Recently, I played quite a bit with the GCP operators within Airflow. In this write-up, I would like to first provide an overview of the Airflow Cloud Providers (used to be called contributor operators before Airflow 2.0, see). Then I would like to share one of the example Directed Acyclic Graphs (DAG) workflow models, created for this toy project.
In the world of Airflow Google cloud providers package, which is also valid for Amazon Web Services (AWS), we have four main sub-groups of functions:
1. Operators: They are full-fledged operations, enabling us to execute read/create/delete/update/trigger tasks. Operators do not need to be running CRUD operations on datasets. They can also invoke a GCP function. Some examples and their example use cases:
- BigQueryDeleteTableOperator: In the end of a BigQuery to Google Cloud Storage (GCS) operation, we might need to delete the existing table.
- GCSCreateBucketOperator: To store a set of dataset partitions for specific dates, we might need to create a bucket in the beginning of a DAG.
- CloudFunctionInvokeFunctionOperator: In case we update a certain point of a Cloud Function, we might need to test it in the end of a DAG.
2. Hooks: They are flexible clients, enabling us to interact with cloud providers. Hooks are the underlying clients in the operators, we can create customised functions (Python callable) or operators with those. Some examples are listed below. All include the methods for CRUD operations in the services they mention in their name:
3. Sensors: They are simply checkers. They check if data exists in a certain location. They can be used mostly for checking if a certain operation is completed. Some examples and their example use cases:
- BigQueryTablePartitionExistenceSensor: You might need to run a sequential task to process the latest partition right after you check that is created by another task.
- GCSObjectExistenceSensor: You might need to run a sequential task to process an object within a certain bucket right after you check that is created by another task.
4. Transfers: They are simply data transport operations. They enable us to move data from one bucket to another one. They can also move data from one service to another, even if the service belongs to another cloud provider. Some examples are listed below:
GCP operators in Airflow can be summarised as in the following chart:
We need a GCP connection (id) on Airflow, to have a functioning setup for Google Cloud operations on Airflow. The GCP connection can be set via configurations (some DevOps effort), or it can be set through the Airflow Web UI. It is explained here. Each of the GCP task that we create, to enable authorisation, we need to refer to the GCP connection id.
The example DAG is shown in the following chart. With this DAG, I aim to load a partitioned table into the Google Cloud Storage (GCS), then compose the multiple files generated by the previous process.
Each task within this DAG can be considered as a kind of GCP configuration. They are explained as follows:
The task retrieves a partitioned table from BigQuery and exports into a GCS bucket. For each partition we create a separate gzipped text file (compression
, export_format
). Just to create a set of assumptions, I do not want to have the header row (print_header
), and I use comma (,
) as text file delimiter (field_delimiter
). The partitions are created with the wildcard (*
) character in the end of the file name (destination_cloud_storage_uris
).
from airflow.models import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
BigQueryToGCSOperator)
from airflow.utils.dates import days_ago
# Define configuration global variables
with DAG(
'gcp_dag',
schedule_interval=None,
start_date=days_ago(1),
tags=['example'],
) as dag:
bigquery_to_gcs = BigQueryToGCSOperator(
gcp_conn_id='gcp_connection_id',
task_id='bigquery_to_gcs',
compression='GZIP',
export_format='CSV',
field_delimiter=',',
print_header=False,
source_project_dataset_table=f'{DATASET_NAME}.{TABLE}',
destination_cloud_storage_uris=[
f'gs://{DATA_EXPORT_BUCKET_NAME}/{EXPECTED_FILE_NAME}-*.csv.gz',
],
)
# Define other operations
GCP operators do not provide a full-fledged solution when it comes to the problem of composing multiple text files into one gzip file. Thus we need to create our own customised functions (Python callable) or operators (see this documentation).
The Python Operator, in my example DAG, calls a mysterious function (compose_files_into_one
) with bucket_name
, source_object_prefix
, destination_object
, gcp_conn_id
parameters. Those parameters are provided as keys in the Python Operator's op_kwargs
argument.
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Define configuration global variables
with DAG(
'gcp_dag',
schedule_interval=None,
start_date=days_ago(1),
tags=['example'],
) as dag:
# Previous operations
compose_files = PythonOperator(
task_id='gcs_compose',
python_callable=compose_files_into_one,
op_kwargs={
'bucket_name': DATA_EXPORT_BUCKET_NAME,
'source_object_prefix': EXPECTED_FILE_NAME,
'destination_object': f'{EXPECTED_FILE_NAME}.csv.gz',
'gcp_conn_id': 'gcp_connection_id'
},
)
# Any other operations
As the Python Operator calls compose_files_into_one
, all the magic happens there. compose_files_into_one
is a function contains all the hook logic. It uses the GCSHook
as a client to list all the objects with the given prefix. Then it composes the partition files into one gzip file.
from airflow.providers.google.cloud.hooks.gcs import GCSHook
def compose_files_into_one(bucket_name: str,
source_object_prefix: str,
destination_object: str,
gcp_conn_id: str) -> None:
'''Composes wildcarded files into one in the given destination'''
gcs_hook = GCSHook(
gcp_conn_id=gcp_conn_id
)
list_of_objects = gcs_hook.list(
bucket_name,
prefix=source_object_prefix
)
gcs_hook.compose(
bucket_name,
source_objects=list_of_objects,
destination_object=destination_object
)
The compose operation can be summarised with the following chart:
In the end of the compose task, I have the partitioned datasets as well as the already composed gzip file in the GCS bucket, so we had the same data twice as in multiple files and in a compact gzip file.
As a conscientious developer I want to delete the remaining partitions from the bucket and keep only the compact gzip file. Thus, I use GCS Delete Objects Operator to create the last task of my example DAG.
GCS Delete Objects Operator does not require much of a configuration. It needs a bucket_name
, a gcp_conn_id
, and a prefix
parameter. With the prefix
parameter, we can filter out the objects to be deleted. So, if the partitions start with <defined_text_file_name>-<partition_number>
, then we can use <defined_text_file_name>-
to filter out all the partitions to be deleted.
from airflow.models import DAG
from airflow.providers.google.cloud.operators.gcs import (
GCSDeleteObjectsOperator)
from airflow.utils.dates import days_ago
with DAG(
'gcp_dag',
schedule_interval=None,
start_date=days_ago(1),
tags=['example'],
) as dag:
# Previous operations
delete_combined_objects = GCSDeleteObjectsOperator(
task_id='gcs_combined_files_delete',
gcp_conn_id='gcp_connection_id',
bucket_name=DATA_EXPORT_BUCKET_NAME,
prefix=f'{EXPECTED_FILE_NAME}-'
)
# DAG definition (dependency)
GCP operators in Airflow are quite extendable and lightweight, and they require a small amount of configuration. Most of the operators are well-fitting for the use cases that I am able to think of. This write-up is a result of my appreciation of this nicely evolved providers package of Airflow. With every release of the providers' packages, yet another shortcut is added in the lives of data programmers. This is another good example of how being aware of those shortcuts can make our workflows more reliable and maintainable.
To have a look at the whole example DAG, with its Docker-compose setup, you can refer to its Github repository.
(*) This reminds me a lot of the godly boons I got when playing Hades, which are limited upgrades acquired by the protagonist, Zagreus. Gods randomly provide upgrades (boons) in Zagreus' (the protagonist of the game) journeys. Those upgrades make Zagreus' life easier for some time, exactly like the Google Cloud Platform (GCP) operators in Airflow being boons that make our lives easier for a while.
25