UPSERTS and DELETES using AWS Glue and Delta Lake
The purpose of this blog post is to demonstrate how you can make your Data Lake ACID-compliant, that is, give it the same transactional functionality as a database. This will allow you to do UPSERTS and DELETES directly on your data lake.
Let me start first by defining what a Data Lake is:
From AWS
A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.
A data lake is scalable, performant, secure, and cost-efficient, and it plays a crucial part in an organization's Data Analytics pipeline. So what's the problem?
Well, updates.
We all know that data lakes are immutable - the idea that data or objects should not be modified after they are created; how do we then go beyond that immutability?
The answer is Delta Lake.
An open-source storage layer that brings scalable, ACID transactions to Apache Spark™ and big data workloads. It provides serializability, the strongest isolation level, along with scalable metadata handling and time travel, and it is 100% compatible with Apache Spark APIs.
Basically, it allows you to do DELETES and UPSERTS directly to your data lake.
We all know our beloved Spark doesn't support ACID transactions, but to be fair, it isn't really built to address that kind of specific use case.
I came across a blog post from kundankumarr, explaining how Spark fails ACID.
Atomicity & Consistency
Atomicity states that it should either write full data or nothing to the data source when using spark data frame writer. Consistency, on the other hand, ensures that the data is always in a valid state.
Isolation & Durability
We know that when a transaction is in process and not yet committed, it must remain isolated from any other transaction. This is called Isolation Property. It means writing to a data set shouldn’t impact another concurrent read/write on the same data set.
Finally, Durability. It is the ACID property which guarantees that transactions that have committed will survive permanently. However, when Spark doesn’t correctly implement the commit, then all the durability features offered by the storage goes for a toss.
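To make the atomicity and isolation points a bit more concrete, here is a toy, in-memory sketch of the commit-log idea that Delta Lake's transaction log is built around: data files are staged first, and readers only see files that a committed log entry lists. The ToyTable class and its method names are made up for this illustration and are not Delta Lake's API; the real thing is considerably more involved.

```python
class ToyTable:
    """In-memory stand-in for a table whose visibility is governed by a commit log."""

    def __init__(self):
        self.files = {}  # file name -> rows (pretend this is object storage)
        self.log = []    # committed entries, each listing files added and removed

    def write(self, new_files, remove=(), fail_before_commit=False):
        # Step 1: stage the data files. They exist on storage but are not visible yet.
        self.files.update(new_files)
        if fail_before_commit:
            raise RuntimeError("job died before commit")
        # Step 2: one append to the log makes the whole write visible at once.
        self.log.append({"add": list(new_files), "remove": list(remove)})

    def live_files(self):
        # Replay the log to work out which files form the current version.
        live = []
        for entry in self.log:
            live = [name for name in live if name not in entry["remove"]]
            live.extend(entry["add"])
        return live

    def read(self):
        return [row for name in self.live_files() for row in self.files[name]]


table = ToyTable()
table.write({"part-0": [1, 2], "part-1": [3]})

try:
    table.write({"part-2": [4, 5]}, fail_before_commit=True)
except RuntimeError:
    pass  # the failed job left a file behind, but never committed it

rows_after_failure = table.read()  # readers still see only the first commit

table.write({"part-3": [9]}, remove=["part-0"])  # an "update" swaps files in one commit
rows_after_rewrite = table.read()
```

The failed write leaves part-2 on storage, yet no reader ever sees it because nothing in the log points to it. That is the all-or-nothing behaviour a plain Spark DataFrame writer does not give you on its own.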
This part demonstrates how you can use Delta Lake with AWS Glue.
These are the services that will be used in this exercise:
AWS Glue - a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.
Amazon Athena - an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.
Amazon S3 - an object storage service that offers industry-leading scalability, data availability, security, and performance.
This is what we'll be doing:
Basically, I have an initial dataset, and I want to apply changes to the Sales and Profit columns. The table in the AWS Glue Data Catalog should then be able to capture those changes. Just a basic update to the data.
So, let's start!
First, download the data here - I used Tableau's Superstore Dataset. This one is on Kaggle, so you may need to register for an account to download it.
Then, you need to download the Delta Lake .jar file to access its libraries. You can download it here. Upload it to your S3 bucket and take note of the S3 path; we'll use it as a reference later.
❗ As of this writing, Glue's Spark Engine (v2.4) only supports v0.6.1 of Delta Lake since versions beyond that were implemented in Spark 3.0.
❗❗❗ UPDATE: AWS GLUE 3.0 WAS RELEASED IN AUGUST 2021! Check out my blog post on this one: ❗❗❗
Navigate to AWS Glue, then proceed to create an ETL job. Set "This job runs" to "A new script to be authored by you". This will allow you to write custom Spark code.
Under "Security configuration, script libraries, and job parameters (optional)", specify the location where you stored the .jar file as shown below:
Then, on the blank script page, paste the following code:
from delta import *
from pyspark.sql.session import SparkSession
This imports the SparkSession libraries as well as the Delta Lake libraries.
# Initialize Spark Session with Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
This code initializes the SparkSession along with the Delta Lake configurations.
# Read Source
inputDF = spark.read.format("csv") \
.option("header", "true") \
.load('s3://delta-lake-ia-test/raw/')
We read the source CSV file as a Spark DataFrame.
# Write data as DELTA TABLE
inputDF.write.format("delta") \
.mode("overwrite") \
.save("s3a://delta-lake-ia-test/current/")
Then we write it out in Delta format.
❗ Notice the use of the s3a prefix in the save path. It is essential to use s3a instead of the standard s3 in the path, because using the s3 prefix throws an UnsupportedFileSystemException, followed by fs.AbstractFileSystem.s3.impl=null: No AbstractFileSystem configured for scheme: s3.
More on the differences between s3 and s3a here
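If your paths come from job parameters or a config file, a small guard can save you from that exception. This is just a sketch: to_s3a is a hypothetical helper of my own, not part of Delta Lake or Glue, and it only rewrites the URI scheme.

```python
def to_s3a(path: str) -> str:
    """Return the path with an s3:// or s3n:// scheme rewritten to s3a://."""
    for scheme in ("s3://", "s3n://"):
        if path.startswith(scheme):
            return "s3a://" + path[len(scheme):]
    return path  # already s3a:// or not an S3 path, so leave it unchanged


current_path = to_s3a("s3://delta-lake-ia-test/current/")
print(current_path)
```

You would then pass current_path to save() and DeltaTable.forPath() instead of hard-coding the prefix.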
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
deltaTable.generate("symlink_format_manifest")
Athena supports reading from external tables using a manifest file, which is a text file containing the list of data files to read for querying a table. Running the above code will generate a manifest file.
Read more about Delta Lake's integration for Presto and Athena here
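To picture what a manifest boils down to, here is a rough sketch that builds one from a list of data files: plain text, one absolute path per line. The build_manifest helper and the file names are invented for illustration, and the exact paths and layout that Delta Lake writes may differ.

```python
def build_manifest(table_root: str, live_files: list) -> str:
    """Return manifest text: one absolute data-file path per line."""
    root = table_root.rstrip("/")
    return "\n".join(f"{root}/{name}" for name in sorted(live_files)) + "\n"


manifest_text = build_manifest(
    "s3a://delta-lake-ia-test/current/",
    ["part-00001.snappy.parquet", "part-00000.snappy.parquet"],  # made-up file names
)
print(manifest_text)
```

The point is that Athena never reads the Delta transaction log. It only reads the files the manifest lists, which is why the manifest has to be regenerated whenever the table changes.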
Final Code:
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/raw/')
# Write data as DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-ia-test/current/")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
deltaTable.generate("symlink_format_manifest")
In your S3 bucket, you should see a _symlink_format_manifest prefix/folder. This will be used by Amazon Athena for mapping out the parquet files.
Create your table using the code below as a reference:
CREATE EXTERNAL TABLE IF NOT EXISTS "default"."superstore" (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-ia-test/current/_symlink_format_manifest/'
I'm lazy, so I've kept things simple by using STRING for all columns.
Note that you have to define the table name as it was when you wrote it as a Delta table, or else you'll get blank results when querying with Athena.
-- Run a simple select
SELECT *
FROM "default"."superstore"
LIMIT 10
Recap:
- Read from a CSV
- Created a Spark DataFrame from the CSV
- Written the DataFrame as a Delta Table
- Made a manifest file
- Created an external table in Athena
- Queried sample data
What we'll do next:
- Read the updates from the CSV
- Make an update based on the new files
- Generate/update the manifest file
Let's add another Glue ETL Job for the updates.
I manually modified my raw data to simulate the updates: I just plugged 99999 into the sales and profit columns for the first 15 rows. Feel free to make your own modifications.
After that, upload it to your S3 bucket in a different location.
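If you'd rather script the modification than edit the file by hand, something along these lines would do it. It's a minimal sketch using only the standard library: simulate_updates is a hypothetical helper, and the sales and profit column names are assumed to match the headers in your CSV.

```python
import csv
import io


def simulate_updates(csv_text, columns=("sales", "profit"), value="99999", n_rows=15):
    """Return the CSV text with `columns` overwritten in the first n_rows data rows."""
    reader = csv.DictReader(io.StringIO(csv_text))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames, lineterminator="\n")
    writer.writeheader()
    for i, row in enumerate(reader):
        if i < n_rows:
            for col in columns:
                row[col] = value
        writer.writerow(row)
    return out.getvalue()


# Tiny stand-in for the real file, with only the columns that matter here.
sample = "row_id,sales,profit\n1,10.5,2.0\n2,20.0,3.5\n3,7.25,1.0\n"
updated = simulate_updates(sample, n_rows=2)
print(updated)
```

For the real dataset you would read the downloaded file, pass its contents through the function with the default n_rows=15, and write the result to a new file before uploading.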
from delta import *
from pyspark.sql.functions import expr
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
Nothing new here, apart from the expr import, which the merge condition further down needs.
# Read updates
df_updates = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/updates/')
# Read current as DELTA TABLE
df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
The first line is a typical read from CSV.
The next line creates a DeltaTable object, which allows us to call functions in the delta package.
# UPSERT process
# execute() applies the merge to the Delta table in place and returns nothing
df.alias("full_df").merge(
    source = df_updates.alias("append_df"),
    condition = expr("append_df.row_id = full_df.row_id")) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
One of these is the merge(source, condition) function, which:
Merges the data from the source DataFrame based on the given merge condition.
First, we take the DeltaTable object and give it an alias. We then call the merge() function, passing in our arguments: the updates DataFrame and the merge condition.
Then, we call whenMatchedUpdateAll(condition=None) to have the code update all the columns:
Updates all the columns of the matched table row with the values of the corresponding columns in the source row. If a condition is specified, then it must be true for the new row to be updated.
If the condition specified in the merge() function doesn't match, then we call whenNotMatchedInsertAll(condition=None):
Insert a new target Delta table row by assigning the target columns to the values of the corresponding columns in the source row. If a condition is specified, then it must evaluate to true for the new row to be inserted.
Lastly, we call the execute() function to wrap it up:
Execute the merge operation based on the built matched and not matched actions.
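If the chain of calls feels abstract, the same matched/not-matched logic can be sketched in plain Python with dictionaries keyed on row_id. This is only a toy model of the semantics: the upsert function is made up for illustration, and it says nothing about how Delta Lake actually executes a merge.

```python
def upsert(target_rows, source_rows, key="row_id"):
    """Toy equivalent of merge + whenMatchedUpdateAll + whenNotMatchedInsertAll."""
    merged = {row[key]: dict(row) for row in target_rows}
    for row in source_rows:
        # Matched key: every column is replaced by the source row's values.
        # Unmatched key: the source row is inserted as a new row.
        merged[row[key]] = dict(row)
    return list(merged.values())


current = [
    {"row_id": "1", "sales": "10.5", "profit": "2.0"},
    {"row_id": "2", "sales": "20.0", "profit": "3.5"},
]
updates = [
    {"row_id": "1", "sales": "99999", "profit": "99999"},  # matches: update
    {"row_id": "3", "sales": "7.25", "profit": "1.0"},     # no match: insert
]

result = upsert(current, updates)
for row in result:
    print(row)
```

One difference worth knowing: this toy version quietly keeps the last source row when the updates contain duplicate keys, whereas a Delta Lake merge raises an error when several source rows match the same target row, so deduplicate your updates first.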
# Generate new MANIFEST file
final_df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
final_df.generate("symlink_format_manifest")
Then we update the manifest file.
For more functions in the library, kindly refer to the official docs
Final code:
from delta import *
from pyspark.sql.functions import expr
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read updates
df_updates = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/updates/')
# Read current as DELTA TABLE
df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
# UPSERT process
# execute() applies the merge to the Delta table in place and returns nothing
df.alias("full_df").merge(
    source = df_updates.alias("append_df"),
    condition = expr("append_df.row_id = full_df.row_id")) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
# Generate new MANIFEST file
final_df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
final_df.generate("symlink_format_manifest")
Now, try querying your updated table in Athena. It should show the latest data.
This blog post demonstrated how you can leverage ACID transactions in your data lake.
Having a functionality like this is helpful especially if you have requirements such as Change Data Capture (CDC). Curious to know how you guys implemented such things. Let me know in the comments!
I've read a couple of articles and blog posts on whether a Data Lake should be immutable or not.
From O'Reilly, Data Lake for Enterprises by Tomcy John, Pankaj Misra on the topic of Immutable Data Principle
The data should be stored in a raw format from the different source systems. More importantly, the data stored should be immutable in nature.
By making it immutable, it inherently takes care of human fault tolerance to at least some extent and takes away errors with regards to data loss and corruption. It allows data to be selected, inserted, and not updated or deleted.
To cater to fundamental fast processing/performance, the data is usually stored in a denormalized fashion. Data being immutable makes the system in general simpler and more manageable.
From SQLServerCentral by Steve Jones on the topic of Should the Data Lake be Immutable?
Imagine I had a large set of data, say GBs in a file, would I want to download this and change a few values before uploading it again? Do we want a large ETL load process to repeat?
Could we repeat the process and reload a file again? I don't think so, but it's hard to decide. After all, the lake isn't the source of data; that is some other system.
Maybe that's the simplest solution, and one that reduces complexity, downtime, or anything else that might be involved with locking and changing a file.
Comments on the topic can be found here.
There is a comment from roger.plowman
I suspect immutability should be asked after asking if you should even have the data lake or warehouse in the first place.
What do you guys think? Would love to hear your thoughts!
Speaking of Data Lakes vs Data Warehouses, there's also a very interesting concept I picked up from one of the AWS Community Builders (Joel Farvault). It is called Data Mesh Architecture, and it is described as the next enterprise data platform architecture. I'll leave it to you to read up on. AWS also made a blog post on this using AWS Lake Formation.
There is also a YouTube video from the AWS DevDay Data & Analytics held on July 14, 2021, where AWS Technical Evangelists Javier Ramirez and Ricardo Sueiras discuss the Data Mesh Architecture.
Additionally, here is a video about Data Mesh in practice for Europe's biggest online fashion retailer.
Hope this helps! Let me know if you have questions below.
Happy coding!
P.S. A CloudFormation stack is in the works ⚙