비구조화된 파일 접근은 고급 주제입니다. 이 페이지를 읽기 전에 이 사용자 가이드의 나머지 내용을 잘 알고 있는지 확인하세요.
다양한 이유로 데이터 변환에서 파일에 접근하려고 할 수 있습니다. 파일 접근은 특히 XML
또는 JSON
과 같은 비테이블 형식의 파일이나 gz
또는 zip
과 같은 압축 형식의 파일을 처리하려는 경우 유용합니다.
transforms
Python 라이브러리는 사용자가 Foundry 데이터셋에서 파일을 읽고 쓸 수 있게 합니다. transforms.api.TransformInput
은 읽기 전용 FileSystem 오브젝트를 제공하며, transforms.api.TransformOutput
은 쓰기 전용 FileSystem 오브젝트를 제공합니다. 이들 FileSystem
오브젝트들은 Foundry 데이터셋 내의 파일 경로를 기반으로 파일 접근을 허용하며, 이는 기본 저장소를 추상화합니다.
데이터 변환에서 파일에 접근하려면, transform()
데코레이터를 사용하여 Transform
오브젝트를 구성해야 합니다. 이는 FileSystem 오브젝트가 TransformInput
및 TransformOutput
오브젝트에 의해 노출되기 때문입니다. transform()
는 계산 함수에 대한 입력 및 출력이 각각 TransformInput
과 TransformOutput
유형이어야 한다고 예상하는 유일한 데코레이터입니다.
파일은 수동 파일 가져오기 또는 Data Connection을 통해 동기화하여 Foundry에 업로드할 수 있습니다. 구조화된 파일과 비구조화된 파일은 Foundry 데이터셋에 가져와서 하류 애플리케이션에서 처리할 수 있습니다. 파일은 확장자를 수정하지 않고 원시 파일로도 업로드할 수 있습니다. 아래 예제는 원시 파일이 아닌 Foundry 데이터셋으로 업로드된 파일을 참조합니다.
Foundry에는 데이터셋에 업로드된 특정 파일 유형에 대한 스키마를 자동으로 추론하는 기능도 있습니다. 예를 들어, CSV
유형의 파일을 가져올 때 스키마 적용
버튼을 사용하여 자동으로 스키마를 적용할 수 있습니다. 수동으로 데이터 업로드하기에 대해 더 알아보세요.
데이터셋 내의 파일은 transforms.api.FileSystem.ls()
메서드를 사용하여 나열할 수 있습니다. 이 메서드는 transforms.api.FileStatus
오브젝트의 생성기를 반환합니다. 이 오브젝트들은 각 파일의 경로, 크기(바이트 단위), 및 수정된 타임스탬프(유닉스 시대 이후 밀리초)를 캡처합니다. 다음 Transform
오브젝트를 고려해보세요:
Copied!1 2 3 4 5 6 7 8 9 10
from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # 학생들의 머리색과 눈색 데이터 입력 processed=Output('/examples/hair_eye_color_processed') # 처리된 데이터 출력 ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 눈색을 필터링하는 데이터 변환 코드 pass
데이터 변환 코드에서 데이터셋 파일을 탐색할 수 있습니다:
Copied!1 2 3 4 5
list(hair_eye_color.filesystem().ls()) # 결과: [FileStatus(path='students.csv', size=688, modified=...)] # 'hair_eye_color' 객체의 파일 시스템에서 사용 가능한 파일 목록을 나열합니다. # 이 경우 'students.csv' 파일이 나열됩니다. # 파일의 크기는 688바이트이며, 수정된 날짜와 시간도 표시됩니다.
ls()
호출의 결과를 glob 또는 regex 패턴을 전달하여 필터링하는 것도 가능합니다:
Copied!1 2 3 4 5
list(hair_eye_color.filesystem().ls(glob='*.csv')) # 결과: [FileStatus(path='students.csv', size=688, modified=...)] list(hair_eye_color.filesystem().ls(regex='[A-Z]*\.csv')) # 결과: []
파일은 transforms.api.FileSystem.open()
메서드를 사용하여 열 수 있습니다. 이렇게 하면 파이썬 파일과 유사한 스트림 오브젝트가 반환됩니다. io.open()
에서 지원하는 모든 옵션도 지원됩니다. 파일이 스트림으로 읽혀서 무작위 접근이 지원되지 않는다는 점에 주의하세요.
open()
메서드로 반환되는 파일과 유사한 스트림 오브젝트는 seek
또는 tell
메서드를 지원하지 않습니다. 따라서 무작위 접근이 지원되지 않습니다.
다음과 같은 Transform
오브젝트를 고려해 보세요:
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform, Input, Output @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), # 학생들의 머리와 눈 색에 대한 데이터를 입력으로 받습니다. processed=Output('/examples/hair_eye_color_processed') # 처리된 데이터를 출력으로 내보냅니다. ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # your data transformation code # 이곳에 데이터 변환 코드를 작성합니다. pass
데이터 변환 코드에서 데이터셋 파일을 읽을 수 있습니다:
Copied!1 2 3 4 5
with hair_eye_color.filesystem().open('students.csv') as f: f.readline() # 결과: 'id,hair,eye,sex\n' # 설명: hair_eye_color 파일 시스템에서 'students.csv' 파일을 열고, 첫 줄을 읽어옵니다.
스트림은 파싱 라이브러리에도 전달될 수 있습니다. 예를 들어, CSV 파일을 파싱할 수 있습니다.
Copied!1 2 3 4 5 6 7
import csv # 학생들의 머리카락 색과 눈 색을 담고 있는 students.csv 파일을 여는 코드 with hair_eye_color.filesystem().open('students.csv') as f: reader = csv.reader(f, delimiter=',') next(reader) # 첫 번째 라인(헤더)을 건너뛴다 # 결과: ['id', 'hair', 'eye', 'sex']
앞서 언급했듯이, XML
이나 JSON
과 같은 비테이블 형식의 파일이나 gz
나 zip
과 같은 압축된 형식의 파일도 처리할 수 있습니다. 예를 들어, 아래의 코드를 사용하면 압축된 파일 내의 CSV를 읽고 그 내용을 데이터프레임으로 반환할 수 있습니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
def process_file(file_status): # 파일을 읽어옵니다 with fs.open(file_status.path, 'rb') as f: # 임시 파일을 생성합니다 with tempfile.NamedTemporaryFile() as tmp: shutil.copyfileobj(f, tmp) tmp.flush() # 파일을 압축 해제합니다 with zipfile.ZipFile(tmp) as archive: # 압축 해제한 파일 목록을 불러옵니다 for filename in archive.namelist(): # 각 파일을 처리합니다 with archive.open(filename) as f2: br = io.BufferedReader(f2) tw = io.TextIOWrapper(br) tw.readline() # CSV 파일의 첫 번째 줄을 건너뜁니다 # 각 줄을 처리하여 MyRow 객체를 생성하고 반환합니다 for line in tw: yield MyRow(*line.split(",")) # 파일 시스템에서 파일 목록을 가져옵니다 rdd = fs.files().rdd # process_file 함수를 사용하여 각 파일을 처리합니다 rdd = rdd.flatMap(process_file) # RDD를 DataFrame으로 변환합니다 df = rdd.toDF()
랜덤 접근을 사용하면 성능이 크게 저하됩니다. seek
메소드에 의존하지 않도록 코드를 다시 작성하는 것을 권장합니다. 여전히 랜덤 접근을 사용하려면 아래 정보를 참조하여 방법을 확인하십시오.
open()
메소드는 스트림 오브젝트를 반환하므로, 랜덤 접근은 지원되지 않습니다. 랜덤 접근이 필요한 경우 파일을 메모리나 디스크에 버퍼링할 수 있습니다. hair_eye_color
가 TransformInput
오브젝트에 해당한다고 가정하면, 아래는 몇 가지 예입니다:
Copied!1 2 3 4 5 6 7 8 9 10 11
import io import shutil s = io.StringIO() # hair_eye_color.filesystem()에서 'students.csv' 파일을 열어서 읽습니다. with hair_eye_color.filesystem().open('students.csv') as f: # 파일의 내용을 s(StringIO 객체)로 복사합니다. shutil.copyfileobj(f, s) # s 객체에 저장된 값을 가져옵니다. s.getvalue() # 결과: 'id,hair,eye,sex\n...'
Copied!1 2 3 4 5 6
with hair_eye_color.filesystem().open('students.csv') as f: lines = f.read().splitlines() lines[0] # 결과: 'id,hair,eye,sex' # 결과: '아이디,머리색,눈색,성별'
Copied!1 2 3 4 5 6 7 8 9
import tempfile with tempfile.NamedTemporaryFile() as tmp: with hair_eye_color.filesystem().open('students.csv', 'rb') as f: # 'students.csv'라는 파일을 바이너리 읽기 모드(rb)로 엽니다. shutil.copyfileobj(f, tmp) # f 객체의 내용을 임시 파일 tmp로 복사합니다. tmp.flush() # shutil.copyfileobj는 flush를 수행하지 않으므로 복사 후에 flush를 수행해줍니다. with open(tmp.name) as t: # 임시 파일을 다시 열어서 t.readline() # 첫 번째 줄을 읽습니다. # 결과: 'id,hair,eye,sex\n'
파일은 open()
메서드를 사용하여 비슷한 방식으로 작성됩니다. 이것은 Python 파일과 유사한 스트림 오브젝트를 반환하며, 이는 오직 쓰기만 가능합니다. io.open()
에서 지원하는 모든 키워드 인수도 지원됩니다. 파일은 스트림으로 작성되므로 무작위 접근이 지원되지 않음을 주의하십시오. 다음과 같은 Transform
오브젝트를 고려해 보세요:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
# 필요한 라이브러리를 가져옵니다. from transforms.api import transform, Input, Output # transform 데코레이터를 이용해 함수를 정의합니다. # 이 함수는 '/examples/students_hair_eye_color'의 입력 데이터를 받아서 # '/examples/hair_eye_color_processed'로 출력합니다. @transform( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 데이터 변환 코드를 작성하는 부분입니다. pass
데이터 변환 코드에서 결과물 파일 시스템에 쓰는 것이 가능합니다. 다음 예제에서는 Python의 내장 직렬화 도구인 pickle
모듈을 사용하여 모델을 유지합니다:
Copied!1 2 3 4 5 6
import pickle # 'model.pickle' 파일을 쓰기 모드('wb')로 연다. with processed.filesystem().open('model.pickle', 'wb') as f: # pickle 모듈을 사용하여 모델을 저장(dump)한다. pickle.dump(model, f)
DataFrame
객체의 관점에서 표현된 데이터 변환과는 달리, 파일 기반 변환에서는 드라이버 코드와 실행자 코드의 차이를 이해하는 것이 중요합니다. 계산 함수는 드라이버(단일 기계)에서 실행되며, Spark는 자동으로 DataFrame
함수를 적절하게 실행자(많은 기계)에 분배합니다.
파일 API로 분산 처리의 이점을 얻으려면, 계산을 분배하도록 Spark를 활용해야 합니다. 이를 위해, 우리는 DataFrame
of FileStatus
를 생성하고 이를 실행자에 분배합니다. 각 실행자의 작업은 할당받은 파일을 열어 처리할 수 있으며, 결과는 Spark에 의해 집계됩니다.
파일 API는 ls()
함수와 동일한 인수를 받는 files()
함수를 제공하지만, 대신 FileStatus
객체의 DataFrame
을 반환합니다. 이 DataFrame은 파일 크기를 기준으로 파티션되어 파일 크기가 다양할 때 계산을 균형잡게 돕습니다. 파티션은 다음 두 가지 Spark 구성 옵션을 사용하여 제어할 수 있습니다:
spark.sql.files.maxPartitionBytes는 파일을 읽을 때 단일 파티션에 패키징할 바이트의 최대 수입니다.
spark.sql.files.openCostInBytes는 파일을 열기 위한 예상 비용으로, 동일한 시간에 스캔할 수 있는 바이트 수로 측정됩니다. 이는 파일 크기에 추가하여 파티션에서 파일이 사용하는 총 바이트 수를 계산하는 데 사용됩니다.
이러한 속성의 값을 수정하려면, 사용자 정의 Transforms 프로필을 생성하고 이를 configure()
데코레이터를 사용하여 Transform에 적용해야 합니다. 자세한 정보는 Code Repositories 문서의 Transforms 프로필 정의 섹션을 참조하십시오.
이제 예제를 통해 살펴보겠습니다. 파싱하고 연결하려는 CSV 파일이 있다고 가정해봅시다. 우리는 flatMap()
를 사용하여 각 FileStatus
객체에 처리 함수를 적용합니다. 이 처리 함수는 pyspark.sql.SparkSession.createDataFrame()
에 따라 행을 생성해야 합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
import csv from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, StringType from transforms.api import transform, Input, Output @transform( processed=Output('/examples/hair_eye_color_processed'), # 처리된 데이터를 저장할 경로 지정 hair_eye_color=Input('/examples/students_hair_eye_color_csv'), # 원본 데이터 파일의 경로 지정 ) def example_computation(hair_eye_color, processed): def process_file(file_status): with hair_eye_color.filesystem().open(file_status.path) as f: r = csv.reader(f) # Construct a pyspark.Row from our header row # 헤더 행으로부터 pyspark.Row 생성 header = next(r) MyRow = Row(*header) for row in r: yield MyRow(*row) # 각 행을 MyRow 형태로 반환 # 스키마 생성. 각 필드는 문자열 타입이며, nullable=True schema = StructType([ StructField('student_id', StringType(), True), StructField('hair_color', StringType(), True), StructField('eye_color', StringType(), True), ]) # '**/*.csv' 패턴에 맞는 모든 파일을 읽어들임 files_df = hair_eye_color.filesystem().files('**/*.csv') # 각 파일에 대해 process_file 함수를 적용하고, 그 결과를 스키마에 맞는 DataFrame으로 변환 processed_df = files_df.rdd.flatMap(process_file).toDF(schema) # 처리된 DataFrame을 지정된 경로에 저장 processed.write_dataframe(processed_df)
스키마를 전달하지 않고 toDF()
를 호출할 수 있지만, 파일 처리 결과가 0행이면 Spark의 스키마 추론이 실패하여 ValueError: RDD is empty
예외가 발생합니다. 따라서 항상 수동으로 스키마를 지정할 것을 권장합니다.