Spark DataFrame 크기 계산하기

Broadcast join이 적합한지 궁금할 때, 아니면 executor resource를 얼마나 써야하는지 궁금할 때, 실제 DataFrame의 크기를 계산해 보는것은 매우 중요하다.

물론 아쉽게도 정확하게 그 DataFrame이 어떤 크기를 갖는지는 disk에 저장을 해봐야 알겠지만, 다행히도 Spark에서 좋은 API를 제공한다. 아래의 코드는 Scala Spark를 가정했다.

import org.apache.spark.util.SizeEstimator

SizeEstimator.estimate(df)

// res4_1: Long = 27739224L

위와 같이 SizeEstimator를 사용하면 해당 df가 어느 정도 크기를 갖는지 알려준다. 위에서 df의 크기는 약 27.7MB이다.

위의 코드를 production에서 돌릴 수는 없기 때문에, prototyping을 할 때 써보기를 추천한다.

Broadcast Join

출처 : https://www.oreilly.com/api/v2/epubs/9781491943199/files/assets/hpsp_0405.png

Big data processing에서 가장 중요한 양대 operation이 있다면 바로 group by와 join일 것이다. 그 중 join은 특히 실무를 하면서 가장 많이 나를 곤란하게 하는 애증의 상대라고 할 수 있다. 그것은 join이 가장 성능적으로 가장 걸림돌이 많이 되고, 또한 optimize하는 것도 쉽지 않기 때문이다.

Join에 여러 테크닉이 있겠지만, 그 중에서도 가장 깔끔하고 성능도 압도적으로 훌륭한 Broadcast Join에 대해서 간단히 설명하고자 한다.

Broadcast Join이 가능한 경우는 join하려는 두 data중 하나의 크기가 executor의 메모리에 들어갈 만큼 작은 경우이다. 즉, 그렇게 작은 data를 아예 executor memory에 hash map형태로 저장한 다음에 다른 data쪽은 shuffle할 필요 없이 local scan으로 작업을 끝내는 형태이다.

빅데이터 처리에서 가장 시간이 많이 들게 만드는 요소가 data shuffling이기 때문에 셔플을 하지 않을 수 있다면 무조건 혜자다 🙂

따라서 데이터를 잘 분석해서 Broadcast Join을 사용할 수 있다면 반드시 사용해야 함을 잊지 말자!

Spark DataFrame을 JSON으로 변환시 null을 생략하는 이슈 해결법

Spark에서 DataFrame을 JSON으로 표현하고 싶을 때가 꽤나 자주 있습니다. 그럴 때 생각보다 은근 짜증나는 경우가, 값이 null이기 때문에 JSON에서 column자체가 생략되는 경우입니다. 예를들면,

spark.sql("""
select 
  cast(null as string) as null_field,
  1 as num
""").show(false)

/*
+----------+---+
|null_field|num|
+----------+---+
|null      |1  |
+----------+---+
*/

위와 같은 DataFrame이 있다고 가정해 봅시다. 이를 JSON으로 나타내면,

spark.sql("""
select 
  cast(null as string) as null_field,
  1 as num
""").toJSON.collect().foreach(println(_))

// {"num":1}

위와 같이 null_field는 생략된 채 결과가 나오게 돼서 나중에 다시 Spark에서 읽어들일 때 schema를 제대로 보존할 수 없게 됩니다.

참 불편했었는데, Spark 3.0부터 새로운 configuration이 등장했다고 해서 써봤습니다.

spark.conf.set("spark.sql.jsonGenerator.ignoreNullFields", false) // <- 이 config를 false로 설정해주면 null field도 포함시키게 됩니다.

spark.sql("""
select 
  cast(null as string) as null_field,
  1 as num
""").toJSON.collect().foreach(println(_))

// {"null_field":null,"num":1}

정말 아름답네요~!