변환 컨텍스트에서 저장소의 다른 파일을 읽을 수 있습니다. 이를 통해 변환 코드에 참조할 파라미터를 설정하는 데 도움이 될 수 있습니다.
시작하려면, 파이썬 저장소에서 setup.py
를 편집하세요:
Copied!1 2 3 4 5 6 7
setup( name=os.environ['PKG_NAME'], # os.environ을 통해 환경 변수에서 'PKG_NAME'을 가져와 패키지의 이름으로 설정합니다. # ... package_data={ '': ['*.yaml', '*.csv'] # 패키지에 포함시킬 데이터 파일의 형식을 지정합니다. 여기서는 .yaml과 .csv 파일을 포함시킵니다. } )
이것은 파이썬에게 yaml과 csv 파일을 패키지로 번들링하라고 알려줍니다.
그런 다음 파이썬 변환(예: read_yml.py
아래 참조) 바로 옆에 구성 파일(예: config.yaml
이지만 csv 또는 txt일 수도 있음)을 배치하세요:
Copied!1 2 3 4 5 6 7
- name: tbl1 # 테이블 이름: tbl1 primaryKey: # 기본 키 - col1 # 첫번째 기본 키: col1 - col2 # 두번째 기본 키: col2 update: # 업데이트 - column: col3 # 업데이트할 컬럼: col3 with: 'XXX' # col3을 'XXX'로 업데이트
변환에서 read_yml.py
로 아래의 코드를 사용하여 읽을 수 있습니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
from transforms.api import transform_df, Input, Output from pkg_resources import resource_stream import yaml import json # 데이터프레임 변환 함수 정의 @transform_df( Output("/Demo/read_yml") ) def my_compute_function(ctx): # 'config.yaml' 파일 스트림 가져오기 stream = resource_stream(__name__, "config.yaml") # YAML 파일 안전하게 읽어들이기 docs = yaml.safe_load(stream) # JSON 형식으로 변환 후 Spark 데이터프레임 생성 return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])
그러므로 프로젝트 구조는 다음과 같습니다:
config.yaml
read_yml.py
이것은 데이터셋에 "결과"라는 한 열에 내용이 있는 단일 행을 결과물로 출력합니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
[ { "primaryKey": ["col1", "col2"], // 기본 키 설정 (프라이머리 키) "update": [ { "column": "col3", // 업데이트할 컬럼 이름 "with": "XXX" // 업데이트할 값 } ], "name": "tbl1" // 테이블 이름 } ]