注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

コンセプト: 行

このドキュメントの例に従うには、from pyspark.sql import functions as Fを追加してください。

行は、pyspark.sql.Column クラスによって管理されます。行インスタンスは、既存の行を直接参照するか、その式から派生するたびに作成されます。以下のいずれかの方法で行を参照できます。

  • F.col("column_name")
  • F.column("column_name")

行を参照することは、selectを実行することと同等ではありません。なぜなら、「行を選択する」ことは、結果として得られるデータセットに表示したい行をサブセット化(および並べ替え)することを意味するからです。

目次

スキーマの取得

DataFrame.columns

すべての行名を Python のリストとして返します

Copied!
1 columns = df.columns # dfのカラム名を取得します。例:['age', 'name']

DataFrame.dtypes

全ての行名とそれらのデータ型をタプルのリストとして返します。

Copied!
1 dtypes = df.dtypes # [('age', 'int'), ('name', 'string')] # dfのデータ型を取得します(例:年齢は整数型、名前は文字列型)

選択

DataFrame.select(*cols)

元の DataFrame から一部の行を選択した新しい DataFrame を返します。

例えば、6つの名前付き行を持つ DataFrame があるとします:idfirst_namelast_namephone_numberaddressis_active_member

idfirst_namelast_namephone_numberzip_codeis_active_member
1JohnDoe(123) 456-789010014true
2JaneEyre(213) 555-123490007true
..................

ユーザーが利用可能な中から特定の名前付き行のみを含む DataFrame に変換したい場合もあります。例えば、phone_number という単一の行のみを含む表を欲しいとします:

Copied!
1 2 # 電話番号を選択してデータフレームを更新します df = df.select("phone_number")
phone_number
(123) 456-7890
(213) 555-1234
...

idfirst_name、およびlast_nameだけがほしい場合(同じタスクを達成するための少なくとも 3つの異なる方法があります):

  1. 行名を直接渡す:

    Copied!
    1 df = df.select("id", "first_name", "last_name")

    または行インスタンスを渡す:

    Copied!
    1 df = df.select(F.col("id"), F.col("first_name"), F.col("last_name"))
  2. 行名の配列を渡す:

    Copied!
    1 2 select_columns = ["id", "first_name", "last_name"] df = df.select(select_columns)
  3. 配列を"アンパック"して渡す:

    Copied!
    1 2 3 select_columns = ["id", "first_name", "last_name"] df = df.select(*select_columns) # これと同じ:df = df.select("id", "first_name", "last_name")
    idfirst_namelast_name
    1JohnDoe
    2JaneEyre
    .........

    select_columnsの前の*は、配列をアンパックして、機能的に#1と同じように動作するようにします(コメントを参照)。これにより、次のような操作が可能になります。

    Copied!
    1 2 3 select_columns = ["id", "first_name", "last_name"] return df.select(*select_columns, "phone_number") # これと同じ:df = df.select("id", "first_name", "last_name", "phone_number")
    idfirst_namelast_namephone_number
    1JohnDoe(123) 456-7890
    2JaneEyre(213) 555-1234
    ............

選択した行のみが出力データセットに含まれることに注意してください。また、選択した行の順序がそのまま保持されます(元の行の順序を保持するのではなく)。名前は一意であり、大文字と小文字が区別され、選択元のデータセットの行としてすでに存在している必要があります。

そのルールの例外として、新しい列を派生させてすぐにそれを選択することができますが、新しく派生した列にはalias(名前)を付ける必要があります:

string1string2string3string4
firstsecondthirdFourth
onetwothreefour
Copied!
1 2 3 4 # "string1" と "string2" を ":" で連結して新しいカラムを作成 derived_column = F.concat_ws(":", F.col("string1"), F.col("string2")) # "string3" と新しく作成したカラム(エイリアス名は "derived")を選択して返す return df.select("string3", derived_column.alias("derived"))
stringstring3
thirdfirst
threeone

作成、更新

DataFrame.withColumn(name, column)

Copied!
1 2 # 新しいデータフレームを作成し、既存のデータフレームから派生した列を追加します。 new_df = old_df.withColumn("column_name", derived_column)
  • new_df: old_df からすべての行を含む結果のデータフレームで、new_column_name が追加されています。
  • old_df: 新しい行を適用したいデータフレーム
  • column_name: 作成する行の名前(old_df に存在しない場合)または更新する行の名前(old_df に既に存在する場合)。
  • derived_column: 行を導出する式で、column_name(または行に付ける他の名前)の下のすべての行に適用されます。

既存の DataFrame がある場合、新しい列を作成するか、既存の列に新しい値や変更された値を追加するには、withColumn メソッドを使用できます。これは特に以下の目的に役立ちます。

  1. 既存の値に基づいて新しい値を導出する

    Copied!
    1 df = df.withColumn("times_two", F.col("number") * 2) # times_two = number * 2
    Copied!
    1 df = df.withColumn("concat", F.concat(F.col("string1"), F.col("string2")))
  2. あるタイプの値から別のタイプの値にキャストする

    Copied!
    1 2 # `start_timestamp` を DateType にキャストし、新しい値を `start_date` に格納する df = df.withColumn("start_date", F.col("start_timestamp").cast("date"))
  3. 行を更新する

    Copied!
    1 2 # 列 `string` をそのすべての小文字バージョンで更新する df = df.withColumn("string", F.lower(F.col("string")))

名前の変更、エイリアス

DataFrame.withColumnRenamed(name, rename)

.withColumnRenamed() を使用して行の名前を変更します。

Copied!
1 2 # "old_name"列の名前を"new_name"に変更する df = df.withColumnRenamed("old_name", "new_name")

列の名前を変更するというタスクを別の視点から見ると、PySparkが変換ステートメントを最適化する方法について理解が深まるはずです。これは以下のようになります:

Copied!
1 2 3 # DataFrame 'df' に新しい列 "new_name" を作成します。この新しい列は "old_name" 列の値を持っています。 # そして、 "old_name" 列を削除します。 df = df.withColumn("new_name", F.col("old_name")).drop("old_name")

しかし、withColumnを使用せずに新しい行を導出し、それでも名前をつける必要があるいくつかのケースもあります。ここでalias(またはそのメソッドエイリアス、name)が便利です。以下にいくつかの使用例を示します:

Copied!
1 2 3 4 5 6 7 8 9 10 # 列名を"new_name"に変更して選択します。 df = df.select(derived_column.alias("new_name")) # .alias("new_name")と同様に列名を"new_name"に変更して選択します。 df = df.select(derived_column.name("new_name")) # "group"によってデータをグループ化し、その結果に対して合計値とカウントを計算します。 # "number"の合計値は"sum_of_numbers"として、カウントは"count"として新たに列を作成します。 df = df.groupBy("group") \ .agg(F.sum("number").alias("sum_of_numbers"), F.count("*").alias("count"))

また、一度に複数の行をリネームすることも可能です:

Copied!
1 2 3 4 5 6 7 8 9 10 # "column"と"data"という名前の列をそれぞれ"column_renamed"と"data_renamed"に変更するための辞書を作成します renames = { "column": "column_renamed", "data": "data_renamed", } # 辞書の各項目に対して、列の名前を変更する操作を行います for colname, rename in renames.items(): # withColumnRenamed関数を使用して、colname(元の列名)をrename(新しい列名)に変更します df = df.withColumnRenamed(colname, rename)

ドロップ

DataFrame.drop(*cols)

指定された列を削除して、元の DataFrame の列のサブセットを持つ新しい DataFrame を返します。(スキーマに指定された列名が含まれていない場合、これは失敗します。)

列を削除する方法は2つあります: 直接的な方法と間接的な方法です。間接的な方法は select を使用して、維持したい列のサブセットを選択します。直接的な方法は、破棄したい列のサブセットを提供するために、drop を使用します。どちらも似たような使用構文がありますが、ここでは順序は関係ありません。いくつかの例:

idfirst_namelast_namephone_numberzip_codeis_active_member
1JohnDoe(123) 456-789010014true
2JaneEyre(213) 555-123490007true
..................

例えば、phone_number という1つの列だけを削除したい場合:

Copied!
1 2 # "phone_number"という列を削除します df = df.drop("phone_number")
idfirst_namelast_namezip_codeis_active_member
1ジョンドウ10014true
2ジェーンエア90007true
...............

id, first_name, last_name を削除するかもしれません(同じタスクを達成するための少なくとも 3 つの異なる方法があります):

  1. 列名を直接渡す:

    Copied!
    1 df = df.drop("id", "first_name", "last_name")

    または

    Copied!
    1 df = df.drop(F.col("id"), F.col("first_name"), F.col("last_name"))
  2. 配列を渡す:

    Copied!
    1 2 drop_columns = ["id", "first_name", "last_name"] df = df.drop(drop_columns)
  3. "展開された"配列を渡す:

    Copied!
    1 2 3 drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns) # 同じ : df = df.drop("id", "first_name", "last_name")
    phone_numberzip_codeis_active_member
    (123) 456-789010014true
    (213) 555-123490007true
    .........

    drop_columns の前にある * は、配列を 展開 して、機能的に #1 と同じように動作するようにします(コメント参照)。これにより、以下の操作が可能になります:

    Copied!
    1 2 3 drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns, "phone_number") # 同じ : df = df.drop("id", "first_name", "last_name", "phone_number")
    zip_codeis_active_member
    10014true
    90007true
    ......

キャスト

行.cast(タイプ)

以下は、存在するすべてのデータタイプです:NullType, StringType, BinaryType, BooleanType, DateType, TimestampType, DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType, MapType, StructType, StructField

一般的に、列の cast メソッドを使用して、ほとんどのデータタイプを他のデータタイプに変換できます:

Copied!
1 2 3 from pyspark.sql.types import StringType df.select(df.age.cast(StringType()).alias("age")) # df.ageがIntegerType()であると仮定

または

Copied!
1 2 df.select(df.age.cast("string").alias("age")) # StringType()を使用するのと基本的に同じです。
年齢
"2"
"5"

キャスティングは基本的に新たに派生した行を作成し、その上で直接 selectwithColumnfilter などを実行できます。PySparkでも"ダウンキャスティング"と"アップキャスティング"の概念が適用されるため、以前のデータ型に格納されていたより詳細な情報を失うか、ゴミ情報を得る可能性があります。

When, otherwise

F.when(条件, 値).otherwise(値2)

パラメーター:

  • 条件 - ブール型の行式
  • - リテラル値または行式

または 値2 パラメーターと同一の行式に評価されます。Column.otherwise() が呼び出されない場合、一致しない条件に対して None(null)の行式が返されます。

whenotherwise 演算子は新しい行値を計算する if-else 文に相当するものを提供します。基本的な使用方法は次のとおりです:

Copied!
1 2 3 4 5 6 7 8 9 10 # CASE WHEN (age >= 21) THEN true ELSE false END # (年齢が21以上の場合は真、それ以外は偽) at_least_21 = F.when(F.col("age") >= 21, True).otherwise(False) # CASE WHEN (last_name != "") THEN last_name ELSE null # (last_nameが空でない場合はlast_name、それ以外はnull) last_name = F.when(F.col("last_name") != "", F.col("last_name")).otherwise(None) # データフレームdfを選択し、at_least_21とlast_nameをエイリアスとして使用 df = df.select(at_least_21.alias("at_least_21"), last_name.alias("last_name"))

when 文も必要な回数だけ連鎖させることができます:

Copied!
1 2 # "age" が 35 以上の場合 "A" を返す。それ以外の場合で "age" が 21 以上の場合 "B" を返す。それ以外の場合 "C" を返す switch = F.when(F.col("age") >= 35, "A").when(F.col("age") >= 21, "B").otherwise("C")

これらの評価は、行に割り当てることも、フィルター処理するのに使用することもできます:

Copied!
1 2 df = df.withColumn("switch", switch) # switchはA、B、またはCです df = df.where(~F.isnull(last_name)) # last_nameがnullでない行(空の文字列がnullとして評価された後)をフィルタリングします