이 문서의 예제를 따르려면 다음을 추가하세요: from pyspark.sql import functions as F
.
컬럼은 PySpark 클래스인 pyspark.sql.Column
로 관리됩니다. 컬럼 인스턴스는 기존 컬럼을 직접 참조하거나 표현식을 도출할 때마다 생성됩니다. 다음과 같은 방법으로 컬럼을 참조할 수 있습니다:
F.col("column_name")
F.column("column_name")
컬럼을 참조하는 것은 select
를 수행하는 것과 동일하지 않습니다. 왜냐하면 "선택하는" 컬럼은 결과 데이터셋에 나타나길 원하는 컬럼을 하위 집합(및 재정렬)을 참조하기 때문입니다.
DataFrame.columns
모든 컬럼 이름을 파이썬 리스트로 반환합니다.
Copied!1
columns = df.columns # df의 컬럼들을 가져옵니다. 예) ['age', 'name']
DataFrame.dtypes
튜플의 리스트로 모든 열 이름과 그들의 데이터 유형을 반환합니다.
Copied!1
dtypes = df.dtypes # [('age', 'int'), ('name', 'string')] 데이터프레임의 데이터 유형을 가져옵니다.
DataFrame.select(*cols)
원본 DataFrame
의 일부 칼럼으로 구성된 새로운 DataFrame
을 반환합니다.
예를 들어, 우리는 id
, first_name
, last_name
, phone_number
, address
, is_active_member
라는 6개의 칼럼을 갖는 DataFrame을 가지고 있습니다.
id | first_name | last_name | phone_number | zip_code | is_active_member |
---|---|---|---|---|---|
1 | John | Doe | (123) 456-7890 | 10014 | true |
2 | Jane | Eyre | (213) 555-1234 | 90007 | true |
... | ... | ... | ... | ... | ... |
중요하게 생각하는 칼럼명만 포함된 DataFrame으로 변환하고자 할 수 있습니다(제공된 것들의 일부). 단 하나의 칼럼인 phone_number
만 포함된 표를 원한다고 가정해 봅시다:
Copied!1 2
# "phone_number"라는 이름의 열만 선택하여 df에 다시 저장합니다. df = df.select("phone_number")
phone_number |
---|
(123) 456-7890 |
(213) 555-1234 |
... |
또는 id
, first_name
, last_name
만 원하실 수도 있습니다(동일한 작업을 수행하는 최소 3 가지 방법이 있습니다):
열 이름을 직접 전달:
Copied!1
df = df.select("id", "first_name", "last_name")
또는 열 인스턴스 전달:
Copied!1
df = df.select(F.col("id"), F.col("first_name"), F.col("last_name"))
열 이름의 배열을 전달:
Copied!1 2
select_columns = ["id", "first_name", "last_name"] df = df.select(select_columns)
"풀어 헤친" 배열 전달:
Copied!1 2 3
select_columns = ["id", "first_name", "last_name"] df = df.select(*select_columns) # same as: df = df.select("id", "first_name", "last_name")
id | first_name | last_name |
---|---|---|
1 | John | Doe |
2 | Jane | Eyre |
... | ... | ... |
select_columns
앞의 *
는 배열을 풀어 헤치고(unpacks) #1
과 기능적으로 동일한 방식으로 작동하게 합니다(주석 참조). 이를 통해 다음과 같은 작업을 수행할 수 있습니다:
Copied!1 2 3
select_columns = ["id", "first_name", "last_name"] return df.select(*select_columns, "phone_number") # same as: df = df.select("id", "first_name", "last_name", "phone_number")
id | first_name | last_name | phone_number |
---|---|---|---|
1 | John | Doe | (123) 456-7890 |
2 | Jane | Eyre | (213) 555-1234 |
... | ... | ... | ... |
결과물 데이터셋에는 선택한 열만 포함되며, 선택한 열의 순서대로 포함됩니다(원본 열 순서를 유지하지 않음). 이름은 고유하며 대/소문자 구분이 필요하고, 선택하는 데이터셋의 열로 이미 존재해야 합니다.
그 규칙의 예외는 새로운 열을 파생시키고 즉시 선택할 수 있으나, 새로 파생된 열에 alias
(이름)를 지정해야 합니다:
string1 | string2 | string3 | string4 |
---|---|---|---|
first | second | third | Fourth |
one | two | three | four |
Copied!1 2 3 4 5
# "string1"과 "string2" 열을 ":"로 연결하여 새로운 열을 생성합니다. derived_column = F.concat_ws(":", F.col("string1"), F.col("string2")) # 생성된 새로운 열과 "string3" 열을 선택하여 반환합니다. return df.select("string3", derived_column.alias("derived"))
string3 | derived |
---|---|
third | first |
three | one |
DataFrame.withColumn(name, column)
Copied!1 2
# 새로운 데이터프레임(new_df)을 생성하고, 이전 데이터프레임(old_df)의 기존 열에 파생된 열(derived_column)을 추가합니다. new_df = old_df.withColumn("column_name", derived_column)
new_df
: old_df
의 모든 열을 포함하고 있지만, new_column_name
이 추가된 결과 데이터프레임입니다.old_df
: 새 열을 적용하려는 데이터프레임입니다.column_name
: 생성하려는(만약 old_df
에 없는 경우) 또는 업데이트하려는(만약 old_df
에 이미 있는 경우) 열의 이름입니다.derived_column
: 열을 생성하는 표현식으로, column_name
또는 열에 지정한 이름에 있는 모든 행에 적용됩니다.기존 DataFrame이 주어진 경우, withColumn
메소드를 사용하여 새 열을 생성하거나 기존 열을 새로운 값 또는 수정된 값으로 업데이트할 수 있습니다. 이는 다음 목표에 특히 유용합니다:
기존 값에 기반한 새 값 도출
Copied!1
df = df.withColumn("times_two", F.col("number") * 2) # times_two = number * 2
Copied!1
df = df.withColumn("concat", F.concat(F.col("string1"), F.col("string2")))
값을 다른 유형으로 캐스팅
Copied!1 2
# `start_timestamp`를 DateType으로 캐스팅하고 새 값을 `start_date`에 저장 df = df.withColumn("start_date", F.col("start_timestamp").cast("date"))
열 업데이트
Copied!1 2
# 열 `string`을 소문자 버전으로 업데이트 df = df.withColumn("string", F.lower(F.col("string")))
DataFrame.withColumnRenamed(name, rename)
.withColumnRenamed()
를 사용하여 열 이름을 변경합니다:
Copied!1 2
# "old_name" 열의 이름을 "new_name"으로 변경합니다. df = df.withColumnRenamed("old_name", "new_name")
PySpark가 변환 구문을 최적화하는 방식에 대한 이해를 돕기 위한 다른 방법인 컬럼 이름 변경 작업은 다음과 같습니다:
Copied!1 2 3
# "df" 라는 데이터프레임에 "new_name" 이라는 새로운 컬럼을 추가합니다. 이 컬럼의 값은 "old_name" 컬럼의 값과 동일합니다. # 그 다음 "old_name" 컬럼을 삭제합니다. df = df.withColumn("new_name", F.col("old_name")).drop("old_name")
그러나 새 열을 withColumn
없이 파생시키고 여전히 이름을 지정해야 하는 몇 가지 경우도 있습니다. 이때 alias
(또는 그 메서드 별칭인 name
)가 유용합니다. 다음은 몇 가지 사용 예입니다:
Copied!1 2 3 4 5
df = df.select(derived_column.alias("new_name")) # derived_column을 "new_name"이라는 새로운 이름으로 선택합니다. df = df.select(derived_column.name("new_name")) # .alias("new_name")와 동일합니다. derived_column을 "new_name"이라는 새로운 이름으로 선택합니다. df = df.groupBy("group") \ .agg(F.sum("number").alias("sum_of_numbers"), F.count("*").alias("count")) # "group" 열을 기준으로 그룹화하고, 각 그룹의 "number" 합계("sum_of_numbers")와 각 그룹의 행 수("count")를 계산합니다.
우리는 한 번에 여러 컬럼의 이름을 변경하는 것을 활용할 수도 있습니다:
Copied!1 2 3 4 5 6 7 8 9 10
# "renames"라는 이름의 딕셔너리를 만듭니다. 이 딕셔너리는 원래의 열 이름(column, data)과 변경할 열 이름(column_renamed, data_renamed)을 매핑합니다. renames = { "column": "column_renamed", "data": "data_renamed", } # "renames" 딕셔너리의 각 항목에 대해 반복문을 실행합니다. 각 항목에서 원래의 열 이름(colname)과 변경할 이름(rename)을 가져옵니다. for colname, rename in renames.items(): # 데이터프레임 df에서 'withColumnRenamed' 메소드를 사용하여 열 이름을 변경합니다. 원래 열 이름(colname)을 새 열 이름(rename)으로 변경합니다. df = df.withColumnRenamed(colname, rename)
DataFrame.drop(*cols)
원래 DataFrame
의 열의 일부를 포함하는 새로운 DataFrame
을 반환하고, 지정된 열들을 삭제합니다. (스키마가 주어진 열 이름을 포함하지 않으면 실패합니다.)
열을 삭제하는 두 가지 방법이 있습니다: 직접적인 방법과 간접적인 방법입니다. 간접적인 방법은 select
를 사용하는 것으로, 보존하려는 열의 부분 집합을 선택합니다. 직접적인 방법은 drop
을 사용하는 것으로, 삭제하려는 열의 부분 집합을 제공합니다. 두 방법 모두 비슷한 사용법을 가지고 있지만, 여기서는 순서가 중요하지 않습니다. 몇 가지 예시를 보여드리겠습니다:
id | first_name | last_name | phone_number | zip_code | is_active_member |
---|---|---|---|---|---|
1 | John | Doe | (123) 456-7890 | 10014 | true |
2 | Jane | Eyre | (213) 555-1234 | 90007 | true |
... | ... | ... | ... | ... | ... |
한 열만, phone_number
를 삭제하려는 경우를 가정해 봅시다:
Copied!1 2
# "phone_number"라는 이름의 컬럼을 데이터프레임에서 삭제합니다. df = df.drop("phone_number")
id | first_name | last_name | zip_code | is_active_member |
---|---|---|---|---|
1 | John | Doe | 10014 | true |
2 | Jane | Eyre | 90007 | true |
... | ... | ... | ... | ... |
또는 id
, first_name
, last_name
을 삭제하려는 경우가 있을 수 있습니다(동일한 작업을 수행하는 방법은 적어도 3가지입니다):
직접 열 이름을 전달:
Copied!1
df = df.drop("id", "first_name", "last_name")
또는
Copied!1
df = df.drop(F.col("id"), F.col("first_name"), F.col("last_name"))
배열을 전달:
Copied!1 2
drop_columns = ["id", "first_name", "last_name"] df = df.drop(drop_columns)
"언팩"된 배열을 전달:
Copied!1 2 3
drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns) # same as: df = df.drop("id", "first_name", "last_name")
phone_number | zip_code | is_active_member |
---|---|---|
(123) 456-7890 | 10014 | true |
(213) 555-1234 | 90007 | true |
... | ... | ... |
*
은 drop_columns
앞에서 배열을 언팩하여 #1
과 기능적으로 동일하게 작동하게 합니다(주석 참조). 이를 통해 다음을 수행할 수 있습니다:
Copied!1 2 3
drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns, "phone_number") # same as: df = df.drop("id", "first_name", "last_name", "phone_number")
zip_code | is_active_member |
---|---|
10014 | true |
90007 | true |
... | ... |
Column.cast(type)
다음은 모든 데이터 유형입니다: NullType
, StringType
, BinaryType
, BooleanType
, DateType
, TimestampType
, DecimalType
, DoubleType
, FloatType
, ByteType
, IntegerType
, LongType
, ShortType
, ArrayType
, MapType
, StructType
, StructField
일반적으로, 대부분의 데이터 유형은 열에 있는 cast
메소드를 사용하여 다른 것으로 변환할 수 있습니다:
Copied!1 2 3 4
from pyspark.sql.types import StringType df.select(df.age.cast(StringType()).alias("age")) # df.age가 IntegerType()라고 가정합시다 # 위 코드는 df.age를 StringType()으로 형변환하고 그 결과를 "age"라는 새로운 별명으로 선택합니다.
또는
Copied!1 2
df.select(df.age.cast("string").alias("age")) # StringType()을 사용하는 것과 실질적으로 같습니다.
나이 |
---|
"2" |
"5" |
캐스팅은 기본적으로 새로 파생된 열을 생성하며, 이를 통해 select
, withColumn
, filter
등을 직접 수행할 수 있습니다. PySpark에서도 "다운캐스팅" 및 "업캐스팅" 개념이 적용되므로 이전 데이터 유형에 저장된 더 세밀한 정보를 잃거나 쓰레기 정보를 얻을 수 있습니다.
F.when(조건, 값).otherwise(값2)
파라미터:
value
또는 value2
파라미터와 동일한 열 표현식으로 평가됩니다. Column.otherwise()
가 호출되지 않으면, 일치하지 않는 조건에 대해 None
(null) 열 표현식이 반환됩니다.
when
, otherwise
연산자는 새 열 값을 계산하는 if-else
문에 대한 유사성을 제공합니다. 기본 사용법은 다음과 같습니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# CASE WHEN (age >= 21) THEN true ELSE false END # 위의 SQL 문은 아래의 파이썬 코드와 동일합니다. # 나이가 21세 이상이면 True를 반환하고, 그렇지 않으면 False를 반환합니다. at_least_21 = F.when(F.col("age") >= 21, True).otherwise(False) # CASE WHEN (last_name != "") THEN last_name ELSE null # 위의 SQL 문은 아래의 파이썬 코드와 동일합니다. # last_name 칼럼의 값이 빈 문자열이 아니면 해당 값을 반환하고, 빈 문자열이면 Null을 반환합니다. last_name = F.when(F.col("last_name") != "", F.col("last_name")).otherwise(None) # at_least_21과 last_name을 alias로 사용하여 새로운 DataFrame을 생성합니다. df = df.select(at_least_21.alias("at_least_21"), last_name.alias("last_name"))
when
문을 연결하여 필요한 만큼 사용할 수 있습니다:
Copied!1 2 3 4
# "age"라는 열의 값이 35 이상이면 "A"를 반환하고, # 그렇지 않고 "age"가 21 이상이면 "B"를 반환합니다. # "age"가 21 미만인 경우 "C"를 반환합니다. switch = F.when(F.col("age") >= 35, "A").when(F.col("age") >= 21, "B").otherwise("C")
이 평가는 열에 할당되거나 필터에서 사용할 수 있습니다:
Copied!1 2
df = df.withColumn("switch", switch) # switch=A, B, 또는 C df = df.where(~F.isnull(last_name)) # last_name이 널값이 아닌 행만 필터링 (빈 문자열이 널값으로 평가된 후)