A quick reference guide to the most commonly used patterns and functions in Java Spark SQL.
See the Java Spark Official Documentation ↗ for additional information on Java Spark.
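All of the snippets below operate on an existing Dataset<Row> named df. For experimenting with these patterns outside of a transform, one way to build a small Dataset by hand is sketched here; the local-mode session, the column names, and the sample rows are illustrative assumptions rather than part of this guide.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Illustrative only: a local SparkSession and a tiny hand-built Dataset<Row>
SparkSession spark = SparkSession.builder()
    .master("local[*]")
    .appName("java-spark-cheat-sheet")
    .getOrCreate();

StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("name", DataTypes.StringType, true),
    DataTypes.createStructField("age", DataTypes.IntegerType, true),
    DataTypes.createStructField("is_adult", DataTypes.StringType, true)));

List<Row> rows = Arrays.asList(
    RowFactory.create("Alice", 30, "Y"),
    RowFactory.create("Bob", 12, "N"));

Dataset<Row> df = spark.createDataFrame(rows, schema);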
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass {
    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);

    public void doSomething() {
        LOG.info("example log output");
    }
}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.*;
// Filter on an equals condition
df = df.filter(col("is_adult").equalTo("Y"));

// Filter on >, <, >=, <= conditions
df = df.filter(col("age").gt(25));
df = df.filter(col("age").lt(25));
df = df.filter(col("age").geq(25));
df = df.filter(col("age").leq(25));

// Multiple conditions
df = df.filter(col("age").gt(25).and(col("is_adult").equalTo("Y")));

// Compare against a list of allowed values (isin takes varargs, not a java.util.List)
df = df.filter(col("first_name").isin(3, 4, 7));

// Sort results
df = df.orderBy(col("age").asc());
df = df.orderBy(col("age").desc());
// Left join another dataset on a key column
df = df.join(personLookupTable,
    df.col("person_id").equalTo(personLookupTable.col("person_id")), "left");

// Left anti-join against another dataset (returns rows in the left dataframe with no match)
df = df.join(personLookupTable,
    df.col("person_id").equalTo(personLookupTable.col("person_id")), "leftanti");

// Match on different columns in the left & right datasets
df = df.join(otherTable, col("id").equalTo(col("person_id")), "left");

// Match on multiple columns
df = df.join(otherTable, col("first_name").equalTo(col("name")).and(
    col("last_name").equalTo(col("family_name"))), "left");

// Useful for one-liner lookup code joins
public Dataset<Row> lookupAndReplace(Dataset<Row> df1, Dataset<Row> df2,
        String df1Key, String df2Key, String df2Value) {
    return df1.join(df2.select(col(df2Key), col(df2Value)),
            col(df1Key).equalTo(col(df2Key)), "left")
        .withColumn(df1Key, coalesce(col(df2Value), col(df1Key)))
        .drop(df2Key, df2Value);
}

Dataset<Row> df = lookupAndReplace(people, payCodes, "id", "payCodeId", "payCodeDesc");
// Add a new static column
df = df.withColumn("status", lit("PASS"));

// Construct a new dynamic column
df = df.withColumn("full_name", when(
    (col("fname").isNotNull().or(col("lname").isNotNull())),
    concat_ws(" ", col("fname"), col("lname")))
    .otherwise(lit(null)));

// Pick which columns to keep, optionally rename some
df = df.select(
    col("name"),
    col("age"),
    col("dob").alias("date_of_birth")
);

// Remove columns
df = df.drop("mod_dt", "mod_username");

// Rename a column
df = df.withColumnRenamed("dob", "date_of_birth");

// Keep all the columns which also occur in another dataset
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Column;
import scala.collection.JavaConversions;

List<Column> columnsToSelect = new ArrayList<Column>();
List<String> df2Columns = Arrays.asList(df2.columns());
for (String c : df.columns()) {
    if (df2Columns.contains(c)) {
        columnsToSelect.add(col(c));
    }
}
df = df.select(JavaConversions.asScalaBuffer(columnsToSelect));

// Batch rename/clean columns
for (String c : df.columns()) {
    df = df.withColumnRenamed(c, c.toLowerCase().replace(" ", "_").replace("-", "_"));
}
import com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.types.DataTypes;

// Cast a column to a different type (both ways are equivalent)
df = df.withColumn("price", col("price").cast(DataTypes.DoubleType));
df = df.withColumn("price", col("price").cast("double"));

// Create an all-null column and cast it to a specific type
df = df.withColumn("name", lit(null).cast(DataTypes.StringType));
df = df.withColumn("name", lit(null).cast("string"));

// Replace all nulls with a specific value
df = df.na().fill(ImmutableMap.of("first_name", "Tom", "age", 0));

// Take the first value that is not null
df = df.withColumn("last_name", coalesce(col("last_name"), col("surname"), lit("N/A")));

// Drop duplicate rows in a dataset (same as distinct())
df = df.dropDuplicates();

// Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates("name", "height");
// Contains - col.contains(string)
df = df.filter(col("name").contains("o"));

// Starts With - col.startsWith(string)
df = df.filter(col("name").startsWith("Al"));

// Ends With - col.endsWith(string)
df = df.filter(col("name").endsWith("ice"));

// Is Null - col.isNull()
df = df.filter(col("is_adult").isNull());

// Is Not Null - col.isNotNull()
df = df.filter(col("first_name").isNotNull());

// Like - col.like(string_with_sql_wildcards)
df = df.filter(col("name").like("Al%"));

// Regex Like - col.rlike(regex)
df = df.filter(col("name").rlike("[A-Z]*ice$"));

// Is In List - col.isin(Object... values)
df = df.filter(col("name").isin("Bob", "Mike"));
// Substring - col.substr(startPos, length) (1-based indexing)
df = df.withColumn("short_id", col("id").substr(1, 10));

// Trim - trim(col)
df = df.withColumn("name", trim(col("name")));

// Left Pad - lpad(col, len, pad)
// Right Pad - rpad(col, len, pad)
df = df.withColumn("id", lpad(col("id"), 4, "0"));

// Left Trim - ltrim(col)
// Right Trim - rtrim(col)
df = df.withColumn("id", ltrim(col("id")));

// Concatenate - concat(Column... cols) (null if any column is null)
df = df.withColumn("full_name", concat(col("fname"), lit(" "), col("lname")));

// Concatenate with Delimiter - concat_ws(delim, Column... cols) (ignores nulls)
df = df.withColumn("full_name", concat_ws("-", col("fname"), col("lname")));

// Regex Replace - regexp_replace(col, pattern, replacement)
df = df.withColumn("id", regexp_replace(col("id"), "0F1(.*)", "1F1-$1"));

// Regex Extract - regexp_extract(str, pattern, idx)
df = df.withColumn("id", regexp_extract(col("id"), "[0-9]*", 0));
// Round - round(col, scale=0)
df = df.withColumn("price", round(col("price"), 0));

// Floor - floor(col)
df = df.withColumn("price", floor(col("price")));

// Ceiling - ceil(col)
df = df.withColumn("price", ceil(col("price")));

// Absolute Value - abs(col)
df = df.withColumn("price", abs(col("price")));

// X raised to power Y - pow(X, Y)
df = df.withColumn("exponential_growth", pow(col("x"), 2.0));

// Select the smallest value out of multiple columns - least(Column... cols)
df = df.withColumn("least", least(col("subtotal"), col("total")));

// Select the largest value out of multiple columns - greatest(Column... cols)
df = df.withColumn("greatest", greatest(col("subtotal"), col("total")));
// Convert a string of known format to a date (excludes time information)
df = df.withColumn("date_of_birth", to_date(col("date_of_birth"), "yyyy-MM-dd"));

// Convert a string of known format to a timestamp (includes time information)
df = df.withColumn("time_of_birth", to_timestamp(col("time_of_birth"), "yyyy-MM-dd HH:mm:ss"));

// Get year from date:   year(col)
// Get month from date:  month(col)
// Get day from date:    dayofmonth(col)
// Get hour from date:   hour(col)
// Get minute from date: minute(col)
// Get second from date: second(col)
df = df.filter(year(col("date_of_birth")).equalTo("2017"));

// Add & subtract days
df = df.withColumn("three_days_after", date_add(col("date_of_birth"), 3));
df = df.withColumn("three_days_before", date_sub(col("date_of_birth"), 3));

// Add & subtract months
df = df.withColumn("next_month", add_months(col("date_of_birth"), 1));
df = df.withColumn("previous_month", add_months(col("date_of_birth"), -1));

// Get number of days between two dates
df = df.withColumn("days_between", datediff(col("end"), col("start")));

// Get number of months between two dates
df = df.withColumn("months_between", months_between(col("end"), col("start")));

// Keep only rows where date_of_birth is between 2017-05-10 and 2018-07-21
df = df.filter(
    (col("date_of_birth").geq("2017-05-10")).and(
    (col("date_of_birth").leq("2018-07-21")))
);
// Array or Struct column from existing columns
df = df.withColumn("guardians", array(col("guardian_1"), col("guardian_2")));
df = df.withColumn("properties", struct(col("hair_color"), col("eye_color")));

// Extract from Array or Struct column by index or key (null if invalid)
df = df.withColumn("primary_guardian", col("guardians").getItem(0));
df = df.withColumn("hair_color", col("properties").getItem("hair_color"));

// Explode an Array column into multiple rows (one row per element)
df = df.select(col("child_name"), explode(col("guardians")));

// Expand a Struct column into multiple columns (one column per field)
df = df.select(col("child_name"), col("properties.*"));
// Row Count: count(col), countDistinct(col)
// Sum of Rows in Group: sum(col)
// Mean of Rows in Group: mean(col)
// Max of Rows in Group: max(col)
// Min of Rows in Group: min(col)
// First Row in Group: first(col, ignoreNulls)
df = df.groupBy(col("address")).agg(
    count(col("uuid")).alias("num_residents"),
    max(col("age")).alias("oldest_age"),
    first(col("city"), true).alias("city")
);

// Collect a Set of all Rows in Group: collect_set(col)
// Collect a List of all Rows in Group: collect_list(col)
df = df.groupBy(col("address")).agg(collect_set("name").alias("resident_names"));
// Repartition - df.repartition(num_output_partitions)
df = df.repartition(1);
import java.math.BigDecimal;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DecimalType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

/**
 * The following example creates a Java UDF which adds two columns of Spark type "Decimal".
 * Spark represents DecimalType values as java.math.BigDecimal, which is why that class is used here.
 * Other Spark types are represented by other Java types, as defined in
 * https://spark.apache.org/docs/3.0.0/sql-ref-datatypes.html
 */
public final class HighLevelAutoTransform {
    @Compute
    @Output("...")
    public Dataset<Row> myComputeFunction(@Input("...") Dataset<Row> df) {
        UserDefinedFunction addsUDF = udf((UDF2<BigDecimal, BigDecimal, BigDecimal>) (i, j) -> {
            if (i == null || j == null) {  // always handle null cases
                return null;
            }
            return i.add(j);
        }, new DecimalType());  // the Spark data type of the UDF's result (defaults to decimal(10, 0))

        return df.withColumn("a_plus_b", addsUDF.apply(col("a"), col("b")));
    }
}
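If the same UDF is also needed inside SQL-style expression strings, it can additionally be registered by name. A minimal sketch, assuming it runs where df and addsUDF are in scope; the name add_decimals and the static import of functions.callUDF are illustrative assumptions, not part of the example above.

import static org.apache.spark.sql.functions.callUDF;

// Register the UDF under a name (illustrative) so it can be referenced in SQL expressions
df.sparkSession().udf().register("add_decimals", addsUDF);

// Call it by name on Columns...
df = df.withColumn("a_plus_b", callUDF("add_decimals", col("a"), col("b")));

// ...or inside a SQL expression string
df = df.selectExpr("*", "add_decimals(a, b) AS a_plus_b");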