注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

ファイルの読み込み

ファイルは、transforms.api.FileSystem.open() メソッドを使用して開くことができます。これにより、Python の file-like ストリームオブジェクトが返されます。io.open() でサポートされているすべてのオプションもサポートされています。ファイルはストリームとして読み込まれるため、ランダムアクセスはサポートされていません。

open() メソッドで返される file-like ストリームオブジェクトは、seektell メソッドをサポートしていません。そのため、ランダムアクセスはサポートされていません。

次のような Transform オブジェクトを考えてみましょう:

ファイルの作成

ファイルは、同様にして open() メソッドを使用して作成されます。これにより、書き込み専用のPythonファイルライクストリームオブジェクトが返されます。 io.open() が受け入れるすべてのキーワード引数もサポートされています。ファイルはストリームとして書き込まれるため、ランダムアクセスはサポートされていません。以下の Transform オブジェクトを考えてみてください:

分散処理

DataFrameオブジェクトを使用したデータ変換とは異なり、ファイルベースの変換ではドライバーとエクゼキューターのコードの違いを理解することが重要です。compute 関数はドライバー(単一のマシン)で実行され、Spark が DataFrame 関数をエクゼキューター(多数のマシン)に自動的に分散して適切に実行します。

ファイル API を使用して分散処理を活用するためには、Spark を利用して計算を分散させる必要があります。これを行うために、FileStatusDataFrame を作成し、エクゼキューターに分散させます。各エクゼキューター上のタスクは、割り当てられたファイルを開いて処理し、結果が Spark によって集約されます。

ファイル API は、ls() 関数と同じ引数を受け付ける files() 関数を提供しており、代わりに FileStatus オブジェクトの DataFrame を返します。この DataFrame は、ファイルサイズによってパーティション分けされており、ファイルサイズが異なる場合に計算をバランスさせるのに役立ちます。パーティション分けは、以下の 2 つの Spark 設定オプションを使用して制御できます。

  • spark.sql.files.maxPartitionBytes ↗ は、ファイルを読み取る際に単一のパーティションにパックするバイト数の最大値です。

  • spark.sql.files.openCostInBytes ↗ は、ファイルを開くための推定コストで、同じ時間にスキャンできるバイト数で測定されます。これは、ファイルサイズに追加され、パーティションで使用されるファイルの合計バイト数を計算するために使用されます。

これらのプロパティの値を変更するには、カスタム Transforms プロファイルを作成し、configure() デコレーターを使用して Transform に適用する必要があります。詳細については、Code Repositories のドキュメント内の Transforms プロファイルの定義に関するセクションを参照してください。 それでは、例を見てみましょう。CSV ファイルがあり、それを解析して連結したいとします。flatMap() を使用して、各 FileStatus オブジェクトに処理関数を適用します。この処理関数は、pyspark.sql.SparkSession.createDataFrame() に従って行を生成する必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import csv from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, StringType from transforms.api import transform, Input, Output @transform( processed=Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color_csv'), ) def example_computation(hair_eye_color, processed): def process_file(file_status): with hair_eye_color.filesystem().open(file_status.path) as f: r = csv.reader(f) # ヘッダー行からpyspark.Rowを構築 header = next(r) MyRow = Row(*header) for row in r: yield MyRow(*row) # スキーマの定義 schema = StructType([ StructField('student_id', StringType(), True), StructField('hair_color', StringType(), True), StructField('eye_color', StringType(), True), ]) # 入力ファイルを読み込む files_df = hair_eye_color.filesystem().files('**/*.csv') # 入力データを処理してDataFrameを作成 processed_df = files_df.rdd.flatMap(process_file).toDF(schema) # 結果を書き込む processed.write_dataframe(processed_df)