Foundry의 PySpark에서 다양한 디버깅 정보를 결과물로 낼 수 있습니다.
파이썬의 내장된 print
는 코드 에디터 오른쪽에 있는 Code Workbook의 결과물 섹션으로 파이프되어, 일반적으로 오류가 나타나는 곳입니다.
Copied!1 2
def new_dataset(some_input_dataset): print("example log output") # "example log output"을 출력합니다.
이 함수는 주어진 데이터셋(some_input_dataset
)에 대한 작업을 수행하는데, 여기에는 실제 작업이 포함되지 않았습니다. 그저 example log output
이라는 메시지를 출력하는 것만 있습니다. 이 함수는 더 많은 기능이 추가되어야 할 템플릿일 수 있습니다.
예시 로그 출력
Code Repositories는 Python의 내장된 logging library를 사용합니다. 이는 널리 온라인에 문서화되어 있으며 로그 레벨 (ERROR
, WARNING
, INFO
)을 제어하여 필터링을 더 쉽게 할 수 있게 해줍니다.
로그 출력은 결과물 데이터셋의 로그 파일과 빌드의 드라이버 로그(데이터셋 -> 상세 -> 파일 -> 로그 파일
, 그리고 빌드 -> 빌드 -> 작업 상태 로그
; 각각 "드라이버 로그"
를 선택하세요)에 모두 나타납니다.
Copied!1 2 3 4 5 6 7 8 9 10
import logging logger = logging.getLogger(__name__) @transform_df( ... ) def some_transformation(some_input): # 예시 로그 출력 logger.info("example log output")
INFO [2018-01-01T12:00:00] some_transformation: 예시 로그 출력
Spark는 쿼리를 생성하는 최상위 드라이버 프로세스, 예를 들어 위의 some_transformation
함수에서 로깅 출력을 캡처합니다. 그러나 사용자 정의 함수 (UDFs) 내부에서 작성된 로그는 캡처하지 않습니다. PySpark 쿼리 내에서 UDF를 사용하고 데이터를 로그에 기록해야 하는 경우, 캡처하려는 데이터를 반환하는 두 번째 UDF를 생성하고 호출하십시오.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
@transform_df( ... ) def some_transformation(some_input): logger.info("log output related to the overall query") # 전체 쿼리와 관련된 로그 출력 @F.udf("integer") def custom_function(integer_input): return integer_input + 5 # 입력된 정수에 5를 더하여 반환 @F.udf("string") def custom_log(integer_input): return "Original integer was %d before adding 5" % integer_input # 5를 더하기 전 원래 정수는 %d였습니다. df = ( some_input .withColumn("new_integer", custom_function(F.col("example_integer_col")) .withColumn("debugging", custom_log(F.col("example_integer_col")) )
쿼리에서 무슨 일이 일어나는지에 대한 정보를 로그에 기록하고 싶을 때가 많습니다. PySpark에는 DataFrame을 내부 조사하는 여러 가지 방법이 있으며, 이 정보를 위에서 설명한 로깅 메커니즘으로 전송할 수 있습니다.
이 예시에서는 Code Workbook의 print
구문을 사용하지만, Transforms & Authoring의 logger
로 대체할 수 있습니다.
df.columns
를 사용하면 DataFrame에 존재하는 열을 내부 조사할 수 있습니다. 이것은 문자열 목록을 생성합니다.
Copied!1 2 3 4 5 6 7 8 9
def employee_phone_numbers(employee_df, phone_number_df): # 직원 데이터프레임의 칼럼을 출력합니다. print("employee columns are {}".format(employee_df.columns)) # 전화번호 데이터프레임의 칼럼을 출력합니다. print("phone columns are {}".format(phone_df.columns)) # 직원 데이터프레임과 전화번호 데이터프레임을 'employee_id'를 기준으로 왼쪽 조인합니다. df = employee_df.join(phone_number_df, 'employee_id', 'left') # 조인한 데이터프레임의 칼럼을 출력합니다. print("joined columns are {}".format(df.columns))
employee 컬럼은 ['name', 'employee_id']
phone 컬럼은 ['phone_number', 'employee_id']
joined 컬럼은 ['name', 'employee_id', 'phone_number']
예를 들어 왼쪽 조인을 수행하고, 일대일 관계를 기대하며, 왼쪽 DataFrame의 행 수가 동일한지 확인하려고 합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
def employee_phone_numbers(employee_df, phone_number_df): # employee_df 데이터프레임의 원래 행 수를 세어 original_employee_rows 변수에 저장합니다. original_employee_rows = employee_df.count() # 원래 행 수를 출력합니다. print("Incoming employee rows {}".format(original_employee_rows)) # employee_df와 phone_number_df를 'employee_id'를 기준으로 왼쪽 조인합니다. # 결과를 df라는 새로운 데이터프레임에 저장합니다. df = employee_df.join(phone_number_df, 'employee_id', 'left') # 조인 후의 행 수를 세어 rows_after_join 변수에 저장합니다. rows_after_join = df.count() # 최종 행 수를 출력합니다. print("Final employee rows {}".format(rows_after_join)) # 조인 후의 행 수가 원래 행 수보다 크다면, # 즉, 어떤 직원이 여러 개의 전화번호를 가지고 있다면 아래 메시지를 출력합니다. if rows_after_join > original_employee_rows: print("Some employees have multiple phone numbers!") # 그렇지 않다면, 데이터가 올바르다는 메시지를 출력합니다. else: print("Data is correct")
들어오는 직원 행 100
최종 직원 행 105
일부 직원들은 여러 개의 전화번호를 가지고 있습니다!
주어진 DataFrame
을 생성하기 위해 스파크가 실행할 최적화된 물리 계획에 접근하려면 .explain()
을 호출하십시오.
Copied!1 2 3 4 5 6 7
def employee_phone_numbers(employee, phone): # 직원의 생일이 현재 달과 같은 달인 직원들을 필터링합니다. employee = employee.where(F.month(employee.birthday) == F.month(F.current_date())) # 직원 데이터프레임과 전화번호 데이터프레임을 employee_id를 기준으로 결합합니다. df = employee.join(phone, 'employee_id', 'left') # 최적화된 실행 계획을 출력합니다. df.explain()
== Physical Plan ==
*(2) Project [employee_id#9734, name#9732, birthday#9733, phone_number#9728]
+- *(2) BroadcastHashJoin [employee_id#9734], [employee_id#9729], LeftOuter, BuildRight
// 왼쪽 테이블 birthday의 월이 10월인 직원들 필터링
:- *(2) Filter (month(birthday#9733) = 10)
: +- *(2) FileScan parquet !ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36:ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36@00000000-1ebd-4a81-9f64-2d4c8a8472bc:master.ri.foundry.main.dataset.6ad20cd7-45b0-4312-b096-05f57487f650[name#9732,birthday#9733,employee_id#9734] Batched: true, Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,birthday:date,employee_id:int>
// 오른쪽 테이블에서 employee_id가 null이 아닌 데이터 필터링
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
+- *(1) Project [phone_number#9728, employee_id#9729]
+- *(1) Filter isnotnull(employee_id#9729)
+- *(1) FileScan csv !ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db:ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db@00000000-1ebc-f483-b75d-dbcc3292d9e4:master.ri.foundry.main.dataset.f5bf4c77-37c0-4e29-8a68-814c35442bbd[phone_number#9728,employee_id#9729] Batched: false, Format: CSV, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<phone_number:int,employee_id:int>
예를 들어, 어떤 직원이 가장 많은 전화번호를 가지고 있는지 확인하려고 합니다. 관심 있는 데이터셋(전화번호가 1개 이상인 직원)을 파생시키고 .take(3)
을 호출하여 상위 3개 행을 리스트로 검색합니다. 또한 .collect()
는 DataFrame의 모든 행을 리스트로 검색합니다.
Python 환경에서 너무 많은 데이터를 가져오면 메모리 부족이 쉽게 발생할 수 있습니다. 작은 양의 데이터만 collect()
하세요.
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# 함수 정의: multiple_numbers def multiple_numbers(phone_numbers): # 'employee_id'를 기준으로 그룹화하고, 각 그룹에서 'phone_number'의 개수를 세어 'numbers'라는 별칭으로 지정합니다. df = phone_numbers.groupBy('employee_id').agg( F.count('phone_number').alias('numbers') # 'numbers'가 1보다 큰 경우만 필터링합니다. 즉, 전화번호가 여러 개인 직원만 선택합니다. ).where('numbers' > 1) # 'numbers'를 기준으로 내림차순 정렬합니다. 즉, 전화번호를 가장 많이 가진 직원이 먼저 나옵니다. .sort(F.col('numbers').desc()) # 정렬된 DataFrame의 상위 3개 행을 출력합니다. print(df.take(3))
[Row(employee_id=70, numbers=4), Row(employee_id=90, numbers=2), Row(employee_id=25, numbers=2)]
이 코드는 파이썬에서 Row 객체의 리스트를 생성하는 코드입니다.
employee_id
는 직원의 아이디를 나타냅니다.numbers
는 어떤 수치를 나타내는 값입니다. (이 값이 무엇을 의미하는지는 코드의 컨텍스트에 따라 달라집니다.)따라서 위 코드는 아래와 같이 번역될 수 있습니다.
[행(직원_아이디=70, 숫자=4), 행(직원_아이디=90, 숫자=2), 행(직원_아이디=25, 숫자=2)]
주의: Row
와 employee_id
, numbers
는 파이썬의 키워드나 변수 이름이므로 번역하지 않습니다.