Parallelize Processing a Large AWS S3 File

In my last post, we discussed how to process a large AWS S3 file efficiently via S3 Select. That processing was sequential, though, and it can take ages for a large file. So how do we parallelize the processing across multiple units? 🤔 Well, in this post we are going to implement it and see it working!

📝 I highly recommend checking out my last post on streaming an S3 file via S3 Select to set the context for this post.

I always like to break down a problem into the smaller pieces necessary to solve it (analytical approach). Let's try to solve this in 3 simple steps:

1. Find the total bytes of the S3 file

Very similar to the first step of our last post, we start by finding the file size.
The following code snippet shows a function that performs a HEAD request on our S3 file and determines the file size in bytes.

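# core/utils.py

import logging

import boto3
from botocore.exceptions import ClientError
from flask import current_app  # assumption: current_app comes from Flask

logger = logging.getLogger(__name__)  # assumption: the project may set up its logger differently
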
def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            # ContentLength is the object size in bytes, already parsed by boto3
            file_size = int(response.get('ContentLength', 0))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size

2. Create a celery task to process a chunk

Here, we define a celery task to process a file chunk (which will be executed in parallel later). The overall processing looks like this:

  • Receive the start and end bytes of this chunk as an argument
  • Fetch this part of the S3 file via S3-Select and store it locally in a temporary file (as CSV in this example)
  • Read this temporary file and perform any processing required
  • Delete this temporary file

📝 I call this task a file chunk processor. It processes one chunk of a file. Running several of these tasks completes the processing of the whole file.
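
The read, process, and delete steps above can be tried without S3 or Celery. Here is a minimal sketch, assuming a hypothetical helper named `summarize_chunk_file` and a comma delimiter. It only mirrors the local-file half of the task and is not the code from the repository.

```python
import csv
import os
import tempfile


def summarize_chunk_file(file_path: str, delimiter: str = ',') -> tuple:
    """Hypothetical helper: read a chunk CSV, return (min_id, max_id), then delete the file."""
    try:
        ids = set()
        with open(file_path, newline='') as csv_file:
            for row in csv.DictReader(csv_file, delimiter=delimiter):
                # perform any other per-row processing here
                ids.add(int(row['id']))
        # An empty chunk has no ids, so avoid calling min()/max() on an empty set
        return (min(ids), max(ids)) if ids else (None, None)
    finally:
        # Clean up the temporary file even if parsing fails
        if os.path.exists(file_path):
            os.unlink(file_path)


# Simulate a chunk that was already downloaded to a temporary file
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False, newline='') as tmp:
    tmp.write('id,name\n3,Ann\n1,Bob\n2,Cy\n')
    chunk_path = tmp.name

id_range = summarize_chunk_file(chunk_path)
print(id_range)
```

Putting the cleanup in a `finally` block means a failed chunk does not leave its temporary file behind.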

# core/tasks.py

import csv
from os import path, unlink

from celery import group

@celery.task(name='core.tasks.chunk_file_processor', bind=True)
def chunk_file_processor(self, **kwargs):
    """ Creates and processes a single file chunk based on S3 Select ScanRange start and end bytes
    """
    bucket = kwargs.get('bucket')
    key = kwargs.get('key')
    filename = kwargs.get('filename')
    start_byte_range = kwargs.get('start_byte_range')
    end_byte_range = kwargs.get('end_byte_range')
    header_row_str = kwargs.get('header_row_str')
    local_file = filename.replace('.csv', f'.{start_byte_range}.csv')
    file_path = path.join(current_app.config.get('BASE_DIR'), 'temp', local_file)

    logger.info(f'Processing {filename} chunk range {start_byte_range} -> {end_byte_range}')
    try:
        # 1. fetch data from S3 and store it in a file
        store_scrm_file_s3_content_in_local_file(
            bucket=bucket, key=key, file_path=file_path, start_range=start_byte_range,
            end_range=end_byte_range, delimiter=S3_FILE_DELIMITER, header_row=header_row_str)

        # 2. Process the chunk file in temp folder
        id_set = set()
        with open(file_path) as csv_file:
            csv_reader = csv.DictReader(csv_file, delimiter=S3_FILE_DELIMITER)
            for row in csv_reader:
                # perform any other processing here
                id_set.add(int(row.get('id')))
        if id_set:  # min()/max() raise ValueError on an empty set
            logger.info(f'{min(id_set)} --> {max(id_set)}')
    except Exception:
        logger.exception(f'Error in file processor: {filename}')
    finally:
        # 3. delete local file, even if processing failed
        if path.exists(file_path):
            unlink(file_path)

3. Execute multiple celery tasks in parallel

This is the most interesting step in this flow. We will create multiple celery tasks that run in parallel via a Celery Group.
Once we know the total bytes of the file in S3 (from step 1), we calculate the start and end bytes of each chunk and call the task we created in step 2 via the celery group. The byte ranges are contiguous and together cover the whole file. Optionally, we can also call a callback (result) task once all our processing tasks have completed.
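
The range arithmetic described above can be sketched on its own. This is a minimal illustration, assuming a hypothetical helper named `chunk_ranges`. It produces the same start and end values as the loop in the task, just without Celery.

```python
def chunk_ranges(file_size: int, chunk_size: int) -> list:
    """Hypothetical helper: split a file of file_size bytes into contiguous (start, end) ranges."""
    if chunk_size <= 0:
        raise ValueError('chunk_size must be positive')
    ranges = []
    start = 0
    while start < file_size:
        # The last chunk is shortened so that it never runs past the file size
        end = min(start + chunk_size, file_size)
        ranges.append((start, end))
        # The next chunk starts where this one ended
        start = end
    return ranges


ranges = chunk_ranges(file_size=25, chunk_size=10)
print(ranges)  # [(0, 10), (10, 20), (20, 25)]
```

Each tuple would become the `start_byte_range` and `end_byte_range` of one chunk task.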

# core/tasks.py

@celery.task(name='core.tasks.s3_parallel_file_processing', bind=True)
def s3_parallel_file_processing_task(self, **kwargs):
    """ Creates celery tasks to process chunks of file in parallel
    """
    bucket = kwargs.get('bucket')
    key = kwargs.get('key')
    try:
        filename = key
        # 1. Check file headers for validity -> if failed, stop processing
        desired_row_headers = (
            'id',
            'name',
            'age',
            'latitude',
            'longitude',
            'monthly_income',
            'experienced'
        )
        is_headers_valid, header_row_str = validate_scrm_file_headers_via_s3_select(
            bucket=bucket,
            key=key,
            delimiter=S3_FILE_DELIMITER,
            desired_headers=desired_row_headers)
        if not is_headers_valid:
            logger.error(f'{filename} file headers validation failed')
            return False
        logger.info(f'{filename} file headers validation successful')

        # 2. fetch file size via S3 HEAD
        file_size = get_s3_file_size(bucket=bucket, key=key)
        if not file_size:
            logger.error(f'{filename} file size invalid {file_size}')
            return False
        logger.info(f'We are processing {filename} file about {file_size} bytes :-o')

        # 3. Create celery group tasks for chunks of this file for parallel processing
        start_range = 0
        end_range = min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size)
        tasks = []
        while start_range < file_size:
            tasks.append(
                chunk_file_processor.signature(
                    kwargs={
                        'bucket': bucket,
                        'key': key,
                        'filename': filename,
                        'start_byte_range': start_range,
                        'end_byte_range': end_range,
                        'header_row_str': header_row_str
                    }
                )
            )
            start_range = end_range
            end_range = end_range + min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size - end_range)
        # Piping a group into a task makes Celery run the callback once all chunk tasks finish
        job = (group(tasks) | chunk_file_processor_callback.s(data={'filename': filename}))
        _ = job.apply_async()
    except Exception:
        logger.exception(f'Error processing file: {filename}')


@celery.task(name='core.tasks.chunk_file_processor_callback', bind=True, ignore_result=False)
def chunk_file_processor_callback(self, *args, **kwargs):
    """ Callback task called post chunk_file_processor()
    """
    logger.info('Callback called')

# core/utils.py

def store_scrm_file_s3_content_in_local_file(bucket: str, key: str, file_path: str, start_range: int, end_range: int,
                                             delimiter: str, header_row: str):
    """Retrieves S3 file content via S3 Select ScanRange and stores it in a local file.
       Make sure the header validation is done before calling this.

    Args:
        bucket (str): S3 bucket
        key (str): S3 key
        file_path (str): Local file path to store the contents
        start_range (int): Start range of ScanRange parameter of S3 Select
        end_range (int): End range of ScanRange parameter of S3 Select
        delimiter (str): S3 file delimiter
        header_row (str): Header row of the local file. This will be inserted as first line in local file.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    try:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': delimiter,
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'CSV': {
                    'FieldDelimiter': delimiter,
                    'RecordDelimiter': '\n',
                },
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        # select_object_content() returns an event stream; loop over it and
        # append each Records payload to build the overall result set
        with open(file_path, 'wb') as f:  # data arrives as bytes, so open the file in binary mode
            f.write(header_row.encode())
            f.write('\n'.encode())
            for event in response['Payload']:
                if records := event.get('Records'):
                    f.write(records['Payload'])
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    except Exception:
        logger.exception(f'Error reading S3 file {bucket} : {key}')
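
To see how the event stream loop behaves without calling AWS, here is a minimal sketch that feeds a hand-written list of events into the same kind of loop. The helper `write_records` and the `fake_events` list are hypothetical stand-ins for `response['Payload']`. As far as I know, a single `Records` event may hold a partial record, which is why the payloads are simply appended in order.

```python
import os
import tempfile


def write_records(events, file_path: str, header_row: str) -> int:
    """Hypothetical helper: write the header plus every Records payload; return bytes written."""
    written = 0
    with open(file_path, 'wb') as f:
        written += f.write(header_row.encode() + b'\n')
        for event in events:
            # Only 'Records' events carry data; 'Stats' and 'End' events are skipped
            if records := event.get('Records'):
                written += f.write(records['Payload'])
    return written


# Stand-in for response['Payload']; a real stream comes from select_object_content()
fake_events = [
    {'Records': {'Payload': b'1,Ann\n2,'}},  # second record is split across two events
    {'Records': {'Payload': b'Bob\n'}},
    {'Stats': {'Details': {'BytesScanned': 12, 'BytesProcessed': 12, 'BytesReturned': 12}}},
    {'End': {}},
]

fd, out_path = tempfile.mkstemp(suffix='.csv')
os.close(fd)
total = write_records(fake_events, out_path, header_row='id,name')
with open(out_path, 'rb') as f:
    content = f.read()
os.unlink(out_path)
print(content)
```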

That's it! 😎 Now, instead of streaming the S3 file byte by byte, we parallelize the processing by handling the chunks concurrently. It wasn't that tough, was it? 😅

🔍 Comparing the processing time

If we compare the processing time of the same file we processed in our last post with this approach, the processing takes approximately 68% less time, which is roughly 3x faster (with the same hardware and config). 😆

| | Streaming S3 File | Parallel Processing S3 File |
|---|---|---|
| File size | 4.8MB | 4.8MB |
| Processing time | ~37 seconds | ~12 seconds |
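
For reference, the percentage can be derived from the two approximate timings. A quick sketch of the arithmetic (the variable names are just for illustration):

```python
streaming_seconds = 37  # approximate time with the streaming approach
parallel_seconds = 12   # approximate time with the parallel approach

# Share of the original processing time that was saved
time_saved_pct = (streaming_seconds - parallel_seconds) / streaming_seconds * 100
# How many times faster the parallel run was
speedup = streaming_seconds / parallel_seconds

print(f'{time_saved_pct:.0f}% less time, {speedup:.1f}x speedup')
```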

✔️ Benefits of this approach

  • A very large file containing millions of records can be processed within minutes. I have been using this approach in a production environment for a while, and it has worked very well
  • Computation and processing are distributed among multiple workers
  • Processing speed can be tuned by changing the number of available workers
  • Memory usage stays bounded, since each task handles only one chunk at a time

📌 You can check out my GitHub repository for a complete working example of this approach 👇

idris-rampurawala / s3-select-demo

This project showcases the rich AWS S3 Select feature to stream a large data file in a paginated style.

Currently, S3 Select does not support OFFSET, so we cannot paginate the results of the query. Instead, we use the ScanRange feature to stream the contents of the S3 file.

Background

Importing (reading) a large file can lead to an Out of Memory error. It can also lead to a system crash. There are libraries such as Pandas, Dask, etc. which are very good at processing large files, but again the file has to be present locally, i.e. we have to import it from S3 to our local machine. But what if we do not want to fetch and store the whole S3 file locally at once? 🤔

Well, we can make use of AWS S3 Select to stream a large file via its ScanRange parameter. This approach…

See ya until my next post! 😋
