PySpark는 Apache Spark 백엔드와 인터페이스를 제공하는 래퍼 언어로, 데이터를 빠르게 처리할 수 있습니다. Spark는 매우 큰 데이터셋을 분산 서버 네트워크에서 작동할 수 있으며, 올바르게 사용할 경우 성능과 신뢰성 측면에서 큰 이점을 제공합니다. 그러나 PySpark 구문은 Spark의 JVM 유산을 기반으로 하므로, 경험이 풍부한 Python 개발자조차도 익숙하지 않은 코드 패턴을 구현할 수 있습니다.
이 PySpark 코드 스타일에 대한 의견이 많은 가이드에서는 우리가 마주한 PySpark 저장소에 걸쳐 가장 자주 반복되는 주제를 기반으로 일반적인 상황과 관련된 모범 사례를 제시합니다.
일관된 코드 스타일을 유지하기 위해, 각 주요 저장소에는 동일한 구성으로 Pylint가 활성화되어야 합니다. 이 문서에 나열된 규칙과 일치하도록 PySpark 전용 체커를 Pylint에 추가로 포함할 수 있습니다. Python 저장소용 내장 Pylint 플러그인에 대한 자세한 내용은 스타일 검사 활성화에 관한 문서를 참고하십시오.
PySpark 특정 사항 외에도, PySpark 저장소에서 깔끔한 코드의 일반적인 관례가 중요합니다. Google PyGuide는 좋은 출발점입니다.
Copied!1 2 3 4 5 6 7 8
# 나쁜 예 df = df.select(F.lower(df1.colA), F.upper(df2.colB)) # df1과 df2는 이 코드 내에서 정의되지 않았습니다. 이렇게 코드를 작성하면 오류가 발생합니다. # 좋은 예 df = df.select(F.lower(F.col("colA")), F.upper(F.col("colB"))) # 이 코드는 "colA" 열의 값을 소문자로, "colB" 열의 값을 대문자로 변경하여 선택합니다. # F.col 함수는 DataFrame 내의 특정 열을 참조하는 데 사용됩니다.
선호하는 옵션이 더 복잡하고 길며 지저분해 보일 수 있습니다. 사실 이는 맞으며, 가능하다면 F.col()을 전혀 사용하지 않는 것이 최선입니다. 그러나 이를 사용하거나 대체 선택지를 사용하지 않을 수 없는 경우가 있습니다. 그러나 첫 번째 예시보다 두 번째 예시를 선호하는 매우 좋은 이유가 있습니다.
첫 번째 경우처럼 명시적인 열을 사용할 때, 데이터프레임 이름과 스키마는 데이터프레임 변수에 명시적으로 바인딩됩니다. 이는 df1
이 삭제되거나 이름이 변경되면 참조 df1.colA
이 중단됨을 의미합니다.
반면, F.col("colA")
는 항상 작업 중인 데이터프레임에서 "colA"라는 열을 참조하게 됩니다. 이 경우에는 df
라는 이름의 데이터프레임입니다. 다른 데이터프레임의 상태를 전혀 추적할 필요가 없으므로 코드가 더 지역화되고 "거리에서의 괴상한 상호작용"에 덜 취약해지며, 이는 디버깅하기 어렵습니다.
첫 번째 경우를 피해야 하는 다른 좋은 이유들:
df1["colA"]
는 F.col(“colA”)
만큼 쓰기 어렵습니다;F.col("prod_status") == 'Delivered'
와 같은 추상 표현식을 변수에 할당하면 여러 데이터프레임에서 재사용 가능하며, df.prod_status == 'Delivered'
는 항상 df에 바인딩됩니다.다행히도, 보통 F.col()
과 함께 복잡한 표현식이 필요하지는 않습니다. 유일한 예외는 F.lower
, F.upper
, ... 그리고 이들입니다.
일부 상황에서는 하나 이상의 데이터프레임에서 열에 접근할 수 있으며, 이름에 중복이 있을 수 있습니다. 일반적인 예는 df.join(df2, on=(df.key == df2.key), how='left'
와 같은 일치 표현식입니다. 이런 경우에는 직접 데이터프레임의 열을 참조하는 것이 괜찮습니다. 데이터프레임 별칭을 사용하여 조인을 분명하게 할 수도 있습니다 (이 가이드의 조인 섹션에서 더 많은 정보를 확인할 수 있습니다).
일반적으로, for 루프는 Spark에서 비효율적입니다. 하이레벨에서 이는 Spark가 게으르게 평가되며 한 번에 하나의 for 루프만 처리하기 때문입니다. 이로 인해 루프의 모든 부분이 한 번에 처리될 수 있다면 실행 시간이 느려질 수 있으며, 드라이버 메모리 부족 오류 (OOM)가 발생할 수 있습니다. 데이터셋의 모든 열 이름을 대문자에서 소문자로 변경하기 위해, 아래의 첫 번째 예시 (# 나쁜
으로 표시) 대신에 리스트 컴프리헨션을 사용하는 것을 권장합니다 (두 번째 예시 # 좋음
에서와 같이):
Copied!1 2 3 4 5 6
# 나쁜 예 for colm in df.columns: df = df.withColumnRenamed(colm, colm.lower()) # 위 코드는 각 열의 이름을 소문자로 바꾸는 코드입니다. # 그러나 이 코드는 비효율적입니다. 왜냐하면 각 열 이름을 바꿀 때마다 전체 데이터프레임을 복사하기 때문입니다. # 따라서, 이 코드는 메모리를 많이 사용하고, 실행 시간이 오래 걸릴 수 있습니다.
Copied!1 2 3 4
# 좋음 df = df.select( *[F.col(colm).alias(colm.lower()) for colm in df.columns] # df.columns의 모든 열 이름을 소문자로 변경하여 다시 할당합니다. )
# good
예시처럼 리스트 컴프리헨션을 사용하면 위에서 언급한 느린 성능 및 쿼리 계획 문제를 피하면서 원하는 결과를 얻을 수 있습니다.
논리 연산은 주로 .filter()
또는 F.when()
내부에 위치하며, 읽기 쉬워야 합니다. 함수 체이닝과 동일한 규칙을 적용하여 동일한 코드 블록 내의 논리 표현식을 최대 3개의 표현식으로 유지합니다. 그 이상으로 길어지면 코드를 단순화하거나 추출할 수 있는 신호입니다. 복잡한 논리 연산을 변수 또는 함수로 추출하면 코드를 읽고 이해하기 쉬워지며 버그도 줄어듭니다.
Copied!1 2 3 4 5 6 7 8 9
# 나쁜 예시 # F.when() 함수는 조건문을 설정할 때 사용됩니다. # 아래의 코드는 prod_status가 'Delivered'인 경우 또는 # 실제 배달 날짜와 현재 날짜의 차이가 0보다 작고, 현재 등록 상태가 어떤 문자열에도 일치하거나 # 실제 배달 날짜와 현재 날짜의 차이가 0보다 작고, 원래의 운영자 또는 현재의 운영자가 어떤 문자열에도 일치하는 경우에 # 'In Service'라는 값을 반환합니다. F.when( (df.prod_status == 'Delivered') | (((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & ((df.currentRegistration.rlike('.+')) | ((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')))))), 'In Service')
위의 코드를 여러 가지 방법으로 단순화할 수 있습니다. 우선, 몇 가지 명명된 변수로 로직 단계를 그룹화하는 것에 초점을 맞춰봅시다. Pyspark에서는 표현식이 괄호로 묶여야 합니다. 이것은 실제 괄호로 논리 연산을 그룹화하는 것과 혼합되어 가독성이 떨어질 수 있습니다. 예를 들어, 위의 코드는 (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
과 같은 중복되는 부분이 있지만 원본 작성자는 이를 찾기 어려워 눈치채지 못했습니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# 더 나은 버전 # 원래 운영자 또는 현재 운영자가 있는지 확인 has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')) # 배송 날짜가 지났는지 확인 delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0) # 현재 등록이 있는지 확인 has_registration = (df.currentRegistration.rlike('.+')) # 배송 완료 여부 확인 is_delivered = (df.prod_status == 'Delivered') # 배송이 완료되었거나, 배송 날짜가 지난 경우에 등록이나 운영자가 있는 경우 'In Service'로 표시 F.when(is_delivered | (delivery_date_passed & (has_registration | has_operator)), 'In Service')
위의 예제는 읽기 쉽고 불필요한 표현식도 줄였습니다. 우리는 작업의 수를 줄임으로써 더 개선할 수 있습니다.
Copied!1 2 3 4 5 6 7 8
# 좋음 has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')) # 연산자가 있는지 확인 delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0) # 배송 날짜가 지났는지 확인 has_registration = (df.currentRegistration.rlike('.+')) # 등록이 되어 있는지 확인 is_delivered = (df.prod_status == 'Delivered') # 배송이 완료되었는지 확인 is_active = (has_registration | has_operator) # 활성 상태인지 확인(등록 또는 연산자가 있는 경우) F.when(is_delivered | (delivery_date_passed & is_active), 'In Service') # 배송 완료 또는 활성 상태일 때 'In Service' 반환
F.when
표현식이 이제 간결하고 읽기 쉬우며, 이 코드를 검토하는 사람이 원하는 동작을 명확히 알 수 있습니다. 독자는 오류가 의심될 때만 개별 표현식을 방문하면 됩니다. 또한 코드에 유닛 테스트가 있고 함수로 추상화하려는 경우, 각 로직 덩어리를 쉽게 테스트할 수 있게 합니다.
최종 예시에 여전히 코드의 중복이 있는데, 이 중복을 어떻게 제거할지는 독자에게 남겨두겠습니다.
select
문을 사용하세요PySpark 변환의 시작 부분이나 반환하기 전에 select를 수행하는 것은 좋은 관행입니다. 이 select
문은 입력값과 결과물에 대한 예상 데이터프레임 스키마에 대한 독자와 코드 모두와의 계약을 명시합니다.
어떤 select라도 다음 변환 단계에서 데이터프레임을 소비하기 위한 준비 작업인 청소 작업으로 봐야 합니다.
항상 select 문을 가능한 한 간단하게 유지하려고 노력하세요. 일반적인 SQL 관용구로 인해, 선택된 열 당 spark.sql.function
에서 최대 하나의 함수를 사용하고, 선택적으로 .alias()
를 사용하여 의미 있는 이름을 지정할 수 있습니다. 이는 아껴서 사용해야 하며, 같은 select에서 세 개 이상의 사용이 있다면, clean_<dataframe name>()
과 같은 별도의 함수로 리팩토링하여 작업을 캡슐화해야 합니다.
하나 이상의 데이터프레임을 포함하는 표현식이나 .when()
과 같은 조건 연산이 select에서 사용되는 것을 절대로 허용하지 마세요.
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# 나쁜 예 aircraft = aircraft.select( 'aircraft_id', # 항공기 ID 'aircraft_msn', # 항공기 제조 일련 번호 F.col('aircraft_registration').alias('registration'), # 항공기 등록 번호. 'registration'이라는 별칭으로 사용 'aircraft_type', # 항공기 유형 F.avg('staleness').alias('avg_staleness'), # 'staleness'의 평균. 'avg_staleness'라는 별칭으로 사용 F.col('number_of_economy_seats').cast('long'), # 경제석 수. long 타입으로 변환 F.avg('flight_hours').alias('avg_flight_hours'), # 비행 시간의 평균. 'avg_flight_hours'라는 별칭으로 사용 'operator_code', # 운영자 코드 F.col('number_of_business_seats').cast('long'), # 비즈니스석 수. long 타입으로 변환 )
같은 유형의 연산을 함께 묶으십시오. 모든 개별 열은 upfront에 나열되어야 하며, spark.sql.function
에서 함수로의 호출은 별도의 줄에 있어야 합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
# 좋음 aircraft = aircraft.select( 'aircraft_id', 'aircraft_msn', 'aircraft_type', 'operator_code', # 'aircraft_registration' 컬럼을 'registration'이라는 이름으로 별칭(alias) 설정 F.col('aircraft_registration').alias('registration'), # 'number_of_economy_seats' 컬럼을 long 타입으로 캐스팅 F.col('number_of_economy_seats').cast('long'), # 'number_of_business_seats' 컬럼을 long 타입으로 캐스팅 F.col('number_of_business_seats').cast('long'), # 'staleness' 컬럼의 평균을 'avg_staleness'라는 이름으로 별칭(alias) 설정 F.avg('staleness').alias('avg_staleness'), # 'flight_hours' 컬럼의 평균을 'avg_flight_hours'라는 이름으로 별칭(alias) 설정 F.avg('flight_hours').alias('avg_flight_hours'), )
select()
문은 그 자체로 데이터프레임의 스키마를 재정의하므로, 이전의 열과 새로운 열을 포함하거나 제외하는 것, 그리고 기존 열을 재정의하는 것을 자연스럽게 지원합니다. 이러한 모든 작업을 단일 문에서 집중화함으로써 최종 스키마를 파악하기가 훨씬 쉬워지며, 이로 인해 코드의 가독성이 향상됩니다. 또한 코드를 약간 더 간결하게 만듭니다.
withColumnRenamed()
을 호출하는 대신 별칭을 사용하십시오:
Copied!1 2 3 4 5
# 나쁜 예시 df.select('key', 'comments').withColumnRenamed('comments', 'num_comments') # 'comments' 열을 'num_comments'로 이름을 바꾸려면 withColumnRenamed 함수를 사용합니다. # 좋은 예시 df.select('key', F.col('comments').alias('num_comments')) # 'comments' 열을 'num_comments'로 이름을 바꾸려면 alias 함수를 사용하는 것이 더 효율적입니다.
withColumn()
을 사용하여 유형을 재정의하는 대신, select에서 캐스트:
Copied!1 2 3 4 5 6 7
# 나쁜 예시 df.select('comments').withColumn('comments', F.col('comments').cast('double')) # 위 코드는 'comments' 열을 선택하고, 다시 같은 'comments' 열에 대하여 double 형식으로 변환하는 불필요한 작업을 수행합니다. # 좋은 예시 df.select(F.col('comments').cast('double')) # 이 코드는 'comments' 열을 선택하면서 바로 double 형식으로 변환합니다. 이는 불필요한 작업을 줄이고 코드를 간결하게 만들어 줍니다.
하지만 간단하게 유지하세요:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# 나쁜 예 df.select( # 'closed_at' 컬럼의 유닉스 타임스탬프와 현재 시간의 유닉스 타임스탬프를 코알리스 함수를 이용해 결합하고, # 'created_at' 컬럼의 유닉스 타임스탬프를 빼고, 결과를 86400으로 나누어 'days_open'이라는 별칭으로 선택합니다. ((F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp()) - F.unix_timestamp('created_at')) / 86400).alias('days_open') ) # 좋은 예 df.withColumn( 'days_open', # 'closed_at' 컬럼의 유닉스 타임스탬프와 현재 시간의 유닉스 타임스탬프를 코알리스 함수를 이용해 결합하고, # 'created_at' 컬럼의 유닉스 타임스탬프를 빼고, 결과를 86400으로 나눈 값을 'days_open'이라는 새로운 컬럼으로 추가합니다. (F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp()) - F.unix_timestamp('created_at')) / 86400 )
select 문에서 사용되지 않을 열을 포함하는 대신 명시적으로 열 집합을 선택하는 것이 좋습니다. 이것은 스키마 변형으로 인해 예기치 않은 열이 데이터프레임을 부풀리는 것을 보장하기 때문에 .drop()
을 사용하는 것보다 선호되는 대안입니다. 그렇다고 해서 모든 경우에 열을 삭제하는 것이 근본적으로 권장되지 않는 것은 아닙니다. 예를 들어 조인 후에는 일반적으로 중복된 열이 생겨나기 때문에 열을 삭제하는 것이 적절할 수 있습니다.
마지막으로, select 문을 사용하여 새 열을 추가하는 대신 .withColumn()
을 사용하는 것이 좋습니다.
스키마를 만족시키기 위해 빈 열을 추가해야 하는 경우 항상 F.lit(None)
을 사용하여 해당 열을 채우십시오. 빈 문자열이나 빈 값을 나타내는 다른 문자열(예: NA
)을 사용하지 마십시오.
의미론적으로 올바를 뿐만 아니라 실용적인 이유도 있습니다. 예를 들어, isNull
과 같은 유틸리티를 사용할 수 있는 기능을 유지하는 대신 빈 문자열, 널 및 'NA'
등을 확인해야 합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# 나쁜 예 df = df.withColumn('foo', F.lit('')) # 이 코드는 'foo'라는 새로운 컬럼을 추가하고, 그 값으로 빈 문자열('')을 할당합니다. # 하지만 이렇게 빈 문자열을 사용하면, 나중에 데이터를 분석할 때 값이 없는 것인지, 실제로 빈 문자열이 값인지 구별하기 어렵습니다. # 나쁜 예 df = df.withColumn('foo', F.lit('NA')) # 이 코드는 'foo'라는 새로운 컬럼을 추가하고, 그 값으로 'NA'라는 문자열을 할당합니다. # 이 경우도 마찬가지로, 'NA'가 실제로 의미하는 값인지, 아니면 값이 없음을 나타내는 표시인지 구별하기 어렵습니다. # 좋은 예 - `None`은 타입이 없으므로 적절한 타입으로 캐스팅이 필요합니다. 예상되는 사용 방식에 따라 적절한 타입을 선택하세요. df = df.withColumn('foo', F.lit(None).cast('string')) # 이 코드는 'foo'라는 새로운 컬럼을 추가하고, 그 값으로 None을 할당한 후 문자열 타입으로 캐스팅합니다. # 이렇게 하면 나중에 데이터를 분석할 때, 값이 없음을 명확하게 알 수 있습니다.
주석은 코드에 유용한 통찰력을 제공할 수 있지만, 코드의 가독성을 높이기 위해 리팩터링하는 것이 종종 더 가치있는 작업입니다. 코드는 자체로 읽을 수 있어야 합니다. 단계별로 로직을 설명하기 위해 주석을 사용하는 경우, 리팩터링해야 합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11
# 나쁜 예시 # 타임스탬프 열을 캐스팅합니다. cols = ["start_date", "delivery_date"] for c in cols: df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType())) # 타임스탬프 열들을 정의합니다. cols = ["start_date", "delivery_date"] # 각각의 열들에 대해 타임스탬프 형식으로 변환합니다. for c in cols: df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
위의 예에서, 해당 열이 타임스탬프로 캐스팅되고 있는 것을 볼 수 있습니다. 주석은 큰 가치를 추가하지 않습니다. 더욱이, 코드에 이미 존재하는 정보만 제공한다면 더 자세한 주석도 도움이 되지 않을 수 있습니다. 예를 들어:
Copied!1 2 3 4 5 6
# 나쁜 예 # 각 열을 순회하며, 밀리초 단위이므로 1000을 빼고 타임스탬프로 형변환 cols = ["start_date", "delivery_date"] for c in cols: df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
코드를 작성할 때 작성한 로직을 설명하는 주석을 남기기보다는, 코드 작성 시 결정 내린 이유를 설명하는 "왜"에 대한 맥락을 제공하는 주석을 남겨야 합니다. 이는 PySpark에 특히 중요한데, 이는 독자가 코드를 이해할 수는 있지만, PySpark 변환에 데이터를 입력하는 맥락에 대한 정보는 없기 때문입니다. 작은 로직 조각들은 정확한 동작을 이해하기 위해 데이터를 살펴보는데 몇 시간이 소요되었을 수 있으며, 이 경우 이유를 설명하는 주석이 특히 유용합니다.
Copied!1 2 3 4 5 6 7 8 9
# 좋음 # 이 데이터셋의 사용자는 날짜 대신 타임스탬프를 기대하고 있습니다. 그리고 원본 데이터 소스가 밀리초로 이를 저장하고 있기 때문에 # 시간을 1000으로 조정해야 합니다. 문서에는 실제로 날짜라고 되어 있지만요. cols = ["start_date", "delivery_date"] for c in cols: # for문을 사용해서 "start_date"와 "delivery_date" 열을 돌면서, # 각 열에 대해 unixtime을 사용해 밀리초를 초 단위로 변환하고, 그 값을 다시 TimestampType으로 캐스팅합니다. df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
모든 상황에서 UDFs를 피하는 것이 좋습니다. 왜냐하면 UDFs는 기본 PySpark보다 성능이 훨씬 떨어지기 때문입니다. 대부분의 경우에, UDF가 필요하다고 보이는 로직은 실제로 기본 PySpark 함수만 사용하여 리팩터링할 수 있습니다.
Spark 드라이버로 데이터를 수집하는 함수는 항상 피해야 합니다. 예를 들면:
DataFrame.collect()
DataFrame.first()
DataFrame.head(...)
DataFrame.take(...)
DataFrame.show(...)
이러한 함수들을 사용하면 Spark와 같은 분산 프레임워크의 이점이 사라져 성능이 저하되거나 메모리 부족 오류가 발생할 수 있습니다. 대신 이러한 함수들을 사용하는 것이 좋습니다:
조인에 주의하세요. 왼쪽 조인을 수행하고 오른쪽 키에 대해 여러 개의 일치 항목이 있는 경우, 그 행이 일치하는 항목 수만큼 중복되어 나타납니다. 이를 "조인 폭발"이라고 하며 데이터셋의 크기가 급격하게 증가할 수 있습니다. 곱셈이 예상되는 경우가 아니라면 조인하는 키가 고유한지 확인하기 위해 항상 가정을 이중 체크하세요.
잘못된 조인은 디버깅이 까다로운 많은 문제의 원인입니다. how
를 명시적으로 지정하는 것과 같은 몇 가지 도움이 되는 방법이 있습니다. 기본값(inner
)을 사용하더라도:
Copied!1 2 3 4 5 6 7 8 9 10 11
# 나쁜 예 flights = flights.join(aircraft, 'aircraft_id') # 'how' 인수가 생략되어 있어 join 함수의 동작 방식이 명확하지 않음 # 또한 나쁜 예 flights = flights.join(aircraft, 'aircraft_id', 'inner') # 'how' 인수가 생략되어 있어 join 함수의 동작 방식이 명확하지 않음 # 좋은 예 flights = flights.join(aircraft, 'aircraft_id', how='inner') # 'how' 인수를 명시적으로 추가하여 join 함수의 동작 방식을 명확하게 표시
또한 right
조인을 피하십시오. right
조인을 사용하려는 경우 데이터프레임의 순서를 바꾸고 대신 left
조인을 사용하십시오. 작업을 수행하는 데이터프레임이 조인을 중심으로 하는 것이므로, 이해하기 더 쉽습니다.
Copied!1 2 3 4 5
# 나쁜 예시 flights = aircraft.join(flights, 'aircraft_id', how='right') # aircraft 데이터프레임에서 'aircraft_id'를 키로 사용해 flights 데이터프레임과 오른쪽 조인합니다. # 좋은 예시 flights = flights.join(aircraft, 'aircraft_id', how='left') # flights 데이터프레임에서 'aircraft_id'를 키로 사용해 aircraft 데이터프레임과 왼쪽 조인합니다.
데이터프레임을 결합할 때 결과물에 열이 중복되는 표현식을 사용하지 마십시오:
Copied!1 2 3 4 5
# 나쁜 예 - 출력에서 aircraft_id 열이 중복됩니다 output = flights.join(aircraft, flights.aircraft_id == aircraft.aircraft_id, how='inner') # 좋은 예 output = flights.join(aircraft, 'aircraft_id', how='inner')
모든 열의 이름을 변경하여 충돌을 피하는 대신 전체 데이터프레임에 별칭을 지정하고 해당 별칭을 사용하여 최종적으로 원하는 열을 선택할 수 있습니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
# 나쁜 예시 columns = ["start_time", "end_time", "idle_time", "total_time"] for col in columns: flights = flights.withColumnRenamed(col, 'flights_' + col) # 각 열 이름을 변경합니다. parking = parking.withColumnRenamed(col, 'parking_' + col) flights = flights.join(parking, on="flight_code", how="left") # flights와 parking 테이블을 조인합니다. flights = flights.select( F.col("flights_start_time").alias("flight_start_time"), # 선택한 열의 이름을 변경합니다. F.col("flights_end_time").alias("flight_end_time"), F.col("parking_total_time").alias("client_parking_total_time") ) # 좋은 예시 flights = flights.alias("flights") # 테이블에 별칭을 부여합니다. parking = parking.alias("parking") flights = flights.join(parking, on="flight_code", how="left") # flights와 parking 테이블을 조인합니다. flights = flights.select( F.col("flights.start_time").alias("flight_start_time"), # 선택한 열의 이름을 변경합니다. F.col("flights.end_time").alias("flight_end_time"), F.col("parking.total_time").alias("client_parking_total_time") )
그러나 다음 사항을 유의해야 합니다:
조인에 대한 마지막 말은, .dropDuplicates()
또는 .distinct()
을 지탱하는 도구로 사용하지 마십시오. 예상치 못한 중복 행이 관찰되면, 그 중복 행이 나타나는 이유가 거의 항상 있습니다. .dropDuplicates()
를 추가하는 것은 이 문제를 감추기만 하고 런타임에 오버헤드를 추가합니다.
표현식 체이닝은 논란의 여지가 있는 주제이지만, 체이닝 사용에 대한 일부 제한을 권장합니다. 이 권장사항의 근거에 대한 논의는 이 섹션의 결론을 참조하십시오.
다른 유형의 다중 행 표현식으로 표현식을 체이닝하지 마십시오. 특히 그들이 다른 행동이나 맥락을 가지고 있는 경우입니다. 예를 들어, 열 생성 또는 조인을 선택 및 필터링과 혼합하는 것입니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# 나쁜 예 df = ( df .select("a", "b", "c", "key") # a, b, c, key 열 선택 .filter(df.a == "truthiness") # a 열이 "truthiness"인 행만 필터링 .withColumn("boverc", df.b / df.c) # b열을 c열로 나눈 값을 "boverc"라는 새 열로 추가 .join(df2, "key", how="inner") # key를 기준으로 df2 데이터프레임과 내부 조인 .join(df3, "key", how="left") # key를 기준으로 df3 데이터프레임과 왼쪽 조인 .drop('c') # c 열 삭제 ) # 좋은 예 (단계별로 분리) # 첫째, 필요한 데이터를 선택하고 줄입니다. # 둘째, 필요한 열을 생성합니다. # 셋째, 다른 데이터프레임과 결합합니다. df = ( df .select("a", "b", "c", "key") # a, b, c, key 열 선택 .filter(df.a == "truthiness") # a 열이 "truthiness"인 행만 필터링 ) df = df.withColumn("boverc", df.b / df.c) # b열을 c열로 나눈 값을 "boverc"라는 새 열로 추가 df = ( df .join(df2, "key", how="inner") # key를 기준으로 df2 데이터프레임과 내부 조인 .join(df3, "key", how="left") # key를 기준으로 df3 데이터프레임과 왼쪽 조인 .drop('c') # c 열 삭제 )
각 표현식 그룹을 별도의 논리 코드 블록으로 격리하면 가독성이 향상되고 관련 로직을 찾기가 쉬워집니다.
예를 들어, 아래 코드의 독자는 데이터프레임이 할당되는 곳인 df = df...
로 이동할 가능성이 높습니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# 나쁜 예시 df = ( df .select('foo', 'bar', 'foobar', 'abc') # 'foo', 'bar', 'foobar', 'abc' 열 선택 .filter(df.abc == 123) # 'abc' 열이 123인 행 필터링 .join(another_table, 'some_field') # 'some_field'를 기준으로 another_table와 조인 ) # 더 나은 예시 df = ( df .select('foo', 'bar', 'foobar', 'abc') # 'foo', 'bar', 'foobar', 'abc' 열 선택 .filter(F.col('abc') == 123) # 'abc' 열이 123인 행 필터링 ) df = df.join(another_table, 'some_field', how='inner') # 'some_field'를 기준으로 another_table와 내부 조인
여기서 F.col은 DataFrame의 열을 참조하는 기능입니다. 이를 통해 'abc'라는 열을 지정하였습니다. 또한, join 메서드에서 how='inner'는 내부 조인을 의미합니다. 표현식을 연결하는 것에는 합당한 이유가 있습니다. 이것들은 일반적으로 원자성 로직 단계를 나타내며, 허용됩니다. 코드를 가독성 있게 유지하기 위해 같은 블록에서 연결된 표현식의 최대 수에 규칙을 적용하세요. 우리는 3-5개의 문장 길이의 체인을 추천합니다.
더 긴 체인을 만들거나, 변수의 크기 때문에 문제를 겪고 있다면, 로직을 별도의 함수로 추출하는 것을 고려하세요:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# 나쁜 예 customers_with_shipping_address = ( customers_with_shipping_address .select("a", "b", "c", "key") # "a", "b", "c", "key" 선택 .filter(F.col("a") == "truthiness") # "a" 열이 "truthiness"와 일치하는 행을 필터 .withColumn("boverc", F.col("b") / F.col("c")) # "b" 열을 "c" 열로 나눈 값을 "boverc"라는 새로운 열에 저장 .join(df2, "key", how="inner") # "key"를 기준으로 df2와 내부 조인 ) # 또한 나쁜 예 customers_with_shipping_address = customers_with_shipping_address.select("a", "b", "c", "key") # "a", "b", "c", "key" 선택 customers_with_shipping_address = customers_with_shipping_address.filter(df.a == "truthiness") # "a" 열이 "truthiness"와 일치하는 행을 필터 customers_with_shipping_address = customers_with_shipping_address.withColumn("boverc", F.col("b") / F.col("c")) # "b" 열을 "c" 열로 나눈 값을 "boverc"라는 새로운 열에 저장 customers_with_shipping_address = customers_with_shipping_address.join(df2, "key", how="inner") # "key"를 기준으로 df2와 내부 조인 # 더 나은 예 def join_customers_with_shipping_address(customers, df_to_join): # 선택, 필터, 컬럼 추가, 조인을 한 함수 안에서 수행 customers = ( customers .select("a", "b", "c", "key") # "a", "b", "c", "key" 선택 .filter(df.a == "truthiness") # "a" 열이 "truthiness"와 일치하는 행을 필터 ) customers = customers.withColumn("boverc", F.col("b") / F.col("c")) # "b" 열을 "c" 열로 나눈 값을 "boverc"라는 새로운 열에 저장 customers = customers.join(df_to_join, "key", how="inner") # "key"를 기준으로 df_to_join과 내부 조인 return customers
사실, 세 개 이상의 구문이 연결된 체인은 이미 캡슐화되고 격리된 논리 블록이므로 별도의 잘 명명된 함수로 분리하는 것이 좋습니다.
이러한 체이닝에 대한 제한에는 여러 가지 이유가 있습니다:
표현식을 체인으로 연결할 수 있는 이유는 PySpark가 Spark로부터 개발되었고, 이는 JVM 언어에서 유래했기 때문입니다. 이는 체인 가능성이라는 디자인 패턴이 전송되었음을 의미합니다.
그러나, 파이썬은 여러 줄의 표현식을 우아하게 지원하지 않으며 유일한 대안은 명시적인 줄 바꿈을 제공하거나, 표현식을 괄호로 감싸는 것입니다. 체인이 루트 노드에서 발생하는 경우에만 명시적인 줄 바꿈을 제공해야 합니다. 예를 들면:
Copied!1 2 3 4 5 6 7 8 9
# `\`가 필요함 df = df.filter(F.col('event') == 'executing')\ .filter(F.col('has_tests') == True)\ .drop('has_tests') # 루트 노드에 체인이 없으므로 `\`가 필요하지 않음 df = df.withColumn('safety', F.when(F.col('has_tests') == True, 'is safe') .when(F.col('has_executed') == True, 'no tests but runs') .otherwise('not safe'))
Copied!1 2 3 4 5 6 7 8 9 10 11
# `\`가 필요합니다 df = df.filter(F.col('event') == 'executing')\ .filter(F.col('has_tests') == True)\ .drop('has_tests') # 'event' 열이 'executing'인 행만 필터링하고, 'has_tests' 열이 True인 행만 필터링한 다음 'has_tests' 열을 제거합니다. # 루트 노드에 체인이 없으므로 `\`가 필요하지 않습니다 df = df.withColumn('safety', F.when(F.col('has_tests') == True, 'is safe') .when(F.col('has_executed') == True, 'no tests but runs') .otherwise('not safe')) # 'has_tests' 열이 True일 때 'safety' 열을 'is safe'으로, 'has_executed' 열이 True일 때는 'no tests but runs'으로 설정하고, 그 외의 경우에는 'not safe'로 설정합니다.
따라서 일관성을 유지하기 위해, 전체 표현식을 하나의 괄호 블록으로 묶고 \
사용을 피하십시오:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# 나쁜 예 df = df.filter(F.col('event') == 'executing')\ .filter(F.col('has_tests') == True)\ .drop('has_tests') # 이 부분에서는 데이터 프레임 df에서 'event' 열이 'executing'인 행을 필터링하고, # 그 다음 'has_tests' 열이 True인 행을 필터링한 후, 'has_tests' 열을 삭제합니다. # 하지만 이 코드는 가독성이 좋지 않아 수정이 필요합니다. # 좋은 예 df = ( df .filter(F.col('event') == 'executing') # 'event' 열이 'executing'인 행을 필터링합니다. .filter(F.col('has_tests') == True) # 그 다음으로 'has_tests' 열이 True인 행을 필터링합니다. .drop('has_tests') # 마지막으로 'has_tests' 열을 삭제합니다. ) # 이 코드는 이전 코드와 동일한 작업을 수행하지만, 각 단계를 새로운 줄에 두어 가독성이 향상되었습니다.
.otherwise(value)
를 사용하지 마십시오. 키 목록을 값 목록에 매핑하고 알 수 없는 키가 여러 개 나타나는 경우, otherwise
를 사용하면 이들을 모두 하나의 값으로 마스킹합니다.types
와 functions
가 pySpark from pyspark.sql import types as T, functions as F
에서 나옵니다.