Dataset projections can improve performance for a large class of queries. If you have a dataset you want to optimize for multiple query patterns (for example, filtering on two separate columns), you should consider adding projections to the dataset. Specific examples of use cases for projections are listed below.
Each projection typically supports a single query pattern, focused on either improving filters on a set of columns or joins on a set of columns. Multiple projections can be added to a dataset, and all column types are supported in projections. Additionally, projections can be added to both snapshot and incrementally built datasets. The Noho service is used to manage projections and is referenced in the dataset schema when setting up a projection.
Projections have some limitations:
APPEND, SNAPSHOT, and UPDATE transactions that add rows to the dataset. The projection system will automatically identify datasets that violate this requirement and disable projections on them, but as a user, you should still avoid this scenario.
Review the example use cases below to learn whether projections are a good fit for your use case. Get started by learning how to set up a projection.
Many types of queries will benefit from a projection. The examples below will demonstrate the following:
Given an ordered list of columns, optimize filters on any prefix of the list. Projections only speed up filters that compare a column to a constant value, and any filter on a string column must be case-sensitive.
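The prefix rule described above can be illustrated with a small sketch. This is not Foundry or Spark code: the function names `usable_prefix` and `is_optimized` are hypothetical, and the sketch assumes the caller has already extracted the set of columns that are compared to a constant in conditions joined by AND (conditions such as `abs(x) == 10`, or anything under an OR, contribute no columns).

```python
from typing import List, Sequence, Set


def usable_prefix(projection_columns: Sequence[str],
                  constant_filter_columns: Set[str]) -> List[str]:
    """Return the longest prefix of the projection's ordered columns
    in which every column has a column-vs-constant filter.

    Hypothetical helper for illustration only.
    """
    prefix = []
    for column in projection_columns:
        if column not in constant_filter_columns:
            break  # the prefix ends at the first column without a filter
        prefix.append(column)
    return prefix


def is_optimized(projection_columns: Sequence[str],
                 constant_filter_columns: Set[str]) -> bool:
    """A query benefits only if at least the first projection column is filtered."""
    return len(usable_prefix(projection_columns, constant_filter_columns)) > 0


projection = ["x", "y", "z"]
print(usable_prefix(projection, {"x", "y"}))  # filters on x and y
print(usable_prefix(projection, {"x", "q"}))  # q is not in the projection
print(is_optimized(projection, {"y"}))        # y alone is not a prefix
```

Under these assumptions, a filter on `y` alone yields an empty prefix, which is why such a query is not sped up.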
For example, say that there is a projection optimized for filters on the ordered list of columns ["x", "y", "z"].
It will speed up the following types of queries:
SELECT * FROM dataset WHERE x = 5 AND y = 10 AND z = '15'
SELECT * FROM dataset WHERE x = 5 AND y = 10
SELECT * FROM dataset WHERE x = 5 AND q = 3
Although the last query also filters on q = 3, Spark will automatically attempt to unpack the filter conditions it can "push" into the data source (in this case, x = 5) and apply the other conditions afterwards.
But the following types of queries will not be optimized:
SELECT * FROM dataset WHERE abs(x) == 10
abs(x) == 10 does not compare a column with a constant value.
SELECT * FROM dataset WHERE x % 100 == 10
x % 100 == 10 does not compare a column with a constant value.
SELECT * FROM dataset WHERE y = 10
["y"] is not a prefix of ["x", "y", "z"].
SELECT * FROM dataset WHERE z = '15'
["z"] is not a prefix of ["x", "y", "z"].
SELECT * FROM dataset WHERE x = 5 OR q = 3
Spark cannot push the x = 5 filter into the data source because that would miss rows where x = 5 is false but q = 3 is true.
Projections can optionally speed up range queries on the filter columns, for example:
SELECT * FROM dataset WHERE x > 5 AND x < 10
SELECT * FROM dataset WHERE s LIKE 'SOME_PREFIX%'
Given an unordered set of columns and a bucket count, optimize joins (only) on that exact set and bucket count.
For example, a projection optimized for joins on {"x", "y"} will optimize the following types of queries:
SELECT * FROM dataset1 INNER JOIN dataset2 ON dataset1.x = dataset2.x AND dataset1.y = dataset2.y
But the following queries will not be optimized:
SELECT * FROM dataset1 INNER JOIN dataset2 ON dataset1.x = dataset2.x
In Foundry, joins of large datasets are typically performed as sort-merge joins. This involves partitioning each of the datasets according to the join key, sorting each partition on that key, and then merging the (sorted) partitions with the same key.
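The three steps of a sort-merge join (partition, sort, merge) can be sketched in plain Python. This is a simplified, single-process illustration and not how Spark or Foundry implements the join; the function names and the default partition count are assumptions made for the example.

```python
from collections import defaultdict


def join_key(row, key_columns):
    """Build the join key for a row as a tuple of its key column values."""
    return tuple(row[c] for c in key_columns)


def hash_partition(rows, key_columns, num_partitions):
    """Step 1: assign each row to a partition by hashing its join key."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(join_key(row, key_columns)) % num_partitions].append(row)
    return partitions


def merge_sorted(left, right, key_columns):
    """Step 3: merge two key-sorted lists, emitting every matching pair."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = join_key(left[i], key_columns), join_key(right[j], key_columns)
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of rows sharing this key on each side.
            i_end = i
            while i_end < len(left) and join_key(left[i_end], key_columns) == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and join_key(right[j_end], key_columns) == lk:
                j_end += 1
            out.extend((l, r) for l in left[i:i_end] for r in right[j:j_end])
            i, j = i_end, j_end
    return out


def sort_merge_join(left_rows, right_rows, key_columns, num_partitions=4):
    left_parts = hash_partition(left_rows, key_columns, num_partitions)
    right_parts = hash_partition(right_rows, key_columns, num_partitions)
    joined = []
    for p in range(num_partitions):
        # Step 2: sort each partition on the join key.
        sort_key = lambda row: join_key(row, key_columns)
        left_sorted = sorted(left_parts.get(p, []), key=sort_key)
        right_sorted = sorted(right_parts.get(p, []), key=sort_key)
        joined.extend(merge_sorted(left_sorted, right_sorted, key_columns))
    return joined
```

In this picture, a join-optimized projection corresponds to data that is already laid out as in step 1 (and sorted as in step 2), which is the work a consumer can potentially skip.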
Most projection consumers in Foundry do not by default take advantage of the fact that the projection is already sorted when doing sort-merge joins, so you may still see a sort in your Spark query plan. In transforms, you can use the Spark profile BUCKET_SORTED_SCAN_ENABLED to modify Spark's behavior based on the fact that the projection is sorted, but this does not always improve performance and can actually make performance worse.
In Foundry, aggregations usually involve performing a shuffle exchange on the dataset (i.e., partitioning the dataset) according to the aggregation key. When reading from a projection that is hash-bucketed (which is the case for all join-optimized projections, but may not be the case for a filter-optimized projection), consumers can avoid this shuffle, which can lead to dramatic performance improvements during aggregations. A primary key expectation check on a dataset that is updated incrementally in an append-only fashion is a case where a projection (on the primary key columns) is especially beneficial, both because of the compaction performed by the projection and because the hash-bucketing of the projection can be leveraged in the aggregation used to compute the expectation.
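To see why hash-bucketing removes the need for a shuffle, consider the following sketch of a primary key uniqueness check. It is a toy model rather than Foundry code: `write_bucketed` stands in for the layout a hash-bucketed projection provides, and `duplicate_keys` stands in for the aggregation behind the expectation. Both names are hypothetical.

```python
from collections import Counter


def write_bucketed(rows, key_columns, num_buckets):
    """Lay rows out in buckets by the hash of their key.

    Rows with equal keys always land in the same bucket.
    """
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        buckets[hash(key) % num_buckets].append(row)
    return buckets


def duplicate_keys(buckets, key_columns):
    """Find keys that occur more than once.

    Each bucket is processed on its own: no rows need to move between
    buckets, because all rows for a given key are already co-located.
    """
    duplicates = []
    for bucket in buckets:
        counts = Counter(tuple(row[c] for c in key_columns) for row in bucket)
        duplicates.extend(key for key, n in counts.items() if n > 1)
    return duplicates


rows = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 2}]
buckets = write_bucketed(rows, ["id"], num_buckets=4)
print(duplicate_keys(buckets, ["id"]))  # the key (2,) appears twice
```

Without the bucketed layout, the same check would first have to regroup all rows by key, which is the shuffle exchange described above.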
Incremental pipelines can result in very high file counts and correspondingly degraded read performance. For example, a pipeline that writes ten partitions every five minutes will write over a million files per year. Such datasets are difficult to read for many reasons, including the cost of simply producing the list of input partitions.
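The file count in the example above can be checked with a quick calculation. The sketch assumes one file per partition and a 365-day year with no gaps in the schedule.

```python
PARTITIONS_PER_BUILD = 10
MINUTES_BETWEEN_BUILDS = 5

# Builds per hour * hours per day * days per year (assumed: 365).
builds_per_year = (60 // MINUTES_BETWEEN_BUILDS) * 24 * 365

# Assumption: each partition is written as exactly one file.
files_per_year = PARTITIONS_PER_BUILD * builds_per_year

print(builds_per_year)  # 105120
print(files_per_year)   # 1051200
```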
Projections provide a way to compact incremental pipelines such as these transparently. Just set up a projection, optimized for either filters or joins, and reads will use the projection. Learn more about using projections for incremental pipelines.
Note that consumers can still take advantage of the filter or join optimizations of the projection, even if the projection is not fully up-to-date with the canonical dataset, as long as there were no SNAPSHOT or DELETE transactions (or UPDATE transactions that modified existing files) on the canonical dataset between the latest transaction at the time that the projection was last built and the current latest transaction on the canonical dataset.
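The condition above can be expressed as a small predicate. The `Transaction` class and the `projection_still_usable` function are hypothetical names used for illustration and do not correspond to a Foundry API; the input is assumed to be the list of transactions committed to the canonical dataset after the projection was last built.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Transaction:
    """Hypothetical, simplified model of a dataset transaction."""
    kind: str  # one of "APPEND", "UPDATE", "SNAPSHOT", "DELETE"
    modifies_existing_files: bool = False


def projection_still_usable(transactions_since_build: Iterable[Transaction]) -> bool:
    """Return True if a stale projection can still serve optimized reads."""
    for txn in transactions_since_build:
        if txn.kind in ("SNAPSHOT", "DELETE"):
            return False
        if txn.kind == "UPDATE" and txn.modifies_existing_files:
            return False
    return True


# Only appends since the last projection build: still usable.
print(projection_still_usable([Transaction("APPEND"), Transaction("APPEND")]))
# A snapshot replaced the data: the projection can no longer be used.
print(projection_still_usable([Transaction("APPEND"), Transaction("SNAPSHOT")]))
```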
CSV is an inefficient file format for querying data, but it is common for data that is uploaded to Foundry (via Data Connection from a file system source, manual uploads, etc.) to initially be in CSV format. Creating a transform job is one way to convert these CSVs to a more efficient format such as Parquet. Alternatively, you can add a projection; read operations will use the optimized projection for better performance, while the dataset still contains the original CSV files.