Register the wrapped compute function as a pandas transform.
To use the pandas library, you must add pandas as a run dependency in your meta.yml file. For more information, refer to the documentation.
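As a minimal sketch of what that dependency declaration might look like, assuming the conda-style recipe layout commonly used in transforms-python repositories (the build entries shown here are illustrative, not required by this decorator):

    requirements:
      build:
        - python
        - setuptools

      run:
        # pandas must be listed here so it is available when the transform runs
        - pandas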
The transform_pandas decorator is used to construct a Transform object from a compute function that accepts and returns pandas.DataFrame objects. This decorator is similar to the transform_df() decorator; however, the input pyspark.sql.DataFrame objects are converted to pandas.DataFrame objects before the computation and converted back afterwards.
>>> import pandas
>>> @transform_pandas(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(first_input, second_input):
...     # type: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame
...     # Concatenate the two pandas DataFrames row-wise
...     return pandas.concat([first_input, second_input])
Note that transform_pandas should only be used on datasets that can fit into memory. If you have larger datasets that you wish to filter down before converting to pandas, you should write your transformation using the transform_df() decorator and the pyspark.sql.SparkSession.createDataFrame() method.
>>> @transform_df(
...     Output('/path/to/output/dataset'),  # An unnamed Output spec
...     first_input=Input('/path/to/first/input/dataset'),
...     second_input=Input('/path/to/second/input/dataset'),
... )
... def my_compute_function(ctx, first_input, second_input):
...     # type: (pyspark.sql.DataFrame, pyspark.sql.DataFrame) -> pyspark.sql.DataFrame
...     pd = first_input.filter(first_input.county == 'UK').toPandas()
...     # Perform pandas operations on a subset of the data before converting back to a PySpark DataFrame
...     return ctx.spark_session.createDataFrame(pd)