이 글은 “카프카, 데이터 플랫폼의 최강자” 책 내용을 정리한 글입니다.

카프카, 데이터 플랫폼의 최강자 - 카프카 컨슈머

컨슈머의 주요 기능은 특정 파티션을 관리하고 있는 파티션 리더에게 메시지 가져오기 요청을 하는 것.
각 요청은 로그의 오프셋을 명시하고 그 위치로부터 로그 메시지를 수신함. 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있음.

1. 컨슈머 주요 옵션

올드 컨슈머는 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식을 지원하다가 카프카 버전 0.9부터 컨슈머의 오프셋 저장을 주키퍼가 아닌 카프카의 토픽에 저장하는 방식으로 변경함.(참고로 주키퍼의 지노드에 저장하는 방식은 사라짐)

  • bootstrap.servers : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보를 나타냄. 클러스터 중 하나의 호스트만 입력해도 되지만 이 방식은 권장되지 않음.
    카프카 클러스터 전체의 서버 목록을 입력하는게 좋음.
  • fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 사이즈. 만약 지정한 사이즈보다 작은 경우, 요청에 대해 응답을 하지 않고 데이터가 누적될 때까지 기다림.
  • group.id : 컨슈머가 속한 그룹을 식별하는 식별자
  • enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋할지 여부
  • auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더이상 존재하지 않은 경우에 다음 옵션으로 리셋
    • earliest : 가장 초기의 오프셋값으로 설정
    • latest : 가장 마지막의 오프셋값으로 설정
    • none : 이전 오프셋값을 찾지 못하면 에러를 나타냄
  • fetch.max.bytes : 한번에 가져올 수 있는 최대 데이터 사이즈
    • 브로커 설정의 “message.max.bytes”와 토픽 설정의 “max.message.bytes” 설정과 관련이 있음
  • max.partition.fetch.bytes : 파티션당 한번의 fetch로 가져올 수 있는 최대 데이터 사이즈. 컨슈머는 레코드를 배치로 가져옴.
  • request.timeout.ms : 요청에 대해 응답을 기다리는 최대 시간
  • connections.max.idle.ms : 이 설정보다 더 오랜 기간동안 idle 상태인 커넥션을 종료시킴
  • session.timeout.ms : 브로커가 컨슈머가 살아있는 것으로 판단하는 시간. 만약 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머 리밸런스를 시도함. session.timeout.ms는 하트비트 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며, 이 속성은 heartbeat.interval.ms와 밀접한 관련이 있음.
    • 브로커 설정의 “group.min.session.timeout.ms”, “group.max.session.timeout.ms” 사이값으로 해야함.
  • heartbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 하트비트를 보낼 것인지 조정함. 이 값은 “session.timeout.ms” 보다 작게 설정해야 함. 또한 일반적으로 “session.timeout.ms”의 1/3보다 작게 설정함.
  • max.poll.records : poll()에 대한 최대 레코드 수를 조정.
  • max.poll.interval.ms : 컨슈머가 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 함.
  • auto.commit.interval.ms : 주기적으로 오프셋을 커밋하는 시간
  • fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간
  • isolation.level : 트랜잭션과 관련된 레코드를 어떻게 읽을지를 결정
    • read_committed : 커밋된 레코드만 읽음.
    • read_uncommitted : 아직 커밋되지 않은 레코드도 읽음. 심지어 트랜잭션이 abort된 레코드도 읽음. (즉 모든 레코드를 읽음)
    • 트랜잭션과 무관한 레코드는 isolation.level과 상관없이 모든 경우에 읽을수 있음.

컨슈머는 레코드를 오프셋 순서대로 읽음.

Read_commited 모드에서는 LSO까지의 레코드만 읽을 수 있음. (LSO = 트랜잭션이 진행중인 레코드 중 맨 앞에 있는 레코드의 오프셋 - 1)
따라서 트랜잭션이 진행중인 레코드 뒤에 있는 레코드들은 관련 트랜잭션이 완료될 때까지는 읽을 수 없음. 또한 Read_commited 컨슈머에서 seekToEnd 메소드를 호출하면 오프셋을 LSO로 옮김.

2. 파티션과 메시지 순서

  • 컨슈머는 오직 파티션의 오프셋 기준으로만 메시지를 가져옴
  • 카프카 컨슈머에서의 메시지 순서는 동일한 파티션 내에서는 프로듀서가 생성한 순서와 동일하게 처리하지만, 파티션과 파티션 사이에서는 순서를 보장하지 않음.

순서를 보장하는 방법

  • 파티션 1개로 토픽을 구성. (단, 파티션이 한개인 경우 처리량이 떨어지는 부분은 감안해야 함)
  • 순서를 보장해야 하는 메시지는 동일한 키를 가지도록 함. 메시지의 키가 같은 경우 같은 파티션으로 전송이 되기 때문에, 같은 키를 가지는 메시지의 경우에는 순서가 보장됨.

3. 컨슈머 그룹

  • 컨슈머 그룹은 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져올 수 있음. 최근에는 하나의 데이터를 다양한 용도로 사용하는 요구가 많아졌기 때문에 이러한 특징은 큰 장점!
  • 하나 이상의 컨슈머를 같은 그룹에 속하게 하려면 동일한 group.id로 설정하면 됨.
  • 파티션의 소유권이 이동하는 것을 리밸런스라고 함. 컨슈머 그룹의 리밸런스를 통해 컨슈머 그룹에 컨슈머를 쉽고 안전하게 추가하고 제거할 수 있음. 또한 높은 가용성과 확장성을 확보할 수 있음.
    • 리밸런스를 하는 동안 일시적으로 컨슈머는 메시지를 가져올 수 없음. 그래서 리밸런스가 발생하면 컨슈머 그룹 전체가 일시적으로 사용할 수 없는 단점 존재.
  • 토픽의 파티션에는 하나의 컨슈머만 연결. 서로다른 컨슈머가 하나의 파티션으로부터 메시지를 가져오는것은 불가능.
  • 컨슈머 그룹마다 각자의 오프셋을 별로도 관리.
  • group.id는 중복이 되지 않도록 주의할 것.

Q. 만약 프로듀서가 토픽에 보내는 메시지 속도가 갑자기 증가해 컨슈머가 메시지를 가져가는 속도보다 빨라지면?
컨슈머가 처리하지 못한 메시지들이 점점 많아지게 되어 카프카로 메시지가 들어오는 시간과 컨슈머에 의해 카프카에서 나가는 시간의 차이는 점점 벌어지게 될 것.
그럼 어떻게 해야 할까?

컨슈머를 확장해야 함.
단순한 확장이 아닌, 동일한 토픽에 대해 여러 컨슈머가 메시지를 가져갈 수 있도록 컨슈머 그룹이라는 기능을 사용하면 됨.
이를 통해 컨슈머는 확장이 용이해지고, 컨슈머의 장애에도 빠른 대처가 가능함.

상황 1
토픽(파티션0, 파티션1, 파티션2) , 컨슈머그룹01(컨슈머01) 에 대해 메시지를 가져오는 상황을 가정.
-> 읽지 못한 메시지들이 쌓이게 됨.
-> 지속적으로 쌓이게 되면 커다란 골칫거리가 될 수 있음.
-> 해결: 컨슈머를 충분히 확장해야 함. (컨슈머 그룹을 이용)
=> 토픽(파티션0, 파티션1, 파티션2) , 컨슈머그룹01(컨슈머01, 컨슈머02, 컨슈머03)
-> 이때 리밸런스 발생.

상황 2
토픽(파티션0, 파티션1, 파티션2) , 컨슈머그룹01(컨슈머01, 컨슈머02, 컨슈머03, 컨슈머04) 에 대해 메시지를 가져오는 상황을 가정.
-> 컨슈머04는 아무일도 하지 않고 대기만 함.
-> 왜 이럴까?
-> 이유는 토픽의 파티션에는 하나의 컨슈머만 연결할 수 있기 때문.
-> 이 때, 각각의 파티션에 대해서는 메시지 순서를 보장하기 때문에 안정적으로 메시지 순서를 보장할 수 있는데, 만일 하나의 파티션에 두개의 컨슈머가 연결된다면 안정적으로 메시지 순서를 보장할 수 없게 될 것임.
=> 따라서 카프카에서는 하나의 파티션에 하나의 컨슈머만 연결할 수 있음
=> 컨슈머 수를 늘릴때에는, 토픽의 파티션 수도 같이 늘려줘야 함.
=> 토픽(파티션0, 파티션1, 파티션2) , 컨슈머그룹01(컨슈머01, 컨슈머02, 컨슈머03)

4. 커밋과 오프셋

  • 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보를 기록하고 있음.
  • 커밋 : 각 파티션에 대해 현재 위치를 업데이트하는 동작
  • 카프카 내에 별도로 내부에서 사용하는 토픽을 만들고 그 토픽에 오프셋 정보를 저장하고 있음. (__consumer_offset)
  • 컨슈머는 새로운 파티션에 대해 가장 최근 커밋한 오프셋을 읽고 그 이후부터 메시지들을 가져오기 시작함.
  • 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 작으면 메시지 처리는 중복됨.
  • 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 크면 메시지 처리가 누락됨.

4.1 자동커밋

  • enable.auto.commit을 true로 설정하면 5초마다 컨슈머는 poll()를 호출할 때 가장 마지막 오프셋을 커밋.
    • 5초는 기본값이며 auto.commit.interval.ms 옵션을 통해 조정이 가능함.
  • 컨슈머는 poll을 요청할 때마다 커밋할 시간인지 아닌지 체크하게 되고, poll 요청으로 가져온 마지막 오프셋을 커밋.
  • 자동 커밋을 사용하는 경우, 리밸런싱이 일어나면서 메시지가 중복되어 처리될 수도 있음. 중복을 줄이기 위해 auto.commit.interval.ms 값을 작게 할 수도 있지만 완벽하게 중복을 제거할 순 없음.

4.2 수동커밋

  • 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안 되는 경우에 사용. (응???뭔소리.. 예시 생각해보자.ㅠㅠ)
    • ex) 컨슈머가 메시지를 가져와서 db에 메시지를 저장한다고 가정. 만약 자동 커밋을 사용하는 경우라면 자동커밋의 주기로 인해 poll하면서 마지막 값의 오프셋으로 자동 커밋이 됨. 그리고 일부 메시지들은 db에 저장되지 못한 상태로 컨슈머 장애가 발생한다면????? 메시지가 손실될 것임.
      -> 이런경우를 방지하기 위해 컨슈머가 메시지를 가져오자마자 커밋을 하는 것이 아니라, 먼저 db에 메시지를 저장한 후 커밋을 해야만 안전함!!
  • 커밋을 수동으로 해야하는 경우 enable.auto.commit을 false로 지정한다.
  • commitSync 혹은 commitAsync 메소드를 호출해 수동으로 커밋한다.

4.3 특정 파티션 할당

  • assign 메소드를 통해 특정 파티션의 메시지를 가져올 수 있음.
    • 수동으로 파티션을 할당하는 경우, 컨슈머 인스턴스마다 컨슈머 그룹 아이디를 서로 다르게 설정해야 함.

4.4 특정 오프셋부터 메시지 가져오기

  • seek() 메소드를 통해 특정 오프셋부터 메시지를 가져올 수 있음.

태그:

카테고리:

업데이트:

댓글남기기