[Kafka Study] Basic CLI

Created
Sep 10, 2023
Created by
Tags
Kafka
Property
 
학습 자료
 
 

 
 
conduktor kafka playground 에서 kafka 클러스터를 SaaS 으로 사용 가능하다.
 
notion image
 

kafka-topics

 
1. 클러스터 내 토픽 목록 조회
$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --list
 
2. 클러스터 내 토픽 생성
conduktor playground 에서는 기본적으로 partitions=3, replication-factor=3 으로 생성
$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --create --topic <토픽 이름>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --create --topic first-topic Created topic first-topic.
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --list first-topic
 
3. 토픽 정보 상세 조회
$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --describe --topic <토픽 이름>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --describe --topic first-topic Topic: first-topic PartitionCount: 3 ReplicationFactor: 3 Configs: cleanup.policy=delete,retention.ms=604800000 Topic: first-topic Partition: 0 Leader: 19 Replicas: 19,38,33 Isr: 19,38,33 Topic: first-topic Partition: 1 Leader: 32 Replicas: 32,24,22 Isr: 32,24,22 Topic: first-topic Partition: 2 Leader: 30 Replicas: 30,19,14 Isr: 30,19,14
  • 0번 파티션의 리더 브로커는 19번, 레플리카는 19, 38, 33번 브로커에 해당한다.
  • 모든 레플리카가 정상적으로 동기화되어 있으므로 ISR 목록도 19, 38, 33번 브로커로 조회된다.
  • 같은 토픽 내 파티션이라고 해서 같은 브로커들에 있는 건 아니다. (1번 파티션의 리더/레플리카 브로커는 32, 24, 22번이다.)
  • 파티션을 특정 브로커에 직접 지정하려면 --replica-assignment 옵션을 사용한다.
 
 
4. 파티션 갯수를 지정하여 클러스터 내 토픽 생성
$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --create --topic <토픽 이름> --partitions <파티션 수>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --create --topic second-topic --partitions 5 Created topic second-topic.
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --describe --topic second-topic Topic: second-topic PartitionCount: 5 ReplicationFactor: 3 Configs: cleanup.policy=delete,retention.ms=604800000 Topic: second-topic Partition: 0 Leader: 19 Replicas: 19,38,33 Isr: 19,38,33 Topic: second-topic Partition: 1 Leader: 32 Replicas: 32,24,22 Isr: 32,24,22 Topic: second-topic Partition: 2 Leader: 30 Replicas: 30,19,14 Isr: 30,19,14 Topic: second-topic Partition: 3 Leader: 30 Replicas: 30,19,14 Isr: 30,19,14 Topic: second-topic Partition: 4 Leader: 19 Replicas: 19,38,33 Isr: 19,38,33
 
 
5. replication factor 를 지정하여 클러스터 내 토픽 생성
$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --create --topic third-topic --replication-factor <복제 계수>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --create --topic third-topic --replication-factor 3
 
 

kafka-console-producer

 
1. 특정 토픽에 메세지를 발행하기 위해 프롬프트 생성
$ kafka-console-producer --producer.config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --topic <메세지를 발행하려는 토픽>
$ kafka-console-producer --producer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic first-topic >
 
2. 키 없이 메세지 발행
$ kafka-console-producer --producer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic first-topic >Hello World >This message is from daeun kim >^C%
[conduktor playground 콘솔에서 확인한 결과]
[conduktor playground 콘솔에서 확인한 결과]
 
 
3. --producer-property acks=all 사용하여 메세지 발행
모든 레플리카 브로커가 메세지를 정상적으로 복제하고 나면 ACK 받도록 설정
$ kafka-console-producer --producer.config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --topic <메세지를 발행하려는 토픽> --producer-property acks=all
$ kafka-console-producer --producer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic first-topic --producer-property acks=all >Message that is acked after all replicas get it
[conduktor playground 콘솔에서 확인한 결과]
[conduktor playground 콘솔에서 확인한 결과]
 
 
4. --property 사용하여 메세지 발행
키를 넣어서 메세지 발행
$ kafka-console-producer --producer.config <프로퍼티 파일 경로> \ --bootstrap-server <클러스터 도메인> \ --topic <메세지를 발행하려는 토픽> \ --property parse.key=true \ --property key.separator=:
  • parse.key=true : 메세지에서 키/값을 파싱해라.
  • key.separator=: : 메세지에서 키/값을 : 으로 파싱해라.
 
$ kafka-console-producer \ --producer.config playground.config \ --bootstrap-server cluster.playground.cdkt.io:9092 \ --topic first-topic \ --property parse.key=true \ --property key.separator=: >this is key:this is value >
[conduktor playground 콘솔에서 확인한 결과]
[conduktor playground 콘솔에서 확인한 결과]
 
 

kafka-console-consumer

 
1. 단일 컨수머로 메세지 처음부터 읽어들이기
  • 컨수머는 기본적으로 컨수머 프로세스가 뜨고 나서 발행되는 메세지만 소비한다.
  • 토픽에 기록된 메세지를 처음부터 읽어들이려면 --from-beginning 옵션을 사용한다.
  • --from-beginning 옵션은 컨수머그룹 내에서 커밋한 오프셋이 없을 경우에만 유효하다. (자세한 내용은 여기 참고.)
$ kafka-console-consumer --consumer.config <프로퍼티 파일 경루> \ --bootstrap-server <클러스터 엔드포인트> \ --topic <토픽 이름> \ --from-beginning
$ kafka-console-consumer --consumer.config playground.config \ --bootstrap-server cluster.playground.cdkt.io:9092 \ --topic first-topic \ --from-beginning Hello World This message is from daeun kim this is value Message that is acked after all replicas get it
 
2. 여러 파티션에서 메세지 읽어들이기
A. [사전작업] second-topic 에 메세지 발행
  • 라운드-로빈 파티셔너로 발행하도록 변경.
  • 원래 운영에서는 라운드-로빈 파티셔너 사용하지 않는다.
$ kafka-console-producer --producer.config playground.config \ --bootstrap-server cluster.playground.cdkt.io:9092 \ --property parse.key=true \ --property key.separator=: \ --topic second-topic \ --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner >1:one >2:two >3:three >4:four >5:five >6:six
 
B. --property, --formatter 옵션 사용해서 컨수머에서 메세지 포맷팅해서 읽어들이기.
  • 프로듀서에서는 1:one, 2:two ... 순서로 메세지를 발행했지만 컨수머는 발행된 순서대로 메세지를 읽지 않는다.
  • 메세지의 순서는 파티션 내에서만 보장하여 읽어들인다.
kafka-console-consumer \ --consumer.config playground.config \ --bootstrap-server cluster.playground.cdkt.io:9092 \ --topic second-topic \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.timestamp=true \ --property print.key=true \ --property print.value=true \ --property print.partition=true CreateTime:1694322722829 Partition:1 4 four CreateTime:1694322685540 Partition:0 2 two CreateTime:1694322693807 Partition:3 3 three CreateTime:1694322682724 Partition:2 1 one CreateTime:1694322726066 Partition:2 6 six CreateTime:1694322724415 Partition:4 5 five
 
3. 컨수머그룹으로 메세지 읽어들이기
A. [사전작업] 파티션이 3개 있는 third-topic 생성 & 메세지 발행한다.
kafka-console-producer --producer.config playground.config \ --bootstrap-server cluster.playground.cdkt.io:9092 \ --partitions 3 \ --property parse.key=true \ --property key.separator=: \ --topic third-topic >1:one >2:two >3:three >4:four >5:five >6:six >7:seven >8:eight >9:nine >10:ten >11:eleven >12:twelve >13:thirteen >14:fourteen >15:fifteen >9:nin2
 
B. first-consumer-group 이라는 이름으로 컨수머그룹에 컨수머 3개를 생성한다.
  • --group 옵션에 컨수머그룹 이름을 지정하는데, 이름에 해당하는 컨수머그룹이 없으면 신규 생성한다.
  • 컨수머그룹에 컨수머가 신규 등록되면 신규로 등록된 컨수머에게 메세지를 읽어들일 파티션이 배분된다.
  • 파티션 갯수를 초과하여 등록한 컨수머에게는 파티션이 배분되지 않는다. (메세지 중복 처리를 방지.)
 
kafka-console-consumer --consumer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic third-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --group first-consumer-group CreateTime:1694333789788 Partition:0 1 one CreateTime:1694333798031 Partition:0 5 five CreateTime:1694333796388 Partition:1 4 four CreateTime:1694333800432 Partition:1 6 six CreateTime:1694333854365 Partition:0 7 seven <<--- c3 등록 이후 0번 파티션만 담당 CreateTime:1694333856094 Partition:0 8 eight CreateTime:1694333880144 Partition:0 11 eleven CreateTime:1694333921450 Partition:0 15 fifteen
[c1]
 
kafka-console-consumer --consumer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic third-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --group first-consumer-group CreateTime:1694333792537 Partition:2 2 two CreateTime:1694333794839 Partition:2 3 three CreateTime:1694333860680 Partition:1 10 ten <<--- c3 등록 이후 1번 파티션 담당 CreateTime:1694333882735 Partition:1 12 twelve CreateTime:1694333889433 Partition:1 13 thirteen CreateTime:1694333918378 Partition:1 14 fourteen
[c2]
 
kafka-console-consumer --consumer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic third-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --group first-consumer-group CreateTime:1694333857843 Partition:2 9 nine <<--- 신규 등록된 이후 2번 파티션 담당 CreateTime:1694333964558 Partition:2 9 nin2
[c3]
 
 
C. 컨수머그룹에 있는 모든 컨수머를 셧다운 시키고 메세지 발행 & 컨수머 1개를 다시 생성한다.
>16:sixteen >17:seventeen >18:eighteen >19:nineteen >20:twenty
[producer]
 
 
컨수머 3개가 모두 죽어있어서 읽어들이지 못했던 3개 파티션 내 메세지들을 c1 에서 모두 읽어들인다.
kafka-console-consumer --consumer.config playground.config \ --bootstrap-server cluster.playground.cdkt.io:9092 \ --topic third-topic --formatter kafka.tools.DefaultMessageFormatter \ --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true \ --group first-consumer-group CreateTime:1694334195318 Partition:1 18 eighteen CreateTime:1694334198643 Partition:1 19 nineteen CreateTime:1694334202923 Partition:1 20 twenty CreateTime:1694334192291 Partition:0 17 seventeen CreateTime:1694334189262 Partition:2 16 sixteen
 
D. 또 다른 컨수머를 --from-beginning 옵션을 줘서 재가동한다.
kafka-console-consumer --consumer.config playground.config \ --bootstrap-server cluster.playground.cdkt.io:9092 \ --topic third-topic --formatter kafka.tools.DefaultMessageFormatter \ --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true \ --group first-consumer-group \ --from-beginning
[c2]
    c2--from-beginning 옵션을 줬는데도 아무런 메세지를 읽어들이지 않는다.
    • 컨수머그룹 내 컨수머들은 파티션 오프셋을 공유하는데, c1에서 이미 메세지들을 읽어들이고 오프셋을 커밋했으므로 c2에서는 처리할 메세지가 없기 때문이다.
    • —from-beginning 옵션은 컨수머그룹을 신규로 생성할 때 해당 컨수머그룹이 메세지를 처음부터 처리해야하는 경우에 사용한다. 이미 파티셧 오프셋을 커밋한 컨수머그룹에 컨수머를 신규 등록하는 것과는 무관하다.
     
     

    kafka-consumer-groups

     
    1. 클러스터에 연결된 컨수머그룹 목록 조회
    $ kafka-consumer-groups --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --list
    $ kafka-consumer-groups --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --list first-consumer-group
     
    2. 클러스터에 연결된 컨수머그룹 상세 조회
    $ kafka-consumer-groups --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --describe --group <컨수머그룹 이름>
    $ kafka-consumer-groups --command-config <프로퍼티 파일 경로> --bootstrap-server cluster.playground.cdkt.io:9092 --describe --group first-consumer-group