Warning

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

コンセプト:列

このドキュメントの例を追うためには、from pyspark.sql import functions as Fを追加してください。

列はPySparkクラスで管理されています:pyspark.sql.Column。列のインスタンスは、既存の列を直接参照したり、その列から表現を派生させるたびに作成されます。以下のいずれかの方法で列を参照できます:

  • F.col("column_name")
  • F.column("column_name")

列を参照することは、selectを実行することとは同等ではありません。なぜなら、「選択」する列とは、結果として表示されるデータセットに含めたい列をサブセット(および並べ替え)することを指すからです。

目次

スキーマの取得

DataFrame.columns

すべての列名をPythonのリストとして返します

Copied!
1 columns = df.columns # dfの列名を取得します。例:['age', 'name']

DataFrame.dtypes

すべての列名とそのデータ型をタプルのリストとして返します

Copied!
1 dtypes = df.dtypes # df.dtypesを用いて、データフレームの各列のデータ型を取得します。例:[('age', 'int'), ('name', 'string')]

Select(選択)

DataFrame.select(*cols)

元の DataFrame の列の一部を含む新しい DataFrame を返します。

例えば、次のような 6 つの名前付き列がある DataFrame を持っているとします:idfirst_namelast_namephone_numberaddressis_active_member

idfirst_namelast_namephone_numberzip_codeis_active_member
1JohnDoe(123) 456-789010014true
2JaneEyre(213) 555-123490007true
..................

利用可能なものの一部である名前付き列のみを含む DataFrame に変換したい場合があります。たとえば、phone_number という名前の列だけが含まれる表を作成したい場合があります:

Copied!
1 2 # "phone_number"という名前の列を選択します df = df.select("phone_number")
phone_number
(123) 456-7890
(213) 555-1234
...

あるいは、idfirst_namelast_nameだけが欲しい場合もあるでしょう(同じタスクを達成するための方法は少なくとも3つあります):

  1. 列名を直接指定して渡す:

    Copied!
    1 df = df.select("id", "first_name", "last_name")

    または列インスタンスを渡す:

    Copied!
    1 df = df.select(F.col("id"), F.col("first_name"), F.col("last_name"))
  2. 列名の配列を渡す:

    Copied!
    1 2 select_columns = ["id", "first_name", "last_name"] df = df.select(select_columns)
  3. "アンパックされた"配列を渡す:

    Copied!
    1 2 3 select_columns = ["id", "first_name", "last_name"] df = df.select(*select_columns) # same as: df = df.select("id", "first_name", "last_name")
    idfirst_namelast_name
    1JohnDoe
    2JaneEyre
    .........

    select_columnsの前の*は、配列をアンパックして、機能的に#1と同じように動作するようにします(コメント参照)。これにより、以下のような操作が可能になります:

    Copied!
    1 2 3 select_columns = ["id", "first_name", "last_name"] return df.select(*select_columns, "phone_number") # same as: df = df.select("id", "first_name", "last_name", "phone_number")
    idfirst_namelast_namephone_number
    1JohnDoe(123) 456-7890
    2JaneEyre(213) 555-1234
    ............

ユーザーの出力データセットには選択した列のみが含まれ選択した順序で表示されます。元の列の順序は維持されません。名前は一意で大文字小文字を区別し、選択元のデータセットにすでに列として存在していなければなりません。

そのルールの例外は、新しい列を導出し、すぐにそれを選択できることです。新たに導出された列にはalias、つまり名前を付ける必要があります:

string1string2string3string4
firstsecondthirdFourth
onetwothreefour
Copied!
1 2 3 4 5 # "string1"と"string2"という名前の列を":"で連結します。結果として得られる新しい列を"derived_column"とします。 derived_column = F.concat_ws(":", F.col("string1"), F.col("string2")) # 元のデータフレーム(df)から"string3"という名前の列と上で作成した"derived_column"を選択し、"derived_column"の名前を"derived"に変更します。 return df.select("string3", derived_column.alias("derived"))
Copied!
1 2 # 新しいデータフレームは、既存のデータフレームに新たに導出された列を追加して作成します。 new_df = old_df.withColumn("column_name", derived_column)
  • new_dfold_dfのすべての列を含む結果のデータフレームで、new_column_nameが追加されています。
  • old_df:新しい列を適用したいデータフレーム
  • column_name:新規作成(old_dfに存在しない場合)または更新(old_dfに既に存在する場合)したい列の名前。
  • derived_column:列を導出する式で、column_name(または列に付ける任意の名前)の下の各行に適用されます。

既存のDataFrameがある場合、withColumnメソッドを使用して新しい列を作成したり、新しいまたは変更した値で既存の列を更新することができます。これは特に以下の目標にとって非常に便利です:

  1. 既存の値に基づいて新しい値を導出する

    Copied!
    1 df = df.withColumn("times_two", F.col("number") * 2) # times_two = number * 2
    Copied!
    1 df = df.withColumn("concat", F.concat(F.col("string1"), F.col("string2")))
  2. 値を一つの型から別の型にキャストする

    Copied!
    1 2 # `start_timestamp`をDateTypeにキャストし、新しい値を`start_date`に保存する df = df.withColumn("start_date", F.col("start_timestamp").cast("date"))
  3. 列を更新する

    Copied!
    1 2 # 列`string`を自身のすべて小文字バージョンで更新する df = df.withColumn("string", F.lower(F.col("string")))

名前変更、エイリアス

DataFrame.withColumnRenamed(name, rename)

.withColumnRenamed()を使用して列の名前を変更します:

Copied!
1 2 # "old_name"という名前のカラム名を"new_name"に変更します df = df.withColumnRenamed("old_name", "new_name")

列の名前を変更するタスクを見る別の方法は、PySparkがトランスフォームステートメントを最適化する方法についての洞察を提供するはずです:

Copied!
1 2 3 # withColumn関数を使用して、新しい列 "new_name" を作成します。この列の内容は "old_name" 列と同じです。 # その後、drop関数を使用して元の "old_name" 列を削除します。 df = df.withColumn("new_name", F.col("old_name")).drop("old_name")

しかし、withColumnを使用せずに新しい列を導出し、それでも名前をつける必要があるいくつかのケースもあります。これがalias(またはそのメソッドのエイリアス、name)が便利な場所です。以下にいくつかの使用例を示します:

Copied!
1 2 3 4 5 6 7 # 新たに派生させた列を"new_name"という名称で選択します df = df.select(derived_column.alias("new_name")) # .name("new_name")は.alias("new_name")と同じ意味です df = df.select(derived_column.name("new_name")) # "group"列でグループ化し、"number"列の合計("sum_of_numbers"として)とレコード数("count"として)を集計します df = df.groupBy("group") \ .agg(F.sum("number").alias("sum_of_numbers"), F.count("*").alias("count"))

一度に複数の列の名前を変更することもできます:

Copied!
1 2 3 4 5 6 7 8 9 10 # 英語の名前を日本語の名前に変更するための辞書を作成します。 renames = { "column": "column_renamed", # "column"を"column_renamed"に変更 "data": "data_renamed", # "data"を"data_renamed"に変更 } # 辞書の各アイテムについてループを回します。 for colname, rename in renames.items(): # DataFrame 'df'の列名を新しい名前に変更します。 df = df.withColumnRenamed(colname, rename)

Drop

DataFrame.drop(*cols)

指定された列を削除した、元の DataFrame のサブセットである新しい DataFrame を返します。(スキーマに指定された列名が含まれていない場合、これは失敗します。)

列を削除する方法は2つあります。直接的な方法と間接的な方法です。間接的な方法は、select を使用して、保持したい列のサブセットを選択します。直接的な方法は、drop を使用して、破棄したい列のサブセットを提供します。どちらも似たような使用構文がありますが、ここでは順序は関係ありません。いくつかの例:

idfirst_namelast_namephone_numberzip_codeis_active_member
1JohnDoe(123) 456-789010014true
2JaneEyre(213) 555-123490007true
..................

たとえば、phone_number という列だけを削除したい場合は次のようになります。

Copied!
1 2 # "phone_number"という列をデータフレームから削除します df = df.drop("phone_number")
idfirst_namelast_namezip_codeis_active_member
1JohnDoe10014true
2JaneEyre90007true
...............

または、idfirst_name、およびlast_nameを削除することを希望するかもしれません(同じタスクを達成する方法は少なくとも3つ存在します):

  1. 列名を直接入力します:

    Copied!
    1 df = df.drop("id", "first_name", "last_name")

    または

    Copied!
    1 df = df.drop(F.col("id"), F.col("first_name"), F.col("last_name"))
  2. 配列を入力します:

    Copied!
    1 2 drop_columns = ["id", "first_name", "last_name"] df = df.drop(drop_columns)
  3. "アンパックされた"配列を入力します:

    Copied!
    1 2 3 drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns) # same as: df = df.drop("id", "first_name", "last_name")
    phone_numberzip_codeis_active_member
    (123) 456-789010014true
    (213) 555-123490007true
    .........

    *drop_columnsの前にあると配列をアンパックし、#1のように機能的に同じように動作します(コメントを参照)。これにより、以下のようなことが可能になります:

    Copied!
    1 2 3 drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns, "phone_number") # same as: df = df.drop("id", "first_name", "last_name", "phone_number")
    zip_codeis_active_member
    10014true
    90007true
    ......

キャスト

列.キャスト(タイプ)

以下は存在するすべてのデータタイプです:NullType, StringType, BinaryType, BooleanType, DateType, TimestampType, DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType, MapType, StructType, StructField

一般的に、列のキャストメソッドを使用して、ほとんどのデータタイプを他のデータタイプに変換できます:

Copied!
1 2 3 4 5 from pyspark.sql.types import StringType df.select(df.age.cast(StringType()).alias("age")) # df.ageがIntegerType()だと仮定します。 # このコードは、dfのage列のデータタイプを文字列(StringType)に変換します。 # alias("age")は変換後の列名を指定しています。ここでは元の列名(age)をそのまま使用しています。

または

Copied!
1 2 df.select(df.age.cast("string").alias("age")) # StringType()を使用するのと同じ効果があります。
年齢
"2"
"5"

キャスティングは基本的に新しく派生した列を作成します。これにより、selectwithColumnフィルター処理するなどを直接実行できます。PySparkでは「ダウンキャスティング」と「アップキャスティング」の概念も適用されるため、以前のデータタイプに格納されたより詳細な情報を失うか、ガベージ情報を取得する可能性があります。

when, otherwise

F.when(条件, 値).otherwise(値2)

パラメーター:

  • 条件 - ブール型の列の表現
  • - リテラル値または列の表現

または値2パラメーターと同一の列の表現に評価します。Column.otherwise()が呼び出されない場合、一致しない条件に対してNone(null)の列の表現が返されます。

whenotherwise演算子は、新しい列値を計算するif-else文に対して類似性を提供します。基本的な使用法は以下の通りです:

Copied!
1 2 3 4 5 6 7 8 9 10 # CASE WHEN (age >= 21) THEN true ELSE false END # (年齢が21以上の場合はtrue、それ以外はfalseとする) at_least_21 = F.when(F.col("age") >= 21, True).otherwise(False) # CASE WHEN (last_name != "") THEN last_name ELSE null # (last_nameが空でない場合はlast_name、それ以外はnullとする) last_name = F.when(F.col("last_name") != "", F.col("last_name")).otherwise(None) # 上記で作成した列を選択し、エイリアスをつけて新しいデータフレームを作成 df = df.select(at_least_21.alias("at_least_21"), last_name.alias("last_name"))

when ステートメントも、必要な回数だけチェーンすることができます:

Copied!
1 2 3 # "age"という名前のカラムを基に、値が35以上の場合は"A"とラベル付けし、 # それ以外で値が21以上の場合は"B"とラベル付けし、それ以外の場合は"C"とラベル付けするという条件分岐を行うスイッチを定義しています。 switch = F.when(F.col("age") >= 35, "A").when(F.col("age") >= 21, "B").otherwise("C")

これらの評価は列に割り当てることができますし、フィルター処理するのにも使用できます:

Copied!
1 2 df = df.withColumn("switch", switch) # switch=A、B、またはC df = df.where(~F.isnull(last_name)) # last_name(空の文字列がnull値として評価された後)がnullでない行をフィルタリングする