ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] 카프카의 내부 동작원리
    Data Engineer 2023. 5. 20. 14:38
    728x90

    Kafka의 내부 동작 원리와 구현에서 가장 중요한 부분 중 하나는 리플리케이션 동작이다. 이러한 리플리케이션 동작의 구현은 매우 어려운 부분일 뿐만 아니라 애플리케이션의 성능 저하도 불러오게 된다. 이를 위해 카프카는 안정성을 높임과 동시에 최대한 성능에 영향을 주지 않도록 설계되었다.

     

    Kafka 리플리케이션

    고가용성 분산 스트리밍 플랫폼인 카프카는 무수히 많은 데이터 파이프라인의 정중앙에 위치하는 메인 허브 역할을 담당한다. 중앙에서 메인 허브 역할을 하는 카프카 클러스터가 만약 하드웨어의 문제나 점검 등으로 인해 정상적으로 동작하지 못한다거나, 카프카에 연결된 전체 데이터 파이프라인에 영향을 미친다면 이는 매우 심각한 문제가 될 수 있다. 이러한 이슈를 해결하기 위해 카프카는 초기 설계 단계에서부터 브로커 한두 대에서 장애가 발생하더라도 중앙 데이터 허브로서 안정적인 서비스가 운영될 수 있도록 구상되었다.

    카프카는 브로커의 장애에도 불구하고 연속적으로 안정적인 서비스를 제공함으로써 데이터 유실을 방지하며 유연성을 제공한다. 카프카의 리플리케이션 동작을 위해 replication factor라는 옵션을 설정해야 한다.

    [ec2-user@~]$ /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092
    	--create --topic peter-test01
        --partittions 1 --replication-factor 3
        
     ## created topic peter-test01.
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092
    	--create --topic peter-test01
        --describe
        
        
    ## Topic: peter-test01 PartitionCount: 1 ReplicationFactor: 3 Configs: segment. 
    ## bytes=1073741824
    ## Topic: peter-test01 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

    ## Topic: peter-test01 PartitionCount: 1 ReplicationFactor: 3 Configs: segment. 
    ## bytes=1073741824

    - peter-test01 토픽의 파티션 수인 1과 리플리케이션 팩터 수인 3이 표시

    ## Topic: peter-test01 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

    - peter-test01 토픽의 파티션 0에 대한 상세 내용. 파티션 0의 리더는 브로커 1,  리플리케이션들은 브로커 1,2,3에 있음을 나타내며, 현재 동기화되고 있는 리플리케이션들은 브로커 1,2,3이라는 의미.

     

    [ec2-user@~]$ /usr/local/kafka/bin/kafka-console-producer.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --topic peter-test01
    > test message1
    
    # kafka-dump-log.sh 명령어 : 세그먼트 파일의 내용 확인
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-dump-log.sh
    	--print-data-log
        --files /data/kafka-logs/peter-test01-0/0000000000000000.log
    
    ...
    
    Dumping /data/kafka-logs/peter-test01-0/00000000000000000000.log
    Starting offset: 0 # 시작 오프셋 위치 = 0
    baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1
    producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false
    position: 0 CreateTime: 1601008070323 size: 81 magic: 2 compresscodec: NONE
    crc: 3417270022	isvalid: true
    | offset: 0 CreateTime: 1601008070323 keysize: -1 valuesize: 13 sequence: -1 headerKeys:
    []payload: test message1

    - count : 메시지 카운트 =  1

    - 프로듀서를 통해 보낸 메시지는 test message1임을 알 수 있음

    현재 접속한 서버가 peter-kafka01이지만, 카프카 클러스터를 이루는 다른 브로커들인 peter-kafka02, 03에 접속해 방금 실행한 dump 명령어를 이용해 결과를 확인해보면 모든 브로커가 동일한 메시지를 갖고 있음을 알 수 있다. 즉 콘솔 프로듀서로 보낸 메시지 하나를 총 3대의 브로커들이 모두 갖고 있는 것이다. 

    Kafka는 Replication factor라는 옵션을 통해 관리자가 지정한 수만큼의 리플리케이션을 가질 수 있으며, N개의 리플리케이션이 있는 경우 N - 1까지의 브로커 장애가 발생해도 메시지 손실없이 안정적으로 메시지를 주고 받을 수 있다.

     

    Leader와 Follower

    토픽 상세보기 명령어를 실행하면 출력 내용 중 파티션의 리더라는 부분이 존재한다. 모두 동일한 리플리케이션이라 하더라도 리더만의 역할이 따로 있기에 카프카에서 리더를 특별히 강조해 표시한다. 카프카는 내부적으로 모두 동일한 리플리케이션들을 리더와 팔로워로 구분하고, 각자의 역할을 분담한다.

    - Leader : 리플리케이션 중 하나가 선정되며, 모든 읽기와 쓰기는 그 리더를 통해서 가능하다. 즉 프로듀서는 모든 리플리케이션에 메시지를 보내는 것이 아니라 리더에게만 메시지를 전송한다. 컨슈머 또한 오직 리더로부터 메시지를 가져온다.

    - Follower : 앞선 Leader의 동작 과정 중 Follower들은 리더에 문제가 발생하거나 이슈가 있을 경우를 대비한다. 컨슈머가 토픽의 메시지를 꺼내가는 과정과 비슷하게 지속적으로 파티션의 리더가 새로운 메시지를 받았는지 확인하고, 새로운 메시지가 있다면 해당 메시지를 리더로부터 복제한다.

    복제 유지와 커밋

    - Leader와 Follower는 ISR(InSyncReplica)이라는 논리적 그룹으로 묶임

    *리더와 팔로워를 별도의 그룹으로 나누는 이유는 해당 그룹 안에 속한 팔로워들만이 새로운 리더의 자격을 가질 수 있기 때문에, ISR 그룹에 속하지 못한 팔로워는 새로운 리더의 자격을 가질 수 없다.

    ISR 내의 팔로워들을 리더와의 데이터 일치를 유지하기 위해 지속적으로 리더의 데이터를 따라가게 되고, 리더는 ISR 내 모든 팔로워가 메시지를 받을 때까지 대기한다. 하지만 팔로워가 네트워크 오류, 브로커 장애 등 여러 이유로 리더로부터 리플리케이션하지 못하는 경우가 발생하는 때가 있다. 이를 파악하기 위해 파티션의 리더는 팔로워들의 리플리케이션 동작을 감시한다. 이를 통해 리더의 메시지 수신 속도에 맞는 팔로워들만이 ISR 그룹에 속하게 된다.

    리더와 팔로워 중 리플리케이션 동작을 잘하고 있는지 여부 등은 누가 판단하고 어떤 기준으로 판단할까?

    리더는 메시지를 읽고 쓰는 동작과 더불어 팔로워가 리플리케이션 동작을 잘 수행하고 있는지도 판단하게 된다. 팔로워가 특정 주기의 시간만큼 복제 요청을 하지 않는다면 리더는 해당 팔로워가 리플리케이션 동작에 문제가 발생했다고 판단해 ISR 그룹에서 추방한다.

    카프카 클러스터 운영 중 특정 토픽의 상태가 의심되거나 문제가 있다고 판단될 시 토픽 상세보기 명령어를 통해 현재 ISR 상태를 점검함으로써 현재 토픽의 상태가 양호한지 불량한지를 확인할 수 있다.

    ISR 내에서 모든 팔로워의 복제가 완료된다면, 리더는 내부적으로 커밋되었다는 표시를 하게 된다. 마지막 커밋 오프셋 위치는 High water mark라고 부른다. 즉 커밋의 의미는 리플리케이션 팩터 수의 모든 리플리케이션이 전부 메시지를 저장했음을 의미하며, 메시지의 일관성을 유지하기 위해 커밋된 메시지만 컨슈머가 읽어 갈 수 있다.  

    커밋 메시지

    이 구현에 있어서 커밋된 위치가 매우 중요하다. 모든 브로커는 재시작될 때 커밋된 메시지를 유지하기 위해 로컬 디스크의 replication-offset-checkpoint 라는 파일에 마지막 커밋 오프셋 위치를 저장한다. 이 파일은 브로커 설정 파일에서 설정한 로그 디렉토리의 경로에 있으며, 브로커 설정 파일의 로그 디렉토리는 /data/kafka-logs로 설정되어 있으므로, 해당 디렉토리 하위에 위치한다. 

    [ec2-user@~]$ cat /data/kafka-logs/replication-offset-checkpoint
    
    ...
    
    peter-test01 0 1

     peter-test01은 토픽, 0은 파티션 번호, 1은 커밋된 오프셋 번호를 뜻한다.

    [ec2-user@~]$ /usr/local/kafka/bin/kafka-console-producer.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --topic peter-test01
    > text message 2
    
    [ec2-user@~]$ cat /data/kafka-logs/replication-offset-checkpoint
    
    ...
    
    peter-test01 0 2

    콘솔 프로듀서를 이용 test message 2 메시지를 전송한 후 확인하면, 커밋된 오프셋 번호가 1에서 2로 변경됨을 알 수 있다. 리플리케이션된 다른 브로커들에서도 동일한 명령어를 이용해 확인하면 동일한 오프셋 번호를 나타냄을 알 수 있다. 이를 이용해 특정 토픽이나 파티션에 복제가 되지 않거나 문제가 있다고 판단되는 경우 replication-offset-checkpoint라는 파일의 내용을 확인하고 리플리케이션되고 있는 다른 브로커들과 비교한다면 어떤 브로커, 토픽, 파티션에 문제가 있는지를 확인할 수 있다.

    리플리케이션 동작 방식

    카프카로 향하는 수많은 메시지의 읽고 쓰기를 처리하는 리더는 매우 바쁘다. 이런 와중에 리플리케이션 동작을 위해 팔로워들과 많은 통신을 주고받는다면, 결과적으로 리더의 성능은 떨어지고 카프카의 장점인 빠른 성능도 내기 어려울 것이다. 그래서 카프카는 리더와 팔로워 간에 리플리케이션 동작을 처리할 때 서로의 통신을 최소화할 수 있도록 설계함으로써 리더의 부하를 줄인다.

    다음과 같이 리플리케이션이 진행된다고 가정하자. 현 상태에서 리더는 모든 팔로워가 0번 오프셋 메시지를 리플리케이션하기 위한 요청을 보냈다는 사실을 알고 있다. 그러나 리더는 팔로워들이 0번 오프셋에 대한 리플리케이션 동작을 성공했는지 실패했는지에 대한 여부를 알지 못한다. 전통적인 메시징 큐 시스템인 래빗MQ의 트랜잭션 모드에서는 모든 미러(Mirror : 카프카의 팔로워에 해당)가 메시지를 받았는지에 대한 ACK를 리더에게 리턴하므로 이를 확인할 수 있지만, 카프카에서는 ACK 통신을 제거하였다. 이는 리플리케이션 동작의 성능을 더욱 높이기도 했는데, 어떤 방식일까?

    - 다음 과정으로서, 리더는 1번 오프셋 위치에 두 번째 새로운 메시지 message2를 프로듀서로부터 받은 뒤 저장

    - 0번 오프셋에 대한 리플리케이션 동작을 마친 팔로워들은 1번 오프셋에 대한 리플리케이션을 요청

    - 팔로워들로부터 1번 오프셋에 대한 리플리케이션 요청을 받은 리더는 팔로워들의 0번 오프셋에 대한 리플리케이션 동작이 성공했음을 인지하고, 오프셋 0에 대해 커밋 표시를 한 후 하이워터마크를 증가

    - 팔로워가 0번 오프셋에 대한 리플리케이션을 성공하지 못했다면 팔로워는 1번 오프셋에 대한 리플리케이션 요청이 아닌 0번 오프셋에 대한 리플리케이션 요청. 따라서 리더는 팔로워들이 보내는 리플리케이션 요청의 오프셋을 보고, 팔로워들이 어느 위치의 오프셋까지 리플리케이션을 성공했는지를 인지

    - 팔로워들로부터 1번 오프셋 메시지에 대한 리플리케이션 요청을 받은 리더는 응답에 0번 오프셋 message1 메시지가 커밋되었다는 내용도 함께 전달

    - 리플리케이션의 마지막 과정으로서, 리더의 응답을 받은 모든 팔로워는 0번 오프셋 메시지가 커밋되었다는 사실을 인지하게 되고, 리더와 동일하게 커밋을 표시

    - 1번 오프셋 메시지인 message2를 리플리케이션 : 계속 반복하며 동일 파티션 내에서 리더와 팔로워 간 메시지 최신 상태 유지 가능

    리더와 팔로워 사이에서 한 두번의 ACK 통신을 하는 것은 성능상에 별 문제가 되지 않으나, 카프카처럼 대량 메시지를 처리하는 애플리케이션은 작은 차이도 크게 부각된다. 따라서 ACK를 제외한 카프카에 리더는 메시지를 주고받는 기능에 더욱 집중할 수 있게 된다. 또한 리플리케이션 동작 방식은 리더가 푸시(push)하는 것이 아닌 팔로워들이 풀(pull)하는 방식으로 동작하여, 리더의 부하를 줄여준다.

     

    리더 에포크와 복구

    LeaderEpoch

    - LeaderEpoch는 카프카의 파티션들이 복구 동작을 할 때 메시지의 일관성을 유지하기 위한 용도로 사용

    - 컨트롤러에 의해 관리되는 32비트의 숫자로 구성

    - 해당 LeaderEpoch 정보는 리플리케이션 프로토콜에 의해 전파

    - 새로운 리더가 변경된 후 변경된 리더에 대한 정보는 팔로워에게 전달

    - 복구 동작 시 하이워터마크를 대체하는 수단으로도 활용

    Leader Epoch 미사용시

    1. Leader가 Producer로부터 message1 메시지를 받았고, 0번 오프셋에 저장, Follower는 리더에게 0번 오프셋에 대한 가져오기 요청

    2. Follower는 message1 메시지를 Leader로부터 리플리케이션

    3. Leader는 하이워터마크  = 1로 설정

    4. Leader는 Producer로부터 message2 메시지를 받은 뒤 1번 오프셋에 저장

    5. Follower는 message2 에 대해 가져오기 요청, 응답으로 하이워트마크 변화를 감지 후 자신의 하이워터마크도 1로 올림

    6. 팔로워는 1번 오프셋의 message2 메시지를 리더로부터 리플리케이션

    7. Follower는 2번 오프셋에 대한 요청을 리더에게 보내고, 요청을 받은 리더는 하이워터마크를 2로 올림

    8. 팔로워는 2번 오프셋인 message2 메시지까지 리플리케이션을 완료했으나 아직 리더로부터 하이워터마크를 2로 올리는 내용은 미전달

    9. 장애 발생  -> Follower Down

    이렇게 장애가 발생하고 복구 시 팔로워는 자신의 워터마크보다 높은 메시지들은 신뢰할 수 없는 메시지로 판단하고 message2를 삭제하게 된다. 이 상황에서 리더가 갑작스런 장애로 다운된다면 유일하게 남아있는 팔로워가 새로운 리더가 되는데, message2는 워터마크보다 크기 때문에 손실된 채로 리더가 변경된다.

    그렇다면 LeaderEpoch를 사용하게 되면 어떨까?

    위 그림은 리더와 팔로워의 리플리케이션 동작 이후 팔로워가 장애로 종료된 후 막 복구된 상태이다. 리더에포크를 사용하는 경우, 하이워터마크보다 높은 메시지를 무조건 삭제하지 않으며 리더에게 리더에포크 요청을 한다.

    1. Follower는 복구 동작을 하면서 리더에게 리더에포크 요청

    2. 요청을 받은 리더는 리더에포크의 응답으로 '1번 오프셋의 message2까지' 라고 팔로우에 응답

    3. 팔로워는 자신의 하이워터마크보다 높은 1번 오프셋의 message2를 삭제하지 않고 리더의 응답을 확인한 후 message2까지 자신의 하이워터마크를 상향 조정

    이처럼 LeaderEpoch를 사용하게 되면 삭제 동작에 앞서 리더에포크 요청과 응답 과정을 통해 팔로워의 하이워터마크 조정이 가능하다.

    [리플리케이션 동작이 완료되지 않은 상태]

    팔로워는 아직 1번 오프셋 메시지에 대해 리플리케이션 동작이 완료되지 않았다고 가정하자. 이 시점에서 해당 브로커들의 장애가 발생해 리더와 팔로워 모두 다운됐다고 가정한다. 

    1. 팔로워였던 브로커가 장애에서 먼저 복구

    2.  peter-test01 토픽의 0번 파티션에 리더가 없으므로 팔로워는 새로운 리더로 승격

    3. 새로운 리더는 프로듀서로부터 다음 메시지 message3을 전달받고 1번 오프셋에 저장 한 후 자신의 하이워터마크를 상향 조정

     

    1. 구 리더였던 브로커가 장애에서 복구

    2. peter-test01 토픽의 0번 파티션에 이미 리더가 있으므로, 복구된 브로커는 팔로워가 된다.

    3. 리더와 메시지 정합성 확인을 위해 자신의 하이워터마크를 비교해보니 리더의 하이워터마크와 일치하므로 브로커는 자신이 갖고 있던 메시지를 삭제하지 않음

    4. 리더는 프로듀서로부터 message4 메시지를 받은 후 오프셋2의 위치에 저장

    5. 팔로워는 오프셋2인 message4를 리플리케이션하기 위해 준비

    이렇게 되면 리더와 팔로워 모두 동일한 하이워터마크를 나타내지만 서로의 메시지가 달라지게 된다. 

     

    [LeaderEpoch 사용 시]

    가장 중요한 점은, 뉴리더가 자신이 팔로워일때의 하이워터마크와 뉴리더일 때의 하이워터마크를 알고 있다.

    1. 구 리더였던 브로커가 장애에서 복구

    2. peter-test01 토픽의 0번 파티션에 이미 리더가 있고 자신은 팔로워가 됨.

    3. 팔로워는 뉴리더에게 리더에포크를 요청

    4. 뉴리더는 0번 오프셋까지 유효하다 응답

    5. 팔로워는 메시지 일관성을 위해 로컬 파일에서 1번 오프셋인 message2를 삭제

    *팔로워는 쓰기 권한이 없으므로 리더에게 message2를 추가할 수 없음

    6. 팔로워는 리더로부터 1번 오프셋인 message3를 리플리케이션하기 위해 준비

    실습

    ## peter-test02 토픽 생성(파티션 1, 리플리케이션 2)
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-topics.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --create --topic peter-test02
        --partitions 1
        --replication-factor 2
        
    ## 파티션 리더가 어느 브로커인지 확인
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-topics.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --topic peter-test02
        --describe
        
    ...
    
    Topic: peter-test02 PartitionCount: 1 ReplicationFactor: 2 Configs: segment.
    bytes=1073741824
    Topic: peter-test02 Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
    
    ## 현재 리더에포크 상태 확인
    [ec2-user@~]$ cat /data/kafka-logs/peter-test02-0/leader-epoch-checkpoint
    
    ...
    
    0 
    1 # 현재 리더에포크 번호
    0 0 # 첫 번째 0 : 리더에포크 번호 / 두 번째 0 : 최종 커밋 후 새로운 메시지를 전송받게 될 오프셋 번호

    이제 토픽 파티션의 변화를 준 뒤 리더에포크의 내용을 확인한다.

    [ec2-user@~]$ /usr/local/kafka/bin/kafka-console-producer.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --topic peter-test02
    > message1
    
    # 현재 리더에포크 상태 확인
    [ec2-user@~]$ cat /data/kafka-logs/peter-test02-0/leader-epoch-checkpoint
    
    ...
    
    0
    1
    0 0

    조금 전 확인한 사항과 달라진 것이 없다. 현재까지의 상황은 콘솔 프로듀서를 이용해 peter-test02 토픽으로 전송한 message1은 0번 파티션의 0번 오프셋에 저장됐고, 리플리케이션 동작으로 팔로워까지 저장된 상태이다. 리더에포크는 새로운 리더 선출 시 변경된 정보가 업데이트 된다.

    [ec2-user@~]$ sudo systemctl stop kafka-server
    [ec2-user@~]$ sudo systeamctl status kafka-server
    
    ...
    
    * kafka-server.service - kafka-server
      Loaded: loaded (/etc/systemd/system/kafka-server.service; enabled; vendor preset:
    disabled)
      Active: failed(Result: exit-code) since 목 2023-05-18 00:00:32 KST; 59s ago
    Process: 6691 ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh (code=exited, status=0/
    SUCCESS)...
    
    # Active: failed가 확인되면 정상적으로 카프카 프로세스가 종료됨을 의미
    
    # 상태 확인
    [ec2-user@~]$ cat /data/kafka-logs/peter-test02-0/leader-epoch-checkpoint
    
    ...
    
    0
    2 # 현재의 리더에포크 번호. 리더가 변경될 때마다 하나씩 증가
    0 0 # 리더에포크 번호 / 최종 커밋 후 새로운 메시지를 전송받게 될 오프셋 번호
    1 1 # 리더에포크 번호 / 최종 커밋 후 새로운 메시지를 전송받게 될 오프셋 번호

    1. 리더 브로커 1번이 다운되면서 리더 에포크 번호는 2로, 1 증가하였다.

    2. 리더에포크 번호가 1이었을 때를 기준으로 가장 마지막에 커밋된 후 새로 메시지를 받게 될 오프셋 번호를 기록

    3. 가장 마지막에 커밋된 오프셋 번호는 0이므로 카프카는 리더에포크 번호 1과 최종 커미소딘 후 새로이 메시지를 전송받도록 준비된 오프셋 번호 1을 leader-epoch-checkpoint 파일에 기록

    다운됐던 브로커는 leader-epoch-checkpoint 파일에 기록된 정보를 이용해 복구 동작 실행. 구 리더는 종료 직전 마지막 리더에포크 번호가 1이므로 뉴리더에게 1번 리더에포크에 대한 요청을 보내고, 뉴리더는 1번 리더에포크의 최종 커밋 후 준비된 오프셋 위치가 1이라는 응답을 보낸다. 

    # 다시 브로커 실행
    [ec2-user@~]$ sudo systemctl start kafka-server
    
    [ec2-user@~]$ cat /data/kafka-logs/peter-test02-0/leader-epoch-checkpoint
    
    ...
    
    0
    3 # 리더에포크 2 -> 3
    0 0
    1 1
    2 2  # 리더에포크 번호 2, 최종 커밋 오프셋 2

    팔로워는 자신의 하이워터마크보다 높은 오프셋의 메시지를 무조건 삭제하고, 먼저 리더에게 리더에포크 요청을 보내 응답을 받아 최종 커밋된 오프셋 위치를 확인한다. 이를 통해 안정적인 복구 동작을 가능하게 만든다.

    Controller

    - 카프카 클러스터 중 하나의 브로커가 컨트롤러 역할

    - 파티션의 ISR 리스트 중에서 리더를 선출.

    *ISR 리스트 정보는 가용성 보장을 위해 주키퍼에 저장

    - 브로커의 실패를 예의주시하고 브로커의 실패가 감지되면 ISR 리스트 중 하나를 새로운 파티션 리더로 선출

    - 새로운 리더의 정보를 주키퍼에 기록하고 변경된 정보를 모든 브로커에게 전달

    파티션의 리더가 다운됐다는 것은 해당 파티션의 리더가 없는 상태를 의미하며, 카프카 클라이언트(프로듀서, 컨슈머)가 해당 파티션으로 읽기/쓰기 동작이 불가능하다. 이렇게 되면 모든 읽기/쓰기 동작은 실패하고, 클라이언트에 설정된 재시도 숫자만큼 재시도를 하게 된다.

    따라서 클라이언트들이 재시도하는 시간 내 리더 선출 작업이 빠르게 이뤄져야 한다.

    [리더 선출 과정 : 예기치 않은 장애]

    1. 파티션 0번의 리더가 있는 브로커 1번이 예기지 않게 다운

    2. 주키퍼는 1번 브로커와의 연결이 끊어진 후, 0번 파티션의 ISR에서 변화 감지

    3. 컨트롤러는 주키퍼 워치를 통해 0번 파티션에 변화를 감지하고, 해당 파티션 ISR 중 3번을 새로운 리더로 선출

    4. 컨트롤러는 0번 파티션의 새로운 리더가 3이라는 정보를 주키퍼에 기록

    5. 갱신된 정보는 현재 활성화 상태인 모든 브로커에게 전달

     

    [리더 선출 과정: 제어된 종료 과정]

    제어된 종료 과정은 관리자가 자연스럽게 종료하는 상황을 얘기한다.

    1. 관리자가 브로커 종료 명령어를 실행하고, SIG_TERM 신호가 브로커에게 전달

    2. SIG_TERM 신호를 받은 브로커는 컨트롤러에게 알림

    3. 컨트롤러는 리더 선출 작업을 진행하고, 해당 정보를 주키퍼에게 기록

    4. 컨트롤러는 새로운 리더 정보를 다른 브로커들에게 전송

    5. 컨트롤러는 종료 요청을 보낸 브로커에게 정상 종료한다는 응답

    6. 응답을 받은 브로커는 캐시에 있는 내용을 디스크에 저장하고 종료

    제어된 종료와 급작스러운 종료의 가장 큰 차이는, Downtime이다. 제어된 종료를 사용하면 카프카 내부적으로 파티션들의 다운타임을 최소화할 수 있다. 왜냐하면 컨트롤러는 브로커가 종료되기 전 해당 브로커가 리더로 할당된 전체 파티션에 대해 리더 선출 작업을 진행하기 때문이다.(물론 제어된 종료라도 일시적으로 다운타임이 발생). 리더 선출 작업 대상 파티션들의 리더들이 활성화된 상태에서 순차적으로 하나의 파티션마다 리더를 선출하므로, 각 파티션들은 다운타임을 최소화할 수 있다. 제어된 종료의 경우 브로커가 자신의 모든 로그를 디스크에 동기화한 후 종료되므로, 재시작 시 로그 복구 시간이 짧다.

    * 제어된 종료를 사용하려면 controlled.shutdown.enable = true 설정이 브로커의 설정 파일인 server.properties에 적용되어야 함.

    # 현재의 브로커 상태를 보고 싶을 때
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-configs.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --broker 1
        --describe --all

    반면 브로커 장애로 인한 리더 선출 작업은, 이미 대상 파티션들의 리더가 종료된 상태이고 파티션들의 다운타임은 새로운 리더 선출 작업이 될 때까지 지속된다. 컨트롤러는 제어된 종료의 경우와 마찬가지로 순차적인 파티션 리더 선출을 하게되며, 이렇게 되면 첫 번째 대상 파티션의 다운타임은 길지 않으나 마지막 대상 파티션의 다운타임은 꽤 오랜 시간이 소요된다.

     

    Log Segment

    - 카프카의 토픽으로 들어오는 메시지는 Segment라는 파일에 저장

    - 메시지는 정해진 형식에 맞춰 순차적으로 Log Segment 파일에 저장 : 메시지의 내용만 저장되는 것이 아니라 메시지의 키, 밸류, 오프셋, 메시지 크기와 같은 정보가 함께 저장되며, 로그 세그먼트 파일은 브로커의 로컬 디스크에 보관

    - 하나의 로그 세그먼트 크기가 너무 커져버리면 파일을 관리하기 어렵기 때문에, 최대크기는 1GB가 기본값

    - 로그 세그먼트가 1GB보다 클 경우

    Rolling : 하나의 로그 세그먼트에 카프카로 인입되는 메시지들을 계속해서 덧붙이다 로그 세그먼트의 크기가 최대크기에 도달하면 해당 세그먼트 파일을 클로즈하고, 새로운 로그 세그먼트를 생성함.

    카프카에 기본적으로 로그 세그먼트 파일에 대한 롤링 전략이 준비되어 있지만 1GB 크기의 로그 세그먼트 파일이 무한히 늘어날 경우를 대비해 로그 세그먼트 삭제와 컴팩션의 두 가지 대비책을 수립했다.

    로그 세그먼트 삭제

    로그 세그먼트 삭제 옵션은 브로커 설정 파일인 server.properties에서 log.cleanup.policy가 delete로 명시되어야 한다. 해당 값은 기본값으로 적용되지만, 관리자가 server.properties에 해당 옵션을 따로 명시하지 않았다면 삭제 정책이 필요하다.

    # 토픽 생성
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-topics.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --create --topic peter-test03
      	--partitions 1
        --replication-factor 3
    
    # 콘솔 프로듀서 이용 log1 메시지 전달
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-console-producer.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --topic peter-test03
    >log1
    
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-console-consumer.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --topic peter-test03
        --from-beginning # 강제로 peter-test03 토픽의 처음부터 메시지를 가져옴
        
     ...
     
     log1
     
     # 메시지 삭제 : retention.ms=0 설정
     # 로그 세그먼트 파일 보관 시간이 0보다 크면 삭제
    [ec2-user@~] /usr/local/kafka/bin/kafka-configs.sh
     	--bootstrap-server peter-kafka01.foo.var:9092
        --topic peter-test03
        --add-config retention.ms=0 --alter
        
       
    [ec2-user@~] /usr/local/kafka/bin/kafka-topics.sh
     	--bootstrap-server peter-kafka01.foo.var:9092
        --topic peter-test03
       	--describe
        
     ...
     
     Topic: peter-test03 PartitionCount: 1 ReplicationFactor: 3 Configs: segment.
     bytes=1073741824,retention.ms=0
     Topic: peter-test03 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

    로그 세그먼트의 삭제 작업은 일정 주기를 가지고 체크하는 데, 카프카의 기본값은 5분 간격으로 로그 세그먼트 파일을 체크하면서 삭제 작업을 수행한다. 

    # 삭제가 일어나기 전 상태 확인
    [ec2-user@~]$ ls /data/kafka-logs/peter-test03-0/
    
    ...
    
    00000000000000000000.index 0000000000000000.log 0000000000000000.timeindex
    leader-epoch-checkpoint
    
    # retention.ms 초과 후
    [ec2-user@~]$ ls /data/kafka-logs/peter-test03-0/
    
    ...
    
    0000000000000001.log 0000000000000001.timeindex leader-epoch-checkpoint

    - 0000000000000000.index :  로그 세그먼트에 저장된 위치와 오프셋 정보를 기록하는 파일

    - 0000000000000000.log : 실제 메시지들이 저장되는 파일

    - 0000000000000000.timeindex : 메시지의 타임스탬프를 기록하는 파일

    로그 세그먼트 파일 보관 주기가 지나게 되면 해당 파일은 삭제되고 새로운 파일이 생성된다는 의미이다. 카프카에서는 로그 세그먼트 파일명이 생성되는 규칙이 있는데, 신규 로그 세그먼트 파일은 처음에 확인했던 로그 세그먼트 파일에서 1씩 증가한다. 즉 오프셋 시작 번호를 이용해 파일 이름을 생성하는 규칙을 따른다.

    # 토픽 peter-test03에 추가했던 retention.ms 옵션 삭제
    [ec2-user@~]$ /usr/local/kafka/bin/kafka-configs.sh
    	--bootstrap-server peter-kafka01.foo.bar:9092
        --topic peter-test03
        --delete-config retention.ms --alter
        
    # 토픽 상태 확인
     [ec2-user@~] /usr/local/kafka/bin/kafka-topics.sh
     	--bootstrap-server peter-kafka01.foo.var:9092
        --topic peter-test03
       	--describe
        
     ...
     
     Topic: peter-test03 PartitionCount: 1 ReplicationFactor: 3 Configs: segment.
     bytes=1073741824
     Topic: peter-test03 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

    보관 시간 외에도 retention.bytes라는 옵션을 이용해 지정된 크기를 기준으로도 로그 세그먼트를 삭제할 수 있다.

     

    로그 세그먼트 컴팩션

    - 카프카에서 제공하는 로그 세그먼트 관리 정책 중 하나- 로그를 삭제하지 않고 컴팩션하여 보관이 가능하며, 기본적으로 로컬 디스크에 저장되어 있는 세그먼트를 대상으로 실행- 메시지의 키값을 기준으로 마지막의 데이터만 보관로그 컴팩션 기능을 이용하는 대표적인 예제는 카프카의 _consumer_offset 토픽이다. _consumer_offset 토픽은 카프카의 내부 토픽으로 컨슈머 그룹의 정보를 저장하는 토픽이다. 각 컨슈머 그룹의 중요한 정보는 해당 컨슈머 그룹이 어디까지 읽었는지를 나타내는 오프셋 커밋 정보인데, _consumer_offset에 키(컨슈머 그룹명, 토픽명)와 밸류(오프셋 커밋 정보) 형태로 메시지가 저장된다.

    로그 컴팩션은 메시지의 키값을 기준으로 과거 정보는 중요하지 않고 가장 마지막 값이 필요한 경우에 사용한다. 프로듀서가 카프카로 메시지를 전송할 때, 메시지에는 메시지의 키와 밸류를 함께 전송한다. 밸류는 필숫값이나 키는 필숫값이 아니다. 그러므로 로그 컴팩션 기능을 사용하고자 한다면 카프카로 메시지를 전송할 때 키도 필숫값으로 전송해야 한다.

    위 그림과 같이, 로그 컴팩션을 거치면 키 값을 기준으로 가장 마지막의 메시지들만 남게 된다.

    로그 컴팩션은 장애 복구 시 전체 로그가 아닌 최신의 상태만 복구하므로, 복구 시간이 짧아져 빠른 장애 복구가 가능하다. 마지막 메시지만 빠르게 재처리할 수 있다는 장점이 있으나 모든 토픽에 로그 컴팩션을 적용하는 것은 좋지 않으며, 키값을 기준으로 최종값만 필요한 워크로드에 적용하는 것이 바람직하다. 또한 카프카에서 로그 컴팩션 작업이 실행되는 동안 브로커의 과도한 입출력 부하가 발생할 수 있으므로 브로커의 리소스 모니터링도 병행해 로그 컴팩션을 사용해야 한다.

     

    로그 컴팩션 관련 옵션

    옵션 이름 옵션 값 적용 범위 설명
    cleanup.policy compact 토픽의 옵션으로 적용 토픽 레벨에서 로그 컴팩션을 설정할 때 적용
    log.cleanup.policy compact 브로커의 설정 파일에 적용 브로커 레벨에서 로그 컴팩션을 설정할 때 적용
    log.cleaner.min.
    compaction.lag.ms
    0 브로커의 설정 파일에 적용 메시지가 기록된 후 컴팩션하기 전 경과되어야 할 최소 시간을 지정. 이 옵션을 설정하지 않으면 마지막 세그먼트를 제외하고 모든 세그먼트를 컴팩션할 수 있음
    log.cleaner.max.
    compaction.lag.ms
    9223372036854775807 브로커의 설정 파일에 적용 메시지가 기록된 후 컴팩션하기 전 경과되어야 할 최대 시간 지정
    log.cleaner.min.
    cleanable.ratio
    0.5 브로커의 설정 파일에 적용 로그에서 압축이 되지 않은 부분을 dirty로 표현. 전체 로그 대비 더티의 비율이 50%가 넘으면 로그 컴팩션이 실행

     

    728x90

    'Data Engineer' 카테고리의 다른 글

    [Netflix TechBlog] Data PipeLine_Asset Management  (0) 2023.06.04
    [Kafka] 프로듀서의 동작과 원리  (0) 2023.05.26
    [Kafka] Producer와 Consumer  (0) 2023.05.16
    [Kafka] 정리_01  (0) 2023.05.14
    [Data Engineer] Kafka란 무엇인가  (0) 2023.05.07
Designed by Tistory.