Hive-style partitioning is a method for optimizing the layout of data in a dataset in order to dramatically improve the performance of queries that filter on particular columns. In the context of Foundry Spark-based transforms, hive-style partitioning works in the following fashion:

- When the output is written, at least one file is produced for each unique combination of values in the partition columns, and those values are encoded in the file paths (for example, record_date=2024-01-01/department=sales/).
- Dataset readers that use compute engines such as Spark or Polars and that filter on the partition columns can automatically leverage the transaction metadata and file paths to narrow down the set of files to read, as shown in the sketch below.

Because at least one file is written for each unique combination of partition column values, and writing an excessive number of files results in poor write and subsequent read performance, hive-style partitioning is not suited to columns with very high cardinality (many unique values and only a few rows per value).
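For example, a downstream Python transform that filters on a partition column benefits from this file pruning without any extra configuration. The sketch below is illustrative only: the dataset paths are hypothetical, and it assumes the input was written with record_date and department as partition columns, as in the write examples below.

```python
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F


# Hypothetical paths: the input is assumed to have been written with
# hive-style partitioning on "record_date" and "department".
@transform_df(
    Output("/path/to/filtered_output"),
    source=Input("/path/to/partitioned_input"),
)
def compute(source):
    # Because "record_date" is a partition column, Spark can use the
    # transaction metadata and hive-style file paths to read only the
    # files for matching dates instead of scanning the whole dataset.
    return source.filter(F.col("record_date") >= "2024-01-01")
```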
The minimal examples below show how to configure hive-style partitioning when writing data to the output in Python and Java.
In these examples, we repartition the dataframe using repartitionByRange ↗ on the partition columns before writing to the output. Repartitioning ensures that the output contains only one file per unique combination of partition column values, rather than one file per unique combination of partition column values in each input dataframe partition. Skipping this repartition step can result in an excessive number of files in the output dataset, causing poor write and read performance.
repartitionByRange is generally preferred over repartition ↗ in the context of hive-style partitioning because repartitionByRange uses sampling to estimate partition range boundaries that distribute the data as evenly as possible. Conversely, repartition hashes the column values and takes the result modulo the number of partitions to assign rows to dataframe partitions; for columns with low cardinality, this hash-and-modulo approach has a high likelihood of distributing data unevenly, even if the original data is relatively evenly distributed across values. Uneven data distribution (skew) can cause Spark executor out-of-memory errors and job failures.
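To make the difference concrete, the standalone PySpark sketch below (not a Foundry transform; the departments and row counts are made up for illustration) repartitions a skewed, low-cardinality dataframe both ways and prints the resulting per-partition row counts:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy dataframe with a skewed, low-cardinality column: three departments,
# heavily weighted toward "sales".
df = spark.createDataFrame(
    [("sales",)] * 800 + [("hr",)] * 100 + [("it",)] * 100,
    ["department"],
)

# Hash-based repartitioning: rows are assigned by hash(department) % 3, so
# departments whose hashes collide may share a partition while others stay empty.
hashed = df.repartition(3, "department")

# Range-based repartitioning: sampled range boundaries tend to spread the
# three departments across the three partitions more evenly.
ranged = df.repartitionByRange(3, "department")

for name, frame in [("repartition", hashed), ("repartitionByRange", ranged)]:
    print(name)
    (
        frame.withColumn("partition_id", F.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .orderBy("partition_id")
        .show()
    )
```

With the hash-based approach, departments whose hashes collide modulo three may end up in the same dataframe partition while another partition stays empty; the range-based approach tends to give each department its own partition. The Python and Java transforms below apply repartitionByRange and declare the partition columns on the output.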
```python
from transforms.api import transform, Input, Output


@transform(
    transform_output=Output("/path/to/output"),
    transform_input=Input("/path/to/input"),
)
def compute(transform_output, transform_input):
    transform_output.write_dataframe(
        # Repartition by range on the partition columns so that the output
        # contains one file per unique combination of partition column values.
        transform_input.dataframe().repartitionByRange("record_date", "department"),
        # Declare the hive-style partition columns on the output.
        partition_cols=["record_date", "department"],
    )
```
```java
package myproject.datasets;

import com.palantir.foundry.spark.api.DatasetFormatSettings;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;

import static org.apache.spark.sql.functions.col;

public final class HivePartitioningInJava {

    @Compute
    public void myComputeFunction(
            @Input("ri.foundry.main.dataset.e2dd4bcf-7985-461c-9d08-ee0edd734a1a") FoundryInput myInput,
            @Output("ri.foundry.main.dataset.4b62bf9b-3700-40f6-9e85-505eaf87e57d") FoundryOutput myOutput) {
        myOutput.getDataFrameWriter(
                        // Repartition by range on the partition columns so that the output
                        // contains one file per unique combination of partition column values.
                        myInput.asDataFrame().read().repartitionByRange(col("record_date"), col("department")))
                // Declare the hive-style partition columns on the output.
                .setFormatSettings(DatasetFormatSettings.builder()
                        .addPartitionColumns("record_date", "department")
                        .build())
                .write();
    }
}
```