본문 바로가기
programming/Big data

Spark와 직렬화 - transient lazy val 패턴의 도움받기

by lightlee 2020. 5. 27.

INTRO

Spark에서 데이터 처리 로직을 main에 다 드러내니 너무 복잡해서, 복잡한 코드는 별도의 클래스에 캡슐화를 하고 싶었다. spark 데이터처리의 특성상 한 dataframe에 연속된 transformation을 적용하는 경우가 많은데, 이 로직을 다른 클래스에서 표현하고 싶은 것이기 때문에 spark Session을 받든 dataframe을 받든 매개변수를 받아야했다.

매개변수를 받아야 하는 경우 class를 만들어야한다. (object로 만들면 싱글톤이기 때문에 문제가 없지만)

매개변수 받고 싶으면 무조건 class인지는 더 검토를 해봐야 한다.

class를 만든다는 것은 후에 객체를 생성해야한다는 뜻이고, 이 객체가 직렬화 될 수 있어야함을 의미한다. driver 프로세스로부터 각 executor 프로세스에게로 전달이 되어야하기 때문이다. Spark에서 객체를 사용할 때 어떤 것을 고려해야하는지 정리해보았다.

Spark 와 직렬화

스파크 스트리밍에서 클래스를 사용하면서 드는 걱정은 두가지이다.

  1. 객체 생성이 너무 많이 된다면?
  2. 객체 안에 직렬화 될 수 없는 필드가 있다면?

이 두 걱정에 대한 방안으로 transient lazy val pattern을 사용할 수 있다. 요약하자면 직렬화하지말고 다시 계산해! 단, 한번만! 라는 뜻이다.

아래와 같은 클래스가 있다고 해보자. (kafka로부터 읽는 스트림을 만드는 클래스다.)

//이 클래스 객체는 driver 프로세스에서 직렬화된 후 executor 프로세스에서 역 직렬화 될 것이다. 
class Example(spark: SparkSession) extends Serializable {
    @transient lazy val dataStreamReader : DataStreamReader = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)

    def createDataStream : DataFrame = {
    dataStreamReader.load()
  }
}

이 Example클래스가 쓰였을 땐 다음과 같은 코드일 것이다.

val spark = new sql.SparkSession.Builder().appName("example").getOrCreate()
val example = new Example(spark)

val dataStream = example.createDataStream
// 이 때 처음 access 되었으므로 계산되어 dataStreamReader에 저장되어있을 것이다. 
// 다시 dataStreamReader를 접근해도 다시 계산되지 않음

이때, DataStreamReader 클래스는 직렬화가 되지 않는 클래스이므로 직렬화 대상에서 제외한다는 의미로 @transient 어노테이션을 붙인다.

lazy가 붙었기 때문에 객체 생성이 된 후 처음으로 사용할 때만 한번 계산이 되고 다시 계산되지 않는다.

참고:

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/

'programming > Big data' 카테고리의 다른 글

Kafka Spark Streaming  (0) 2020.05.20