데이터 통합PythonPySpark Reference개념: 컬럼

본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

개념: 컬럼

이 문서의 예제를 따르려면 다음을 추가하세요: from pyspark.sql import functions as F.

컬럼은 PySpark 클래스인 pyspark.sql.Column로 관리됩니다. 컬럼 인스턴스는 기존 컬럼을 직접 참조하거나 표현식을 도출할 때마다 생성됩니다. 다음과 같은 방법으로 컬럼을 참조할 수 있습니다:

  • F.col("column_name")
  • F.column("column_name")

컬럼을 참조하는 것은 select를 수행하는 것과 동일하지 않습니다. 왜냐하면 "선택하는" 컬럼은 결과 데이터셋에 나타나길 원하는 컬럼을 하위 집합(및 재정렬)을 참조하기 때문입니다.

목차

스키마 얻기

DataFrame.columns

모든 컬럼 이름을 파이썬 리스트로 반환합니다.

Copied!
1 columns = df.columns # df의 컬럼들을 가져옵니다. 예) ['age', 'name']

DataFrame.dtypes

튜플의 리스트로 모든 열 이름과 그들의 데이터 유형을 반환합니다.

Copied!
1 dtypes = df.dtypes # [('age', 'int'), ('name', 'string')] 데이터프레임의 데이터 유형을 가져옵니다.

선택하기

DataFrame.select(*cols)

원본 DataFrame의 일부 칼럼으로 구성된 새로운 DataFrame을 반환합니다.

예를 들어, 우리는 id, first_name, last_name, phone_number, address, is_active_member라는 6개의 칼럼을 갖는 DataFrame을 가지고 있습니다.

idfirst_namelast_namephone_numberzip_codeis_active_member
1JohnDoe(123) 456-789010014true
2JaneEyre(213) 555-123490007true
..................

중요하게 생각하는 칼럼명만 포함된 DataFrame으로 변환하고자 할 수 있습니다(제공된 것들의 일부). 단 하나의 칼럼인 phone_number만 포함된 표를 원한다고 가정해 봅시다:

Copied!
1 2 # "phone_number"라는 이름의 열만 선택하여 df에 다시 저장합니다. df = df.select("phone_number")
phone_number
(123) 456-7890
(213) 555-1234
...

또는 id, first_name, last_name 만 원하실 수도 있습니다(동일한 작업을 수행하는 최소 3 가지 방법이 있습니다):

  1. 열 이름을 직접 전달:

    Copied!
    1 df = df.select("id", "first_name", "last_name")

    또는 열 인스턴스 전달:

    Copied!
    1 df = df.select(F.col("id"), F.col("first_name"), F.col("last_name"))
  2. 열 이름의 배열을 전달:

    Copied!
    1 2 select_columns = ["id", "first_name", "last_name"] df = df.select(select_columns)
  3. "풀어 헤친" 배열 전달:

    Copied!
    1 2 3 select_columns = ["id", "first_name", "last_name"] df = df.select(*select_columns) # same as: df = df.select("id", "first_name", "last_name")
    idfirst_namelast_name
    1JohnDoe
    2JaneEyre
    .........

    select_columns 앞의 *는 배열을 풀어 헤치고(unpacks) #1과 기능적으로 동일한 방식으로 작동하게 합니다(주석 참조). 이를 통해 다음과 같은 작업을 수행할 수 있습니다:

    Copied!
    1 2 3 select_columns = ["id", "first_name", "last_name"] return df.select(*select_columns, "phone_number") # same as: df = df.select("id", "first_name", "last_name", "phone_number")
    idfirst_namelast_namephone_number
    1JohnDoe(123) 456-7890
    2JaneEyre(213) 555-1234
    ............

결과물 데이터셋에는 선택한 열만 포함되며, 선택한 열의 순서대로 포함됩니다(원본 열 순서를 유지하지 않음). 이름은 고유하며 대/소문자 구분이 필요하고, 선택하는 데이터셋의 열로 이미 존재해야 합니다.

그 규칙의 예외는 새로운 열을 파생시키고 즉시 선택할 수 있으나, 새로 파생된 열에 alias(이름)를 지정해야 합니다:

string1string2string3string4
firstsecondthirdFourth
onetwothreefour
Copied!
1 2 3 4 5 # "string1"과 "string2" 열을 ":"로 연결하여 새로운 열을 생성합니다. derived_column = F.concat_ws(":", F.col("string1"), F.col("string2")) # 생성된 새로운 열과 "string3" 열을 선택하여 반환합니다. return df.select("string3", derived_column.alias("derived"))
string3derived
thirdfirst
threeone

생성, 업데이트

DataFrame.withColumn(name, column)

Copied!
1 2 # 새로운 데이터프레임(new_df)을 생성하고, 이전 데이터프레임(old_df)의 기존 열에 파생된 열(derived_column)을 추가합니다. new_df = old_df.withColumn("column_name", derived_column)
  • new_df: old_df의 모든 열을 포함하고 있지만, new_column_name이 추가된 결과 데이터프레임입니다.
  • old_df: 새 열을 적용하려는 데이터프레임입니다.
  • column_name: 생성하려는(만약 old_df에 없는 경우) 또는 업데이트하려는(만약 old_df에 이미 있는 경우) 열의 이름입니다.
  • derived_column: 열을 생성하는 표현식으로, column_name 또는 열에 지정한 이름에 있는 모든 행에 적용됩니다.

기존 DataFrame이 주어진 경우, withColumn 메소드를 사용하여 새 열을 생성하거나 기존 열을 새로운 값 또는 수정된 값으로 업데이트할 수 있습니다. 이는 다음 목표에 특히 유용합니다:

  1. 기존 값에 기반한 새 값 도출

    Copied!
    1 df = df.withColumn("times_two", F.col("number") * 2) # times_two = number * 2
    Copied!
    1 df = df.withColumn("concat", F.concat(F.col("string1"), F.col("string2")))
  2. 값을 다른 유형으로 캐스팅

    Copied!
    1 2 # `start_timestamp`를 DateType으로 캐스팅하고 새 값을 `start_date`에 저장 df = df.withColumn("start_date", F.col("start_timestamp").cast("date"))
  3. 열 업데이트

    Copied!
    1 2 # 열 `string`을 소문자 버전으로 업데이트 df = df.withColumn("string", F.lower(F.col("string")))

이름 변경, 별칭

DataFrame.withColumnRenamed(name, rename)

.withColumnRenamed()를 사용하여 열 이름을 변경합니다:

Copied!
1 2 # "old_name" 열의 이름을 "new_name"으로 변경합니다. df = df.withColumnRenamed("old_name", "new_name")

PySpark가 변환 구문을 최적화하는 방식에 대한 이해를 돕기 위한 다른 방법인 컬럼 이름 변경 작업은 다음과 같습니다:

Copied!
1 2 3 # "df" 라는 데이터프레임에 "new_name" 이라는 새로운 컬럼을 추가합니다. 이 컬럼의 값은 "old_name" 컬럼의 값과 동일합니다. # 그 다음 "old_name" 컬럼을 삭제합니다. df = df.withColumn("new_name", F.col("old_name")).drop("old_name")

그러나 새 열을 withColumn 없이 파생시키고 여전히 이름을 지정해야 하는 몇 가지 경우도 있습니다. 이때 alias(또는 그 메서드 별칭인 name)가 유용합니다. 다음은 몇 가지 사용 예입니다:

Copied!
1 2 3 4 5 df = df.select(derived_column.alias("new_name")) # derived_column을 "new_name"이라는 새로운 이름으로 선택합니다. df = df.select(derived_column.name("new_name")) # .alias("new_name")와 동일합니다. derived_column을 "new_name"이라는 새로운 이름으로 선택합니다. df = df.groupBy("group") \ .agg(F.sum("number").alias("sum_of_numbers"), F.count("*").alias("count")) # "group" 열을 기준으로 그룹화하고, 각 그룹의 "number" 합계("sum_of_numbers")와 각 그룹의 행 수("count")를 계산합니다.

우리는 한 번에 여러 컬럼의 이름을 변경하는 것을 활용할 수도 있습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 # "renames"라는 이름의 딕셔너리를 만듭니다. 이 딕셔너리는 원래의 열 이름(column, data)과 변경할 열 이름(column_renamed, data_renamed)을 매핑합니다. renames = { "column": "column_renamed", "data": "data_renamed", } # "renames" 딕셔너리의 각 항목에 대해 반복문을 실행합니다. 각 항목에서 원래의 열 이름(colname)과 변경할 이름(rename)을 가져옵니다. for colname, rename in renames.items(): # 데이터프레임 df에서 'withColumnRenamed' 메소드를 사용하여 열 이름을 변경합니다. 원래 열 이름(colname)을 새 열 이름(rename)으로 변경합니다. df = df.withColumnRenamed(colname, rename)

삭제

DataFrame.drop(*cols)

원래 DataFrame의 열의 일부를 포함하는 새로운 DataFrame을 반환하고, 지정된 열들을 삭제합니다. (스키마가 주어진 열 이름을 포함하지 않으면 실패합니다.)

열을 삭제하는 두 가지 방법이 있습니다: 직접적인 방법과 간접적인 방법입니다. 간접적인 방법은 select를 사용하는 것으로, 보존하려는 열의 부분 집합을 선택합니다. 직접적인 방법은 drop을 사용하는 것으로, 삭제하려는 열의 부분 집합을 제공합니다. 두 방법 모두 비슷한 사용법을 가지고 있지만, 여기서는 순서가 중요하지 않습니다. 몇 가지 예시를 보여드리겠습니다:

idfirst_namelast_namephone_numberzip_codeis_active_member
1JohnDoe(123) 456-789010014true
2JaneEyre(213) 555-123490007true
..................

한 열만, phone_number를 삭제하려는 경우를 가정해 봅시다:

Copied!
1 2 # "phone_number"라는 이름의 컬럼을 데이터프레임에서 삭제합니다. df = df.drop("phone_number")
idfirst_namelast_namezip_codeis_active_member
1JohnDoe10014true
2JaneEyre90007true
...............

또는 id, first_name, last_name을 삭제하려는 경우가 있을 수 있습니다(동일한 작업을 수행하는 방법은 적어도 3가지입니다):

  1. 직접 열 이름을 전달:

    Copied!
    1 df = df.drop("id", "first_name", "last_name")

    또는

    Copied!
    1 df = df.drop(F.col("id"), F.col("first_name"), F.col("last_name"))
  2. 배열을 전달:

    Copied!
    1 2 drop_columns = ["id", "first_name", "last_name"] df = df.drop(drop_columns)
  3. "언팩"된 배열을 전달:

    Copied!
    1 2 3 drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns) # same as: df = df.drop("id", "first_name", "last_name")
    phone_numberzip_codeis_active_member
    (123) 456-789010014true
    (213) 555-123490007true
    .........

    *drop_columns 앞에서 배열을 언팩하여 #1과 기능적으로 동일하게 작동하게 합니다(주석 참조). 이를 통해 다음을 수행할 수 있습니다:

    Copied!
    1 2 3 drop_columns = ["id", "first_name", "last_name"] df = df.drop(*drop_columns, "phone_number") # same as: df = df.drop("id", "first_name", "last_name", "phone_number")
    zip_codeis_active_member
    10014true
    90007true
    ......

캐스트

Column.cast(type)

다음은 모든 데이터 유형입니다: NullType, StringType, BinaryType, BooleanType, DateType, TimestampType, DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType, MapType, StructType, StructField

일반적으로, 대부분의 데이터 유형은 열에 있는 cast 메소드를 사용하여 다른 것으로 변환할 수 있습니다:

Copied!
1 2 3 4 from pyspark.sql.types import StringType df.select(df.age.cast(StringType()).alias("age")) # df.age가 IntegerType()라고 가정합시다 # 위 코드는 df.age를 StringType()으로 형변환하고 그 결과를 "age"라는 새로운 별명으로 선택합니다.

또는

Copied!
1 2 df.select(df.age.cast("string").alias("age")) # StringType()을 사용하는 것과 실질적으로 같습니다.
나이
"2"
"5"

캐스팅은 기본적으로 새로 파생된 열을 생성하며, 이를 통해 select, withColumn, filter 등을 직접 수행할 수 있습니다. PySpark에서도 "다운캐스팅" 및 "업캐스팅" 개념이 적용되므로 이전 데이터 유형에 저장된 더 세밀한 정보를 잃거나 쓰레기 정보를 얻을 수 있습니다.

When, otherwise

F.when(조건, 값).otherwise(값2)

파라미터:

  • condition - 불리언 열 표현식
  • value - 리터럴 값 또는 열 표현식

value 또는 value2 파라미터와 동일한 열 표현식으로 평가됩니다. Column.otherwise()가 호출되지 않으면, 일치하지 않는 조건에 대해 None (null) 열 표현식이 반환됩니다.

when, otherwise 연산자는 새 열 값을 계산하는 if-else 문에 대한 유사성을 제공합니다. 기본 사용법은 다음과 같습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 # CASE WHEN (age >= 21) THEN true ELSE false END # 위의 SQL 문은 아래의 파이썬 코드와 동일합니다. # 나이가 21세 이상이면 True를 반환하고, 그렇지 않으면 False를 반환합니다. at_least_21 = F.when(F.col("age") >= 21, True).otherwise(False) # CASE WHEN (last_name != "") THEN last_name ELSE null # 위의 SQL 문은 아래의 파이썬 코드와 동일합니다. # last_name 칼럼의 값이 빈 문자열이 아니면 해당 값을 반환하고, 빈 문자열이면 Null을 반환합니다. last_name = F.when(F.col("last_name") != "", F.col("last_name")).otherwise(None) # at_least_21과 last_name을 alias로 사용하여 새로운 DataFrame을 생성합니다. df = df.select(at_least_21.alias("at_least_21"), last_name.alias("last_name"))

when 문을 연결하여 필요한 만큼 사용할 수 있습니다:

Copied!
1 2 3 4 # "age"라는 열의 값이 35 이상이면 "A"를 반환하고, # 그렇지 않고 "age"가 21 이상이면 "B"를 반환합니다. # "age"가 21 미만인 경우 "C"를 반환합니다. switch = F.when(F.col("age") >= 35, "A").when(F.col("age") >= 21, "B").otherwise("C")

이 평가는 열에 할당되거나 필터에서 사용할 수 있습니다:

Copied!
1 2 df = df.withColumn("switch", switch) # switch=A, B, 또는 C df = df.where(~F.isnull(last_name)) # last_name이 널값이 아닌 행만 필터링 (빈 문자열이 널값으로 평가된 후)