학습 자료
프로듀서가 레코드를 브로커에 전달하기까지 걸리는 시간은 두 구간으로 나뉜다.
send()
를 비동기로 호출하고 리턴을 받을 때 까지 걸리는 시간. 이 시간 동안에send()
를 비동기 호출한 쓰레드는 블로킹 된다. → 1번
send()
를 리턴 받고 나서 콜백이 호출될 때 까지 걸리는 시간. (레코드가 batch 에 추가된 시점 ~ 카프카로부터 정상/오류 응답을 받기까지 시간과 동일하다.) → 2번
아래 그림은 프로듀서가 메세지를 보내는 과정과 설정값들이 어떤 연관이 있는지 보여준다.
max.block.ms
send()
에서는partitionsFor()
를 호출하여 파티셔닝에 필요한 메타데이터를 카프카로부터 요청하고 버퍼를 할당 받는다.
- [메타데이터를 요청해서 받기까지의 시간 + 버퍼를 할당받는 데 걸리는 시간] 이
max.block.ms
보다 오래 걸리면 예외가 발생한다. (메세지 직렬화, 파티셔닝에 걸리는 시간은 포함되지 않는다.)
max.block.ms
가 길게 설정될수록 1번에서 블로킹 되는 시간도 길어진다.
send()
뿐만 아니라initTransactions()
,sendOffsetsToTransaction()
,commitTransaction()
,abortTransaction()
을 호출해서 블로킹 되는 시간도max.block.ms
으로 관리한다.
buffer.memory
- 프로듀서가 메세지를 전송하기 전에 메세지를 대기시키는 버퍼의 크기를 결정한다.
- 메세지가 카프카 클러스터에 전달되는 속도보다, 메세지를 생성하는 속도가 더 빠르다면
send()
를 호출했을 때max.block.ms
으로 설정된 기간 동안 쓰레드는 버퍼를 할당 받을 때까지 블로킹 된다.
max.block.ms
기간 동안 대기했음에도 버퍼를 할당받지 못하면 예외를 발생시킨다.
buffer.memory
는 프로듀서가 사용하는 총 메모리 용량과 대강 일치해야 버퍼를 할당받는 데 발생하는 지연을 줄일 수 있지만 필수는 아니다. (프로듀서가 사용하는 메모리 = [버퍼 + 레코드 압축 + in-flight 요청] 임을 감안하면 총 메모리 사용량보다 좀 적게 설정해도 된다.)
delivery.timeout.ms
- 메세지의 전송 준비가 완료된 시점 (
send()
호출에 대한 리턴을 받고 레코드가 batch 에 저장된 시점) ~ 카프카 클러스터로부터ACK
를 받거나retries
횟수 만큼 재시도 후 전송을 포기하게 되는 시점까지의 제한 시간을 결정한다.
- 만약 프로듀서가 메세지 전송을 재시도하는 도중에
delivery.timeout.ms
를 넘어가면 브로커가 가장 최근에 응답한 에러를 가지고 콜백이 호출된다.
- 일반적으로 리더 브로커가 불능 상태에 있을 경우 리더를 재선출하기까지 걸리는 시간을 감안하여
delivery.timeout.ms
를 충분히 두고retries
,request.timeout.ms
같은 세부 설정은 디폴트 값으로 두는 방식을 사용한다.
- 디폴트는 2분으로 설정되어 있으며
linger.ms
+request.timeout.ms
보다 같거나 커야 한다.
retries
NOT_ENOUGH_REPLICAS
와 같은 사유로 프로듀서는 메세지 기록에 실패할 수 있다. 메세지 기록에 실패하면 재시도 하도록retries
값을 설정한다.
- 2.1 버전 이후로
retries
는 디폴트로 2147483647 으로 설정되어 있어서 거의 무한정으로 재시도 하도록 되어있다.
request.timeout.ms
- 프로듀서가 batch 에 쌓인 레코드를 카프카 브로커에게 전송하고, 전송에 대한 응답을 받기까지 제한 시간을 결정한다.
request.timeout.ms
시간 내에 브로커로부터 응답이 오지 않으면 요청을 포기하고retries
가 소진될 때 까지 전송을 재시도한다.
retries
가 소진되면TimeoutException
예외를 가지고 콜백을 호출한다.
retry.backoff.ms
- 재시도 이후 다음 재시도를 할 때까지 대기하는 시간으로, 디폴트는 100ms 이다.
linger.ms
- batch 를 전송하기까지 대기하는 시간을 결정한다.
- 프로듀서는 batch 가 가득 찼거나
linger.ms
주기가 되면 무조건 batch 를 전송한다.
linger.ms
를 0보다 크게 설정하면 메세지 전송에 지연이 조금 생기겠지만batch.size
를 높이고 메세지 압축을 활용하면 쓰루풋을 증가시킬 수 있다.
compression.type
- 프로듀서가 만드는 모든 데이터 (레코드, 배치) 에 어떤 압축 알고리즘을 사용할 지 결정한다.
- 디폴트는 none (압축하지 않음) 이며 gzip, snappy, zstd, lz4 에서 선택한다.
- batch 사이즈를 크게 잡을수록 압축에서 얻는 장점이 크다.
브로커의
compression.type
설정compression.type = producer
(디폴트) : 프로듀서로부터 압축된 batch 를 받아서 별도의 압축/해제 처리 없이 그대로 토픽 로그 파일에 기록한다. → 일반적으로 많이 사용하는 설정. 컨수머에서 압축 해제해서 처리한다.
compression.type = uncompressed
: 브로커에서 압축을 해제하여 로그파일에 기록한다. → 컨수머에서 압축 해제된 메세지를 읽어들여야 하기 때문에 컨수머 쪽에서 소켓 버퍼 많이 잡아먹고 쓰루풋 안나옴.
compressson.type = zstd | lz4 | snappy | gzip
: 프로듀서로부터 압축된 batch 를 받아서 압축을 해제하고 해당 알고리즘으로 다시 압축해서 로그 파일에 기록한다. (프로듀서의 압축 알고리즘과 일치하면 해제/재압축 과정을 거치지 않는다.) → 굳이 사용해야할 이유가?
batch.size
- 동일한 파티션에 여러 개의 레코드를 전송하기 위해 프로듀서는 배치 단위로 레코드를 묶어서 카프카 클러스터에 한꺼번에 전송한다.
batch.size
는 배치의 용량을(바이트 단위) 결정한다.
linger.ms
주기가 되기 전에batch.size
만큼 배치에 레코드가 차면 클러스터에 메세지를 전송한다.
batch.size
가 너무 크다고 해도linger.ms
주기를 적당히 짧게 설정한다면 메세지 전송에 지연이 발생하지는 않고,batch.size
가 너무 작으면 메세지를 자주 전송해야 하기 때문에 오버헤드가 발생한다.
max.in.flight.requests.per.connection
- 1개 커낵션 당 카프카 클러스터로부터
ACK
를 받지 못한 요청을(batch 단위) 동시에 몇 개까지 허용하는지 결정한다.
max.in.flight.requests.per.connection
을 1보다 높게 설정한다는 것은 in-flight 요청이 동시에 2개 이상 존재하는 걸 허용한다는 건데, 1보다 높게 설정하고enable.idempotence = false
으로 설정하면 재시도에 의한 메세지 중복 전송 과정에서 메세지의 순서가 보장되지 않는 위험이 있다.
enable.idempotence
enable.idempotence = true
으로 설정하면 2개 이상의 in-flight 요청이 발생하더라도 메세지의 순서를 보장하면서 재시도에 의한 멱등성을 보장해준다.
- 단,
max.in.flight.requests.per.connection
은 5 이하로 설정하고acks=all
으로 설정해야 한다.
max.request.size
- 프로듀서의 쓰기 요청의 크기(바이트 단위)를 결정한다.
- 1개 요청에 너무 큰 용량의 배치가 포함되는 것을 방지하는 용도로 쓰인다.
- 카프카 클러스터에서도 1개 요청 당 허용하는 최대 배치 크기를 나타내는
message.max.bytes
설정이 있는데, 이 설정값보다max.request.size
를 같거나 작게 해야 한다.
receive.buffer.bytes & send.buffer.bytes
- 클러스터로부터 데이터를 읽고 쓸 때 사용하는 TCP 소켓의 버퍼 크기를 결정한다.
- 각각의 값이 -1 일 경우 운영체제의 기본값으로 설정한다.
- 네트워크 대역폭이 낮고 지연시간이 길다면
receive.buffer.bytes
,send.buffer.bytes
를 높게 잡아서 대역폭의 효율을 최대한 취하는 게 좋다.