이 섹션에는 점진적으로 계산 가능한 변환의 다양한 예시가 포함되어 있습니다:
이 예시들은 점진적 계산을 보여주기 위해 두 가지 입력값을 사용합니다: students
와 students_updated
. students
입력값에는 3명의 학생이 포함되어 있으며 점진적이지 않습니다. 이는 히스토리가 없다는 것을 의미합니다:
>>> students.dataframe('previous').sort('id').show()
+---+----+---+---+
| id|hair|eye|sex|
+---+----+---+---+
+---+----+---+---+
>>>
>>> students.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # 입력에 대한 기본 읽기 모드는 'added'입니다
>>> students.dataframe('added') is students.dataframe()
True
students_updated
입력값은 students
와 동일하지만 세 명의 추가 학생이 포함된 업데이트가 추가되어 있습니다. 이 업데이트로 인해 입력값이 점진적으로 변합니다. 따라서, 비어 있지 않은 previous
DataFrame이 있습니다.
>>> students_updated.dataframe('previous').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
# 'previous' 데이터프레임을 'id'로 정렬하고 보여줍니다.
>>> students_updated.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 1|Brown|Green|Female|
| 2| Red| Blue| Male|
| 3|Blond|Hazel|Female|
| 4|Brown|Green|Female|
| 5|Brown| Blue| Male|
| 6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
# 'current' 데이터프레임을 'id'로 정렬하고 보여줍니다.
>>> students_updated.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair| eye| sex|
+---+-----+-----+------+
| 4|Brown|Green|Female|
| 5|Brown| Blue| Male|
| 6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
# 'added' 데이터프레임을 'id'로 정렬하고 보여줍니다.
>>> # Recall that the default read mode for inputs is 'added'
>>> students_updated.dataframe('added') is students_updated.dataframe()
True
>>>
# 입력값의 기본 읽기 모드가 'added'임을 기억하세요.
# 따라서 students_updated.dataframe('added')는 students_updated.dataframe()와 같습니다.
추가 전용 점진적 계산은 추가된 입력 행만을 기반으로 한 출력 행을 추가하는 것입니다. 이는 변환을 통해 출력을 계산하기 위해 다음과 같은 작업을 수행한다는 것을 의미합니다.
열 유형 변경, 문자열로 날짜 서식 지정, 필터링은 모두 추가 전용 계산의 예입니다. 이 예시에서 각 추가 입력 행은 변환되거나 삭제되어 출력 행을 생성합니다.
점진적 변환을 추가 전용으로 만들기 위한 차이점은 incremental()
데코레이션뿐입니다.
점진적으로 실행할 때, 기본 읽기 모드인 added
는 변환에서 새로운 학생들만 읽는 것을 의미하며, 기본 쓰기 모드인 modify
는 필터링된 새로운 학생들만 출력에 추가하는 것을 의미합니다.
비점진적으로 실행할 때, 기본 읽기 모드인 added
는 변환에서 전체 입력을 읽는 것을 의미하며, 기본 쓰기 모드인 replace
는 필터링된 학생 전체 집합으로 출력을 교체하는 것을 의미합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
from transforms.api import transform, incremental, Input, Output # 누적 변환 실행 @incremental() # 변환 함수 정의: 인풋 데이터는 students, 아웃풋 데이터는 processed @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) # incremental_filter 함수 정의: students와 processed를 인자로 받음 def incremental_filter(students, processed): # students 데이터프레임을 new_students_df에 저장 new_students_df = students.dataframe() # processed에 머리색이 갈색인 학생들만 필터링한 데이터프레임을 저장 processed.write_dataframe( new_students_df.filter(new_students_df.hair == 'Brown') )
때때로 변환은 점진적으로 업데이트를 계산하기 위해 이전 결과물을 참조해야 합니다. 이의 예로는 distinct()
메소드가 있습니다.
변환에서 중복 행을 제거하려면 (현재 결과물이 올바르다고 가정했을 때), 변환은 입력에서 새로운 행을 중복 제거해야 하며, 그 후에 그 행들이 이미 결과물에 존재하지 않는지 확인해야 합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
@incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_distinct(students, processed): # 새로운 학생 데이터프레임 생성 new_students_df = students.dataframe() # 새로운 학생 데이터프레임에서 중복 제거한 후 이전 데이터프레임과 차집합을 구함 processed.write_dataframe( new_students_df.distinct().subtract( processed.dataframe('previous', schema=new_students_df.schema) ) )
여기서는 결과물 데이터셋의 previous
읽기 모드를 사용합니다. 이것은 마지막 빌드 중 출력된 DataFrame
을 반환합니다. previous
결과물이 없을 수도 있기 때문에, 올바르게 구성된 빈 DataFrame을 생성하기 위해 dataframe('previous')
호출에 스키마를 제공해야 합니다.
이 코드를 실행하면, 결과물 데이터셋의 스키마가 데이터에서 자동으로 추론됩니다. 이에는 열의 이름, 유형, "nullability"(참조 StructField
), 그리고 열의 순서를 자동으로 감지하는 것이 포함됩니다. 빌드의 신뢰성을 보장하기 위해, Spark 추론에 의존하는 대신 데이터프레임의 예상 스키마를 하드코딩하는 것이 모범 사례입니다.
전체 출력을 항상 교체하는 일부 변환들이 있습니다. 그러나 종종 이러한 변환은 점진적 계산의 이점을 얻을 수 있습니다. 그러한 한 가지 예는 통계 집계입니다. 예를 들어, 열에서 각 고유 값이 발생하는 횟수를 세는 것입니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
from pyspark.sql import functions as F @incremental() @transform( students=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def incremental_group_by(students, processed): # 새로운 학생들에 대해서만 머리 색깔 수를 계산합니다. new_hair_counts_df = students.dataframe().groupBy('hair').count() # 이전의 수와 합쳐줍니다. out_schema = new_hair_counts_df.schema all_counts_df = new_hair_counts_df.union( processed.dataframe('previous', schema=out_schema) ) # 머리 색깔별로 그룹화하고, 두 개의 카운트 세트를 합산합니다. totals_df = all_counts_df.groupBy('hair').agg(F.sum('count').alias('count')) # 출력을 완전히 대체하려면 출력 모드를 'replace'로 설정합니다. # 출력 모드를 변경하기 전에 총계 데이터 프레임을 체크포인트합니다. totals_df.localCheckpoint(eager=True) processed.set_mode('replace') processed.write_dataframe(totals_df.select(out_schema.fieldNames()))
다시 말해, previous
출력이 없을 수 있으므로, 비어 있는 DataFrame을 올바르게 구성할 수 있도록 dataframe('previous')
호출에 스키마를 제공해야 합니다.
어떤 경우에는 스키마가 변경될 수 있는 입력을 사용하여 데이터셋을 점진적으로 업데이트해야 합니다. 예를 들어, 입력이 시간이 지남에 따라 열이 추가되거나 제거될 수 있는 소스 테이블에서 나올 때입니다. 이 경우 이전 출력을 이전 스키마로 읽은 다음 새 스키마와 수동으로 조정해야 합니다.
SNAPSHOT
모드에서 변환 작업이 어떻게 동작해야 하는지 정의해야 합니다. 이 경우, 이전 출력이 없습니다. 이 경우, processed.dataframe('current')
호출이 실패합니다.
아래 예제는 새로운 students
데이터셋의 스키마가 다르더라도 기존 students_processed
데이터셋에 새로운 students
를 추가하는 방법을 보여줍니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
from pyspark.sql import functions as F @incremental( snapshot_inputs=['students'] ) @transform( students=Input('/examples/students_raw'), # 이 데이터셋의 스키마는 변경될 수 있습니다. 이것은 스냅샷 트랜잭션이어야 한다는 것을 의미하므로, "snapshot_inputs" 파라미터에 포함시킵니다. processed=Output('/examples/students_processed') ) def incremental_group_by(students, processed, ctx): # 증분이 아닌 경우를 독립적으로 처리해야 함 if not ctx.is_incremental: # ... return # 이전 스키마와 함께 이전 처리된 데이터프레임을 읽음 # 아직 처리되지 않은 상태이므로, 'current'는 이전 트랜잭션을 제공함 # 주의: 여기서 'processed'의 쓰기 모드가 아직 'modify'인 것이 중요함 (아래 경고 참조) students_previous = processed.dataframe('current').localCheckpoint() # 이전 및 새 데이터프레임 병합, 누락된 열을 null로 설정 students_all = students_previous.unionByName(students.dataframe(), allowMissingColumns=True) # 새로운 결합된 데이터프레임을 작성하여 출력 processed.set_mode('replace') processed.write_dataframe(students_all)
.dataframe()
호출에 대한 읽기 모드 평가는 지연된(lazy, call-by-need) 것으로, 값이 필요할 때까지 평가가 지연됩니다. 데이터프레임의 읽기 결과는 write_dataframe
호출 중 데이터셋의 쓰기 모드에 따라 평가되며, 이전 쓰기 모드는 무시됩니다. .localCheckpoint(eager=True)
를 호출하면 해당 시점에서 데이터를 읽고 출력 쓰기 모드를 평가하고, 다시 계산하지 않습니다.
고객이 제출한 Orders
테이블과 완료된 Deliveries
테이블 두 개가 있다고 가정합시다. 그리고 우리는 배송에 걸리는 시간을 나타내는 DeliveryDuration
테이블을 계산하려고 합니다. Orders
와 Deliveries
테이블에는 새로운 행만 추가되고 행이 수정되지 않을지라도, 두 점진적 데이터셋 사이의 간단한 조인은 작동하지 않습니다. 예를 들어, Orders
테이블에는 아직 Deliveries
테이블에 없는 orderIds
가 포함될 수 있습니다.
Orders: Deliveries:
+---------+---------------+ +---------+--------------+ +---------+------------------+
| orderId | submittedDate | | orderId | deliveryDate | | orderId | deliveryDuration |
+---------+---------------+ +---------+--------------+ ----> +---------+------------------+
| 1001 | 2019-08-21 | join on | 1001 | 2019-08-23 | | 1001 | 2 | // 주문 1001의 배송 기간은 2일입니다.
+---------+---------------+ orderId +---------+--------------+ +---------+------------------+
| 1002 | 2019-08-22 |
+---------+---------------+
| 1003 | 2019-08-23 |
+---------+---------------+
orderId
가 Orders
와 Deliveries
테이블에서 모두 엄격하게 증가한다고 가정하면, 우리는 deliveryDuration
을 계산한 마지막 orderId
(maxComputedOrderId
)가 무엇인지 확인하고, orderId
가 maxComputedOrderId
보다 큰 Orders
와 Deliveries
테이블의 행만 얻을 수 있습니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
from transforms.api import transform, Input, Output, incremental from pyspark.sql import types as T from pyspark.sql import functions as F @incremental(snapshot_inputs=['orders', 'deliveries']) @transform( orders=Input('/example/Orders'), deliveries=Input('/example/Deliveries'), delivery_duration=Output('/example/New_Delivery_Date') ) def compute_delivery_duration(orders, deliveries, delivery_duration): def to_fields(datatype, names, nullable=True): return [T.StructField(n, datatype, nullable) for n in names] # 배달 기간을 전달하기 위해 스키마 생성 fields = to_fields(T.IntegerType(), ['orderId', 'deliveryDuration']) # 이전 버전의 스키마를 참조할 수 없으므로 스키마를 명시적으로 정의합니다 maxComputedOrderId = ( delivery_duration .dataframe('previous', schema=T.StructType(fields)) .groupby() .max('orderId') .collect()[0][0] ) # 첫 번째 반복에서는 delivery_duration 데이터 집합이 아직 존재하지 않으므로 maxComputedOrderId가 비어 있습니다 if maxComputedOrderId == None: maxComputedOrderId = 0 ordersNotProcessed = orders.dataframe().filter(F.col('orderId') > maxComputedOrderId) deliveriesNotProcessed = deliveries.dataframe().filter(F.col('orderId') > maxComputedOrderId) # 새로운 배달 기간을 계산합니다 newDurations = ( ordersNotProcessed .join(deliveriesNotProcessed, 'orderId', how='left') .withColumn('deliveryDuration', F.datediff(F.col('deliveryDate'), F.col('submittedDate'))) .drop(*['submittedDate', 'deliveryDate']) ) delivery_duration.write_dataframe(newDurations)
점진적 데이터셋에 새로운 열을 추가하려고 한다고 가정해 봅시다. 출력에 다른 열을 추가하더라도 is_incremental
플래그는 무효화되지 않으므로, 다음 실행은 새로운 행을 계산하고 새로운 열과 함께 데이터를 작성하며 이 열은 이전에 작성된 모든 행에서 null이 될 것입니다.
그러나 이전 행에 대해서도 열을 채우려고 할 수 있습니다. 변환의 semantic_version
을 증가시키면 한 번 비증가적으로 실행되고, "추가된" 읽기 모드를 사용하고 있다면 입력에는 모든 데이터가 포함되어 새 열을 다시 계산하고 추가할 수 있습니다.
변환에서 스냅샷 입력에서의 역사적 데이터셋 생성을 수행하였다면, 이전 데이터는 입력의 스냅샷 트랜잭션 스택에 있기 때문에 약간 복잡해집니다. 이 경우, Palantir 대표에게 문의하십시오.
이 예에서는 새 열을 추가하는 방법에 대해 논의하였지만, 위의 논리는 모든 종류의 로직 변경에 적용됩니다.
새로운 브랜치를 생성하고 빌드를 실행하면, 빌드는 점진적으로 실행됩니다. 단순히 원본 브랜치를 기반으로 브랜치를 생성하면 마지막 트랜잭션이 새 브랜치에서 첫 빌드를 위한 이전 트랜잭션으로 간주됩니다.
데이터를 점진적으로 처리하는 방법을 살펴보았습니다:
또한 다음을 살펴보았습니다:
점진적 오류를 이해하려면 트랜잭션 및 데이터셋 뷰 개념을 읽는 것이 더 쉽고 때로는 필요합니다.
작업이 점진적으로 실행될 때, 점진적 입력 데이터셋은 전체 데이터셋 뷰가 아닌 처리되지 않은 트랜잭션 범위만 포함합니다.
데이터셋의 다음과 같은 트랜잭션 이력을 상상해 보십시오:
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
|
마지막으로 처리된 트랜잭션
데이터셋이 마지막으로 빌드된 시점에서 가장 최근의 트랜잭션은 (3)이었습니다. 그 이후로 트랜잭션 (4)와 (5)가 커밋되었으므로 처리되지 않은 트랜잭션 범위는 (4) — (5)입니다.
데이터셋의 보기는 트랜잭션 범위 (1) — (5)입니다. 보기의 "맨 위"에 있는 트랜잭션(가장 최근의 트랜잭션)은 때때로 브랜치의 HEAD라고도 합니다(git에 비유해서). git과 마찬가지로 브랜치는 트랜잭션에 대한 포인터이므로 브랜치가 트랜잭션 (5)를 가리킨다고 말합니다. 여러 브랜치가 여러 트랜잭션을 가리킬 수 있으며, 브랜치는 트랜잭션 이력을 공유할 수 있습니다:
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5) [develop] # 스냅샷(1)에서 업데이트(2), (3), (4), (5) 순으로 진행되며 develop 브랜치입니다.
|
└─> UPDATE [feature-branch] # 업데이트가 진행되는 feature-branch 브랜치입니다.
Catalog:TransactionsNotInView
작업이 점진적으로 실행되기 위해, 작업 시작 시 일련의 검사가 실행됩니다. 이 검사 중 하나는 처리되지 않은 트랜잭션 범위가 엄격히 점진적인지 확인합니다(즉, 추가 전용 파일 변경사항, 점진적 계산에 대한 요구사항 참조). 이는 처리되지 않은 트랜잭션 범위의 파일과 처리된 트랜잭션 범위의 파일을 비교함으로써 이루어집니다.
그러나 브랜치의 HEAD가 이동된 경우, 점진적 작업이 이제 일관성 없는 상태에 있습니다: 더 이상 두 범위를 비교하는 것은 의미가 없으므로 오류 Catalog:TransactionNotInView
가 발생합니다.
아래에 이 오류가 발생하는 방법에 대한 다이어그램이 있습니다:
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5)
| (마지막 처리된 (브랜치의 이전
| 트랜잭션) HEAD, 지금은 고아)
|
└─> UPDATE (6) --> UPDATE (7, 브랜치의 현재 HEAD)
여기서 처리된 트랜잭션 범위는 (1) — (5)이고, 현재 브랜치의 HEAD는 (7)을 가리키며, 현재 뷰는 트랜잭션 (1), (2), (6), (7)로 구성되어 있습니다.
이것은 처리된 모든 트랜잭션이 브랜치의 HEAD의 상류에 없기 때문에 일관성이 없는 상태입니다. 실제로 (3)은 그렇지 않습니다. 다시 말해서, 이전 HEAD (3)는 더 이상 현재 뷰의 일부가 아니기 때문에 Catalog:TransactionNotInView
가 발생합니다.
Catalog:InconsistentBranchesOrder
던질 수 있는 다른 카탈로그 오류는 Catalog:InconsistentBranchesOrder
로, 마지막으로 처리된 트랜잭션 (prevTransaction
)이 브랜치 HEAD (endTransaction
)보다 클 때 발생합니다. 이것은 데이터셋의 HEAD가 이전 트랜잭션보다 이전의 트랜잭션으로 이동하게 될 경우 발생할 수 있습니다.
아래 다이어그램에서 이 오류가 발생하는 방법을 확인하세요:
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
| |
현재 HEAD 마지막으로 처리된 트랜잭션
브랜치의 HEAD는 두 가지 이유로 변경될 수 있습니다:
SNAPSHOT
이므로 일관성 없는 상태로 이어질 수 없습니다.이를 수정하려면 다음 중 하나를 수행해야 합니다:
updateBranch2
엔드포인트를 사용해야 합니다. 단, 이 엔드포인트는 경험이 많은 사용자만 사용할 것을 권장합니다.