반응형

pom.xml 의존성 추가

<!-- Spring for Apache Kafka integration library; the version is pinned explicitly here. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

application.properties 값 추가

# Comma-separated host:port list of Kafka brokers; injected into the topic,
# producer and consumer configuration classes via @Value.
kafka.bootstrap.address=kafka01.example.com:9092,kafka02.example.com:9092,kafka03.example.com:9092
# Consumer group id; injected into KafkaConsumerConfig and set as GROUP_ID_CONFIG.
kafka.group.id=test-group-id

KafkaTopicConfig 클래스 추가

/**
 * Spring configuration that exposes a {@link KafkaAdmin} bean and a
 * {@link NewTopic} bean describing the {@code test-topic-2} topic.
 */
@Configuration
public class KafkaTopicConfig {

    /** Broker list read from the {@code kafka.bootstrap.address} property. */
    @Value("${kafka.bootstrap.address}")
    private String bootstrapAddress;

    /**
     * Builds the admin bean, configured with nothing but the bootstrap servers.
     *
     * @return a {@link KafkaAdmin} backed by a single-entry configuration map
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        final Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(adminProps);
    }

    /**
     * Describes the demo topic: one partition, replication factor one.
     *
     * @return the {@link NewTopic} definition for {@code test-topic-2}
     */
    @Bean
    public NewTopic testTopic() {
        final String topicName = "test-topic-2";
        final int partitions = 1;
        final short replicationFactor = (short) 1;
        return new NewTopic(topicName, partitions, replicationFactor);
    }
}

KafkaProducerConfig 클래스 추가

/**
 * Spring configuration for the producing side: a String/String
 * {@link ProducerFactory} and the {@link KafkaTemplate} built on top of it.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker list read from the {@code kafka.bootstrap.address} property. */
    @Value("${kafka.bootstrap.address}")
    private String bootstrapAddress;

    /**
     * Creates the producer factory with String serializers for both key and value.
     *
     * @return a {@link DefaultKafkaProducerFactory} using the configured brokers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Creates the template used to send String messages.
     *
     * @return a {@link KafkaTemplate} wrapping {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}

KafkaConsumerConfig 클래스 추가

/**
 * Spring configuration for the consuming side: a String/String
 * {@link ConsumerFactory} and the listener container factory that uses it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker list read from the {@code kafka.bootstrap.address} property. */
    @Value("${kafka.bootstrap.address}")
    private String bootstrapAddress;

    /** Consumer group id read from the {@code kafka.group.id} property. */
    @Value("${kafka.group.id}")
    private String groupId;

    /**
     * Creates the consumer factory with String deserializers for both key and value.
     *
     * @return a {@link DefaultKafkaConsumerFactory} using the configured brokers and group id
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * Creates the listener container factory and hands it {@link #consumerFactory()}.
     *
     * @return the configured {@link ConcurrentKafkaListenerContainerFactory}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}

KafkaController 클래스 추가

/**
 * REST controller mapped under {@code /kafka}. It publishes request messages to
 * {@code test-topic-2} and also listens on that same topic, printing every
 * payload it receives to standard output.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    /** Topic name shared by the producing endpoint and the listener. */
    private static final String TOPIC = "test-topic-2";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code message} to the topic and echoes it back to the caller.
     * The method does not wait for the returned future; the send outcome is
     * printed to standard output by the registered callbacks.
     *
     * @param message the text to publish (bound from the request)
     * @return the same {@code message} that was passed in
     */
    @RequestMapping("/produce")
    public String produce(String message) {
        ListenableFuture<SendResult<String, String>> pending = kafkaTemplate.send(TOPIC, message);

        // Two-argument overload: success callback first, failure callback second.
        pending.addCallback(
                result -> System.out.println("[SUCCESS] : " + result.getRecordMetadata().offset()),
                throwable -> System.out.println("[FAIL] : " + throwable.getMessage()));

        return message;
    }

    /**
     * Prints each payload received from the topic.
     *
     * @param message the record payload
     */
    @KafkaListener(topics = TOPIC)
    public void consume(@Payload String message) {
        System.out.println("[CONSUME] : " + message);
    }
}

확인

  • 메시지 전달(produce): 애플리케이션 실행 후 브라우저 또는 curl로 아래 URL을 호출
  • http://localhost:8080/kafka/produce?message=테스트
  • 메시지 전달 후 콘솔에 [SUCCESS] 로그가 찍히고, KafkaListener가 받은 메시지가 [CONSUME] 로그로 노출되는지 확인(consume)

참고

반응형

'Development > Kafka' 카테고리의 다른 글

[Kafka] 명령어  (0) 2023.11.20
[Kafka] 설치  (0) 2019.03.07
[Kafka] ZooKeeper 설치  (0) 2019.03.07

+ Recent posts