반응형
공통
build.gradle.kts
// Dependencies used by the Kafka examples: the Spring Boot starter plus spring-kafka.
dependencies {
// ...
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.kafka:spring-kafka")
}
Producer
application.yml
# Producer settings: broker address and acks=all.
spring:
  kafka:
    producer:
      bootstrap-servers: kafka-1:9091
      acks: all
ProducerService
/**
 * Minimal producer that publishes string values through the injected [KafkaTemplate].
 */
@Service
class ProducerService(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {

    /**
     * Sends [value] to [topic] without a record key.
     *
     * The object returned by `send` is discarded, so the outcome of the send
     * is not observed here.
     */
    fun produce(topic: String, value: String) {
        kafkaTemplate.send(topic, value)
    }
}
Producer - 커스텀 템플릿
KafkaTemplateConfig
/**
 * Declares a hand-built [KafkaTemplate] bean instead of relying on properties-driven setup.
 */
@Configuration
class KafkaTemplateConfig {

    /**
     * Creates a `KafkaTemplate<String, String>` backed by a [DefaultKafkaProducerFactory].
     *
     * The producer is configured with the broker `kafka-1:9091`, string key/value
     * serializers, and `acks=all`.
     */
    @Bean
    fun customKafkaTemplate(): KafkaTemplate<String, String> {
        val producerProps = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "kafka-1:9091",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.ACKS_CONFIG to "all",
        )
        return KafkaTemplate(DefaultKafkaProducerFactory<String, String>(producerProps))
    }
}
ProducerService
/**
 * Producer that publishes through the custom [KafkaTemplate] bean and logs the
 * outcome of every send.
 */
@Service
class ProducerService(
    private val customKafkaTemplate: KafkaTemplate<String, String>,
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Sends [value] to [topic] and registers a completion callback on the returned future.
     *
     * The call itself does not wait for the send to finish; success or failure is
     * reported through the log from the callback.
     */
    fun produce(topic: String, value: String) {
        customKafkaTemplate.send(topic, value).whenComplete { result, e ->
            if (e != null) {
                // When the future completes exceptionally `result` is null, so log
                // the target topic rather than a useless "result: null".
                log.warn("[ERROR] topic: {}", topic, e)
            } else {
                log.info("[SUCCESS] result: {}", result)
            }
        }
    }
}
Consumer
application.yml
# Consumer settings: broker address, and a listener that receives one record per call.
spring:
  kafka:
    consumer:
      bootstrap-servers: kafka-1:9091
    listener:
      type: single
DemoConsumer
/**
 * Collection of single-record `@KafkaListener` examples, each registered under
 * its own consumer group.
 */
@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    /** Receives the whole [ConsumerRecord] from topic `test` and logs it. */
    @KafkaListener(groupId = "test-group-00", topics = ["test"])
    fun recordListener(record: ConsumerRecord<String, String>) {
        log.info("[test-group-00] {}", record)
    }

    /** Receives only the record value from topic `test` and logs it. */
    @KafkaListener(groupId = "test-group-01", topics = ["test"])
    fun valueListener(value: String) {
        log.info("[test-group-01] {}", value)
    }

    /**
     * Listener that passes extra consumer properties through the annotation:
     * `max.poll.interval.ms` = 60000 and `auto.offset.reset` = earliest.
     */
    @KafkaListener(
        groupId = "test-group-02",
        topics = ["test"],
        properties = [
            "max.poll.interval.ms:60000",
            "auto.offset.reset:earliest",
        ],
    )
    fun withPropertiesListener(value: String) {
        log.info("[test-group-02] {}", value)
    }

    /**
     * Listener declared with `concurrency = "3"`: three consumer threads are each
     * assigned partitions and process records in parallel.
     */
    @KafkaListener(groupId = "test-group-03", topics = ["test"], concurrency = "3")
    fun concurrencyListener(value: String) {
        log.info("[test-group-03] {}", value)
    }

    /**
     * Listener bound to explicit partitions: partitions 0 and 1 of `test01`, and
     * partition 0 of `test02` with an initial offset of 3.
     */
    @KafkaListener(
        groupId = "test-group-04",
        topicPartitions = [
            TopicPartition(topic = "test01", partitions = ["0", "1"]),
            TopicPartition(
                topic = "test02",
                partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "3")],
            ),
        ],
    )
    fun specificPartitionListener(record: ConsumerRecord<String, String>) {
        log.info("[test-group-04] {}", record)
    }
}
Consumer - Batch
application.yml
# Batch consumer settings: the listener receives a batch of records per call.
spring:
  kafka:
    consumer:
      bootstrap-servers: kafka-1:9091
    listener:
      type: batch
      ack-mode: manual_immediate # offsets are not committed automatically when the listener returns (manual commit required)
DemoBatchConsumer
/**
 * Batch listener examples that commit offsets explicitly from inside the listener.
 */
@Component
class DemoBatchConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    /** Logs every record of the batch, then commits through the [Acknowledgment]. */
    @KafkaListener(groupId = "test-group-01", topics = ["test"])
    fun ackListener(records: ConsumerRecords<String, String>, ack: Acknowledgment) {
        for (record in records) {
            log.info("[test-group-01] {}", record)
        }
        ack.acknowledge() // commit
    }

    /** Logs every record of the batch, then commits asynchronously on the injected [Consumer]. */
    @KafkaListener(groupId = "test-group-02", topics = ["test"])
    fun consumerListener(records: ConsumerRecords<String, String>, consumer: Consumer<String, String>) {
        for (record in records) {
            log.info("[test-group-02] {}", record)
        }
        consumer.commitAsync() // commit
    }
}
Consumer - 커스텀 컨테이너
CustomContainerConfig
/**
 * Declares a custom listener container factory that listeners can select with
 * `containerFactory = "customContainerFactory"`.
 */
@Configuration
class CustomContainerConfig {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Builds a [ConcurrentKafkaListenerContainerFactory] for string keys and values.
     *
     * The factory uses its own consumer factory (broker `kafka-1:9091`, string
     * deserializers), the `RECORD` ack mode with batch listening disabled, and a
     * rebalance listener that logs each rebalance callback.
     */
    @Bean
    fun customContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val props = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "kafka-1:9091",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
        val consumerFactory = DefaultKafkaConsumerFactory<String, String>(props)
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()

        // Override the consumer-aware overloads throughout: for a
        // ConsumerAwareRebalanceListener the container invokes those, so overriding
        // the consumer-less onPartitionsAssigned/onPartitionsLost would never log.
        containerFactory.containerProperties.setConsumerRebalanceListener(object : ConsumerAwareRebalanceListener {
            override fun onPartitionsRevokedBeforeCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
                log.info("[beforeCommit]")
            }

            override fun onPartitionsRevokedAfterCommit(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
                log.info("[afterCommit]")
            }

            override fun onPartitionsAssigned(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
                log.info("[assigned]")
            }

            override fun onPartitionsLost(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {
                log.info("[lost]")
            }
        })

        containerFactory.isBatchListener = false // must be false to use the RECORD ack mode
        containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.RECORD
        containerFactory.consumerFactory = consumerFactory
        return containerFactory
    }
}
DemoConsumer
/**
 * Listener example that opts into the container factory bean named
 * `customContainerFactory`.
 */
@Component
class DemoConsumer {
    private val log = LoggerFactory.getLogger(this::class.java)

    /** Receives record values from topic `test` in group `test-group-00` and logs them. */
    @KafkaListener(
        containerFactory = "customContainerFactory",
        groupId = "test-group-00",
        topics = ["test"],
    )
    fun valueListener(value: String) {
        log.info("[test-group-00] {}", value)
    }
}
Embedded Kafka 예제
build.gradle.kts
// Adds spring-kafka-test for the embedded Kafka example.
// NOTE(review): declared as `implementation` rather than `testImplementation` —
// presumably because the embedded broker bean is created in application code; confirm.
dependencies {
// ...
implementation("org.springframework.kafka:spring-kafka-test")
}
KafkaConfig
/**
 * Exposes an embedded Kafka broker as a Spring bean.
 */
@Configuration
class KafkaConfig {

    /**
     * Creates an [EmbeddedKafkaZKBroker] on port 9092 with its `listeners` broker
     * property set to `PLAINTEXT://localhost:9092`.
     *
     * NOTE(review): the constructor argument `1` is presumably the broker count — confirm.
     */
    @Bean
    fun embeddedKafkaBroker(): EmbeddedKafkaBroker =
        EmbeddedKafkaZKBroker(1)
            .kafkaPorts(9092)
            .brokerProperty("listeners", "PLAINTEXT://localhost:9092")
}
application.yml
# Point Kafka clients at the embedded broker's address.
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
반응형
'Development > Kafka' 카테고리의 다른 글
[Kafka] 명령어 (0) | 2023.11.20 |
---|---|
[Kafka] 설치 (0) | 2019.03.07 |
[Kafka] ZooKeeper 설치 (0) | 2019.03.07 |