Revolutionize Your Python Data Processing with Type-Safe Pipelines Using a mypy Plugin: Part 1

Volkov Aleksandr
Mar 4, 2024


A Common Challenge

Picture this: you’re staring at a Python script full of data transformations, and you realize you’ve lost track of what’s going on. How can you tell what state the data is in at each step without running the entire application? One powerful answer can change the way you handle data processing: Type-Safe Pipelines.

Type Families for Pipelines

Type Families are type-level functions: they map types to other types, and the type checker evaluates them during static analysis rather than at runtime. This lets the type of the data record which transformations have already been applied, which makes data processing pipelines clearer and more robust. The article Type-safe data processing pipelines describes the opportunities this opens up in detail; here is the short version.

For example, suppose a pipeline applies several transformations in sequence. Type Families let each step attach a label to the type of its output, such as “Subtotal” or “Rounded”, describing the state of the data. If a step requires a label that the incoming data does not carry yet, the call is flagged as an error during static analysis, before it can cause a runtime failure.
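To make the labeling idea concrete without any plugin, here is a minimal sketch of the phantom-type technique using only the standard library. It assumes rows are plain dictionaries rather than Spark DataFrames; the label classes Raw, Rounded and RoundedWithSubtotal, the Labeled wrapper, and the column names are all hypothetical. The labels exist only for mypy: passing a Labeled[Raw] to add_subtotal is rejected during type checking, although Python itself would happily run it.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar


# Hypothetical label types: they hold no data, they only name a state.
class Raw: ...
class Rounded: ...
class RoundedWithSubtotal: ...


S = TypeVar("S")


@dataclass
class Labeled(Generic[S]):
    """Rows tagged with a phantom state label S (S is never used at runtime)."""
    rows: list[dict[str, float]]


def round_prices(data: Labeled[Raw]) -> Labeled[Rounded]:
    # Round each price to the nearest whole unit.
    return Labeled([{**r, "price": round(r["price"])} for r in data.rows])


def add_subtotal(data: Labeled[Rounded]) -> Labeled[RoundedWithSubtotal]:
    # Subtotal = price * quantity; the signature only accepts rounded data.
    return Labeled([{**r, "subtotal": r["price"] * r["quantity"]} for r in data.rows])


raw: Labeled[Raw] = Labeled([{"price": 19.6, "quantity": 2}])
ready = add_subtotal(round_prices(raw))
# add_subtotal(raw)  # mypy rejects this: "Labeled[Raw]" is not "Labeled[Rounded]"
```

Notice the RoundedWithSubtotal class: without intersection types, every combination of labels needs its own hand-written class, and the number of such classes grows quickly with the number of steps.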

Introducing typing-protocol-intersection

At the heart of Type-Safe Pipelines is the typing-protocol-intersection library, which ships a mypy plugin. It provides the ProtocolIntersection type hint, imported here as Has, which describes a type that satisfies several protocols at once, so you can write Has[T, Rounded] instead of declaring a new protocol for every combination of labels. By leveraging this library, you can make your data processing code clearer, more reliable, and easier to maintain.
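Conceptually, an intersection of protocols is a type that satisfies every protocol in it at once. Plain Python can only spell such a type by hand, as a protocol that inherits from the others. The sketch below does exactly that with hypothetical protocols HasRoundedPrice and HasSubtotal and hypothetical row classes. Roughly speaking, RoundedAndSubtotal plays the role that Has[HasRoundedPrice, HasSubtotal] would play with the plugin; the difference is that Has also accepts a type variable such as T as one of its arguments, which a hand-written class cannot express.

```python
from typing import Protocol, runtime_checkable


# Hypothetical protocols; @runtime_checkable is only there so the
# isinstance checks in the usage below work. mypy does not need it.
@runtime_checkable
class HasRoundedPrice(Protocol):
    rounded_price: int


@runtime_checkable
class HasSubtotal(Protocol):
    subtotal: float


# Hand-written intersection: satisfied only by objects matching BOTH protocols.
@runtime_checkable
class RoundedAndSubtotal(HasRoundedPrice, HasSubtotal, Protocol):
    pass


class OrderItemRow:
    def __init__(self, rounded_price: int, subtotal: float) -> None:
        self.rounded_price = rounded_price
        self.subtotal = subtotal


class SubtotalOnlyRow:
    def __init__(self, subtotal: float) -> None:
        self.subtotal = subtotal


def describe(row: RoundedAndSubtotal) -> str:
    return f"{row.rounded_price} -> {row.subtotal}"


print(describe(OrderItemRow(20, 40.0)))  # prints "20 -> 40.0"
# describe(SubtotalOnlyRow(40.0))  # mypy error: "rounded_price" is missing
```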

Practical Example: A Python Pipeline

Let’s dive into a practical example to see how Type-Safe Pipelines can revolutionize your approach to data processing. Consider a scenario where we have several tables in a database (Customers, Addresses, Products, Orders, and OrderItems) and we want to perform various transformations on this data using a type-safe pipeline.

1. Creating the Data: We start by creating DataFrames for each table using Spark.
2. Defining Transformations: We define functions for transforming the data, such as rounding prices, adding subtotals, and calculating total amounts.
3. Implementing the Pipeline: Using a custom Pipeline class, we chain these transformations together, and the type checker verifies each step.
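The transformation functions themselves are not shown in the article. Below is a minimal sketch of what round_prices, add_subtotal and add_total_amount might compute, with the argument order taken from the pipeline calls. To stay runnable without Spark it works on plain lists of dicts; the column names Product_ID, Price, Quantity and Subtotal, the join logic, and the sample rows are assumptions, and the real helpers operate on Spark DataFrames.

```python
from collections import defaultdict
from typing import Any

# Hypothetical stand-in for a Spark DataFrame: a list of row dicts.
Rows = list[dict[str, Any]]


def round_prices(products: Rows) -> Rows:
    # Round every product price to a whole number.
    return [{**p, "Price": round(p["Price"])} for p in products]


def add_subtotal(order_items: Rows, products: Rows) -> Rows:
    # "Join" order items to products on Product_ID; Subtotal = Price * Quantity.
    price_by_id = {p["Product_ID"]: p["Price"] for p in products}
    return [
        {**item, "Subtotal": price_by_id[item["Product_ID"]] * item["Quantity"]}
        for item in order_items
    ]


def add_total_amount(orders: Rows, items_with_subtotal: Rows) -> Rows:
    # Sum subtotals per order and attach the result as Total_Amount.
    totals: defaultdict[int, float] = defaultdict(float)
    for item in items_with_subtotal:
        totals[item["Order_ID"]] += item["Subtotal"]
    return [{**o, "Total_Amount": totals[o["Order_ID"]]} for o in orders]


# Illustrative sample data (not from the article).
products = [{"Product_ID": 1, "Price": 99.6}, {"Product_ID": 2, "Price": 250.2}]
order_items = [
    {"Order_ID": 1, "Product_ID": 1, "Quantity": 2},
    {"Order_ID": 1, "Product_ID": 2, "Quantity": 1},
]
orders = [{"Order_ID": 1, "Customer_ID": 1}]
result = add_total_amount(orders, add_subtotal(order_items, round_prices(products)))
```

Here the rounded prices are 100 and 250, so order 1 gets a Total_Amount of 100 * 2 + 250 * 1 = 450.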

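```python
from typing import Generic, TypeVar

from pyspark.sql import DataFrame
from typing_protocol_intersection import ProtocolIntersection as Has

# The Rounded, Subtotal and TotalAmount protocols and the round_prices,
# add_subtotal and add_total_amount DataFrame helpers live in the full example.
T = TypeVar("T", covariant=True)


class Pipeline(Generic[T]):
    def __init__(self, df: DataFrame) -> None:
        super().__init__()
        self.df = df

    def round_prices(self) -> "Pipeline[Has[T, Rounded]]":
        self.df = round_prices(self.df)
        # Same object at runtime; only its static type gains the Rounded label.
        return self  # type: ignore

    def add_subtotal(self, order_items_df: DataFrame) -> "Pipeline[Has[T, Subtotal]]":
        self.df = add_subtotal(order_items_df, self.df)
        return self  # type: ignore

    # The annotated self type demands that both Rounded and Subtotal are present.
    def add_total_amount(
        self: "Pipeline[Has[T, Rounded, Subtotal]]", orders_df: DataFrame
    ) -> "Pipeline[Has[T, TotalAmount]]":
        self.df = add_total_amount(orders_df, self.df)
        return self  # type: ignore

    def build(self) -> DataFrame:
        # Unwrap the underlying DataFrame once all steps are done.
        return self.df
```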

Here, Rounded, Subtotal, and TotalAmount are our Protocols that label the state of the data, and T is a covariant type variable.
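Why covariance? With a covariant T, Pipeline[X] counts as a subtype of Pipeline[Y] whenever X is a subtype of Y. Assuming the plugin treats an intersection with more protocols as a subtype of one with fewer (which is how intersection types normally behave), a pipeline carrying extra labels is then still accepted by a step that demands fewer. The standard-library sketch below shows the same rule with ordinary classes; Box, Animal and Dog are hypothetical names, and only mypy enforces the rule, not the Python runtime.

```python
from typing import Generic, TypeVar

T_co = TypeVar("T_co", covariant=True)


class Animal:
    def name(self) -> str:
        return "animal"


class Dog(Animal):
    def name(self) -> str:
        return "dog"


class Box(Generic[T_co]):
    """Read-only container: T_co appears only in return positions."""

    def __init__(self, item: T_co) -> None:
        self._item = item

    def get(self) -> T_co:
        return self._item


def describe(box: Box[Animal]) -> str:
    return box.get().name()


dog_box: Box[Dog] = Box(Dog())
# mypy accepts Box[Dog] where Box[Animal] is expected only because T_co is covariant.
print(describe(dog_box))  # prints "dog"
```

In pipeline terms, read Dog as a pipeline type with more labels and Animal as one with fewer.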

Running the Pipeline: Finally, we chain the steps and run the pipeline. mypy checks every call in the chain, so each transformation is statically verified to receive data in the state it expects.

```python
orders_pipe = (
    Pipeline(products_df)
    .round_prices()
    .add_subtotal(order_items_df)
    .add_total_amount(orders_df)
)
```

Now let’s break the pipeline by forgetting to call round_prices():

```python
orders_pipe = (
    Pipeline(products_df)
    .add_subtotal(order_items_df)
    .add_total_amount(orders_df)
)
```

mypy reports:

```
error: Invalid self argument "Pipeline[ProtocolIntersection[Subtotal]]" to attribute function "add_total_amount"
with type "Callable[[
    Pipeline[typing_protocol_intersection.types.ProtocolIntersection[T, Rounded, Subtotal]], DataFrame],
    Pipeline[typing_protocol_intersection.types.ProtocolIntersection[T, Total_Amount]]
]]"
```

add_total_amount requires a pipeline labeled with both Rounded and Subtotal, but it received one labeled only with Subtotal, so mypy reports an error before any code runs. Once all transformations have been applied, we unwrap the data from the Pipeline with build():

```python
orders_pipe.build().show()
```

```
+--------+-----------+----------+------------+
|Order_ID|Customer_ID|Order_Date|Total_Amount|
+--------+-----------+----------+------------+
|       1|          1|2024-02-18|        1600|
|       3|          3|2024-02-18|        1200|
|       2|          2|2024-02-18|        2200|
+--------+-----------+----------+------------+
```
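The check that caught the missing round_prices() call comes from annotating self in add_total_amount: mypy only lets the method be called on a Pipeline whose type matches that annotation. The same precise-self-type technique also works in plain typing, without the plugin. In the sketch below, Order, Unpaid and Paid are hypothetical names, and the state parameter S is never used at runtime.

```python
from typing import Generic, TypeVar


# Hypothetical state labels.
class Unpaid: ...
class Paid: ...


S = TypeVar("S")


class Order(Generic[S]):
    def __init__(self, total: float) -> None:
        self.total = total

    # Only callable on an Order[Unpaid]; returns the same data labeled Paid.
    def pay(self: "Order[Unpaid]") -> "Order[Paid]":
        return Order(self.total)

    # Only callable on an Order[Paid].
    def ship(self: "Order[Paid]") -> str:
        return f"shipping order worth {self.total}"


order: Order[Unpaid] = Order(100.0)
print(order.pay().ship())  # prints "shipping order worth 100.0"
# order.ship()  # mypy: Invalid self argument "Order[Unpaid]" to attribute function "ship"
```

Uncommenting the last line produces the same kind of "Invalid self argument" error that mypy reported for add_total_amount, even though Python itself would execute it without complaint.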

You can find the full example here.

Conclusion and Next Steps

By embracing type-safe pipelines in Python, we can make data processing code clearer, more reliable, and easier to maintain. As a next step, consider exploring the typing-protocol-intersection plugin further and sharing your experiences with the Python community. Together, we can revolutionize the way data processing pipelines are designed and implemented in Python. Stay tuned for the next part!
