Revolutionize Your Python Data Processing with Type-Safe Pipelines Using a MyPy Plugin: Part 1
A Common Challenge
Picture this: you’re staring at a Python script full of data transformations, and you realize you’ve lost track of what state the data is in. How can you find out without running the entire application? There’s a powerful solution that can transform the way you handle data processing: Type-Safe Pipelines.
Type Families for Pipelines
Type Families are type-level functions: they compute new types from existing ones during static analysis. In simpler terms, they let the type that describes our data change from one processing step to the next while the program is being analyzed, rather than at runtime. This opens up a world of possibilities for making our data processing pipelines clearer and more robust; the article “Type-safe data processing pipelines” describes these opportunities in detail. In short, let’s take a look at this picture:
For example, suppose we have a data processing pipeline made up of several transformations. We can use Type Families so that each transformation step attaches a label to the type of its output, such as “Subtotal”, “Rounded”, or any other label that reflects the state of the data. If a step receives data that lacks a label it requires, the type checker flags this as an error during static analysis, before it can turn into a runtime error.
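To make the idea of state labels concrete without any plugin, here is a minimal stdlib-only sketch, assuming we track a single label at a time through a phantom type parameter. The names Table, Raw, RoundedState, load, apply_rounding and prices_report are hypothetical and not taken from the article’s code. The label exists only for the type checker; at runtime the rows are plain dictionaries.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar


class Raw:
    """Hypothetical label: data as loaded."""


class RoundedState:
    """Hypothetical label: prices have been rounded."""


S = TypeVar("S")


@dataclass(frozen=True)
class Table(Generic[S]):
    # S is a phantom parameter: it never appears in the fields, only in the type.
    rows: list[dict[str, float]]


def load(rows: list[dict[str, float]]) -> Table[Raw]:
    return Table(rows)


def apply_rounding(table: Table[Raw]) -> Table[RoundedState]:
    # Same rows with "Price" rounded to 2 decimals; the label moves from Raw to RoundedState.
    return Table([{**row, "Price": round(row["Price"], 2)} for row in table.rows])


def prices_report(table: Table[RoundedState]) -> list[float]:
    # Accepts only tables whose type says the prices were already rounded.
    return [row["Price"] for row in table.rows]


prices = prices_report(apply_rounding(load([{"Price": 10.123}, {"Price": 3.0}])))
# prices_report(load([{"Price": 1.0}]))  # rejected by mypy: Table[Raw] is not Table[RoundedState]
```

This only handles one label at a time; tracking several labels at once is where protocol intersections come in.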
Introducing typing-protocol-intersection
At the heart of Type-Safe Pipelines is the typing-protocol-intersection library and its mypy plugin. It introduces the ProtocolIntersection type hint, which we import as Has. Has[A, B] describes a type that satisfies both protocols A and B, so you can express protocol intersections without writing a combined protocol class for every combination. By leveraging this library, you can enhance the clarity, reliability, and maintainability of your data processing code.
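Without the plugin, the same idea has to be spelled out by hand. Here is a minimal stdlib-only sketch of that hand-written alternative, assuming two hypothetical state protocols whose members (is_rounded, has_subtotal) are invented for illustration and are not from the article. Every combination of states needs its own named protocol class, which is the boilerplate that Has[Rounded, Subtotal] is meant to remove.

```python
from typing import Protocol


# Hypothetical state protocols; the member names are illustrative only.
class Rounded(Protocol):
    is_rounded: bool


class Subtotal(Protocol):
    has_subtotal: bool


# Hand-written intersection: one explicit protocol class per combination.
class RoundedAndSubtotal(Rounded, Subtotal, Protocol):
    pass


class Prepared:
    """A concrete class that structurally satisfies both protocols."""

    def __init__(self) -> None:
        self.is_rounded = True
        self.has_subtotal = True


def needs_both(data: RoundedAndSubtotal) -> bool:
    # mypy accepts any argument with both attributes; no inheritance is required.
    return data.is_rounded and data.has_subtotal


ok = needs_both(Prepared())
```

With three or four labels, the number of such combined classes grows quickly, which is why a generic intersection type is attractive.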
Practical Example: A Python Pipeline
Let’s dive into a practical example to see how Type-Safe Pipelines can revolutionize your approach to data processing. Consider a scenario where we have several tables in a database (Customers, Addresses, Products, Orders, and OrderItems) and we want to perform various transformations on this data using a type-safe pipeline.
Creating the Data: We start by creating DataFrames for each table using Spark.
Defining Transformations: We define functions for transforming the data, such as rounding prices, adding subtotals, and calculating total amounts.
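In the full example these transformations operate on Spark DataFrames. As a dependency-free illustration of what they might compute, here is a pure-Python sketch over lists of dicts. The column names Product_ID, Price, Quantity and Subtotal, the two-decimal rounding, and the toy rows are assumptions made for this sketch; only Order_ID, Customer_ID and Total_Amount come from the article’s output. The argument order mirrors the calls inside the Pipeline class.

```python
from collections import defaultdict
from typing import Any

Row = dict[str, Any]


def round_prices(products: list[Row]) -> list[Row]:
    # Assumption: prices are rounded to 2 decimal places.
    return [{**p, "Price": round(p["Price"], 2)} for p in products]


def add_subtotal(order_items: list[Row], products: list[Row]) -> list[Row]:
    # Join each order item to its product and compute Subtotal = Price * Quantity.
    price_by_id = {p["Product_ID"]: p["Price"] for p in products}
    return [
        {**item, "Subtotal": price_by_id[item["Product_ID"]] * item["Quantity"]}
        for item in order_items
    ]


def add_total_amount(orders: list[Row], items_with_subtotal: list[Row]) -> list[Row]:
    # Sum the subtotals per order and attach the result as Total_Amount.
    totals: defaultdict[Any, float] = defaultdict(float)
    for item in items_with_subtotal:
        totals[item["Order_ID"]] += item["Subtotal"]
    return [{**o, "Total_Amount": totals[o["Order_ID"]]} for o in orders]


# Toy rows, made up for illustration.
products = [{"Product_ID": 1, "Price": 9.999}, {"Product_ID": 2, "Price": 5.0}]
order_items = [
    {"Order_ID": 1, "Product_ID": 1, "Quantity": 2},
    {"Order_ID": 1, "Product_ID": 2, "Quantity": 1},
]
orders = [{"Order_ID": 1, "Customer_ID": 1}]
result = add_total_amount(orders, add_subtotal(order_items, round_prices(products)))
```

Note that nothing here stops you from calling add_total_amount on unrounded data; that ordering constraint is exactly what the typed Pipeline adds on top.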
Implementing the Pipeline: Using a custom Pipeline class, we chain these transformations together, ensuring type safety at each step.
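```python
from typing import Generic, TypeVar

from pyspark.sql import DataFrame
from typing_protocol_intersection import ProtocolIntersection as Has

# Rounded, Subtotal, TotalAmount and the DataFrame-level round_prices,
# add_subtotal and add_total_amount functions come from the full example.
T = TypeVar("T", covariant=True)


class Pipeline(Generic[T]):
    def __init__(self, df: DataFrame) -> None:
        self.df = df

    def round_prices(self) -> "Pipeline[Has[T, Rounded]]":
        self.df = round_prices(self.df)  # module-level function, not this method
        return self  # type: ignore  # same object, new static label

    def add_subtotal(self, order_items_df: DataFrame) -> "Pipeline[Has[T, Subtotal]]":
        self.df = add_subtotal(order_items_df, self.df)
        return self  # type: ignore

    # The self annotation makes this callable only once Rounded and Subtotal are present.
    def add_total_amount(self: "Pipeline[Has[T, Rounded, Subtotal]]",
                         orders_df: DataFrame) -> "Pipeline[Has[T, TotalAmount]]":
        self.df = add_total_amount(orders_df, self.df)
        return self  # type: ignore

    def build(self) -> DataFrame:
        return self.df
```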
Here Rounded, Subtotal and TotalAmount are our Protocols (one per data state), and T is a covariant type variable.
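The snippet relies on declarations the article does not show. A minimal sketch of what they might look like follows, assuming each protocol gets one hypothetical marker attribute (prices_rounded, subtotal_added, total_amount_added); since an empty protocol is structurally satisfied by every type, a distinct member keeps the labels distinguishable. T is declared covariant as the text states: a Pipeline[X] is then accepted wherever a Pipeline[Y] is expected if X is a subtype of Y, so a pipeline that carries extra labels can still flow into a step that needs fewer.

```python
from typing import Protocol, TypeVar


# Hypothetical marker members: an empty Protocol would be satisfied by any type,
# so each state label gets its own distinguishing attribute.
class Rounded(Protocol):
    prices_rounded: bool


class Subtotal(Protocol):
    subtotal_added: bool


class TotalAmount(Protocol):
    total_amount_added: bool


# Covariant: Pipeline[X] is accepted where Pipeline[Y] is expected if X is a subtype of Y.
T = TypeVar("T", covariant=True)
```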
Running the Pipeline: Finally, we chain the steps together. Because each method declares the label it adds and, where needed, the labels it requires, mypy checks the order of the transformations at every stage without running any Spark code.
```python
orders_pipe = (
    Pipeline(products_df)
    .round_prices()
    .add_subtotal(order_items_df)
    .add_total_amount(orders_df)
)
```
Let’s try to break the pipeline by forgetting to call round_prices():
```python
orders_pipe = (
    Pipeline(products_df)
    .add_subtotal(order_items_df)
    .add_total_amount(orders_df)
)
```

Running mypy reports:

```
error: Invalid self argument "Pipeline[ProtocolIntersection[Subtotal]]" to attribute function "add_total_amount" with type "Callable[[
    Pipeline[typing_protocol_intersection.types.ProtocolIntersection[T, Rounded, Subtotal]], DataFrame],
    Pipeline[typing_protocol_intersection.types.ProtocolIntersection[T, Total_Amount]
]]"
```
add_total_amount requires [Rounded, Subtotal] in its self type but receives only [Subtotal], so mypy reports an error. Once all the transformations are in place, we can unwrap the DataFrame from the Pipeline with build():
```python
orders_pipe.build().show()
```

This prints:

```
+--------+-----------+----------+------------+
|Order_ID|Customer_ID|Order_Date|Total_Amount|
+--------+-----------+----------+------------+
|       1|          1|2024-02-18|        1600|
|       3|          3|2024-02-18|        1200|
|       2|          2|2024-02-18|        2200|
+--------+-----------+----------+------------+
```
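The error in the broken pipeline comes from the annotation on self in add_total_amount: mypy allows a method with an explicitly annotated self to be called only on receivers whose type matches that annotation. The sketch below reproduces this mechanism with plain typing and no plugin, assuming two hypothetical label classes (RawState, RoundedState) and a hypothetical Steps container; it illustrates the technique and is not the article’s code.

```python
from typing import Generic, TypeVar


class RawState:
    """Hypothetical label: values as loaded."""


class RoundedState:
    """Hypothetical label: values have been rounded."""


S = TypeVar("S", covariant=True)


class Steps(Generic[S]):
    def __init__(self, values: list[float]) -> None:
        self.values = values

    def round_values(self: "Steps[RawState]") -> "Steps[RoundedState]":
        # Builds a new object, so no "type: ignore" is needed to change the label.
        return Steps([float(round(v)) for v in self.values])

    def total(self: "Steps[RoundedState]") -> float:
        # The self annotation restricts this method to Steps[RoundedState] receivers.
        return sum(self.values)


def start(values: list[float]) -> Steps[RawState]:
    return Steps(values)


total = start([1.4, 2.6]).round_values().total()
# start([1.4]).total()  # rejected by mypy with an "Invalid self argument" error
```

The check is purely static: at runtime, Python would run start([1.4]).total() without complaint.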
You can find the full example here.
Conclusion and Next Steps
By embracing type-safe pipelines in Python, we can transform the way we handle data processing, ensuring clarity, reliability, and maintainability in our code. As a next step, consider exploring the typing-protocol-intersection plugin further and sharing your experiences with the Python community. Together, we can revolutionize the way data processing pipelines are designed and implemented in Python. Stay tuned for the next part!