Programming/BackEnd

Springboot Kafka Json Value 매핑하기

Railly Linker 2024. 10. 28. 10:45

- 이번 포스팅에서는 Springboot 에서 Kafka 를 사용시 값으로 JSON 형태의 데이터를 전달하도록 할 것입니다.

이전글에서는 StringSerializer, StringDeserializer 를 사용하여 String 형식으로 데이터 값을 보내고 가져왔는데,

이렇게 설정시, 구조를 지닌 데이터를 입출력하기 위하여,

 

"{\"test\" : \"testString\", \"testInt\" : 2}"

 

이렇게 String 을 보내면, Springboot 의 Consumer 에서 에러가 발생합니다.

정식으로 Data 형태를 정해서 통신하도록 하겠습니다.

 

- build.gradle

    // (jackson)
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.18.0")

데이터 매핑을 위하여 jackson 라이브러리를 설정해주세요.(코드 내에서 사용하지는 않고, 라이브러리 주입시 자동으로 처리됩니다.)

 

- KafkaConfig.kt

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer


// [Kafka Consumer 설정]
// kafka_consumers 폴더 안의 Listeners 클래스 파일과 연계하여 사용하세요.
@EnableKafka
@Configuration
class Kafka1MainConfig {
    companion object {
        // !!!application.yml 의 kafka-cluster 안에 작성된 이름 할당하기!!!
        const val KAFKA_CONFIG_NAME: String = "kafka1-main"

        const val CONSUMER_BEAN_NAME: String =
            "${KAFKA_CONFIG_NAME}_ConsumerFactory"

        const val PRODUCER_BEAN_NAME: String =
            "${KAFKA_CONFIG_NAME}_ProducerFactory"
    }

    @Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.uri}")
    private lateinit var uri: String

    @Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.consumer.username}")
    private lateinit var userName: String

    @Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.consumer.password}")
    private lateinit var password: String

    @Bean(CONSUMER_BEAN_NAME)
    fun kafkaConsumer(): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val config: MutableMap<String, Any> = HashMap()
        // Kafka 브로커에 연결하기 위한 주소를 설정합니다. 여러 개의 브로커가 있을 경우, 콤마로 구분하여 나열합니다.
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = uri
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        config[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
        config[JsonDeserializer.TRUSTED_PACKAGES] = "*"

        // SASL/SCRAM 인증 설정 추가
        config["security.protocol"] = "SASL_PLAINTEXT"
        config["sasl.mechanism"] = "PLAIN"
        config["sasl.jaas.config"] =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$userName\" password=\"$password\";"

        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(config)

        return factory
    }

    @Bean(PRODUCER_BEAN_NAME)
    fun kafkaProducer(): KafkaTemplate<String, Any> {
        val config: MutableMap<String, Any> = HashMap()
        // Kafka 브로커에 연결하기 위한 주소를 설정합니다. 여러 개의 브로커가 있을 경우, 콤마로 구분하여 나열합니다.
        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = uri
        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java

        // SASL/SCRAM 인증 설정 추가
        config["security.protocol"] = "SASL_PLAINTEXT"
        config["sasl.mechanism"] = "PLAIN"
        config["sasl.jaas.config"] =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$userName\" password=\"$password\";"

        return KafkaTemplate(DefaultKafkaProducerFactory(config))
    }
}

 

kafka 설정 파일은 위와 같이 작성하였습니다.

이전 설정과 다른점으론, consumer 설정에서 KEY_DESERIALIZER_CLASS_CONFIG 의 아래쪽 설정 부분이 변경이 있습니다.

제가 설정 후 에러가 났던 부분은 TRUSTED_PACKAGES 를 설정하지 않았을 때였는데,

위 설정은 클래스 매핑시 신뢰할 수 있는 패키지 경로를 적는 것으로, 만약 이를 설정하지 않는다면 클래스 매핑시 신뢰할 수 없다는 에러가 나올 것입니다.

 

Producer 설정에서는 단순히 VALUE 에 대한 직렬화 설정을 JsonSerializer 로 바꿔준 것 밖에 없습니다.

 

- 테스트

위와 같이 설정하고,

    @KafkaListener(
        topics = ["testTopic1"],
        groupId = "group_1",
        containerFactory = Kafka1MainConfig.CONSUMER_BEAN_NAME
    )
    fun testTopic1Group0Listener(data: ConsumerRecord<String, TestTopic1Group0ListenerInputVo>) {
        classLogger.info(">> testTopic1 group_1 : $data")
    }

    data class TestTopic1Group0ListenerInputVo(
        val test : String,
        val test1 : Int
    )

 

Consumer 리스너를 위와 같이 하고,

 

    fun sendMessageToTestTopic1(message: Kafka1MainConsumer.TestTopic1Group0ListenerInputVo) {
        // kafkaProducer1 에 토픽 메세지 발행
        kafka1MainProducerTemplate.send("testTopic1", message)
    }

 

Producer 로 위와 같이, Consumer 에서 받는 data class 를 value 로 넣어주면, 

 

[ls] [2024_10_28_T_10_44_50_025_KST] [INFO ] [>> testTopic1 group_1 : ConsumerRecord(topic = testTopic1, partition = 0, leaderEpoch = 10, offset = 26, CreateTime = 1730079889855, serialized key size = -1, serialized value size = 32, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestTopic1Group0ListenerInputVo(test=testMessage, test1=1))] [le]

 

이렇게 Springboot 코드 내 자동 매핑으로 값을 받아올 수 있습니다.