注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Java Spark SQLで最もよく使われるパターンと関数のクイックリファレンスガイド。
Java Sparkに関する追加情報は、Java Spark公式ドキュメントを参照してください。
Copied!1 2 3 4 5 6 7 8 9 10 11 12
// SLF4Jというロギングフレームワークをインポートしています。 import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyClass { // LoggerFactoryを使って、MyClassに対応するロガーを作成します。 // これは、MyClassのログ出力を制御するためのものです。 private static final Logger LOG = LoggerFactory.getLogger(MyClass.class); // LOG.infoを使って、情報レベルのログメッセージ "example log output" を出力します。 LOG.info("example log output"); }
Copied!1 2 3 4 5 6 7
// JavaのSparkライブラリをインポートする import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; // Spark SQLの関数をインポートする import static org.apache.spark.sql.functions.*; // Sparkのデータ型をインポートする import org.apache.spark.sql.types.DataTypes;
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// 等しい条件でフィルタリングする df = df.filter(col("is_adult").equalTo("Y")); // >, <, >=, <= 条件でフィルタリングする df = df.filter(col("age").gt(25)); df = df.filter(col("age").lt(25)); df = df.filter(col("age").geq(25)); df = df.filter(col("age").leq(25)); // 複数の条件 df = df.filter(col("age").gt(25).and(col("is_adult").equalTo("Y"))); // 許可された値のリストと比較する df = df.filter(col("first_name").isin(List.of(3, 4, 7))); // 結果を並べ替える df = df.orderBy(col("age").asc()); df = df.orderBy(col("age").desc());
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// 別のデータセットで左結合を行います df = df.join(personLookupTable, col("person_id"), "left"); // 別のデータセットで左反対結合を行います(左のデータフレームで一致しない行を返す) df = df.join(personLookupTable, col("person_id"), "leftanti"); // 左と右のデータセットで異なる列に一致します df = df.join(otherTable, col("id").equalTo(col("person_id")), "left"); // 複数の列に一致します df = df.join(otherTable, col("first_name").equalTo(col("name")).and( col("last_name").equalTo(col("family_name"))), "left"); // ルックアップコードの結合に便利なワンライナー public Dataset<Row> lookupAndReplace(Dataset<Row> df1, Dataset<Row> df2, String df1Key, String df2Key, String df2Value) { return df1.join(df2.select(col(df2Key), col(df2Value)), col(df1Key).equalTo(col(df2Key)), "left") .withColumn(df1Key, coalesce(col(df2Value), col(df1Key))) .drop(df2Key, df2Value); } Dataset<Row> df = lookupAndReplace(people, payCodes, id, payCodeId, payCodeDesc);
このコードでは、DataFrame(データフレーム)の結合操作を行っています。結合操作は、データフレーム間で共通のキーを持つ行を結合するための手法で、SQLのJOINクエリに相当します。また、"leftanti"オプションを使って左側のデータフレームで一致しない行を返す反対結合を行ったり、複数の列に一致する結合を行ったりしています。
また、lookupAndReplace
メソッドは、2つのデータフレームとそれぞれのキー、値を引数として受け取り、一方のデータフレームから他方のデータフレームに一致する行を結合し、値を置き換える操作を行います。これは、一方のデータフレームから他方のデータフレームの特定の値をルックアップ(検索)して置き換えるのに便利な一行のコードです。
最後の行では、people
とpayCodes
という2つのデータフレームをlookupAndReplace
メソッドに渡して結合し、値を置き換えています。この結果を新たなデータフレームdf
として保存しています。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
// 新しい静的なカラムを追加します df = df.withColumn("status", lit("PASS")); // 新しい動的なカラムを構築します df = df.withColumn("full_name", when( (col("fname").isNotNull().or(col("lname").isNotNull())), // fnameまたはlnameがnullでない場合 concat_ws(" ", col("fname"), col("lname"))) // fnameとlnameをスペースで結合します .otherwise(lit(null))); // それ以外の場合はnullを設定します // 保持するカラムを選択し、必要に応じて一部をリネームします df = df.select( col("name"), // nameカラム col("age"), // ageカラム col("dob").alias("date_of_birth") // dobカラムをdate_of_birthとしてリネームします ); // カラムを削除します df = df.drop("mod_dt", "mod_username"); // カラムの名前を変更します df = df.withColumnRenamed("dob", "date_of_birth"); // 別のデータセットにも存在する全てのカラムを保持します import org.apache.spark.sql.Column; import scala.collection.JavaConversions; List<Column> columnsToSelect = new ArrayList<Column>(); List<String> df2Columns = Arrays.asList(df2.columns()); for (String c : df.columns()) { if (df2Columns.contains(c)) { columnsToSelect.add(col(c)); } } df = df.select(JavaConversions.asScalaBuffer(columnsToSelect)); // バッチでカラムの名前を変更/クリーニングします for (String c in df.columns()) { df = df.withColumnRenamed(c, c.toLowerCase().replace(" ", "_").replace("-", "_")); }
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
import org.apache.spark.sql.types.DataTypes; // 列を別の型にキャストする(どちらの方法も同じ結果) // Cast a column to a different type (both ways equivalent) df = df.withColumn("price", col("price").cast(DataTypes.DoubleType)); df = df.withColumn("price", col("price").cast("double")); // 全てnullの列を作成し、特定の型にキャストする // Create an all null column and cast to specific type df = df.withColumn("name", lit(null).cast(DataTypes.StringType)); df = df.withColumn("name", lit(null).cast("string")); // 全てのnullを特定の値で置き換える // Replace all nulls with a specific value df = df.na().fill(ImmutableMap.of("first_name", "Tom", "age", 0)); // 最初のnullでない値を取る // Take the first value that is not null df = df.withColumn("last_name", coalesce(col("last_name"), col("surname"), lit("N/A"))); // データセット内の重複した行を削除する(distinct()と同じ) // Drop duplicate rows in a dataset (same as distinct()) df = df.dropDuplicates() // 特定の列だけを考慮して重複した行を削除する // Drop duplicate rows, but consider only specific columns df = df.dropDuplicates("name", "height");
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
// Contains - col.contains(string) // 文字列が含まれているか - col.contains(文字列) df = df.filter(col("name").contains("o")); //Starts With - col.startsWith(string) // 文字列で始まるか - col.startsWith(文字列) df = df.filter(col("name").startsWith("Al")); // Ends With - col.endsWith(string) // 文字列で終わるか - col.endsWith(文字列) df = df.filter(col("name").endsWith("ice")); // Is Null - col.isNull() // Nullであるか - col.isNull() df = df.filter(col("is_adult").isNull()); // Is Not Null - col.isNotNull() // Nullではないか - col.isNotNull() df = df.filter(col("first_name").isNotNull()); // Like - col.like(string_with_sql_wildcards) // 特定のパターンに合致するか(SQLのワイルドカードを使用) - col.like(SQLのワイルドカードを含む文字列) df = df.filter(col("name").like("Al%")); // Regex Like - col.rlike(regex) // 正規表現に合致するか - col.rlike(正規表現) df = df.filter(col("name").rlike("[A-Z]*ice$")); // Is In List - col.isin(Object... values) // リストの中に存在するか - col.isin(オブジェクトの配列) df = df.filter(col("name").isin("Bob", "Mike"));
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
// サブストリング - col.substr(startPos, length) (1から始まるインデックス) df = df.withColumn("short_id", col("id").substr(1, 10)); // トリム - trim(col) df = df.withColumn("name", trim(col("name"))); // 左パッド - lpad(col, len, pad) // 右パッド - rpad(col, len, pad) df = df.withColumn("id", lpad(col("id"), 4, "0")); // 左トリム - ltrim(col) // 右トリム - rtrim(col) df = df.withColumn("id", ltrim(col("id"))); // 結合 - concat(Column... cols) (いずれかの列がnullならnull) df = df.withColumn("full_name", concat(col("fname"), lit(" "), col("lname"))); // デリミタ付きで結合 - concat_ws(delim, Column... cols) (nullは無視) df = df.withColumn("full_name", concat_ws("-", "fname", "lname")); // 正規表現で置換 - regexp_replace(col, pattern, replacement) df = df.withColumn("id", regexp_replace(col("id"), "0F1(.*)", "1F1-$1")); // 正規表現で抽出 - regexp_extract(str, pattern, idx) df = df.withColumn("id", regexp_extract(col("id"), "[0-9]*", 0));
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// Round - round(col, scale=0) 四捨五入 df = df.withColumn("price", round(col("price"), 0)); // Floor - floor(col) 価格を下に丸める df = df.withColumn("price", floor(col("price"))); // Ceiling - ceil(col) 価格を上に丸める df = df.withColumn("price", ceil(col("price"))); // Absolute Value - abs(col) 絶対値 df = df.withColumn("price", abs(col("price"))); // X raised to power Y – pow(X, Y) XのY乗 df = df.withColumn("exponential_growth", pow(col("x"), 2.0)); // Select smallest value out of multiple columns – least(Column... cols) 複数の列から最小値を選択 df = df.withColumn("least", least(col("subtotal"), col("total"))); // Select largest value out of multiple columns – greatest(Column... cols) 複数の列から最大値を選択 df = df.withColumn("greatest", greatest(col("subtotal"), col("total")));
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
// 既知のフォーマットの文字列を日付に変換します(時間情報は除外) df = df.withColumn("date_of_birth", to_date(col("date_of_birth"), "yyyy-MM-dd")); // 既知のフォーマットの文字列をタイムスタンプに変換します(時間情報を含む) df = df.withColumn("time_of_birth", to_timestamp(col("time_of_birth"), "yyyy-MM-dd HH:mm:ss")); // 日付から年を取得: year(col) // 日付から月を取得: month(col) // 日付から日を取得: dayofmonth(col) // 日付から時間を取得: hour(col) // 日付から分を取得: minute(col) // 日付から秒を取得: second(col) df = df.filter(year(col("date_of_birth")).equalTo("2017")); // 日数の加算と減算 df = df.withColumn("three_days_after", date_add(col("date_of_birth"), 3)); df = df.withColumn("three_days_before", date_sub(col("date_of_birth"), 3)); // 月の加算と減算 df = df.withColumn("next_month", add_months(col("date_of_birth"), 1)); df = df.withColumn("previous_month", add_months(col("date_of_birth"), -1)); // 二つの日付間の日数を取得 df = df.withColumn("days_between", datediff(col("end"), col("start"))); // 二つの日付間の月数を取得 df = df.withColumn("months_between", months_between(col("end"), col("start"))); // date_of_birthが2017-05-10から2018-07-21の間である行だけを保持 df = df.filter( (col("date_of_birth").geq("2017-05-10")).and( (col("date_of_birth").leq("2018-07-21"))) );
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
// 既存の列から配列または構造体列を作成 df = df.withColumn("guardians", array(col("guardian_1"), col("guardian_2"))); // "guardians"という新しい列を作成し、"guardian_1"と"guardian_2"の値を配列として保存します。 df = df.withColumn("properties", struct(col("hair_color"), col("eye_color"))); // "properties"という新しい列を作成し、"hair_color"と"eye_color"の値を構造体として保存します。 // インデックスまたはキーで配列または構造体列から抽出(無効な場合はnull) df = df.withColumn("primary_guardian", col("guardians").getItem(0)); // "guardians"列の1つ目の要素を"primary_guardian"として抽出します。 df = df.withColumn("hair_color", col("properties").getItem("hair_color")); // "properties"列から"hair_color"の値を抽出します。 // 配列または構造体列を複数の行に展開 df = df.select(col("child_name"), explode(col("guardians"))); // "guardians"列の各要素を新しい行として展開し、それと共に"child_name"列も選択します。 df = df.select(col("child_name"), explode(col("properties"))); // "properties"列の各要素を新しい行として展開し、それと共に"child_name"列も選択します。 // 構造体列を複数の列に展開 df = df.select(col("child_name"), col("properties.*")); // "properties"列の各フィールドを新しい列として展開し、それと共に"child_name"列も選択します。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// 行数のカウント: count(col), countDistinct(col) // グループ内の行の合計: sum(col) // グループ内の行の平均: mean(col) // グループ内の行の最大値: max(col) // グループ内の行の最小値: min(col) // グループ内の最初の行: first(col, ignoreNulls) df = df.groupBy(col("address")).agg( count(col("uuid")).alias("num_residents"), // 「uuid」のカウントで「num_residents」を算出 max(col("age")).alias("oldest_age"), // 「age」の最大値で「oldest_age」を算出 first(col("city"), true).alias("city") // 「city」の最初の行(nullを無視)で「city」を取得 ); // グループ内の全行のセットを収集: collect_set(col) // グループ内の全行のリストを収集: collect_list(col) df = df.groupBy(col("address")).agg(collect_set("name").alias("resident_names")); // 「name」のセットを「resident_names」に収集
Copied!1 2
// リパーティション – df.repartition(出力パーティションの数) df = df.repartition(1);
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
import org.apache.spark.sql.expressions.UserDefinedFunction; import org.apache.spark.sql.types.DecimalType; import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.udf; import java.math.BigDecimal; /** * 以下の例では、2つのSparkタイプ "Decimal" の列を追加するJava UDFを作成します。 * SparkはDecimalTypeをjava.math.BigDecimalで表すため、このクラスを使用します。 * 他のSparkタイプは、https://spark.apache.org/docs/3.0.0/sql-ref-datatypes.html で定義されているように、他のJavaタイプで表されます。 */ public final class HighLevelAutoTransform { @Compute @Output("...") public Dataset<Row> myComputeFunction(@Input("...") Dataset<Row> df) { UserDefinedFunction addsUDF = udf((BigDecimal i, BigDecimal j) -> { if (i == null || j == null) { // 常にnullケースを処理する return null; } return i.add(j); }, new DecimalType()); // これは、UDFの結果のSparkデータタイプです return df.withColumn("a_plus_b", addsUDF.apply(col("a"), col("b"))); } }