[Kafka Study] producer's retries & setting variables

Created
Oct 6, 2023
Created by
Tags
Kafka
Property
 
학습 자료
 
 

 
 
프로듀서가 레코드를 브로커에 전달하기까지 걸리는 시간은 두 구간으로 나뉜다.
  • send() 를 비동기로 호출하고 리턴을 받을 때 까지 걸리는 시간. 이 시간 동안에 send() 를 비동기 호출한 쓰레드는 블로킹 된다. → 1번
  • send() 를 리턴 받고 나서 콜백이 호출될 때 까지 걸리는 시간. (레코드가 batch 에 추가된 시점 ~ 카프카로부터 정상/오류 응답을 받기까지 시간과 동일하다.) → 2번
 
 
아래 그림은 프로듀서가 메세지를 보내는 과정과 설정값들이 어떤 연관이 있는지 보여준다.
notion image
 

max.block.ms

  • send() 에서는 partitionsFor() 를 호출하여 파티셔닝에 필요한 메타데이터를 카프카로부터 요청하고 버퍼를 할당 받는다.
  • [메타데이터를 요청해서 받기까지의 시간 + 버퍼를 할당받는 데 걸리는 시간] 이 max.block.ms 보다 오래 걸리면 예외가 발생한다. (메세지 직렬화, 파티셔닝에 걸리는 시간은 포함되지 않는다.)
  • max.block.ms 가 길게 설정될수록 1번에서 블로킹 되는 시간도 길어진다.
  • send() 뿐만 아니라 initTransactions(), sendOffsetsToTransaction(), commitTransaction(), abortTransaction() 을 호출해서 블로킹 되는 시간도 max.block.ms 으로 관리한다.
 

buffer.memory

  • 프로듀서가 메세지를 전송하기 전에 메세지를 대기시키는 버퍼의 크기를 결정한다.
  • 메세지가 카프카 클러스터에 전달되는 속도보다, 메세지를 생성하는 속도가 더 빠르다면 send() 를 호출했을 때 max.block.ms 으로 설정된 기간 동안 쓰레드는 버퍼를 할당 받을 때까지 블로킹 된다.
  • max.block.ms 기간 동안 대기했음에도 버퍼를 할당받지 못하면 예외를 발생시킨다.
  • buffer.memory 는 프로듀서가 사용하는 총 메모리 용량과 대강 일치해야 버퍼를 할당받는 데 발생하는 지연을 줄일 수 있지만 필수는 아니다. (프로듀서가 사용하는 메모리 = [버퍼 + 레코드 압축 + in-flight 요청] 임을 감안하면 총 메모리 사용량보다 좀 적게 설정해도 된다.)
 

delivery.timeout.ms

  • 메세지의 전송 준비가 완료된 시점 (send() 호출에 대한 리턴을 받고 레코드가 batch 에 저장된 시점) ~ 카프카 클러스터로부터 ACK 를 받거나 retries 횟수 만큼 재시도 후 전송을 포기하게 되는 시점까지의 제한 시간을 결정한다.
  • 만약 프로듀서가 메세지 전송을 재시도하는 도중에 delivery.timeout.ms 를 넘어가면 브로커가 가장 최근에 응답한 에러를 가지고 콜백이 호출된다.
  • 일반적으로 리더 브로커가 불능 상태에 있을 경우 리더를 재선출하기까지 걸리는 시간을 감안하여 delivery.timeout.ms 를 충분히 두고 retries, request.timeout.ms 같은 세부 설정은 디폴트 값으로 두는 방식을 사용한다.
  • 디폴트는 2분으로 설정되어 있으며 linger.ms + request.timeout.ms 보다 같거나 커야 한다.
 

retries

  • NOT_ENOUGH_REPLICAS 와 같은 사유로 프로듀서는 메세지 기록에 실패할 수 있다. 메세지 기록에 실패하면 재시도 하도록 retries 값을 설정한다.
  • 2.1 버전 이후로 retries 는 디폴트로 2147483647 으로 설정되어 있어서 거의 무한정으로 재시도 하도록 되어있다.
 

request.timeout.ms

  • 프로듀서가 batch 에 쌓인 레코드를 카프카 브로커에게 전송하고, 전송에 대한 응답을 받기까지 제한 시간을 결정한다.
  • request.timeout.ms 시간 내에 브로커로부터 응답이 오지 않으면 요청을 포기하고 retries가 소진될 때 까지 전송을 재시도한다.
  • retries 가 소진되면 TimeoutException 예외를 가지고 콜백을 호출한다.
 

retry.backoff.ms

  • 재시도 이후 다음 재시도를 할 때까지 대기하는 시간으로, 디폴트는 100ms 이다.
 

linger.ms

  • batch 를 전송하기까지 대기하는 시간을 결정한다.
  • 프로듀서는 batch 가 가득 찼거나 linger.ms 주기가 되면 무조건 batch 를 전송한다.
  • linger.ms 를 0보다 크게 설정하면 메세지 전송에 지연이 조금 생기겠지만 batch.size 를 높이고 메세지 압축을 활용하면 쓰루풋을 증가시킬 수 있다.
 

compression.type

  • 프로듀서가 만드는 모든 데이터 (레코드, 배치) 에 어떤 압축 알고리즘을 사용할 지 결정한다.
  • 디폴트는 none (압축하지 않음) 이며 gzip, snappy, zstd, lz4 에서 선택한다.
  • batch 사이즈를 크게 잡을수록 압축에서 얻는 장점이 크다.
 

브로커의 compression.type 설정
  • compression.type = producer (디폴트) : 프로듀서로부터 압축된 batch 를 받아서 별도의 압축/해제 처리 없이 그대로 토픽 로그 파일에 기록한다. → 일반적으로 많이 사용하는 설정. 컨수머에서 압축 해제해서 처리한다.
  • compression.type = uncompressed : 브로커에서 압축을 해제하여 로그파일에 기록한다. → 컨수머에서 압축 해제된 메세지를 읽어들여야 하기 때문에 컨수머 쪽에서 소켓 버퍼 많이 잡아먹고 쓰루풋 안나옴.
  • compressson.type = zstd | lz4 | snappy | gzip : 프로듀서로부터 압축된 batch 를 받아서 압축을 해제하고 해당 알고리즘으로 다시 압축해서 로그 파일에 기록한다. (프로듀서의 압축 알고리즘과 일치하면 해제/재압축 과정을 거치지 않는다.) → 굳이 사용해야할 이유가?
 

batch.size

  • 동일한 파티션에 여러 개의 레코드를 전송하기 위해 프로듀서는 배치 단위로 레코드를 묶어서 카프카 클러스터에 한꺼번에 전송한다. batch.size 는 배치의 용량을(바이트 단위) 결정한다.
  • linger.ms 주기가 되기 전에 batch.size 만큼 배치에 레코드가 차면 클러스터에 메세지를 전송한다.
  • batch.size 가 너무 크다고 해도 linger.ms 주기를 적당히 짧게 설정한다면 메세지 전송에 지연이 발생하지는 않고, batch.size가 너무 작으면 메세지를 자주 전송해야 하기 때문에 오버헤드가 발생한다.
 

max.in.flight.requests.per.connection

  • 1개 커낵션 당 카프카 클러스터로부터 ACK 를 받지 못한 요청을(batch 단위) 동시에 몇 개까지 허용하는지 결정한다.
  • max.in.flight.requests.per.connection 을 1보다 높게 설정한다는 것은 in-flight 요청이 동시에 2개 이상 존재하는 걸 허용한다는 건데, 1보다 높게 설정하고 enable.idempotence = false으로 설정하면 재시도에 의한 메세지 중복 전송 과정에서 메세지의 순서가 보장되지 않는 위험이 있다.
 

enable.idempotence

  • enable.idempotence = true 으로 설정하면 2개 이상의 in-flight 요청이 발생하더라도 메세지의 순서를 보장하면서 재시도에 의한 멱등성을 보장해준다.
  • 단, max.in.flight.requests.per.connection 은 5 이하로 설정하고 acks=all 으로 설정해야 한다.
 

max.request.size

  • 프로듀서의 쓰기 요청의 크기(바이트 단위)를 결정한다.
  • 1개 요청에 너무 큰 용량의 배치가 포함되는 것을 방지하는 용도로 쓰인다.
  • 카프카 클러스터에서도 1개 요청 당 허용하는 최대 배치 크기를 나타내는 message.max.bytes 설정이 있는데, 이 설정값보다 max.request.size 를 같거나 작게 해야 한다.
 

receive.buffer.bytes & send.buffer.bytes

  • 클러스터로부터 데이터를 읽고 쓸 때 사용하는 TCP 소켓의 버퍼 크기를 결정한다.
  • 각각의 값이 -1 일 경우 운영체제의 기본값으로 설정한다.
  • 네트워크 대역폭이 낮고 지연시간이 길다면 receive.buffer.bytes, send.buffer.bytes 를 높게 잡아서 대역폭의 효율을 최대한 취하는 게 좋다.