데이터 통합PythonPySpark Reference로깅

본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

로깅

Foundry의 PySpark에서 다양한 디버깅 정보를 결과물로 낼 수 있습니다.

Code Workbook

파이썬의 내장된 print는 코드 에디터 오른쪽에 있는 Code Workbook의 결과물 섹션으로 파이프되어, 일반적으로 오류가 나타나는 곳입니다.

Copied!
1 2 def new_dataset(some_input_dataset): print("example log output") # "example log output"을 출력합니다.

이 함수는 주어진 데이터셋(some_input_dataset)에 대한 작업을 수행하는데, 여기에는 실제 작업이 포함되지 않았습니다. 그저 example log output이라는 메시지를 출력하는 것만 있습니다. 이 함수는 더 많은 기능이 추가되어야 할 템플릿일 수 있습니다.

예시 로그 출력

Code Repositories

Code Repositories는 Python의 내장된 logging library를 사용합니다. 이는 널리 온라인에 문서화되어 있으며 로그 레벨 (ERROR, WARNING, INFO)을 제어하여 필터링을 더 쉽게 할 수 있게 해줍니다.

로그 출력은 결과물 데이터셋의 로그 파일과 빌드의 드라이버 로그(데이터셋 -> 상세 -> 파일 -> 로그 파일, 그리고 빌드 -> 빌드 -> 작업 상태 로그; 각각 "드라이버 로그"를 선택하세요)에 모두 나타납니다.

Copied!
1 2 3 4 5 6 7 8 9 10 import logging logger = logging.getLogger(__name__) @transform_df( ... ) def some_transformation(some_input): # 예시 로그 출력 logger.info("example log output")
INFO [2018-01-01T12:00:00] some_transformation: 예시 로그 출력

Python UDF 내부에서 로깅하기

Spark는 쿼리를 생성하는 최상위 드라이버 프로세스, 예를 들어 위의 some_transformation 함수에서 로깅 출력을 캡처합니다. 그러나 사용자 정의 함수 (UDFs) 내부에서 작성된 로그는 캡처하지 않습니다. PySpark 쿼리 내에서 UDF를 사용하고 데이터를 로그에 기록해야 하는 경우, 캡처하려는 데이터를 반환하는 두 번째 UDF를 생성하고 호출하십시오.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @transform_df( ... ) def some_transformation(some_input): logger.info("log output related to the overall query") # 전체 쿼리와 관련된 로그 출력 @F.udf("integer") def custom_function(integer_input): return integer_input + 5 # 입력된 정수에 5를 더하여 반환 @F.udf("string") def custom_log(integer_input): return "Original integer was %d before adding 5" % integer_input # 5를 더하기 전 원래 정수는 %d였습니다. df = ( some_input .withColumn("new_integer", custom_function(F.col("example_integer_col")) .withColumn("debugging", custom_log(F.col("example_integer_col")) )

예시

쿼리에서 무슨 일이 일어나는지에 대한 정보를 로그에 기록하고 싶을 때가 많습니다. PySpark에는 DataFrame을 내부 조사하는 여러 가지 방법이 있으며, 이 정보를 위에서 설명한 로깅 메커니즘으로 전송할 수 있습니다.

이 예시에서는 Code Workbook의 print 구문을 사용하지만, Transforms & Authoring의 logger로 대체할 수 있습니다.

DataFrame 열

df.columns를 사용하면 DataFrame에 존재하는 열을 내부 조사할 수 있습니다. 이것은 문자열 목록을 생성합니다.

Copied!
1 2 3 4 5 6 7 8 9 def employee_phone_numbers(employee_df, phone_number_df): # 직원 데이터프레임의 칼럼을 출력합니다. print("employee columns are {}".format(employee_df.columns)) # 전화번호 데이터프레임의 칼럼을 출력합니다. print("phone columns are {}".format(phone_df.columns)) # 직원 데이터프레임과 전화번호 데이터프레임을 'employee_id'를 기준으로 왼쪽 조인합니다. df = employee_df.join(phone_number_df, 'employee_id', 'left') # 조인한 데이터프레임의 칼럼을 출력합니다. print("joined columns are {}".format(df.columns))
employee 컬럼은 ['name', 'employee_id']
phone 컬럼은 ['phone_number', 'employee_id']
joined 컬럼은 ['name', 'employee_id', 'phone_number']

조인 동작 확인

예를 들어 왼쪽 조인을 수행하고, 일대일 관계를 기대하며, 왼쪽 DataFrame의 행 수가 동일한지 확인하려고 합니다.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def employee_phone_numbers(employee_df, phone_number_df): # employee_df 데이터프레임의 원래 행 수를 세어 original_employee_rows 변수에 저장합니다. original_employee_rows = employee_df.count() # 원래 행 수를 출력합니다. print("Incoming employee rows {}".format(original_employee_rows)) # employee_df와 phone_number_df를 'employee_id'를 기준으로 왼쪽 조인합니다. # 결과를 df라는 새로운 데이터프레임에 저장합니다. df = employee_df.join(phone_number_df, 'employee_id', 'left') # 조인 후의 행 수를 세어 rows_after_join 변수에 저장합니다. rows_after_join = df.count() # 최종 행 수를 출력합니다. print("Final employee rows {}".format(rows_after_join)) # 조인 후의 행 수가 원래 행 수보다 크다면, # 즉, 어떤 직원이 여러 개의 전화번호를 가지고 있다면 아래 메시지를 출력합니다. if rows_after_join > original_employee_rows: print("Some employees have multiple phone numbers!") # 그렇지 않다면, 데이터가 올바르다는 메시지를 출력합니다. else: print("Data is correct")
들어오는 직원 행 100
최종 직원 행 105
일부 직원들은 여러 개의 전화번호를 가지고 있습니다!

스파크 쿼리 계획

주어진 DataFrame을 생성하기 위해 스파크가 실행할 최적화된 물리 계획에 접근하려면 .explain()을 호출하십시오.

Copied!
1 2 3 4 5 6 7 def employee_phone_numbers(employee, phone): # 직원의 생일이 현재 달과 같은 달인 직원들을 필터링합니다. employee = employee.where(F.month(employee.birthday) == F.month(F.current_date())) # 직원 데이터프레임과 전화번호 데이터프레임을 employee_id를 기준으로 결합합니다. df = employee.join(phone, 'employee_id', 'left') # 최적화된 실행 계획을 출력합니다. df.explain()
== Physical Plan ==
*(2) Project [employee_id#9734, name#9732, birthday#9733, phone_number#9728]
+- *(2) BroadcastHashJoin [employee_id#9734], [employee_id#9729], LeftOuter, BuildRight
   // 왼쪽 테이블 birthday의 월이 10월인 직원들 필터링
   :- *(2) Filter (month(birthday#9733) = 10)
   :  +- *(2) FileScan parquet !ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36:ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36@00000000-1ebd-4a81-9f64-2d4c8a8472bc:master.ri.foundry.main.dataset.6ad20cd7-45b0-4312-b096-05f57487f650[name#9732,birthday#9733,employee_id#9734] Batched: true, Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,birthday:date,employee_id:int>
   // 오른쪽 테이블에서 employee_id가 null이 아닌 데이터 필터링
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
      +- *(1) Project [phone_number#9728, employee_id#9729]
         +- *(1) Filter isnotnull(employee_id#9729)
            +- *(1) FileScan csv !ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db:ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db@00000000-1ebc-f483-b75d-dbcc3292d9e4:master.ri.foundry.main.dataset.f5bf4c77-37c0-4e29-8a68-814c35442bbd[phone_number#9728,employee_id#9729] Batched: false, Format: CSV, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<phone_number:int,employee_id:int>

데이터 보기

예를 들어, 어떤 직원이 가장 많은 전화번호를 가지고 있는지 확인하려고 합니다. 관심 있는 데이터셋(전화번호가 1개 이상인 직원)을 파생시키고 .take(3)을 호출하여 상위 3개 행을 리스트로 검색합니다. 또한 .collect()는 DataFrame의 모든 행을 리스트로 검색합니다.

경고

Python 환경에서 너무 많은 데이터를 가져오면 메모리 부족이 쉽게 발생할 수 있습니다. 작은 양의 데이터만 collect()하세요.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 # 함수 정의: multiple_numbers def multiple_numbers(phone_numbers): # 'employee_id'를 기준으로 그룹화하고, 각 그룹에서 'phone_number'의 개수를 세어 'numbers'라는 별칭으로 지정합니다. df = phone_numbers.groupBy('employee_id').agg( F.count('phone_number').alias('numbers') # 'numbers'가 1보다 큰 경우만 필터링합니다. 즉, 전화번호가 여러 개인 직원만 선택합니다. ).where('numbers' > 1) # 'numbers'를 기준으로 내림차순 정렬합니다. 즉, 전화번호를 가장 많이 가진 직원이 먼저 나옵니다. .sort(F.col('numbers').desc()) # 정렬된 DataFrame의 상위 3개 행을 출력합니다. print(df.take(3))
[Row(employee_id=70, numbers=4), Row(employee_id=90, numbers=2), Row(employee_id=25, numbers=2)]

이 코드는 파이썬에서 Row 객체의 리스트를 생성하는 코드입니다.

  • employee_id는 직원의 아이디를 나타냅니다.
  • numbers는 어떤 수치를 나타내는 값입니다. (이 값이 무엇을 의미하는지는 코드의 컨텍스트에 따라 달라집니다.)

따라서 위 코드는 아래와 같이 번역될 수 있습니다.

[행(직원_아이디=70, 숫자=4), 행(직원_아이디=90, 숫자=2), 행(직원_아이디=25, 숫자=2)]

주의: Rowemployee_id, numbers는 파이썬의 키워드나 변수 이름이므로 번역하지 않습니다.