注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
時系列パイプラインの設定には、Pipeline Builder を使用して 時系列設定 ページで説明されているように行うことをお勧めします。これにより、以下で説明する変換の最適化が自動的に適用されます。
高度な設定構成を進める前に、Palantirの担当者に連絡してください。
Pipeline Builderで提供されていない低レベルの変換制御や高度な機能が必要な場合、このページでは、データ変換に使用される Code Repositories を使用して、手動で時系列パイプラインを設定する方法について説明します。
Code Repositories を使用して時系列を設定するには、以下を完了する必要があります。
Pipeline Builderの時系列出力を使用して時系列同期を作成すると、時系列データセットが自動的に生成され、時系列データセットと同期が正確に設定されます。手動でパイプラインを設定する場合、フォーマットされた 時系列 データを含む時系列データセットを明示的に生成し、時系列同期を作成するために必要です。データセットには、用語集で指定されたように、Series ID、Value、および Timestamp 行が含まれている必要があります。
シリーズIDのすべての値は同じデータセットに含まれている必要があります。値はシリーズIDによって取得されるため、1つの時系列データセットに複数のシリーズIDのすべての値を含めることができます。例:
+------------------------+---------------------+---------+
| series_id | timestamp | value |
+------------------------+---------------------+---------+
| Machine123_temperature | 01/01/2023 12:00:00 | 100 | // Machine123の温度データ、時間:01/01/2023 12:00:00、値:100
| Machine123_temperature | 01/01/2023 12:01:00 | 99 | // Machine123の温度データ、時間:01/01/2023 12:01:00、値:99
| Machine123_temperature | 01/01/2023 12:02:00 | 101 | // Machine123の温度データ、時間:01/01/2023 12:02:00、値:101
| Machine463_temperature | 01/01/2023 12:00:00 | 105 | // Machine463の温度データ、時間:01/01/2023 12:00:00、値:105
| Machine123_pressure | 01/01/2023 12:00:00 | 3 | // Machine123の圧力データ、時間:01/01/2023 12:00:00、値:3
| ... | ... | ... | // 他のデータ...
+------------------------+---------------------+---------+
時間系列データセットは、通常、ライブデータがある場合にインクリメンタルに構築されるように設定されています。インクリメンタルビルドにより、コンピューティングコストを節約し、生データが取り込まれるタイミングから最新のデータが読み取れるまでのレイテンシを大幅に短縮することができます。
インクリメンタルな時間系列ビルドの利点については、FAQ ドキュメントを参照してください。
コードで時間系列データセットを生成する際は、以下のようにデータセットをフォーマットしてから書き込みます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
# transforms.apiからtransform, Input, Outputをインポートします from transforms.api import transform, Input, Output # transformデコレータを使用して、入力と出力データセットを設定します @transform( output_dataset=Output("/path/to/output/dataset"), # 出力データセットのパスを指定します input_dataset=Input("/path/to/input/dataset") # 入力データセットのパスを指定します ) # my_compute_functionという名前の関数を定義します def my_compute_function(output_dataset, input_dataset): # 入力データセットからデータフレームを作成し、 # 'seriesId'に基づいてパーティションを再分割し、各パーティション内で'seriesId'と'timestamp'によってソートします output_dataframe = ( input_dataset .dataframe() .repartitionByRange('seriesId') .sortWithinPartitions('seriesId', 'timestamp') ) # ソートされたデータフレームを出力データセットとして書き込みます。出力フォーマットは'soho'です output_dataset.write_dataframe(output_dataframe, output_format='soho')
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
package myproject.datasets; import com.palantir.transforms.lang.java.api.Compute; import com.palantir.transforms.lang.java.api.FoundryInput; import com.palantir.transforms.lang.java.api.FoundryOutput; import com.palantir.transforms.lang.java.api.Input; import com.palantir.transforms.lang.java.api.Output; import com.palantir.foundry.spark.api.DatasetFormatSettings; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import java.util.Collections; // タイムシリーズデータを書き込むクラス public final class TimeSeriesWriter { // パーティション分割されたデータセットを書き込むメソッド @Compute public void writePartitioned( // 入力データセット @Input("/path/to/input/dataset") FoundryInput inputDataset, // 出力データセット @Output("/path/to/output/dataset") FoundryOutput outputDataset) { // 入力データセットをデータフレームとして読み込む Dataset<Row> inputDataframe = inputDataset.asDataFrame().read(); // 出力データフレームを作成し、パーティションを再分割してソートする Dataset<Row> outputDataframe = inputDataframe .repartitionByRange(inputDataframe.col('seriesId')) .sortWithinPartitions('seriesId', 'timestamp'); // 出力データセットにデータフレームを書き込む outputDataset.getDataFrameWriter(outputDataframe) .setFormatSettings(DatasetFormatSettings.builder() .format('soho') .build()) .write(); } }
このリパーティションとソートを実行することで、データセットを時系列としてパフォーマンスの高い使用に最適化できます。最低限、データセットは Soho 形式(表示されているように)にする必要があります。これにより、新しいデータがまだ投影されていない場合でも、時系列データベースにインデックスされます。また、repartitionByRange()
で書き込むパーティションの数を、以下のガイダンスに基づいて、パイプラインに適した数に設定する必要があります。
書き込めるパーティションの最低数は、パーティションが十分小さくてエグゼキュータに収まるようにし、十分にパーティション化してパイプラインのレイテンシを達成するためのものです。より多くのパーティションを書き込むと、パーティションが小さくなりジョブが速くなりますが、大きなパーティションに比べて最適ではありません。
新しい時系列同期を作成するには、https://<domain>/workspace/time-series-catalog-app/new
に直接移動してください。同期を保存する場所を選択するよう求められますが、これは時系列データセットを含むプロジェクトか、それを参照としてインポートしているプロジェクトにする必要があります。
時系列データセットを入力として選択し、データセットの行を時系列同期のシリーズ ID、値、およびタイムスタンプにマッピングを完了します。タイムスタンプ行が Long
型の場合、SECONDS
、MILLISECONDS
、MICROSECONDS
、または NANOSECONDS
の単位を指定してください。
時系列同期がビルドされると、時系列データセットからのメタデータが同期され、Foundry がオンデマンドで時系列データベースに時系列データをインデックスできるようになります。
制限付きビューは、ユーザーが閲覧する権限がある行にのみデータセットへのアクセスを制限します。制限付きビューでバックアップされたオブジェクトタイプを操作する場合、時系列同期を設定してマーキングを継承しないようにする必要があります。
時系列データセットの各マーキングの継承を停止するには、マーキングの横にある継承を停止を選択してください。
終了したら、ページの上部にある保存を選択してください。
時系列同期ビルドのSpark プロファイルを設定することも可能ですが、これは非常にまれです。
デフォルトでは、同期は入力時系列データセットが更新されたときに実行されるようにスケジュールされます。時系列データが最新の状態に保たれるようにするため、この設定をお勧めします。
別の時系列同期で交差するシリーズ ID を書き込んでいて、その同期を新しいものに置き換えたい場合は、詳細オプションを表示 > 他のデータセットでバックアップされた同期からのシリーズの上書きの下に古い同期を指定できます。これにより、古い同期が失敗し、ゴミ箱に入れる必要があります。
時系列オブジェクトタイプの元データセットは、お好みの方法で生成できますが、用語集で指定されたスキーマに準拠する必要があります。
時系列オブジェクトタイプの元データセットを自動的に生成するには、時系列データセットと同じ変換で生成できます。ここで、シリーズ ID の別個のセットを取得し、それらにメタデータを抽出/マップすることができます。インクリメンタルパイプラインでは、マージと追加パターンを使用してこれを実現できます。
時系列オブジェクトタイプの元データセットでオブジェクトタイプを作成する標準プロセスに従ってください。また、データセットプレビューですべてのアクション > オブジェクトタイプの作成を選択することで、データセットから直接オブジェクトタイプを生成することもできます。オブジェクトタイプを作成する際、どのプロパティが時系列プロパティであるかを指定して、時系列用に設定してください。