PySpark is a wrapper language that allows you to interface with an Apache Spark backend to quickly process data. Spark can operate on very large datasets across a distributed network of servers, which provides major performance and reliability benefits when used correctly. It presents challenges, even for experienced Python developers, as the PySpark syntax draws on the JVM heritage of Spark and therefore implements code patterns that might be unfamiliar.
This opinionated guide to PySpark code style presents common situations and the associated best practices based on the most frequent recurring topics across the PySpark repositories we've encountered.
To enforce consistent code style, each main repository should have Pylint ↗ enabled, with the same configuration. We provide some PySpark-specific checkers ↗ that you can additionally include in your Pylint configuration to match the rules listed in this document. See the documentation on enabling style checks for details on our built-in Pylint plugin for Python repositories.
Beyond PySpark specifics, the general practices of clean code are important in PySpark repositories - the Google PyGuide ↗ is a good starting point.
```python
# bad
df = df.select(F.lower(df1.colA), F.upper(df2.colB))

# good
df = df.select(F.lower(F.col("colA")), F.upper(F.col("colB")))
```
The preferred option may look more complicated, longer and visually polluted, and that is true; in fact, it is best to avoid using F.col() altogether where a plain column name suffices. But there are certain cases where using it, or the alternative explicit selection, is unavoidable, and there is a very good reason to prefer the second example over the first one.
When using explicit columns as in the first case, both the dataframe name and schema are explicitly bound to the dataframe variable. This means that if `df1` is deleted or renamed, the reference `df1.colA` will break. By contrast, `F.col("colA")` will always reference a column called `"colA"` in the dataframe being operated on, which in this case happens to be named `df`. It does not require keeping track of other dataframes' states at all, so the code becomes more local and less prone to "spooky interaction at a distance", which is often challenging to debug.
Other good reasons to avoid the first case:

- `df1["colA"]` is just as difficult to write as `F.col("colA")`;
- saving an expression like `F.col("prod_status") == 'Delivered'` to a variable makes it reusable across multiple dataframes, while `df.prod_status == 'Delivered'` is always bound to `df` (see the short sketch below).

Fortunately, a convoluted expression with `F.col()` is usually not required. The only exceptions are `F.lower`, `F.upper`, ... and these ↗.
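As a minimal illustration (the dataframe names `orders` and `archive` are hypothetical, not part of the original example), reusing such an expression might look like this:

```python
# A minimal sketch: an expression built with F.col() is not bound to any particular
# dataframe, so it can be applied to every dataframe that has a 'prod_status' column.
from pyspark.sql import functions as F

is_delivered = F.col('prod_status') == 'Delivered'

delivered_orders = orders.filter(is_delivered)     # works on `orders`
delivered_archive = archive.filter(is_delivered)   # and on `archive` as well
```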
In some contexts, there is access to columns from more than one dataframe, and the names may overlap. A common example is in matching expressions like `df.join(df2, on=(df.key == df2.key), how='left')`. In such cases it is fine to reference columns by their dataframe directly. You can also disambiguate joins using dataframe aliases (see more in the Joins section in this guide).
In general, for loops are inefficient in Spark. At a high level, this is because Spark is lazily evaluated and each iteration of the loop adds another step to the query plan, even when all the work could be expressed in a single operation. This can lead to slower run times and to driver out-of-memory errors (OOMs). To rename all columns in a dataset from uppercase to lowercase, instead of the first example below (labeled `# bad`), we suggest using a list comprehension (as in the second example, labeled `# good`):
```python
# bad
for colm in df.columns:
    df = df.withColumnRenamed(colm, colm.lower())
```
```python
# good
df = df.select(
    *[F.col(colm).alias(colm.lower()) for colm in df.columns]
)
```
Using a list comprehension as in the `# good` example avoids the slow performance and query plan issues discussed above, while still producing the same result.
Logical operations, which often reside inside `.filter()` or `F.when()`, need to be readable. We apply the same rule as with chaining functions, limiting the logic expressions inside the same code block to three at most. If they grow longer, it is often a sign that the code can be simplified or extracted out. Extracting complex logical operations into variables or functions makes the code easier to read and reason about, which also reduces bugs.
```python
# bad
F.when((df.prod_status == 'Delivered') | (((F.datediff(df.deliveryDate_actual, df.current_date) < 0) &
       ((df.currentRegistration.rlike('.+')) | ((F.datediff(df.deliveryDate_actual, df.current_date) < 0) &
       (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')))))), 'In Service')
```
We can simplify the code above in different ways. To start, let's focus on grouping the logic steps into a few named variables. PySpark requires that expressions are wrapped with parentheses; this, mixed with the actual parentheses needed to group logical operations, hurts readability. For example, the code above contains a redundant `(F.datediff(df.deliveryDate_actual, df.current_date) < 0)` that the original author didn't notice because it is very hard to spot.
```python
# better
has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+'))
delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
has_registration = (df.currentRegistration.rlike('.+'))
is_delivered = (df.prod_status == 'Delivered')

F.when(is_delivered | (delivery_date_passed & (has_registration | has_operator)), 'In Service')
```
The above example is easier to read, and also drops the redundant expression. We can improve it further by reducing the number of operations.
```python
# good
has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+'))
delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
has_registration = (df.currentRegistration.rlike('.+'))
is_delivered = (df.prod_status == 'Delivered')
is_active = (has_registration | has_operator)

F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')
```
Note how the `F.when` expression is now succinct and readable, and the desired behavior is clear to anyone reviewing this code. The reader only needs to visit the individual expressions if they suspect there is an error there. It also makes each chunk of logic easy to test if you have unit tests in your code and want to abstract them as functions. There is still some duplication of code in the final example: we leave removing that duplication as an exercise for the reader.
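For instance, a minimal sketch (the function signatures and type hints here are assumptions, not part of the original example) of abstracting these expressions into unit-testable helpers:

```python
# A minimal sketch: each named expression can become a small, unit-testable function
# that returns a boolean Column. Names and types are illustrative.
from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F

def is_delivered(df: DataFrame) -> Column:
    """Rows whose product status is 'Delivered'."""
    return df.prod_status == 'Delivered'

def delivery_date_passed(df: DataFrame) -> Column:
    """Rows whose actual delivery date is already in the past."""
    return F.datediff(df.deliveryDate_actual, df.current_date) < 0
```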
Use select statements to specify a schema contract

Doing a select at the beginning of a PySpark transform, or before returning, is considered good practice. This `select` statement specifies the contract with both the reader and the code about the expected dataframe schema for inputs and outputs.
Any select should be seen as a cleaning operation that is preparing the dataframe for consumption by the next step in the transform.
Always aim to keep select statements as simple as possible. Due to common SQL idioms, allow for up to one function from `pyspark.sql.functions` to be used per selected column, plus an optional `.alias()` to give it a meaningful name. Keep in mind that this should be used sparingly: if there are more than three such uses in the same select, refactor it into a separate function like `clean_<dataframe name>()` to encapsulate the operation.
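As a rough sketch (the specific columns and casts here are assumptions for illustration), such a function might look like this:

```python
# A minimal sketch of encapsulating a heavier select into a dedicated cleaning function.
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def clean_aircraft(aircraft: DataFrame) -> DataFrame:
    return aircraft.select(
        'aircraft_id',
        'aircraft_type',
        'operator_code',
        F.col('aircraft_registration').alias('registration'),
        F.col('number_of_economy_seats').cast('long'),
    )
```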
Never allow expressions involving more than one dataframe, or conditional operations like `.when()`, to be used in a select.
```python
# bad
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    F.col('aircraft_registration').alias('registration'),
    'aircraft_type',
    F.avg('staleness').alias('avg_staleness'),
    F.col('number_of_economy_seats').cast('long'),
    F.avg('flight_hours').alias('avg_flight_hours'),
    'operator_code',
    F.col('number_of_business_seats').cast('long'),
)
```
Cluster operations of the same type together: list plain column names upfront, and put calls to functions from `pyspark.sql.functions` after them, each on its own line.
```python
# good
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    'aircraft_type',
    'operator_code',
    F.col('aircraft_registration').alias('registration'),
    F.col('number_of_economy_seats').cast('long'),
    F.col('number_of_business_seats').cast('long'),
    F.avg('staleness').alias('avg_staleness'),
    F.avg('flight_hours').alias('avg_flight_hours'),
)
```
The `select()` statement, by its nature, redefines the schema of a dataframe, so it naturally supports the inclusion or exclusion of columns, old and new, as well as the redefinition of pre-existing ones. By centralizing all such operations in a single statement, it becomes much easier to identify the final schema, which makes code more readable. It also makes code slightly more concise.
Instead of calling `withColumnRenamed()`, use aliases:
```python
# bad
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# good
df.select('key', F.col('comments').alias('num_comments'))
```
Instead of using `withColumn()` to redefine a column's type, cast in the select:
```python
# bad
df.select('comments').withColumn('comments', F.col('comments').cast('double'))

# good
df.select(F.col('comments').cast('double'))
```
But keep it simple:
```python
# bad
df.select(
    ((F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp())
      - F.unix_timestamp('created_at')) / 86400).alias('days_open')
)

# good
df.withColumn(
    'days_open',
    (F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp()) - F.unix_timestamp('created_at')) / 86400
)
```
Avoid including columns in the select statement if they are going to remain unused; choose instead an explicit set of columns. This is a preferred alternative to using `.drop()`, since it guarantees that schema mutations won't cause unexpected columns to bloat your dataframe. That said, dropping columns isn't inherently discouraged in all cases; for instance, it is commonly appropriate after joins, since joins frequently introduce redundant columns.
Finally, rather than adding new columns by means of the select statement, it is recommended to use `.withColumn()` for that purpose.
If you need to add an empty column to satisfy a schema, always use `F.lit(None)` for populating that column. Never use an empty string or some other string signalling an empty value (such as `NA`).
Beyond being semantically correct, one practical reason for this is that it preserves the ability to use utilities like `isNull`, instead of having to verify empty strings, nulls, `'NA'`, etc.
```python
# bad
df = df.withColumn('foo', F.lit(''))

# bad
df = df.withColumn('foo', F.lit('NA'))

# good - note the necessary cast, since `None` is typeless. Choose the appropriate type based on expected use.
df = df.withColumn('foo', F.lit(None).cast('string'))
```
While comments can provide useful insight into code, it is often more valuable to refactor the code to improve its readability. The code should be readable by itself. If you are using comments to explain the logic step by step, you should refactor it.
```python
# bad

# Cast the timestamp columns
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
```
In the example above, we can see that those columns are getting cast to Timestamp. The comment doesn't add much value. Moreover, a more verbose comment might still be unhelpful if it only provides information that already exists in the code. For example:
```python
# bad

# Go through each column, divide by 1000 because of millis and cast to timestamp
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
```
Instead of leaving comments that just describe the logic you wrote, aim to leave comments that give context and explain the "why" of the decisions you made when writing the code. This is particularly important for PySpark, since the reader can understand your code but often doesn't have context on the data that feeds into your PySpark transform. Small pieces of logic might have involved hours of digging through data to understand the correct behavior, in which case comments explaining the rationale are especially valuable.
```python
# good

# The consumer of this dataset expects a timestamp instead of a date, and we need
# to adjust the time by 1000 because the original datasource is storing these as millis
# even though the documentation says it's actually a date.
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
```
It is highly recommended to avoid UDFs in all situations, as they are dramatically less performant than native PySpark. In most situations, logic that seems to necessitate a UDF can, in fact, be refactored to use only native PySpark functions.
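As a minimal sketch (the column name `name` and the exact logic are assumptions for illustration), a transformation written as a UDF can often be replaced by a built-in function:

```python
# A minimal sketch: the same logic as a Python UDF and as a native function.
from pyspark.sql import functions as F, types as T

# bad - a Python UDF serializes rows to the Python interpreter on every executor
to_upper = F.udf(lambda s: s.upper() if s is not None else None, T.StringType())
df = df.withColumn('name_upper', to_upper(F.col('name')))

# good - the built-in function runs natively and can be optimized by Spark
df = df.withColumn('name_upper', F.upper(F.col('name')))
```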
Always avoid using functions that collect data to the Spark driver, such as:

- `DataFrame.collect()`
- `DataFrame.first()`
- `DataFrame.head(...)`
- `DataFrame.take(...)`
- `DataFrame.show(...)`
Using these functions eliminates the benefits of a distributed framework like Spark, resulting in lower performance or out-of-memory errors. Prefer alternatives that keep both the data and the computation distributed across the executors.
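As a minimal sketch (the `amount` column is an assumption for illustration), an aggregate can usually stay distributed rather than being computed from collected rows:

```python
# A minimal sketch: compute an aggregate without pulling every row into driver memory.
from pyspark.sql import functions as F

# bad - materializes the entire dataset on the driver
total = sum(row['amount'] for row in df.collect())

# good - the sum is computed on the executors; the result is a one-row dataframe
totals = df.agg(F.sum('amount').alias('total_amount'))
```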
Be careful with joins. If you perform a left join and the right side has multiple matches for a key, that row will be duplicated as many times as there are matches. This is called a "join explosion" and can dramatically bloat the size of your dataset. Always double-check that the key you are joining on is unique, unless you are expecting the multiplication.
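One way to check this (a sketch only; the `aircraft` dataframe and `aircraft_id` key are borrowed from the examples below) is to count keys that appear more than once on the side you expect to be unique:

```python
# A minimal sketch: verify that the right-hand join key is unique before a left join.
from pyspark.sql import functions as F

duplicate_keys = (
    aircraft
    .groupBy('aircraft_id')
    .agg(F.count('*').alias('n'))
    .filter(F.col('n') > 1)
)

if duplicate_keys.limit(1).count() > 0:
    raise ValueError("aircraft_id is not unique on the right side; the join would explode")
```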
Bad joins are the source of many issues which can be tricky to debug. There are some things that help, like specifying the `how` explicitly, even if you are using the default value (`inner`):
```python
# bad
flights = flights.join(aircraft, 'aircraft_id')

# also bad
flights = flights.join(aircraft, 'aircraft_id', 'inner')

# good
flights = flights.join(aircraft, 'aircraft_id', how='inner')
```
Also avoid `right` joins. If you are about to use a `right` join, switch the order of your dataframes and use a `left` join instead. It is more intuitive, since the dataframe you are doing the operation on is the one that you are centering your join around.
```python
# bad
flights = aircraft.join(flights, 'aircraft_id', how='right')

# good
flights = flights.join(aircraft, 'aircraft_id', how='left')
```
When joining dataframes, avoid using expressions that duplicate columns in the output:
```python
# bad - column aircraft_id will be duplicated in the output
output = flights.join(aircraft, flights.aircraft_id == aircraft.aircraft_id, how='inner')

# good
output = flights.join(aircraft, 'aircraft_id', how='inner')
```
Avoid renaming all columns to avoid collisions. You can instead just give an alias to the whole dataframe, and use that alias to select which columns you want in the end.
```python
# bad
columns = ["start_time", "end_time", "idle_time", "total_time"]
for col in columns:
    flights = flights.withColumnRenamed(col, 'flights_' + col)
    parking = parking.withColumnRenamed(col, 'parking_' + col)

flights = flights.join(parking, on="flight_code", how="left")

flights = flights.select(
    F.col("flights_start_time").alias("flight_start_time"),
    F.col("flights_end_time").alias("flight_end_time"),
    F.col("parking_total_time").alias("client_parking_total_time")
)

# good
flights = flights.alias("flights")
parking = parking.alias("parking")

flights = flights.join(parking, on="flight_code", how="left")

flights = flights.select(
    F.col("flights.start_time").alias("flight_start_time"),
    F.col("flights.end_time").alias("flight_end_time"),
    F.col("parking.total_time").alias("client_parking_total_time")
)
```
In such cases, however, keep in mind that any remaining ambiguity between overlapping columns should be resolved, by renaming or dropping them, before the resulting dataframe is handed to downstream code.
As a last word about joins, don't use `.dropDuplicates()` or `.distinct()` as a crutch. If unexpected duplicate rows are observed, there's almost always an underlying reason why those duplicate rows appear. Adding `.dropDuplicates()` only masks this problem and adds overhead to the runtime.
Chaining expressions is a contentious topic, but we do recommend some limits on the usage of chaining. See the conclusion of this section for a discussion of the rationale behind this recommendation.
Avoid chaining expressions into multi-line expressions with different types, particularly if they have different behaviors or contexts, for example mixing column creation or joining with selecting and filtering.
```python
# bad
df = (
    df
    .select("a", "b", "c", "key")
    .filter(df.a == "truthiness")
    .withColumn("boverc", df.b / df.c)
    .join(df2, "key", how="inner")
    .join(df3, "key", how="left")
    .drop('c')
)

# better (separating into steps)
# first: we select and trim down the data that we need
# second: we create the columns that we need to have
# third: joining with other dataframes
df = (
    df
    .select("a", "b", "c", "key")
    .filter(df.a == "truthiness")
)

df = df.withColumn("boverc", df.b / df.c)

df = (
    df
    .join(df2, "key", how="inner")
    .join(df3, "key", how="left")
    .drop('c')
)
```
Having each group of expressions isolated in its own logical code block improves legibility and makes it easier to find relevant logic. For example, a reader of the code below will likely jump to where they see dataframes being assigned with `df = df...`.
```python
# bad
df = (
    df
    .select('foo', 'bar', 'foobar', 'abc')
    .filter(df.abc == 123)
    .join(another_table, 'some_field')
)

# better
df = (
    df
    .select('foo', 'bar', 'foobar', 'abc')
    .filter(F.col('abc') == 123)
)

df = df.join(another_table, 'some_field', how='inner')
```
There are legitimate reasons to chain expressions together: they commonly represent atomic logic steps, and that is acceptable. Apply a rule with a maximum number of chained expressions in the same block to keep the code readable. We recommend chains of no longer than 3-5 statements.
If you find you are making longer chains, or having trouble because of the size of your variables, consider extracting the logic into a separate function:
```python
# bad
customers_with_shipping_address = (
    customers_with_shipping_address
    .select("a", "b", "c", "key")
    .filter(F.col("a") == "truthiness")
    .withColumn("boverc", F.col("b") / F.col("c"))
    .join(df2, "key", how="inner")
)

# also bad
customers_with_shipping_address = customers_with_shipping_address.select("a", "b", "c", "key")
customers_with_shipping_address = customers_with_shipping_address.filter(F.col("a") == "truthiness")
customers_with_shipping_address = customers_with_shipping_address.withColumn("boverc", F.col("b") / F.col("c"))
customers_with_shipping_address = customers_with_shipping_address.join(df2, "key", how="inner")

# better
def join_customers_with_shipping_address(customers, df_to_join):

    customers = (
        customers
        .select("a", "b", "c", "key")
        .filter(F.col("a") == "truthiness")
    )

    customers = customers.withColumn("boverc", F.col("b") / F.col("c"))
    customers = customers.join(df_to_join, "key", how="inner")
    return customers
```
In fact, chains of more than three statements are prime candidates to factor into separate, well-named functions since they are already encapsulated, isolated blocks of logic.
There are several reasons behind these limits on chaining.
The reason you can chain expressions at all is that PySpark was developed as a wrapper around Spark, which comes from JVM languages; some design patterns, notably chainability, carried over.
However, Python doesn't support multiline expressions gracefully: the only alternatives are to either provide explicit line continuations with `\` or to wrap the expression in parentheses. You only need explicit line continuations if the chain happens at the root node. For example:
```python
# needs `\`
df = df.filter(F.col('event') == 'executing')\
    .filter(F.col('has_tests') == True)\
    .drop('has_tests')

# chain not in root node so it doesn't need the `\`
df = df.withColumn('safety', F.when(F.col('has_tests') == True, 'is safe')
                              .when(F.col('has_executed') == True, 'no tests but runs')
                              .otherwise('not safe'))
```
Thus, to keep things consistent, wrap the entire expression in a single parenthesis block and avoid using `\`:
```python
# bad
df = df.filter(F.col('event') == 'executing')\
    .filter(F.col('has_tests') == True)\
    .drop('has_tests')

# good
df = (
    df
    .filter(F.col('event') == 'executing')
    .filter(F.col('has_tests') == True)
    .drop('has_tests')
)
```
- Be careful with `.otherwise(value)` as a general fallback. If you are mapping a list of keys to a list of values and a number of unknown keys appear, using `otherwise` will mask all of these into one value (see the sketch below).
- Import `types` and `functions` from PySpark as `from pyspark.sql import types as T, functions as F`.
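A minimal sketch of the `.otherwise()` pitfall (the status codes and labels here are illustrative assumptions):

```python
# A minimal sketch: every unknown status code is silently collapsed into 'other',
# which can hide upstream data issues instead of surfacing them.
from pyspark.sql import functions as F

status_label = (
    F.when(F.col('status_code') == 1, 'active')
    .when(F.col('status_code') == 2, 'retired')
    .otherwise('other')  # 3, 99, null, ... all end up here unnoticed
)
df = df.withColumn('status_label', status_label)
```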