데이터 통합PythonData expectations시작하기

본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

시작하기

이 안내서는 Python 변환 리포지토리에서 데이터 기대값을 설정하는 방법을 안내합니다. 데이터 기대값의 고수준 개요를 보려면 이 페이지를 참조하세요.

리포지토리 설정

Code Repositories 왼쪽에 있는 라이브러리 검색 패널을 엽니다. transforms-expectations을 검색하고 라이브러리 탭 내에 있는 "라이브러리 추가"를 클릭하세요.

그러면 Code Repositories에서 모든 종속성을 해결하고 다시 확인을 실행합니다. 이 작업은 잠시 시간이 걸릴 수 있으며, 완료되면 변환에서 라이브러리를 사용하기 시작할 수 있습니다.

변환 설정

expectationsCheck를 변환 파일에 가져옵니다:

Copied!
1 2 3 4 # transforms.api에서 필요한 함수와 클래스를 불러옴 from transforms.api import transform_df, Input, Output, Check # transforms에서 expectations를 불러옴 from transforms import expectations as E

일부 일반적인 스키마 및 열 기대값의 경우, types를 가져오는 것이 좋습니다:

Copied!
1 2 # pyspark.sql에서 types을 T라는 이름으로 가져옵니다. from pyspark.sql import types as T

검사 생성

단일 검사의 기본 구조:

Copied!
1 2 # 코드를 이해하기 쉽게 한국어 주석을 추가합니다. Check(expectation, '고유 이름 확인', on_error='경고/실패')
  • 기대치 - 단일 기대치로, 여러 하위 기대치의 복합 기대치 (예: any/all 연산자 사용)가 될 수 있습니다.
  • 고유한 이름 확인 - 변환에서 고유해야 하며 (결과물과 입력값간에 동일한 이름을 공유할 수 없음) 앱 (예: Data Health, 빌드 애플리케이션)간에 확인을 식별합니다.
  • on_error - 기대치가 충족되지 않을 때 작업의 동작을 정의합니다:
    • FAIL (기본값) - 검사에 실패하면 작업이 중단됩니다.
    • WARN - 작업이 계속되고 경고가 생성되어 Data Health에 의해 처리됩니다.

데이터셋에 검사 할당

각 검사는 단일 입력값 또는 결과물에 전달되어야 합니다. 단일 검사를 checks=check1로 전달하거나 여러 검사를 배열로 전달하세요: checks=[check1, check2, ...]

여러 검사

기대치 구조를 더 읽기 쉽게 만들고 각 의미 있는 검사의 동작을 별도로 제어하기 위해 여러 검사를 사용하세요.

결과물에 대한 간단한 기본 키 검사 예시:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from transforms.api import transform_df, Input, Output, Check from transforms import expectations as E # 데이터 변환 함수 정의 @transform_df( Output( "/Users/data/dataset", # 기본키 체크 및 오류 발생 시 실패 처리 checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), input=Input("Users/data/input") ) def my_compute_function(input): # 입력 데이터를 그대로 반환 return input

복합 검사

복합 기대치를 사용하여 더 복잡한 검사를 추가할 수도 있습니다. 예를 들어, 열 agelong 유형이고 주어진 범위 내에 있는지 확인해 봅시다. 변환 내에서 복합 기대치를 정의하고 오류 발생 시 다른 동작을 적용하여 여러 검사에서 사용할 수 있다는 점에 유의하세요.

검사는 복합 기대치로 구성되어 있을 때에도 전체적으로 모니터링됩니다. 복합 기대치의 특정 부분을 모니터링(즉, 감시 및 알림 받기)하려면 여러 개의 다른 검사로 분할하는 것이 좋습니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from transforms.api import transform_df, Input, Output, Check from transforms import expectations as E from pyspark.sql import types as T # 나이가 유효하다고 가정하는 범위는 0에서 200 사이입니다. # 나이가 0에서 200 사이인지 확인 expect_valid_age = E.all( E.col('age').has_type(T.LongType()), E.col('age').gte(0), E.col('age').lt(200) ) @transform_df( Output( "/Users/data/dataset", checks=[ # id를 기본키로 설정하고, 에러 발생 시 'FAIL' 반환 Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), # 출력 결과에서 나이가 유효한지 확인하고, 에러 발생 시 'FAIL' 반환 Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", # 입력 데이터에서 나이가 유효한지 확인하고, 에러 발생 시 'WARN' 반환 checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def my_compute_function(input): return input