It's possible to output various debugging information from PySpark in Foundry.
Python's built-in print pipes to the Output section of the Code Workbook, to the right of the code editor, where errors normally appear.
def new_dataset(some_input_dataset):
    print("example log output")
example log output
Code Repositories uses Python's built-in logging library. This is widely documented online and allows you to control the logging level (ERROR, WARNING, INFO) for easier filtering.
Logging output appears both in your output dataset's log files and in your build's driver logs (Dataset -> Details -> Files -> Log Files and Builds -> Build -> Job status logs; select "Driver logs", respectively).
import logging

logger = logging.getLogger(__name__)

@transform_df(
    ...
)
def some_transformation(some_input):
    logger.info("example log output")
INFO [2018-01-01T12:00:00] some_transformation: example log output
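The log levels mentioned above make the driver logs easier to filter. The following is a minimal sketch, not from the original example, assuming the same @transform_df setup; the column name example_col is hypothetical.

import logging

logger = logging.getLogger(__name__)

@transform_df(
    ...
)
def some_transformation(some_input):
    # INFO for routine progress messages
    logger.info("starting transformation")

    # WARNING for conditions worth flagging but not fatal
    if "example_col" not in some_input.columns:  # hypothetical column name
        logger.warning("example_col is missing; downstream joins may be empty")

    # ERROR for problems that should stand out when filtering the driver logs
    logger.error("example error-level message")

    return some_input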
Spark captures logging output from the top-level driver process that creates your query, such as from the some_transformation function above. However, it does not capture logs written from inside User Defined Functions (UDFs). If you are using a UDF within your PySpark query and need to log data, create and call a second UDF that returns the data you wish to capture.
@transform_df(
    ...
)
def some_transformation(some_input):
    logger.info("log output related to the overall query")

    @F.udf("integer")
    def custom_function(integer_input):
        return integer_input + 5

    @F.udf("string")
    def custom_log(integer_input):
        return "Original integer was %d before adding 5" % integer_input

    df = (
        some_input
        .withColumn("new_integer", custom_function(F.col("example_integer_col")))
        .withColumn("debugging", custom_log(F.col("example_integer_col")))
    )
    return df
We often want to log information about what's happening in our query. PySpark has a number of ways to introspect DataFrames, and we can send this information to the logging mechanisms described above.
We will use Code Workbook's print syntax in these examples, but print can be replaced with Transforms & Authoring's logger.
We can introspect the columns that exist on a DataFrame with df.columns. This produces a list of strings.
def employee_phone_numbers(employee_df, phone_number_df):
    print("employee columns are {}".format(employee_df.columns))
    print("phone columns are {}".format(phone_number_df.columns))
    df = employee_df.join(phone_number_df, 'employee_id', 'left')
    print("joined columns are {}".format(df.columns))
employee columns are ['name', 'employee_id']
phone columns are ['phone_number', 'employee_id']
joined columns are ['name', 'employee_id', 'phone_number']
Suppose we are performing a left join, expect a one-to-one relationship, and want to verify that the number of rows in our left DataFrame has stayed the same.
def employee_phone_numbers(employee_df, phone_number_df):
    original_employee_rows = employee_df.count()
    print("Incoming employee rows {}".format(original_employee_rows))

    df = employee_df.join(phone_number_df, 'employee_id', 'left')
    rows_after_join = df.count()

    print("Final employee rows {}".format(rows_after_join))

    if rows_after_join > original_employee_rows:
        print("Some employees have multiple phone numbers!")
    else:
        print("Data is correct")
Incoming employee rows 100
Final employee rows 105
Some employees have multiple phone numbers!
You can access the optimized physical plan that Spark will run to generate a given DataFrame by calling .explain().
def employee_phone_numbers(employee, phone):
    employee = employee.where(F.month(employee.birthday) == F.month(F.current_date()))
    df = employee.join(phone, 'employee_id', 'left')
    df.explain()
== Physical Plan ==
*(2) Project [employee_id#9734, name#9732, birthday#9733, phone_number#9728]
+- *(2) BroadcastHashJoin [employee_id#9734], [employee_id#9729], LeftOuter, BuildRight
:- *(2) Filter (month(birthday#9733) = 10)
: +- *(2) FileScan parquet !ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36:ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36@00000000-1ebd-4a81-9f64-2d4c8a8472bc:master.ri.foundry.main.dataset.6ad20cd7-45b0-4312-b096-05f57487f650[name#9732,birthday#9733,employee_id#9734] Batched: true, Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,birthday:date,employee_id:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
+- *(1) Project [phone_number#9728, employee_id#9729]
+- *(1) Filter isnotnull(employee_id#9729)
+- *(1) FileScan csv !ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db:ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db@00000000-1ebc-f483-b75d-dbcc3292d9e4:master.ri.foundry.main.dataset.f5bf4c77-37c0-4e29-8a68-814c35442bbd[phone_number#9728,employee_id#9729] Batched: false, Format: CSV, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<phone_number:int,employee_id:int>
Suppose we would like to see which employees have the most phone numbers. We will derive the dataset we're interested in (employees with more than one number) and call .take(3) to retrieve the top 3 rows as a list. Alternatively, .collect() retrieves all rows of a DataFrame as a list.
Pulling too much data into your Python environment can easily cause it to run out of memory. Only collect() small amounts of data.
def multiple_numbers(phone_numbers):
    df = phone_numbers.groupBy('employee_id').agg(
        F.count('phone_number').alias('numbers')
    ).where(F.col('numbers') > 1).sort(F.col('numbers').desc())

    print(df.take(3))
[Row(employee_id=70, numbers=4), Row(employee_id=90, numbers=2), Row(employee_id=25, numbers=2)]
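If you want to pull back more than a handful of rows while still respecting the memory warning above, one option is to cap the result explicitly before collecting. This is a minimal sketch rather than part of the original example; the limit of 10 is an arbitrary assumption.

def multiple_numbers(phone_numbers):
    df = phone_numbers.groupBy('employee_id').agg(
        F.count('phone_number').alias('numbers')
    ).where(F.col('numbers') > 1).sort(F.col('numbers').desc())

    # Cap the number of rows before pulling them to the driver so a large
    # result cannot exhaust the Python environment's memory.
    sample_rows = df.limit(10).collect()
    print(sample_rows)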