This guide walks through how to set up Data Expectations in a Python transforms repository. For a high-level overview of Data Expectations, see this page.
Open the library search panel on the left side of your Code Repository, search for `transforms-expectations`, and click "Add library" within the library tab.
Your Code Repository will then resolve all dependencies and run checks again. This may take a couple of moments, after which you will be able to start using the library in your transforms.
Import `expectations` and `Check` into your transforms file:
```python
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
```
For some common schema and column expectations, you may want to import `types` as well:
```python
from pyspark.sql import types as T
```
The basic structure of a single check:
```python
Check(expectation, 'Check unique name', on_error='WARN/FAIL')
```
The `on_error` parameter accepts:

- `FAIL` (default): the job will be aborted if the check fails.
- `WARN`: the job will continue, and a warning will be generated and handled by Data Health.

Assign checks to dataset
Each check should be passed to a single input or output. Pass a single check as `checks=check1`, or multiple checks in a list: `checks=[check1, check2, ...]`.
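As an illustration of both forms, the sketch below attaches checks to an `Output` (the dataset path and the `non_null` expectation are assumptions for the example, not part of this guide):

```python
from transforms.api import Output, Check
from transforms import expectations as E

# A single check can be passed directly...
output_single = Output(
    "/Users/data/dataset",
    checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
)

# ...or several checks can be passed as a list,
# each monitored and reported on separately.
output_multiple = Output(
    "/Users/data/dataset",
    checks=[
        Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
        Check(E.col('id').non_null(), 'Non-null id', on_error='WARN'),
    ]
)
```

This snippet only runs inside a Foundry transforms repository, since `transforms.api` is provided by the platform.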
Use multiple checks to keep the expectations structure legible and to control the behavior of each meaningful check separately.
An example of a simple primary key check on the output:
```python
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E


@transform_df(
    Output(
        "/Users/data/dataset",
        checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
    ),
    input=Input("/Users/data/input")
)
def my_compute_function(input):
    return input
```
You can also add more complex checks using composite expectations. For example, let us check that column `age` is of type `long` and falls within a given range. Notice that we can define the composite expectation once and use it in multiple checks within the transform, applying different behavior on errors.
A check is monitored as a whole even when it consists of a composite expectation. If you want to monitor (that is, watch and get notifications for) specific parts of the composite expectation, it is recommended that you split it into several different checks.
```python
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
from pyspark.sql import types as T

# We assume an age is valid if it is between 0 and 200.
expect_valid_age = E.all(
    E.col('age').has_type(T.LongType()),
    E.col('age').gte(0),
    E.col('age').lt(200)
)


@transform_df(
    Output(
        "/Users/data/dataset",
        checks=[
            Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
            Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
        ]
    ),
    input=Input(
        "/Users/data/input",
        checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
    )
)
def my_compute_function(input):
    return input
```
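Following the recommendation above, the composite age expectation could be split into separate checks so that each condition is monitored and alerted on individually. This is a sketch under the same assumed dataset paths; the check names are illustrative:

```python
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
from pyspark.sql import types as T


@transform_df(
    Output(
        "/Users/data/dataset",
        checks=[
            # Each part of the former composite is its own check,
            # so Data Health tracks and notifies on each one separately.
            Check(E.col('age').has_type(T.LongType()), 'Age is long', on_error='FAIL'),
            Check(E.col('age').gte(0), 'Age is non-negative', on_error='FAIL'),
            Check(E.col('age').lt(200), 'Age below 200', on_error='WARN'),
        ]
    ),
    input=Input("/Users/data/input")
)
def my_compute_function(input):
    return input
```

The trade-off is verbosity: a composite keeps the transform compact, while split checks give finer-grained monitoring.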