Schematic graph of a Dagster IO manager

A Guide to Dagster IO Managers: Implement a Redshift IO Manager

In this blog, we describe how you can build your own Dagster IO managers, using AWS Redshift as an example. We elaborate on the fundamentals of IO managers and share the tips and tricks we’ve learned building different IO managers over time.

Keywords
Dagster IO manager
Redshift IO manager

A short introduction on IO managers

From the IO manager documentation of Dagster:

I/O managers are user-provided objects that store asset and op outputs and load them as inputs to downstream assets and ops. They can be a powerful tool that reduces boilerplate code and easily changes where your data is stored.

IO managers, combined with partitions, are one of the most powerful concepts in Dagster; in essence, they allow you to avoid re-writing code to load and retrieve data from different sources, often your database of choice. This is immensely powerful once you realize that much of your code is just reading from and writing to databases, file storage, and so on. The interesting part of your code is the transformations you perform on the data; loading and retrieving data is necessary but does not add much value. Thus, when an IO manager is a great fit for your problem, you should consider using one.

When not to use an IO Manager

When all you have is a hammer, everything looks like a nail.

Every programmer has encountered this, a good example being the RDBMS. As with any abstraction, it works great until it doesn’t. The Dagster IO manager is just one tool in your toolbox, and when it doesn’t fit the problem at hand, you shouldn’t use it.

The IO manager has several assumptions that make it great for loading data into a function, doing something with it, and then storing some data. However, if this is not the abstract pattern of your problem, the IO manager might not be of much help.

For instance, if your workflow involves complex data transformations that don’t neatly fit into the input-process-output model supported by the IO manager, trying to force it into that framework could lead to convoluted and inefficient code. In such cases, it’s better to step back and evaluate whether using the IO manager aligns with the natural flow of your problem or if a different approach might be more suitable.

Ultimately, choosing the right tool for the job involves understanding the problem domain, considering the specific requirements and constraints, and being willing to adapt and explore alternative solutions when necessary. The Dagster IO manager is a valuable tool in your toolbox, but it’s important to recognize when it’s the right tool for the task at hand and when another approach might be more appropriate.

Dagster IO manager overview

Best Practices and Tips When Developing IO Managers

Before we dive into the implementation details of an IO manager, let’s first highlight some best practices and tips for developing and utilizing IO managers effectively. In this section, we’ve noted the most important points to consider.

1. Error Handling and Resilience

Implement robust error handling mechanisms within your IO manager to gracefully handle failures and recover from errors. This includes handling network issues, authentication errors, and data inconsistencies to ensure the reliability of your data pipelines.

2. Automated Schema Creation

Automate schema creation within your IO manager where possible. This streamlines data storage, reduces errors, and ensures consistency. Dynamic schema generation based on input data or templates enhances scalability and adaptability, optimizing Dagster pipeline efficiency. Also consider what happens if the schema in the database doesn’t match the schema of the input data; throwing an error in that case can be a good idea.
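
For the schema-mismatch check specifically, one option is a small validation helper that compares the incoming DataFrame’s columns against the columns Redshift already knows about and fails loudly on a difference. The helper below is a hypothetical sketch; its name and the information_schema query are our own illustration, not part of the IO manager shown later.

import pandas as pd

def _validate_existing_schema(cursor, full_table_name: str, df: pd.DataFrame) -> None:
    # Compare the incoming DataFrame columns to the columns already present in
    # Redshift and raise if they diverge. Hypothetical helper, for illustration only.
    schema, table = full_table_name.split(".")
    cursor.execute(
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema = %s AND table_name = %s",
        (schema, table),
    )
    existing = {row[0].upper() for row in cursor.fetchall()}
    incoming = {str(column).upper() for column in df.columns}
    if existing and existing != incoming:
        raise ValueError(
            f"Schema mismatch for {full_table_name}: "
            f"difference = {sorted(existing ^ incoming)}"
        )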

3. Ensure Idempotency

Implementing idempotence using Dagster partitioning adds an extra layer of reliability to your data operations. By partitioning your data into distinct segments based on certain criteria (like time intervals or data sources), you create a structured approach to handling retries and failures. Each partition represents a self-contained unit of work, making it easier to track and manage data processing tasks. If something goes wrong during processing, you can retry individual partitions without affecting others, ensuring consistent results across your pipeline. Leveraging Dagster partitioning in conjunction with idempotent logic reinforces the integrity of your data operations, minimizing the risk of errors and inconsistencies. This is pretty much a must-have.
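
To make this concrete, here is a minimal sketch of a daily-partitioned asset that could be used with the IO manager built later in this post; the asset name, column names, and start date are illustrative. Because each run targets exactly one partition, re-running a day simply replaces that day’s rows.

import pandas as pd
from dagster import DailyPartitionsDefinition, asset

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    # The IO manager built later in this post reads `partition_expr` to know
    # which column identifies the partition when clearing old rows.
    metadata={"partition_expr": "EVENT_DATE"},
)
def daily_events(context) -> pd.DataFrame:
    # Re-running this partition overwrites only the rows for this date.
    day = context.partition_key
    return pd.DataFrame({"EVENT_DATE": [day], "EVENT_COUNT": [42]})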

4. Performance Optimization

Optimize the performance of your IO manager by minimizing latency and maximizing throughput. This involves optimizing data transfer mechanisms, leveraging caching strategies, and parallelizing data operations where possible to improve overall pipeline efficiency. Concrete strategies are batching your operations, doing work concurrently or in parallel, and caching heavy operations.
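
As a simple illustration of batching and concurrency, the sketch below splits a DataFrame into row batches and uploads them in parallel; `upload_chunk` is a hypothetical callable standing in for whatever single-chunk upload your IO manager implements.

from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def upload_in_chunks(df: pd.DataFrame, upload_chunk, chunk_size: int = 100_000) -> list:
    # Split the DataFrame into fixed-size row batches and upload them concurrently.
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
    with ThreadPoolExecutor(max_workers=4) as pool:
        # `upload_chunk` is a stand-in for a single-chunk upload (e.g. to S3).
        return list(pool.map(upload_chunk, chunks))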

5. Reusability and Modularity

Design your custom IO manager with reusability and modularity in mind. Consider abstracting common functionality into reusable components or libraries to facilitate easier integration across multiple pipelines and projects.

Having laid the groundwork for what an IO manager should look like, let’s now look at a real-world example of how to set up an IO manager.

Implementing a Redshift IO Manager in Dagster: A Step-by-Step Guide

In this guide, we’ll implement a custom IO manager for Redshift, Amazon Web Services’ (AWS) data warehousing solution. We will focus on storing data in Redshift clusters from Dagster pipelines. The custom IO manager we’ll develop will handle schema management, data saving, and partition management, providing a comprehensive solution for interacting with Redshift databases within Dagster workflows. The input and output type of this IO manager is the Pandas DataFrame, from one of the most widely used data libraries. For better performance, we would recommend libraries like PyArrow or Polars.

Introduction to the Redshift IO Manager

Our Redshift IO manager will bridge the gap between Dagster pipelines and a Redshift cluster, facilitating the movement of data between these environments. This custom IO manager will leverage AWS services such as S3, which allows us to store data as efficiently as possible.

The key functionalities of our Redshift IO manager include:

  1. Initialization and Configuration: Setting up AWS credentials, Redshift cluster details, S3 bucket configurations, and initializing the IO manager.
  2. Schema Management: Automatically managing database schemas and table structures in Redshift based on the data being processed.
  3. Saving Data: We efficiently store data from Pandas DataFrames into Redshift tables, ensuring data integrity and consistency. If the data already exists, we need to remove and replace the old dataset.
  4. Loading Data: Retrieving data from Redshift tables into Pandas DataFrames, enabling seamless integration with downstream processing. We have decided to leave this out of this guide.

1. Initialization and Configuration

To initialize our RedshiftPandasIOManager, we have the following definition:

class RedshiftPandasIOManager(ConfigurableIOManager):
    region_name: str
    cluster_identifier: str
    user: str
    database: str
    host: str
    port: str
    s3_iam_role: str
    bucket_name: str

To set up the Redshift Pandas IO manager, provide AWS credentials, Redshift cluster details, and S3 bucket configurations. Initialize the IO manager as follows:

from dagster import Definitions, EnvVar

resources = {
    "io_manager": RedshiftPandasIOManager(
        host=EnvVar("AWS_REDSHIFT_HOST"),
        database=EnvVar("AWS_REDSHIFT_DATABASE"),
        port=EnvVar("AWS_REDSHIFT_PORT"),
        user=EnvVar("AWS_REDSHIFT_USER"),
        cluster_identifier=EnvVar("AWS_REDSHIFT_CLUSTER_IDENTIFIER"),
        region_name=EnvVar("AWS_REDSHIFT_REGION_NAME"),
        s3_iam_role=EnvVar("AWS_REDSHIFT_S3_IAM_ROLE"),
        bucket_name=EnvVar("AWS_REDSHIFT_BUCKET_NAME"),
    ),
}

defs = Definitions(
    assets=all_assets,
    resources=resources,
    # ...
)

When it’s given as the io_manager in the resources dictionary of your Dagster Definitions object, it will be used for all of your assets by default.
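
For example, a plain asset like the hypothetical one below would be stored by this IO manager automatically; its asset key determines the target location, with the key prefix becoming the Redshift schema and the function name the table (here, analytics.customer_orders).

import pandas as pd
from dagster import asset

@asset(key_prefix="analytics")
def customer_orders() -> pd.DataFrame:
    # The returned DataFrame is handed to the IO manager, which writes it to
    # the Redshift table analytics.customer_orders.
    return pd.DataFrame({"ORDER_ID": ["1", "2"], "AMOUNT": ["10.50", "8.00"]})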

2. Schema Management

Schema management is critical for maintaining alignment between Redshift databases and the data being processed. Our custom Redshift IO manager automates schema creation and table structure definition to streamline database interactions.

Our schema management functionality comprises:

  1. Schema Creation: Dynamically creating schemas and tables in Redshift to accommodate incoming data.
  2. Table Structure Definition: Deriving Redshift table structure from the schema of the input data.

Let’s explore the implementation within RedshiftPandasIOManager:

  1. create_schemas Method: This method establishes a connection to Redshift and executes SQL queries to create schemas and tables. The table structure is defined based on the column types extracted from the input data.
def create_schemas(self, 
                   context: "OutputContext",
                   columns_with_types: dict[str, str],
                   full_table_name: str,
                   partition_key: Optional[str],
                   schema: str):
    context.log.debug(f"creating db-schemas for {schema=} {full_table_name=} {partition_key=}")
    with (
        self.get_connection() as connection,
        connection.cursor() as cursor
    ):
        try:
            cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")

            schema_string_components = map(lambda x: f"\"{x[0].upper()}\" {x[1]}", columns_with_types.items())
            schema_string = ", ".join(schema_string_components)
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {full_table_name} ({schema_string})")

            connection.commit()
        except Exception as e:
            connection.rollback()
            raise Failure(
                description=f"Failed to create table {full_table_name} in Redshift"
                            f"for asset_key {context.asset_key} and partition "
                            f"key {partition_key}",
                metadata=dict(
                    asset_key=context.asset_key,
                    partition_key=partition_key,
                    table_name=full_table_name,
                    error=str(e)
                ),
                allow_retries=False
            )
  2. _get_table_structure Function: This helper function analyzes the structure of the input data and returns a dictionary mapping column names to their respective data types.
def _get_table_structure(df: pd.DataFrame) -> dict[str, str]:
    columns = list(df.columns)
    return {column: "VARCHAR(10000)" for column in columns}

Automating schema management ensures database readiness for data ingestion, making sure the input data is compatible with the database schema.
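
Both create_schemas above and the save_data method shown later call self.get_connection(), which we don’t show as part of the IO manager snippets. A minimal sketch, assuming the redshift_connector package and the configuration fields defined earlier, could look like the following; real setups will differ in how credentials (passwords or IAM authentication) are handled.

from contextlib import contextmanager

import redshift_connector

@contextmanager
def get_connection(self):
    # Open a connection using the fields configured on the IO manager.
    # Credential handling (password or IAM auth) is intentionally omitted
    # from this sketch.
    connection = redshift_connector.connect(
        host=self.host,
        port=int(self.port),
        database=self.database,
        user=self.user,
    )
    try:
        yield connection
    finally:
        connection.close()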

Next, we’ll delve into efficiently clearing and saving data from Pandas DataFrames into Redshift tables.

3. Saving Data

The main functionality of our custom Redshift IO manager is efficiently storing data from Pandas DataFrames into Redshift tables. This process ensures that data is ingested into Redshift with integrity and consistency, ready for analysis and further processing.

The saving of data functionality involves several key steps:

  1. Data Preparation: Before saving data to Redshift, we prepare the data by converting it into a suitable format. In our implementation, we convert the Pandas DataFrame into a compressed CSV file stored in Amazon S3.
  2. Data Removal and Loading: Once the data is prepared, we handle the removal of existing data and then load the new data into Redshift using the COPY command. This command efficiently copies data from Amazon S3 into Redshift tables, leveraging parallel processing for optimal performance. Removing the old data is an essential step to ensure that loading the data is an idempotent operation.
  3. Partition Management: Optionally, we handle partitioning of data within Redshift tables. This allows for efficient loading and storing of large datasets with Dagster partition keys by organizing data into logical partitions based on a specified partition key.

Let’s explore the implementation of the saving data functionality within our RedshiftPandasIOManager class:

  1. handle_output Method: This method is responsible for handling the output data and initiating the process of saving it into Redshift. Inside this method:
    • We first check if the output data is empty.
    • We construct the full table name in Redshift based on the schema and table name extracted from the output context.
    • If the output context specifies partition information, we retrieve the partition key and partition expression.
    • We upload the output data to Amazon S3 using the upload_data_to_s3 method.
    • We retrieve the structure of the output data using the _get_table_structure function.
    • We create or update the schema and table structure in Redshift using the create_schemas method.
    • Finally, we save the data into Redshift using the save_data method.
  2. save_data Method: This method executes SQL queries to clear existing data (if applicable) and copy the new data from Amazon S3 into the Redshift table. It handles both full table refreshes and partition updates. Here’s how the method works:
def handle_output(self, context: "OutputContext", obj: pd.DataFrame) -> None:
    if obj is None or len(obj) == 0:
        context.log.info(
            f"{self.__class__.__name__} skipping handle_output: The asset "
            f"{context.asset_key} does not output any data to store (the "
            f"asset may store the result during the operation.")
        return

    schema = "_".join(context.asset_key.path[:-1])
    table_name = context.asset_key.path[-1]
    full_table_name = f"{schema}.{table_name}"

    partition_key = None
    if context.has_asset_partitions:
        try:
            partition_key = context.asset_partition_key
        except CheckError as e:
            raise Failure(
                description=f"{self.__class__.__name__} does not support partition key ranges",
                metadata=dict(
                    asset_key=context.asset_key
                ),
                allow_retries=False
            )

    partition_expr = context.metadata.get("partition_expr")

    if partition_expr is None and partition_key is not None:
        raise Failure(
            description=f"Asset has partition key, but no partition_expr in its metadata",
            metadata=dict(
                asset_key=context.asset_key,
                partition_key=partition_key,
            ),
            allow_retries=False
        )

    context.log.info(f"Starting process to store output of asset "
                     f"{context.asset_key} and partition key {partition_key} in Redshift")

    s3_object_name = self.upload_data_to_s3(full_table_name, obj, partition_key)

    columns_with_types = _get_table_structure(obj)

    context.log.info(f"Loaded {len(obj)=} for {context.asset_key} and {partition_key=} records to disk."
                     f" Preparing to load these into redshift.")
    self.create_schemas(context, columns_with_types, full_table_name, partition_key, schema)

    self.save_data(context, s3_object_name, full_table_name, partition_expr, partition_key)


def save_data(self,
              context: "OutputContext",
              s3_object_name: str,
              full_table_name: str,
              partition_expr: Optional[str],
              partition_key: Optional[str]):

    context.log.debug(f"saving data for {full_table_name=} {partition_key=}")

    with (
        self.get_connection() as connection,
        connection.cursor() as cursor
    ):
        try:
            # First clear the partition or table out.
            if partition_key is None:
                # Do a full refresh, and delete all data
                context.log.debug(f"Fully clearing table {full_table_name} since no partition key is given")
                query = f"DELETE FROM {full_table_name}"
                cursor.execute(query)
            else:
                # Try to remove the old partition data and load the new data
                context.log.debug(f"Clearing partition such that {partition_expr} = {partition_key}"
                                  f" for table {full_table_name}")
                query = f"DELETE FROM {full_table_name} WHERE {partition_expr} = %s"
                cursor.execute(query, (partition_key,))

            # Copy all the files starting with `s3_url_prefix` into the table `full_table_name`.
            # The role `self.s3_iam_role` is used by Redshift to authenticate to S3.
            # The parameter EMPTYASNULL ensures that empty strings in the CSV are stored as NULL in Redshift.
            s3_url_prefix = f"s3://{self.bucket_name}/{s3_object_name}"
            query = f"COPY {full_table_name} FROM '{s3_url_prefix}' IAM_ROLE '{self.s3_iam_role}' CSV gzip EMPTYASNULL"
            cursor.execute(query)

            connection.commit()
        except Exception as e:
            # Abort execution of the operation
            connection.rollback()
            raise Failure(
                description=f"Failed to store data in Redshift for asset_key {context.asset_key} "
                            f"and partition key {partition_key}",
                metadata=dict(
                    asset_key=context.asset_key,
                    partition_key=partition_key,
                    table_name=full_table_name,
                    error=str(e)
                ),
                allow_retries=False
            )

In this method, we handle removing existing data before loading new data. This ensures the Redshift table is updated with the latest information, maintaining data integrity.

  3. upload_data_to_s3 Method: This method uploads the output data in CSV (gzipped) format to Amazon S3, preparing it for loading into Redshift.
def upload_data_to_s3(self, full_table_name, obj, partition_key):
    buf = BytesIO()
    obj.to_csv(buf, index=False, header=False, compression='gzip')
    # Rewind the buffer so the full gzipped CSV is uploaded, not an empty body.
    buf.seek(0)
    s3_object_name = f"{full_table_name}/{partition_key}.csv.gzip"

    client = boto3.client("s3")
    client.put_object(
        Body=buf,
        Bucket=self.bucket_name,
        Key=s3_object_name,
    )

    return s3_object_name

By implementing efficient data-saving mechanisms, our Redshift IO manager ensures that data is seamlessly integrated into Redshift for analysis and processing within Dagster pipelines.

Potential Improvements

To keep the scope of this blog somewhat simple, we have not included all the bells and whistles you might want to have. Potential improvements to the IO manager could be the following:

  • Dagster partition key ranges are currently not supported; this wouldn’t be very difficult to implement.
  • It uses Pandas DataFrames, which are not the fastest and not always ideal for data engineering because of dynamic type conversions. Polars or PyArrow could significantly speed up the loading and writing of tables.
  • The data is currently uploaded in CSV format; a better format would be Parquet (a brief sketch follows this list).
  • Large batches of data might cause the system to run out of memory (OOM); this could be fixed using iterators and batched files. Similarly, Redshift can give errors if you upload CSV files that are too large with the COPY command; splitting these files into chunks of roughly 100 MB speeds up loading and prevents such errors.
  • Data loading is not yet implemented; it could be implemented similarly to how the data is currently stored.
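
For instance, switching the upload format from gzipped CSV to Parquet could look roughly like the sketch below. The helper name is illustrative and it assumes PyArrow is installed so that to_parquet can write to an in-memory buffer; the COPY statement then uses FORMAT AS PARQUET instead of CSV gzip.

from io import BytesIO

import boto3
import pandas as pd

def upload_data_as_parquet(bucket_name: str, full_table_name: str,
                           df: pd.DataFrame, partition_key: str) -> str:
    # Write the DataFrame to an in-memory Parquet file (requires pyarrow).
    buf = BytesIO()
    df.to_parquet(buf, index=False)
    buf.seek(0)

    s3_object_name = f"{full_table_name}/{partition_key}.parquet"
    boto3.client("s3").put_object(Body=buf, Bucket=bucket_name, Key=s3_object_name)
    return s3_object_name

# The corresponding COPY statement would then be, for example:
# COPY <table> FROM 's3://<bucket>/<key>' IAM_ROLE '<role>' FORMAT AS PARQUET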

The Code

The full code for the IO manager can be found here: Dagster Redshift IO manager

Conclusion

In this blog, we’ve explored the fundamentals of Dagster IO managers and delved into the process of building a custom IO manager for AWS Redshift. Through an in-depth analysis of each step, from schema management to data saving, we’ve provided insights into best practices and tips for developing robust and efficient IO managers.

Looking ahead, there are numerous opportunities for further enhancements and optimizations to our Redshift IO manager, such as supporting key ranges, optimizing data formats and loading strategies, and improving performance and scalability.

In conclusion, Dagster IO managers are a powerful tool for simplifying data pipeline development and management, offering flexibility, modularity, and reliability. By understanding the core principles and best practices outlined in this blog, data engineers have a robust foundation to develop their own IO managers.

About the author

Maximilian Filtenborg

Maximilian is a machine learning enthusiast, experienced data engineer, and co-founder of BiteStreams. In his free time, he listens to electronic music and is into photography.

Do you want to know whether Dagster is right for you?

Feel free to contact us, we don't bite.
