注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Spark サイドカー変換

前提条件

以下のドキュメントは、次の前提条件の作業知識を前提としています。

  1. コンテナ化されたインフラストラクチャとコンテナイメージのような概念.
  2. PySpark 変換.

Spark サイドカー変換を使用すると、コンテナ化されたコードをデプロイしながら、Spark と変換によって提供される既存のインフラストラクチャを活用できます。

コードのコンテナ化により、Foundry で実行するためのコードと依存関係をすべてパッケージ化できます。コンテナ化ワークフローは変換と統合されており、スケジューリング、ブランチング、データヘルスがすべてシームレスに統合されています。コンテナ化されたロジックは Spark エグゼキュータと並行して実行されるため、入力データに合わせてコンテナ化されたロジックをスケーリングできます。

短く言えば、コンテナで実行できるあらゆるロジックを使用して、Foundry でデータを処理、生成、または消費できます。

コンテナ化概念に精通している場合は、以下のセクションを使用して、Spark サイドカー変換の使用について詳しく学習してください。

Foundry でのコンテナ化について詳しく学ぶ。

アーキテクチャ

Foundry の変換は、データセット間でデータを送受信し、Spark ドライバを使用して複数のエグゼキュータ間で処理を分散できます。以下の図に示すようになっています。

Spark サイドカー変換 - コンテナなし

@sidecar デコレータ(transforms-sidecar ライブラリで提供されている)を使用して変換に注釈を付けることで、PySpark 変換の各エグゼキュータと並行して起動するコンテナを1つ指定できます。カスタムロジックを持ち、各エグゼキュータと並行して実行されるユーザー提供のコンテナをサイドカー コンテナと呼びます。

エグゼキュータが1つだけのシンプルな使用例では、データフローは次のようになります。

Spark サイドカー変換 - シングルコンテナ

入力データセットを複数のエグゼキュータに分割する変換を記述すると、データフローは次のようになります。

Spark サイドカー変換 - マルチコンテナ

エグゼキュータとサイドカー コンテナ間のインターフェースは、共有ボリューム(ディレクトリ)であり、以下のような情報を伝達するために使用されます。

  • コンテナ化されたロジックの実行を開始するタイミング。
  • コンテナで処理する入力データ。
  • コンテナから取り出す出力データ。
  • コンテナ化されたロジックの実行を終了するタイミング。

これらの共有ボリュームは、@sidecar デコレータの Volume 引数を使用して指定され、パス /opt/palantir/sidecars/shared-volumes/ 内のサブフォルダになります。

次のセクションでは、Spark サイドカー変換の準備と記述について説明します。

イメージのビルド

Spark サイドカー変換と互換性のあるイメージをビルドするには、イメージはイメージ要件を満たす必要があります。また、以下で説明する重要なコンポーネントを含む例の Docker イメージも含める必要があります。この例のイメージをビルドするには、Python スクリプト entrypoint.py が必要です。

ローカルコンピュータに Docker をインストールし、docker CLI コマンド(公式ドキュメント)にアクセスできる必要があります。

Dockerfile

ローカルコンピュータのフォルダに、以下の内容を Dockerfile という名前のファイルに入れてください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # Fedoraのバージョン38をベースにしたイメージを使用します FROM fedora:38 # entrypoint.pyを/usr/bin/entrypointに追加します ADD entrypoint.py /usr/bin/entrypoint # /usr/bin/entrypointに実行権限を付与します RUN chmod +x /usr/bin/entrypoint # 必要なディレクトリを作成します RUN mkdir -p /opt/palantir/sidecars/shared-volumes/shared/ # 作成したディレクトリの所有権を変更します RUN chown 5001 /opt/palantir/sidecars/shared-volumes/shared/ # 環境変数SHARED_DIRを設定します ENV SHARED_DIR=/opt/palantir/sidecars/shared-volumes/shared # ユーザーID 5001でコンテナを実行します USER 5001 # コンテナが起動するときに実行するコマンドを設定します # ここではinfile.csvをoutfile.csvにコピーします ENTRYPOINT entrypoint -c "dd if=$SHARED_DIR/infile.csv of=$SHARED_DIR/outfile.csv"

カスタマイズされた Dockerfile

上記のように、独自の Dockerfile を構築することができますが、以下の点を確認してください。

  • 9行目で数値の非ルートユーザーを指定してください。これは 画像の要件 の1つであり、コンテナに特権的な実行が与えられないように、適切なセキュリティ態勢を維持するのに役立ちます。

  • 次に、5-7行目で共有ボリュームの作成を行ってください。上記の アーキテクチャセクション で説明したように、/opt/palantir/sidecars/shared-volumes/ 内のサブディレクトリである共有ボリュームは、入力データと出力データが PySpark 変換からサイドカーコンテナに共有される主要な方法です。

    • 5行目でディレクトリを作成します。
    • 6行目でディレクトリが作成されたユーザーに権限が与えられることを確認します。
    • 7行目で、この共有ディレクトリへのパスを環境変数として他の場所で参照するために保存します。
  • 最後に、3行目でコンテナに簡単な entrypoint スクリプトを追加し、11行目で ENTRYPOINT として設定します。このステップは重要であり、Spark サイドカー変換は、コンテナが起動する前に入力データが利用可能になるのを待つように、サイドカー コンテナに指示しません。また、サイドカー変換は、出力データがコピーされるのを待ってコンテナがアイドル状態になるように指示しません。提供された entrypoint スクリプトは、Python を使用して、指定されたロジックが実行される前に、共有ボリュームに start_flag ファイルが書き込まれるのを待つようにコンテナに指示します。指定されたロジックが終了すると、同じディレクトリに done_flag を書き込みます。コンテナは、共有ボリュームに close_flag が書き込まれるまで待機し、その後コンテナは自動的に停止してクリーンアップされます。

上記の例では、コンテナ化されたロジックは、POSIX ディスクダンプ (dd) ユーティリティを使用して、共有ディレクトリから入力 CSV ファイルを同じディレクトリに保存された出力ファイルにコピーします。この「コマンド」は、entrypoint スクリプトに渡されるもので、コンテナ内で実行できるロジックであれば何でも構いません。

エントリーポイント

Dockerfile と同じローカルフォルダーに、以下のコードスニペットを entrypoint.py という名前のファイルにコピーしてください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #!/usr/bin/env python3 import os import time import subprocess from datetime import datetime import argparse parser = argparse.ArgumentParser() parser.add_argument("-c", "--command", type=str, help="実行するモデルのコマンド") args = parser.parse_args() the_command = args.command.split(" ") def run_process(exe): "コマンドの実行とstdoutの行ごとのキャプチャを定義する関数" p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) return iter(p.stdout.readline, b"") start_flag_fname = "/opt/palantir/sidecars/shared-volumes/shared/start_flag" done_flag_fname = "/opt/palantir/sidecars/shared-volumes/shared/done_flag" close_flag_fname = "/opt/palantir/sidecars/shared-volumes/shared/close_flag" # スタートフラグを待つ print(f"{datetime.utcnow().isoformat()}: スタートフラグを待っています") while not os.path.exists(start_flag_fname): time.sleep(1) print(f"{datetime.utcnow().isoformat()}: スタートフラグを検出しました") # モデルを実行し、出力をファイルにログとして記録する with open("/opt/palantir/sidecars/shared-volumes/shared/logfile", "w") as logfile: for item in run_process(the_command): my_string = f"{datetime.utcnow().isoformat()}: {item}" print(my_string) logfile.write(my_string) logfile.flush() print(f"{datetime.utcnow().isoformat()}: 出力ファイルの書き込みが完了しました") # 完了フラグを書き出す open(done_flag_fname, "w") print(f"{datetime.utcnow().isoformat()}: 完了フラグファイルを書き込みました") # スクリプトの終了を許可する前にクローズフラグを待つ while not os.path.exists(close_flag_fname): time.sleep(1) print(f"{datetime.utcnow().isoformat()}: クローズフラグを検出しました。シャットダウンします")

画像をプッシュする

画像をプッシュするには、新しい Artifacts リポジトリを作成し、指示に従って、画像を関連する Docker リポジトリにタグ付けしてプッシュします。

  1. Artifacts リポジトリを作成します。

Artifacts リポジトリを作成

  1. タイプを Docker に変更します。

Docker Artifacts リポジトリ

  1. 画面に表示される指示に従って、トークンを生成します。
  2. 次のコマンドパターン docker build . --tag <container_registry>/<image_name>:<image_tag> --platform linux/amd64 を使用して、例の画像をビルドします。ここで、
  • container_registry は、Foundry インスタンスのコンテナレジストリのアドレスを表し、Artifact リポジトリに Docker イメージをプッシュするための指示の最後のコマンドの一部として見つけることができます。
  • image_name および image_tag は、ユーザーの裁量に任されます。この例では simple_example:0.0.1 を使用しています。
  1. Artifacts Repository からの指示をコピーして、ローカルにビルドされたイメージをプッシュします。最後のコマンドで <image_name>:<image_version> を上記のイメージビルドステップで使用された image_name および image_version に置き換えてください。

Spark サイドカートランスフォームを作成する

  1. コードリポジトリアプリケーションで Python データ変換リポジトリを作成します
  2. 左側の Libraries タブで transforms-sidecar を追加し、変更をコミットします。
  3. Settings > Libraries で、Artifact リポジトリを追加します。
  4. トランスフォームを作成します。

以下の例では、サイドカートランスフォームを開始するために必要な主要情報を説明します。両方の例では、同じユーティリティファイルを使用して、ここ で見つけることができ、以下に示すようにコードリポジトリに追加してインポートできます。

例 1: 単一の実行

以下の変換では、@sidecar デコレータと Volume プリミティブを transforms-sidecar ライブラリからインポートします。 トランスフォームは、これらのアイテムを使用してアノテーションを行い、simple-example:0.0.1 コンテナの 1 つのインスタンスが各エグゼキュータと共に起動されます。各エグゼキュータ/サイドカーのペアは、/opt/palantir/sidecars/shared-volumes/shared で共有ボリュームを持ちます。

この最初の例では、コンテナの 1 つのインスタンスを 1 つのエグゼキュータで起動し、以下の画像に示すアーキテクチャに従います。

Spark Sidecar Transforms - 単一コンテナ

次に、トランスフォームは、ユーティリティ関数 lanch_udf_once を使用して、user_defined_function の 1 つのインスタンスを起動します。そのユーザー定義関数は、1 つのエグゼキュータで実行され、サイドカーコンテナの 1 つのインスタンスと通信します。ユーザー定義関数は、インポートされたユーティリティ関数を呼び出して、次のことを行います。

  • 入力ファイルを共有ディレクトリにコピーして、サイドカーコンテナがアクセスできるようにします。
  • 開始フラグをコピーして、サイドカーコンテナが実行を開始することがわかります。
  • コンテナ化されたロジックが終了するのを待ちます。
  • コンテナ化されたロジックによって作成されたファイルをコピーします。
  • コンテナが停止してクリーンアップされるように、クローズフラグをコピーします。
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from transforms.api import transform, Input, Output from transforms.sidecar import sidecar, Volume from myproject.datasets.utils import copy_files_to_shared_directory, copy_start_flag, wait_for_done_flag from myproject.datasets.utils import copy_output_files, copy_close_flag, launch_udf_once @sidecar(image='simple-example', tag='0.0.1', volumes=[Volume("shared")]) @transform( output=Output("<output dataset rid>"), source=Input("<input dataset rid>"), ) def compute(output, source, ctx): def user_defined_function(row): # ソースから共有ディレクトリにファイルをコピーします。 copy_files_to_shared_directory(source) # 全ての入力ファイルがコンテナに存在することを示すためのスタートフラグを送信します copy_start_flag() # ストップフラグが書き込まれるか、最大時間制限に達するまで待ちます wait_for_done_flag() # コンテナから出力データセットに出力ファイルをコピーします output_fnames = [ "start_flag", "outfile.csv", "logfile", "done_flag", ] copy_output_files(output, output_fnames) # データを抽出し終えたことをコンテナに伝えるためのクローズフラグを書き込みます copy_close_flag() # ユーザー定義関数は何かを返さなければなりません return (row.ExecutionID, "success") # これは一つのタスクを生成し、それは一つのエクゼキューターにマッピングされ、一つの「sidecar container」が起動されます launch_udf_once(ctx, user_defined_function)

例 2:並行実行

この例では、多数のサイドカーコンテナのインスタンスが起動し、それぞれが入力データの一部を処理します。その情報は集められて出力データセットに保存されます。この例は下図に示されるアーキテクチャにより密接に似ています。

Spark Sidecar Transforms - Multi Container

次の変換は、異なるユーティリティ関数を使用して入力データを分割し、各コンテナに個々のファイルを送信し、異なる入力データのチャンクに対して同じ実行を行います。ユーティリティ関数は、出力ファイルを個々のファイルとして、また、表形式の出力データセットとして保存するように書かれています。

@sidecar デコレータと Volume 仕様に対して設定された同じパラメーターを見るでしょう、これは例 1と同じです。

@confgure フラグが設定されていることを確認し、エクゼキュータごとに1つのタスクのみが起動し、合計4つのエクゼキュータが起動できるようにします。この設定は、入力データセットに正確に4行のデータがあり、入力の再分割が 4 に設定されているという事実と組み合わせることで、ユーザー定義関数の4つのインスタンスが4つのエクゼキュータ上に起動します。したがって、サイドカーコンテナの正確に4つのインスタンスが起動し、入力データのそのセグメントを処理します。

ユーザーのリポジトリに 設定 > Spark の下で2つのSparkプロファイルがインポートされていることを確認してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 from transforms.api import transform, Input, Output, configure from transforms.sidecar import sidecar, Volume import uuid from myproject.datasets.utils import copy_start_flag, wait_for_done_flag, copy_close_flag from myproject.datasets.utils import write_this_row_as_a_csv_with_one_row from myproject.datasets.utils import copy_output_files_with_prefix, copy_out_a_row_from_the_output_csv @configure(["EXECUTOR_CORES_EXTRA_SMALL", "NUM_EXECUTORS_4"]) @sidecar(image='simple-example', tag='0.0.1', volumes=[Volume("shared")]) @transform( output=Output("<first output dataset rid>"), output_rows=Output("<second output dataset rid>"), source=Input("<input dataset rid>"), ) def compute(output, output_rows, source, ctx): def user_defined_function(row): # ソースから共有ディレクトリにファイルをコピー write_this_row_as_a_csv_with_one_row(row) # コンテナがすべての入力ファイルを持っていることを知らせるために開始フラグを送信します。 copy_start_flag() # 停止フラグが書かれるか、最大時間制限に達するまで繰り返します。 wait_for_done_flag() # コンテナから出力データセットに出力ファイルをコピー output_fnames = [ "start_flag", "infile.csv", "outfile.csv", "logfile", "done_flag", ] random_unique_prefix = f'{uuid.uuid4()}'[:8] copy_output_files_with_prefix(output, output_fnames, random_unique_prefix) outdata1, outdata2, outdata3 = copy_out_a_row_from_the_output_csv() # データを抽出したことがわかるように、クローズフラグを書き込みます。 copy_close_flag() # ユーザー定義関数は何かを返さなければなりません。 return (row.data1, row.data2, row.data3, "success", outdata1, outdata2, outdata3) results = source.dataframe().repartition(4).rdd.map(user_defined_function) columns = ["data1", "data2", "data3", "success", "outdata1", "outdata2", "outdata3"] output_rows.write_dataframe(results.toDF(columns))

例:ユーティリティ

utils.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 import os import shutil import time import csv import pyspark.sql.types as T VOLUME_PATH = "/opt/palantir/sidecars/shared-volumes/shared" # 共有ボリュームのパスを定義 MAX_RUN_MINUTES = 10 # 最大実行時間を10分に設定 def write_this_row_as_a_csv_with_one_row(row): # この関数は、特定の行をCSVファイルに書き込みます in_path = "/opt/palantir/sidecars/shared-volumes/shared/infile.csv" # CSVファイルのパス with open(in_path, 'w', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=',') writer.writerow(['data1', 'data2', 'data3']) # ヘッダー行を書き込み writer.writerow([row.data1, row.data2, row.data3]) # データ行を書き込み def copy_out_a_row_from_the_output_csv(): # この関数は、出力CSVから行を読み取ります out_path = "/opt/palantir/sidecars/shared-volumes/shared/outfile.csv" # CSVファイルのパス with open(out_path, newline='') as csvfile: reader = csv.reader(csvfile, delimiter=',', quotechar='|') values = "", "", "" for myrow in reader: values = myrow[0], myrow[1], myrow[2] # CSVファイルから行を読み取り return values # 行の値を返す def copy_output_files_with_prefix(output, output_fnames, prefix): # この関数は、指定した接頭辞を持つ出力ファイルをコピーします for file_path in output_fnames: output_fs = output.filesystem() out_path = os.path.join(VOLUME_PATH, file_path) try: with open(out_path, "rb") as shared_file: with output_fs.open(f'{prefix}_{file_path}', "wb") as output_file: shutil.copyfileobj(shared_file, output_file) # 共有ファイルから出力ファイルへコピー except FileNotFoundError as err: print(err) # ファイルが見つからない場合のエラーメッセージ def copy_files_to_shared_directory(source): # この関数は、ソースディレクトリから共有ディレクトリにファイルをコピーします source_fs = source.filesystem() for item in source_fs.ls(): file_path = item.path with source_fs.open(file_path, "rb") as source_file: dest_path = os.path.join(VOLUME_PATH, file_path) with open(dest_path, "wb") as shared_file: shutil.copyfileobj(source_file, shared_file) # ソースファイルから共有ファイルへコピー def copy_start_flag(): # この関数は開始フラグを設定します open(os.path.join(VOLUME_PATH, 'start_flag'), 'w') time.sleep(1) # 1秒待つ def wait_for_done_flag(): # この関数は、完了フラグが設定されるのを待ちます i = 0 while i < 60 * MAX_RUN_MINUTES and not os.path.exists(os.path.join(VOLUME_PATH, 'done_flag')): i += 1 time.sleep(1) # 1秒ごとにフラグの存在を確認 def copy_output_files(output, output_fnames): # この関数は出力ファイルをコピーします for file_path in output_fnames: output_fs = output.filesystem() out_path = os.path.join(VOLUME_PATH, file_path) try: with open(out_path, "rb") as shared_file: with output_fs.open(file_path, "wb") as output_file: shutil.copyfileobj(shared_file, output_file) # 共有ファイルから出力ファイルへコピー except FileNotFoundError as err: print(err) # ファイルが見つからない場合のエラーメッセージ def copy_close_flag(): # この関数はクローズフラグを設定します time.sleep(5) # 5秒待つ open(os.path.join(VOLUME_PATH, 'close_flag'), 'w') # クローズフラグを設定 def launch_udf_once(ctx, user_defined_function): # この関数は、シングル行データフレームを使ってユーザー定義関数を一度だけ起動します schema = T.StructType([T.StructField("ExecutionID", T.IntegerType())]) ctx.spark_session.createDataFrame([{"ExecutionID": 1}], schema=schema).rdd.foreach(user_defined_function)