注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

ダウンロードのためのデータセットを準備する

以下のエクスポートプロセスは高度なワークフローであり、Foundry インターフェースから直接データをダウンロードできない場合、または Code Repositories の アクション メニューや Foundry の別のアプリケーションからのエクスポート を使用できない場合にのみ実行する必要があります。

このガイドでは、Code Repositoriesで変換を使用して CSV をダウンロード用に準備する方法について説明します。場合によっては、Foundry インターフェースの アクション メニューを使用せずに、Foundry から CSV 形式のデータサンプルをダウンロードする必要があるかもしれません。このような場合、アクション メニューを使用する代わりに、ビルド中にエクスポートファイルを準備することをお勧めします。

データの準備

ダウンロードのための CSV を準備する最初のステップは、フィルター処理され、クリーニングされたデータセットを作成することです。以下のステップを実行することをお勧めします:

  1. データサンプルがエクスポート可能であり、データエクスポート制御ルールに従っていることを確認します。具体的には、エクスポートがユーザーの組織のデータガバナンスポリシーに準拠していることを確認する必要があります。
  2. データを可能な限り小さくフィルター処理することで、必要な目的を達成します。一般的に、CSV 形式のデータの非圧縮サイズは、デフォルトの HDFS ブロックサイズ(128 MB)未満である必要があります。これを達成するために、必要な行のみを選択し、行の数を最小限に抑える必要があります。特定の値をフィルター処理するか、または任意の数の行(例えば 1000)を持つランダムサンプルを取ることで、行の数を減らすことができます。
  3. 行タイプを string に変更します。CSV 形式はスキーマ(強制的な型と行のラベル)を欠いているため、すべての行を文字列にキャストすることを推奨します。これは、タイムスタンプの行に特に重要です。

以下のサンプルコードは、ニューヨークのタクシーデータセットから抽出されたもので、ダウンロード用のデータを準備する際に参考になるかもしれません:

def prepare_input(my_input_df):
    from pyspark.sql import functions as F

    # フィルタリングするための列名
    filter_column = "vendor_id"
    # フィルタリングする値
    filter_value = "CMT"
    # フィルタリングを実施
    df_filtered = my_input_df.filter(filter_value == F.col(filter_column))

    # サンプリングする行数の近似値
    approx_number_of_rows = 1000
    # サンプリングの割合を計算
    sample_percent = float(approx_number_of_rows) / df_filtered.count()

    # サンプリングを実施
    df_sampled = df_filtered.sample(False, sample_percent, seed=0)

    # 重要な列名
    important_columns = ["medallion", "tip_amount"]

    # 重要な列だけを抽出し、その型を文字列に変換
    return df_sampled.select([F.col(c).cast(F.StringType()).alias(c) for c in important_columns])

同様のロジックとSparkの概念を使用して、SQLやJavaなどの他のSpark APIでも準備を実装することができます。

出力形式の設定とパーティションの結合

データがエクスポートの準備が整ったら、出力形式をCSVに設定できます。出力形式をCSVに設定すると、データの基礎となる形式はFoundryでCSVファイルとして保存されます。また、出力形式をJSON、ORC、Parquet、またはテキストに設定することもできます。最後に、結果を単一のCSVファイルに保存するために、データを単一のパーティションに結合してダウンロードする必要があります。

Python

以下の例のコードは、Pythonでデータを結合する方法を示しています:

Copied!
1 2 3 4 5 6 7 8 9 from transforms.api import transform, Input, Output @transform( output=Output("/path/to/python_csv"), # 出力のパスを指定します my_input=Input("/path/to/input") # 入力のパスを指定します ) def my_compute_function(output, my_input): # my_inputのデータフレームをcoalesce関数で1つのパーティションにまとめ、 # それをcsv形式で出力します。オプションとしてヘッダーを含めるように指定しています。 output.write_dataframe(my_input.dataframe().coalesce(1), output_format="csv", options={"header": "true"})

SQL

以下の例示コードは、SQLでデータを結合する方法を示しています:

CREATE TABLE `/path/to/sql_csv` USING CSV AS SELECT /*+ COALESCE(1) */ * FROM `/path/to/input`

以下のように日本語でコメントを追加します:

/*
CSVを使用して`/path/to/sql_csv`というテーブルを作成します。このテーブルは`/path/to/input`から全てのデータを取得します。
COALESCE(1)は、最初の非null引数を返す関数で、ここではパーティションの最適化に使用されています。
*/
CREATE TABLE `/path/to/sql_csv` USING CSV AS SELECT /*+ COALESCE(1) */ * FROM `/path/to/input`

追加のCSV生成オプションについては、公式 Spark ドキュメンテーションを参照してください。

ダウンロードのためのファイルにアクセスする

データセットが作成されたら、データセットページの 詳細 タブに移動します。CSVはダウンロード可能として表示されるはずです。

ダウンロード可能なCSV