Broadcast Join

출처 : https://www.oreilly.com/api/v2/epubs/9781491943199/files/assets/hpsp_0405.png

Big data processing에서 가장 중요한 양대 operation이 있다면 바로 group by와 join일 것이다. 그 중 join은 특히 실무를 하면서 가장 많이 나를 곤란하게 하는 애증의 상대라고 할 수 있다. 그것은 join이 가장 성능적으로 가장 걸림돌이 많이 되고, 또한 optimize하는 것도 쉽지 않기 때문이다.

Join에 여러 테크닉이 있겠지만, 그 중에서도 가장 깔끔하고 성능도 압도적으로 훌륭한 Broadcast Join에 대해서 간단히 설명하고자 한다.

Broadcast Join이 가능한 경우는 join하려는 두 data중 하나의 크기가 executor의 메모리에 들어갈 만큼 작은 경우이다. 즉, 그렇게 작은 data를 아예 executor memory에 hash map형태로 저장한 다음에 다른 data쪽은 shuffle할 필요 없이 local scan으로 작업을 끝내는 형태이다.

빅데이터 처리에서 가장 시간이 많이 들게 만드는 요소가 data shuffling이기 때문에 셔플을 하지 않을 수 있다면 무조건 혜자다 🙂

따라서 데이터를 잘 분석해서 Broadcast Join을 사용할 수 있다면 반드시 사용해야 함을 잊지 말자!

Spark DataFrame을 JSON으로 변환시 null을 생략하는 이슈 해결법

Spark에서 DataFrame을 JSON으로 표현하고 싶을 때가 꽤나 자주 있습니다. 그럴 때 생각보다 은근 짜증나는 경우가, 값이 null이기 때문에 JSON에서 column자체가 생략되는 경우입니다. 예를들면,

spark.sql("""
select 
  cast(null as string) as null_field,
  1 as num
""").show(false)

/*
+----------+---+
|null_field|num|
+----------+---+
|null      |1  |
+----------+---+
*/

위와 같은 DataFrame이 있다고 가정해 봅시다. 이를 JSON으로 나타내면,

spark.sql("""
select 
  cast(null as string) as null_field,
  1 as num
""").toJSON.collect().foreach(println(_))

// {"num":1}

위와 같이 null_field는 생략된 채 결과가 나오게 돼서 나중에 다시 Spark에서 읽어들일 때 schema를 제대로 보존할 수 없게 됩니다.

참 불편했었는데, Spark 3.0부터 새로운 configuration이 등장했다고 해서 써봤습니다.

spark.conf.set("spark.sql.jsonGenerator.ignoreNullFields", false) // <- 이 config를 false로 설정해주면 null field도 포함시키게 됩니다.

spark.sql("""
select 
  cast(null as string) as null_field,
  1 as num
""").toJSON.collect().foreach(println(_))

// {"null_field":null,"num":1}

정말 아름답네요~!

CPU-bound vs IO-bound

제 개인적인 특징인지는 모르겠지만, 개발하다보면 자연스럽게 performance를 어떻게든 최대로 뽑아내고 싶게 됩니다.. ㅎㅎ 그러다보면 자연스럽게 알게되고 사용하게 되는 것이 multithreading 입니다.

Multithreading이란 한 process안에서 여러개의 thread를 만들어서 task를 진행하는 개념이죠. 멀티코어의 세상에서 꼭 필요한 parallel computing의 기본 방법이라고 할 수 있습니다.

하지만 multithreading이 만병통치약은 아닙니다. 저도 이번에 일하면서 알게 되었는데, 해당 task가 CPU-bound인지, IO-bound인지가 매우 중요합니다.

  • CPU-bound : Task가 대부분 cpu power를 사용해야 하는 경우를 말합니다. 예를들면 machine learning model의 경우 대부분 수학 계산을 해야하기 때문에 cpu를 반드시 사용해야합니다.
  • IO-bound : Task가 전체 걸리는 시간에 비해 cpu idle time이 많은 경우를 말합니다. 다시 말해 cpu가 일을 하고 싶어도 반드시 기다려야만 하는 상황을 말합니다. 예를들면 http request를 보내고 response를 기다리는 경우입니다.

어떻게 보면 CPU-bound는 우리가 책을 읽거나 수업을 듣는 등 active하게 상호작용해야하는 상황을 말한다고 할 수 있고, IO-bound는 햄버거 가게에서 햄버거를 주문하고 햄버거가 나올때까지는 할 일이 없는 뭐 그런 비유를 할 수 있겠습니다.

그렇기 때문에 IO-bound일 경우에는 multithreading을 사용하는 것이 비약적인 성능 향상을 가져오겠지만, CPU-bound일 경우에는 available한 core 수까지만 효과를 보고(이마저도 해당 task가 얼마나 cpu를 많이 사용하는지에 따라 다릅니다.) 그 이상은 성능 향상을 기대하기 어렵습니다.

그래서 병렬처리로 성능 향상할 때 task가 정확히 어떤 일을 하고, 어떤 부분에서 IO-bound가 발생하는지 파악하는 것이 중요합니다.