データ統合パイプラインの最適化とビルドSparkSpark プロファイルリファレンス

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Spark プロファイルリファレンス

このページでは、Foundry で利用可能な Spark プロファイルのリファレンスをご紹介します。Spark とプロファイルについて詳しくは以下を参照してください。

ドライバーコア

このファミリのプロファイルでは、spark.driver.cores の値を設定します。

これにより、Spark ドライバーに割り当てられる CPU コア数が制御されます。実際には、同じ Spark モジュール内で多数の Spark ジョブが同時に実行される特殊なケースを除いて、この値を上書きする必要はありません。

ドライバーメモリ

このファミリのプロファイルでは、spark.driver.memory の値を設定します。

これにより、Spark ドライバー JVM に割り当てられるメモリ量が制御されます。例えば、ドライバーに大量のデータを収集したり、大規模なブロードキャスト結合を実行したりする場合、この値を増やす必要があるかもしれません。

これは JVM メモリのみを制御し、Python プロセスで利用可能なメモリは制御しません。Pandas を使用してローカルで大量のデータを変換する場合は、別のプロファイルが必要です。

エグゼキュータコア

このファミリのプロファイルでは、spark.executor.cores の値を設定します。

これにより、各 Spark エグゼキュータに割り当てられる CPU コア数が制御され、各エグゼキュータで同時に実行されるタスク数が制御されます。実際には、通常の変換ジョブではこの値を上書きする必要はほとんどありません。

エグゼキュータメモリ

このファミリのプロファイルでは、spark.executor.memory および関連設定の値を設定します。

これにより、各 Spark エグゼキュータ JVM に割り当てられるメモリ量が制御されます。各 Spark タスクで処理されるデータ量が非常に大きい場合、この値を増やす必要があるかもしれません。

このメモリは、エグゼキュータ上で実行されるすべてのタスク間で共有されます(エグゼキュータ コア プロファイルによって制御される)。

エグゼキュータメモリオーバーヘッド

このファミリのプロファイルでは、spark.executor.memoryOverhead の値を設定します。

これにより、Spark エグゼキュータ JVM メモリに加えて、各コンテナに割り当てられるメモリ量が制御されます。JVM の外で大量のメモリが必要なジョブの場合、この値を増やす必要があるかもしれません。

エグゼキュータ数

このファミリのプロファイルでは、spark.executor.instances および関連設定の値を設定します。

これにより、ジョブの実行に要求されるエグゼキュータの数が制御されます。この値を増やすと、並行して実行できるタスクの数が増えるため、パフォーマンスが向上します(ジョブが十分に並列である場合)が、より多くのリソースを使用することになります。

実際には、非常に迅速に実行する必要がある特定の組織的要件を持つ大規模なジョブでのみ、この値を上書きする必要があるでしょう。

動的割り当て

このファミリのプロファイルでは、spark.dynamicAllocation.enabledspark.dynamicAllocation.minExecutors、および spark.dynamicAllocation.maxExecutors の値を設定します。

これにより、エグゼキュータの固定数ではなく、エグゼキュータの範囲を指定することで、ジョブの実行に要求されるエグゼキュータの数が制御されます。Spark は、maxExecutors までエグゼキュータの数をスケールアップし、それらが必要なくなったときにエグゼキュータを解放します。これは、必要なエグゼキュータの数が一貫して同じでない場合や、起動時間を短縮する場合に役立ちます。モジュールは、要求された maxExecutors の数を受け取ることが保証されておらず、エグゼキュータの数が変わるため、実行ごとにパフォーマンスが異なる場合があります。

実際には、動的割り当ての利点と欠点を理解した上で、特定の大規模なジョブでのみこの値を上書きする必要があるでしょう。

アダプティブクエリ実行

このファミリのプロファイルでは、アダプティブクエリ実行(AQE)を有効にしたり無効にしたりします。

AQE を有効にすると、Spark はランタイムでパーティション数を自動的に設定し、ビルド速度が向上する可能性があります。これにより、十分な並列性がない少ないパーティション数や、オーバーヘッドが大きい多数の小さなパーティション数を回避できます。

AQE は、パーティションあたり 64 MB のバランスの取れた出力サイズを目指しています。例えば、出力サイズが合計 512 MB の場合、おおよそ 8 つのパーティションが生成されます。

このファミリのファイルサイズプロファイルを使用して、ターゲットサイズを増やすことができます。データが書き込まれてから頻繁に読み取られる場合、例えば Contour の分析で使用される場合、128MB 以上のパーティションサイズが推奨されます。

出力全体が小さいが、高価な UDF などのために計算に非常に時間がかかる場合は、AQE を無効にすることを検討してください。その場合、AQE は並列性を減らし、計算速度が遅くなる可能性があります。

タスクごとのコア数

このファミリのプロファイルでは、spark.task.cpus の値を設定します。

これにより、各タスクに割り当てられるコア数が制御されます。実際には、これを上書きすることはほとんどありません。ジョブの並列性を制御したい場合は、エグゼキュータ コア を参照してください。

Arrow

これらのプロファイルを使用して、Pandas と PySpark のデータフレーム間の変換に Arrow を有効または無効にします。Arrow を使用するには、Transform が pyarrow パッケージに依存していることを確認してください。

spark.createDataFrame() を Pandas データフレームで呼び出すか、toPandas() を使用すると、Spark はすべての行をシリアル化して、1 つの形式から別の形式に変換する必要があります。大規模なデータフレームの場合、これは遅いプロセスであり、Transform のボトルネックになる可能性があります。Pandas Transform を使用する場合、このシリアル化はデータの読み取りと書き込みの両方で発生します。

Arrow は、この変換を大幅に高速化する効率的なシリアル化形式です(Arrow ウェブサイト で報告されています)。

Kubernetes

このファミリのプロファイルでは、Spark ジョブがどのように実行されるかの低レベルの詳細を制御します。

基盤となるマシンの CPU アーキテクチャに依存しないライブラリを使用している場合、プロファイルを使用して Spark ジョブを特定のアーキテクチャで実行するように強制できます。ただし、一部の環境では、AMD アーキテクチャのマシンのみにアクセスできるため、ARM アーキテクチャのオーバーライドを使用するジョブは、そのような環境では成功しないことに注意してください。

プロファイルテーブル

プロファイルファミリープロファイル名Spark設定
ドライバーコアDRIVER_CORES_SMALLspark.driver.cores: 1
ドライバーコアDRIVER_CORES_MEDIUMspark.driver.cores: 2
ドライバーコアDRIVER_CORES_LARGEspark.driver.cores: 4
ドライバーコアDRIVER_CORES_EXTRA_LARGEspark.driver.cores: 8
ドライバーコアDRIVER_CORES_EXTRA_EXTRA_LARGEspark.driver.cores: 16
ドライバーメモリーDRIVER_MEMORY_SMALLspark.driver.memory: 3g
ドライバーメモリーDRIVER_MEMORY_MEDIUMspark.driver.memory: 6g; spark.driver.maxResultSize: 4g
ドライバーメモリーDRIVER_MEMORY_LARGEspark.driver.memory: 13g; spark.driver.maxResultSize: 8g
ドライバーメモリーDRIVER_MEMORY_EXTRA_LARGEspark.driver.memory: 27g; spark.driver.maxResultSize: 16g
ドライバーメモリーDRIVER_MEMORY_EXTRA_EXTRA_LARGEspark.driver.memory: 54g; spark.driver.maxResultSize: 32g
ドライバーメモリーオーバーヘッドDRIVER_MEMORY_OVERHEAD_SMALLspark.driver.memoryOverhead: 1g
ドライバーメモリーオーバーヘッドDRIVER_MEMORY_OVERHEAD_MEDIUMspark.driver.memoryOverhead: 2g
ドライバーメモリーオーバーヘッドDRIVER_MEMORY_OVERHEAD_LARGEspark.driver.memoryOverhead: 4g
ドライバーメモリーオーバーヘッドDRIVER_MEMORY_OVERHEAD_EXTRA_LARGEspark.driver.memoryOverhead: 8g
ドライバーメモリーオーバーヘッドDRIVER_MEMORY_OVERHEAD_EXTRA_EXTRA_LARGEspark.driver.memoryOverhead: 16g
エクゼキューターコアEXECUTOR_CORES_EXTRA_SMALLspark.executor.cores: 1
エクゼキューターコアEXECUTOR_CORES_SMALLspark.executor.cores: 2
エクゼキューターコアEXECUTOR_CORES_MEDIUMspark.executor.cores: 4
エクゼキューターコアEXECUTOR_CORES_LARGEspark.executor.cores: 6
エクゼキューターコアEXECUTOR_CORES_EXTRA_LARGEspark.executor.cores: 8
エクゼキューターメモリーEXECUTOR_MEMORY_EXTRA_SMALLspark.executor.memory: 3g; spark.executor.memoryOverhead: 768m
エクゼキューターメモリーEXECUTOR_MEMORY_SMALLspark.executor.memory: 6g; spark.executor.memoryOverhead: 1536m
エクゼキューターメモリーEXECUTOR_MEMORY_MEDIUMspark.executor.memory: 13g; spark.executor.memoryOverhead: 2g
エクゼキューターメモリーEXECUTOR_MEMORY_LARGEspark.executor.memory: 27g; spark.executor.memoryOverhead: 3g
エクゼキューターメモリーオーバーヘッドEXECUTOR_MEMORY_OVERHEAD_SMALLspark.executor.memoryOverhead: 1g
エクゼキューターメモリーオーバーヘッドEXECUTOR_MEMORY_OVERHEAD_MEDIUMspark.executor.memoryOverhead: 2g
エクゼキューターメモリーオーバーヘッドEXECUTOR_MEMORY_OVERHEAD_LARGEspark.executor.memoryOverhead: 4g
エクゼキューターメモリーオーバーヘッドEXECUTOR_MEMORY_OVERHEAD_EXTRA_LARGEspark.executor.memoryOverhead: 8g
エクゼキューター数KUBERNETES_NO_EXECUTORSspark.kubernetes.local.submission: true; spark.sql.shuffle.partitions: 1
エクゼキューター数NUM_EXECUTORS_1spark.executor.instances: 1; spark.dynamicAllocation.maxExecutors: 1
エクゼキューター数NUM_EXECUTORS_2spark.executor.instances: 2; spark.dynamicAllocation.maxExecutors: 2
エクゼキューター数NUM_EXECUTORS_4spark.executor.instances: 4; spark.dynamicAllocation.maxExecutors: 4
エクゼキューター数NUM_EXECUTORS_8spark.executor.instances: 8; spark.dynamicAllocation.maxExecutors: 8
エクゼキューター数NUM_EXECUTORS_16spark.executor.instances: 16; spark.dynamicAllocation.maxExecutors: 16
エクゼキューター数NUM_EXECUTORS_32spark.executor.instances: 32; spark.dynamicAllocation.maxExecutors: 32
エクゼキューター数NUM_EXECUTORS_64spark.executor.instances: 64; spark.dynamicAllocation.maxExecutors: 64
エクゼキューター数NUM_EXECUTORS_128spark.executor.instances: 128; spark.dynamicAllocation.maxExecutors: 128
エクゼキューター数NUM_EXECUTORS_256spark.executor.instances: 256; spark.dynamicAllocation.maxExecutors: 256
エクゼキューター数NUM_EXECUTORS_512spark.executor.instances: 512; spark.dynamicAllocation.maxExecutors: 512
タスクCPU数TASK_CPUS_2spark.task.cpus: 2
タスクCPU数TASK_CPUS_4spark.task.cpus: 4
ダイナミックアロケーションDYNAMIC_ALLOCATION_DISABLEDspark.dynamicAllocation.enabled: false
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLEDspark.dynamicAllocation.enabled: true
ダイナミックアロケーションDYNAMIC_ALLOCATION_MIN_2spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 2
ダイナミックアロケーションDYNAMIC_ALLOCATION_MIN_4spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 4
ダイナミックアロケーションDYNAMIC_ALLOCATION_MIN_8spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 8
ダイナミックアロケーションDYNAMIC_ALLOCATION_MIN_16spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 16
ダイナミックアロケーションDYNAMIC_ALLOCATION_MAX_8spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.maxExecutors: 8
ダイナミックアロケーションDYNAMIC_ALLOCATION_MAX_16spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.maxExecutors: 16
ダイナミックアロケーションDYNAMIC_ALLOCATION_MAX_32spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.maxExecutors: 32
ダイナミックアロケーションDYNAMIC_ALLOCATION_MAX_64spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.maxExecutors: 64
ダイナミックアロケーションDYNAMIC_ALLOCATION_MAX_128spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.maxExecutors: 128
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLED_1_2spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 1; spark.dynamicAllocation.maxExecutors: 2
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLED_2_4spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 2; spark.dynamicAllocation.maxExecutors: 4
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLED_4_8spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 4; spark.dynamicAllocation.maxExecutors: 8
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLED_8_16spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 8; spark.dynamicAllocation.maxExecutors: 16
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLED_16_32spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 16; spark.dynamicAllocation.maxExecutors: 32
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLED_32_64spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 32; spark.dynamicAllocation.maxExecutors: 64
ダイナミックアロケーションDYNAMIC_ALLOCATION_ENABLED_64_128spark.dynamicAllocation.enabled: true; spark.dynamicAllocation.minExecutors: 64; spark.dynamicAllocation.maxExecutors: 128
ダイナミックアロケーションDYNAMIC_ALLOCATION_FAST_SCALE_DOWNspark.dynamicAllocation.executorIdleTimeout: 10s
ダイナミックアロケーションDYNAMIC_ALLOCATION_SLOW_SCALE_UP_2Mspark.dynamicAllocation.schedulerBacklogTimeout: 2m
シャッフルパーティションSHUFFLE_PARTITIONS_SMALLspark.sql.shuffle.partitions: 20
シャッフルパーティションSHUFFLE_PARTITIONS_MEDIUMspark.sql.shuffle.partitions: 200
シャッフルパーティションSHUFFLE_PARTITIONS_LARGEspark.sql.shuffle.partitions: 2000
シャッフルパーティションSHUFFLE_PARTITIONS_EXTRA_LARGEspark.sql.shuffle.partitions: 20000
アダプティブクエリ実行ADAPTIVE_ENABLEDspark.sql.adaptive.enabled: true
アダプティブクエリ実行ADAPTIVE_DISABLEDspark.sql.adaptive.enabled: false
アダプティブクエリ実行ADVISORY_PARTITION_SIZE_MEDIUMspark.sql.adaptive.enabled: true; spark.sql.adaptive.shuffle.targetPostShuffleInputSize: 128MB
アダプティブクエリ実行ADVISORY_PARTITION_SIZE_LARGEspark.sql.adaptive.enabled: true; spark.sql.adaptive.shuffle.targetPostShuffleInputSize: 256MB
RPCメッセージサイズRPC_MESSAGE_MAX_SIZE_512Mspark.rpc.message.maxSize: 512
RPCメッセージサイズRPC_MESSAGE_MAX_SIZE_1Gspark.rpc.message.maxSize: 1024
RPCメッセージサイズRPC_MESSAGE_MAX_SIZE_MAXspark.rpc.message.maxSize: 2047
レガシーLEGACY_ALLOW_UNTYPED_SCALA_UDFspark.sql.legacy.allowUntypedScalaUDF: true
レガシーLEGACY_ALLOW_NEGATIVE_DECIMAL_SCALEspark.sql.legacy.allowNegativeScaleOfDecimal: true
レガシーLEGACY_ALLOW_HASH_ON_MAPTYPEspark.sql.legacy.allowHashOnMapType: true
レガシーLEGACY_NAME_NON_STRUCT_GROUPING_KEY_AS_VALUEspark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue: true
レガシーLEGACY_ARRAY_EXISTS_NULL_HANDLINGspark.sql.legacy.followThreeValuedLogicInArrayExists: false
レガシーLEGACY_ALLOW_AMBIGUOUS_SELF_JOINspark.sql.analyzer.failAmbiguousSelfJoin: false
レガシーLEGACY_TIME_PARSER_POLICYspark.sql.legacy.timeParserPolicy: LEGACY
レガシーLEGACY_DATETIME_REBASE_MODEspark.sql.legacy.avro.datetimeRebaseModeInRead: LEGACY; spark.sql.legacy.parquet.datetimeRebaseModeInRead: LEGACY; spark.sql.legacy.avro.datetimeRebaseModeInWrite: LEGACY; spark.sql.legacy.parquet.datetimeRebaseModeInWrite: LEGACY
レガシーLEGACY_FROM_DAYTIME_STRINGspark.sql.legacy.fromDayTimeString.enabled: true
レガシーLEGACY_DATETIME_STRING_COMPARISONspark.sql.legacy.typeCoercion.datetimeToString.enabled: true
日付&時間TIME_PARSER_POLICY_CORRECTEDspark.sql.legacy.timeParserPolicy: CORRECTED
日付&時間SPARK_ALLOW_INT96_AS_TIMESTAMPspark.sql.parquet.int96AsTimestamp: true
その他BUCKET_SORTED_SCAN_ENABLEDspark.sql.sources.bucketing.sortedScan.enabled: true
その他LAST_MAP_KEY_WINSspark.sql.mapKeyDedupPolicy: LAST_WIN
その他CROSS_JOIN_ENABLEDspark.sql.crossJoin.enabled: true
その他SPECULATIVE_EXECUTIONspark.speculation: true
その他AUTO_BROADCAST_JOIN_DISABLEDspark.sql.autoBroadcastJoinThreshold: -1
その他ALLOW_ADD_MONTHSspark.foundry.sql.allowAddMonths: true
その他PYSPARK_ROW_FIELD_SORTING_ENABLEDspark.executorEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED: true; spark.yarn.appMasterEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED: true; spark.kubernetes.driverEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED: true
その他PYSPARK_ROW_FIELD_SORTING_DISABLEDspark.executorEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED: false; spark.yarn.appMasterEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED: false; spark.kubernetes.driverEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED: false
その他PYSPARK_ROW_SCHEMA_CORRUPTION_CHECK_DISABLEDspark.kubernetes.driverEnv.PYSPARK_CHECK_ROW_SCHEMA_CORRUPTION: false; spark.yarn.appMasterEnv.PYSPARK_CHECK_ROW_SCHEMA_CORRUPTION: false; spark.executorEnv.PYSPARK_CHECK_ROW_SCHEMA_CORRUPTION: false
その他SPARK_KYRO_REFERENCE_TRACKING_DISABLEDspark.kryo.referenceTracking: false
その他GEOSPARKspark.foundry.build.stats.enabled: false
その他SPARK_REFERENCE_TRACKING_DISABLEDspark.cleaner.referenceTracking: false
ArrowARROW_ENABLEDspark.sql.execution.arrow.enabled: true; spark.sql.execution.arrow.pyspark.enabled: true; spark.sql.execution.arrow.sparkr.enabled: true; spark.sql.execution.arrow.fallback.enabled: true; spark.sql.execution.arrow.pyspark.fallback.enabled: true
ArrowARROW_DISABLEDspark.sql.execution.arrow.enabled: false; spark.sql.execution.arrow.pyspark.enabled: false; spark.sql.execution.arrow.sparkr.enabled: false
KubernetesKUBERNETES_CPU_ARCHITECTURE_OVERRIDE_AMD64N/A
KubernetesKUBERNETES_CPU_ARCHITECTURE_OVERRIDE_ARM64N/A