We recommend setting up your time series pipeline using Pipeline Builder as explained in the time series setup page. Doing so will automatically apply the transform optimizations described below.
Contact your Palantir representative before proceeding with an advanced setup configuration.
If you require low-level transform control or advanced functionality not yet available in Pipeline Builder, this page describes how to manually set up your time series pipeline using Code Repositories for data transformations.
To set up time series with Code Repositories, you must complete the following:
- Create the time series dataset.
- Create the time series sync.
- Create the time series object type backing dataset.
- Create the time series object type.
When you create a time series sync using the time series output of Pipeline Builder, the time series dataset is generated automatically, and both the dataset and the sync are configured correctly for you. When you set up your pipeline manually, you must explicitly generate a time series dataset that contains your formatted time series data; this dataset is required to create a time series sync. The dataset must contain Series ID, Value, and Timestamp columns as specified in the glossary so they can be mapped in the time series sync.
All values for a series ID should be contained in the same dataset. Since values are fetched by their series ID, a single time series dataset can contain all values for multiple series IDs. For example:
+------------------------+---------------------+---------+
| series_id | timestamp | value |
+------------------------+---------------------+---------+
| Machine123_temperature | 01/01/2023 12:00:00 | 100 |
| Machine123_temperature | 01/01/2023 12:01:00 | 99 |
| Machine123_temperature | 01/01/2023 12:02:00 | 101 |
| Machine463_temperature | 01/01/2023 12:00:00 | 105 |
| Machine123_pressure | 01/01/2023 12:00:00 | 3 |
| ... | ... | ... |
+------------------------+---------------------+---------+
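For illustration, series IDs like the ones above can be derived by concatenating identifying columns from your raw data. The following is a minimal sketch only: the dataset paths and the raw column names (machine_id, sensor_name) are hypothetical placeholders to adapt to your own schema.

from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/path/to/formatted/time_series_dataset"),  # hypothetical output path
    raw=Input("/path/to/raw/sensor_readings"),          # hypothetical input path
)
def build_series_ids(raw):
    # Derive a stable series ID per machine/sensor pair, for example "Machine123_temperature".
    return raw.select(
        F.concat_ws("_", F.col("machine_id"), F.col("sensor_name")).alias("series_id"),
        F.col("timestamp"),
        F.col("value"),
    )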
Time series datasets are typically configured to build incrementally when there is live data. Incremental builds allow you to save on compute costs and achieve a much shorter latency from when raw data is ingested to when up-to-date data can be read.
For more information about the benefits of incremental time series builds, see the FAQ documentation.
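As a minimal sketch, an incremental transform in Python might look like the following; it assumes the default incremental behavior (only unprocessed input rows are read, and the output is appended to rather than replaced) and uses placeholder dataset paths. Combine it with the formatting shown below.

from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    output_dataset=Output("/path/to/output/dataset"),
    input_dataset=Input("/path/to/input/dataset"),
)
def my_incremental_compute_function(output_dataset, input_dataset):
    # With @incremental, dataframe() returns only rows added since the last build,
    # and write_dataframe appends them to the existing output by default.
    output_dataset.write_dataframe(input_dataset.dataframe())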
When generating the time series dataset in code, format the dataset before writing as follows:
from transforms.api import transform, Input, Output


@transform(
    output_dataset=Output("/path/to/output/dataset"),
    input_dataset=Input("/path/to/input/dataset"),
)
def my_compute_function(output_dataset, input_dataset):
    output_dataframe = (
        input_dataset
        .dataframe()
        .repartitionByRange('seriesId')
        .sortWithinPartitions('seriesId', 'timestamp')
    )
    output_dataset.write_dataframe(output_dataframe, output_format='soho')
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import com.palantir.foundry.spark.api.DatasetFormatSettings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class TimeSeriesWriter {
    @Compute
    public void writePartitioned(
            @Input("/path/to/input/dataset") FoundryInput inputDataset,
            @Output("/path/to/output/dataset") FoundryOutput outputDataset) {
        Dataset<Row> inputDataframe = inputDataset.asDataFrame().read();
        Dataset<Row> outputDataframe = inputDataframe
                .repartitionByRange(inputDataframe.col("seriesId"))
                .sortWithinPartitions("seriesId", "timestamp");
        outputDataset.getDataFrameWriter(outputDataframe)
                .setFormatSettings(DatasetFormatSettings.builder()
                        .format("soho")
                        .build())
                .write();
    }
}
Running this repartition and sort will optimize your dataset for performant use as time series. At a minimum, your dataset should be written in the Soho format (as shown) so that new data can be indexed to the time series database when it is not yet projected. You should also configure the number of partitions written by repartitionByRange() to a value appropriate for your pipeline, based on the following guidance:
The lowest number of partitions you can write is constrained by two factors: each partition must be small enough to fit on an executor, and there must be enough partitions for the job to parallelize sufficiently for your desired pipeline latency. Writing more partitions results in smaller partitions and faster jobs, but is less optimal for reads than writing fewer, larger partitions.
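For example, repartitionByRange() accepts an explicit partition count as its first argument. The count of 200 below is an arbitrary placeholder to tune for your data volume, not a recommendation.

output_dataframe = (
    input_dataset
    .dataframe()
    .repartitionByRange(200, 'seriesId')  # 200 is a placeholder partition count
    .sortWithinPartitions('seriesId', 'timestamp')
)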
To create a new time series sync, navigate directly to https://<domain>/workspace/time-series-catalog-app/new. You will be prompted to choose a location to save your sync, which must be in a Project that contains your time series dataset or imports it as a reference.
Select your time series dataset as the input, then map your dataset columns to the time series sync's Series ID, Value, and Timestamp. If your Timestamp column is of Long type, specify whether its unit is SECONDS, MILLISECONDS, MICROSECONDS, or NANOSECONDS.
When the time series sync builds, it syncs metadata from the time series dataset, enabling Foundry to index your time series data in its time series database on demand.
Restricted Views limit dataset access to only the rows that a user has permission to view. When working with an object type backed by a Restricted View, you must configure your time series sync to stop inheriting Markings.
Stop inheriting each Marking on the time series dataset by selecting Stop inheriting next to the Marking.
When you are finished, select Save at the top of the page.
While it is possible to configure Spark profiles for time series sync builds, this is very rarely necessary.
By default, the sync will be scheduled to run when the input time series dataset updates. We recommend this setting to ensure your time series data is kept up-to-date.
If another time series sync wrote intersecting series IDs and you would like to replace it with the new sync, specify the old sync under Show advanced options > Overwrite series from other syncs backed by other datasets. Doing so will cause the old sync to fail; the old sync should then be trashed.
You can generate the time series object type backing dataset with your preferred method; it must conform to the schema specified in the glossary.
To generate the time series object type backing dataset automatically, you can produce it in the same transform as your time series dataset by taking the distinct set of series IDs and mapping metadata onto them. In an incremental pipeline, you can use the merge and append pattern to achieve this. A sketch of this approach is shown below.
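The following is a minimal, non-incremental sketch of this approach. The output path is a placeholder, and the metadata columns (machine, sensor) are hypothetical values parsed out of the series ID; in practice you would map whatever metadata your object type requires.

from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/path/to/time_series_object_backing_dataset"),  # hypothetical output path
    time_series=Input("/path/to/output/dataset"),
)
def build_backing_dataset(time_series):
    # One row per series ID, with metadata derived from the ID itself.
    return (
        time_series
        .select("seriesId")
        .distinct()
        .withColumn("machine", F.split(F.col("seriesId"), "_").getItem(0))
        .withColumn("sensor", F.split(F.col("seriesId"), "_").getItem(1))
    )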
Follow the standard process to create an object type on your time series object type backing dataset. It is also possible to generate the object type directly from the dataset by selecting All actions > Create object type in the dataset preview. When creating the object type, configure it for time series by specifying which properties should be time series properties.