データ統合PythonPySpark 一覧ロギング

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

ロギング

Foundry で PySpark からさまざまなデバッグ情報を出力することが可能です。

Code Workbook

Python の組み込み print は、コードエディタの右側、通常エラーが表示される Code Workbook の Output セクションにパイプします。

Copied!
1 2 3 def new_dataset(some_input_dataset): # 入力データセットを処理する新しいデータセットを作成する関数 print("example log output") # 例: ログ出力
例のログ出力

コードリポジトリ

コードリポジトリでは、Pythonの組み込みlogging ライブラリを使用しています。これはオンラインで広く文書化されており、ログレベル(ERRORWARNINGINFO)を簡単にフィルター処理するための制御が可能です。

ログ出力は、出力データセットのログファイルとビルドのドライバーログ(データセット -> 詳細 -> ファイル -> ログファイルおよびビルド -> ビルド -> ジョブステータスログ;それぞれで"ドライバーログ"を選択)の両方に表示されます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 import logging logger = logging.getLogger(__name__) # データフレームを変換するデコレーターを使用 @transform_df( ... ) # いくつかの入力データに対して変換を実行する関数 def some_transformation(some_input): # ログ出力の例 logger.info("example log output")
INFO [2018-01-01T12:00:00] some_transformation: 例のログ出力

Python UDF 内部からのログ出力

Spark は、上記の some_transformation 関数など、ユーザーのクエリを作成するトップレベルのドライバープロセスからのログ出力をキャプチャします。しかし、ユーザー定義関数 (UDFs) の内部から書かれたログはキャプチャしません。PySpark のクエリ内で UDF を使用していて、データをログに記録する必要がある場合は、キャプチャするデータを返す第二の UDF を作成し、呼び出してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @transform_df( ... ) def some_transformation(some_input): # クエリ全体に関連するログ出力 logger.info("log output related to the overall query") @F.udf("integer") # カスタム関数:入力した整数に5を加える def custom_function(integer_input): return integer_input + 5 @F.udf("string") # カスタムログ:元の整数が何であったかを示すメッセージを作成 def custom_log(integer_input): return "Original integer was %d before adding 5" % integer_input df = ( some_input # "example_integer_col"列にカスタム関数を適用し、新しい列"new_integer"を作成 .withColumn("new_integer", custom_function(F.col("example_integer_col")) # "example_integer_col"列に基づいてデバッグメッセージを作成し、新しい列"debugging"を作成 .withColumn("debugging", custom_log(F.col("example_integer_col")) )

私たちは、クエリで何が起こっているかについての情報を記録したいことがよくあります。PySparkには、DataFramesを内部調査するための多くの方法があり、上記で説明したロギングメカニズムにこの情報を送信することができます。

これらの例ではCode Workbookの print 構文を使用しますが、print はTransforms & Authoringの logger に置き換えることができます。

DataFrameの行

DataFrameに存在する行を内部調査するためには df.columns を使用します。これにより文字列のリストが生成されます。

Copied!
1 2 3 4 5 6 7 8 9 def employee_phone_numbers(employee_df, phone_number_df): # 従業員データフレームのカラムを出力します print("employee columns are {}".format(employee_df.columns)) # 電話番号データフレームのカラムを出力します print("phone columns are {}".format(phone_df.columns)) # 'employee_id'をキーとして、左結合で二つのデータフレームを結合します df = employee_df.join(phone_number_df, 'employee_id', 'left') # 結合したデータフレームのカラムを出力します print("joined columns are {}".format(df.columns))
employeeのカラムは ['name', 'employee_id'] です
phoneのカラムは ['phone_number', 'employee_id'] です
joinedのカラムは ['name', 'employee_id', 'phone_number'] です

結合動作の確認

左結合を行い、一対一の関係を期待し、左の DataFrame の行数が同じままであることを確認したいとします。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def employee_phone_numbers(employee_df, phone_number_df): # 入力された従業員データの行数をカウントします original_employee_rows = employee_df.count() # 入力された従業員データの行数を表示します print("Incoming employee rows {}".format(original_employee_rows)) # 従業員データと電話番号データを左結合します df = employee_df.join(phone_number_df, 'employee_id', 'left') # 結合後のデータの行数をカウントします rows_after_join = df.count() # 結合後のデータの行数を表示します print("Final employee rows {}".format(rows_after_join)) # 結合後のデータの行数が元の行数よりも多い場合、一部の従業員が複数の電話番号を持っていると表示します if rows_after_join > original_employee_rows: print("Some employees have multiple phone numbers!") # 結合後のデータの行数が元の行数と同じまたは少ない場合、データが正しいと表示します else: print("Data is correct")
入社予定の従業員の行数 100
最終的な従業員の行数 105
一部の従業員は複数の電話番号を持っています!

この部分はプログラムコードではなく、プログラムの結果や出力の説明と思われます。したがって、上記のように翻訳します。

Spark クエリプラン

特定の DataFrame を生成するために Spark が実行する最適化された物理プランにアクセスするには、.explain() を呼び出します。

Copied!
1 2 3 4 5 6 7 def employee_phone_numbers(employee, phone): # 社員の誕生日が今月と同じ月のデータを抽出 employee = employee.where(F.month(employee.birthday) == F.month(F.current_date())) # 社員データと電話番号データを結合 df = employee.join(phone, 'employee_id', 'left') # 結合処理の詳細を説明 df.explain()
== 物理的なプラン ==
*(2) Project [employee_id#9734, name#9732, birthday#9733, phone_number#9728]
+- *(2) BroadcastHashJoin [employee_id#9734], [employee_id#9729], LeftOuter, BuildRight
   :- *(2) Filter (month(birthday#9733) = 10)  # 誕生日が10月のレコードをフィルタリングします
   :  +- *(2) FileScan parquet !ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36:ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36@00000000-1ebd-4a81-9f64-2d4c8a8472bc:master.ri.foundry.main.dataset.6ad20cd7-45b0-4312-b096-05f57487f650[name#9732,birthday#9733,employee_id#9734] Batched: true, Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,birthday:date,employee_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))  # ブロードキャストエクスチェンジを使用して、大きなテーブルと小さなテーブルを結合します
      +- *(1) Project [phone_number#9728, employee_id#9729]
         +- *(1) Filter isnotnull(employee_id#9729)  # employee_idがnullでないレコードをフィルタリングします
            +- *(1) FileScan csv !ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db:ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db@00000000-1ebc-f483-b75d-dbcc3292d9e4:master.ri.foundry.main.dataset.f5bf4c77-37c0-4e29-8a68-814c35442bbd[phone_number#9728,employee_id#9729] Batched: false, Format: CSV, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<phone_number:int,employee_id:int>

データの閲覧

例えば、どの従業員が最も多くの電話番号を持っているかを確認したいとします。興味のあるデータセット(1つ以上の番号を持つ従業員)を導出し、.take(3) を呼び出して上位 3 行をリストとして取得します。また、.collect() は DataFrame のすべての行をリストとして取得します。

警告

Python 環境に取り込むデータが多すぎると、簡単にメモリが不足してしまうことがあります。データ量が少ないものだけ collect() してください。

Copied!
1 2 3 4 5 6 7 8 9 10 # 関数 multiple_numbers の定義 def multiple_numbers(phone_numbers): # 'employee_id' に対してグループ化し、それぞれのグループ内で 'phone_number' の数をカウントします。 # そして、それを 'numbers' という新しい列として追加します。 # その後、'numbers' の値が 1 を超える行だけを抽出し、'numbers' の値で降順に並び替えます。 df = phone_numbers.groupBy('employee_id').agg( F.count('phone_number').alias('numbers') ).where('numbers' > 1).sort(F.col('numbers').desc()) # 結果の先頭 3 行をプリントします。 print(df.take(3))
[Row(employee_id=70, numbers=4), Row(employee_id=90, numbers=2), Row(employee_id=25, numbers=2)]
# Rowオブジェクトのリスト。各Rowオブジェクトは、employee_idとnumbersの2つの属性を持っています。
# ここでは、employee_idが70の従業員は、numbersの値が4、employee_idが90の従業員はnumbersの値が2、employee_idが25の従業員はnumbersの値が2です。