The Python repository unit tests described on this page are only applicable to batch pipelines, and are not supported for streaming pipelines.
Python repositories can optionally run tests as part of checks. These tests run using PyTest ↗, the popular Python testing framework.
All CI checks include, among other tasks, the condaPackRun task.
The condaPackRun task is responsible for installing the environment: each artifact is retrieved from the appropriate channel, and Conda uses these artifacts to construct the environment. The task runs in three stages.
The environment specification is stored in hidden files and cached for subsequent builds. The cache is kept for 7 days and is invalidated whenever the meta.yaml file changes.
The duration of this task depends heavily on how many packages are added to the repository: the more packages added, the slower the task runs.
PEP8 / PyLint style checks can be enabled by applying the com.palantir.conda.pep8 and com.palantir.conda.pylint Gradle plugins in your Python project's build.gradle file. For transforms repositories, this file lives in the Python subproject; for library repositories, it lives in the root folder. A transforms build.gradle will look something like this:
apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

// Apply the pep8 linting plugin
apply plugin: 'com.palantir.conda.pep8'
apply plugin: 'com.palantir.conda.pylint'
PyLint can be configured in src/.pylintrc in your Python project. For example, specific messages can be disabled:
[MESSAGES CONTROL]
disable =
missing-module-docstring,
missing-function-docstring
Not all PyLint configurations are guaranteed to work in Foundry. If a feature configured in src/.pylintrc does not appear in Checks, that feature is not supported.
The Spark anti-pattern linter can be enabled by applying the com.palantir.transforms.lang.antipattern-linter Gradle plugin in your Python project's build.gradle file:
// Apply the anti-pattern linter
apply plugin: 'com.palantir.transforms.lang.antipattern-linter'
The Spark anti-pattern linter warns about common Spark anti-patterns, including those that lead to correctness issues, poor performance, or security problems.
Tests can be enabled by applying the com.palantir.transforms.lang.pytest-defaults Gradle plugin in your Python project's build.gradle file. For transforms repositories, this file lives in the Python subproject; for library repositories, it lives in the root folder. A transforms build.gradle will look something like this:
apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'
And a library build.gradle will look something like this:
apply plugin: 'com.palantir.transforms.lang.python-library'
apply plugin: 'com.palantir.transforms.lang.python-library-defaults'

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

// Publish only for tagged releases (zero commits ahead of last git tag)
condaLibraryPublish.onlyIf { versionDetails().commitDistance == 0 }
Runtime requirements defined in the meta.yaml will be available in your tests. Additional requirements can also be specified in the conda test section ↗.
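For example, a meta.yaml might declare a test-only dependency alongside its runtime requirements. A minimal sketch, in which pandas and pytest-mock are illustrative package names:

# Runtime requirements, also available during tests
requirements:
  run:
    - pandas

# Test-only requirements
test:
  requires:
    - pytest-mock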
Full documentation can be found at https://docs.pytest.org ↗.
PyTest finds tests in any Python file whose name begins with test_ or ends with _test.py. It is recommended to put all your tests into a test package under the src directory of your project. Tests are Python functions that are also named with the test_ prefix, and assertions are made using Python's assert statement. PyTest will also run tests written using Python's built-in unittest ↗ module.
For example, a simple test in transforms-python/src/test/test_increment.py would look like this:
def increment(num):
    return num + 1


def test_increment():
    assert increment(3) == 5
Running this test will cause checks to fail with a message that looks like this:
============================= test session starts =============================
collected 1 item
test_increment.py F [100%]
================================== FAILURES ===================================
_______________________________ test_increment ________________________________
def test_increment():
> assert increment(3) == 5
E assert 4 == 5
E + where 4 = increment(3)
test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================
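Since PyTest also runs unittest-based tests, an equivalent check can be written with the built-in unittest module. A minimal sketch, with illustrative file and class names, in transforms-python/src/test/test_increment_unittest.py:

import unittest


def increment(num):
    return num + 1


class TestIncrement(unittest.TestCase):
    def test_increment(self):
        # The same discovery rules apply: the file name still needs the test_ prefix.
        # This asserts the correct value (4), so the test passes.
        self.assertEqual(increment(3), 4)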
PyTest fixtures ↗ are a powerful feature that enables injecting values into test functions simply by adding a parameter of the same name.
This feature is used to provide a spark_session fixture for use in your test functions. For example:
def test_dataframe(spark_session):
    df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number'])
    assert df.schema.names == ['letter', 'number']
CSV files can be stored in your code repository and used as test inputs for data transformations.
The following section demonstrates an example that assumes the data transformation below is authored in transforms-python/src/myproject/datasets/find_aircraft.py:
from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("<output_dataset_rid>"),
    aircraft_df=Input("<input_dataset_rid>"),
)
def compute(aircraft_df):
    return aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) &
                              (F.col("operating_status") == F.lit("Yes")))
It also assumes the following two CSV files, with the contents shown, exist in the folder transforms-python/src/test/resources/:

aircraft_mock.csv:
tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type
AAA1,20809,1990,Manufacturer_1,M1-100,1,3500,Yes,Owned,13/8/90,208
BBB1,46970,2013,Manufacturer_2,M2-300,310,108500,No,Owned,10/15/14,777
CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777
DDD1,58340,2014,Manufacturer_3,M3-200,294,100000,Yes,Leased,11/21/13,330
EEE1,58600,2013,Manufacturer_2,M2-300,300,47200,Yes,Leased,12/2/13,777
expected_filtered_aircraft.csv:
tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type
CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777
The following test can then be written at transforms-python/src/test/test_find_aircraft.py:
import os
from pathlib import Path

from myproject.datasets.find_aircraft import compute

TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources')


def test_find_aircrafts(spark_session):
    aircraft_mock_df = spark_session.read.csv(
        str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')),
        inferSchema=True,
        header=True
    )

    expected_filtered_aircraft_df = spark_session.read.csv(
        str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')),
        inferSchema=True,
        header=True
    )

    result_df = compute(aircraft_mock_df)

    assert result_df.columns == expected_filtered_aircraft_df.columns
    assert result_df.count() == expected_filtered_aircraft_df.count()
    assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0
    assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0
The final repository structure will look like this: the test resides in transforms-python/src/test/test_find_aircraft.py, and the CSV resources for the input and the expected output reside in transforms-python/src/test/resources.
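A sketch of the resulting layout, showing only the files from this example:

transforms-python/
└── src/
    ├── myproject/
    │   └── datasets/
    │       └── find_aircraft.py
    └── test/
        ├── resources/
        │   ├── aircraft_mock.csv
        │   └── expected_filtered_aircraft.csv
        └── test_find_aircraft.py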
When transform functions are decorated with transform() instead of transform_df(), the transformation function no longer returns the resulting dataframe; instead, it uses one of the Output objects passed as arguments to materialize the results in a dataset. To test the logic, you must mock the Output arguments in order to intercept the values that were sent to be materialized.
Assuming the above data transformation is changed to use the transform() decorator, in find_aircraft_transform_decorator.py:
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output


@transform(
    results_output=Output("<output_dataset_rid>"),
    aircraft_input=Input("<input_dataset_rid>"),
)
def compute(results_output, aircraft_input):
    aircraft_df = aircraft_input.dataframe()
    results_df = aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) &
                                    (F.col("operating_status") == F.lit("Yes")))
    results_output.write_dataframe(results_df)
During the test, the transformation function now expects an Input() for the aircraft_input argument, and the value of results_df sent to results_output must be intercepted.
MagicMock ↗ can be used to create the necessary wrappers for both, in test_find_aircraft_transform_decorator.py:
import os
from pathlib import Path
from unittest.mock import MagicMock

from myproject.datasets.find_aircraft_transform_decorator import compute
from transforms.api import Input

TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources')


def test_find_aircrafts(spark_session):
    aircraft_mock_df = spark_session.read.csv(
        str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')),
        inferSchema=True,
        header=True
    )

    expected_filtered_aircraft_df = spark_session.read.csv(
        str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')),
        inferSchema=True,
        header=True
    )

    # Create a mock object for the output
    results_output_mock = MagicMock()

    # Create a wrapper for the input and configure the returned dataframe
    aircraft_mock_input = Input()
    aircraft_mock_input.dataframe = MagicMock(return_value=aircraft_mock_df)

    # Run the transformation with the mock output object
    compute(
        results_output=results_output_mock,
        aircraft_input=aircraft_mock_input
    )

    # Intercept the arguments with which write_dataframe was called on the mock object
    # and extract the dataframe sent to be written
    args, kwargs = results_output_mock.write_dataframe.call_args
    result_df = args[0]

    assert result_df.columns == expected_filtered_aircraft_df.columns
    assert result_df.count() == expected_filtered_aircraft_df.count()
    assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0
    assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0
The output of any configured test will display in the Checks tab with a separate output for each test. By default, the test result will display collapsed with the status: PASSED, FAILED, or SKIPPED. Expanding each test (or expanding all tests) will show the test output as well as the StdOut and StdErr logs.
PyTest coverage ↗ can be used to compute coverage and enforce a minimum percentage on your repository.
Add the following to the repository's meta.yaml:
test:
  requires:
    - pytest-cov
Create a pytest.ini file at transforms-python/src/pytest.ini with the following contents:
[pytest]
addopts = --cov=<<package name, e.g. myproject>> --cov-report term --cov-fail-under=100
The coverage percentage required to pass checks is customizable; select a value for the --cov-fail-under argument.
Running tests that result in coverage below the prescribed amount will now fail checks.
By default, PyTest runs tests sequentially. You can speed up test runs by distributing tests across multiple CPUs: in the transforms build.gradle, set numProcesses to the number of processes to use.
apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

tasks.pytest {
    numProcesses "3"
}
Test parallelization is run using the pytest-xdist ↗ testing plugin.
Parallelization sends pending tests to any available worker, with no guaranteed order. Any tests that rely on global or shared state and expect changes made by preceding tests should be adjusted accordingly.
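For instance, the following hypothetical tests pass when run sequentially in a single process, but can fail under pytest-xdist because the two tests may be distributed to different worker processes that do not share module state:

# test_shared_state.py (hypothetical example)
_seen = []


def test_populate_state():
    _seen.append("record")
    assert len(_seen) == 1


def test_depends_on_state():
    # Relies on test_populate_state having already run in the same process;
    # with pytest-xdist this test may run on a worker where _seen is empty.
    assert "record" in _seen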
You can confirm that your tests are running by looking for the :transforms-python:pytest task in the CI logs when you commit. If a test is not discovered, check that both the file name and the function name begin with test_, which is standard PyTest convention.