PySpark SQL에서 가장 자주 사용되는 패턴과 함수에 대한 빠른 참조 가이드:
찾고 있는 것을 찾을 수 없다면, PySpark 공식 문서에서 커버되었을 가능성이 높습니다.
Copied!1 2 3 4 5 6 7
# Code Workbook 내부 print("예제 로그 출력") # Code Repositories 내부 import logging logger = logging.getLogger(__name__) logger.info("예제 로그 출력")
Copied!1 2
# F.my_function() 및 T.my_type()과 같이 쉽게 참조하실 수 있습니다. from pyspark.sql import functions as F, types as T
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# 동등 조건으로 필터링 df = df.filter(df.is_adult == 'Y') # 'is_adult' 컬럼이 'Y'인 행들만 선택 # >, <, >=, <= 조건으로 필터링 df = df.filter(df.age > 25) # 'age' 컬럼이 25보다 큰 행들만 선택 # 여러 조건은 각 조건마다 괄호가 필요합니다 df = df.filter((df.age > 25) & (df.is_adult == 'Y')) # 'age' 컬럼이 25보다 크고, 'is_adult' 컬럼이 'Y'인 행들만 선택 # 허용된 값들의 목록과 비교 df = df.filter(col('first_name').isin([3, 4, 7])) # 'first_name' 컬럼의 값이 3, 4, 7 중 하나인 행들만 선택 # 결과 정렬 df = df.orderBy(df.age.asc()) # 'age' 컬럼을 오름차순으로 정렬 df = df.orderBy(df.age.desc()) # 'age' 컬럼을 내림차순으로 정렬
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
# 다른 데이터셋에서 Left join df = df.join(person_lookup_table, 'person_id', 'left') # 다른 데이터셋에서 Left anti-join (왼쪽 데이터프레임에서 불일치하는 행 반환) df = df.join(person_lookup_table, 'person_id', 'leftanti'); # 왼쪽 & 오른쪽 데이터셋에서 다른 열들을 기준으로 매치 df = df.join(other_table, df.id == other_table.person_id, 'left') # 여러 열들을 기준으로 매치 df = df.join(other_table, ['first_name', 'last_name'], 'left') # 일회성 조회 코드 조인에 유용한 함수 def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value): return ( df1 .join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left') .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key))) .drop(df2_key) .drop(df2_value) ) df = lookup_and_replace(people, pay_codes, id, pay_code_id, pay_code_desc)
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
# 새로운 정적 컬럼 추가 df = df.withColumn('status', F.lit('PASS')) # 새로운 동적 컬럼 구성 df = df.withColumn('full_name', F.when( (df.fname.isNotNull() & df.lname.isNotNull()), F.concat(df.fname, df.lname) ).otherwise(F.lit('N/A'))) # 유지할 컬럼 선택, 필요한 경우 이름 변경 df = df.select( 'name', 'age', F.col('dob').alias('date_of_birth'), ) # 컬럼 제거 df = df.drop('mod_dt', 'mod_username') # 컬럼 이름 변경 df = df.withColumnRenamed('dob', 'date_of_birth') # 다른 데이터셋에도 있는 모든 컬럼 유지 df = df.select(*(F.col(c) for c in df2.columns)) # 일괄적으로 컬럼 이름 변경/정리 for col in df.columns: df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_'))
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# 특정 타입으로 컬럼을 변환 df = df.withColumn('price', df.price.cast(T.DoubleType())) # 모든 null 값을 특정 값으로 대체 df = df.fillna({ 'first_name': 'Tom', # 이름 칼럼의 null값을 'Tom'으로 대체 'age': 0, # 나이 칼럼의 null값을 0으로 대체 }) # 첫 번째로 null이 아닌 값을 가져옴 df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A'))) # last_name, surname 칼럼에서 처음으로 나타나는 null이 아닌 값을 last_name으로 설정, 둘 다 null인 경우 'N/A'로 설정 # 데이터셋에서 중복 행 제거 (distinct()와 같음) df = df.dropDuplicates() # 특정 칼럼을 고려하여 중복 행 제거 df = df.dropDuplicates(['name', 'height']) # name과 height 칼럼을 기준으로 중복 행 제거
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
# Contains - col.contains(string) : 문자열이 포함되어 있는지 확인 df = df.filter(df.name.contains('o')) # 'o'가 포함된 name 필드를 가진 데이터를 필터링 # Starts With - col.startswith(string) : 문자열로 시작하는지 확인 df = df.filter(df.name.startswith('Al')) # 'Al'로 시작하는 name 필드를 가진 데이터를 필터링 # Ends With - col.endswith(string) : 문자열로 끝나는지 확인 df = df.filter(df.name.endswith('ice')) # 'ice'로 끝나는 name 필드를 가진 데이터를 필터링 # Is Null - col.isNull() : 값이 null인지 확인 df = df.filter(df.is_adult.isNull()) # is_adult 필드가 null인 데이터를 필터링 # Is Not Null - col.isNotNull() : 값이 null이 아닌지 확인 df = df.filter(df.first_name.isNotNull()) # first_name 필드가 null이 아닌 데이터를 필터링 # Like - col.like(string_with_sql_wildcards) : 문자열 패턴과 일치하는지 확인 df = df.filter(df.name.like('Al%')) # 'Al'로 시작하는 name 필드를 가진 데이터를 필터링 # Regex Like - col.rlike(regex) : 정규표현식과 일치하는지 확인 df = df.filter(df.name.rlike('[A-Z]*ice$')) # 대문자로 시작하고 'ice'로 끝나는 name 필드를 가진 데이터를 필터링 # Is In List - col.isin(*values) : 값이 리스트에 포함되어 있는지 확인 df = df.filter(df.name.isin('Bob', 'Mike')) # name 필드의 값이 'Bob'이나 'Mike'인 데이터를 필터링
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# 부분 문자열 - col.substr(startPos, length) (1부터 시작하는 인덱스) # Substring - col.substr(startPos, length) (1-based indexing) df = df.withColumn('short_id', df.id.substr(1, 10)) # 공백 제거 - F.trim(col) # Trim - F.trim(col) df = df.withColumn('name', F.trim(df.name)) # 왼쪽 채우기 - F.lpad(col, len, pad) # 오른쪽 채우기 - F.rpad(col, len, pad) # Left Pad - F.lpad(col, len, pad) # Right Pad - F.rpad(col, len, pad) df = df.withColumn('id', F.lpad('id', 4, '0')) # 왼쪽 공백 제거 - F.ltrim(col) # 오른쪽 공백 제거 - F.rtrim(col) # Left Trim - F.ltrim(col) # Right Trim - F.rtrim(col) df = df.withColumn('id', F.ltrim('id')) # 연결 - F.concat(*cols) (어떤 열이든 null 이면 null) # Concatenate - F.concat(*cols) (null if any column null) df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname')) # 구분자/구분 기호와 함께 연결 - F.concat_ws(delimiter, *cols) (null 무시) # Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols) (ignores nulls) df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname')) # 정규식 치환 - F.regexp_replace(str, pattern, replacement) # Regex Replace - F.regexp_replace(str, pattern, replacement) df = df.withColumn('id', F.regexp_replace(id, '0F1(.*)', '1F1-$1')) # 정규식 추출 - F.regexp_extract(str, pattern, idx) # Regex Extract - F.regexp_extract(str, pattern, idx) df = df.withColumn('id', F.regexp_extract(id, '[0-9]*', 0))
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
# Round - F.round(col, scale=0) : 소수점 이하를 반올림 df = df.withColumn('price', F.round('price', 0)) # Floor - F.floor(col) : 소수점 이하를 내림 df = df.withColumn('price', F.floor('price')) # Ceiling - F.ceil(col) : 소수점 이하를 올림 df = df.withColumn('price', F.ceil('price')) # Absolute Value - F.abs(col) : 절대값 계산 df = df.withColumn('price', F.abs('price')) # X raised to power Y – F.pow(x, y) : x의 y 제곱 계산 df = df.withColumn('exponential_growth', F.pow('x', 'y')) # Select smallest value out of multiple columns – F.least(*cols) : 여러 열 중 가장 작은 값 선택 df = df.withColumn('least', F.least('subtotal', 'total')) # Select largest value out of multiple columns – F.greatest(*cols) : 여러 열 중 가장 큰 값 선택 df = df.withColumn('greatest', F.greatest('subtotal', 'total'))
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
# 알려진 형식의 문자열을 날짜로 변환 (시간 정보 제외) df = df.withColumn('date_of_birth', F.to_date('date_of_birth', 'yyyy-MM-dd')) # 알려진 형식의 문자열을 타임스탬프로 변환 (시간 정보 포함) df = df.withColumn('time_of_birth', F.to_timestamp('time_of_birth', 'yyyy-MM-dd HH:mm:ss')) # 날짜에서 연도 얻기: F.year(col) # 날짜에서 월 얻기: F.month(col) # 날짜에서 일 얻기: F.dayofmonth(col) # 날짜에서 시간 얻기: F.hour(col) # 날짜에서 분 얻기: F.minute(col) # 날짜에서 초 얻기: F.second(col) df = df.filter(F.year('date_of_birth') == F.lit('2017')) # 날짜에 일 수 더하기 & 빼기 df = df.withColumn('three_days_after', F.date_add('date_of_birth', 3)) df = df.withColumn('three_days_before', F.date_sub('date_of_birth', 3)) # 날짜에 월 수 더하기 & 빼기 df = df.withColumn('next_month', F.add_months('date_of_birth', 1)) df = df.withColumn('previous_month', F.add_months('date_of_birth', -1)) # 두 날짜 사이의 일 수 구하기 df = df.withColumn('days_between', F.datediff('end', 'start')) # 두 날짜 사이의 월 수 구하기 df = df.withColumn('months_between', F.months_between('end', 'start')) # date_of_birth가 2017-05-10과 2018-07-21 사이인 행만 유지 df = df.filter( (F.col('date_of_birth') >= F.lit('2017-05-10')) & (F.col('date_of_birth') <= F.lit('2018-07-21')) )
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# 컬럼 배열 - F.array(*cols) df = df.withColumn('full_name', F.array('fname', 'lname')) # full_name 컬럼에 fname과 lname을 배열로 추가 # 빈 배열 - F.array(*cols) df = df.withColumn('empty_array_column', F.array(F.lit(""))) # empty_array_column 컬럼에 빈 배열 추가 # 기존 컬럼에서 배열 또는 구조체 컬럼 생성 df = df.withColumn('guardians', F.array('guardian_1', 'guardian_2')) # guardians 컬럼에 guardian_1과 guardian_2를 배열로 추가 df = df.withColumn('properties', F.struct('hair_color', 'eye_color')) # properties 컬럼에 hair_color와 eye_color를 구조체로 추가 # 배열 또는 구조체 컬럼에서 인덱스 또는 키로 추출 (유효하지 않으면 null) df = df.withColumn('hair_color', F.element_at(F.col('properties'), F.col('hair_color'))) # properties 컬럼에서 hair_color 키의 값을 추출하여 hair_color 컬럼에 추가 # 배열 또는 구조체 컬럼을 여러 행으로 분해 df = df.select(F.col('child_name'), F.explode(F.col('guardians'))) # guardians 컬럼을 여러 행으로 분해하여 child_name과 함께 선택 df = df.select(F.col('child_name'), F.explode(F.col('properties'))) # properties 컬럼을 여러 행으로 분해하여 child_name과 함께 선택 # 구조체 컬럼을 여러 컬럼으로 분해 df = df.select(F.col('child_name'), F.col('properties.*')) # properties 구조체 컬럼을 여러 컬럼으로 분해하여 child_name과 함께 선택
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# 행 수 계산: F.count(*cols), F.countDistinct(*cols) # 그룹 내 행들의 합: F.sum(*cols) # 그룹 내 행들의 평균: F.mean(*cols) # 그룹 내 최대 행: F.max(*cols) # 그룹 내 최소 행: F.min(*cols) # 그룹 내 첫 번째 행: F.first(*cols, ignorenulls=False) df = df.groupBy(col('address')).agg( count('uuid').alias('num_residents'), # address 기준으로 그룹화된 데이터에서 각 그룹의 'uuid' 수를 'num_residents'로 계산 max('age').alias('oldest_age'), # 각 그룹에서 'age'의 최대값을 'oldest_age'로 계산 first('city', True).alias('city') # 각 그룹에서 null이 아닌 첫 번째 'city'를 'city'로 선택 ) # 그룹 내 모든 행들의 집합 수집: F.collect_set(col) # 그룹 내 모든 행들의 리스트 수집: F.collect_list(col) df = df.groupBy('address').agg(F.collect_set('name').alias('resident_names')) # 'address' 기준으로 그룹화된 데이터에서 각 그룹의 'name' 집합을 'resident_names'로 수집
Copied!1 2 3 4
# Repartition – df.repartition(num_output_partitions) # 데이터프레임(df)을 새로운 파티션 수로 재분할하는 메소드입니다. # 아래 코드에서는 데이터프레임(df)을 하나의 파티션으로 재분할하고 있습니다. df = df.repartition(1)
Copied!1 2 3 4 5 6 7 8 9 10 11
# 각 행의 나이(age) 컬럼을 두 배로 곱하기 # 각 행의 age 열을 2배로 곱하는 함수 times_two_udf = F.udf(lambda x: x * 2) df = df.withColumn('age', times_two_udf(df.age)) # 무작위로 이름을 선택하여 행의 이름으로 사용 import random # 무작위로 이름 중 하나를 선택하는 함수 random_name_udf = F.udf(lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna'])) df = df.withColumn('name', random_name_udf())