이 안내서는 Python 변환 리포지토리에서 데이터 기대값을 설정하는 방법을 안내합니다. 데이터 기대값의 고수준 개요를 보려면 이 페이지를 참조하세요.
Code Repositories 왼쪽에 있는 라이브러리 검색 패널을 엽니다. transforms-expectations
을 검색하고 라이브러리 탭 내에 있는 "라이브러리 추가"를 클릭하세요.
그러면 Code Repositories에서 모든 종속성을 해결하고 다시 확인을 실행합니다. 이 작업은 잠시 시간이 걸릴 수 있으며, 완료되면 변환에서 라이브러리를 사용하기 시작할 수 있습니다.
expectations
과 Check
를 변환 파일에 가져옵니다:
Copied!1 2 3 4
# transforms.api에서 필요한 함수와 클래스를 불러옴 from transforms.api import transform_df, Input, Output, Check # transforms에서 expectations를 불러옴 from transforms import expectations as E
일부 일반적인 스키마 및 열 기대값의 경우, types
를 가져오는 것이 좋습니다:
Copied!1 2
# pyspark.sql에서 types을 T라는 이름으로 가져옵니다. from pyspark.sql import types as T
단일 검사의 기본 구조:
Copied!1 2
# 코드를 이해하기 쉽게 한국어 주석을 추가합니다. Check(expectation, '고유 이름 확인', on_error='경고/실패')
FAIL
(기본값) - 검사에 실패하면 작업이 중단됩니다.WARN
- 작업이 계속되고 경고가 생성되어 Data Health에 의해 처리됩니다.데이터셋에 검사 할당
각 검사는 단일 입력값 또는 결과물에 전달되어야 합니다. 단일 검사를 checks=check1
로 전달하거나 여러 검사를 배열로 전달하세요: checks=[check1, check2, ...]
기대치 구조를 더 읽기 쉽게 만들고 각 의미 있는 검사의 동작을 별도로 제어하기 위해 여러 검사를 사용하세요.
결과물에 대한 간단한 기본 키 검사 예시:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from transforms.api import transform_df, Input, Output, Check from transforms import expectations as E # 데이터 변환 함수 정의 @transform_df( Output( "/Users/data/dataset", # 기본키 체크 및 오류 발생 시 실패 처리 checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), input=Input("Users/data/input") ) def my_compute_function(input): # 입력 데이터를 그대로 반환 return input
복합 기대치를 사용하여 더 복잡한 검사를 추가할 수도 있습니다. 예를 들어, 열 age
가 long
유형이고 주어진 범위 내에 있는지 확인해 봅시다. 변환 내에서 복합 기대치를 정의하고 오류 발생 시 다른 동작을 적용하여 여러 검사에서 사용할 수 있다는 점에 유의하세요.
검사는 복합 기대치로 구성되어 있을 때에도 전체적으로 모니터링됩니다. 복합 기대치의 특정 부분을 모니터링(즉, 감시 및 알림 받기)하려면 여러 개의 다른 검사로 분할하는 것이 좋습니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
from transforms.api import transform_df, Input, Output, Check from transforms import expectations as E from pyspark.sql import types as T # 나이가 유효하다고 가정하는 범위는 0에서 200 사이입니다. # 나이가 0에서 200 사이인지 확인 expect_valid_age = E.all( E.col('age').has_type(T.LongType()), E.col('age').gte(0), E.col('age').lt(200) ) @transform_df( Output( "/Users/data/dataset", checks=[ # id를 기본키로 설정하고, 에러 발생 시 'FAIL' 반환 Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), # 출력 결과에서 나이가 유효한지 확인하고, 에러 발생 시 'FAIL' 반환 Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", # 입력 데이터에서 나이가 유효한지 확인하고, 에러 발생 시 'WARN' 반환 checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def my_compute_function(input): return input