注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

構文チートシート

PySpark SQLで最も頻繁に使用されるパターンと関数へのクイックリファレンスガイド:

探している情報が見つからない場合は、PySpark公式ドキュメンテーションでカバーされている可能性があります。

一般的なパターン

出力のログ化

Copied!
1 2 3 4 5 6 7 # コードワークブック内 print("サンプルログ出力") # "example log output" を表示する # コードリポジトリ内 import logging # logging モジュールをインポートする logger = logging.getLogger(__name__) # 現在のファイル名を識別子にしてロガーを作成する logger.info("サンプルログ出力") # "example log output" を情報レベルでログ出力する

関数と型のインポート

Copied!
1 2 # F.my_function() や T.my_type() のように簡単に参照できるようにする from pyspark.sql import functions as F, types as T

フィルター処理

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # 等しい条件でフィルタリング df = df.filter(df.is_adult == 'Y') # is_adultが'Y'であるデータだけをフィルタリング # >, <, >=, <= 条件でフィルタリング df = df.filter(df.age > 25) # ageが25より大きいデータだけをフィルタリング # 複数の条件はそれぞれの条件に丸括弧が必要 df = df.filter((df.age > 25) & (df.is_adult == 'Y')) # ageが25より大きく、かつis_adultが'Y'であるデータだけをフィルタリング # 許可された値のリストと比較 df = df.filter(col('first_name').isin([3, 4, 7])) # first_nameが3, 4, 7のいずれかであるデータだけをフィルタリング # 結果の並び替え df = df.orderBy(df.age.asc()) # ageの昇順に並び替え df = df.orderBy(df.age.desc()) # ageの降順に並び替え

結合

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # 別のデータセットを左結合 df = df.join(person_lookup_table, 'person_id', 'left') # 'person_id'を基準に、person_lookup_tableをdfに左結合 # 別のデータセットで左反結合(左のデータフレームの一致しない行を返す) df = df.join(person_lookup_table, 'person_id', 'leftanti'); # 'person_id'を基準に、person_lookup_tableと一致しないデータをdfに返す # 左と右のデータセットで異なる列に一致 df = df.join(other_table, df.id == other_table.person_id, 'left') # dfの'id'とother_tableの'person_id'が一致するものを左結合 # 複数の列に一致 df = df.join(other_table, ['first_name', 'last_name'], 'left') # 'first_name'と'last_name'が一致するものを左結合 # ルックアップコード結合のための便利な一行関数 def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value): return ( df1 .join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left') # df1_keyとdf2_keyが一致するものを左結合 .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key))) # df2_valueとdf1_keyの値が存在するものをdf1_key列に格納 .drop(df2_key) # df2_key列を削除 .drop(df2_value) # df2_value列を削除 ) df = lookup_and_replace(people, pay_codes, id, pay_code_id, pay_code_desc) # peopleとpay_codesの指定したキーに基づいて結合し、指定した値で置換

行操作

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # 新しい静的な列を追加 df = df.withColumn('status', F.lit('PASS')) # 'status'という名前の新しい列を追加し、その値をすべて'PASS'に設定します。 # 新しい動的な列を構築 df = df.withColumn('full_name', F.when( (df.fname.isNotNull() & df.lname.isNotNull()), F.concat(df.fname, df.lname) # 'fname'と'lname'が両方ともnullでない場合は、それらを連結します。 ).otherwise(F.lit('N/A'))) # nullの場合は'N/A'にします。 # 保持する列を選択し、必要に応じて一部をリネーム df = df.select( 'name', 'age', F.col('dob').alias('date_of_birth'), # 'dob'という列名を'date_of_birth'に変更します。 ) # 列を削除 df = df.drop('mod_dt', 'mod_username') # 'mod_dt'と'mod_username'という列を削除します。 # 列の名前を変更 df = df.withColumnRenamed('dob', 'date_of_birth') # 'dob'という列名を'date_of_birth'に変更します。 # 別のデータセットにも存在するすべての列を保持 df = df.select(*(F.col(c) for c in df2.columns)) # df2に存在する列のみをdfから選択します。 # バッチで列の名前を変更/列をクリーン for col in df.columns: df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_')) # すべての列名を小文字にし、空白とハイフンをアンダースコアに置換します。

キャスティングとヌル値・重複の結合

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # カラムの型を別の型にキャストする df = df.withColumn('price', df.price.cast(T.DoubleType())) # すべてのnullを特定の値で置き換える df = df.fillna({ 'first_name': 'Tom', 'age': 0, }) # 最初のnullでない値を取得する df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A'))) # データセット内の重複した行を削除する(distinct()と同じ) df = df.dropDuplicates() # 特定のカラムのみを考慮して重複した行を削除する df = df.dropDuplicates(['name', 'height'])

文字列操作

文字列フィルター処理

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 # Contains - col.contains(string) # 文字列が含まれているか - col.contains(文字列) df = df.filter(df.name.contains('o')) # Starts With - col.startswith(string) # 文字列で始まるか - col.startswith(文字列) df = df.filter(df.name.startswith('Al')) # Ends With - col.endswith(string) # 文字列で終わるか - col.endswith(文字列) df = df.filter(df.name.endswith('ice')) # Is Null - col.isNull() # Nullかどうか - col.isNull() df = df.filter(df.is_adult.isNull()) # Is Not Null - col.isNotNull() # Nullでないか - col.isNotNull() df = df.filter(df.first_name.isNotNull()) # Like - col.like(string_with_sql_wildcards) # SQLワイルドカードを含む文字列に一致するか - col.like(SQLワイルドカードを含む文字列) df = df.filter(df.name.like('Al%')) # Regex Like - col.rlike(regex) # 正規表現に一致するか - col.rlike(正規表現) df = df.filter(df.name.rlike('[A-Z]*ice$')) # Is In List - col.isin(*values) # リスト内に存在するか - col.isin(*値) df = df.filter(df.name.isin('Bob', 'Mike'))

文字列関数

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # 部分文字列 - col.substr(startPos, length) (1-ベースのインデックス) # サブストリング - col.substr(開始位置, 長さ) (1から始まるインデックス) df = df.withColumn('short_id', df.id.substr(1, 10)) # Trim - F.trim(col) # トリム - F.trim(col) df = df.withColumn('name', F.trim(df.name)) # Left Pad - F.lpad(col, len, pad) # 右詰め - F.rpad(col, len, pad) # 左パッド - F.lpad(col, len, pad) # 右パッド - F.rpad(col, len, pad) df = df.withColumn('id', F.lpad('id', 4, '0')) # Left Trim - F.ltrim(col) # Right Trim - F.rtrim(col) # 左トリム - F.ltrim(col) # 右トリム - F.rtrim(col) df = df.withColumn('id', F.ltrim('id')) # Concatenate - F.concat(*cols) (null if any column null) # 連結 - F.concat(*cols) (いずれかの列がnullの場合はnull) df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname')) # Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols) (ignores nulls) # セパレータ/デリミタ付きで連結 - F.concat_ws(delimiter, *cols) (nullを無視) df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname')) # Regex Replace - F.regexp_replace(str, pattern, replacement) # 正規表現で置換 - F.regexp_replace(str, pattern, replacement) df = df.withColumn('id', F.regexp_replace(id, '0F1(.*)', '1F1-$1')) # Regex Extract - F.regexp_extract(str, pattern, idx) # 正規表現で抽出 - F.regexp_extract(str, pattern, idx) df = df.withColumn('id', F.regexp_extract(id, '[0-9]*', 0))

数値演算

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # Round - F.round(col, scale=0) # 切り捨て - F.round(列, 小数点以下の桁数) df = df.withColumn('price', F.round('price', 0)) # Floor - F.floor(col) # 床関数 - F.floor(列) df = df.withColumn('price', F.floor('price')) # Ceiling - F.ceil(col) # 天井関数 - F.ceil(列) df = df.withColumn('price', F.ceil('price')) # Absolute Value - F.abs(col) # 絶対値 - F.abs(列) df = df.withColumn('price', F.abs('price')) # X raised to power Y – F.pow(x, y) # X の Y 乗 - F.pow(X, Y) df = df.withColumn('exponential_growth', F.pow('x', 'y')) # Select smallest value out of multiple columns – F.least(*cols) # 複数の列から最小値を選択 - F.least(*列) df = df.withColumn('least', F.least('subtotal', 'total')) # Select largest value out of multiple columns – F.greatest(*cols) # 複数の列から最大値を選択 - F.greatest(*列) df = df.withColumn('greatest', F.greatest('subtotal', 'total'))

日付&タイムスタンプ操作

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 # 既知の形式の文字列を日付に変換します(時間情報は除外します) df = df.withColumn('date_of_birth', F.to_date('date_of_birth', 'yyyy-MM-dd')) # 既知の形式の文字列をタイムスタンプに変換します(時間情報を含みます) df = df.withColumn('time_of_birth', F.to_timestamp('time_of_birth', 'yyyy-MM-dd HH:mm:ss')) # 日付から年を取得: F.year(col) # 日付から月を取得: F.month(col) # 日付から日を取得: F.dayofmonth(col) # 日付から時を取得: F.hour(col) # 日付から分を取得: F.minute(col) # 日付から秒を取得: F.second(col) df = df.filter(F.year('date_of_birth') == F.lit('2017')) # 日付に日数を加算・減算 df = df.withColumn('three_days_after', F.date_add('date_of_birth', 3)) df = df.withColumn('three_days_before', F.date_sub('date_of_birth', 3)) # 日付に月数を加算・減算 df = df.withColumn('next_month', F.add_months('date_of_birth', 1)) df = df.withColumn('previous_month', F.add_months('date_of_birth', -1)) # 二つの日付間の日数を取得 df = df.withColumn('days_between', F.datediff('end', 'start')) # 二つの日付間の月数を取得 df = df.withColumn('months_between', F.months_between('end', 'start')) # date_of_birthが2017-05-10と2018-07-21の間の行のみを保持 df = df.filter( (F.col('date_of_birth') >= F.lit('2017-05-10')) & (F.col('date_of_birth') <= F.lit('2018-07-21')) )

配列と構造体の操作

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # カラム配列 - F.array(*cols) # 'fname'と'lname'から成る新しいカラム'full_name'を作成します df = df.withColumn('full_name', F.array('fname', 'lname')) # 空の配列 - F.array(*cols) # 空のカラム'empty_array_column'を作成します df = df.withColumn('empty_array_column', F.array(F.lit(""))) # 既存のカラムから配列または構造体カラムを作成 # 'guardian_1'と'guardian_2'から成る新しいカラム'guardians'を作成します df = df.withColumn('guardians', F.array('guardian_1', 'guardian_2')) # 'hair_color'と'eye_color'から成る新しいカラム'properties'を作成します df = df.withColumn('properties', F.struct('hair_color', 'eye_color')) # 配列または構造体カラムからインデックスまたはキーによって要素を抽出(無効な場合はnull) # 'properties'カラムから'hair_color'要素を抽出し、新しいカラム'hair_color'を作成します df = df.withColumn('hair_color', F.element_at(F.col('properties'), F.col('hair_color'))) # 配列または構造体カラムを複数の行に展開 # 'guardians'カラムを複数の行に展開し、それぞれの行に'child_name'カラムの値を保持します df = df.select(F.col('child_name'), F.explode(F.col('guardians'))) # 'properties'カラムを複数の行に展開し、それぞれの行に'child_name'カラムの値を保持します df = df.select(F.col('child_name'), F.explode(F.col('properties'))) # 構造体カラムを複数のカラムに展開 # 'properties'カラムを複数のカラムに展開し、それぞれのカラムに'child_name'の値を保持します df = df.select(F.col('child_name'), F.col('properties.*'))

集約操作

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # 行数のカウント: F.count(*cols), F.countDistinct(*cols) # グループ内の行の合計: F.sum(*cols) # グループ内の行の平均: F.mean(*cols) # グループ内の行の最大値: F.max(*cols) # グループ内の行の最小値: F.min(*cols) # グループ内の最初の行: F.first(*cols, ignorenulls=False) df = df.groupBy(col('address')).agg( count('uuid').alias('num_residents'), # uuidの数を"num_residents"としてカウント max('age').alias('oldest_age'), # "age"の最大値を"oldest_age"として取得 first('city', True).alias('city') # 最初の"city"を"city"として取得 ) # グループ内の全行を集合として収集: F.collect_set(col) # グループ内の全行をリストとして収集: F.collect_list(col) df = df.groupBy('address').agg(F.collect_set('name').alias('resident_names')) # "name"の集合を"resident_names"として収集

高度な操作

再分割

Copied!
1 2 3 # 再分割 – df.repartition(num_output_partitions) # num_output_partitions は出力されるパーティションの数を決めます df = df.repartition(1) # データフレームを1つのパーティションに再分割します

UDFs (ユーザー定義関数)

Copied!
1 2 3 4 5 6 7 8 9 10 11 # 各行のage列を2倍にする # 各行の年齢列を2倍にします times_two_udf = F.udf(lambda x: x * 2) df = df.withColumn('age', times_two_udf(df.age)) # 行の名前としてランダムに値を選択する # 行の名前としてランダムに値を選びます import random random_name_udf = F.udf(lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna'])) df = df.withColumn('name', random_name_udf())