- 이번 포스팅에서는 Springboot 에서 Kafka 를 사용시 값으로 JSON 형태의 데이터를 전달하도록 할 것입니다.
이전글에서는 StringSerializer, StringDeserializer 를 사용하여 String 형식으로 데이터 값을 보내고 가져왔는데,
이렇게 설정시, 구조를 지닌 데이터를 입출력하기 위하여,
"{\"test\" : \"testString\", \"testInt\" : 2}"
이렇게 String 을 보내면, Springboot 의 Consumer 에서 에러가 발생합니다.
정식으로 Data 형태를 정해서 통신하도록 하겠습니다.
- build.gradle
// (jackson)
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.18.0")
데이터 매핑을 위하여 jackson 라이브러리를 설정해주세요.(코드 내에서 사용하지는 않고, 라이브러리 주입시 자동으로 처리됩니다.)
- KafkaConfig.kt
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer
// [Kafka Consumer 설정]
// kafka_consumers 폴더 안의 Listeners 클래스 파일과 연계하여 사용하세요.
@EnableKafka
@Configuration
class Kafka1MainConfig {
companion object {
// !!!application.yml 의 kafka-cluster 안에 작성된 이름 할당하기!!!
const val KAFKA_CONFIG_NAME: String = "kafka1-main"
const val CONSUMER_BEAN_NAME: String =
"${KAFKA_CONFIG_NAME}_ConsumerFactory"
const val PRODUCER_BEAN_NAME: String =
"${KAFKA_CONFIG_NAME}_ProducerFactory"
}
@Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.uri}")
private lateinit var uri: String
@Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.consumer.username}")
private lateinit var userName: String
@Value("\${kafka-cluster.$KAFKA_CONFIG_NAME.consumer.password}")
private lateinit var password: String
@Bean(CONSUMER_BEAN_NAME)
fun kafkaConsumer(): ConcurrentKafkaListenerContainerFactory<String, Any> {
val config: MutableMap<String, Any> = HashMap()
// Kafka 브로커에 연결하기 위한 주소를 설정합니다. 여러 개의 브로커가 있을 경우, 콤마로 구분하여 나열합니다.
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = uri
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
config[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
config[JsonDeserializer.TRUSTED_PACKAGES] = "*"
// SASL/SCRAM 인증 설정 추가
config["security.protocol"] = "SASL_PLAINTEXT"
config["sasl.mechanism"] = "PLAIN"
config["sasl.jaas.config"] =
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$userName\" password=\"$password\";"
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.consumerFactory = DefaultKafkaConsumerFactory(config)
return factory
}
@Bean(PRODUCER_BEAN_NAME)
fun kafkaProducer(): KafkaTemplate<String, Any> {
val config: MutableMap<String, Any> = HashMap()
// Kafka 브로커에 연결하기 위한 주소를 설정합니다. 여러 개의 브로커가 있을 경우, 콤마로 구분하여 나열합니다.
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = uri
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
// SASL/SCRAM 인증 설정 추가
config["security.protocol"] = "SASL_PLAINTEXT"
config["sasl.mechanism"] = "PLAIN"
config["sasl.jaas.config"] =
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$userName\" password=\"$password\";"
return KafkaTemplate(DefaultKafkaProducerFactory(config))
}
}
kafka 설정 파일은 위와 같이 작성하였습니다.
이전 설정과 다른점으론, consumer 설정에서 KEY_DESERIALIZER_CLASS_CONFIG 의 아래쪽 설정 부분이 변경이 있습니다.
제가 설정 후 에러가 났던 부분은 TRUSTED_PACKAGES 를 설정하지 않았을 때였는데,
위 설정은 클래스 매핑시 신뢰할 수 있는 패키지 경로를 적는 것으로, 만약 이를 설정하지 않는다면 클래스 매핑시 신뢰할 수 없다는 에러가 나올 것입니다.
Producer 설정에서는 단순히 VALUE 에 대한 직렬화 설정을 JsonSerializer 로 바꿔준 것 밖에 없습니다.
- 테스트
위와 같이 설정하고,
@KafkaListener(
topics = ["testTopic1"],
groupId = "group_1",
containerFactory = Kafka1MainConfig.CONSUMER_BEAN_NAME
)
fun testTopic1Group0Listener(data: ConsumerRecord<String, TestTopic1Group0ListenerInputVo>) {
classLogger.info(">> testTopic1 group_1 : $data")
}
data class TestTopic1Group0ListenerInputVo(
val test : String,
val test1 : Int
)
Consumer 리스너를 위와 같이 하고,
fun sendMessageToTestTopic1(message: Kafka1MainConsumer.TestTopic1Group0ListenerInputVo) {
// kafkaProducer1 에 토픽 메세지 발행
kafka1MainProducerTemplate.send("testTopic1", message)
}
Producer 로 위와 같이, Consumer 에서 받는 data class 를 value 로 넣어주면,
[ls] [2024_10_28_T_10_44_50_025_KST] [INFO ] [>> testTopic1 group_1 : ConsumerRecord(topic = testTopic1, partition = 0, leaderEpoch = 10, offset = 26, CreateTime = 1730079889855, serialized key size = -1, serialized value size = 32, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = TestTopic1Group0ListenerInputVo(test=testMessage, test1=1))] [le]
이렇게 Springboot 코드 내 자동 매핑으로 값을 받아올 수 있습니다.
'Springboot' 카테고리의 다른 글
분산 소켓 서버 설명 및 구현(Springboot, SockJS, STOMP, Kafka, Redis, Javascript) (0) | 2025.03.28 |
---|---|
Springboot JPA 데이터 타입별 매핑 총정리 (0) | 2025.03.28 |
Springboot 서버 비동기 처리 - Redis 를 이용한 분산락 설명 및 구현 (0) | 2024.10.21 |
Springboot kotlin JPA QueryDSL 설정 및 테스트 (11) | 2024.10.16 |
Springboot logback 적용 (0) | 2024.10.16 |