データ統合パイプラインの最適化とビルドSparkSpark の詳細を理解する

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Spark の詳細を理解する

Apache Spark は、Foundry のデータ統合レイヤーで最も一般的に使用される実行エンジンです。パイプラインのパフォーマンス特性を理解し、パイプラインの最適化方法を見つけるためには、Spark でのコードの実行方法の詳細を理解することが重要です。Foundry は、Spark でのジョブのパフォーマンスを表示し、理解するための統合ツールを提供しています。このページでは、利用可能な Spark の詳細を概説し、それらの詳細が意味することについてのガイダンスを提供します。

Spark の詳細にアクセスする

Foundry で構築されたデータセットについては、Spark の詳細を表示するために以下の手順を実行します。

  1. ビルドレポートを表示する
    • データセットプレビュー から、または Data Lineage から、履歴 タブを選択し、リストの中から個々のビルドを選択し、ビルドレポートを表示 を選択します。
    • すべてのビルド の表示から、リストの中からビルドを選択するだけです。
  2. ジョブを選択する。ビルドは1つ以上のジョブで構成され、ガントチャートの下にリストで表示されます。リストからジョブを選択し、Spark の詳細 ボタンを選択します。

view spark details

Spark の詳細ページは、Spark でのジョブの実行に関する情報を提供します。各ジョブについて、Spark の詳細ページでは、以下に示すように、さまざまなカテゴリで情報が表示されます。

Spark details tabs

概要タブ

概要タブでは、ジョブに関する以下の情報が提供されます。

ハイレベルのジョブメトリクス

  • すべてのタスクの合計ランタイム: すべてのステージのすべてのタスクのランタイムの合計
  • ジョブの持続時間: Spark の計算の持続時間(最初のステージの開始から最後のステージの完了までの時間)

これらの2つのメトリクスを使用して、並列性の比率を次のように計算できます。

すべてのタスクの合計ランタイム / ジョブ持続時間

比率が1に近いと、並列性が低いことを示します。

  • ディスクのスピル: すべてのステージ全体で、エクスキュータの RAM からディスクに移動されたデータのサイズ。

    • これは、データがエクスキュータのメモリに収まらない場合に発生します。ディスクへの読み書きは遅い操作であるため、ジョブがスピルした場合、著しく遅くなります。場合によっては、発生している計算のタイプによって、スピルがエクスキュータのメモリ不足を引き起こし、ジョブが失敗することがあります。
    • 大きなデータセットの場合、ディスクのスピルは予想されます。
  • シャッフルの書き込み: ジョブ全体で、すべてのステージでシャッフルされたデータの量。

    • シャッフルは、データが Spark のステージ間およびパーティション間で移動されるプロセスです。例えば、ジョインを計算するために(テーブルがいずれもブロードキャストされていない場合)、集約を実行するために、またはリパーティショニングを適用するために行われます。
    • シャッフルは、ネットワーク IO とディスク IO の両方を引き起こすため、ジョブのランタイムの大部分を占めることがあります。
    • したがって、パフォーマンスの高い Spark ジョブを作成する主な目標は、シャッフルを最小限に抑えることです。例えば、ブロードキャスト可能なジョインが実際にブロードキャストされていることを確認すること、下流のジョブで同じキーで結合されたり集約されたりする可能性が高いデータセットに対してバケット化を利用すること(これにより、このデータセットの下流でのシャッフルを回避する)、または不要なリパーティショニングステップを回避することによってです。

ステージ実行タイムラインとステージ間の依存関係

ジョブの開始時に、Spark はトランスフォームのコードを解釈して実行計画を作成します。これは、相互依存関係を持つ一連のステージとして表現できます。次のグラフは、ステージの実行タイムラインを示しています。

Builds application stage timeline

最も左側のステージは通常、入力の読み込みを表し、最も右側のステージは通常、出力の書き込みを表します。 上記の例では、ステージ 28、30、31、32、33、および 35 の実行に時間がかかるため、このジョブのランタイムを最適化する良い候補です。

ステージ 28、30、31、33、および 35 は並行して実行できるため、相互依存関係はありません。ただし、ステージ 32 は、すべての前のステージが終了したときにのみ開始されるため、以下のことが示されます。

  • ステージ 35 のランタイムを減らしても、待ち時間が max_runtime(28, 30, 31, 33, 35) で支配されるため、目立った改善は見られません。目に見える改善を得るためには、これらすべてのステージを加速させる必要があります。
  • ステージ 32 はジョブのボトルネックであり、ジョブの持続時間の約 35 パーセントを占めています。

タスク並行性チャート

タスク並行性チャートは、リソースの使用状況を理解するのに役立ちます。ステージの並行性を時間とともにプロットします。ジョブの並行性と同様に、ステージの並行性は次のように計算できます。

ステージ内のタスクの合計ランタイム / ステージの持続時間

タスク並行性チャートの時間軸は、上のガントチャートのステージと共有されているため、相関関係を特定しやすくなっています。

Builds application task concurrency timeline

上記のチャートでは、ステージ 32 の並行性はほぼ 1 です。これは、このステージで行われるほぼすべての作業が 1 つの(非常に長い)タスクで行われることを意味し、計算が分散されていないことを示しています。

完全に分散されたジョブは、次のようになります。 Builds application task concurrency perfectly distributed

ステージ詳細

特定のステージが失敗したり遅かったりする理由を理解しようとするとき、さらに情報が役立ちます。残念ながら、ステージがどのようにして元のコードや物理プランに遡るのかを自動的に追跡することは現在、Spark がコードをステージに変換するときにこのデータフローを公開していないため、不可能です。

ステージの概要は、失敗したり長時間実行されたりするステージの調査に役立ちます。 Builds application stage details skewed

タスクの半分は 2 秒未満で終了しますが、もっとも興味深いのは最大ランタイムです。1 つのタスクがステージの合計ランタイムの約 63% を占めています。これは、前述のチャートで示されたように、このステージがボトルネックであり、ほぼすべての作業が 1 つのタスクで行われていることと一致しています。

詳細を知るために、ステージの詳細にジャンプできます。 Builds application task details skewed

これは、このステージで実行されたタスクのサンプルと、ステージ自体に関連するメトリクスを示しています。

タスク 22267-0 は 1 時間 16 分かかるので、最も遅いものです。実際、このタスクは 81M の行を処理していますが、他のタスクは 10K ~ 700K の行を処理しています。この歪みの症状は次のとおりです。

  • 高いディスクスピル: 190GB 対 他のタスクでは 0
  • 高いエクスキュータのピークメモリ: 4.5GB 対 他のタスクでは 1GB

エグゼキュータタブ

Executors

エグゼキュータ タブは、Spark ジョブのドライバーやエグゼキュータから特定のメトリクスを取得し、スタックトレースやメモリヒストグラムを含みます。これらのメトリクスは、Spark ジョブのパフォーマンス問題をデバッグする際に役立ちます。

スナップショット ボタンを選択すると、実行中のジョブから Java スタックトレースまたはドライバー専用のメモリヒストグラムがキャプチャされます。ジョブが実行中の状態でなければなりません(ジョブが完了している場合、これらのメトリクスはもう収集できません)。

スタックトレースは、Spark ジョブの各スレッドがその時点で実行している内容を確認する方法です。例えば、ジョブがハングしているように見える場合(つまり、進行が期待通りでない場合)、スタックトレースを取得することで、その時点で実行されている内容が明らかになります。

メモリヒストグラムは、現在ヒープ上にある Java オブジェクトの数とそのサイズ(バイト単位)を表示します。メモリの使用状況を理解し、メモリ関連の問題をデバッグする際に役立ちます。

メトリクスを取得することで、実行中のジョブのパフォーマンスに影響があることに注意してください。これらのメトリクスの収集は、JVM が行う追加の作業です。例えば、メモリヒストグラムの取得はガーベジコレクションをトリガーします。