[Kafka Study] Basic CLI

Created
September 10, 2023
Created by
D
DaEun Kim
Tags
Kafka
Property

학습 자료

conduktor kafka playground 에서 kafka 클러스터를 SaaS 으로 사용 가능하다.

image

kafka-topics

1. 클러스터 내 토픽 목록 조회

$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --list

2. 클러스터 내 토픽 생성

conduktor playground 에서는 기본적으로 partitions=3, replication-factor=3 으로 생성

$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --create --topic <토픽 이름>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --create --topic first-topic
Created topic first-topic.
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --list
first-topic

3. 토픽 정보 상세 조회

$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --describe --topic <토픽 이름>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --describe --topic first-topic
Topic: first-topic	PartitionCount: 3	ReplicationFactor: 3	Configs: cleanup.policy=delete,retention.ms=604800000
	Topic: first-topic	Partition: 0	Leader: 19	Replicas: 19,38,33	Isr: 19,38,33
	Topic: first-topic	Partition: 1	Leader: 32	Replicas: 32,24,22	Isr: 32,24,22
	Topic: first-topic	Partition: 2	Leader: 30	Replicas: 30,19,14	Isr: 30,19,14
  • 0번 파티션의 리더 브로커는 19번, 레플리카는 19, 38, 33번 브로커에 해당한다.
  • 모든 레플리카가 정상적으로 동기화되어 있으므로 ISR 목록도 19, 38, 33번 브로커로 조회된다.
  • 같은 토픽 내 파티션이라고 해서 같은 브로커들에 있는 건 아니다. (1번 파티션의 리더/레플리카 브로커는 32, 24, 22번이다.)
  • 파티션을 특정 브로커에 직접 지정하려면 --replica-assignment 옵션을 사용한다.

4. 파티션 갯수를 지정하여 클러스터 내 토픽 생성

$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --create --topic <토픽 이름> --partitions <파티션 수>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --create --topic second-topic --partitions 5
Created topic second-topic.
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --describe --topic second-topic
Topic: second-topic	PartitionCount: 5	ReplicationFactor: 3	Configs: cleanup.policy=delete,retention.ms=604800000
	Topic: second-topic	Partition: 0	Leader: 19	Replicas: 19,38,33	Isr: 19,38,33
	Topic: second-topic	Partition: 1	Leader: 32	Replicas: 32,24,22	Isr: 32,24,22
	Topic: second-topic	Partition: 2	Leader: 30	Replicas: 30,19,14	Isr: 30,19,14
	Topic: second-topic	Partition: 3	Leader: 30	Replicas: 30,19,14	Isr: 30,19,14
	Topic: second-topic	Partition: 4	Leader: 19	Replicas: 19,38,33	Isr: 19,38,33

5. replication factor 를 지정하여 클러스터 내 토픽 생성

$ kafka-topics --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --create --topic third-topic --replication-factor <복제 계수>
$ kafka-topics --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --create --topic third-topic --replication-factor 3

kafka-console-producer

1. 특정 토픽에 메세지를 발행하기 위해 프롬프트 생성

$ kafka-console-producer --producer.config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --topic <메세지를 발행하려는 토픽>
$ kafka-console-producer --producer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic first-topic
>

2. 키 없이 메세지 발행

$ kafka-console-producer --producer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic first-topic
>Hello World
>This message is from daeun kim
>^C%
[
[conduktor playground 콘솔에서 확인한 결과]

3. --producer-property acks=all 사용하여 메세지 발행

모든 레플리카 브로커가 메세지를 정상적으로 복제하고 나면 ACK 받도록 설정

$ kafka-console-producer --producer.config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --topic <메세지를 발행하려는 토픽> --producer-property acks=all
$ kafka-console-producer --producer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic first-topic --producer-property acks=all
>Message that is acked after all replicas get it
[
[conduktor playground 콘솔에서 확인한 결과]

4. --property 사용하여 메세지 발행

키를 넣어서 메세지 발행

$ kafka-console-producer --producer.config <프로퍼티 파일 경로> \
--bootstrap-server <클러스터 도메인> \
--topic <메세지를 발행하려는 토픽> \
--property parse.key=true \
--property key.separator=:
  • parse.key=true : 메세지에서 키/값을 파싱해라.
  • key.separator=: : 메세지에서 키/값을 : 으로 파싱해라.
$ kafka-console-producer \
--producer.config playground.config \
--bootstrap-server cluster.playground.cdkt.io:9092 \
--topic first-topic \
--property parse.key=true \
--property key.separator=:
>this is key:this is value
>
[
[conduktor playground 콘솔에서 확인한 결과]

kafka-console-consumer

1. 단일 컨수머로 메세지 처음부터 읽어들이기

  • 컨수머는 기본적으로 컨수머 프로세스가 뜨고 나서 발행되는 메세지만 소비한다.
  • 토픽에 기록된 메세지를 처음부터 읽어들이려면 --from-beginning 옵션을 사용한다.
  • --from-beginning 옵션은 컨수머그룹 내에서 커밋한 오프셋이 없을 경우에만 유효하다. (자세한 내용은 여기 참고.)
$ kafka-console-consumer --consumer.config <프로퍼티 파일 경루> \
--bootstrap-server <클러스터 엔드포인트> \
--topic <토픽 이름> \
--from-beginning
$ kafka-console-consumer --consumer.config playground.config \
--bootstrap-server cluster.playground.cdkt.io:9092 \
--topic first-topic \
--from-beginning
Hello World
This message is from daeun kim
this is value
Message that is acked after all replicas get it

2. 여러 파티션에서 메세지 읽어들이기

A. [사전작업] second-topic 에 메세지 발행

  • 라운드-로빈 파티셔너로 발행하도록 변경.
  • 원래 운영에서는 라운드-로빈 파티셔너 사용하지 않는다.
$ kafka-console-producer --producer.config playground.config \
--bootstrap-server cluster.playground.cdkt.io:9092 \
--property parse.key=true \
--property key.separator=: \
--topic second-topic \
--producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>1:one
>2:two
>3:three
>4:four
>5:five
>6:six

B. --property, --formatter 옵션 사용해서 컨수머에서 메세지 포맷팅해서 읽어들이기.

  • 프로듀서에서는 1:one, 2:two ... 순서로 메세지를 발행했지만 컨수머는 발행된 순서대로 메세지를 읽지 않는다.
  • 메세지의 순서는 파티션 내에서만 보장하여 읽어들인다.
kafka-console-consumer \
--consumer.config playground.config \
--bootstrap-server cluster.playground.cdkt.io:9092 \
--topic second-topic \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true \
--property print.key=true \
--property print.value=true \
--property print.partition=true
CreateTime:1694322722829	Partition:1	4	four
CreateTime:1694322685540	Partition:0	2	two
CreateTime:1694322693807	Partition:3	3	three
CreateTime:1694322682724	Partition:2	1	one
CreateTime:1694322726066	Partition:2	6	six
CreateTime:1694322724415	Partition:4	5	five

3. 컨수머그룹으로 메세지 읽어들이기

A. [사전작업] 파티션이 3개 있는 third-topic 생성 & 메세지 발행한다.

kafka-console-producer --producer.config playground.config \
--bootstrap-server cluster.playground.cdkt.io:9092 \
--partitions 3 \
--property parse.key=true \
--property key.separator=: \
--topic third-topic
>1:one
>2:two
>3:three
>4:four
>5:five
>6:six
>7:seven
>8:eight
>9:nine
>10:ten
>11:eleven
>12:twelve
>13:thirteen
>14:fourteen
>15:fifteen
>9:nin2

B. first-consumer-group 이라는 이름으로 컨수머그룹에 컨수머 3개를 생성한다.

  • --group 옵션에 컨수머그룹 이름을 지정하는데, 이름에 해당하는 컨수머그룹이 없으면 신규 생성한다.
  • 컨수머그룹에 컨수머가 신규 등록되면 신규로 등록된 컨수머에게 메세지를 읽어들일 파티션이 배분된다.
  • 파티션 갯수를 초과하여 등록한 컨수머에게는 파티션이 배분되지 않는다. (메세지 중복 처리를 방지.)
kafka-console-consumer --consumer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic third-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --group first-consumer-group
CreateTime:1694333789788	Partition:0	1	one
CreateTime:1694333798031	Partition:0	5	five
CreateTime:1694333796388	Partition:1	4	four
CreateTime:1694333800432	Partition:1	6	six
CreateTime:1694333854365	Partition:0	7	seven <<--- c3 등록 이후 0번 파티션만 담당
CreateTime:1694333856094	Partition:0	8	eight
CreateTime:1694333880144	Partition:0	11	eleven
CreateTime:1694333921450	Partition:0	15	fifteen
[c1]
kafka-console-consumer --consumer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic third-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --group first-consumer-group
CreateTime:1694333792537	Partition:2	2	two
CreateTime:1694333794839	Partition:2	3	three
CreateTime:1694333860680	Partition:1	10	ten   <<--- c3 등록 이후 1번 파티션 담당
CreateTime:1694333882735	Partition:1	12	twelve
CreateTime:1694333889433	Partition:1	13	thirteen
CreateTime:1694333918378	Partition:1	14	fourteen
[c2]
kafka-console-consumer --consumer.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --topic third-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --group first-consumer-group
CreateTime:1694333857843	Partition:2	9	nine  <<--- 신규 등록된 이후 2번 파티션 담당
CreateTime:1694333964558	Partition:2	9	nin2
[c3]

C. 컨수머그룹에 있는 모든 컨수머를 셧다운 시키고 메세지 발행 & 컨수머 1개를 다시 생성한다.

>16:sixteen
>17:seventeen
>18:eighteen
>19:nineteen
>20:twenty
[producer]

컨수머 3개가 모두 죽어있어서 읽어들이지 못했던 3개 파티션 내 메세지들을 c1 에서 모두 읽어들인다.

kafka-console-consumer --consumer.config playground.config \
--bootstrap-server cluster.playground.cdkt.io:9092 \
--topic third-topic --formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true \
--group first-consumer-group
CreateTime:1694334195318	Partition:1	18	eighteen
CreateTime:1694334198643	Partition:1	19	nineteen
CreateTime:1694334202923	Partition:1	20	twenty
CreateTime:1694334192291	Partition:0	17	seventeen
CreateTime:1694334189262	Partition:2	16	sixteen

D. 또 다른 컨수머를 --from-beginning 옵션을 줘서 재가동한다.

kafka-console-consumer --consumer.config playground.config \
--bootstrap-server cluster.playground.cdkt.io:9092 \
--topic third-topic --formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true \
--group first-consumer-group \
--from-beginning
[c2]

c2--from-beginning 옵션을 줬는데도 아무런 메세지를 읽어들이지 않는다.

  • 컨수머그룹 내 컨수머들은 파티션 오프셋을 공유하는데, c1에서 이미 메세지들을 읽어들이고 오프셋을 커밋했으므로 c2에서는 처리할 메세지가 없기 때문이다.
  • —from-beginning 옵션은 컨수머그룹을 신규로 생성할 때 해당 컨수머그룹이 메세지를 처음부터 처리해야하는 경우에 사용한다. 이미 파티셧 오프셋을 커밋한 컨수머그룹에 컨수머를 신규 등록하는 것과는 무관하다.

kafka-consumer-groups

1. 클러스터에 연결된 컨수머그룹 목록 조회

$ kafka-consumer-groups --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --list
$ kafka-consumer-groups --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --list
first-consumer-group

2. 클러스터에 연결된 컨수머그룹 상세 조회

$ kafka-consumer-groups --command-config <프로퍼티 파일 경로> --bootstrap-server <클러스터 엔드포인트> --describe --group <컨수머그룹 이름>
$ kafka-consumer-groups --command-config <프로퍼티 파일 경로> --bootstrap-server cluster.playground.cdkt.io:9092 --describe --group first-consumer-group