2. [Repositories] データ変換入門14 - クリーニングユーティリティの作成

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

14 - クリーニングユーティリティの作成

📖 タスクの概要

ユーザーの生データの一部の値は、最適な形式ではありません。この演習では、データを前処理するために使用する関数を含む2つのユーティリティファイルを作成します。ユーザーのパイプラインの早い段階で修正したい異常には、次のようなものがあります(これらに限定されません):

  • flight_alerts_rawflightDate 行は現在、日付ではなく 文字列 タイプです。
  • 両方のマッピングデータセットの mapped_value 行には余分なスペースがあり、テキストは小文字で _ 文字で区切られています。 "Open and Assigned" よりも現在の値 "·······open_and_assigned" を好みます。

この最初のユーティリティファイルでは、リポジトリが作成されたときに提供された examples.py ファイルをリネームして再利用します。このトレーニングのルートでは、Python/PySpark を使用したデータ変換技術についての知識があることを前提としており、構文についての指導は提供していません。ただし、提供されたコードには、その目的を明確にするコメントが含まれています。

🔨 タスクの説明

  1. Master から yourName/feature/preprocessing という名前の新しいブランチをユーザーのリポジトリに作成します。

  2. リポジトリの Files パネルの examples.py ファイルを右クリックし、それを cleaning_utils.pyRename します。

  3. コードエディタウィンドウのファイルの内容を削除します(例えば、ctrl+a → Delete を使用します)。

  4. 下のコードブロックをコピーし、それをコードエディタに貼り付けます。

    コードコメントで説明されている機能が、上記で説明した問題を部分的に解決し、行名を正規化する方法に注意してください。

    from pyspark.sql import functions as F
    import re
    
    
    def normalize_strings(df, string_columns):
        """
        This function takes a dataframe (df) and an array of string columns as arguments
        This function iterates through the list of string columns in the dataframe and
        1. Replaces underscores with spaces
        2. Capitalizes the first letter of each word
        3. Trims trailing whitespace
        4. Replaces empty strings with null
        """
        for colm in string_columns:
            df = df.withColumn(colm, F.regexp_replace(F.col(colm), '_', ' '))
            df = df.withColumn(colm, F.initcap(F.col(colm)))
            df = df.withColumn(colm, F.trim(F.col(colm)))
            df = df.withColumn(colm, F.when(F.col(colm) != "", F.col(colm)).otherwise(None))
    
        return df
    
    
    def normalize_column_names(df):
        """
        This function take a dataframe (df) as an argument
        This function calls the normalize_column_name function to convert all column names in the dataframe to snake_case
        """
        return df.select(
            *(F.col(f"`{colm}`").alias(normalize_column_name(colm)) for colm in df.columns)
        )
    
    
    def normalize_column_name(name):
        """
        This function takes a single column name (string) as an argument
        This function:
        1. Trims trailing whitespace
        2. Converts from camelCase to snake_case
        3. Collapes any sequential underscores into a single underscore
        4. Removes any trailing underscores
        """
        name = name.strip()
        name = re.sub(r'([a-z])([A-Z])', r'\1_\2', name)
        name = re.sub(r'[^a-zA-Z0-9_]', r'_', name)
        name = re.sub(r'_+', r'_', name)
        name = re.sub(r'_$', r'', name)
        return name.lower()
    
  5. 次のメッセージでコードをコミットします: “feature: add cleaning utils.”