반응형

공통

build.gradle.kts

dependencies {
    // ...
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.kafka:spring-kafka")
}

Producer

appilcation.yml

spring:
  kafka:
    producer:
      bootstrap-servers: kafka-1:9091
      acks: all

ProducerService

@Service
class ProducerService(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {
    fun produce(topic: String, value: String) {
        kafkaTemplate.send(topic, value)
    }
}

Producer - 커스텀 템플릿

KafkaTemplateConfig

@Configuration
class KafkaTemplateConfig {
    @Bean
    fun customKafkaTemplate(): KafkaTemplate<String, String> {
        val props = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "kafka-1:9091",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.ACKS_CONFIG to "all",
        )

        val factory = DefaultKafkaProducerFactory<String, String>(props)

        return KafkaTemplate(factory)
    }
}

ProducerService

@Service
class ProducerService(
    private val customKafkaTemplate: KafkaTemplate<String, String>,
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    fun produce(topic: String, value: String) {
        val future = customKafkaTemplate.send(topic, value)

        future.whenComplete { result, e ->
            if (e != null) {
                log.warn("[ERROR] result: {}", result, e)
            } else {
                log.info("[SUCCESS] result: {}", result)
            }
        }
    }
}

Consumer

application.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: kafka-1:9091
    listener:
      type: single

DemoConsumer

@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    @KafkaListener(topics = ["test"], groupId = "test-group-00")
    fun recordListener(record: ConsumerRecord<String, String>) {
        log.info("[test-group-00] {}", record)
    }

    @KafkaListener(topics = ["test"], groupId = "test-group-01")
    fun valueListener(value: String) {
        log.info("[test-group-01] {}", value)
    }

    @KafkaListener(
        topics = ["test"], groupId = "test-group-02", properties = [
            "max.poll.interval.ms:60000",
            "auto.offset.reset:earliest",
        ]
    )
    fun withPropertiesListener(value: String) {
        log.info("[test-group-02] {}", value)
    }

    @KafkaListener(topics = ["test"], groupId = "test-group-03", concurrency = "3") // 3개의 컨슈머 스레드가 각각 파티션에 할당되어 병렬 처리
    fun concurrencyListener(value: String) {
        log.info("[test-group-03] {}", value)
    }

    @KafkaListener(
        groupId = "test-group-04", topicPartitions = [
            TopicPartition(topic = "test01", partitions = ["0", "1"]),
            TopicPartition(topic = "test02", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "3")]),
        ]
    )
    fun specificPartitionListener(record: ConsumerRecord<String, String>) {
        log.info("[test-group-04] {}", record)
    }
}

Consumer - Batch

application.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: kafka-1:9091
    listener:
      type: batch
      ack-mode: manual_immediate # 리스너 종료시 자동 커밋되지 않도록 설정(수동 커밋 필요)

DemoBatchConsumer

@Component
class DemoBatchConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    @KafkaListener(topics = ["test"], groupId = "test-group-01")
    fun ackListener(records: ConsumerRecords<String, String>, ack: Acknowledgment) {
        records.forEach { log.info("[test-group-01] {}", it) }
        ack.acknowledge() // commit
    }

    @KafkaListener(topics = ["test"], groupId = "test-group-02")
    fun consumerListener(records: ConsumerRecords<String, String>, consumer: Consumer<String, String>) {
        records.forEach { log.info("[test-group-02] {}", it) }
        consumer.commitAsync() // commit
    }
}

Consumer - 커스텀 컨테이너

CustomContainerConfig

@Configuration
class CustomContainerConfig {
    private val log = LoggerFactory.getLogger(this::class.java)

    @Bean
    fun customContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val props = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "kafka-1:9091",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )

        val consumerFactory = DefaultKafkaConsumerFactory<String, String>(props)
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()

        containerFactory.containerProperties.setConsumerRebalanceListener(object : ConsumerAwareRebalanceListener {
            override fun onPartitionsRevokedBeforeCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
                log.info("[beforeCommit]")
            }

            override fun onPartitionsRevokedAfterCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
                log.info("[afterCommit]")
            }

            override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
                log.info("[assigned]")
            }

            override fun onPartitionsLost(partitions: MutableCollection<TopicPartition>) {
                log.info("[lost]")
            }
        })

        containerFactory.isBatchListener = false // RECORD 사용을 위해 false
        containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.RECORD
        containerFactory.consumerFactory = consumerFactory
        return containerFactory
    }
}

DemoConsumer

@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    @KafkaListener(topics = ["test"], groupId = "test-group-00", containerFactory = "customContainerFactory")
    fun valueListener(value: String) {
        log.info("[test-group-00] {}", value)
    }
}

Embedded Kafka 예제

build.gradle.kts

dependencies {
    // ...
    implementation("org.springframework.kafka:spring-kafka-test")
}

KafkaConfig

@Configuration
class KafkaConfig {
    @Bean
    fun embeddedKafkaBroker(): EmbeddedKafkaBroker {
        return EmbeddedKafkaZKBroker(1)
            .kafkaPorts(9092)
            .brokerProperty("listeners", "PLAINTEXT://localhost:9092")
    }
}

application.yml

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
반응형

'Development > Kafka' 카테고리의 다른 글

[Kafka] 명령어  (0) 2023.11.20
[Kafka] 설치  (0) 2019.03.07
[Kafka] ZooKeeper 설치  (0) 2019.03.07

+ Recent posts