注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
このドキュメントの例に従うには、from pyspark.sql import functions as F
を追加してください。
行は、pyspark.sql.Column
クラスによって管理されます。行インスタンスは、既存の行を直接参照するか、その式から派生するたびに作成されます。以下のいずれかの方法で行を参照できます。
F.col("column_name")
F.column("column_name")
行を参照することは、select
を実行することと同等ではありません。なぜなら、「行を選択する」ことは、結果として得られるデータセットに表示したい行をサブセット化(および並べ替え)することを意味するからです。
DataFrame.columns
すべての行名を Python のリストとして返します
Copied!1
columns = df.columns # dfのカラム名を取得します。例:['age', 'name']
DataFrame.dtypes
全ての行名とそれらのデータ型をタプルのリストとして返します。
Copied!1
dtypes = df.dtypes # [('age', 'int'), ('name', 'string')] # dfのデータ型を取得します(例:年齢は整数型、名前は文字列型)
DataFrame.select(*cols)
元の DataFrame
から一部の行を選択した新しい DataFrame
を返します。
例えば、6つの名前付き行を持つ DataFrame があるとします:id
、first_name
、last_name
、phone_number
、address
、is_active_member
id | first_name | last_name | phone_number | zip_code | is_active_member |
---|---|---|---|---|---|
1 | John | Doe | (123) 456-7890 | 10014 | true |
2 | Jane | Eyre | (213) 555-1234 | 90007 | true |
... | ... | ... | ... | ... | ... |
ユーザーが利用可能な中から特定の名前付き行のみを含む DataFrame に変換したい場合もあります。例えば、phone_number
という単一の行のみを含む表を欲しいとします:
Copied!1 2
# 電話番号を選択してデータフレームを更新します df = df.select("phone_number")
phone_number |
---|
(123) 456-7890 |
(213) 555-1234 |
... |
id
、first_name
、およびlast_name
だけがほしい場合(同じタスクを達成するための少なくとも 3つの異なる方法があります):
行名を直接渡す:
Copied!1
df = df.select("id", "first_name", "last_name")
または行インスタンスを渡す:
Copied!1
df = df.select(F.col("id"), F.col("first_name"), F.col("last_name"))
行名の配列を渡す:
Copied!1 2
select_columns = ["id", "first_name", "last_name"] df = df.select(select_columns)
配列を"アンパック"して渡す:
Copied!1 2 3
select_columns = ["id", "first_name", "last_name"] df = df.select(*select_columns) # これと同じ:df = df.select("id", "first_name", "last_name")
id | first_name | last_name |
---|---|---|
1 | John | Doe |
2 | Jane | Eyre |
... | ... | ... |
select_columns
の前の*
は、配列をアンパックして、機能的に#1
と同じように動作するようにします(コメントを参照)。これにより、次のような操作が可能になります。
Copied!1 2 3
select_columns = ["id", "first_name", "last_name"] return df.select(*select_columns, "phone_number") # これと同じ:df = df.select("id", "first_name", "last_name", "phone_number")
id | first_name | last_name | phone_number |
---|---|---|---|
1 | John | Doe | (123) 456-7890 |
2 | Jane | Eyre | (213) 555-1234 |
... | ... | ... | ... |
選択した行のみが出力データセットに含まれることに注意してください。また、選択した行の順序がそのまま保持されます(元の行の順序を保持するのではなく)。名前は一意であり、大文字と小文字が区別され、選択元のデータセットの行としてすでに存在している必要があります。
そのルールの例外として、新しい列を派生させてすぐにそれを選択することができますが、新しく派生した列にはalias
(名前)を付ける必要があります:
string1 | string2 | string3 | string4 |
---|---|---|---|
first | second | third | Fourth |
one | two | three | four |
Copied!1 2 3 4
# "string1" と "string2" を ":" で連結して新しいカラムを作成 derived_column = F.concat_ws(":", F.col("string1"), F.col("string2")) # "string3" と新しく作成したカラム(エイリアス名は "derived")を選択して返す return df.select("string3", derived_column.alias("derived"))
string | string3 |
---|---|
third | first |
three | one |
DataFrame.withColumn(name, column)
Copied!1 2
# 新しいデータフレームを作成し、既存のデータフレームから派生した列を追加します。 new_df = old_df.withColumn("column_name", derived_column)
new_df
: old_df
からすべての行を含む結果のデータフレームで、new_column_name
が追加されています。old_df
: 新しい行を適用したいデータフレームcolumn_name
: 作成する行の名前(old_df に存在しない場合)または更新する行の名前(old_df に既に存在する場合)。derived_column
: 行を導出する式で、column_name
(または行に付ける他の名前)の下のすべての行に適用されます。既存の DataFrame がある場合、新しい列を作成するか、既存の列に新しい値や変更された値を追加するには、withColumn
メソッドを使用できます。これは特に以下の目的に役立ちます。
既存の値に基づいて新しい値を導出する
Copied!1
df = df.withColumn("times_two", F.col("number") * 2) # times_two = number * 2
Copied!1
df = df.withColumn("concat", F.concat(F.col("string1"), F.col("string2")))
あるタイプの値から別のタイプの値にキャストする
Copied!1 2
# `start_timestamp` を DateType にキャストし、新しい値を `start_date` に格納する df = df.withColumn("start_date", F.col("start_timestamp").cast("date"))
行を更新する
Copied!1 2
# 列 `string` をそのすべての小文字バージョンで更新する df = df.withColumn("string", F.lower(F.col("string")))
DataFrame.withColumnRenamed(name, rename)
.withColumnRenamed()
を使用して行の名前を変更します。
Copied!1 2
# "old_name"列の名前を"new_name"に変更する df = df.withColumnRenamed("old_name", "new_name")
列の名前を変更するというタスクを別の視点から見ると、PySparkが変換ステートメントを最適化する方法について理解が深まるはずです。これは以下のようになります:
Copied!1 2 3
# DataFrame 'df' に新しい列 "new_name" を作成します。この新しい列は "old_name" 列の値を持っています。 # そして、 "old_name" 列を削除します。 df = df.withColumn("new_name", F.col("old_name")).drop("old_name")
しかし、withColumn
を使用せずに新しい行を導出し、それでも名前をつける必要があるいくつかのケースもあります。ここでalias
(またはそのメソッドエイリアス、name
)が便利です。以下にいくつかの使用例を示します:
Copied!1 2 3 4 5 6 7 8 9 10
# 列名を"new_name"に変更して選択します。 df = df.select(derived_column.alias("new_name")) # .alias("new_name")と同様に列名を"new_name"に変更して選択します。 df = df.select(derived_column.name("new_name")) # "group"によってデータをグループ化し、その結果に対して合計値とカウントを計算します。 # "number"の合計値は"sum_of_numbers"として、カウントは"count"として新たに列を作成します。 df = df.groupBy("group") \ .agg(F.sum("number").alias("sum_of_numbers"), F.count("*").alias("count"))
また、一度に複数の行をリネームすることも可能です:
Copied!1 2 3 4 5 6 7 8 9 10
# "column"と"data"という名前の列をそれぞれ"column_renamed"と"data_renamed"に変更するための辞書を作成します renames = { "column": "column_renamed", "data": "data_renamed", } # 辞書の各項目に対して、列の名前を変更する操作を行います for colname, rename in renames.items(): # withColumnRenamed関数を使用して、colname(元の列名)をrename(新しい列名)に変更します df = df.withColumnRenamed(colname, rename)
DataFrame.drop(*cols)
指定された列を削除して、元の DataFrame
の列のサブセットを持つ新しい DataFrame
を返します。(スキーマに指定された列名が含まれていない場合、これは失敗します。)
列を削除する方法は2つあります: 直接的な方法と間接的な方法です。間接的な方法は select
を使用して、維持したい列のサブセットを選択します。直接的な方法は、破棄したい列のサブセットを提供するために、drop
を使用します。どちらも似たような使用構文がありますが、ここでは順序は関係ありません。いくつかの例:
id | first_name | last_name | phone_number | zip_code | is_active_member |
---|---|---|---|---|---|
1 | John | Doe | (123) 456-7890 | 10014 | true |
2 | Jane | Eyre | (213) 555-1234 | 90007 | true |
... | ... | ... | ... | ... | ... |
例えば、phone_number
という1つの列だけを削除したい場合:
Copied!1 2
# "phone_number"という列を削除します df = df.drop("phone_number")
id | first_name | last_name | zip_code | is_active_member |
---|---|---|---|---|
1 | ジョン | ドウ | 10014 | true |
2 | ジェーン | エア | 90007 | true |
... | ... | ... | ... | ... |
id
, first_name
, last_name
を削除するかもしれません(同じタスクを達成するための少なくとも 3 つの異なる方法があります):
列名を直接渡す:
Copied!1
df = df.drop("id", "first_name", "last_name")
または
Copied!1
df = df.drop(F.col("id"), F.col("first_name"), F.col("last_name"))
配列を渡す:
Copied!1 2
drop_columns = ["id", "first_name", "last_name"] df = df.drop(drop_columns)
"展開された"配列を渡す:
Copied!1 2 3
drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns) # 同じ : df = df.drop("id", "first_name", "last_name")
phone_number | zip_code | is_active_member |
---|---|---|
(123) 456-7890 | 10014 | true |
(213) 555-1234 | 90007 | true |
... | ... | ... |
drop_columns
の前にある *
は、配列を 展開 して、機能的に #1
と同じように動作するようにします(コメント参照)。これにより、以下の操作が可能になります:
Copied!1 2 3
drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns, "phone_number") # 同じ : df = df.drop("id", "first_name", "last_name", "phone_number")
zip_code | is_active_member |
---|---|
10014 | true |
90007 | true |
... | ... |
行.cast(タイプ)
以下は、存在するすべてのデータタイプです:NullType
, StringType
, BinaryType
, BooleanType
, DateType
, TimestampType
, DecimalType
, DoubleType
, FloatType
, ByteType
, IntegerType
, LongType
, ShortType
, ArrayType
, MapType
, StructType
, StructField
一般的に、列の cast
メソッドを使用して、ほとんどのデータタイプを他のデータタイプに変換できます:
Copied!1 2 3
from pyspark.sql.types import StringType df.select(df.age.cast(StringType()).alias("age")) # df.ageがIntegerType()であると仮定
または
Copied!1 2
df.select(df.age.cast("string").alias("age")) # StringType()を使用するのと基本的に同じです。
年齢 |
---|
"2" |
"5" |
キャスティングは基本的に新たに派生した行を作成し、その上で直接 select
、withColumn
、filter
などを実行できます。PySparkでも"ダウンキャスティング"と"アップキャスティング"の概念が適用されるため、以前のデータ型に格納されていたより詳細な情報を失うか、ゴミ情報を得る可能性があります。
F.when(条件, 値).otherwise(値2)
パラメーター:
値
または 値2
パラメーターと同一の行式に評価されます。Column.otherwise()
が呼び出されない場合、一致しない条件に対して None
(null)の行式が返されます。
when
、otherwise
演算子は新しい行値を計算する if-else
文に相当するものを提供します。基本的な使用方法は次のとおりです:
Copied!1 2 3 4 5 6 7 8 9 10
# CASE WHEN (age >= 21) THEN true ELSE false END # (年齢が21以上の場合は真、それ以外は偽) at_least_21 = F.when(F.col("age") >= 21, True).otherwise(False) # CASE WHEN (last_name != "") THEN last_name ELSE null # (last_nameが空でない場合はlast_name、それ以外はnull) last_name = F.when(F.col("last_name") != "", F.col("last_name")).otherwise(None) # データフレームdfを選択し、at_least_21とlast_nameをエイリアスとして使用 df = df.select(at_least_21.alias("at_least_21"), last_name.alias("last_name"))
when
文も必要な回数だけ連鎖させることができます:
Copied!1 2
# "age" が 35 以上の場合 "A" を返す。それ以外の場合で "age" が 21 以上の場合 "B" を返す。それ以外の場合 "C" を返す switch = F.when(F.col("age") >= 35, "A").when(F.col("age") >= 21, "B").otherwise("C")
これらの評価は、行に割り当てることも、フィルター処理するのに使用することもできます:
Copied!1 2
df = df.withColumn("switch", switch) # switchはA、B、またはCです df = df.where(~F.isnull(last_name)) # last_nameがnullでない行(空の文字列がnullとして評価された後)をフィルタリングします