Driving AWS Fargate to the Edge: Matillion Hybrid Agents and Python Pandas
December 22, 2024

One of the best services in the AWS ecosystem is Elastic Container Service (ECS) with the Fargate launch type. The service is often referred to as “serverless” because it requires minimal operational setup to get started. However, while Fargate is easy to use, it lacks one of the key advantages of AWS Lambda: automatic scaling. Regardless, Matillion leverages Fargate to create a cost-effective way to integrate your AWS environment with its services.


Big data scalability challenges

While Matillion provides simple integration, challenges can arise around scalability and big data movement. For example, if your data pipeline needs to process gigabytes of data, this can often be handled in a relatively static manner using standard Matillion components. However, if you need to implement custom Python logic with Pandas, the situation becomes more complicated.

You may be wondering: how do you process gigabytes of data on a Fargate task with only 2 vCPUs and 4 GB of memory? This can be tricky, especially if you need to transform the data in flight, since it all has to be loaded into memory.


Leveraging Python Pandas on Big Data with Fargate

A key issue to understand is that Python Pandas transforms data in memory. This design choice allows for high-speed processing, but it also introduces the risk of an “OutOfMemoryError”, which can cause the process to terminate mid-run. If this happens, Matillion will not be able to track the state of the pipeline because the executing task disappears.

Your first reaction may be to permanently increase the task size. However, this can result in a significant increase in your AWS bill. For example, running a production agent with Matillion’s recommended configuration (2 vCPUs, 4 GB RAM, 20 GB storage) costs approximately $144.16 per month when running 24 hours a day for 30 days. By comparison, scaling up to 16 vCPUs and 128 GB of RAM could cost around $1,724.28. Since not all pipelines require such extensive resources, a permanent, static increase in task size is not recommended. Instead, dynamic scaling may be an option, similar to how Snowflake resources can be scaled with an ALTER WAREHOUSE SQL statement. However, if dynamic scaling is managed outside of the Matillion pipeline, it has to be done carefully to avoid losing tracking information. Dynamic scaling from within a pipeline could be a valuable feature request for Matillion.
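To illustrate what scaling outside of the pipeline could look like, here is a minimal sketch that launches a one-off, larger Fargate task with boto3. The cluster, task definition, and network values are placeholders for illustration only, not Matillion’s actual configuration:

import boto3

ecs = boto3.client("ecs", region_name="eu-central-1")

# Launch a one-off task with more CPU and memory than the task definition default.
# All names and IDs below are hypothetical placeholders.
response = ecs.run_task(
    cluster="my-agent-cluster",
    taskDefinition="my-heavy-pandas-job",
    launchType="FARGATE",
    overrides={
        "cpu": "4096",      # 4 vCPUs instead of the default 2
        "memory": "16384",  # 16 GB instead of the default 4 GB
    },
    networkConfiguration={
        "awsvpcConfiguration": {
            "subnets": ["subnet-0123456789abcdef0"],
            "assignPublicIp": "DISABLED",
        }
    },
)
print(response["tasks"][0]["taskArn"])

Because such a task runs outside of Matillion’s control, the pipeline still needs a way to wait for it and collect its status, which is exactly the tracking limitation mentioned above.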


Implement chunking of big data archives

Pandas workloads can often be handled with the capabilities provided by the Pandas package itself. Since Pandas works in memory, why not explore whether it can break large files into smaller pieces? If so, can we apply the transformation logic to each chunk? Would reading bytes add flexibility against encoding issues and still allow consistent data type recognition in each chunk? Can we upload each chunk to the target storage system and then free the memory? Finally, if we encounter errors, can we track them chunk by chunk?

The problem of how to split a large DataFrame into smaller DataFrames, such as converting a large CSV into multiple smaller Parquet files, is not new. Fortunately, today we have a chunksize parameter in the following methods:

  • pandas.read_csv()
  • pandas.read_table()
  • pandas.read_json()
  • pandas.read_sql()
  • pandas.read_hdf()

The authors of Pandas suggest that chunking works well when operations require zero or minimal coordination between chunks. For more complex workflows, other libraries may be a better choice.


Chunking implementation example

Here is a simple implementation of chunking:

import pandas as pd

chunk_size = 1000
chunk_number = 0

# Read the CSV in chunks of 1,000 rows and write each chunk to its own Parquet file
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    chunk.to_parquet(f'chunk_{chunk_number}.parquet')
    chunk_number += 1

Note that the number of rows per chunk is only meaningful in combination with the number of columns: both dimensions determine the memory footprint of each DataFrame.
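To get a feel for how both dimensions translate into memory consumption, you can inspect the first chunk before settling on a chunk size. This is only a rough calibration sketch, reusing the illustrative file name and chunk size from above:

import pandas as pd

# Read only the first chunk and measure its in-memory footprint
reader = pd.read_csv('large_file.csv', chunksize=1000)
first_chunk = next(reader)

bytes_per_chunk = first_chunk.memory_usage(deep=True).sum()
print(f"{first_chunk.shape[0]} rows x {first_chunk.shape[1]} columns "
      f"= roughly {bytes_per_chunk / 1024 ** 2:.1f} MiB per chunk")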


Apply transformation logic to each chunk

Can we apply transformation logic to each chunk? The answer is yes. When chunking the data, we can apply transformations to each piece. However, we may encounter slight differences in how Pandas infers column data types from chunk to chunk, especially for CSV and JSON files, where quoting and delimiters are critical for resilient pipelines.

Here’s an example of applying a transformation to each chunk:

import pandas as pd

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

chunk_size = 1000
chunk_number = 0

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    transformed_chunk = transform(chunk)
    transformed_chunk.to_parquet(f'chunk_{chunk_number}.parquet')
    chunk_number += 1


Handle encoding issues

For robust handling of encoding errors, reading the file as raw bytes is a better approach. However, it also has its drawbacks. For example, if we read and decode the entire file at once, we may encounter an out-of-memory error:

import io
import pandas as pd

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

file_path = 'large_file.csv'
chunk_size = 1000
chunk_number = 0

with open(file_path, mode="rb") as fd:
    for chunk in pd.read_csv(io.StringIO(fd.read().decode("latin_1")), chunksize=chunk_size):
        # Apply the transformation to each chunk
        transformed_chunk = transform(chunk)

        # Write the transformed chunk to a Parquet file
        transformed_chunk.to_parquet(f'chunk_{chunk_number}.parquet')
        chunk_number += 1


With this approach, our Fargate task will be killed by AWS ECS, or the process on the Matillion Hybrid Agent will exit with error 137, because the whole file is decoded in memory before chunking takes effect. Therefore, we need to adjust the code so that the file is also read and decoded in chunks:

import pandas as pd

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

file_path = 'large_file.csv'
chunk_size = 10000
chunk_number = 0

with open(file_path, mode="rb") as fd:
    for chunk in pd.read_csv(
        fd,
        low_memory=False,
        chunksize=chunk_size,
        encoding="latin_1"
    ):
        # Apply the transformation to each chunk
        transformed_chunk = transform(chunk)

        # Write the transformed chunk to a Parquet file
        transformed_chunk.to_parquet(f'chunk_{chunk_number}.parquet')
        chunk_number += 1

This method ensures that decoding is applied chunk by chunk, which prevents memory issues. The low_memory=False parameter is used to improve data type consistency within each chunk.
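If you already know the schema, an alternative (or complement) to low_memory=False is to declare the column types explicitly, so Pandas does not have to infer them for every chunk. The column names and types below are assumptions for illustration only:

import pandas as pd

# Hypothetical schema for large_file.csv - adjust to your actual columns
dtypes = {
    "id": "int64",
    "numeric_column": "float64",
    "category_column": "string",
}

with open("large_file.csv", mode="rb") as fd:
    for chunk in pd.read_csv(fd, chunksize=10000, encoding="latin_1", dtype=dtypes):
        # Every chunk now arrives with identical, predeclared column types
        ...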


Upload chunks to target storage and free memory

After transforming each chunk and uploading it to the target storage system (such as Amazon S3), you can free the memory it occupied. For example:

import pandas as pd
import logging
import gc

# Define a transformation function
def transform(df):
    df['numeric_column'] = df['numeric_column'].apply(lambda x: x * 1.1)
    return df

file_path = 'large_file.csv'
s3_bucket = 'your_s3_bucket'
prefix_name = 'my_prefix'
file_name = 'large_file.csv'
access_key = 'your_access_key'
secret_access_key = 'your_secret_access_key'
sessions_token = 'your_session_token'

chunk_size = 10000
chunk_number = 0

with open(file_path, mode="rb") as fd:
    # Instead of reading the entire file at once, we use an iterative approach
    for chunk in pd.read_csv(
        fd,
        low_memory=False,
        chunksize=chunk_size,
        encoding="latin_1"
    ):
        # Apply the transformation to each chunk
        transformed_chunk = transform(chunk)

        # Define the Parquet file path
        parquet_file = f"s3://{s3_bucket}/{prefix_name}/part_{chunk_number}_{file_name.replace('.csv', '.snappy.parquet')}"

        # Write the transformed chunk to a Parquet file
        transformed_chunk.to_parquet(
            parquet_file,
            engine="auto",
            compression="snappy",
            storage_options={
                "key": access_key,
                "secret": secret_access_key,
                "token": sessions_token,
            },
        )

        logging.info(f"Uploaded chunk of {file_name} to {parquet_file}")
        chunk_number += 1
        del chunk, transformed_chunk
        gc.collect()

The storage_options parameter lets Pandas pass the AWS credentials down to the S3 filesystem layer it uses under the hood, while the del statement and gc.collect() release the memory held by each processed chunk.


Track errors during processing

Currently, tracking errors chunk by chunk can be challenging. However, you can use the on_bad_lines parameter, introduced in Pandas v1.4, to receive warnings about rows that could not be parsed. With its callback functionality, you can write detailed log files to trace problematic rows. This allows you to handle errors more efficiently, although the callback form is not compatible with every parser configuration; the same applies when combining it with low_memory=False. In our use case, we use on_bad_lines="warn" and parse the resulting warnings out of the Matillion logs afterwards.
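As a sketch of the callback approach (reusing the illustrative file name from above), a callable passed to on_bad_lines can log each malformed row. Note that the callable form requires the Python parser engine, which is why it cannot be combined with C-parser options such as low_memory:

import logging

import pandas as pd

logging.basicConfig(filename="bad_lines.log", level=logging.WARNING)

def log_bad_line(bad_line):
    # Called by Pandas for every row it cannot parse;
    # returning None drops the row after it has been logged.
    logging.warning("Skipping malformed row: %s", bad_line)
    return None

for chunk in pd.read_csv(
    "large_file.csv",
    chunksize=10000,
    encoding="latin_1",
    engine="python",            # the callable form of on_bad_lines needs the Python engine
    on_bad_lines=log_bad_line,
):
    pass  # apply transformations and uploads here, as in the examples above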


Final thoughts

The biggest drawback highlighted in this article is the lack of robust error logging. However, despite these challenges, using Matillion with AWS Fargate enables a cost-effective solution without the need for complex clusters. By implementing chunking and careful memory management, you can avoid memory overflow errors and simplify processing of large data sets.

As AWS, Matillion, and Snowflake continue to evolve, leveraging these tools together can optimize your data workflows, reduce execution time, and increase overall efficiency. Remember, in the cloud, efficient resource utilization is key to maximizing your investment.

Happy coding :-)!
