본문 바로가기
programming/Big data

Kafka Spark Streaming

by lightlee 2020. 5. 20.

Spark Streaming를 통해 실시간 분석 애플리케이션을 만들 수 있는데, 스트리밍 데이터 소스 중 하나로 Kafka를 사용할 수 있다. 이 포스트에서는 Kafka가 필요한 이유와 Spark Streaming과 연동하는 방법을 알아보고자 한다.

Kafka 등장 배경

데이터 파이프라인의 복잡성을 줄이고 원격 통신을 쉽게 할 수 있게 만들어준다. 그림으로 보자면,

아래와 같이 기존에 복잡했던 파이프라인을

 

이렇게 단순화 시켰다고 생각하면 된다. 데이터를 요청하는 Consumer와 메시지를 보내주는 Producer로 나누어진 구조에서 매개자 클러스터 역할을 하는 것으로 보인다.

Kafka 컴포넌트

  • Broker : 위에서 봤던 그림에서 메세지를 양 쪽에서 잘 주고 받을 수 있도록 하는 책임이 있다. 보통은 이 Broker들을 여러 개 두어 클러스터를 이루게 한다. 따라서 zookeeper로 서로의 정보를 조율하는 과정이 필요하다. (새로운 Broker 등장, 특정 Broker의 실패 등에 대한 보고)
  • Message : byte array, JSON Avro 등등 어떤 format이든지 가능하다.
  • Topic : Message들은 각각 어느 Topic에 속하게 되어있다. Topic 이라는 기준 안에서 카테고리화되고 publish 된다. 몇가지 partition으로 나누어진다.
  • Producer: Message를 하나 또는 여러개의 topic에 publish하는 프로세스들이다. Kafka에서 data stream이 나오는 근원이다.
  • Consumer: 하나 또는 여러개의 Topic을 subscribe함으로써 데이터를 topic으로부터 읽어오는 프로세스들이다.
  • Paritions: 각 Broker들은 몇가지 partition으로 나누어져 있고, 각 partition들은 topic에 대한 리더일 수도 있고 복제본일 수도 있다. 이는 확장성/중복성의 효과를 준다.

Kafka 아키텍쳐

Producer, Broker, Consumer 순서로 동작 과정을 알아보겠다. Producer, Broker, Consumer 모두 여러 개로 이루어진 그룹을 이루고 있다.

  • Producer: producer들은 message를 topic으로 보내는데, topic 별로 어느 parition에 message를 보낼 것인지 또한 결정한다.
  • Broker: producer로부터 message를 받으면 offset를 할당한다. consumer부터 fetch 요청이 올 때마다 반응한다.
  • Consumer: topic들을 subscribe하면서 message들이 생성된 순서대로 읽어내려간다. message들의 offset을 따라가면서 이미 읽혀진 message와 아닌 message를 구분 할 수 있다. 같은 키를 가진 message들은 기본적으로 같은 consumer에게 간다. 즉, 각 partition들은 하나의 consumer에 의해서만 consume 되어야한다.

아래는 topic이 여러 개의 partition으로 나뉘고, 할당 가능한 Broker가 각 topic의 partition을 맡는다는 것을 알기 좋은 그림이다.

Kafka 띄우기

  • zookeeper start
    zookeeper-server-start.sh kafka/config/zookeeper.properties
  • kafka server start
    kafka-server-start.sh kafka/config/server.properties
  • jps로 QuroumPeerMain과 Kafka가 있는지 확인

  • topic 생성 (이때, 프로그램에서 사용되는 topic 명과 동일할 것) 
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic mytest

 

여기부터는 애플리케이션의 성격에 따라 달라지나, 데모 앱이라는 것을 참고해서 보며 되겠다. 

  • producer start
    kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
    hi hello
    how are you ....

콘솔 명령어를 쓰는 것은 실제 서비스로 사용하기는 부적합하나, 테스트 용도에서는 충분하다.

  • consumer start
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning

또는 여기서 부터는 Spark Streaming 애플리케이션을 통해 producer에서 생성된 message들을 확인 할 수 있다. 아래에서 확인해보자

Spark Streaming WorkFlow

Spark Streaming 은 live input data stream을 받아야 하고 그 데이터들을 작은 배치단위로 나눈다. 그 작은 배치 단위의 데이터들이 spark engine을 거쳐 처리되는 것이다.

간단한 스트리밍 예제를 살펴보자.

object kafkar {
    def main(args: Array[String]){
        val conf = new SparkConf().setMaster("local").setAppName("kafkaex")
        val ssc = new StreamingContext(conf, Seconds(2))

        // topic name is 'mytest'
        val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group",Map("mytest"->5))

        kafkaStream.print()
        ssc.start
        ssc.awaitTermination()

    }

}

 

출처: www.youtube.com/watch?v=2z6scTH_C4c