반응형
공통
build.gradle.kts
// Dependencies shared by every example below: the base Spring Boot starter and Spring for Apache Kafka.
dependencies {
// ...
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.kafka:spring-kafka")
}
kotlincopy
application.yml
# Broker addresses shared by the examples; nesting restored so the list binds to spring.kafka.bootstrap-servers.
spring:
  kafka:
    bootstrap-servers:
      - localhost:9091
      - localhost:9092
      - localhost:9093
yml yamlcopy
[Consumer] Basic
application.yml
spring:
  kafka:
    listener:
      type: single # the listener handles one record per invocation
yml yamlcopy
DemoConsumer
/**
 * Single-record listeners, each in its own consumer group, demonstrating the different ways a
 * [KafkaListener] method can be declared. Every listener only logs what it receives.
 */
@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(javaClass)

    /** Receives the full [ConsumerRecord]. */
    @KafkaListener(groupId = "test-group-00", topics = ["test"])
    fun recordListener(record: ConsumerRecord<String, String>) =
        log.info("[test-group-00] {}", record)

    /** Receives only the record value. */
    @KafkaListener(groupId = "test-group-01", topics = ["test"])
    fun valueListener(value: String) =
        log.info("[test-group-01] {}", value)

    /** Overrides consumer properties for this listener only. */
    @KafkaListener(
        groupId = "test-group-02",
        topics = ["test"],
        properties = [
            "max.poll.interval.ms:60000",
            "auto.offset.reset:earliest",
        ],
    )
    fun withPropertiesListener(value: String) =
        log.info("[test-group-02] {}", value)

    /** Runs three consumer threads; each is assigned partitions and processes them in parallel. */
    @KafkaListener(groupId = "test-group-03", topics = ["test"], concurrency = "3")
    fun concurrencyListener(value: String) =
        log.info("[test-group-03] {}", value)

    /**
     * Consumes explicitly assigned partitions: partitions 0 and 1 of `test01`, and partition 0 of
     * `test02` with an initial offset of 3.
     */
    @KafkaListener(
        groupId = "test-group-04",
        topicPartitions = [
            TopicPartition(topic = "test01", partitions = ["0", "1"]),
            TopicPartition(
                topic = "test02",
                partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "3")],
            ),
        ],
    )
    fun specificPartitionListener(record: ConsumerRecord<String, String>) =
        log.info("[test-group-04] {}", record)
}
kotlincopy
[Consumer] Batch
application.yml
spring:
  kafka:
    listener:
      type: batch # the listener handles a list of records per invocation
      ack-mode: manual_immediate # no automatic commit when the listener returns (manual commit required)
yml yamlcopy
DemoBatchConsumer
/**
 * Batch listeners that log every record of the polled batch and then commit manually,
 * once through [Acknowledgment] and once through the [Consumer] itself.
 */
@Component
class DemoBatchConsumer {
    private val log = LoggerFactory.getLogger(javaClass)

    /** Commits by acknowledging the batch. */
    @KafkaListener(groupId = "test-group-01", topics = ["test"])
    fun ackListener(records: ConsumerRecords<String, String>, ack: Acknowledgment) {
        for (record in records) {
            log.info("[test-group-01] {}", record)
        }
        ack.acknowledge() // commit
    }

    /** Commits asynchronously through the injected consumer. */
    @KafkaListener(groupId = "test-group-02", topics = ["test"])
    fun consumerListener(records: ConsumerRecords<String, String>, consumer: Consumer<String, String>) {
        for (record in records) {
            log.info("[test-group-02] {}", record)
        }
        consumer.commitAsync() // commit
    }
}
kotlincopy
[Consumer] ContainerFactory
ContainerFactory란?
- @KafkaListener를 사용할 때 Kafka 컨슈머 리스너 컨테이너를 생성하고 관리하는 역할을 하는 설정 클래스
- 이를 통해 멀티스레드 컨슈머 실행, 배치 소비(batch consumption), 오프셋 커밋 방식 등 다양한 설정을 커스텀 가능
설정 예시
/** Declares a listener container factory configured entirely in code. */
@Configuration
class CustomContainerConfig {

    /**
     * Builds a single-record container factory with String deserializers, per-record
     * acknowledgement and a concurrency of 2.
     */
    @Bean
    fun customContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val consumerProps = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to listOf("localhost:9091", "localhost:9092", "localhost:9093"),
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )

        return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = DefaultKafkaConsumerFactory(consumerProps)
            containerProperties.ackMode = ContainerProperties.AckMode.RECORD
            isBatchListener = false // single type
            setConcurrency(2)
        }
    }
}
kotlincopy
/** Listener bound to the custom container factory by bean name. */
@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(javaClass)

    /** Logs each value received through `customContainerFactory`. */
    @KafkaListener(
        containerFactory = "customContainerFactory",
        groupId = "test-group-00",
        topics = ["test"],
    )
    fun valueListener(value: String) =
        log.info("[test-group-00] {}", value)
}
kotlincopy
[Consumer] ContainerFactory(properties로 설정한 consumerFactory의 속성 포함)
설정 예시
// Variant of the container factory configuration that starts from an injected ConsumerFactory's
// properties instead of hard-coding them. The rest of the method body is elided in this snippet.
@Configuration
class CustomContainerConfig {
@Bean
fun customContainerFactory(
consumerFactory: ConsumerFactory<String, String>, // added
): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
// added below: the injected factory's properties, with the String deserializers layered on top
val props = consumerFactory.configurationProperties + mapOf(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
)
// ...
}
}
kotlincopy
[Consumer] RebalanceListener
RebalanceListener란?
- Kafka 파티션 리밸런스(rebalance) 이벤트를 감지하고 추가적인 작업을 수행할 수 있도록 해주는 인터페이스
- ConsumerAwareRebalanceListener를 사용하면 리밸런스가 발생하기 전후에 특정 로직을 실행할 수 있음
설정 예시
/**
 * Rebalance listener that logs every rebalance callback.
 *
 * For a [ConsumerAwareRebalanceListener] the listener container calls the consumer-aware
 * callbacks. The interface's default `onPartitionsRevokedBeforeCommit` is what forwards to
 * [onPartitionsRevoked], so once that method is overridden the forwarding has to be done
 * explicitly; otherwise [onPartitionsRevoked] is never reached.
 */
class DemoRebalanceListener : ConsumerAwareRebalanceListener {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Runs when previously assigned partitions are being released during a rebalance.
     * Typically used to store the offsets processed so far.
     */
    override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
        log.info("### onPartitionsRevoked")
    }

    /**
     * Runs after new partitions have been assigned.
     * Typically used to load stored offsets and restore the position.
     */
    override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
        log.info("### onPartitionsAssigned")
    }

    /** Logs that partitions were lost. */
    override fun onPartitionsLost(partitions: MutableCollection<TopicPartition>) {
        log.info("### lost")
    }

    /** Logs the revocation before offsets are committed, then forwards to [onPartitionsRevoked]. */
    override fun onPartitionsRevokedBeforeCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
        log.info("### beforeCommit")
        // Keep the plain revoke hook reachable, mirroring the interface's default implementation.
        onPartitionsRevoked(partitions)
    }

    /** Logs the revocation after offsets have been committed. */
    override fun onPartitionsRevokedAfterCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
        log.info("### afterCommit")
    }
}
kotlincopy
// Registers the rebalance listener on the container factory's container properties.
containerFactory.containerProperties.setConsumerRebalanceListener(DemoRebalanceListener())
kotlincopy
[Consumer] ConsumerInterceptor
ConsumerInterceptor란?
- ConsumerInterceptor는 Kafka Consumer가 메시지를 가져오기 전후에 데이터를 가로채서 처리할 수 있도록 해주는 인터페이스
- Consumer가 메시지를 가져오는 과정에서 로깅, 메시지 필터링, 데이터 변환 등의 작업을 수행할 수 있다.
설정 예시
/** Consumer interceptor that logs each lifecycle callback and passes records through unchanged. */
class DemoConsumerInterceptor : ConsumerInterceptor<String, String> {
    private val log = LoggerFactory.getLogger(javaClass)

    /** Logs topic, key and value of every polled record and returns the batch as received. */
    override fun onConsume(records: ConsumerRecords<String, String>): ConsumerRecords<String, String> {
        for (record in records) {
            log.info("### onConsume. topic: {}, key: {}, value: {}", record.topic(), record.key(), record.value())
        }
        return records
    }

    /** Logs the offsets passed to the commit callback. */
    override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>?) =
        log.info("### onCommit. offset: {}", offsets)

    /** Logs that the interceptor was configured; the supplied configs are not used. */
    override fun configure(configs: MutableMap<String, *>?) =
        log.info("### configure")

    /** Logs that the interceptor was closed. */
    override fun close() =
        log.info("### close")
}
kotlincopy
properties로 설정하기
spring:
  kafka:
    consumer:
      properties:
        interceptor.classes: com.example.demo.support.DemoConsumerInterceptor
yml yamlcopy
JavaConfig로 설정하기
// Consumer properties that register the interceptor by its class name; other entries are elided.
val props = mapOf(
// ...
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to DemoConsumerInterceptor::class.java.name, // added
)
kotlincopy
[Consumer] RecordInterceptor
RecordInterceptor란?
- Record를 소비하기 전에 미리 처리하거나 필터링할 수 있는 기능을 제공
- 메시지를 가공하거나 로깅, 보안 검증 등의 작업을 수행할 수 있다.
설정 예시
/** Record interceptor that logs each stage of record handling and lets every record through. */
class DemoRecordInterceptor : RecordInterceptor<String, String> {
    private val log = LoggerFactory.getLogger(javaClass)

    /** Logs and returns the record unchanged; returning null would skip the record. */
    override fun intercept(
        record: ConsumerRecord<String, String>,
        consumer: Consumer<String, String>,
    ): ConsumerRecord<String, String>? = record.also { log.info("### intercept") }

    /** Called only when the listener succeeded. */
    override fun success(record: ConsumerRecord<String, String>, consumer: Consumer<String, String>) =
        log.info("### success")

    /** Called only when the listener failed. */
    override fun failure(
        record: ConsumerRecord<String, String>,
        exception: Exception,
        consumer: Consumer<String, String>,
    ) = log.info("### failure")

    /** Called regardless of success or failure. */
    override fun afterRecord(record: ConsumerRecord<String, String>, consumer: Consumer<String, String>) =
        log.info("### afterRecord")
}
kotlincopy
// Registers the record interceptor on the container factory.
containerFactory.setRecordInterceptor(DemoRecordInterceptor())
kotlincopy
[Producer] Basic
application.yml
spring:
  kafka:
    producer:
      bootstrap-servers: kafka-1:9091
      acks: all
kotlincopy
ProducerService
// Minimal producer: sends a value to a topic through the injected KafkaTemplate.
@Service
class ProducerService(
private val kafkaTemplate: KafkaTemplate<String, String>,
) {
// Sends `value` to `topic`; the result returned by send() is not observed here.
fun produce(topic: String, value: String) {
kafkaTemplate.send(topic, value)
}
}
kotlincopy
[Producer] CustomTemplate
KafkaTemplateConfig
/** Declares a KafkaTemplate whose producer settings are defined in code. */
@Configuration
class KafkaTemplateConfig {

    /** Builds a String/String template targeting `kafka-1:9091` with `acks=all`. */
    @Bean
    fun customKafkaTemplate(): KafkaTemplate<String, String> {
        val producerProps = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "kafka-1:9091",
            ProducerConfig.ACKS_CONFIG to "all",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        )
        return KafkaTemplate(DefaultKafkaProducerFactory<String, String>(producerProps))
    }
}
kotlincopy
ProducerService
/** Producer that sends through the custom template and logs the outcome of each send. */
@Service
class ProducerService(
    private val customKafkaTemplate: KafkaTemplate<String, String>,
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Sends [value] to [topic] and logs the outcome once the send completes.
     *
     * @param topic destination topic
     * @param value record value
     */
    fun produce(topic: String, value: String) {
        customKafkaTemplate.send(topic, value).whenComplete { result, e ->
            if (e != null) {
                // On exceptional completion the result is null, so log the topic as context instead.
                log.warn("[ERROR] topic: {}", topic, e)
            } else {
                log.info("[SUCCESS] result: {}", result)
            }
        }
    }
}
kotlincopy
Embedded Kafka 예제
build.gradle.kts
// Adds the Spring Kafka test support module, which the embedded broker example relies on.
// NOTE(review): declared as `implementation` rather than a test scope — presumably because the
// embedded broker bean is defined in application code; confirm before copying to a real project.
dependencies {
// ...
implementation("org.springframework.kafka:spring-kafka-test")
}
kotlincopy
KafkaConfig
/** Declares an embedded Kafka broker bean. */
@Configuration
class KafkaConfig {

    /** Creates a one-broker embedded Kafka instance listening on `localhost:9092`. */
    @Bean
    fun embeddedKafkaBroker(): EmbeddedKafkaBroker =
        EmbeddedKafkaZKBroker(1)
            .kafkaPorts(9092)
            .brokerProperty("listeners", "PLAINTEXT://localhost:9092")
}
kotlincopy
application.yml
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
yml yamlcopy
반응형
'Development > Kafka' 카테고리의 다른 글
[Kafka] 명령어 (0) | 2023.11.20 |
---|---|
[Kafka] 설치 (0) | 2019.03.07 |
[Kafka] ZooKeeper 설치 (0) | 2019.03.07 |