
Common

build.gradle.kts

dependencies {
    // ...
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.kafka:spring-kafka")
}

application.yml

spring:
  kafka:
    bootstrap-servers:
      - localhost:9091
      - localhost:9092
      - localhost:9093
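
The examples below consume from a topic named test. If the topic does not already exist, Spring Boot's auto-configured KafkaAdmin can create it on startup from a NewTopic bean. A minimal sketch (partition and replica counts are illustrative):

@Configuration
class KafkaTopicConfig {
    // KafkaAdmin (auto-configured by Spring Boot) creates this topic on startup if it is missing
    @Bean
    fun testTopic(): NewTopic =
        TopicBuilder.name("test")
            .partitions(3)
            .replicas(3)
            .build()
}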

[Consumer] Basic

application.yml

spring:
  kafka:
    listener:
      type: single # the consumer processes one record at a time

DemoConsumer

@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    @KafkaListener(topics = ["test"], groupId = "test-group-00")
    fun recordListener(record: ConsumerRecord<String, String>) {
        log.info("[test-group-00] {}", record)
    }

    @KafkaListener(topics = ["test"], groupId = "test-group-01")
    fun valueListener(value: String) {
        log.info("[test-group-01] {}", value)
    }

    @KafkaListener(
        topics = ["test"], groupId = "test-group-02", properties = [
            "max.poll.interval.ms:60000",
            "auto.offset.reset:earliest",
        ]
    )
    fun withPropertiesListener(value: String) {
        log.info("[test-group-02] {}", value)
    }

    @KafkaListener(topics = ["test"], groupId = "test-group-03", concurrency = "3") // 3 consumer threads, each assigned partitions and processing in parallel
    fun concurrencyListener(value: String) {
        log.info("[test-group-03] {}", value)
    }

    @KafkaListener(
        groupId = "test-group-04", topicPartitions = [
            TopicPartition(topic = "test01", partitions = ["0", "1"]),
            TopicPartition(topic = "test02", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "3")]),
        ]
    )
    fun specificPartitionListener(record: ConsumerRecord<String, String>) {
        log.info("[test-group-04] {}", record)
    }
}

[Consumer] Batch

application.yml

spring:
  kafka:
    listener:
      type: batch # the consumer processes a list of records
      ack-mode: manual_immediate # disable auto-commit when the listener returns (manual commit required)

DemoBatchConsumer

@Component
class DemoBatchConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    @KafkaListener(topics = ["test"], groupId = "test-group-01")
    fun ackListener(records: ConsumerRecords<String, String>, ack: Acknowledgment) {
        records.forEach { log.info("[test-group-01] {}", it) }
        ack.acknowledge() // commit
    }

    @KafkaListener(topics = ["test"], groupId = "test-group-02")
    fun consumerListener(records: ConsumerRecords<String, String>, consumer: Consumer<String, String>) {
        records.forEach { log.info("[test-group-02] {}", it) }
        consumer.commitAsync() // commit
    }
}
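
A batch listener can also receive just the deserialized values as a List instead of ConsumerRecords. A minimal sketch of an additional method that could live in the same class (test-group-03 is an illustrative group id):

@KafkaListener(topics = ["test"], groupId = "test-group-03")
fun valueListListener(values: List<String>, ack: Acknowledgment) {
    values.forEach { log.info("[test-group-03] {}", it) }
    ack.acknowledge() // commit
}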

[Consumer] ContainerFactory

What is a ContainerFactory?

  • A configuration class that creates and manages the Kafka consumer listener containers used by @KafkaListener
  • It lets you customize settings such as multi-threaded consumer execution, batch consumption, and the offset commit mode

Configuration example

@Configuration
class CustomContainerConfig {
    @Bean
    fun customContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val props = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to listOf("localhost:9091", "localhost:9092", "localhost:9093"),
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )

        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = DefaultKafkaConsumerFactory(props)
        containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.RECORD
        containerFactory.isBatchListener = false // single type
        containerFactory.setConcurrency(2)
        return containerFactory
    }
}

DemoConsumer

@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    @KafkaListener(topics = ["test"], groupId = "test-group-00", containerFactory = "customContainerFactory")
    fun valueListener(value: String) {
        log.info("[test-group-00] {}", value)
    }
}

[Consumer] ContainerFactory (including the consumerFactory properties configured via application properties)

Configuration example

@Configuration
class CustomContainerConfig {
    @Bean
    fun customContainerFactory(
        consumerFactory: ConsumerFactory<String, String>, // added: the ConsumerFactory auto-configured from application.yml
    ): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        // added below: merge the injected factory's properties with explicit deserializer settings
        val props = consumerFactory.configurationProperties + mapOf(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
        
        // ...
    }
}

[Consumer] RebalanceListener

What is a RebalanceListener?

  • An interface for detecting Kafka partition rebalance events and running additional logic around them
  • With ConsumerAwareRebalanceListener, specific logic can run before and after a rebalance occurs

Configuration example

class DemoRebalanceListener : ConsumerAwareRebalanceListener {
    private val log = LoggerFactory.getLogger(this::class.java)

    override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
        // Called when previously assigned partitions are revoked during a rebalance. Used to save the offsets processed so far.
        log.info("### onPartitionsRevoked")
    }

    override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
        // Called after new partitions are assigned. Typically used to load saved offsets and resume from them.
        log.info("### onPartitionsAssigned")
    }

    override fun onPartitionsLost(partitions: MutableCollection<TopicPartition>) {
        log.info("### lost")
    }

    override fun onPartitionsRevokedBeforeCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
        log.info("### beforeCommit")
    }

    override fun onPartitionsRevokedAfterCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
        log.info("### afterCommit")
    }
}

containerFactory.containerProperties.setConsumerRebalanceListener(DemoRebalanceListener()) // register on the container factory (e.g. inside the customContainerFactory bean)
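
As a concrete use of these callbacks, offsets could be committed synchronously right before partitions are handed off. A minimal sketch (whether committing here is appropriate depends on the application's offset-management strategy):

override fun onPartitionsRevokedBeforeCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
    // commit whatever has been processed so far before the partitions are reassigned
    consumer.commitSync()
}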

[Consumer] ConsumerInterceptor

What is a ConsumerInterceptor?

  • ConsumerInterceptor is an interface that lets you intercept and process data before and after the Kafka consumer fetches messages
  • It can be used for logging, message filtering, data transformation, and similar tasks while the consumer fetches messages.

Configuration example

class DemoConsumerInterceptor : ConsumerInterceptor<String, String> {
    private val log = LoggerFactory.getLogger(this::class.java)

    override fun onConsume(records: ConsumerRecords<String, String>): ConsumerRecords<String, String> {
        records.forEach { record ->
            log.info("### onConsume. topic: {}, key: {}, value: {}", record.topic(), record.key(), record.value())
        }
        return records
    }

    override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>?) {
        log.info("### onCommit. offset: {}", offsets)
    }

    override fun configure(configs: MutableMap<String, *>?) {
        log.info("### configure")
    }

    override fun close() {
        log.info("### close")
    }
}

Configuring via properties

spring:
  kafka:
    consumer:
      properties:
        interceptor.classes: com.example.demo.support.DemoConsumerInterceptor

Configuring via JavaConfig

val props = mapOf(
    // ...
    ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to DemoConsumerInterceptor::class.java.name, // added
)

[Consumer] RecordInterceptor

What is a RecordInterceptor?

  • Provides a way to pre-process or filter each record before it is consumed
  • Can be used for tasks such as transforming messages, logging, and security validation.

Configuration example

class DemoRecordInterceptor : RecordInterceptor<String, String> {
    private val log = LoggerFactory.getLogger(this::class.java)

    override fun intercept(record: ConsumerRecord<String, String>, consumer: Consumer<String, String>): ConsumerRecord<String, String>? {
        log.info("### intercept")
        return record // if null is returned, the record is not processed
    }

    override fun success(record: ConsumerRecord<String, String>, consumer: Consumer<String, String>) {
        log.info("### success") // 성공했을때만 호출
    }

    override fun failure(record: ConsumerRecord<String, String>, exception: Exception, consumer: Consumer<String, String>) {
        log.info("### failure") // 실패했을때만 호출
    }

    override fun afterRecord(record: ConsumerRecord<String, String>, consumer: Consumer<String, String>) {
        log.info("### afterRecord") // 성공, 실패 상관 없이 호출
    }
}

containerFactory.setRecordInterceptor(DemoRecordInterceptor()) // register on the container factory
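
Because returning null from intercept skips the record, a RecordInterceptor can also act as a simple filter. A minimal sketch (the skip condition is illustrative):

class FilteringRecordInterceptor : RecordInterceptor<String, String> {
    override fun intercept(record: ConsumerRecord<String, String>, consumer: Consumer<String, String>): ConsumerRecord<String, String>? {
        // records whose value contains "skip" never reach the listener
        return if (record.value()?.contains("skip") == true) null else record
    }
}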

[Producer] Basic

application.yml

spring:
  kafka:
    producer:
      bootstrap-servers: kafka-1:9091
      acks: all

ProducerService

@Service
class ProducerService(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {
    fun produce(topic: String, value: String) {
        kafkaTemplate.send(topic, value)
    }
}
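
KafkaTemplate also has a send overload that takes a message key; records with the same key are routed to the same partition, which preserves per-key ordering. A minimal sketch (the service and method names are illustrative):

@Service
class KeyedProducerService(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {
    fun produceWithKey(topic: String, key: String, value: String) {
        // records sharing a key always land on the same partition
        kafkaTemplate.send(topic, key, value)
    }
}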

[Producer] CustomTemplate

KafkaTemplateConfig

@Configuration
class KafkaTemplateConfig {
    @Bean
    fun customKafkaTemplate(): KafkaTemplate<String, String> {
        val props = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "kafka-1:9091",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.ACKS_CONFIG to "all",
        )

        val factory = DefaultKafkaProducerFactory<String, String>(props)

        return KafkaTemplate(factory)
    }
}

ProducerService

@Service
class ProducerService(
    private val customKafkaTemplate: KafkaTemplate<String, String>,
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    fun produce(topic: String, value: String) {
        val future = customKafkaTemplate.send(topic, value)

        future.whenComplete { result, e ->
            if (e != null) {
                log.warn("[ERROR] result: {}", result, e)
            } else {
                log.info("[SUCCESS] result: {}", result)
            }
        }
    }
}

Embedded Kafka Example

build.gradle.kts

dependencies {
    // ...
    implementation("org.springframework.kafka:spring-kafka-test")
}

KafkaConfig

@Configuration
class KafkaConfig {
    @Bean
    fun embeddedKafkaBroker(): EmbeddedKafkaBroker {
        return EmbeddedKafkaZKBroker(1)
            .kafkaPorts(9092)
            .brokerProperty("listeners", "PLAINTEXT://localhost:9092")
    }
}

application.yml

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
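
In test code, the embedded broker is more commonly started with the @EmbeddedKafka annotation from spring-kafka-test instead of registering it as an application bean. A minimal sketch (assumes the usual test dependencies such as spring-boot-starter-test; the class name and assertion strategy are illustrative):

@SpringBootTest
@EmbeddedKafka(partitions = 1, ports = [9092], brokerProperties = ["listeners=PLAINTEXT://localhost:9092"])
class EmbeddedKafkaExampleTest {
    @Autowired
    lateinit var kafkaTemplate: KafkaTemplate<String, String>

    @Test
    fun produceToEmbeddedBroker() {
        // sends a record to the embedded broker; a listener or test consumer would verify consumption
        kafkaTemplate.send("test", "hello")
    }
}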

'Development > Kafka' 카테고리의 다른 글

[Kafka] 명령어  (0) 2023.11.20
[Kafka] 설치  (0) 2019.03.07
[Kafka] ZooKeeper 설치  (0) 2019.03.07

+ Recent posts