Unit tests

The Python repository unit tests described on this page are only applicable to batch pipelines, and are not supported for streaming pipelines.

Python repositories have the option of running tests as part of checks. These tests are run using the popular Python testing framework, PyTest ↗.

CI tasks: condaPackRun

All CI checks contain, among other tasks, the condaPackRun.

Build time summary of CI checks for a repository.

The condaPackRun is responsible for installing the environment. Each artifact is retrieved from the proper channel, and Conda uses these artifacts to construct the environment. This task contains three stages:

  1. Download and extract all packages in the solved environment.
  2. Verify package contents. Depending on configuration, Conda will either use a checksum or will verify that the file size is correct.
  3. Link packages into the environment.

The environment specifications is stored, as a cache for the next builds, in the hidden files:

  • conda-version-run.linux-64.lock
  • conda-version-test.linux-64.lock

The cache is stored for 7 days. It is re-cached if any change happens in the meta.yaml file.

This task is heavily dependent on how many packages are added to the repositories. The more packages added, the slower the task will run.

Enabling stylechecks

PEP8 / PyLint stylechecks can be enabled by applying the com.palantir.conda.pep8 and com.palantir.conda.pylint Gradle plugin in your Python project's build.gradle file. For transforms repositories, this lives in the Python subproject. For library repositories, this lives in the root folder. A transforms build.gradle will look something like this:

Copied!
1 2 3 4 5 6 apply plugin: 'com.palantir.transforms.lang.python' apply plugin: 'com.palantir.transforms.lang.python-defaults' // Apply the pep8 linting plugin apply plugin: 'com.palantir.conda.pep8' apply plugin: 'com.palantir.conda.pylint'

PyLint can be configured in src/.pylintrc in your Python project. For example, specific messages can be disabled:

[MESSAGES CONTROL]
disable =
    missing-module-docstring,
    missing-function-docstring
PyLint limitations

Not all configurations of PyLint are guaranteed to work in Foundry. If a feature in src/.pylintrc does not get displayed in Checks, this indicates that the feature is not supported.

Enabling Spark anti-pattern plugin

The Spark anti-pattern linter can be enabled by applying the com.palantir.transforms.lang.antipattern-linter Gradle plugin in your Python project's build.gradle file.

Copied!
1 2 // Apply the anti-pattern linter apply plugin: 'com.palantir.transforms.lang.antipattern-linter'

The Spark anti-pattern plugin will warn against the usage of common anti-patterns in Spark such as correctness issues, poor Spark performance, and security implications.

Enabling tests

Tests can be enabled by applying the com.palantir.transforms.lang.pytest-defaults Gradle plugin in your Python project’s build.gradle file. For transforms repositories, this lives in the Python subproject. For library repositories, this lives in the root folder. A transforms build.gradle will look something like this:

Copied!
1 2 3 4 5 apply plugin: 'com.palantir.transforms.lang.python' apply plugin: 'com.palantir.transforms.lang.python-defaults' // Apply the testing plugin apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

And a library build.gradle will look something like this:

Copied!
1 2 3 4 5 6 7 8 apply plugin: 'com.palantir.transforms.lang.python-library' apply plugin: 'com.palantir.transforms.lang.python-library-defaults' // Apply the testing plugin apply plugin: 'com.palantir.transforms.lang.pytest-defaults' // Publish only for tagged releases (zero commits ahead of last git tag) condaLibraryPublish.onlyIf { versionDetails().commitDistance == 0 }

Runtime requirements defined in the meta.yaml will be available in your tests. Additional requirements can also be specified in the conda test section ↗.

Writing a test

Full documentation can be found at https://docs.pytest.org ↗.

PyTest finds tests in any Python file that begins with test_ or ends with _test.py. It is recommended to put all your tests into a test package under the src directory of your project. Tests are Python functions that are also named with the test_ prefix, and assertions are made using Python’s assert statement. PyTest will also run tests written using Python’s built-in unittest module. For example, in transforms-python/src/test/test_increment.py a simple test would look like this:

Copied!
1 2 3 4 5 def increment(num): return num + 1 def test_increment(): assert increment(3) == 5

Running this test will cause checks to fail with a message that looks like this:

============================= test session starts =============================
collected 1 item

test_increment.py F                                                       [100%]

================================== FAILURES ===================================
_______________________________ test_increment ________________________________

    def test_increment():
>       assert increment(3) == 5
E       assert 4 == 5
E        +  where 4 = increment(3)

test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================

Testing with PySpark

PyTest fixtures ↗ are a powerful feature that enables injecting values into test functions simply by adding a parameter of the same name. This feature is used to provide a spark_session fixture for use in your test functions. For example:

Copied!
1 2 3 def test_dataframe(spark_session): df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number']) assert df.schema.names == ['letter', 'number']

Creating test inputs from CSV

CSV files can be stored in Code Repository and used as test inputs for testing data transformations.

This following section demonstrates an example that assumes the following data transformation is authored in transforms-python/src/myproject/datasets/

find_aircraft.py

Copied!
1 2 3 4 5 6 7 8 9 10 from pyspark.sql import functions as F from transforms.api import transform_df, Input, Output @transform_df( Output("<output_dataset_rid>"), aircraft_df=Input("<input_dataset_rid>"), ) def compute(aircraft_df): return aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) & (F.col("operating_status") == F.lit("Yes")))

And the following 2 CSV files and its respective contents in the folder: transforms-python/src/test/resources/:

aircraft_mock.csv

Copied!
1 2 3 4 5 6 tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type AAA1,20809,1990,Manufacturer_1,M1-100,1,3500,Yes,Owned,13/8/90,208 BBB1,46970,2013,Manufacturer_2,M2-300,310,108500,No,Owned,10/15/14,777 CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777 DDD1,58340,2014,Manufacturer_3,M3-200,294,100000,Yes,Leased,11/21/13,330 EEE1,58600,2013,Manufacturer_2,M2-300,300,47200,Yes,Leased,12/2/13,777

expected_filtered_aircraft.csv

Copied!
1 2 tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777

The following test can be written at the path transforms-python/src/test/:

test_find_aircraft.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import os from pathlib import Path from myproject.datasets.find_aircraft import compute TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources') def test_find_aircrafts(spark_session): aircraft_mock_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')), inferSchema=True, header=True ) expected_filtered_aircraft_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')), inferSchema=True, header=True ) result_df = compute(aircraft_mock_df) assert result_df.columns == expected_filtered_aircraft_df.columns assert result_df.count() == expected_filtered_aircraft_df.count() assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0 assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0

The final repository structure will look like the following image:

Unit test with example inputs

The test resides in transforms-python/src/test/test_find_aircraft.py. The CSV resources for the input and the expected output reside in transforms-python/src/test/resources

Intercepting written dataframe in transform() decorator

When transform functions are decorated with transform() instead of transform_df, the transformation function will no longer return the resulting dataframe, but instead use one of the Output objects sent as an argument to the function to materialize the results in a dataset. To test the logic, you are required to use mocks for the Output arguments in order to intercept the values that were sent to be materialized.

Assuming the above data transformation change to use the transform() decorator:

find_aircraft_transform_decorator.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 from pyspark.sql import functions as F from transforms.api import transform, Input, Output @transform( results_output=Output("<output_dataset_rid>"), aircraft_input=Input("<input_dataset_rid>"), ) def compute(results_output, aircraft_input): aircraft_df = aircraft_input.dataframe() results_df = aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) & (F.col("operating_status") == F.lit("Yes"))) results_output.write_dataframe(results_df)

During the test for validation, the transformation function is now expecting an Input(), for the aircraft_input argument, and it is required to intercept the value of result_df send to results_output.

MagicMock ↗ can be used to create the necessary wrappers for both instances.

test_find_aircraft_transform_decorator.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import os from pathlib import Path from unittest.mock import MagicMock from myproject.datasets.find_aircraft_transform_decorator import compute from transforms.api import Input TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources') def test_find_aircrafts(spark_session): aircraft_mock_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')), inferSchema=True, header=True ) expected_filtered_aircraft_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')), inferSchema=True, header=True ) # Creating a mock object for the output results_output_mock = MagicMock() # Creating a wrapper for the input and configure the returned dataset aircraft_mock_input = Input() aircraft_mock_input.dataframe = MagicMock(return_value=aircraft_mock_df) # Running the transformation with the mock output object compute( results_output=results_output_mock, aircraft_input=aircraft_mock_input ) # Intercepting the arguments with which write_dataframe was called on the mock object # and extracting the dataframe sent to be written args, kwargs = results_output_mock.write_dataframe.call_args result_df = args[0] assert result_df.columns == expected_filtered_aircraft_df.columns assert result_df.count() == expected_filtered_aircraft_df.count() assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0 assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0

Viewing test output

The output of any configured test will display in the Checks tab with a separate output for each test. By default, the test result will display collapsed with the status: PASSED, FAILED, or SKIPPED. Expanding each test (or expanding all tests) will show the test output as well as the StdOut and StdErr logs.

checks-test

Test coverage

PyTest coverage ↗ can be used to compute coverage and enforce a minimum percentage on your repository.

Add the following to the repository's meta.yml:

Copied!
1 2 3 test: requires: - pytest-cov

Create a pytest.ini file at /transforms-python/src/pytest.ini with the following contents:

Copied!
1 2 [pytest] addopts = --cov=<<package name, e.g. myproject>> --cov-report term --cov-fail-under=100

The coverage required to fail checks is customizable; select a percentage for the --cov-fail-under argument.

Running tests that result in coverage less than the prescribed amount will now fail with this output:

coverage-test

Parallelizing tests

By default, PyTest runs tests sequentially. You can speed up test runs by sending tests to multiple CPUs by adjusting the transforms build.gradle, setting numProcesses to the value that reflects how many processes to use.

Copied!
1 2 3 4 5 6 7 8 9 apply plugin: 'com.palantir.transforms.lang.python' apply plugin: 'com.palantir.transforms.lang.python-defaults' // Apply the testing plugin apply plugin: 'com.palantir.transforms.lang.pytest-defaults' tasks.pytest { numProcesses "3" }

Test parallelization is run using the pytest-xdist ↗ testing plugin.

Parallelizing tests will involve sending pending tests to any of the available workers, without any guaranteed orders. Any tests that require global/shared state and anticipate changes being made by other preceding tests should be adjusted accordingly.

Tips

  1. After enabling these tests, you should see the :transforms-python:pytest task being run in the CI logs when you commit.
  2. Tests are discovered based on test_ at the beginning of both the file and function names. This is standard to PyTest convention.
  3. A quick way to get example records is to open the dataset in the Code Workbook console and call .collect().
  4. To obtain a Python-formatted schema, open the dataset preview, then open the Columns tab and click Copy and then Copy PySpark Schema.