注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

非構造化ファイルの読み書き

警告

非構造化ファイルへのアクセスは高度なトピックです。このページを読む前に、このユーザーガイドの他のコンテンツを熟知してください。

データ変換でファイルにアクセスする理由はさまざまです。ファイルアクセスは、非表形式のファイル(XMLJSON など)や圧縮形式のファイル(gzzip など)を処理する場合に特に役立ちます。

transforms Python ライブラリでは、ユーザーが Foundry データセット内のファイルを読み書きすることができます。transforms.api.TransformInput は読み取り専用の FileSystem オブジェクトを公開し、transforms.api.TransformOutput は書き込み専用の FileSystem オブジェクトを公開します。これらの FileSystem オブジェクトは、Foundry データセット内のファイルのパスに基づいてファイルアクセスを許可し、基本ストレージを抽象化します。

データ変換でファイルにアクセスするためには、transform()デコレータを使用してTransformオブジェクトを構築する必要があります。これは、TransformInputおよびTransformOutputオブジェクトによって公開されるFileSystemオブジェクトのためです。transform()は、計算関数の入力および出力がそれぞれTransformInputおよびTransformOutput型であることを期待する唯一のデコレータです。

ファイルのインポート

ファイルは、手動でのファイルインポートや データ接続 を介した同期によって Foundry にアップロードできます。構造化および非構造化ファイルは、Foundry データセットにインポートされ、下流のアプリケーションで処理されます。また、ファイルを拡張子を変更せずに生ファイルとしてアップロードすることもできます。以下の例は、生ファイルではなく、Foundry データセットとしてアップロードされたファイルを参照しています。

Foundry には、データセットにアップロードされた特定のファイルタイプのスキーマを自動的に推測する機能もあります。例えば、CSV 形式のファイルをインポートする場合、スキーマを適用 ボタンを使用して自動的にスキーマを適用できます。手動でのデータアップロードについて詳しく学ぶことができます。

ファイルの閲覧

データセット内のファイルは、transforms.api.FileSystem.ls() メソッドを使用してリスト表示できます。このメソッドは、transforms.api.FileStatus オブジェクトのジェネレーターを返します。これらのオブジェクトは、各ファイルのパス、サイズ(バイト単位)、および変更タイムスタンプ(Unix エポックからのミリ秒)を取得します。以下に示すのは、Transform オブジェクトの例です。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # 入力として生徒の髪と目の色のデータを読み込む processed=Output('/examples/hair_eye_color_processed') # 処理したデータの出力先を指定する ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # あなたのデータ変換コード # この関数は、特定の目の色をフィルタリングするためのものです。 # 具体的なコードはこの下に記述します。 pass

データ変換コードでは、ユーザーのデータセットファイルを参照できます:

Copied!
1 2 3 4 list(hair_eye_color.filesystem().ls()) # 結果: [FileStatus(path='students.csv', size=688, modified=...)] # コードの説明: hair_eye_colorのファイルシステムをリスト化し、その中のファイルやディレクトリの情報を表示します。 # 'students.csv'という名前のファイルがあり、そのサイズは688、最終更新日時などの情報も表示されます。

ls() の呼び出し結果を、グロブまたは正規表現のパターンを渡すことで、フィルター処理することも可能です:

Copied!
1 2 3 4 5 6 7 list(hair_eye_color.filesystem().ls(glob='*.csv')) # 結果: [FileStatus(path='students.csv', size=688, modified=...)] # ファイルの一覧を取得する(拡張子が .csv のもの) list(hair_eye_color.filesystem().ls(regex='[A-Z]*\.csv')) # 結果: [] # ファイルの一覧を取得する(大文字で始まる拡張子が .csv のもの)

ファイルの読み込み

ファイルは、transforms.api.FileSystem.open() メソッドを使用して開くことができます。これは Python のファイルライクなストリームオブジェクトを返します。io.open() がサポートしているすべてのオプションもサポートされています。ファイルはストリームとして読み込まれるため、ランダムアクセスはサポートされていません。

open() メソッドによって返されるファイルライクなストリームオブジェクトは、seektell メソッドをサポートしていません。したがって、ランダムアクセスはサポートされていません。

次の Transform オブジェクトを考えてみてください:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # transforms.apiから、transform, Input, Outputをインポートする from transforms.api import transform, Input, Output # transformデコレータを使用して、入力と出力のデータセットを指定する @transform( # 入力データは、'/examples/students_hair_eye_color'のパスに格納されている hair_eye_color=Input('/examples/students_hair_eye_color'), # 出力データは、'/examples/hair_eye_color_processed'のパスに格納される processed=Output('/examples/hair_eye_color_processed') ) # filter_eye_color関数を定義する。この関数は、入力データを処理して出力データを生成する def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # ここにデータ変換コードを書く pass

データ変換コードでは、ユーザーのデータセットファイルを読み込むことができます:

Copied!
1 2 3 4 5 6 # 'hair_eye_color'という名前のファイルシステムから'students.csv'というファイルを開きます。 with hair_eye_color.filesystem().open('students.csv') as f: # ファイルの最初の行を読み込みます。 f.readline() # 結果: 'id,hair,eye,sex\n'

ストリームは、パース用のライブラリにも渡すことができます。たとえば、CSVファイルを解析することができます。

Copied!
1 2 3 4 5 6 7 import csv with hair_eye_color.filesystem().open('students.csv') as f: reader = csv.reader(f, delimiter=',') next(reader) # 結果: ['id', 'hair', 'eye', 'sex'] # コメント: CSVファイルからデータを読み込み、1行目をスキップしています。

前述した通り、非表形式のファイル(XMLJSON など)や圧縮フォーマット(gzzip など)も処理することができます。例えば、以下のコードを使用して、zip ファイル内の CSV を読み取り、その内容をデータフレームとして返すことができます:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def process_file(file_status): # ファイルを開く with fs.open(file_status.path, 'rb') as f: # 一時ファイルを作成する with tempfile.NamedTemporaryFile() as tmp: # ファイル内容をコピーする shutil.copyfileobj(f, tmp) tmp.flush() # zipファイルを処理する with zipfile.ZipFile(tmp) as archive: for filename in archive.namelist(): # 各ファイルを開く with archive.open(filename) as f2: br = io.BufferedReader(f2) tw = io.TextIOWrapper(br) tw.readline() # 各CSVの最初の行をスキップする for line in tw: # 行データを処理してMyRowオブジェクトを生成する yield MyRow(*line.split(",")) # ファイルシステムからファイルを取得し、RDDに変換する rdd = fs.files().rdd # ファイルを処理する関数を使ってRDDをフラット化する rdd = rdd.flatMap(process_file) # RDDをデータフレームに変換する df = rdd.toDF()

ランダムアクセス

警告

ランダムアクセスを使用すると、パフォーマンスが大幅に低下します。コードを書き換えて seek メソッドに依存しないようにすることをお勧めします。それでもランダムアクセスを使用したい場合は、以下の情報を参照してください。

open() メソッドはストリームオブジェクトを返すため、ランダムアクセスはサポートされていません。ランダムアクセスが必要な場合は、ファイルをメモリまたはディスクにバッファリングすることができます。hair_eye_colorTransformInput オブジェクトに対応していると仮定して、以下にいくつかの例を示します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import io import shutil # StringIO オブジェクトを作成します s = io.StringIO() # 'students.csv' ファイルを開き、それを StringIO オブジェクトにコピーします with hair_eye_color.filesystem().open('students.csv') as f: shutil.copyfileobj(f, s) # StringIO オブジェクトの内容を取得します s.getvalue() # 結果: 'id,hair,eye,sex\n...'
Copied!
1 2 3 4 5 6 # 髪と目の色のファイルシステムから'students.csv'を開く with hair_eye_color.filesystem().open('students.csv') as f: lines = f.read().splitlines() lines[0] # 結果: 'id,hair,eye,sex'
Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import tempfile # 一時ファイルを作成します with tempfile.NamedTemporaryFile() as tmp: # 'students.csv'ファイルを読み込みモードで開きます with hair_eye_color.filesystem().open('students.csv', 'rb') as f: # ファイルの内容を一時ファイルにコピーします shutil.copyfileobj(f, tmp) # shutil.copyfileobjは自動的にフラッシュ(データの書き出し)を行わないため、手動でフラッシュを行います tmp.flush() # 一時ファイルを読み込みモードで開きます with open(tmp.name) as t: # ファイルの最初の行を読み込みます t.readline() # 結果: 'id,hair,eye,sex\n'

ファイルの作成

ファイルの作成も同様に、open() メソッドを使用します。これにより、書き込み専用の Python ファイルライクなストリームオブジェクトが返されます。io.open() が受け付けるすべてのキーワード引数もサポートされています。ファイルはストリームとして書き込まれるため、ランダムアクセスはサポートされていないことに注意してください。以下の Transform オブジェクトを考えてみましょう:

Copied!
1 2 3 4 5 6 7 8 9 10 11 from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # 入力データセットを定義します。この例では、生徒の髪の色と目の色に関するデータセットを使用します。 processed=Output('/examples/hair_eye_color_processed') # 処理後のデータセットの出力先を定義します。 ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # この関数は、目の色に基づいてデータをフィルタリングするためのものです。 # あなたのデータ変換コード pass # この部分にデータ変換のコードを書きます。

データ変換コードでは、出力ファイルシステムに書き込むことが可能です。以下の例では、Pythonの組み込みシリアライザである pickle モジュールを使用してモデルを永続化しています:

Copied!
1 2 3 4 5 6 import pickle # 'model.pickle'という名前のファイルを書き込みモード('wb')で開きます with processed.filesystem().open('model.pickle', 'wb') as f: # モデル('model')をpickle形式で保存します pickle.dump(model, f)

分散処理

DataFrameオブジェクトのデータ変換とは異なり、ファイルベースの変換ではドライバーとエグゼキューターのコードの違いを理解することが重要です。compute 関数はドライバー(1 つのマシン)で実行され、Spark が DataFrame 関数を適切にエグゼキューター(複数のマシン)に自動的に分散します。

ファイル API を使って分散処理を活用するためには、Spark を使って計算を分散させる必要があります。そこで、FileStatusDataFrame を作成し、エグゼキューターに分散させます。エグゼキューター上の各タスクは、割り当てられたファイルを開いて処理し、結果は Spark によって集約されます。

ファイル API では、ls() 関数と同じ引数を受け取る files() 関数が提供されており、代わりに FileStatus オブジェクトの DataFrame を返します。この DataFrame は、ファイルサイズによってパーティション分けされており、ファイルサイズが異なる場合に計算をバランスさせるのに役立ちます。パーティション分けは、以下の 2 つの Spark の設定オプションを使用して制御できます。

  • spark.sql.files.maxPartitionBytes は、ファイルを読み込む際に単一のパーティションに詰め込むことができる最大バイト数です。

  • spark.sql.files.openCostInBytes は、ファイルを開くための推定コストで、同じ時間にスキャンできるバイト数で測定されます。これは、ファイルサイズに加えられ、パーティションで使用されるファイルの合計バイト数を計算します。

これらのプロパティの値を変更するには、カスタム Transforms プロファイルを作成し、configure()デコレータを使用して Transform に適用する必要があります。詳細については、Code Repositories のドキュメント内の Transforms プロファイルの定義に関するセクションを参照してください。

例を挙げましょう。CSV ファイルがあり、それらを解析して連結したいとします。flatMap() を使って、各 FileStatus オブジェクトに処理関数を適用します。この処理関数は、pyspark.sql.SparkSession.createDataFrame() に従って行を生成する必要があります。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import csv from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, StringType from transforms.api import transform, Input, Output @transform( processed=Output('/examples/hair_eye_color_processed'), hair_eye_color=Input('/examples/students_hair_eye_color_csv'), ) def example_computation(hair_eye_color, processed): # ファイルを処理する関数を定義します def process_file(file_status): # csvファイルを開きます with hair_eye_color.filesystem().open(file_status.path) as f: r = csv.reader(f) # ヘッダーロウからpyspark.Rowを構成します header = next(r) MyRow = Row(*header) # 各行を処理し、MyRowオブジェクトを生成します for row in csv.reader(f): yield MyRow(*row) # スキーマを定義します schema = StructType([ StructField('student_id', StringType(), True), StructField('hair_color', StringType(), True), StructField('eye_color', StringType(), True), ]) # CSVファイルを読み込みます files_df = hair_eye_color.filesystem().files('**/*.csv') # CSVファイルを処理し、DataFrameを生成します processed_df = files_df.rdd.flatMap(process_file).toDF(schema) # 結果を書き込みます processed.write_dataframe(processed_df)
警告

スキーマを渡さずに toDF() を呼び出すことは可能ですが、ファイル処理がゼロ行を返す場合、Sparkのスキーマ推定は失敗し、ValueError: RDD is empty という例外をスローします。したがって、常に手動でスキーマを指定することをお勧めします。