반응형
Kafka 명령어
토픽 생성
- --partitions : 파티션 개수 지정
- --replication-factor : 토픽의 파티션을 복제할 복제 개수 지정. 1은 복제하지 않음. 2면 1개의 복제본 사용.
- --config retension.ms : 토픽의 데이터를 유지하는 기간 설정(ms). 아래 설정은 2일.
kafka-topics.sh \ --create \ --bootstrap-server localhost:9092 \ --partitions 3 \ --replication-factor 1 \ --config retention.ms=172800000 \ --topic hello.kafka
토픽 리스트 조회
kafka-topics.sh --bootstrap-server localhost:9092 --list
토픽 상세 조회
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka
토픽 옵션 수정
- 파티션 수는 늘릴 수 있지만 줄일 수는 없다.
파티션 수 변경
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic hello.kafka \
--alter \
--partitions 4
retention.ms 설정 변경
kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name hello.kafka \
--alter --add-config retention.ms=86400000
데이터 추가(키 없음)
- 아래 명령어 실행 후 입력창에 문자열을 입력하면 데이터가 추가된다.
- 메시지 키가 null인 경우에는 데이터를 파티션에 라운드 로빈으로 전송한다.
$ kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic hello.kafka >hello >world >0 >1 >2
데이터 추가(키 있음)
- 메시지 키가 존재하면 키의 해시값에 해당하는 파티션 중 한개에 전송한다.
$ kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic hello.kafka \ --property "parse.key=true" \ --property "key.separator=:" >key1:no1 >key2:no2 >key3:no3
데이터 조회(키 없음)
$ kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic hello.kafka \
--from-beginning
hello
no2
kafka
5
...
데이터 조회(키 있음)
- --group : 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 커밋을 한다.
- 조회 결과가 넣은 순서와 다른 이유는 파티션으로 나눠진 데이터를 동일한 중요도로 조회하기 때문이다.
- 데이터의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 만들면 된다.
$ kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic hello.kafka \ --property print.key=true \ --property key.separator="-" \ --group hello-group \ --from-beginning null-hello key2-no2 null-kafka ...
컨슈머 그룹 목록 조회
$ kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
hello-group
컨슈머 그룹 상세 조회
$ kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group hello-group \
--describe
Consumer group 'hello-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
hello-group hello.kafka 3 2 2 0 - - -
hello-group hello.kafka 2 0 0 0 - - -
hello-group hello.kafka 1 5 5 0 - - -
hello-group hello.kafka 0 9 9 0 - - -
데이터 삭제
- test 토픽의 0번에 저장된 데이터 0부터 50까지 삭제
- 카프카는 토픽에 저장된 특정 데이터만 삭제할 수는 없다.
echo '{"partitions": [{"topic": "test", "partition": 0, "offset": 50}], "version": 1}' > delete-topic.json
kafka-delete-records.sh \
—bootstrap-server localhost:9092 \
—offset-json-file delete-topic.json
Kafka 명령어 - Old Version
topic 생성
/home/ubuntu/app/kafka/bin/kafka-topics.sh \
--create \
--zookeeper zookeeper01.example.com:2181,zookeeper02.example.com:2181,zookeeper03.example.com:2181/test-znode \
--replication-factor 1 \
--partitions 1 \
--topic test-topic
topic 제거
/home/ubuntu/app/kafka/bin/kafka-topics.sh \
--delete \
--zookeeper zookeeper01.example.com:2181,zookeeper02.example.com:2181,zookeeper03.example.com:2181/test-znode \
--topic test-topic
메시지 생성 (producer)
- 아래 명령어 수행 후 표준입력하면 메시지를 생성
/home/ubuntu/app/kafka/bin/kafka-console-producer.sh \
--broker-list kafka01.example.com:9092,kafka02.example.com:9092,kafka03.example.com:9092 \
--topic test-topic
메시지 출력 (consumer)
/home/ubuntu/app/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka01.example.com:9092,kafka02.example.com:9092,kafka03.example.com:9092 \
--topic test-topic \
--from-beginning
참고
반응형
'Development > Kafka' 카테고리의 다른 글
[Kafka] 스프링 Kafka 연동 (0) | 2019.03.08 |
---|---|
[Kafka] 설치 (0) | 2019.03.07 |
[Kafka] ZooKeeper 설치 (0) | 2019.03.07 |