-
[Kafka] 프로듀서의 동작과 원리Data Engineer 2023. 5. 26. 13:46728x90
프로듀서의 기본 역할은 소스에 있는 메시지들을 카프카의 토픽으로 전송하는 것이다.
Partitioner
- 카프카의 토픽은 성능 향상을 위한 병렬 처리가 가능하도록 하기 위해 파티션으로 나뉘고, 최소 하나 또는 둘 이상의 파티션으로 구성
- 프로듀서가 카프카로 전송한 메시지는 해당 토픽 내 각 파티션의 로그 세그먼트에 저장
- 프로듀서는 토픽으로 메시지를 보낼 때 해당 토픽의 어느 파티션으로 메시지를 보내야 할 지를 결정해야 하는데, 이때 사용하는 것이 바로 Partitioner
프로듀서가 파티션을 결정하는 알고리즘은 기본적으로 메시지의 키를 해시 처리해 파티션을 구하는 방식을 사용한다. 메시지의 키값이 동일하면 해당 메시지들은 모두 같은 파티션으로 전송된다. 만약 예상치 못한 많은 양의 메시지가 카프카로 인입되는 경우 카프카는 클라이언트의 처리량을 높이기 위해 토픽의 파티션을 늘릴 수 있는 기능을 제공한다. 이때 파티션 수가 변경됨과 동시에 메시지의 키와 매핑된 해시 테이블도 변경된다.
단, 메시지의 키를 이용해 카프카로 전송하는 경우, 만약 토픽의 파티션 수를 확장하게 되면 매핑되었던 파티션의 위치가 변경될 수 있다. 따라서 관리자의 의도와는 다른 방식으로 메시지 전송이 이뤄질 수 있기 때문에, 되도록 파티션 수를 변경하지 않는 것을 권장한다.
만약 별도의 키값을 지정하지 않는다면?
프로듀서의 메시지 중 레코드의 키값은 필숫값이 아니기 때문에, 별도의 레코드 키값을 지정하지 않고 메시지를 전송할 수 있다. 키값을 지정하지 않는다면 키값은 null이 되고, Round Robin 알고리즘을 사용하여 프로듀서는 목적지 토픽의 파티션들로 레코드들을 랜덤 전송을 한다.
ROUND ROBIN
- 파티셔너를 거친 후의 레코드들은 배치 처리를 위해 프로듀서의 버퍼 메모리 영역에서 잠시 대기 후 카프카로 전송
- 배치 처리를 위해 잠시 메시지들이 대기하는 과정에서 효율이 떨어질 수 있음
프로듀서는 키값이 null인 레코드1을 보내고, 파티셔너는 키값이 null인 레코드를 확인해서 라운드 로빈 전략으로 토픽A-파티션0에 레코드1을 담아놓습니다. 이 메시지는 카프카로 전송되지 않은 채 배치 전송을 위해 프로듀서의 버퍼 메모리에서 잠시 대기한다. 다음으로 프로듀서는 키값이 null인 레코드2를 보내고, 파티셔너는 라운드 로빈 전략으로 토픽A-파티션1에 레코드2를 담아놓는다. 동일한 방법으로 키값이 null인 레코드들이 총 5개 전송되고 라운드 로빈 전략에 의해 각 파티션에 하나씩 순차적으로 할당된다.
총 5개의 레코드가 파티셔너를 거쳐 지나갔으나 카프카로 전송되지 못한 채 프로듀서 내에서 대기하고 있다. 그 이유는 배치 전송을 위한 최소 레코드 수가 3으로 설정되어 있으나 파티션별 최소 레코드 수의 기준인 3을 충족하지 못했기 때문이다.
스티키 파티셔닝 전략
- 라운드 로빈 전략에서 지연시간이 불필요하게 증가되는 비효율적인 전송을 개선하고자 사용
- 하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송
프로듀서에서 파티셔너를 거켜 레코드가 처리되는 과정을 한번 알아보자. 프로듀서는 키값인 null인 레코드1을 보내고 파티셔너는 키값이 null인 레코드를 확인하고 배치를 위해 임의의 토픽A-파티션0에 레코드1을 담아 놓는다. 그러고 나서 프로듀서는 키값이 null인 레코드2를 보내고, 파티셔너는 조금 전 토픽A-파티션0에 레코드2를 담아 놓는다. 이런 방식으로 파티셔너는 배치를 위한 레코드 수에 도달할 때까지 다른 파티션으로 보내지 않고 동일한 파티션으로 레코드를 담아놓는다.
이런 스티키 파티셔닝 전략은 기본 설정인 Round Robin에 비해 약 30% 이상 지연시간이 감소하고 프로듀서의 CPU 사용률도 줄이는 효과를 볼 수 있다. 카프카로 전송하는 메시지의 순서가 그다지 중요하지 않은 경우라면 스티키 파티셔닝 전략을 적용하는 것을 권장한다.
프로듀서의 배치
토픽의 처리량을 높이기 위한 방법으로 토픽을 파티션으로 나눠 처리하며, 카프카 클라이언트인 프로듀서에서는 처리량을 높이기 위해 배치 전송을 권장한다. 프로듀서에서는 카프카로 전송하기 전, 배치 전송을 위해 토픽의 파티션별로 레코드들을 잠시 보관하고 있다.
- buffer.memory : 카프카로 메시지들을 전송하기 위해 담아두는 프로듀서의 버퍼 메모리 옵션. 기본값은 32M
- batch.size : 배치 전송을 위해 메시지들을 묶는 단위를 설정하는 배치 크기 옵션. 기본 옵션은 16KB로 설정되어 있으며, 관리자는 배치 크기를 더 높이거나 줄일 수 있다.
- linger.ms : 배치 전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 대기시간을 설정하는 옵션. 단위는 밀리초(ms)이며 기본값은 0이다. 기본값 0으로 설정하면 배치 전송을 위해 기다리지 않고 메시지들이 즉시 전송된다.
프로듀서의 배치 전송 방식은 한 번에 다량의 메시지를 묶어 전송하는 방법으로, 불필요한 I/O를 줄일 수 있어 매우 효율적이며, 더불어 카프카의 요청 수를 줄여주는 효과도 있다.
그렇지만 카프카를 사용하는 목적에 따라 처리량을 높일지, 혹은 지연 없는 전송을 해야 할 지 선택을 해야 한다. 대량의 메시지를 처리할 시 처리량을 높여야 하는 경우라면 효율적인 배치 전송을 위해 프로듀서의 설정을 변경해야 하고, 지연 없는 전송이 목표라면 프로듀서의 배치 전송 관련 설정을 제거해야 한다. 즉 전자의 경우 batch.size와 linger.ms의 값을 크게 설정하고, 후자는 batch.size와 linger.ms의 값을 작게 설정해야 한다.
사용자가 프로듀서의 높은 처리량을 목표로 배치 전송을 설정하는 경우, 반드시 버퍼 메모리 크기가 충분히 커야 한다. 왜냐하면 프로듀서가 전송에 실패할 시 재시도를 수행하는 데, 이를 위해 buffer.memory 크기가 batch.size의 3배는 되어야 한다. 또 배치 전송과 더불어 압축 기능을 같이 사용한다면 프로듀서는 메시지들을 더욱 효율적으로 카프카에 전송할 수 있다. 클라이언트를 포함해 카프카에서는 gzip, snappy, lz4, zstd 등의 압축 포맷을 지원한다. 메시지들의 높은 압축률을 선호한다면 gzip, zstd를 선택하고, 낮은 지연시간을 선호하면 lz4, snappy를 선택한다.
*멱등성 : 동일한 작업을 여러 번 수행하더라도 결과가 달라지지 않는 것
중복 없는 전송
메시지 시스템들의 메시지 전송 방식에는 적어도 한 번 전송, 최대 한 번 전송, 정확히 한 번 전송이 있다.
적어도 한 번 전송
1. 프로듀서가 브로커의 특정 토픽으로 메시지A 전송
2. 브로커는 메시지A를 기록하고, 잘 받았다는 ACK를 프로듀서에게 응답
3. 브로커의 ACK를 받은 프로듀서는 다음 메시지인 메시지B를 브로커에게 전송
4. 브로커는 메시지B를 기록하고, 잘 받았다는 ACK를 프로듀서에게 전송, 그러나 네트워크 오류 또는 브로커 장애다 발생하여 결국 프로듀서는 메시지B에 대한 ACK 받지 못함
5. 메시지B를 전송한 후 브로커로부터 ACK를 받지 못한 프로듀서는 브로커가 메시지B를 받지 못했다고 판단해 메시지B를 재전송
5번에 대해 더 자세히 살펴보면, 프로듀서 입장에서는 브로커가 메시지를 저장하고 ACK를 전송하지 못한 것인지, 메시지를 저장하지 못해서 ACK를 전송하지 않은 것인지 정확하게 알 수 없다. 그러나 메시지B에 대한 ACK를 받지 못한 프로듀서는 적어도 한 번 전송 방식에 따라 메시지B를 전송한다. 이렇게 되면 브로커가 메시지B를 받지 못한 상황이라면 브로커는 처음으로 메시지B를 저장하지만, ACK만 전송하지 못한 경우라면 메시지B는 브로커에 중복 저장될 것이다. 적어도 한 번 전송 방식은 네트워크 회선 장애나 기타 장애 상황에 따라 일부 메시지 중복이 발생할 수 있지만, 최소한 하나의 메시지는 반드시 보장하며, 카프카는 이 방식을 기반으로 동작한다.
최대 한 번 전송
1. 프로듀서가 브로커의 특정 토픽으로 메시지A 전송
2. 브로커는 메시지A를 기록하고, 잘 받았다는 ACK를 프로듀서에 응답
3. 프로듀서는 다음 메시지인 메시지B를 브로커에게 전송
4. 브로커는 메시지B를 기록하지 못하고 잘 받았다는 ACK를 프로듀서에게 전송 못함
5. 프로듀서는 브로커가 메시지B를 받았다고 가정하고, 메시지C를 전송
최대 한 번 전송은 ACK를 받지 못해도 재전송을 하지 않는다. 이 방식은 프로듀서가 메시지의 중복 가능성을 회피하기 위해 일부 메시지의 손실을 감안하더라도 재전송을 하지 않는 경우이다. 높은 처리량을 필요로 하는 대량의 로그 수집이나 IoT 환경에서 사용한다.
중복 없는 전송
앞서 다룬 두 방법과 달리 각 메시지 하단에 숫자가 추가된 것이 특징이다
.1. 카프카가 브로커의 특정 토픽으로 메시지A를 전송한다. PID(Producer ID) 0과 메시지 번호 0을 포함해 함께 전송
2. 브로커는 메시지A를 저장하고, PID와 메시지 번호 0을 메모리에 기록 + ACK를 프로듀서에게 응답
3. 프로듀서는 메시지B를 브로커에게 전송하고 PID는 0, 메시지 번호 1 포함 전송
4. 브로커는 메시지B를 전송하고, PID와 메시지 번호 1을 메모리에 기록. 그러나 네트워크 오류 혹은 브로커 장애가 발생해 메시지B에 대한 ACK를 받지 못함
5. 브로커로부터 ACK를 받지 못한 프로듀서는 브로커가 메시지B를 받지 못했다고 판단, 메시지B를 재전송
이 방법은 프로듀서가 재전송한 메시지B의 헤더에서 PID와 메시지 번호를 비교해 이미 브로커에 해당 메시지가 저장되어 있는지를 확인하여 중복 저장을 피하는 것이 특징이다 (이미 저장되어 있는 경우 ACK만 전송한다). PID는 사용자가 별도로 생성하는 것이 아닌 프로듀서에 의해 자동 생성된다. PID는 프로듀서와 카프카 사이에서 내부적으로만 이용되므로 사용자에게 따로 노출되지 않으며, 메시지마다 부여되는 시퀀스 번호는 0번부터 시작해 순차적으로 증가한다. 프로듀서가 보낸 메시지 시퀀스 번호가 브로커가 갖고 있는 시퀀스 번호보다 정확하게 하나 큰 경우가 아니라면 브로커는 프로듀서의 메시지를 저장하지 않는다. PID와 시퀀스 번호 정보는 브로커의 메모리에 유지되고, 리플리케이션 로그에도 저장된다.
이를 통해 예기치 못한 브로커의 장애로 리더가 재선출되더라도 중복 없는 메시지 전송을 가능하게 만든다.매우 유용하지만, 이 방식은 중복을 피하기 위한 메시지 비교 동작 때문에 오버헤드가 존재한다. 이를 최소화하기 위해 카프카에서는 메시지에 단순히 숫자 필드만 추가하여 구현해서 오버헤드가 생각보다 높은 편은 아니다 (중복 없는 전송 적용 시 기존 대비 최대 약 20% 성능 감소 - by 컨플루언트 블로그). 따라서 프로듀서 전송 성능에 그다지 민감하지 않은 상황에서 중복 없는 전송이 필요하다면 이 방식을 설정 및 적용한다.
OPTION VALUE EXPLANATION enable.idempotence true - 프로듀서가 중복 없는 전송의 허용 여부를 결정하는 옵션
- true로 변경 시 다음에 나오는 옵션도 반드시 변경해야 ConfigException이 발생하지 않음max.in.flight.requests.
per.connection1 ~ 5 - ACK를 받지 않은 상태에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수
- 기본 값은 5이고, 5 이하로 설정acks all - 프로듀서 acks와 관련된 옵션
- 기본 값은 1이고, all로 설정retries 5 - ACK를 받지 못한 경우 재시도, 0보다 큰 값으로 설정 정확히 한 번 전송
카프카에서 정확히 한 번 전송은 트랜잭션과 같은 전체적인 프로세스 처리를 의미하며, 중복 없는 전송은 정확히 한 번 전송의 일부 기능이라 할 수 있다. 전체적인 프로세스를 관리하기 위해 카프카에서는 정확히 한 번 처리를 담당하는 별도의 프로세스가 존재하는 데, 이를 트랜잭션 API라고 한다.
Design
- 프로듀서가 보내는 메시지들은 원자적으로 처리되어 전송에 성공하거나 실패하게 된다.
- 트랜잭션 코디네이터 : 프로듀서에 의해 전송된 메시지 관리. 커밋 또는 중단 등을 표시
트랜잭션 코디네이터는 카프카 서버에 존재한다. 카프카에서는 컨슈머 오프셋 관리를 위해 오프셋 정보를 카프카의 내부 토픽에 저장하는데, 트랜잭션도 동일하게 트랜잭션 로그를 카프카의 내부 토픽인 _transaction_state에 저장한다.
*_transaction_state : 카프카의 내부토픽이지만 타 토픽들과 동이하게 파티션 수와 리플리케이션 수가 존재하며, 브로커의 설정을 통해 관리자가 될 수 있다.
## _transaction_state의 기본값 transaction.state.log.num.partitions=50 transaction.state.log.num.replication.factor=3
프로듀서가 해당 토픽에 트랜잭션 로그를 직접 기록하는 것이 아니다! 프로듀서는 트랜잭션 관련 정보를 트랜잭션 코디네이터에게 알리고 모든 정보의 로그는 트랜잭션 코디네이터가 직접 기록한다.
정확히 한 번 전송을 이용해 전송된 메시지들이 카프카에 저장되면, 카프카의 메시지를 다루는 클라이언트들은 해당 메시지들이 정상적으로 커밋된 것인지 또는 실패한 것인지 식별할 수 있어야 한다. 이를 식별하기 위해 카프카는 컨트롤 메시지라고 불리는 특별 타입 메시지가 추가로 사용된다.
예제
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class ExactlyOnceProducer { public static void main(String[] args) { String bootstrapServers = "peter-kafka01.foo.bar:9092"; Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 정확히 한 번 전송을 위한 설정 props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); props.setProperty(ProducerConfig.RETRIES_CONFIG, "5"); props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "peter-transaction-01"); Producer<String, String> producer = new KafkaProducer<>(props); producer.initTransactions(); // 프로듀서 트랜잭션 초기화 producer.beginTransaction(); // 프로듀서 트랜잭션 시작 try { for (int i = 0; i < 1; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("peter-test05", "Apache Kafka is a distributed streaming platform - " + i); producer.send(record); producer.flush(); System.out.println("Message sent successfully"); } } catch (Exception e){ producer.abortTransaction(); // 프로듀서 트랜잭션 중단 e.printStackTrace(); } finally { producer.commitTransaction(); // 프로듀서 트랜잭션 커밋 producer.close(); } } }
중복 없는 전송과 정확히 한 번 전송의 옵션 설정에서 가장 큰 차이는 TRANSACTIONAL_ID_CONFIG 이다. 프로듀서의 이 옵션은 실행하는 프로듀서 프로세스마다 고유한 아이디로 설정해야 한다.
단계별 동작
- 정확히 한 번 전송을 위해서는 트랜잭션 API를 이용
- 가장 먼저 수행하는 작업은 트랜잭션 코디네이터를 찾는 작업
- 프로듀서는 브로커에게 FindCoordinator Request를 보내서 트랜잭션 코디네이터의 위치 파악
- 트랜잭션 코디네이터는 브로커에 위치
- 주 역할은 PID와 transacional.id를 매핑하고 해당 트랜잭션 전체를 관리
*만약 트랜잭션 코디네이터가 존재하지 않는다면 신규 코디네이터가 생성된다.
_transaction_state 토픽의 파티션 번호는 transactional.id를 기반으로 해시하여 결정되고, 이 파티션의 리더가 있는 브로커가 트랜잭션 코디네이터의 브로커로 최종 선정된다. 이는 transaction.id가 정확히 하나의 코디네이터만 갖고 있다는 의미와도 같다.
(# 수정 : 리더 아님. 메시지 전송 동작 참고)
- 프로듀서는 initTransactions() 메서드를 이용해 트랜잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터로 보냄
- 이때 TID가 설정된 경우 InitPidRequest와 함께 TID가 트랜잭션 코디네이터에게 전송
- 트랜잭션 코디네이터는 TID, PID를 매핑하고 해당 정보를 트랜잭션 로그에 기록한다. 그 다음에 PID 에포크를 한 단계 올리는 동작을 하게 되고, 에포크가 올라감에 따라 이전의 동일한 PID와 이전 에포크에 대한 쓰기 요청은 무시
*에포크를 활용하는 이유는 신뢰성 있는 메시지 전송을 하기 위함이다.
- 트랜잭션 시작 동작
- 프로듀서는 beginTransaction() 메서드를 이용 새로운 트랜잭션의 시작을 고지
- 프로듀서 내부적으로 트랜잭션의 시작을 기록하나 트랜잭션 코디네이터 관점에서는 첫 레코드 전송 전까지 트랜잭션은 시작되지 않음
- 트랜잭션 상태 추가 동작
- 프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달
- 트랜잭션 코디네이터는 해당 정보를 트랜잭션 로그에 기록 : TID와 P0(Partition 0) 기록
- 트랜잭션의 현재 상태 : Ongoing
- 트랜잭션 코디네이터 : 해당 트랜잭션에 대한 타이머 시작(기본값 1분)
- timeout 시 해당 트랜잭션은 실패
- 메세지 전송 동작
- 프로듀서는 대상 토픽의 파티션으로 메시지를 전송
- PID, Epoch, Seq No가 함께 포함되어 전송
- 브로커 2개 : 트랜잭션 코디네이터가 있는 브로커와 프로듀서가 전송하는 메시지를 받는 브로커(Leader)가 서로 다름
- 트랜잭션 종료 요청 동작
- 메시지 전송을 완료한 프로듀서는 commitTransaction() 메서드 또는 abortTransaction() 메서드 중 하나를 반드시 호출
- 메서드 호출을 통해 트랜잭션이 완료됨을 트랜잭션 코디네이터에게 고지
- 트랜잭션 코디네이터는 두 단계의 커밋 과정.
1. 트랜잭션 로그에 대한 PrepareCommit / PrepareAbort 기록
- 사용자 토픽에 표시하는 단계
2. 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 기록 (Control Message)
*Control Message : 해당 PID의 메시지가 제대로 전송됐는지 여부를 컨슈머에게 나타내는 용도로 사용. 트랜잭션의 커밋이 끝나지 않은 메시지는 컨슈머에게 반환되지 않으며 오프셋 순서 보장을 위해 LSO(트랜잭션 성공 혹은 실패를 나타내는 오프셋)를 유지
- 트랜잭션 완료
- 트랜잭션 코디네이터는 완료됨이라고 트랜잭션 로그에 기록
- 프로듀서에게 해당 트랜잭션이 완료됨을 알린 다음 해당 트랜잭션에 대한 처리는 모두 마무리
728x90'Data Engineer' 카테고리의 다른 글
[Boaz] Data Pipeline 발제자료 (1) 2023.06.07 [Netflix TechBlog] Data PipeLine_Asset Management (1) 2023.06.04 [Kafka] 카프카의 내부 동작원리 (0) 2023.05.20 [Kafka] Producer와 Consumer (0) 2023.05.16 [Kafka] 정리_01 (0) 2023.05.14