Springboot Kafka Json Value 매핑하기

2024. 10. 28. 10:45·Programming/BackEnd

- 이번 포스팅에서는 Springboot 에서 Kafka 를 사용시 값으로 JSON 형태의 데이터를 전달하도록 할 것입니다.

이전글에서는 StringSerializer, StringDeserializer 를 사용하여 String 형식으로 데이터 값을 보내고 가져왔는데,

이렇게 설정시, 구조를 지닌 데이터를 입출력하기 위하여,

 

"{\"test\" : \"testString\", \"testInt\" : 2}"

 

이렇게 String 을 보내면, Springboot 의 Consumer 에서 에러가 발생합니다.

정식으로 Data 형태를 정해서 통신하도록 하겠습니다.

 

- build.gradle

    // (jackson)
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.18.0")

데이터 매핑을 위하여 jackson 라이브러리를 설정해주세요.(코드 내에서 사용하지는 않고, 라이브러리 주입시 자동으로 처리됩니다.)

 

- KafkaConfig.kt

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer


// [Kafka Consumer 설정]
// kafka_consumers 폴더 안의 Listeners 클래스 파일과 연계하여 사용하세요.
@EnableKafka
@Configuration
class Kafka1MainConfig {
    companion object {
        // !!!application.yml 의 kafka-cluster 안에 작성된 이름 할당하기!!!
        const val KAFKA_CONFIG_NAME: String = "kafka1-main"

        const val CONSUMER_BEAN_NAME: String =
            "${KAFKA_CONFIG_NAME}_ConsumerFactory"

        const val PRODUCER_BEAN_NAME: String =
            "${KAFKA_CONFIG_NAME}_ProducerFactory"
    }

    @Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.uri}")
    private lateinit var uri: String

    @Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.consumer.username}")
    private lateinit var userName: String

    @Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.consumer.password}")
    private lateinit var password: String

    @Bean(CONSUMER_BEAN_NAME)
    fun kafkaConsumer(): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val config: MutableMap<String, Any> = HashMap()
        // Kafka 브로커에 연결하기 위한 주소를 설정합니다. 여러 개의 브로커가 있을 경우, 콤마로 구분하여 나열합니다.
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = uri
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        config[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
        config[JsonDeserializer.TRUSTED_PACKAGES] = "*"

        // SASL/SCRAM 인증 설정 추가
        config["security.protocol"] = "SASL_PLAINTEXT"
        config["sasl.mechanism"] = "PLAIN"
        config["sasl.jaas.config"] =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$userName\" password=\"$password\";"

        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(config)

        return factory
    }

    @Bean(PRODUCER_BEAN_NAME)
    fun kafkaProducer(): KafkaTemplate<String, Any> {
        val config: MutableMap<String, Any> = HashMap()
        // Kafka 브로커에 연결하기 위한 주소를 설정합니다. 여러 개의 브로커가 있을 경우, 콤마로 구분하여 나열합니다.
        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = uri
        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java

        // SASL/SCRAM 인증 설정 추가
        config["security.protocol"] = "SASL_PLAINTEXT"
        config["sasl.mechanism"] = "PLAIN"
        config["sasl.jaas.config"] =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$userName\" password=\"$password\";"

        return KafkaTemplate(DefaultKafkaProducerFactory(config))
    }
}

 

kafka 설정 파일은 위와 같이 작성하였습니다.

이전 설정과 다른점으론, consumer 설정에서 KEY_DESERIALIZER_CLASS_CONFIG 의 아래쪽 설정 부분이 변경이 있습니다.

제가 설정 후 에러가 났던 부분은 TRUSTED_PACKAGES 를 설정하지 않았을 때였는데,

위 설정은 클래스 매핑시 신뢰할 수 있는 패키지 경로를 적는 것으로, 만약 이를 설정하지 않는다면 클래스 매핑시 신뢰할 수 없다는 에러가 나올 것입니다.

 

Producer 설정에서는 단순히 VALUE 에 대한 직렬화 설정을 JsonSerializer 로 바꿔준 것 밖에 없습니다.

 

- 테스트

위와 같이 설정하고,

    @KafkaListener(
        topics = ["testTopic1"],
        groupId = "group_1",
        containerFactory = Kafka1MainConfig.CONSUMER_BEAN_NAME
    )
    fun testTopic1Group0Listener(data: ConsumerRecord<String, TestTopic1Group0ListenerInputVo>) {
        classLogger.info(">> testTopic1 group_1 : $data")
    }

    data class TestTopic1Group0ListenerInputVo(
        val test : String,
        val test1 : Int
    )

 

Consumer 리스너를 위와 같이 하고,

 

    fun sendMessageToTestTopic1(message: Kafka1MainConsumer.TestTopic1Group0ListenerInputVo) {
        // kafkaProducer1 에 토픽 메세지 발행
        kafka1MainProducerTemplate.send("testTopic1", message)
    }

 

Producer 로 위와 같이, Consumer 에서 받는 data class 를 value 로 넣어주면, 

 

[ls] [2024_10_28_T_10_44_50_025_KST] [INFO ] [>> testTopic1 group_1 : ConsumerRecord(topic = testTopic1, partition = 0, leaderEpoch = 10, offset = 26, CreateTime = 1730079889855, serialized key size = -1, serialized value size = 32, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestTopic1Group0ListenerInputVo(test=testMessage, test1=1))] [le]

 

이렇게 Springboot 코드 내 자동 매핑으로 값을 받아올 수 있습니다.

저작자표시 비영리 변경금지 (새창열림)

'Programming > BackEnd' 카테고리의 다른 글

서버 부하 테스트 - Locust 사용 (FastAPI, Springboot 비디오 스트리밍 성능 비교), FastAPI Media Streaming 코드 수록  (0) 2025.04.23
분산 소켓 서버 설명 및 구현(Springboot, SockJS, STOMP, Kafka, Redis, Javascript)  (0) 2025.03.28
Springboot 서버 비동기 처리 - Redis 를 이용한 분산락 설명 및 구현  (0) 2024.10.21
Springboot kotlin JPA QueryDSL 설정 및 테스트  (11) 2024.10.16
Springboot logback 적용  (0) 2024.10.16
'Programming/BackEnd' 카테고리의 다른 글
  • 서버 부하 테스트 - Locust 사용 (FastAPI, Springboot 비디오 스트리밍 성능 비교), FastAPI Media Streaming 코드 수록
  • 분산 소켓 서버 설명 및 구현(Springboot, SockJS, STOMP, Kafka, Redis, Javascript)
  • Springboot 서버 비동기 처리 - Redis 를 이용한 분산락 설명 및 구현
  • Springboot kotlin JPA QueryDSL 설정 및 테스트
Railly Linker
Railly Linker
IT 지식 정리 및 공유 블로그
Railly`s IT 정리노트IT 지식 정리 및 공유 블로그
  • Railly Linker
    Railly`s IT 정리노트
    Railly Linker
  • 전체
    오늘
    어제
  • 공지사항

    • 분류 전체보기 (116)
      • Programming (34)
        • BackEnd (17)
        • FrontEnd (4)
        • DBMS (1)
        • ETC (12)
      • Study (81)
        • Computer Science (21)
        • Data Science (22)
        • Computer Vision (20)
        • NLP (15)
        • ETC (3)
      • Error Note (1)
      • ETC (0)
  • 인기 글

  • 최근 글

  • 최근 댓글

  • 태그

    kotlin linkedlist
    docker compose
    localhost
    지리 정보
    Kotlin
    jvm 메모리 누수
    docker 배포
    springboot 배포
    논리적 삭제
    kotlin mutablelist
    list
    kotlin arraylist
    단축키
    unique
    데이터베이스 제약
    network_mode: "host"
    MacOS
  • 링크

    • RaillyLinker Github
  • hELLO· Designed By정상우.v4.10.0
Railly Linker
Springboot Kafka Json Value 매핑하기
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.