반응형
pom.xml 의존성 추가
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
application.properties 값 추가
kafka.bootstrap.address=kafka01.example.com:9092,kafka02.example.com:9092,kafka03.example.com:9092
kafka.group.id=test-group-id
KafkaTopicConfig 클래스 추가
@Configuration
public class KafkaTopicConfig {
@Value("${kafka.bootstrap.address}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(config);
}
@Bean
public NewTopic testTopic() {
return new NewTopic("test-topic-2", 1, (short) 1);
}
}
KafkaProducerConfig 클래스 추가
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap.address}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaConsumerConfig 클래스 추가
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap.address}")
private String bootstrapAddress;
@Value("${kafka.group.id}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaController 클래스 추가
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/produce")
public String produce(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-topic-2", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("[FAIL] : " + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("[SUCCESS] : " + result.getRecordMetadata().offset());
}
});
return message;
}
@KafkaListener(topics = "test-topic-2")
public void consume(@Payload String message) {
System.out.println("[CONSUME] : " + message);
}
}
확인
- 메시지 전달(produce)
- localhost:8080/kafka/produce?message=테스트
- 메시지 전달 후 KafkaListener에 노출되는지 확인(consume)
참고
반응형
'Development > Kafka' 카테고리의 다른 글
[Kafka] 명령어 (0) | 2023.11.20 |
---|---|
[Kafka] 설치 (0) | 2019.03.07 |
[Kafka] ZooKeeper 설치 (0) | 2019.03.07 |